Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial gRPC spec changes for supporting index creation and querying #8829

Merged
merged 12 commits into from
Jan 30, 2025
100 changes: 57 additions & 43 deletions crates/store/re_chunk_store/src/protobuf_conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,27 @@ impl From<crate::QueryExpression> for re_protos::common::v0::Query {
}
}

/// Converts a chunk-store [`crate::ComponentColumnDescriptor`] into its gRPC wire
/// representation.
///
/// The Arrow datatype is serialized to JSON for transport; a serialization failure
/// is surfaced as a [`TypeConversionError`].
impl TryFrom<crate::ComponentColumnDescriptor>
    for re_protos::common::v0::ComponentColumnDescriptor
{
    type Error = TypeConversionError;

    fn try_from(value: crate::ComponentColumnDescriptor) -> Result<Self, Self::Error> {
        // Serialize the datatype up front so the struct literal below stays flat.
        let datatype = serde_json::to_string(&value.store_datatype)
            .map_err(|err| invalid_field!(Self, "component column descriptor", err))?;

        Ok(Self {
            entity_path: Some(value.entity_path.into()),
            archetype_name: value.archetype_name.map(|name| name.to_string()),
            archetype_field_name: value.archetype_field_name.map(|name| name.to_string()),
            component_name: value.component_name.to_string(),
            datatype,
            is_static: value.is_static,
            is_tombstone: value.is_tombstone,
            is_semantically_empty: value.is_semantically_empty,
            is_indicator: value.is_indicator,
        })
    }
}

impl TryFrom<crate::ColumnDescriptor> for re_protos::common::v0::ColumnDescriptor {
type Error = TypeConversionError;

Expand All @@ -280,31 +301,48 @@ impl TryFrom<crate::ColumnDescriptor> for re_protos::common::v0::ColumnDescripto
crate::ColumnDescriptor::Component(component_descriptor) => Ok(Self {
descriptor_type: Some(
re_protos::common::v0::column_descriptor::DescriptorType::ComponentColumn(
re_protos::common::v0::ComponentColumnDescriptor {
entity_path: Some(component_descriptor.entity_path.into()),
archetype_name: component_descriptor
.archetype_name
.map(|an| an.to_string()),
archetype_field_name: component_descriptor
.archetype_field_name
.map(|afn| afn.to_string()),
component_name: component_descriptor.component_name.to_string(),
datatype: serde_json::to_string(&component_descriptor.store_datatype)
.map_err(|err| {
invalid_field!(Self, "component column descriptor", err)
})?,
is_static: component_descriptor.is_static,
is_tombstone: component_descriptor.is_tombstone,
is_semantically_empty: component_descriptor.is_semantically_empty,
is_indicator: component_descriptor.is_indicator,
},
component_descriptor.try_into()?,
),
),
}),
}
}
}

/// Converts a gRPC `ComponentColumnDescriptor` back into the chunk-store
/// representation — the inverse of the `TryFrom` in the other direction.
///
/// Fails if `entity_path` is missing or if the JSON-encoded Arrow datatype
/// cannot be deserialized.
impl TryFrom<re_protos::common::v0::ComponentColumnDescriptor>
    for crate::ComponentColumnDescriptor
{
    type Error = TypeConversionError;

    fn try_from(
        value: re_protos::common::v0::ComponentColumnDescriptor,
    ) -> Result<Self, Self::Error> {
        Ok(Self {
            entity_path: value
                .entity_path
                .ok_or(missing_field!(
                    re_protos::common::v0::ComponentColumnDescriptor,
                    "entity_path",
                ))?
                .try_into()?,
            archetype_name: value.archetype_name.map(Into::into),
            archetype_field_name: value.archetype_field_name.map(Into::into),
            component_name: value.component_name.into(),
            // The datatype travels as JSON on the wire (see the forward conversion).
            store_datatype: serde_json::from_str(&value.datatype).map_err(|err| {
                // Report the error against the type actually being converted,
                // `ComponentColumnDescriptor` — not the enclosing `ColumnDescriptor`.
                invalid_field!(
                    re_protos::common::v0::ComponentColumnDescriptor,
                    "component column descriptor",
                    err
                )
            })?,
            is_static: value.is_static,
            is_tombstone: value.is_tombstone,
            is_semantically_empty: value.is_semantically_empty,
            is_indicator: value.is_indicator,
        })
    }
}

impl TryFrom<re_protos::common::v0::ColumnDescriptor> for crate::ColumnDescriptor {
type Error = TypeConversionError;

Expand Down Expand Up @@ -335,31 +373,7 @@ impl TryFrom<re_protos::common::v0::ColumnDescriptor> for crate::ColumnDescripto
})),
re_protos::common::v0::column_descriptor::DescriptorType::ComponentColumn(
component_descriptor,
) => Ok(Self::Component(crate::ComponentColumnDescriptor {
entity_path: component_descriptor
.entity_path
.ok_or(missing_field!(
re_protos::common::v0::ComponentColumnDescriptor,
"entity_path",
))?
.try_into()?,
archetype_name: component_descriptor.archetype_name.map(Into::into),
archetype_field_name: component_descriptor.archetype_field_name.map(Into::into),
component_name: component_descriptor.component_name.into(),
store_datatype: serde_json::from_str(&component_descriptor.datatype).map_err(
|err| {
invalid_field!(
re_protos::common::v0::ColumnDescriptor,
"component column descriptor",
err
)
},
)?,
is_static: component_descriptor.is_static,
is_tombstone: component_descriptor.is_tombstone,
is_semantically_empty: component_descriptor.is_semantically_empty,
is_indicator: component_descriptor.is_indicator,
})),
) => Ok(Self::Component(component_descriptor.try_into()?)),
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions crates/store/re_dataframe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ pub use self::external::re_log_types::{
#[doc(no_inline)]
pub use self::external::re_query::{QueryCache, QueryCacheHandle, StorageEngine};

#[doc(no_inline)]
pub use self::external::re_types_core::ComponentName;

/// Re-exports of the crates backing `re_dataframe`'s public API, so downstream
/// users can refer to the exact dependency versions this crate was built with.
pub mod external {
pub use re_chunk;
pub use re_chunk_store;
pub use re_log_types;
pub use re_query;
pub use re_types_core;

pub use arrow;
}
112 changes: 112 additions & 0 deletions crates/store/re_protos/proto/rerun/v0/remote_store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ service StorageNode {
rpc Query(QueryRequest) returns (stream DataframePart) {}
rpc FetchRecording(FetchRecordingRequest) returns (stream rerun.common.v0.RerunChunk) {}

rpc IndexCollection(IndexCollectionRequest) returns (IndexCollectionResponse) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now everything lives under one service StorageNode. I wonder if it would be cleaner to create several services that group similar calls logically — maybe something like CatalogService, IndexService, etc.? It might be nice in the future to decide at a fine-grained level which services should spin up, for example if we want to distribute load across multiple VMs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is still a tight relation between catalog, collection, collection query path, collection index query path. This definitely requires bit of thinking how we split it. I'll create an issue, but I won't tackle this as part of this story.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition to CreateIndex, we probably want an API for ReIndex that only requires the collection + ColumnDescriptor but doesn't need the parameters.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracking which recordings are in the index so we know whether reindexing is necessary would be nice to think about, but not necessary yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/rerun-io/dataplatform/issues/156

This will be tackled in a first follow up.

// Collection index query response is a RecordBatch with 2 columns:
zehiko marked this conversation as resolved.
Show resolved Hide resolved
// - 'resource_id' column with the id of the resource
// - timepoint column with the values representing the points in time
// where index query matches. What time points are matched depends on the type of
// index that is queried. For example for vector search it might be timepoints where
// top-K matches are found within *each* resource in the collection. For inverted index
// it might be timepoints where the query string is found in the indexed column
rpc QueryCollectionIndex(QueryCollectionIndexRequest) returns (stream DataframePart) {}

// metadata API calls
rpc QueryCatalog(QueryCatalogRequest) returns (stream DataframePart) {}
rpc UpdateCatalog(UpdateCatalogRequest) returns (UpdateCatalogResponse) {}
Expand All @@ -32,6 +42,108 @@ message DataframePart {
bytes payload = 1000;
}

// ---------------- IndexCollection ------------------

// Request to create an index over one column of a collection.
message IndexCollectionRequest {
  // Which collection do we want to create the index for?
  Collection collection = 1;
  // What kind of index do we want to create, and what are
  // its index-specific properties?
  IndexProperties properties = 2;
  // Component / column we want to index.
  rerun.common.v0.ComponentColumnDescriptor column = 3;
  // What is the filter index, i.e. the timeline for which we
  // will query the timepoints?
  rerun.common.v0.IndexColumnSelector time_index = 4;
}

// Type-specific configuration for the index to create.
// Exactly one variant is set; it selects the kind of index.
message IndexProperties {
oneof props {
// Inverted (full-text) index.
InvertedIndex inverted = 1;
// IVF-PQ vector-similarity index.
VectorIvfPqIndex vector = 2;
// B-tree index.
BTreeIndex btree = 3;
}
}

// Configuration for an inverted (full-text) index.
message InvertedIndex {
// Whether to also store token positions in the index — assumed from the
// field name; confirm against the index backend.
bool store_position = 1;
// Name of the tokenizer used to split the indexed text — presumably a
// backend-recognized tokenizer identifier; verify accepted values.
string base_tokenizer = 2;
// TODO(zehiko) add properties as needed
}

// Configuration for an IVF-PQ (inverted file + product quantization) vector index.
// NOTE(review): parameter semantics below are inferred from standard IVF-PQ
// terminology — confirm against the index backend.
message VectorIvfPqIndex {
// Number of IVF partitions (coarse clusters) — assumed, confirm.
uint32 num_partitions = 1;
// Number of PQ sub-vectors per vector — assumed, confirm.
uint32 num_sub_vectors = 2;
// NOTE(review): field name is plural but it holds a single metric —
// consider renaming to `distance_metric` before the API is frozen.
VectorDistanceMetric distance_metrics = 3;
}

// Distance metric used for vector similarity search.
// NOTE(review): proto3 style recommends a `*_UNSPECIFIED = 0` first value and
// prefixed value names (e.g. `VECTOR_DISTANCE_METRIC_L2`), since enum values
// live in the enclosing scope — consider before the API is frozen.
enum VectorDistanceMetric {
L2 = 0;
COSINE = 1;
DOT = 2;
HAMMING = 3;
}

// Configuration for a B-tree index. Currently has no parameters.
message BTreeIndex {
// TODO(zehiko) add properties as needed
}

message IndexCollectionResponse {}


// ---------------- QueryCollectionIndex ------------------

// Request to run a query against a collection's index.
message QueryCollectionIndexRequest {
  // Collection we want to run the query against.
  // If not specified, the default collection is queried.
  Collection collection = 1;
  // Index-type-specific query properties.
  IndexQuery query = 2;
}

// Index query; the variant set must match the type of the index being queried.
message IndexQuery {
// specific index query properties based on the index type
oneof query {
// Query against an inverted (full-text) index.
InvertedIndexQuery inverted = 1;
// Query against a vector-similarity index.
VectorIndexQuery vector = 2;
// Query against a B-tree index.
BTreeIndexQuery btree = 3;
}
}

// Query against an inverted (full-text) index.
message InvertedIndexQuery {
  // Query to execute, represented as Arrow data.
  // The query should be a unit RecordBatch with 2 columns:
  // - 'index' column with the name of the column we want to query
  // - 'query' column with the value we want to query. It must be
  //   of utf8 type.
  DataframePart query = 1;
  // TODO(zehiko) add properties as needed
}

// Query against a vector-similarity index.
message VectorIndexQuery {
// Query to execute represented as the arrow data
// Query should be a unit RecordBatch with 2 columns:
// - 'index' column with the name of the column we want to query
// - 'query' column with the value we want to query. It must be of
// type of float32 array
DataframePart query = 1;
// Number of nearest matches to return.
uint32 top_k = 2;
}

// Query against a B-tree index.
message BTreeIndexQuery {
// Query to execute represented as the arrow data
// Query should be a unit RecordBatch with 2 columns:
// - 'index' column with the name of the column we want to query
// - 'query' column with the value we want to query. The type should
// be of the same type as the indexed column
DataframePart query = 1;
// TODO(zehiko) add properties as needed
}

// A collection, identified by its name.
message Collection {
string name = 1;
}

// ---------------- GetRecordingSchema ------------------

message GetRecordingSchemaRequest {
Expand Down
Loading
Loading