From da294a265e119617819cba60ae3ec94286b217bb Mon Sep 17 00:00:00 2001
From: Trevor Hilton
Date: Mon, 4 Nov 2024 12:23:54 -0800
Subject: [PATCH] refactor: remove conversion step in validator (#25515)

The write validator now builds the row data destined for the WAL directly,
rather than creating an intermediate type to hold row data.
---
 influxdb3_write/src/lib.rs                    |  11 +-
 influxdb3_write/src/write_buffer/mod.rs       |  19 +-
 influxdb3_write/src/write_buffer/validator.rs | 306 ++++++++++--------
 3 files changed, 190 insertions(+), 146 deletions(-)

diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs
index e079fdc8dea..b8b844a5934 100644
--- a/influxdb3_write/src/lib.rs
+++ b/influxdb3_write/src/lib.rs
@@ -343,13 +343,14 @@ mod test_helpers {
         let db_name = NamespaceName::new(db_name).unwrap();
         let result = WriteValidator::initialize(db_name.clone(), catalog, 0)
             .unwrap()
-            .v1_parse_lines_and_update_schema(lp, false)
-            .unwrap()
-            .convert_lines_to_buffer(
+            .v1_parse_lines_and_update_schema(
+                lp,
+                false,
                 Time::from_timestamp_nanos(0),
-                Gen1Duration::new_5m(),
                 Precision::Nanosecond,
-            );
+            )
+            .unwrap()
+            .convert_lines_to_buffer(Gen1Duration::new_5m());
 
         result.valid_data
     }

diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs
index 175684b18c2..aa0f0304578 100644
--- a/influxdb3_write/src/write_buffer/mod.rs
+++ b/influxdb3_write/src/write_buffer/mod.rs
@@ -225,8 +225,8 @@ impl WriteBufferImpl {
             self.catalog(),
             ingest_time.timestamp_nanos(),
         )?
-        .v1_parse_lines_and_update_schema(lp, accept_partial)?
-        .convert_lines_to_buffer(ingest_time, self.wal_config.gen1_duration, precision);
+        .v1_parse_lines_and_update_schema(lp, accept_partial, ingest_time, precision)?
+        .convert_lines_to_buffer(self.wal_config.gen1_duration);
 
         // if there were catalog updates, ensure they get persisted to the wal, so they're
         // replayed on restart
@@ -267,8 +267,8 @@ impl WriteBufferImpl {
             self.catalog(),
             ingest_time.timestamp_nanos(),
         )?
-        .v3_parse_lines_and_update_schema(lp, accept_partial)?
-        .convert_lines_to_buffer(ingest_time, self.wal_config.gen1_duration, precision);
+        .v3_parse_lines_and_update_schema(lp, accept_partial, ingest_time, precision)?
+        .convert_lines_to_buffer(self.wal_config.gen1_duration);
 
         // if there were catalog updates, ensure they get persisted to the wal, so they're
         // replayed on restart
@@ -560,13 +560,14 @@ mod tests {
         let lp = "cpu,region=west user=23.2 100\nfoo f1=1i";
         WriteValidator::initialize(db_name, Arc::clone(&catalog), 0)
             .unwrap()
-            .v1_parse_lines_and_update_schema(lp, false)
-            .unwrap()
-            .convert_lines_to_buffer(
+            .v1_parse_lines_and_update_schema(
+                lp,
+                false,
                 Time::from_timestamp_nanos(0),
-                Gen1Duration::new_5m(),
                 Precision::Nanosecond,
-            );
+            )
+            .unwrap()
+            .convert_lines_to_buffer(Gen1Duration::new_5m());
 
         let db = catalog.db_schema_by_id(DbId::from(0)).unwrap();

diff --git a/influxdb3_write/src/write_buffer/validator.rs b/influxdb3_write/src/write_buffer/validator.rs
index e2d480f0fa8..507487eaf72 100644
--- a/influxdb3_write/src/write_buffer/validator.rs
+++ b/influxdb3_write/src/write_buffer/validator.rs
@@ -73,6 +73,8 @@ impl WriteValidator<WithCatalog> {
         self,
         lp: &str,
         accept_partial: bool,
+        ingest_time: Time,
+        precision: Precision,
     ) -> Result<WriteValidator<LinesParsed>> {
         let mut errors = vec![];
         let mut lp_lines = lp.lines();
@@ -93,6 +95,8 @@ impl WriteValidator<WithCatalog> {
                     line_idx,
                     line,
                     lp_lines.next().unwrap(),
+                    ingest_time,
+                    precision,
                 )
             }) {
                 Ok((qualified_line, catalog_ops)) => (qualified_line, catalog_ops),
@@ -150,6 +154,8 @@ impl WriteValidator<WithCatalog> {
         self,
         lp: &str,
         accept_partial: bool,
+        ingest_time: Time,
+        precision: Precision,
     ) -> Result<WriteValidator<LinesParsed>> {
         let mut errors = vec![];
         let mut lp_lines = lp.lines();
@@ -167,7 +173,14 @@ impl WriteValidator<WithCatalog> {
                     error_message: e.to_string(),
                 })
                 .and_then(|l| {
-                    validate_and_qualify_v1_line(&mut schema, line_idx, l, lp_lines.next().unwrap())
+                    validate_and_qualify_v1_line(
+                        &mut schema,
+                        line_idx,
+                        l,
+                        lp_lines.next().unwrap(),
+                        ingest_time,
+                        precision,
+                    )
                 }) {
                 Ok((qualified_line, catalog_op)) => (qualified_line, catalog_op),
                 Err(e) => {
@@ -231,9 +244,14 @@ fn validate_and_qualify_v3_line(
     line_number: usize,
     line: v3::ParsedLine<'_>,
     raw_line: &str,
+    ingest_time: Time,
+    precision: Precision,
 ) -> Result<(QualifiedLine, Option<CatalogOp>), WriteLineError> {
     let mut catalog_op = None;
     let table_name = line.series.measurement.as_str();
+    let mut fields = Vec::with_capacity(line.column_count());
+    let mut index_count = 0;
+    let mut field_count = 0;
     let qualified = if let Some(table_def) = db_schema.table_definition(table_name) {
         let table_id = table_def.table_id;
         if !table_def.is_v3() {
@@ -285,8 +303,7 @@ fn validate_and_qualify_v3_line(
         let mut columns = ColumnTracker::with_capacity(line.column_count() + 1);
 
         // qualify the series key members:
-        let tag_set = if let Some(sk) = &line.series.series_key {
-            let mut ts = Vec::with_capacity(sk.len());
+        if let Some(sk) = &line.series.series_key {
             for (key, val) in sk.iter() {
                 let col_id = table_def
                     .column_name_to_id(key.as_str())
@@ -299,15 +316,12 @@ fn validate_and_qualify_v3_line(
                             that does not exist in the catalog table definition"
                     ),
                 })?;
-                ts.push(Field::new(col_id, val));
+                fields.push(Field::new(col_id, val));
+                index_count += 1;
             }
-            Some(ts)
-        } else {
-            None
-        };
+        }
 
         // qualify the fields:
-        let mut field_set = Vec::with_capacity(line.field_set.len());
         for (field_name, field_val) in line.field_set.iter() {
             if let Some((col_id, col_def)) = table_def.column_def_and_id(field_name.as_str()) {
                 let field_col_type = influx_column_type_from_field_value(field_val);
@@ -325,7 +339,7 @@ fn validate_and_qualify_v3_line(
                         ),
                     });
                 }
-                field_set.push(Field::new(col_id, field_val));
+                fields.push(Field::new(col_id, field_val));
             } else {
                 let col_id = ColumnId::new();
                 columns.push((
@@ -333,15 +347,28 @@ fn validate_and_qualify_v3_line(
                     Arc::from(field_name.as_str()),
                     influx_column_type_from_field_value(field_val),
                 ));
-                field_set.push(Field::new(col_id, field_val));
+                fields.push(Field::new(col_id, field_val));
             }
+            field_count += 1;
         }
 
         // qualify the timestamp:
         let time_col_id = table_def
             .column_name_to_id(TIME_COLUMN_NAME)
-            .unwrap_or_else(ColumnId::new);
-        let timestamp = (time_col_id, line.timestamp);
+            .unwrap_or_else(|| {
+                let col_id = ColumnId::new();
+                columns.push((
+                    col_id,
+                    Arc::from(TIME_COLUMN_NAME),
+                    InfluxColumnType::Timestamp,
+                ));
+                col_id
+            });
+        let timestamp_ns = line
+            .timestamp
+            .map(|ts| apply_precision_to_timestamp(precision, ts))
+            .unwrap_or(ingest_time.timestamp_nanos());
+        fields.push(Field::new(time_col_id, FieldData::Timestamp(timestamp_ns)));
 
         // if we have new columns defined, add them to the db_schema table so that subsequent lines
         // won't try to add the same definitions. Collect these additions into a catalog op, which
@@ -358,16 +385,16 @@ fn validate_and_qualify_v3_line(
                 .as_ref()
                 .clone();
 
-            let mut fields = Vec::with_capacity(columns.len());
+            let mut field_definitions = Vec::with_capacity(columns.len());
             for (id, name, influx_type) in columns.iter() {
-                fields.push(FieldDefinition::new(*id, Arc::clone(name), influx_type));
+                field_definitions.push(FieldDefinition::new(*id, Arc::clone(name), influx_type));
             }
             catalog_op = Some(CatalogOp::AddFields(FieldAdditions {
                 database_id,
                 database_name,
                 table_id: new_table_def.table_id,
                 table_name: Arc::clone(&new_table_def.table_name),
-                field_definitions: fields,
+                field_definitions,
             }));
 
             new_table_def
@@ -381,27 +408,26 @@ fn validate_and_qualify_v3_line(
         }
         QualifiedLine {
             table_id,
-            tag_set,
-            field_set,
-            timestamp,
+            row: Row {
+                time: timestamp_ns,
+                fields,
+            },
+            index_count,
+            field_count,
         }
     } else {
         let table_id = TableId::new();
        let mut columns = Vec::new();
         let mut key = Vec::new();
-        let tag_set = if let Some(series_key) = &line.series.series_key {
-            let mut ts = Vec::with_capacity(series_key.len());
+        if let Some(series_key) = &line.series.series_key {
             for (sk, sv) in series_key.iter() {
                 let col_id = ColumnId::new();
                 key.push(col_id);
                 columns.push((col_id, Arc::from(sk.as_str()), InfluxColumnType::Tag));
-                ts.push(Field::new(col_id, sv));
+                fields.push(Field::new(col_id, sv));
+                index_count += 1;
             }
-            Some(ts)
-        } else {
-            None
-        };
-        let mut field_set = Vec::with_capacity(line.field_set.len());
+        }
         for (field_name, field_val) in line.field_set.iter() {
             let col_id = ColumnId::new();
             columns.push((
@@ -409,7 +435,8 @@ fn validate_and_qualify_v3_line(
                 Arc::from(field_name.as_str()),
                 influx_column_type_from_field_value(field_val),
             ));
-            field_set.push(Field::new(col_id, field_val));
+            fields.push(Field::new(col_id, field_val));
+            field_count += 1;
         }
         // Always add time last on new table:
         let time_col_id = ColumnId::new();
@@ -418,13 +445,17 @@ fn validate_and_qualify_v3_line(
             Arc::from(TIME_COLUMN_NAME),
             InfluxColumnType::Timestamp,
         ));
-        let timestamp = (time_col_id, line.timestamp);
+        let timestamp_ns = line
+            .timestamp
+            .map(|ts| apply_precision_to_timestamp(precision, ts))
+            .unwrap_or(ingest_time.timestamp_nanos());
+        fields.push(Field::new(time_col_id, FieldData::Timestamp(timestamp_ns)));
 
         let table_name = table_name.into();
-        let mut fields = Vec::with_capacity(columns.len());
+        let mut field_definitions = Vec::with_capacity(columns.len());
         for (id, name, influx_type) in &columns {
-            fields.push(FieldDefinition::new(*id, Arc::clone(name), influx_type));
+            field_definitions.push(FieldDefinition::new(*id, Arc::clone(name), influx_type));
         }
 
         let table = TableDefinition::new(
@@ -444,7 +475,7 @@ fn validate_and_qualify_v3_line(
             database_id: db_schema.id,
             database_name: Arc::clone(&db_schema.name),
             table_name: Arc::clone(&table_name),
-            field_definitions: fields,
+            field_definitions,
             key: Some(key),
         });
         catalog_op = Some(table_definition_op);
@@ -456,9 +487,12 @@ fn validate_and_qualify_v3_line(
         );
         QualifiedLine {
             table_id,
-            tag_set,
-            field_set,
-            timestamp,
+            row: Row {
+                time: timestamp_ns,
+                fields,
+            },
+            index_count,
+            field_count,
         }
     };
 
@@ -477,9 +511,14 @@ fn validate_and_qualify_v1_line(
     line_number: usize,
     line: ParsedLine<'_>,
     _raw_line: &str,
+    ingest_time: Time,
+    precision: Precision,
 ) -> Result<(QualifiedLine, Option<CatalogOp>), WriteLineError> {
     let mut catalog_op = None;
     let table_name = line.series.measurement.as_str();
+    let mut fields = Vec::with_capacity(line.column_count());
+    let mut index_count = 0;
+    let mut field_count = 0;
     let qualified = if let Some(table_def) = db_schema.table_definition(table_name) {
         if table_def.is_v3() {
             return Err(WriteLineError {
@@ -491,22 +530,18 @@ fn validate_and_qualify_v1_line(
         }
         // This table already exists, so update with any new columns if present:
         let mut columns = ColumnTracker::with_capacity(line.column_count() + 1);
-        let tag_set = if let Some(tag_set) = &line.series.tag_set {
-            let mut ts = Vec::with_capacity(tag_set.len());
+        if let Some(tag_set) = &line.series.tag_set {
             for (tag_key, tag_val) in tag_set {
                 if let Some(col_id) = table_def.column_name_to_id(tag_key.as_str()) {
-                    ts.push(Field::new(col_id, FieldData::Tag(tag_val.to_string())));
+                    fields.push(Field::new(col_id, FieldData::Tag(tag_val.to_string())));
                 } else {
                     let col_id = ColumnId::new();
                     columns.push((col_id, Arc::from(tag_key.as_str()), InfluxColumnType::Tag));
-                    ts.push(Field::new(col_id, FieldData::Tag(tag_val.to_string())));
+                    fields.push(Field::new(col_id, FieldData::Tag(tag_val.to_string())));
                 }
+                index_count += 1;
             }
-            Some(ts)
-        } else {
-            None
-        };
-        let mut field_set = Vec::with_capacity(line.field_set.len());
+        }
         for (field_name, field_val) in line.field_set.iter() {
             // This field already exists, so check the incoming type matches existing type:
             if let Some((col_id, col_def)) = table_def.column_def_and_id(field_name.as_str()) {
@@ -525,7 +560,7 @@ fn validate_and_qualify_v1_line(
                         ),
                     });
                 }
-                field_set.push(Field::new(col_id, field_val));
+                fields.push(Field::new(col_id, field_val));
             } else {
                 let col_id = ColumnId::new();
                 columns.push((
@@ -533,14 +568,27 @@ fn validate_and_qualify_v1_line(
                     Arc::from(field_name.as_str()),
                     influx_column_type_from_field_value(field_val),
                 ));
-                field_set.push(Field::new(col_id, field_val));
+                fields.push(Field::new(col_id, field_val));
             }
+            field_count += 1;
         }
 
         let time_col_id = table_def
             .column_name_to_id(TIME_COLUMN_NAME)
-            .unwrap_or_default();
-        let timestamp = (time_col_id, line.timestamp);
+            .unwrap_or_else(|| {
+                let col_id = ColumnId::new();
+                columns.push((
+                    col_id,
+                    Arc::from(TIME_COLUMN_NAME),
+                    InfluxColumnType::Timestamp,
+                ));
+                col_id
+            });
+        let timestamp_ns = line
+            .timestamp
+            .map(|ts| apply_precision_to_timestamp(precision, ts))
+            .unwrap_or(ingest_time.timestamp_nanos());
+        fields.push(Field::new(time_col_id, FieldData::Timestamp(timestamp_ns)));
 
         // if we have new columns defined, add them to the db_schema table so that subsequent lines
        // won't try to add the same definitions. Collect these additions into a catalog op, which
@@ -552,9 +600,9 @@ fn validate_and_qualify_v1_line(
         let table_name: Arc<str> = Arc::clone(&table_def.table_name);
         let table_id = table_def.table_id;
 
-        let mut fields = Vec::with_capacity(columns.len());
+        let mut field_definitions = Vec::with_capacity(columns.len());
         for (id, name, influx_type) in &columns {
-            fields.push(FieldDefinition::new(*id, Arc::clone(name), influx_type));
+            field_definitions.push(FieldDefinition::new(*id, Arc::clone(name), influx_type));
         }
 
         let db_schema = db_schema.to_mut();
@@ -579,31 +627,30 @@ fn validate_and_qualify_v1_line(
                 database_id,
                 table_id,
                 table_name,
-                field_definitions: fields,
+                field_definitions,
             }));
         }
         QualifiedLine {
             table_id: table_def.table_id,
-            tag_set,
-            field_set,
-            timestamp,
+            row: Row {
+                time: timestamp_ns,
+                fields,
+            },
+            index_count,
+            field_count,
         }
     } else {
         let table_id = TableId::new();
         // This is a new table, so build up its columns:
         let mut columns = Vec::new();
-        let tag_set = if let Some(tag_set) = &line.series.tag_set {
-            let mut ts = Vec::with_capacity(tag_set.len());
+        if let Some(tag_set) = &line.series.tag_set {
            for (tag_key, tag_val) in tag_set {
                 let col_id = ColumnId::new();
-                ts.push(Field::new(col_id, FieldData::Tag(tag_val.to_string())));
+                fields.push(Field::new(col_id, FieldData::Tag(tag_val.to_string())));
                 columns.push((col_id, Arc::from(tag_key.as_str()), InfluxColumnType::Tag));
+                index_count += 1;
             }
-            Some(ts)
-        } else {
-            None
-        };
-        let mut field_set = Vec::with_capacity(line.field_set.len());
+        }
         for (field_name, field_val) in &line.field_set {
             let col_id = ColumnId::new();
             columns.push((
@@ -611,7 +658,8 @@ fn validate_and_qualify_v1_line(
                 Arc::from(field_name.as_str()),
                 influx_column_type_from_field_value(field_val),
             ));
-            field_set.push(Field::new(col_id, field_val));
+            fields.push(Field::new(col_id, field_val));
+            field_count += 1;
         }
         // Always add time last on new table:
         let time_col_id = ColumnId::new();
@@ -620,20 +668,24 @@ fn validate_and_qualify_v1_line(
             Arc::from(TIME_COLUMN_NAME),
             InfluxColumnType::Timestamp,
         ));
-        let timestamp = (time_col_id, line.timestamp);
+        let timestamp_ns = line
+            .timestamp
+            .map(|ts| apply_precision_to_timestamp(precision, ts))
+            .unwrap_or(ingest_time.timestamp_nanos());
+        fields.push(Field::new(time_col_id, FieldData::Timestamp(timestamp_ns)));
 
         let table_name = table_name.into();
-        let mut fields = Vec::with_capacity(columns.len());
+        let mut field_definitions = Vec::with_capacity(columns.len());
         for (id, name, influx_type) in &columns {
-            fields.push(FieldDefinition::new(*id, Arc::clone(name), influx_type));
+            field_definitions.push(FieldDefinition::new(*id, Arc::clone(name), influx_type));
         }
 
         catalog_op = Some(CatalogOp::CreateTable(influxdb3_wal::TableDefinition {
             table_id,
             database_id: db_schema.id,
             database_name: Arc::clone(&db_schema.name),
             table_name: Arc::clone(&table_name),
-            field_definitions: fields,
+            field_definitions,
             key: None,
         }));
 
@@ -646,9 +698,12 @@ fn validate_and_qualify_v1_line(
         );
         QualifiedLine {
             table_id,
-            tag_set,
-            field_set,
-            timestamp,
+            row: Row {
+                time: timestamp_ns,
+                fields,
+            },
+            index_count,
+            field_count,
         }
     };
 
@@ -680,28 +735,17 @@ impl WriteValidator<LinesParsed> {
     /// This involves splitting out the writes into different batches for each chunk, which will
     /// map to the `Gen1Duration`. This function should be infallible, because
     /// the schema for incoming writes has been fully validated.
-    pub(crate) fn convert_lines_to_buffer(
-        self,
-        ingest_time: Time,
-        gen1_duration: Gen1Duration,
-        precision: Precision,
-    ) -> ValidatedLines {
+    pub(crate) fn convert_lines_to_buffer(self, gen1_duration: Gen1Duration) -> ValidatedLines {
         let mut table_chunks = IndexMap::new();
         let line_count = self.state.lines.len();
         let mut field_count = 0;
         let mut index_count = 0;
 
         for line in self.state.lines.into_iter() {
-            field_count += line.field_set.len();
-            index_count += line.tag_set.as_ref().map(|tags| tags.len()).unwrap_or(0);
-
-            convert_qualified_line(
-                line,
-                &mut table_chunks,
-                ingest_time,
-                gen1_duration,
-                precision,
-            );
+            field_count += line.field_count;
+            index_count += line.index_count;
+
+            convert_qualified_line(line, &mut table_chunks, gen1_duration);
         }
 
         let write_batch = WriteBatch::new(
@@ -724,55 +768,19 @@ impl WriteValidator<LinesParsed> {
 fn convert_qualified_line(
     line: QualifiedLine,
     table_chunk_map: &mut IndexMap<TableId, TableChunks>,
-    ingest_time: Time,
     gen1_duration: Gen1Duration,
-    precision: Precision,
 ) {
-    // Set up row values:
-    let mut fields = Vec::with_capacity(line.column_count() + 1);
-
-    // Add series key columns:
-    if let Some(tag_set) = line.tag_set {
-        fields.extend(tag_set);
-    }
-
-    // Add fields columns:
-    fields.extend(line.field_set);
-
-    // Add time column:
-    let time_value_nanos = line
-        .timestamp
-        .1
-        .map(|ts| apply_precision_to_timestamp(precision, ts))
-        .unwrap_or(ingest_time.timestamp_nanos());
-    fields.push(Field::new(
-        line.timestamp.0,
-        FieldData::Timestamp(time_value_nanos),
-    ));
-
     // Add the row into the correct chunk in the table
-    let chunk_time = gen1_duration.chunk_time_for_timestamp(Timestamp::new(time_value_nanos));
+    let chunk_time = gen1_duration.chunk_time_for_timestamp(Timestamp::new(line.row.time));
     let table_chunks = table_chunk_map.entry(line.table_id).or_default();
-    table_chunks.push_row(
-        chunk_time,
-        Row {
-            time: time_value_nanos,
-            fields,
-        },
-    );
+    table_chunks.push_row(chunk_time, line.row);
 }
 
 struct QualifiedLine {
     table_id: TableId,
-    tag_set: Option<Vec<Field>>,
-    field_set: Vec<Field>,
-    timestamp: (ColumnId, Option<i64>),
-}
-
-impl QualifiedLine {
-    fn column_count(&self) -> usize {
-        self.tag_set.as_ref().map(|ts| ts.len()).unwrap_or(0) + self.field_set.len() + 1
-    }
+    row: Row,
+    index_count: usize,
+    field_count: usize,
 }
 
 fn apply_precision_to_timestamp(precision: Precision, ts: i64) -> i64 {
@@ -812,13 +820,14 @@ mod tests {
         let instance_id = Arc::from("sample-instance-id");
         let namespace = NamespaceName::new("test").unwrap();
         let catalog = Arc::new(Catalog::new(host_id, instance_id));
-        let result = WriteValidator::initialize(namespace.clone(), catalog, 0)?
-            .v1_parse_lines_and_update_schema("cpu,tag1=foo val1=\"bar\" 1234", false)?
-            .convert_lines_to_buffer(
+        let result = WriteValidator::initialize(namespace.clone(), Arc::clone(&catalog), 0)?
+            .v1_parse_lines_and_update_schema(
+                "cpu,tag1=foo val1=\"bar\" 1234",
+                false,
                 Time::from_timestamp_nanos(0),
-                Gen1Duration::new_5m(),
                 Precision::Auto,
-            );
+            )?
+            .convert_lines_to_buffer(Gen1Duration::new_5m());
 
         assert_eq!(result.line_count, 1);
         assert_eq!(result.field_count, 1);
@@ -834,6 +843,39 @@ mod tests {
             .unwrap();
         assert_eq!(batch.row_count(), 1);
 
+        // Validate another write, the result should be very similar, but now the catalog
+        // has the table/columns added, so it will exercise a different code path:
+        let result = WriteValidator::initialize(namespace.clone(), Arc::clone(&catalog), 0)?
+            .v1_parse_lines_and_update_schema(
+                "cpu,tag1=foo val1=\"bar\" 1235",
+                false,
+                Time::from_timestamp_nanos(0),
+                Precision::Auto,
+            )?
+            .convert_lines_to_buffer(Gen1Duration::new_5m());
+
+        println!("result: {result:?}");
+        assert_eq!(result.line_count, 1);
+        assert_eq!(result.field_count, 1);
+        assert_eq!(result.index_count, 1);
+        assert!(result.errors.is_empty());
+
+        // Validate another write, this time adding a new field:
+        let result = WriteValidator::initialize(namespace.clone(), catalog, 0)?
+            .v1_parse_lines_and_update_schema(
+                "cpu,tag1=foo val1=\"bar\",val2=false 1236",
+                false,
+                Time::from_timestamp_nanos(0),
+                Precision::Auto,
+            )?
+            .convert_lines_to_buffer(Gen1Duration::new_5m());
+
+        println!("result: {result:?}");
+        assert_eq!(result.line_count, 1);
+        assert_eq!(result.field_count, 2);
+        assert_eq!(result.index_count, 1);
+        assert!(result.errors.is_empty());
+
         Ok(())
     }
 }
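To see the shape this refactor leaves behind, here is a minimal, self-contained sketch. It is illustrative only, not part of the patch: the types are simplified stand-ins (`u32` ids, `String` field values, a `BTreeMap` in place of the crate's `IndexMap` of `TableChunks`) rather than the real `influxdb3_write` definitions. What it demonstrates is the point the diff makes: validation now emits a WAL-ready `Row` inside `QualifiedLine`, together with precomputed `index_count` and `field_count`, so `convert_lines_to_buffer` reduces to bucketing rows by gen1 chunk time.

```rust
use std::collections::BTreeMap;

// Simplified stand-ins for the crate-internal `Field` and `Row`; the real
// `Field` holds a `FieldData` enum and `ColumnId`, not a `String` and `u32`.
#[derive(Debug, Clone)]
struct Field {
    column_id: u32,
    value: String,
}

#[derive(Debug, Clone)]
struct Row {
    time: i64, // nanosecond timestamp
    fields: Vec<Field>,
}

// After this patch, validation emits the WAL-ready row plus the counts that
// `convert_lines_to_buffer` previously recomputed from intermediate tag/field sets.
#[derive(Debug)]
struct QualifiedLine {
    table_id: u32,
    row: Row,
    index_count: usize,
    field_count: usize,
}

// With the row already built, the conversion step only buckets rows by their
// gen1 chunk time (a `BTreeMap` keyed by table id and chunk time stands in
// for the real map of `TableChunks`).
fn convert_qualified_line(
    line: QualifiedLine,
    table_chunk_map: &mut BTreeMap<(u32, i64), Vec<Row>>,
    gen1_duration_ns: i64,
) {
    // Truncate the row time down to its chunk boundary.
    let chunk_time = line.row.time - line.row.time.rem_euclid(gen1_duration_ns);
    table_chunk_map
        .entry((line.table_id, chunk_time))
        .or_default()
        .push(line.row);
}

fn main() {
    let mut chunks = BTreeMap::new();
    let line = QualifiedLine {
        table_id: 0,
        row: Row {
            time: 301_000_000_000, // lands in the second 5-minute chunk
            fields: vec![Field { column_id: 1, value: "23.2".into() }],
        },
        index_count: 1,
        field_count: 1,
    };
    convert_qualified_line(line, &mut chunks, 300_000_000_000);
    assert_eq!(chunks.len(), 1);
    println!("{chunks:?}");
}
```

This is also why `convert_lines_to_buffer` can drop its `ingest_time` and `precision` parameters: everything that needed them has already happened by the time a `QualifiedLine` exists.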
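The timestamp handling itself is what moves into validation. A second sketch, with the same caveats (stand-in types, not the crate's API): the raw line-protocol timestamp is scaled to nanoseconds according to the request's precision, and a line with no timestamp falls back to the server's ingest time. `Precision::Auto` is simplified to nanoseconds here; the real variant infers the precision from the timestamp's magnitude.

```rust
#[derive(Clone, Copy)]
enum Precision {
    Auto,
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

// Scale a raw line-protocol timestamp to nanoseconds. `Auto` is assumed to
// mean nanoseconds in this sketch only.
fn apply_precision_to_timestamp(precision: Precision, ts: i64) -> i64 {
    let multiplier = match precision {
        Precision::Auto | Precision::Nanosecond => 1,
        Precision::Microsecond => 1_000,
        Precision::Millisecond => 1_000_000,
        Precision::Second => 1_000_000_000,
    };
    ts * multiplier
}

// Mirrors the `line.timestamp.map(..).unwrap_or(..)` pattern the patch adds
// to both validate_and_qualify functions.
fn qualify_timestamp(line_ts: Option<i64>, ingest_time_ns: i64, precision: Precision) -> i64 {
    line_ts
        .map(|ts| apply_precision_to_timestamp(precision, ts))
        .unwrap_or(ingest_time_ns)
}

fn main() {
    let ingest_time_ns = 1_700_000_000_000_000_000;
    // "cpu,tag1=foo val1=1 1234" written with seconds precision.
    assert_eq!(
        qualify_timestamp(Some(1_234), ingest_time_ns, Precision::Second),
        1_234_000_000_000
    );
    // A line without a trailing timestamp takes the server's ingest time.
    assert_eq!(
        qualify_timestamp(None, ingest_time_ns, Precision::Nanosecond),
        ingest_time_ns
    );
}
```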