
Commit

Improved traces
kenstott committed Sep 18, 2024
1 parent 9c6ba36 commit 5166b7e
Showing 2 changed files with 45 additions and 22 deletions.
10 changes: 5 additions & 5 deletions crates/connectors/ndc-calcite/src/calcite.rs
@@ -14,7 +14,7 @@ use ndc_models::{FieldName, RowFieldValue};
use ndc_sdk::connector::QueryError;
use opentelemetry::trace::{TraceContextExt};
use serde_json::Value;
-use tracing::{event, Level, Span};
+use tracing::{event, Level};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use ndc_calcite_schema::jvm::get_jvm;
@@ -130,10 +130,10 @@ pub fn connector_query(
) -> Result<Vec<Row>, QueryError> {

// This method of retrieving current span context is not working!!!
-let new_span = Span::current();
-let span_context = new_span.context().span().span_context().clone();
-let trace_id = span_context.trace_id();
-let span_id = span_context.span_id();
+let span = tracing::Span::current();
+let otel_context = span.context();
+let span_id = otel_context.span().span_context().span_id();
+let trace_id = otel_context.span().span_context().trace_id();

let jvm = get_jvm().lock().unwrap();
let mut java_env = jvm.attach_current_thread().unwrap();
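
For context, the replacement lines in connector_query follow the usual tracing-opentelemetry pattern for reading the active IDs: OpenTelemetrySpanExt::context() on the current tracing span yields an OpenTelemetry Context, and TraceContextExt::span() exposes the SpanContext that carries the trace and span IDs. Below is a minimal, self-contained sketch of that pattern; the helper name current_trace_ids is illustrative and not part of this commit, and it assumes an OpenTelemetry layer is installed on the subscriber (otherwise the IDs come back as the invalid all-zero values). Note that the file's own comment says this way of retrieving the current span context was not behaving as expected, so treat this as the intended pattern rather than a guaranteed fix.

```rust
use opentelemetry::trace::{SpanId, TraceContextExt, TraceId};
use tracing_opentelemetry::OpenTelemetrySpanExt;

/// Illustrative helper (not from the commit): read the OpenTelemetry
/// trace/span IDs associated with the current `tracing` span.
fn current_trace_ids() -> (TraceId, SpanId) {
    let span = tracing::Span::current();
    // `context()` is provided by the `OpenTelemetrySpanExt` extension trait.
    let otel_context = span.context();
    // `span()` comes from `TraceContextExt`; its `SpanContext` holds the IDs.
    let span_context = otel_context.span().span_context().clone();
    (span_context.trace_id(), span_context.span_id())
}
```
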
57 changes: 40 additions & 17 deletions crates/connectors/ndc-calcite/src/query.rs
@@ -6,15 +6,17 @@
//! Aggregate are generated as additional queries, and stitched into the
//! RowSet aggregates response.
use std::collections::BTreeMap;

use indexmap::IndexMap;
use ndc_models::{ArgumentName, CollectionName, ComparisonOperatorName, ComparisonTarget, ComparisonValue, Expression, Field, FieldName, Query, Relationship, RelationshipArgument, RelationshipName, RelationshipType, RowFieldValue, VariableName};
use ndc_sdk::connector::QueryError;
use ndc_sdk::models;
use serde_json::{Number, Value};
-use tracing::{event, Level};
+use tracing::{event, Level, span};

-use crate::calcite::{connector_query, Row};
use ndc_calcite_schema::version5::ParsedConfiguration;

+use crate::calcite::{connector_query, Row};
use crate::connector::calcite::CalciteState;
use crate::sql;

@@ -36,7 +38,7 @@ pub struct QueryParams<'a> {
pub query: &'a Query,
pub vars: &'a BTreeMap<VariableName, Value>,
pub state: &'a CalciteState,
-pub explain: &'a bool
+pub explain: &'a bool,
}

/// A struct representing the components of a query.
@@ -95,7 +97,7 @@ pub struct QueryComponents {
/// }
/// }
/// ```
-#[tracing::instrument(skip(query_params), level=Level::INFO)]
+#[tracing::instrument(skip(query_params), level = Level::INFO)]
pub fn orchestrate_query(
query_params: QueryParams
) -> Result<models::RowSet, QueryError> {
@@ -132,7 +134,7 @@ pub fn orchestrate_query(
return Ok(models::RowSet { aggregates: parsed_aggregates, rows: rows_data });
}

-#[tracing::instrument(skip(rows_data, sub_relationship), level=Level::DEBUG)]
+#[tracing::instrument(skip(rows_data, sub_relationship), level = Level::DEBUG)]
fn generate_value_from_rows(rows_data: &Vec<Row>, sub_relationship: &Relationship) -> Result<Value, QueryError> {
let relationship_value: Value = rows_data.into_iter().map(|row| {
let mut row_values: Vec<Value> = Vec::new();
@@ -152,7 +154,7 @@ fn generate_value_from_rows(rows_data: &Vec<Row>, sub_relationshi
Ok(relationship_value)
}

-#[tracing::instrument(skip(sub_relationship), level=Level::DEBUG)]
+#[tracing::instrument(skip(sub_relationship), level = Level::DEBUG)]
fn parse_relationship(sub_relationship: &Relationship) -> Result<(Vec<(FieldName, FieldName)>, Vec<&FieldName>, RelationshipType), QueryError> {
let pks: Vec<(FieldName, FieldName)> = sub_relationship.column_mapping
.iter()
@@ -166,13 +168,13 @@ fn parse_relationship(sub_relationship: &Relationship) -> Result<(Vec<(FieldName
Ok((pks, fks, relationship_type))
}

-#[tracing::instrument(skip(params, query_components), level=Level::INFO)]
+#[tracing::instrument(skip(params, query_components), level = Level::INFO)]
fn process_rows(params: QueryParams, query_components: &QueryComponents) -> Result<Option<Vec<Row>>, QueryError> {
execute_query_collection(params, query_components, query_components.select.clone())
}


-#[tracing::instrument(skip(params, query_components), level=Level::INFO)]
+#[tracing::instrument(skip(params, query_components), level = Level::INFO)]
fn process_aggregates(params: QueryParams, query_components: &QueryComponents) -> Result<Option<IndexMap<FieldName, Value>>, QueryError> {
match execute_query_collection(params, query_components, query_components.aggregates.clone()) {
Ok(collection_option) => {
@@ -202,7 +204,7 @@ fn process_aggregates(params: QueryParams, query_components: &QueryComponents) -
}
}

-#[tracing::instrument(skip(pks, value), level=Level::INFO)]
+#[tracing::instrument(skip(pks, value), level = Level::INFO)]
fn generate_predicate(pks: &Vec<(FieldName, FieldName)>, value: Value) -> Result<Expression, QueryError> {
let (_, name) = pks[0].clone();
Ok(Expression::BinaryComparisonOperator {
@@ -216,7 +218,7 @@ fn generate_predicate(pks: &Vec<(FieldName, FieldName)>, value: Value) -> Result
})
}

-#[tracing::instrument(skip(query, predicate, pks), level=Level::INFO)]
+#[tracing::instrument(skip(query, predicate, pks), level = Level::INFO)]
fn revise_query(query: Box<Query>, predicate: Expression, pks: &Vec<(FieldName, FieldName)>) -> Result<Box<Query>, QueryError> {
let mut revised_query = query.clone();
revised_query.predicate = Some(predicate);
@@ -237,7 +239,9 @@ fn revise_query(query: Box<Query>, predicate: Expression, pks: &Vec<(FieldName,
Ok(revised_query)
}

-#[tracing::instrument(skip(params, arguments, sub_relationship, revised_query), level=Level::INFO)]
+#[tracing::instrument(
+    skip(params, arguments, sub_relationship, revised_query), level = Level::INFO
+)]
fn execute_query(params: QueryParams, arguments: &BTreeMap<ArgumentName, RelationshipArgument>, sub_relationship: &Relationship, revised_query: &Query) -> Result<Vec<Row>, QueryError> {
let fk_rows = orchestrate_query(QueryParams {
config: params.config,
@@ -247,12 +251,12 @@ fn execute_query(params: QueryParams, arguments: &BTreeMap<ArgumentName, Relatio
query: revised_query,
vars: params.vars,
state: params.state,
-explain: params.explain
+explain: params.explain,
})?;
Ok(fk_rows.rows.unwrap())
}

-#[tracing::instrument(skip(rows, field_name, fk_rows, pks, fks), level=Level::INFO)]
+#[tracing::instrument(skip(rows, field_name, fk_rows, pks, fks), level = Level::INFO)]
fn process_object_relationship(rows: Vec<Row>, field_name: &FieldName, fk_rows: &Vec<Row>, pks: &Vec<(FieldName, FieldName)>, fks: &Vec<&FieldName>) -> Result<Option<Vec<Row>>, QueryError> {
let modified_rows: Vec<Row> = rows.clone().into_iter().map(|mut row| {
event!(Level::DEBUG, "fk_rows: {:?}, row: {:?}, field_name: {:?}", serde_json::to_string_pretty(&fk_rows), serde_json::to_string_pretty(&row), field_name);
@@ -285,7 +289,7 @@ fn process_object_relationship(rows: Vec<Row>, field_name: &FieldName, fk_rows:
Ok(Some(modified_rows))
}

-#[tracing::instrument(skip(rows, field_name, fk_rows, pks, fks, query), level=Level::DEBUG)]
+#[tracing::instrument(skip(rows, field_name, fk_rows, pks, fks, query), level = Level::DEBUG)]
fn process_array_relationship(rows: Option<Vec<Row>>, field_name: &FieldName, fk_rows: &Vec<Row>, pks: &Vec<(FieldName, FieldName)>, fks: &Vec<&FieldName>, query: &Query) -> Result<Option<Vec<Row>>, QueryError> {
let modified_rows: Vec<Row> = rows.clone().unwrap().into_iter().map(|mut row| {
event!(Level::DEBUG, "fk_rows: {:?}, row: {:?}, field_name: {:?}", serde_json::to_string_pretty(&fk_rows), serde_json::to_string_pretty(&row), field_name);
@@ -319,7 +323,7 @@ fn process_array_relationship(rows: Option<Vec<Row>>, field_name: &FieldName, fk
Ok(Some(modified_rows))
}

-#[tracing::instrument(skip(child_rows, rowset, value), level=Level::DEBUG)]
+#[tracing::instrument(skip(child_rows, rowset, value), level = Level::DEBUG)]
fn process_child_rows(child_rows: &Vec<&Row>, mut rowset: serde_json::map::Map<String, Value>, value: &mut RowFieldValue) -> Result<(), QueryError> {
rowset.insert("aggregates".to_string(), Value::Null);
if !child_rows.is_empty() {
@@ -340,16 +344,35 @@ fn process_child_rows(child_rows: &Vec<&Row>, mut rowset: serde_json::map::Map<S
Ok(())
}

-#[tracing::instrument(skip(params, query_components), level=Level::INFO)]
+#[tracing::instrument(
+    fields(internal.visibility = "user"), skip(params, query_components, phrase), level = Level::INFO
+)]
fn execute_query_collection(
params: QueryParams,
query_components: &QueryComponents,
-phrase: Option<String>
+phrase: Option<String>,
) -> Result<Option<Vec<Row>>, QueryError> {
if phrase.is_none() || phrase.clone().unwrap().is_empty() {
return Ok(None);
}

+let span = span!(tracing::Level::INFO, "query_collection_span", collection = params.coll.to_string(), explain = params.explain, internal_visibility="user");
+
+// Parent span attach to the current context
+let _enter = span.enter();
+match params.query.clone().fields {
+    Some(fields) => {
+        // Create sub-span for each field attribute
+        for field in fields.keys() {
+            let sub_span = span!(tracing::Level::INFO, "field_span", field_attribute = format!("{}.{}", params.coll.to_string(), field));
+            let _enter_sub_span = sub_span.enter();
+        }
+    }
+    None => {
+        // Handle the 'None' case here
+    }
+}

let q = sql::query_collection(
params.config,
params.coll,
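The larger addition in execute_query_collection uses tracing's span! macro with ad-hoc key/value fields plus RAII enter() guards: one parent span for the collection and one short-lived child span per requested field, while the reworked #[tracing::instrument] attribute attaches a static internal.visibility = "user" field to the generated span. The following is a stripped-down sketch of that span/enter pattern outside the connector; the function name, parameters, and the % (Display) sigil are illustrative choices, not code from the commit.

```rust
use tracing::{span, Level};

// Illustrative only: isolates the span/enter pattern used by the new code.
fn trace_collection_query(collection: &str, fields: &[&str]) {
    // Parent span with custom fields; `enter()` returns a guard that keeps
    // the span current until the guard is dropped.
    let span = span!(
        Level::INFO,
        "query_collection_span",
        collection = collection,
        internal_visibility = "user"
    );
    let _enter = span.enter();

    // One child span per field. Each guard is dropped at the end of its loop
    // iteration, so the child span closes immediately after it is created.
    for field in fields {
        let sub_span = span!(
            Level::INFO,
            "field_span",
            field_attribute = %format!("{}.{}", collection, field)
        );
        let _enter_sub = sub_span.enter();
    }
}
```

One consequence of this design is that each field span records only its creation, since its guard drops at the end of the iteration; any work done afterwards remains attributed to the parent query_collection_span.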
