
feat: Implement Persister for PersisterImpl #24588

Merged: 6 commits into main on Jan 25, 2024

Conversation

mgattozzi (Contributor):

This PR implements the Persister as defined in #24556. It can:

  • Read/Write Catalog files to/from an Object Store
  • Read/Write Segment files to/from an Object Store
  • Read/Write Parquet files to/from an Object Store

This builds on the work from #24579 where we created types that
abstracted over the object store path names so that we had a
uniform, strict, and misuse-resistant way to address these files.

Each commit in the PR documents how its part was implemented, so the
PR should be easier to review commit by commit rather than all at
once.

Closes #24556

This commit implements reading and writing the Catalog to the object
store. The functionality was already stubbed out; it just needed an
implementation. Saving is straightforward: the Catalog is serialized
to JSON and written to the object store. Loading finds the most
recently added Catalog based on the file name, deserializes it, and
returns it to the caller.

This commit also adds some tests to make sure that the above
functionality works as intended.
This commit continues the work on the persister by implementing the
persist_segment and load_segments functions. Much like the Catalog
implementation, persist_segment serializes the segment to JSON before
persisting it to the object store, which is straightforward. Loading
is more involved: we need the most recent n segment files, so we list
them and return the newest n. Comments in the code make this easier
to grok.

We also implement more tests to make sure that this part of the
persister works as expected.
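The "most recent n" selection the commit describes can be sketched with plain string names, assuming (hypothetically) that segment file names are constructed so lexicographic order matches age; the actual code lists SegmentInfoFilePath entries from the object store:

```rust
// Hypothetical sketch: given the listed segment file names, keep the newest n.
// Assumes names sort lexicographically in chronological order.
fn most_recent_n(mut names: Vec<String>, n: usize) -> Vec<String> {
    names.sort(); // oldest..newest
    names.reverse(); // newest first
    names.truncate(n); // keep only the newest n
    names
}

fn main() {
    let listed = vec![
        "segment-0001.json".to_string(),
        "segment-0003.json".to_string(),
        "segment-0002.json".to_string(),
    ];
    // → ["segment-0003.json", "segment-0002.json"]
    println!("{:?}", most_recent_n(listed, 2));
}
```

The real implementation then fetches each of the selected files from the object store and deserializes them into PersistedSegment values.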
This commit does a few things:

- First, we add methods to the persister trait for reading and writing
  Parquet files, as these were not stubbed out in prior commits
- Second, we add a method to serialize a SendableRecordBatchStream into
  Parquet bytes
- With these in place, implementing the trait methods is
  straightforward: hand in a path and a stream, and get back metadata
  about the persisted file; loading from the store returns the bytes

Of course, we also add more tests to make sure this all works as
expected. Note that this does nothing to bound how much memory is
used, and it may not be the most efficient way to write Parquet
files. This is mostly to get things working, with the understanding
that the approach may need future refinement.
.list(Some(&CatalogFilePath::dir()))
.await?;
let mut catalog_path: Option<ObjPath> = None;
while let Some(item) = list.next().await {
pauldix (Member):
I think that we should be stopping iteration on this loop as soon as we have the first catalog entry. We don't want to loop through the entire directory of the catalog here. The point of having the names ordered in the way they are is that we expect the very first item to be the newest catalog, so we just want that entry and then exit.

mgattozzi (Contributor, author):

The problem is that they're not guaranteed to be in order, given the concurrent nature of the interface; see the docs:

https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#tymethod.list

pauldix (Member):

Hmmm, that's odd; I thought object stores returned listings in lexicographic order. Also, we're assuming with the catalog and segment file names that they will be returned in order. Otherwise we'd have to list everything in the directory to be sure we have the latest file, which won't be scalable.

This is worth some extra investigation. Can you log an issue for it so we don't lose track?
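Until that's resolved, one way to get the newest catalog without assuming listing order is to track the maximum name seen while iterating. A std-only sketch (the real code walks the object_store list stream; the file-naming scheme here is hypothetical):

```rust
// Hypothetical sketch: scan every listed name and keep the lexicographic max.
// Correct regardless of listing order, at the cost of walking the full listing.
fn newest_catalog(listed: &[&str]) -> Option<String> {
    let mut newest: Option<&str> = None;
    for &name in listed {
        match newest {
            Some(current) if current >= name => {} // keep the current newest
            _ => newest = Some(name),
        }
    }
    newest.map(str::to_string)
}

fn main() {
    let listed = ["catalog-0002.json", "catalog-0004.json", "catalog-0003.json"];
    // → Some("catalog-0004.json")
    println!("{:?}", newest_catalog(&listed));
}
```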

let new_catalog_name = item
.location
.filename()
.expect("catalog names are utf-8 encoded");
pauldix (Member):
Should we expect or return an error here?

mgattozzi (Contributor, author):

I think the message can change to "catalog names are guaranteed to exist" in this dir, since we should always have a filename available given how we store them. If, however, we expect other things to be written there without a name, we can instead continue to the next item in the loop when an entry has no file name. In fact, it might be better to just do that.
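The "continue on a missing file name" option could look like this sketch, with a plain Option standing in for the filename() lookup (the helper and names are hypothetical):

```rust
// Hypothetical sketch: skip listing entries that have no file name instead of
// panicking on them; only named entries remain as catalog candidates.
fn named_entries(entries: Vec<Option<String>>) -> Vec<String> {
    let mut names = Vec::new();
    for entry in entries {
        let Some(name) = entry else {
            continue; // no file name: move on to the next item in the loop
        };
        names.push(name);
    }
    names
}

fn main() {
    let entries = vec![Some("catalog-0001.json".to_string()), None];
    // → ["catalog-0001.json"]
    println!("{:?}", named_entries(entries));
}
```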

Some(path) => {
let bytes = self.object_store.get(&path).await?.bytes().await?;
let catalog: InnerCatalog = serde_json::from_slice(&bytes)?;
let file_name = path.filename().expect("catalog names are utf-8 encoded");
pauldix (Member):

Should we expect or return an error here? Or is this something we just don't think will ever happen?

mgattozzi (Contributor, author):

We should have a filename if we do a get with the path that we have and so this should be fine. It should read "catalog names are guaranteed to exist for valid paths"

async fn load_segments(&self, most_recent_n: usize) -> crate::Result<Vec<PersistedSegment>> {
let segment_list = self
.object_store
.list(Some(&SegmentInfoFilePath::dir()))
pauldix (Member):

If n is over 1,000, I believe this will have to use the list pagination to get the next set of 1k segment files.

mgattozzi (Contributor, author):

I'm guessing I would have to use this? Would the offset then be the path of the last item we load in our sort?

https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#method.list_with_offset

pauldix (Member):

Yes, I think that's right
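The offset-based paging discussed here can be sketched against an in-memory sorted listing, mirroring the shape of ObjectStore::list_with_offset; page size and names are hypothetical (real stores typically cap a listing page at 1,000 entries):

```rust
// Hypothetical sketch of offset paging: each page returns up to `page_size`
// names strictly after `offset`, modeling a single list_with_offset call.
fn list_page(all_sorted: &[String], offset: Option<&str>, page_size: usize) -> Vec<String> {
    all_sorted
        .iter()
        .filter(|name| offset.map_or(true, |o| name.as_str() > o))
        .take(page_size)
        .cloned()
        .collect()
}

// Loop, feeding the last path of each page back in as the next offset,
// until a page comes back empty.
fn list_all(all_sorted: &[String], page_size: usize) -> Vec<String> {
    let mut out = Vec::new();
    let mut offset: Option<String> = None;
    loop {
        let page = list_page(all_sorted, offset.as_deref(), page_size);
        let Some(last) = page.last() else { break };
        offset = Some(last.clone());
        out.extend(page);
    }
    out
}

fn main() {
    let names: Vec<String> = ["s1", "s2", "s3"].iter().map(|s| s.to_string()).collect();
    // → 3 (all entries recovered across two pages)
    println!("{}", list_all(&names, 2).len());
}
```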


let mut output = Vec::new();
for item in &list[range] {
let bytes = self.object_store.get(&item.location).await?.bytes().await?;
pauldix (Member):

Can we parallelize this to do the object store gets at the same time? Maybe limit by some amount (like no more than 20 concurrent requests or whatever)

mgattozzi (Contributor, author):

I took a good stab at this, but because of async_trait I hit a fun higher-ranked lifetime error. It should have worked, but the lifetimes get weird under async_trait, and if we don't use async_trait we hit other errors. We could merge as is and open an issue with my findings so we can take a better look at this later. I was also unable to get it working with rayon for parallelization.

pauldix (Member):

Let's merge as is and log an issue for grabbing the segments in parallel for the future. Better to at least get things working for the moment 😄

mgattozzi (Contributor, author):

Okay I'll do that after we merge this.
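For illustration only, the bounded fan-out the reviewers deferred could take a shape like the following std-only sketch. The real code would issue concurrent object_store get() futures (e.g. via a buffered stream) rather than spawning threads, and the fetch here is a stand-in, not the actual API:

```rust
use std::thread;

// Hypothetical sketch: fetch in batches of at most `max_concurrent`, joining
// each batch before starting the next, so concurrency stays bounded.
fn fetch_all(paths: Vec<String>, max_concurrent: usize) -> Vec<String> {
    let mut results = Vec::new();
    for batch in paths.chunks(max_concurrent) {
        let handles: Vec<_> = batch
            .iter()
            .cloned()
            .map(|path| {
                // Stand-in for an object store get(); returns fake bytes.
                thread::spawn(move || format!("bytes for {path}"))
            })
            .collect();
        for handle in handles {
            results.push(handle.join().expect("fetch thread panicked"));
        }
    }
    results
}

fn main() {
    let paths = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    // Joining in spawn order keeps results aligned with the input paths.
    // → ["bytes for a", "bytes for b", "bytes for c"]
    println!("{:?}", fetch_all(paths, 2));
}
```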

mgattozzi (Contributor, author):

@pauldix I'll get it working with the offset get and have a test for it before merging.

@mgattozzi mgattozzi merged commit 001a2a6 into main Jan 25, 2024
12 checks passed
@mgattozzi mgattozzi deleted the mgattozzi/persister/object-store branch January 25, 2024 19:32
mgattozzi added a commit that referenced this pull request Jan 25, 2024
* feat: Implement Catalog r/w for persister
* feat: Implement Segment r/w for persister
* feat: Implement Parquet r/w to persister
* fix: Update smallvec for crate advisory
* fix: Implement better filename handling
* feat: Handle loading > 1000 Segment Info files

Successfully merging this pull request may close these issues.

Create PersisterImpl
2 participants