Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flatMap causes memory to balloon when concurrent executions are consumed #375

Open
thepont opened this issue Oct 7, 2024 · 1 comment
Open

Comments

@thepont
Copy link

thepont commented Oct 7, 2024

IxJS version:

7.0.0

Code to reproduce:

This example demonstrates the that producer is not waiting for the consumer to finish processing before another item is made available. This creates issues where the memory balloons if the consumer is slower then the producer. This seems to be isolated to when a flatMap is used. Despite all concurrent executions being consumed, more values are still produced.

import {from} from "ix/asynciterable";
import {flatMap} from "ix/asynciterable/operators";

let current = 0;

let range = {
  [Symbol.asyncIterator]() { 
    return {
      async next() {
          await new Promise(resolve => setTimeout(resolve, 1));
          return { done: false, value: current++ };
      }
    };
  }
};

export async function example() {
  let results = (from(range)).pipe(
      flatMap(async (ii) => {
        await new Promise(resolve => setTimeout(resolve, 1000));
        return ii
      }, 1)
  );

  for await (let message of results) {
      // print heap usage
      console.log(`Got Message ${message} messages, current: ${current}, diff: ${current - message}`);
  }
}

example();

Expected behavior:

If all concurrency is used then no more values should be requested until concurrency is freed for processing, The iterator should only produce values is the consumer is able to consume them.

Expected Output

Got Message 0 messages, current: 0, diff: 0
Got Message 1 messages, current: 1, diff: 0
Got Message 2 messages, current: 2, diff: 0
Got Message 3 messages, current: 3, diff: 0

Actual behavior:

Flatmap doesn't prevent values from being produced.

Got Message 0 messages, current: 844, diff: 844
Got Message 1 messages, current: 1703, diff: 1702
Got Message 2 messages, current: 2556, diff: 2554
Got Message 3 messages, current: 3413, diff: 3410

Additional information:

@trxcllnt
Copy link
Member

trxcllnt commented Oct 7, 2024

From your description, it seems the behavior you are looking for is in concatMap()?

edit: nevermind, I see the implementation in your PR. This is an interesting case, I'll comment more there.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants