feat: initial async generator support #1560
Conversation
lib/internal/asyncEachOfLimit.js
Outdated
running -= 1; | ||
if (err) return handleError(err) | ||
|
||
if (err === false) { |
Can we avoid calling replenish int his case and go straight to calling the complete callback?
You're right, I should return
in this block.
lib/internal/asyncEachOfLimit.js
Outdated
|
||
function replenish() { | ||
//console.log('replenish') | ||
if (running >= limit || awaiting) return |
How do you end up with more than one generator running asynchronously with this code? If I understand the code correctly, replenish
only gets called
- to spawn the generator when
asyncEachOfLimit
is called - whenever a generator completes
- after
iterateeCallback
is called
Do we not need to loop limit - running
inside of replenish to call several generators at once?
You'd only end up with more than one iteratee running if your iteratee takes longer to process than your generator takes to generate. For example, in the tests, I have the generator ticking every 1 ms, but have the iteratee take 5 ms.
Async generators are like streams -- you get a linear sequence of items. You can't await multiple items at the same time (and if you did, you'd get the same item multiple times, as you do when you call .then()
on the same promise multiple times). This is why we can't loop in replenish
-- we'd end up calling the iteratee with the same item multiple times.
I see thanks for explaining, I haven't used generators before.
So for my understanding, the replenish inside of the generator's then
function is to spawn the asynchronous processes up until limit
are running and the replenish
inside of the iterateeCallback
is to trigger additional items to run if the limit was reached?
You can't await multiple items at the same time (and if you did, you'd get the same item multiple times...)
Are you sure of this, as is right now we have the possibility of calling next
multiple times before the promise resolves if iteratee
is synchronous. In such a case will things break?
This is why we can't loop in replenish -- we'd end up calling the iteratee with the same item multiple times.
Sorry, I may be misunderstanding something, but I thought calls to next
are queued and resolved separately. I'm looking at the Async iterators and async iterables
section of the proposal.
That being said, I think we should still probably avoid looping in replenish as we still have to wait for earlier promises to resolve.
Ah, you're right, I was confusing .next()
with .then()
. We can call .next()
as many times as we need to, perhaps up to the concurrency limit.
lib/internal/asyncEachOfLimit.js
Outdated
if (running >= limit || awaiting) return | ||
//console.log('replenish awaiting') | ||
awaiting = true | ||
generator.next().then(({value, done: iterDone}) => { |
Can you have a generator that has no items?
Nevermind dumb question, I see how this works
lib/internal/asyncEachOfLimit.js
Outdated
//console.log('done') | ||
return callback(null); | ||
} | ||
replenish() |
I think you have to avoid calling replenish
when done === true
or have replenish check if done === true
.
Lets say hypothetically we have a limit of 2 and the final 2 iteratees
are running. When one of these iteratees resolves, running = 1
and done = true
but we'll still replenish and call the generator.next().then()
. This has the possibility of entering a race condition, if the other iteratee resolves in the time between the then
callback is called, you can actually end up calling callback
twice
lib/internal/asyncEachOfLimit.js
Outdated
if (running >= limit || awaiting) return | ||
//console.log('replenish awaiting') | ||
awaiting = true | ||
generator.next().then(({value, done: iterDone}) => { |
Similar to what @megawac was saying, I think we need a if (done) return;
line at the very top of this callback. If one of the previous iteratees cancelled or errored, we shouldn't be invoking the iteratee
with successive items.
For example, if the first iteratee
starts and we start waiting for the second item. If the first iteratee
cancels before the then
callback is called with the second item, when it is called, we would still invoke the iteratee
with the second item.
@@ -15,6 +17,9 @@ export default (limit) => { | |||
if (!obj) { | |||
return callback(null); | |||
} | |||
if (isAsyncGenerator(obj)) { |
This is a minor thing, but should this be expanded to support async iterators in general? Something similar to how we currently have getIterator
except with Symbol.asyncIterator
.
Ah, good point, didnt know about that detail of the spec.
replenish() | ||
} | ||
|
||
function handleError(err) { |
This comment has been hidden.
This comment has been hidden.
Sorry, something went wrong.
|
||
function replenish() { | ||
//console.log('replenish') | ||
if (running >= limit || awaiting || done) return |
BTW, I experimented with calling replenish()
multiple times, awaiting mutiple .next()
s at the same time, but there's some grey area in how it could work. Since you don't know the length of the iterator up front, you have to set some arbitrary limits in the pure parallel case, e.g. 10. Whenever you call .next()
N times synchronously, you end up calling next() and receiving {done: true}
N times spuriously at the end. We don't need to await multiple .next()
s at the same time, because as far as I know, there will be a strict linear sequence of yield
s in the generator, meaning it's pointless to await more than one. It leaves the job of backpressure to the generator implementation too.
@aearly can we update our eslint config to be consistent on semi colons please |
Okay, I've added support for async iterables. Implementation was pretty simple, hard part is getting all the various babel stuff to be happy. |
Also, I'd want to do a linter change in a separate PR. Going to be a big ugly diff if we switch. |
Closes #1551
We have to create a special
eachOfLimit
implementation for this, but once fully tested, all collections methods should fall into place.The text was updated successfully, but these errors were encountered: