Support apache arrow #45

Open · domoritz opened this issue Nov 3, 2024 · 12 comments

domoritz commented Nov 3, 2024

It's on the roadmap but I wanted to create an issue for it so I can see when it may be supported. I'm very interested in adopting this package but need arrow support (I just need access to the binary ipc).

@jraymakers (Contributor)

Whether & when I work on Arrow support will depend on the fate of this currently-deprecated part of the C API.

Based on this PR, it looks like those functions will remain deprecated. I'm not sure what the long-term plan is.

jraymakers self-assigned this on Dec 10, 2024
jraymakers commented Dec 20, 2024

It seems the Arrow support in the C API has a "DEPRECATION NOTICE", which is distinct from "DEPRECATED". Apparently this means the functions are likely to change, but the functionality itself will likely be preserved. So it seems possible I could expose Arrow support using the current functions with some confidence I can preserve that support when the C API is changed.

@jraymakers (Contributor)

I reviewed the Arrow support in the C API, and, while I believe I can expose it, I'm unsure whether it provides the desired functionality. In particular, I'm unsure it provides access to the binary IPC format.

For folks interested in Arrow support: What functionality is useful? What parts of the Arrow C API would you use, and what would you like that's missing from that API?

@domoritz (Author)

I mostly need IPC which I could then read with flechette or arrow js. In most cases I'll send the arrow straight over the wire somewhere else.

Arrow seems like the ideal format for streaming data from a server to a client. I'm not sure what alternative there is, so this seems like essential functionality.

jraymakers added the enhancement (New feature or request) label on Dec 29, 2024
@jraymakers (Contributor)

While it's admittedly more awkward than direct API support, have you tried using the to_arrow_ipc function of the DuckDB Arrow extension? This can generate Arrow IPC for an arbitrary query as BLOBs, which you can then read using the existing result consumption APIs.

It's not very well documented, but the tests are informative.
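For example, a minimal sketch of that approach using the existing result APIs (assuming the Arrow extension is available, and using apache-arrow's tableFromIPC to deserialize; the range query is just a placeholder):

import { DuckDBBlobValue, DuckDBInstance } from '@duckdb/node-api';
import { tableFromIPC } from 'apache-arrow';

const instance = await DuckDBInstance.create(':memory:');
const connection = await instance.connect();
await connection.run('INSTALL arrow');
await connection.run('LOAD arrow');

// Each row returned by to_arrow_ipc is one Arrow IPC message as a BLOB.
const reader = await connection.runAndReadAll('FROM to_arrow_ipc((FROM range(5000)))');
const messages = reader.getRows().map((row) => (row[0] as DuckDBBlobValue).bytes);

// Concatenate the messages and deserialize with a JS Arrow implementation.
const ipc = new Uint8Array(messages.reduce((n, m) => n + m.length, 0));
let offset = 0;
for (const m of messages) {
  ipc.set(m, offset);
  offset += m.length;
}
const table = tableFromIPC(ipc);
console.log(table.numRows);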

domoritz commented Jan 1, 2025

Oh, fun idea. This should be easy to wrap as well.

It would be nice to have something built in, though, so we don't fall into a performance trap with a workaround.

@bmschmidt

Chiming in: the core feature keeping us on the old Node client is the ability to iterate over Arrow record batches. Basically, we want to stream data through DuckDB while keeping memory usage low and the types as faithful as possible, and then, like @domoritz, pass the data on across the network.

E.g., we have essentially the following running now:

import { RecordBatchStreamReader } from 'apache-arrow';

const stream = db.arrowIPCStream("SELECT * FROM 's3://huggingface-datasets/somebigtable-part*.parquet'")
const reader = await RecordBatchStreamReader.from(stream);
for await (const batch of reader) {
   await myUploadFunction(batch)
}

To be fair, we (@RLesser) have found some problems with this workflow in the node client so it may not be easy!

jraymakers commented Jan 2, 2025

I'd be curious to see how to_arrow_ipc serves that use case. That appears to be the only way to generate IPC today; the DuckDB C API doesn't have functions for that, and thus Node Neo can't (without reimplementing it, which seems like a bad idea).

It does seem that to_arrow_ipc returns a separate row for each record batch message. So, if you run the to_arrow_ipc query in streaming mode, it will likely be fairly efficient.

Example:

install arrow;
load arrow;
from to_arrow_ipc((from range(5000)));
┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─────────┐
│                                                                                                                                                                         ipc                                                                                                                                                                          │ header  │
│                                                                                                                                                                         blob                                                                                                                                                                         │ boolean │
├──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼─────────┤
│ \xFF\xFF\xFF\xFF\x80\x00\x00\x00\x10\x00\x00\x00\x00\x00\x0A\x00\x0C\x00\x06\x00\x05\x00\x08\x00\x0A\x00\x00\x00\x00\x01\x04\x00\x0C\x00\x00\x00\x08\x00\x08\x00\x00\x00\x04\x00\x08\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x14\x00\x00\x00\x10\x00\x14\x00\x08\x00\x06\x00\x07\x00\x0C\x00\x00\x00\x10\x00\x10\x00\x00\x00\x00\x00\x01\x02\x…  │ true    │
│ \xFF\xFF\xFF\xFF\x88\x00\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x0C\x00\x16\x00\x06\x00\x05\x00\x08\x00\x0C\x00\x0C\x00\x00\x00\x00\x03\x04\x00\x18\x00\x00\x00\x00@\x00\x00\x00\x00\x00\x00\x00\x00\x0A\x00\x18\x00\x0C\x00\x04\x00\x08\x00\x0A\x00\x00\x00<\x00\x00\x00\x10\x00\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00…  │ false   │
│ \xFF\xFF\xFF\xFF\x88\x00\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x0C\x00\x16\x00\x06\x00\x05\x00\x08\x00\x0C\x00\x0C\x00\x00\x00\x00\x03\x04\x00\x18\x00\x00\x00\x00@\x00\x00\x00\x00\x00\x00\x00\x00\x0A\x00\x18\x00\x0C\x00\x04\x00\x08\x00\x0A\x00\x00\x00<\x00\x00\x00\x10\x00\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00…  │ false   │
│ \xFF\xFF\xFF\xFF\x88\x00\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x0C\x00\x16\x00\x06\x00\x05\x00\x08\x00\x0C\x00\x0C\x00\x00\x00\x00\x03\x04\x00\x18\x00\x00\x00@\x1C\x00\x00\x00\x00\x00\x00\x00\x00\x0A\x00\x18\x00\x0C\x00\x04\x00\x08\x00\x0A\x00\x00\x00<\x00\x00\x00\x10\x00\x00\x00\x88\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00…  │ false   │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────┘
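A rough sketch of consuming that in streaming mode with the existing APIs (assuming DuckDBDataChunk's getRows(); the inner query is again a placeholder):

import { DuckDBBlobValue, DuckDBInstance } from '@duckdb/node-api';

const instance = await DuckDBInstance.create(':memory:');
const connection = await instance.connect();
await connection.run('INSTALL arrow');
await connection.run('LOAD arrow');

// Fetch the IPC messages chunk by chunk instead of materializing the whole result.
const result = await connection.stream('FROM to_arrow_ipc((FROM range(5000)))');
while (true) {
  const chunk = await result.fetchChunk();
  if (!chunk?.rowCount) { break; }
  for (const row of chunk.getRows()) {
    const message = (row[0] as DuckDBBlobValue).bytes;
    // Forward `message` (one IPC message: schema or record batch) to a consumer,
    // e.g. write it to a network stream for a JS Arrow reader on the other end.
  }
}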

@bmschmidt

Thanks for the suggestion! Having dug in a little more, it turns out true streaming is tricky because of to_arrow_ipc: in any form of DuckDB, it downloads the entire dataset before it starts yielding batches. But I'm able to get true streaming in Arrow format by iterating over the chunks of the original query and converting each one to IPC, so I think you could call this supported. The only way I've found to run a SQL query on a DuckDBDataChunk, though, is to create a new table, insert the data from the chunk back into DuckDB in that table (on a different connection), and then query it with to_arrow_ipc.

This is an insanely roundabout way to run a query on a streaming chunk though… Is there an easy way to just run sql against a DuckDBDataChunk directly rather than putting it into an appender? Or would you have any interest in a helper method that adds something like this as DuckDBResult.fetchArrowBatch() or DuckDBResult.iterArrowBatches()?

import { DuckDBBlobValue, DuckDBInstance } from '@duckdb/node-api';
import { tableFromIPC } from 'apache-arrow';

const instance = await DuckDBInstance.create(':memory:');
const connection = await instance.connect();
const connection2 = await instance.connect();
await connection2.run('INSTALL ARROW');
await connection2.run('LOAD ARROW');
const query = `FROM 'hf://datasets/HuggingFaceGECLM/REDDIT_comments@~parquet/default/AskHistorians/*.parquet'`;

// Run the query once (LIMIT 1) just to create a holder table with the right schema.
await connection2.run(`CREATE TABLE eric AS ${query} LIMIT 1`);

const result = await connection.stream(query);
while (true) {
  const chunk = await result.fetchChunk();
  if (!chunk?.rowCount) { break; }

  // Write the chunk to the holder table.
  await connection2.run('DELETE FROM eric');
  const appender = await connection2.createAppender('main', 'eric');
  appender.appendDataChunk(chunk);
  appender.flush();

  // Pull the chunk as arrow from the holder.
  const reader = await connection2.runAndReadAll(`FROM to_arrow_ipc((FROM eric))`);

  const batches: Uint8Array[] = [];
  const rows = reader.getRows();
  for (const row of rows) {
    batches.push((row[0]! as DuckDBBlobValue).bytes);
  }

  // Write the chunk bytes to a single Uint8Array we can deserialize.
  const buff = new Uint8Array(batches.reduce((n, b) => n + b.length, 0));
  let offset = 0;
  for (const batch of batches) {
    buff.set(batch, offset);
    offset += batch.length;
  }
  const v = tableFromIPC(buff);
  // ... do something with `v`, the Arrow table for this chunk.
}
@jraymakers (Contributor)

That's a neat trick @bmschmidt. I suspect it's actually fairly efficient. Yes, the data has to go back and forth more than it should, but moving single chunks and buffers around like that is going to be pretty fast.

It's definitely not possible to query a DuckDBDataChunk directly without putting it into a table. Perhaps one could implement a table function that wraps the chunk instead of writing it back to a table but (a) user-defined table functions aren't implemented in Node Neo yet, and (b) it's not clear that would be faster.

I can see the value in putting this approach you've concocted into a helper method in the library. I'll have to think about how best to fit it in.

@bmschmidt

Cool, thanks. Yeah I always figure the name of the game in these things is to avoid ever casting to js-native types, so shoving around the DuckDBDataChunk object under the table feels sensible enough…

As I see it there are two things that would be nice from the duckdb-node API perspective here, happy to help if there's a need.

  1. Something that exposes an async iterator over the Uint8Array buffers above, which are Arrow tables in IPC form. (You probably don't want to yield deserialized Arrow tables directly, because there are now competing JS clients.)
  2. (possibly part of that) Something that makes it slightly easier to build a DuckDB table from a DuckDBDataChunk. I don't like how, in the above code, I have to run the query twice, the first time only to get the table schema; it would be a lot nicer if there were a way to learn the table schema directly from a chunk.

@jraymakers (Contributor)

Both of those sound feasible. Certainly exposing the buffers in a convenient way. I think it's also possible to get enough information about the column names and types from the (non-materialized) result and the initial chunk to create the table to append to.

I've got a bunch of other items on my queue, but I'll try to get to this at some point. PRs are also welcome.
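For reference, a rough sketch of what such a helper might look like, packaging the workaround above behind an async iterator (the name streamArrowIpcBuffers, the holder table, and the LIMIT 0 schema trick are hypothetical, not part of the current API):

import { DuckDBBlobValue, DuckDBInstance } from '@duckdb/node-api';

// Hypothetical helper: yields each chunk of `sql` as a Uint8Array of Arrow IPC bytes.
async function* streamArrowIpcBuffers(instance: DuckDBInstance, sql: string): AsyncGenerator<Uint8Array> {
  const connection = await instance.connect();
  const helper = await instance.connect();
  await helper.run('INSTALL arrow');
  await helper.run('LOAD arrow');

  // Create an empty holder table with the query's schema. (Deriving the schema from the
  // streaming result's column names and types instead would avoid running the query twice.)
  await helper.run(`CREATE TABLE ipc_holder AS ${sql} LIMIT 0`);

  const result = await connection.stream(sql);
  while (true) {
    const chunk = await result.fetchChunk();
    if (!chunk?.rowCount) { break; }

    // Round-trip the chunk through the holder table.
    await helper.run('DELETE FROM ipc_holder');
    const appender = await helper.createAppender('main', 'ipc_holder');
    appender.appendDataChunk(chunk);
    appender.flush();

    // Serialize the holder table to Arrow IPC messages and concatenate them.
    const reader = await helper.runAndReadAll('FROM to_arrow_ipc((FROM ipc_holder))');
    const messages = reader.getRows().map((row) => (row[0] as DuckDBBlobValue).bytes);
    const buffer = new Uint8Array(messages.reduce((n, m) => n + m.length, 0));
    let offset = 0;
    for (const m of messages) {
      buffer.set(m, offset);
      offset += m.length;
    }
    yield buffer;
  }
}

The yielded buffers could then be fed to apache-arrow's tableFromIPC or to flechette on the consumer side, without this library depending on either.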
