Skip to content

Commit

Permalink
Faster state machine (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
mafintosh committed Jun 16, 2023
1 parent 1caafd2 commit 915d002
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 62 deletions.
162 changes: 102 additions & 60 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const FIFO = require('fast-fifo')

/* eslint-disable no-multi-spaces */

// 26 bits used total (4 from shared, 13 from read, and 10 from write)
// 27 bits used total (4 from shared, 13 from read, and 10 from write)
const MAX = ((1 << 27) - 1)

// Shared state
Expand All @@ -21,8 +21,8 @@ const NOT_PREDESTROYING = MAX ^ PREDESTROYING

// Read state (4 bit offset from shared state)
const READ_ACTIVE = 0b0000000000001 << 4
const READ_PRIMARY = 0b0000000000010 << 4
const READ_SYNC = 0b0000000000100 << 4
const READ_UPDATING = 0b0000000000010 << 4
const READ_PRIMARY = 0b0000000000100 << 4
const READ_QUEUED = 0b0000000001000 << 4
const READ_RESUMED = 0b0000000010000 << 4
const READ_PIPE_DRAINED = 0b0000000100000 << 4
Expand All @@ -31,46 +31,45 @@ const READ_EMIT_DATA = 0b0000010000000 << 4
const READ_EMIT_READABLE = 0b0000100000000 << 4
const READ_EMITTED_READABLE = 0b0001000000000 << 4
const READ_DONE = 0b0010000000000 << 4
const READ_NEXT_TICK = 0b0100000000001 << 4 // also active
const READ_NEXT_TICK = 0b0100000000000 << 4
const READ_NEEDS_PUSH = 0b1000000000000 << 4

// Combined read state
const READ_FLOWING = READ_RESUMED | READ_PIPE_DRAINED
const READ_ACTIVE_AND_SYNC = READ_ACTIVE | READ_SYNC
const READ_ACTIVE_AND_SYNC_AND_NEEDS_PUSH = READ_ACTIVE | READ_SYNC | READ_NEEDS_PUSH
const READ_ACTIVE_AND_NEEDS_PUSH = READ_ACTIVE | READ_NEEDS_PUSH
const READ_PRIMARY_AND_ACTIVE = READ_PRIMARY | READ_ACTIVE
const READ_EMIT_READABLE_AND_QUEUED = READ_EMIT_READABLE | READ_QUEUED

const READ_NOT_ACTIVE = MAX ^ READ_ACTIVE
const READ_NON_PRIMARY = MAX ^ READ_PRIMARY
const READ_NON_PRIMARY_AND_PUSHED = MAX ^ (READ_PRIMARY | READ_NEEDS_PUSH)
const READ_NOT_SYNC = MAX ^ READ_SYNC
const READ_PUSHED = MAX ^ READ_NEEDS_PUSH
const READ_PAUSED = MAX ^ READ_RESUMED
const READ_NOT_QUEUED = MAX ^ (READ_QUEUED | READ_EMITTED_READABLE)
const READ_NOT_ENDING = MAX ^ READ_ENDING
const READ_PIPE_NOT_DRAINED = MAX ^ READ_FLOWING
const READ_NOT_NEXT_TICK = MAX ^ READ_NEXT_TICK
const READ_NOT_UPDATING = MAX ^ READ_UPDATING

// Write state (17 bit offset, 4 bit offset from shared state and 13 from read state)
const WRITE_ACTIVE = 0b0000000001 << 17
const WRITE_PRIMARY = 0b0000000010 << 17
const WRITE_SYNC = 0b0000000100 << 17
const WRITE_UPDATING = 0b0000000010 << 17
const WRITE_PRIMARY = 0b0000000100 << 17
const WRITE_QUEUED = 0b0000001000 << 17
const WRITE_UNDRAINED = 0b0000010000 << 17
const WRITE_DONE = 0b0000100000 << 17
const WRITE_EMIT_DRAIN = 0b0001000000 << 17
const WRITE_NEXT_TICK = 0b0010000001 << 17 // also active
const WRITE_FINISHING = 0b0100000000 << 17
const WRITE_WRITING = 0b1000000000 << 17
const WRITE_NEXT_TICK = 0b0010000000 << 17
const WRITE_WRITING = 0b0100000000 << 17
const WRITE_FINISHING = 0b1000000000 << 17

const WRITE_NOT_ACTIVE = MAX ^ (WRITE_ACTIVE | WRITE_WRITING)
const WRITE_NOT_SYNC = MAX ^ WRITE_SYNC
const WRITE_NON_PRIMARY = MAX ^ WRITE_PRIMARY
const WRITE_NOT_FINISHING = MAX ^ WRITE_FINISHING
const WRITE_DRAINED = MAX ^ WRITE_UNDRAINED
const WRITE_NOT_QUEUED = MAX ^ WRITE_QUEUED
const WRITE_NOT_NEXT_TICK = MAX ^ WRITE_NEXT_TICK
const WRITE_NOT_UPDATING = MAX ^ WRITE_UPDATING

// Combined shared state
const ACTIVE = READ_ACTIVE | WRITE_ACTIVE
Expand All @@ -91,6 +90,7 @@ const READ_ENDING_STATUS = OPEN_STATUS | READ_ENDING | READ_QUEUED
const READ_READABLE_STATUS = OPEN_STATUS | READ_EMIT_READABLE | READ_QUEUED | READ_EMITTED_READABLE
const SHOULD_NOT_READ = OPEN_STATUS | READ_ACTIVE | READ_ENDING | READ_DONE | READ_NEEDS_PUSH
const READ_BACKPRESSURE_STATUS = DESTROY_STATUS | READ_ENDING | READ_DONE
const READ_UPDATE_SYNC_STATUS = READ_UPDATING | OPEN_STATUS | READ_NEXT_TICK | READ_PRIMARY

// Combined write state
const WRITE_PRIMARY_STATUS = OPEN_STATUS | WRITE_FINISHING | WRITE_DONE
Expand All @@ -99,9 +99,10 @@ const WRITE_QUEUED_AND_ACTIVE = WRITE_QUEUED | WRITE_ACTIVE
const WRITE_DRAIN_STATUS = WRITE_QUEUED | WRITE_UNDRAINED | OPEN_STATUS | WRITE_ACTIVE
const WRITE_STATUS = OPEN_STATUS | WRITE_ACTIVE | WRITE_QUEUED
const WRITE_PRIMARY_AND_ACTIVE = WRITE_PRIMARY | WRITE_ACTIVE
const WRITE_ACTIVE_AND_SYNC = WRITE_ACTIVE | WRITE_SYNC | WRITE_WRITING
const WRITE_ACTIVE_AND_WRITING = WRITE_ACTIVE | WRITE_WRITING
const WRITE_FINISHING_STATUS = OPEN_STATUS | WRITE_FINISHING | WRITE_QUEUED_AND_ACTIVE | WRITE_DONE
const WRITE_BACKPRESSURE_STATUS = WRITE_UNDRAINED | DESTROY_STATUS | WRITE_FINISHING | WRITE_DONE
const WRITE_UPDATE_SYNC_STATUS = WRITE_UPDATING | OPEN_STATUS | WRITE_NEXT_TICK | WRITE_PRIMARY

const asyncIterator = Symbol.asyncIterator || Symbol('asyncIterator')

Expand Down Expand Up @@ -141,10 +142,9 @@ class WritableState {

shift () {
const data = this.queue.shift()
const stream = this.stream

this.buffered -= this.byteLength(data)
if (this.buffered === 0) stream._duplexState &= WRITE_NOT_QUEUED
if (this.buffered === 0) this.stream._duplexState &= WRITE_NOT_QUEUED

return data
}
Expand All @@ -171,14 +171,19 @@ class WritableState {
update () {
const stream = this.stream

while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED) {
const data = this.shift()
stream._duplexState |= WRITE_ACTIVE_AND_SYNC
stream._write(data, this.afterWrite)
stream._duplexState &= WRITE_NOT_SYNC
}
stream._duplexState |= WRITE_UPDATING

do {
while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED) {
const data = this.shift()
stream._duplexState |= WRITE_ACTIVE_AND_WRITING
stream._write(data, this.afterWrite)
}

if ((stream._duplexState & WRITE_PRIMARY_AND_ACTIVE) === 0) this.updateNonPrimary()
} while (this.continueUpdate() === true)

if ((stream._duplexState & WRITE_PRIMARY_AND_ACTIVE) === 0) this.updateNonPrimary()
stream._duplexState &= WRITE_NOT_UPDATING
}

updateNonPrimary () {
Expand All @@ -204,10 +209,21 @@ class WritableState {
}
}

continueUpdate () {
if ((this.stream._duplexState & WRITE_NEXT_TICK) === 0) return false
this.stream._duplexState &= WRITE_NOT_NEXT_TICK
return true
}

updateCallback () {
if ((this.stream._duplexState & WRITE_UPDATE_SYNC_STATUS) === WRITE_PRIMARY) this.update()
else this.updateNextTick()
}

updateNextTick () {
if ((this.stream._duplexState & WRITE_NEXT_TICK) !== 0) return
this.stream._duplexState |= WRITE_NEXT_TICK
queueTick(this.afterUpdateNextTick)
if ((this.stream._duplexState & WRITE_UPDATING) === 0) queueTick(this.afterUpdateNextTick)
}
}

Expand Down Expand Up @@ -284,18 +300,16 @@ class ReadableState {
}

unshift (data) {
let tail
const pending = []
const pending = [this.map !== null ? this.map(data) : data]
while (this.buffered > 0) pending.push(this.shift())

while ((tail = this.queue.shift()) !== undefined) {
pending.push(tail)
for (let i = 0; i < pending.length - 1; i++) {
const data = pending[i]
this.buffered += this.byteLength(data)
this.queue.push(data)
}

this.push(data)

for (let i = 0; i < pending.length; i++) {
this.queue.push(pending[i])
}
this.push(pending[pending.length - 1])
}

read () {
Expand Down Expand Up @@ -324,21 +338,26 @@ class ReadableState {
update () {
const stream = this.stream

this.drain()
stream._duplexState |= READ_UPDATING

while (this.buffered < this.highWaterMark && (stream._duplexState & SHOULD_NOT_READ) === 0) {
stream._duplexState |= READ_ACTIVE_AND_SYNC_AND_NEEDS_PUSH
stream._read(this.afterRead)
stream._duplexState &= READ_NOT_SYNC
if ((stream._duplexState & READ_ACTIVE) === 0) this.drain()
}
do {
this.drain()

if ((stream._duplexState & READ_READABLE_STATUS) === READ_EMIT_READABLE_AND_QUEUED) {
stream._duplexState |= READ_EMITTED_READABLE
stream.emit('readable')
}
while (this.buffered < this.highWaterMark && (stream._duplexState & SHOULD_NOT_READ) === 0) {
stream._duplexState |= READ_ACTIVE_AND_NEEDS_PUSH
stream._read(this.afterRead)
this.drain()
}

if ((stream._duplexState & READ_PRIMARY_AND_ACTIVE) === 0) this.updateNonPrimary()
if ((stream._duplexState & READ_READABLE_STATUS) === READ_EMIT_READABLE_AND_QUEUED) {
stream._duplexState |= READ_EMITTED_READABLE
stream.emit('readable')
}

if ((stream._duplexState & READ_PRIMARY_AND_ACTIVE) === 0) this.updateNonPrimary()
} while (this.continueUpdate() === true)

stream._duplexState &= READ_NOT_UPDATING
}

updateNonPrimary () {
Expand All @@ -365,10 +384,21 @@ class ReadableState {
}
}

continueUpdate () {
if ((this.stream._duplexState & READ_NEXT_TICK) === 0) return false
this.stream._duplexState &= READ_NOT_NEXT_TICK
return true
}

updateCallback () {
if ((this.stream._readableState & READ_UPDATE_SYNC_STATUS) === READ_PRIMARY) this.update()
else this.updateNextTick()
}

updateNextTick () {
if ((this.stream._duplexState & READ_NEXT_TICK) !== 0) return
this.stream._duplexState |= READ_NEXT_TICK
queueTick(this.afterUpdateNextTick)
if ((this.stream._duplexState & READ_UPDATING) === 0) queueTick(this.afterUpdateNextTick)
}
}

Expand Down Expand Up @@ -425,8 +455,7 @@ class Pipeline {

function afterDrain () {
this.stream._duplexState |= READ_PIPE_DRAINED
if ((this.stream._duplexState & READ_ACTIVE_AND_SYNC) === 0) this.updateNextTick()
else this.drain()
this.updateCallback()
}

function afterFinal (err) {
Expand All @@ -441,7 +470,10 @@ function afterFinal (err) {
}

stream._duplexState &= WRITE_NOT_ACTIVE
this.update()

// no need to wait the extra tick here, so we short circuit that
if ((stream._duplexState & WRITE_UPDATING) === 0) this.update()
else this.updateNextTick()
}

function afterDestroy (err) {
Expand Down Expand Up @@ -478,23 +510,27 @@ function afterWrite (err) {
}
}

if ((stream._duplexState & WRITE_SYNC) === 0) this.update()
this.updateCallback()
}

function afterRead (err) {
if (err) this.stream.destroy(err)
this.stream._duplexState &= READ_NOT_ACTIVE
if ((this.stream._duplexState & READ_SYNC) === 0) this.update()
this.updateCallback()
}

function updateReadNT () {
this.stream._duplexState &= READ_NOT_NEXT_TICK
this.update()
if ((this.stream._duplexState & READ_UPDATING) === 0) {
this.stream._duplexState &= READ_NOT_NEXT_TICK
this.update()
}
}

function updateWriteNT () {
this.stream._duplexState &= WRITE_NOT_NEXT_TICK
this.update()
if ((this.stream._duplexState & WRITE_UPDATING) === 0) {
this.stream._duplexState &= WRITE_NOT_NEXT_TICK
this.update()
}
}

function tickDrains (drains) {
Expand All @@ -521,11 +557,11 @@ function afterOpen (err) {
stream._duplexState &= NOT_ACTIVE

if (stream._writableState !== null) {
stream._writableState.update()
stream._writableState.updateCallback()
}

if (stream._readableState !== null) {
stream._readableState.update()
stream._readableState.updateCallback()
}
}

Expand Down Expand Up @@ -585,8 +621,14 @@ class Stream extends EventEmitter {
if (!err) err = STREAM_DESTROYED
this._duplexState = (this._duplexState | DESTROYING) & NON_PRIMARY

if (this._readableState !== null) this._readableState.error = err
if (this._writableState !== null) this._writableState.error = err
if (this._readableState !== null) {
this._readableState.highWaterMark = 0
this._readableState.error = err
}
if (this._writableState !== null) {
this._writableState.highWaterMark = 0
this._writableState.error = err
}

this._duplexState |= PREDESTROYING
this._predestroy()
Expand Down Expand Up @@ -638,8 +680,8 @@ class Readable extends Stream {
}

pipe (dest, cb) {
this._readableState.pipe(dest, cb)
this._readableState.updateNextTick()
this._readableState.pipe(dest, cb)
return dest
}

Expand Down

0 comments on commit 915d002

Please sign in to comment.