
Crashing when adding to transform stream #10

Open · 1mike12 opened this issue Feb 23, 2024 · 3 comments
Labels: question (Further information is requested)

Comments


1mike12 commented Feb 23, 2024

I'm no stream expert, but given the examples, it seemed like it would be super easy to create a transform stream with your module.

I can verify that it works fine when run like the example in the readme, but when I port it into a custom Transform stream, it ends way too early, never reaches the Writable, and I get no errors. There's a good chance this isn't a problem with uDSV, but it's strange: given your APIs, this should work.

// imports assumed for this repro; the "udsv" package name and the Parser type export are assumptions,
// and ProgressBarStream / SimpleUDSVTransform / filePath come from the example repo linked below
import fs from "fs";
import { Writable, pipeline } from "stream";
import { inferSchema, initParser, type Parser } from "udsv";

// lines = 16_182_971 , 100% finished in 13.1 seconds = 1_235_341 rows per second
// 597.5 MB  13.1s   45.5MB/s
const streamedCSV = async () => {
  const fileSize = await fs.promises.stat(filePath).then((stats) => stats.size)
  const stream = fs.createReadStream(filePath).pipe(new ProgressBarStream(fileSize))

  let parser: Parser;
  for await (const buffer of stream) {
    const strChunk = buffer.toString();
    parser ??= initParser(inferSchema(strChunk));
    parser.chunk<string[]>(strChunk, parser.typedArrs, (batch) => {
      const x = batch // reaches here fine
    });
  }
  parser!.end();
}

const transform = async () => {
  const fileSize = await fs.promises.stat(filePath).then((stats) => stats.size)
  return new Promise<void>((resolve, reject) => {
    pipeline(
      fs.createReadStream(filePath),
      new SimpleUDSVTransform(),
      new Writable({
        write(chunk, encoding, callback) {
          const x = chunk // never reaches here
          callback()
        }
      }),
      (err) => {
        if (err) {
          reject(err) // never reaches here
        } else {
          resolve() // never reaches here
        }
      }
    )
  })
};

(async () => {
  await streamedCSV() // works
  await transform()   // breaks
})()
@leeoniya (Owner)

> I'm no stream expert

heh, me neither 😅. uDSV's API is fully synchronous, so there shouldn't be any surprises there.

Does this reproduce with something you can attach here that I can debug (and that hopefully isn't 16M records)?
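
(A minimal sketch of what "fully synchronous" means here, reusing the inferSchema / initParser / chunk / end calls from the repro above; the "udsv" package name and the tiny inline CSV are assumptions added purely for illustration:)

import { inferSchema, initParser } from "udsv";

const csvStr = "a,b,c\n1,2,3\n4,5,6\n";
const parser = initParser(inferSchema(csvStr));

let sawBatch = false;

// if the API is fully synchronous, this callback fires before chunk() returns
parser.chunk(csvStr, parser.stringArrs, (batch) => {
  sawBatch = true;
  console.log(batch.length); // rows parsed in this batch
});

console.log(sawBatch); // expected: true if the batch callback runs synchronously
parser.end();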

1mike12 (Author) commented Feb 24, 2024

Yeah, for sure. I just created an example repo you can use to run everything; just run example.ts.
I plopped in some random CSV I found online, but it seems like any CSV will reproduce this issue.

https://github.com/1mike12/usdv-stream-example

But regarding the API, I think the way to get it to work with streams is to use the chunk function, which seems to be async right now. I imagine it could be synchronous too, since there shouldn't be any reason the parser can't hand back what it has parsed so far in a sync way.

this.parser.chunk<string[]>(strChunk, this.parser.stringArrs, (parsedData) => {
  this.push(parsedData);
  callback();
});

https://github.com/1mike12/usdv-stream-example/blob/d64f3165174248e647b57853dbda4d50ac498719/src/streams/SimpleUDSVTransform.ts#L17
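
(The full SimpleUDSVTransform lives in the linked repo; only the fragment above appears in this thread. A rough, hypothetical sketch of such a Transform subclass — not the repo's actual code, and not a confirmed fix for the early exit — might look like this, where the class shape, the readableObjectMode option, the once-per-_transform callback() call, and the Parser type import are all assumptions:)

import { Transform, TransformCallback } from "stream";
import { inferSchema, initParser, type Parser } from "udsv";

// Hypothetical reconstruction -- names, options, and structure are assumptions.
class SimpleUDSVTransform extends Transform {
  private parser?: Parser;

  constructor() {
    // parsed row arrays are objects, so the readable side must be in object mode
    super({ readableObjectMode: true });
  }

  _transform(chunk: Buffer, _enc: BufferEncoding, callback: TransformCallback) {
    const strChunk = chunk.toString();
    const parser = (this.parser ??= initParser(inferSchema(strChunk)));

    // per the comment above, the batch callback runs synchronously during chunk()
    parser.chunk<string[]>(strChunk, parser.stringArrs, (parsedData) => {
      this.push(parsedData);
    });

    // signal completion once per incoming chunk, after chunk() has returned
    callback();
  }

  _flush(callback: TransformCallback) {
    // mirror the parser!.end() call from the repro above when the input ends
    this.parser?.end();
    callback();
  }
}

Pushing row arrays (rather than strings or Buffers) downstream requires the readable side to be in object mode; whether that detail relates to the behavior reported here is not established in this thread.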

But either way, whether it's sync or async really doesn't matter to a Transform stream, which is why I'm so confused about why it just stops and ends so soon.

You should see something like

██████████████████████████████ 100% 1.4 / 1.4 MB  0.0s   36.1MB/s
███░░░░░░░░░░░░░░░░░░░░░░░░░░░ 9% 128.0 / 1.4 MB  0.0s   62.5MB/s
Process finished with exit code 0

@leeoniya (Owner)

Just a heads up, it might be a week or so before I can get to testing this.

leeoniya added the question (Further information is requested) label on Feb 26, 2024