Skip to content

Commit

Permalink
Fix ending a stream while updating
Browse files Browse the repository at this point in the history
  • Loading branch information
nordfjord committed Jan 31, 2024
1 parent cbb5aec commit 6e791f3
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ logs
# Dependency directory
node_modules
bower_components

.idea
6 changes: 6 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ function trueFn() { return true; }

// Globals
var toUpdate = [];
var toEnd = [];
var inStream;
var order = [];
var orderNextIdx = -1;
Expand Down Expand Up @@ -696,6 +697,9 @@ function flushUpdate() {
var nextUpdateFn = stream.updaters.shift();
if (nextUpdateFn && stream.shouldUpdate) nextUpdateFn(stream);
}
while (toEnd.length > 0) {
toEnd.shift()(true);
}
flushingUpdateQueue = false;
}

Expand All @@ -715,6 +719,8 @@ function updateStreamValue(n, s) {
flushingStreamValue = false;
} else if (inStream === s) {
markListeners(s, s.listeners);
} else if (inStream.end === s) {
toEnd.push(inStream.end);
} else {
updateLaterUsing(function(s) { updateStreamValue(n, s); }, s);
}
Expand Down
40 changes: 40 additions & 0 deletions test/delayed-stream-end.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
var assert = require('assert');
var R = require('ramda');

var flyd = require('../lib');

function once(s) {
return flyd.combine(function(s, self) {
self(s.val)
self.end(true)
}, [s])
}
function withLatestFrom() {
var streams = arguments
return flyd.combine(function() {
var self = arguments[arguments.length - 2]
var result = []
for (var i = 0; i < streams.length; ++i) {
if (!streams[i].hasVal) return
result.push(streams[i].val)
}
self(result)
}, streams)
}

describe('ending a stream', function() {
it('delays ending the current stream until dependents have been updated', function() {
var stream = flyd.stream(1)
function doubled() { return stream.map(R.multiply(2)) }
var count = 0;
stream
.map(function() {
withLatestFrom(doubled(), doubled())
.pipe(once)
.map(function() {
count++
})
})
assert.equal(count, 1)
})
})

0 comments on commit 6e791f3

Please sign in to comment.