Back | Home
الـ Path الحالي: /home/picotech/domains/instantly.picotech.app/public_html/public/uploads/../uploads/../../../../instantly.picotech.app/homes/../../wa.picotech.app/public_html/node_modules/path-exists/../duplexify/./../@protobufjs/../cheerio/../@socket.io/../thread-stream/.husky/./..
الملفات الموجودة في هذا الـ Path:
.
..
.github
.husky
.taprc
LICENSE
README.md
bench.js
index.d.ts
index.js
lib
package.json
test
مشاهدة ملف: index.js
'use strict'
const { EventEmitter } = require('events')
const { Worker } = require('worker_threads')
const { join } = require('path')
const { pathToFileURL } = require('url')
const { wait } = require('./lib/wait')
const {
WRITE_INDEX,
READ_INDEX
} = require('./lib/indexes')
const buffer = require('buffer')
const assert = require('assert')
const kImpl = Symbol('kImpl')
// V8 limit for string size
const MAX_STRING = buffer.constants.MAX_STRING_LENGTH
class FakeWeakRef {
constructor (value) {
this._value = value
}
deref () {
return this._value
}
}
const FinalizationRegistry = global.FinalizationRegistry || class FakeFinalizationRegistry {
register () {}
unregister () {}
}
const WeakRef = global.WeakRef || FakeWeakRef
const registry = new FinalizationRegistry((worker) => {
if (worker.exited) {
return
}
worker.terminate()
})
function createWorker (stream, opts) {
const { filename, workerData } = opts
const bundlerOverrides = '__bundlerPathsOverrides' in globalThis ? globalThis.__bundlerPathsOverrides : {}
const toExecute = bundlerOverrides['thread-stream-worker'] || join(__dirname, 'lib', 'worker.js')
const worker = new Worker(toExecute, {
...opts.workerOpts,
workerData: {
filename: filename.indexOf('file://') === 0
? filename
: pathToFileURL(filename).href,
dataBuf: stream[kImpl].dataBuf,
stateBuf: stream[kImpl].stateBuf,
workerData
}
})
// We keep a strong reference for now,
// we need to start writing first
worker.stream = new FakeWeakRef(stream)
worker.on('message', onWorkerMessage)
worker.on('exit', onWorkerExit)
registry.register(stream, worker)
return worker
}
function drain (stream) {
assert(!stream[kImpl].sync)
if (stream[kImpl].needDrain) {
stream[kImpl].needDrain = false
stream.emit('drain')
}
}
function nextFlush (stream) {
const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
let leftover = stream[kImpl].data.length - writeIndex
if (leftover > 0) {
if (stream[kImpl].buf.length === 0) {
stream[kImpl].flushing = false
if (stream[kImpl].ending) {
end(stream)
} else if (stream[kImpl].needDrain) {
process.nextTick(drain, stream)
}
return
}
let toWrite = stream[kImpl].buf.slice(0, leftover)
let toWriteBytes = Buffer.byteLength(toWrite)
if (toWriteBytes <= leftover) {
stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
// process._rawDebug('writing ' + toWrite.length)
write(stream, toWrite, nextFlush.bind(null, stream))
} else {
// multi-byte utf-8
stream.flush(() => {
// err is already handled in flush()
if (stream.destroyed) {
return
}
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
// Find a toWrite length that fits the buffer
// it must exists as the buffer is at least 4 bytes length
// and the max utf-8 length for a char is 4 bytes.
while (toWriteBytes > stream[kImpl].data.length) {
leftover = leftover / 2
toWrite = stream[kImpl].buf.slice(0, leftover)
toWriteBytes = Buffer.byteLength(toWrite)
}
stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
write(stream, toWrite, nextFlush.bind(null, stream))
})
}
} else if (leftover === 0) {
if (writeIndex === 0 && stream[kImpl].buf.length === 0) {
// we had a flushSync in the meanwhile
return
}
stream.flush(() => {
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
nextFlush(stream)
})
} else {
// This should never happen
throw new Error('overwritten')
}
}
function onWorkerMessage (msg) {
const stream = this.stream.deref()
if (stream === undefined) {
this.exited = true
// Terminate the worker.
this.terminate()
return
}
switch (msg.code) {
case 'READY':
// Replace the FakeWeakRef with a
// proper one.
this.stream = new WeakRef(stream)
stream.flush(() => {
stream[kImpl].ready = true
stream.emit('ready')
})
break
case 'ERROR':
destroy(stream, msg.err)
break
default:
throw new Error('this should not happen: ' + msg.code)
}
}
function onWorkerExit (code) {
const stream = this.stream.deref()
if (stream === undefined) {
// Nothing to do, the worker already exit
return
}
registry.unregister(stream)
stream.worker.exited = true
stream.worker.off('exit', onWorkerExit)
destroy(stream, code !== 0 ? new Error('The worker thread exited') : null)
}
class ThreadStream extends EventEmitter {
constructor (opts = {}) {
super()
if (opts.bufferSize < 4) {
throw new Error('bufferSize must at least fit a 4-byte utf-8 char')
}
this[kImpl] = {}
this[kImpl].stateBuf = new SharedArrayBuffer(128)
this[kImpl].state = new Int32Array(this[kImpl].stateBuf)
this[kImpl].dataBuf = new SharedArrayBuffer(opts.bufferSize || 4 * 1024 * 1024)
this[kImpl].data = Buffer.from(this[kImpl].dataBuf)
this[kImpl].sync = opts.sync || false
this[kImpl].ending = false
this[kImpl].ended = false
this[kImpl].needDrain = false
this[kImpl].destroyed = false
this[kImpl].flushing = false
this[kImpl].ready = false
this[kImpl].finished = false
this[kImpl].errored = null
this[kImpl].closed = false
this[kImpl].buf = ''
// TODO (fix): Make private?
this.worker = createWorker(this, opts) // TODO (fix): make private
}
write (data) {
if (this[kImpl].destroyed) {
throw new Error('the worker has exited')
}
if (this[kImpl].ending) {
throw new Error('the worker is ending')
}
if (this[kImpl].flushing && this[kImpl].buf.length + data.length >= MAX_STRING) {
try {
writeSync(this)
this[kImpl].flushing = true
} catch (err) {
destroy(this, err)
return false
}
}
this[kImpl].buf += data
if (this[kImpl].sync) {
try {
writeSync(this)
return true
} catch (err) {
destroy(this, err)
return false
}
}
if (!this[kImpl].flushing) {
this[kImpl].flushing = true
setImmediate(nextFlush, this)
}
this[kImpl].needDrain = this[kImpl].data.length - this[kImpl].buf.length - Atomics.load(this[kImpl].state, WRITE_INDEX) <= 0
return !this[kImpl].needDrain
}
end () {
if (this[kImpl].destroyed) {
return
}
this[kImpl].ending = true
end(this)
}
flush (cb) {
if (this[kImpl].destroyed) {
if (typeof cb === 'function') {
process.nextTick(cb, new Error('the worker has exited'))
}
return
}
// TODO write all .buf
const writeIndex = Atomics.load(this[kImpl].state, WRITE_INDEX)
// process._rawDebug(`(flush) readIndex (${Atomics.load(this.state, READ_INDEX)}) writeIndex (${Atomics.load(this.state, WRITE_INDEX)})`)
wait(this[kImpl].state, READ_INDEX, writeIndex, Infinity, (err, res) => {
if (err) {
destroy(this, err)
process.nextTick(cb, err)
return
}
if (res === 'not-equal') {
// TODO handle deadlock
this.flush(cb)
return
}
process.nextTick(cb)
})
}
flushSync () {
if (this[kImpl].destroyed) {
return
}
writeSync(this)
flushSync(this)
}
unref () {
this.worker.unref()
}
ref () {
this.worker.ref()
}
get ready () {
return this[kImpl].ready
}
get destroyed () {
return this[kImpl].destroyed
}
get closed () {
return this[kImpl].closed
}
get writable () {
return !this[kImpl].destroyed && !this[kImpl].ending
}
get writableEnded () {
return this[kImpl].ending
}
get writableFinished () {
return this[kImpl].finished
}
get writableNeedDrain () {
return this[kImpl].needDrain
}
get writableObjectMode () {
return false
}
get writableErrored () {
return this[kImpl].errored
}
}
function destroy (stream, err) {
if (stream[kImpl].destroyed) {
return
}
stream[kImpl].destroyed = true
if (err) {
stream[kImpl].errored = err
stream.emit('error', err)
}
if (!stream.worker.exited) {
stream.worker.terminate()
.catch(() => {})
.then(() => {
stream[kImpl].closed = true
stream.emit('close')
})
} else {
setImmediate(() => {
stream[kImpl].closed = true
stream.emit('close')
})
}
}
function write (stream, data, cb) {
// data is smaller than the shared buffer length
const current = Atomics.load(stream[kImpl].state, WRITE_INDEX)
const length = Buffer.byteLength(data)
stream[kImpl].data.write(data, current)
Atomics.store(stream[kImpl].state, WRITE_INDEX, current + length)
Atomics.notify(stream[kImpl].state, WRITE_INDEX)
cb()
return true
}
function end (stream) {
if (stream[kImpl].ended || !stream[kImpl].ending || stream[kImpl].flushing) {
return
}
stream[kImpl].ended = true
try {
stream.flushSync()
let readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)
// process._rawDebug('writing index')
Atomics.store(stream[kImpl].state, WRITE_INDEX, -1)
// process._rawDebug(`(end) readIndex (${Atomics.load(stream.state, READ_INDEX)}) writeIndex (${Atomics.load(stream.state, WRITE_INDEX)})`)
Atomics.notify(stream[kImpl].state, WRITE_INDEX)
// Wait for the process to complete
let spins = 0
while (readIndex !== -1) {
// process._rawDebug(`read = ${read}`)
Atomics.wait(stream[kImpl].state, READ_INDEX, readIndex, 1000)
readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)
if (readIndex === -2) {
throw new Error('end() failed')
}
if (++spins === 10) {
throw new Error('end() took too long (10s)')
}
}
process.nextTick(() => {
stream[kImpl].finished = true
stream.emit('finish')
})
} catch (err) {
destroy(stream, err)
}
// process._rawDebug('end finished...')
}
function writeSync (stream) {
const cb = () => {
if (stream[kImpl].ending) {
end(stream)
} else if (stream[kImpl].needDrain) {
process.nextTick(drain, stream)
}
}
stream[kImpl].flushing = false
while (stream[kImpl].buf.length !== 0) {
const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
let leftover = stream[kImpl].data.length - writeIndex
if (leftover === 0) {
flushSync(stream)
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
continue
} else if (leftover < 0) {
// stream should never happen
throw new Error('overwritten')
}
let toWrite = stream[kImpl].buf.slice(0, leftover)
let toWriteBytes = Buffer.byteLength(toWrite)
if (toWriteBytes <= leftover) {
stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
// process._rawDebug('writing ' + toWrite.length)
write(stream, toWrite, cb)
} else {
// multi-byte utf-8
flushSync(stream)
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
// Find a toWrite length that fits the buffer
// it must exists as the buffer is at least 4 bytes length
// and the max utf-8 length for a char is 4 bytes.
while (toWriteBytes > stream[kImpl].buf.length) {
leftover = leftover / 2
toWrite = stream[kImpl].buf.slice(0, leftover)
toWriteBytes = Buffer.byteLength(toWrite)
}
stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
write(stream, toWrite, cb)
}
}
}
function flushSync (stream) {
if (stream[kImpl].flushing) {
throw new Error('unable to flush while flushing')
}
// process._rawDebug('flushSync started')
const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
let spins = 0
// TODO handle deadlock
while (true) {
const readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)
if (readIndex === -2) {
throw new Error('_flushSync failed')
}
// process._rawDebug(`(flushSync) readIndex (${readIndex}) writeIndex (${writeIndex})`)
if (readIndex !== writeIndex) {
// TODO stream timeouts for some reason.
Atomics.wait(stream[kImpl].state, READ_INDEX, readIndex, 1000)
} else {
break
}
if (++spins === 10) {
throw new Error('_flushSync took too long (10s)')
}
}
// process._rawDebug('flushSync finished')
}
module.exports = ThreadStream