Skip to content

Commit

Permalink
refactor: record tenant_id and request_id from header in the trac…
Browse files Browse the repository at this point in the history
…e span (#139)
  • Loading branch information
Chethan-rao authored Dec 27, 2024
1 parent 09e7ac7 commit e2c964c
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 99 deletions.
4 changes: 2 additions & 2 deletions config/development.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ log_format = "default"

[server]
host = "127.0.0.1"
port = 8080
port = 3001

[limit]
request_count = 1
Expand Down Expand Up @@ -54,7 +54,7 @@ Sau7H1Bhzy5G7rwt05LNpU6nFcAGVaZtzl4/+FYfYIulubYjuSEh72yuBHHyvi1/
"""

[tenant_secrets]
hyperswitch = { master_key = "feffe9928665731c6d6a8f9467308308feffe9928665731c6d6a8f9467308308", public_key = """
public = { master_key = "feffe9928665731c6d6a8f9467308308feffe9928665731c6d6a8f9467308308", public_key = """
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA5Z/K0JWds8iHhWCa+rj0
rhOQX1nVs/ArQ1D0vh3UlSPR2vZUTrkdP7i3amv4d2XDC+3+5/YWExTkpxqnfl1T
Expand Down
2 changes: 1 addition & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ where

let router = router.layer(
tower_trace::TraceLayer::new_for_http()
.make_span_with(|request: &Request<_>| utils::record_tenant_id_from_header(request))
.make_span_with(|request: &Request<_>| utils::record_fields_from_header(request))
.on_request(tower_trace::DefaultOnRequest::new().level(tracing::Level::INFO))
.on_response(
tower_trace::DefaultOnResponse::new()
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub enum ApiError {
#[error("Failed while decoding data")]
DecodingError,

#[error("Failed while inserting data into \"{0}\"")]
#[error("Failed while inserting data into {0}")]
DatabaseInsertFailed(&'static str),

#[error("failed while deleting data from {0}")]
Expand Down
94 changes: 48 additions & 46 deletions src/logger/formatter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ const TIME: &str = "time";
pub static IMPLICIT_KEYS: Lazy<rustc_hash::FxHashSet<&str>> = Lazy::new(|| {
let mut set = rustc_hash::FxHashSet::default();

set.insert(MESSAGE);
set.insert(HOSTNAME);
set.insert(PID);
set.insert(LEVEL);
Expand Down Expand Up @@ -118,11 +117,22 @@ where
pub fn new_with_implicit_entries(
service: &str,
dst_writer: W,
default_fields: HashMap<String, Value>,
mut default_fields: HashMap<String, Value>,
) -> Self {
let pid = std::process::id();
let hostname = gethostname::gethostname().to_string_lossy().into_owned();
let service = service.to_string();
default_fields.retain(|key, value| {
if !IMPLICIT_KEYS.contains(key.as_str()) {
true
} else {
eprintln!(
"Attempting to log a reserved entry. It won't be added to the logs. key: {:?}, value: {:?}",
key, value
);
false
}
});

Self {
dst_writer,
Expand All @@ -138,15 +148,15 @@ where
&self,
map_serializer: &mut impl SerializeMap<Error = serde_json::Error>,
metadata: &Metadata<'_>,
_span: Option<&SpanRef<'_, S>>,
storage: Option<&Storage<'_>>,
span: Option<&SpanRef<'_, S>>,
storage: &Storage<'_>,
name: &str,
message: &str,
) -> Result<(), std::io::Error>
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
map_serializer.serialize_entry(MESSAGE, &message)?;
let is_extra = |s: &str| !IMPLICIT_KEYS.contains(s);

map_serializer.serialize_entry(HOSTNAME, &self.hostname)?;
map_serializer.serialize_entry(PID, &self.pid)?;
map_serializer.serialize_entry(LEVEL, &format_args!("{}", metadata.level()))?;
Expand All @@ -163,19 +173,30 @@ where

// Write down implicit default entries.
for (key, value) in self.default_fields.iter() {
if !IMPLICIT_KEYS.contains(key.as_str()) {
map_serializer.serialize_entry(key, value)?;
} else {
tracing::warn!("{} is a reserved field. Skipping it.", key);
}
map_serializer.serialize_entry(key, value)?;
}

let mut explicit_entries_set: HashSet<&str> = HashSet::default();
// Write down explicit event's entries.
if let Some(storage) = storage {
for (key, value) in storage.values.iter() {
map_serializer.serialize_entry(key, value)?;
explicit_entries_set.insert(key);

for (key, value) in storage.values.iter() {
map_serializer.serialize_entry(key, value)?;
explicit_entries_set.insert(key);
}

// Write down entries from the span, if it exists.
if let Some(span) = &span {
let extensions = span.extensions();
if let Some(visitor) = extensions.get::<Storage<'_>>() {
for (key, value) in &visitor.values {
if is_extra(key) && !explicit_entries_set.contains(key) {
map_serializer.serialize_entry(key, value)?;
} else {
tracing::warn!(
"Attempting to log a reserved entry. It won't be added to the logs. key: {key:?}, value: {value:?}"
);
}
}
}
}

Expand Down Expand Up @@ -205,14 +226,15 @@ where
let mut serializer = serde_json::Serializer::new(&mut buffer);
let mut map_serializer = serializer.serialize_map(None)?;
let message = Self::span_message(span, ty);
let mut storage = Storage::default();
storage.record_value(MESSAGE, message.into());

self.common_serialize(
&mut map_serializer,
span.metadata(),
Some(span),
None,
&storage,
span.name(),
&message,
)?;

map_serializer.end()?;
Expand All @@ -236,16 +258,9 @@ where
event.record(&mut storage);

let name = span.map_or("?", SpanRef::name);
let message = Self::event_message(span, event, &storage);
Self::event_message(span, event, &mut storage);

self.common_serialize(
&mut map_serializer,
event.metadata(),
*span,
Some(&storage),
name,
&message,
)?;
self.common_serialize(&mut map_serializer, event.metadata(), *span, &storage, name)?;

map_serializer.end()?;
Ok(buffer)
Expand All @@ -271,32 +286,19 @@ where
fn event_message<S>(
span: &Option<&SpanRef<'_, S>>,
event: &Event<'_>,
storage: &Storage<'_>,
) -> String
where
storage: &mut Storage<'_>,
) where
S: Subscriber + for<'a> LookupSpan<'a>,
{
// Get value of kept "message" or "target" if does not exist.
let mut message = storage
let message = storage
.values
.get("message")
.and_then(|v| match v {
Value::String(s) => Some(s.as_str()),
_ => None,
})
.unwrap_or_else(|| event.metadata().target())
.to_owned();
.entry(MESSAGE)
.or_insert_with(|| event.metadata().target().into());

// Prepend the span name to the message if span exists.
if let Some(span) = span {
message = format!(
"{} {}",
Self::span_message(span, RecordType::Event),
message,
);
if let (Some(span), Value::String(a)) = (span, message) {
*a = format!("{} {}", Self::span_message(span, RecordType::Event), a,);
}

message
}
}

Expand Down
33 changes: 17 additions & 16 deletions src/logger/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,19 @@ pub struct Storage<'a> {
pub values: HashMap<&'a str, serde_json::Value>,
}

impl Storage<'_> {
impl<'a> Storage<'a> {
/// Default constructor.
pub fn new() -> Self {
Self::default()
}

pub fn record_value(&mut self, key: &'a str, value: serde_json::Value) {
if super::formatter::IMPLICIT_KEYS.contains(key) {
tracing::warn!("{key} is a reserved entry. Skipping it. value: {value}");
} else {
self.values.insert(key, value);
}
}
}

/// Default constructor.
Expand All @@ -43,32 +51,27 @@ impl Default for Storage<'_> {
impl Visit for Storage<'_> {
/// A i64.
fn record_i64(&mut self, field: &Field, value: i64) {
self.values
.insert(field.name(), serde_json::Value::from(value));
self.record_value(field.name(), serde_json::Value::from(value));
}

/// A u64.
fn record_u64(&mut self, field: &Field, value: u64) {
self.values
.insert(field.name(), serde_json::Value::from(value));
self.record_value(field.name(), serde_json::Value::from(value));
}

/// A 64-bit floating point.
fn record_f64(&mut self, field: &Field, value: f64) {
self.values
.insert(field.name(), serde_json::Value::from(value));
self.record_value(field.name(), serde_json::Value::from(value));
}

/// A boolean.
fn record_bool(&mut self, field: &Field, value: bool) {
self.values
.insert(field.name(), serde_json::Value::from(value));
self.record_value(field.name(), serde_json::Value::from(value));
}

/// A string.
fn record_str(&mut self, field: &Field, value: &str) {
self.values
.insert(field.name(), serde_json::Value::from(value));
self.record_value(field.name(), serde_json::Value::from(value));
}

/// Otherwise.
Expand All @@ -77,12 +80,10 @@ impl Visit for Storage<'_> {
// Skip fields which are already handled
name if name.starts_with("log.") => (),
name if name.starts_with("r#") => {
self.values
.insert(&name[2..], serde_json::Value::from(format!("{value:?}")));
self.record_value(&name[2..], serde_json::Value::from(format!("{value:?}")));
}
name => {
self.values
.insert(name, serde_json::Value::from(format!("{value:?}")));
self.record_value(name, serde_json::Value::from(format!("{value:?}")));
}
};
}
Expand Down Expand Up @@ -148,7 +149,7 @@ impl<S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>> Layer
.expect("No visitor in extensions");

if let Ok(elapsed) = serde_json::to_value(elapsed_milliseconds) {
visitor.values.insert("elapsed_milliseconds", elapsed);
visitor.record_value("elapsed_milliseconds", elapsed);
}
}
}
2 changes: 2 additions & 0 deletions src/storage/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub const ID_LENGTH: usize = 20;

/// Header key for tenant ID
pub const X_TENANT_ID: &str = "x-tenant-id";
/// Header key for request ID
pub const X_REQUEST_ID: &str = "x-request-id";
/// Header Constants
pub mod headers {
pub const CONTENT_TYPE: &str = "Content-Type";
Expand Down
54 changes: 34 additions & 20 deletions src/storage/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,18 @@ impl MerchantInterface for Storage {
.filter(schema::merchant::merchant_id.eq(merchant_id))
.get_result(&mut conn)
.await;

let output = match output {
Err(err) => match err {
diesel::result::Error::NotFound => {
Err(err).change_error(error::StorageError::NotFoundError)
}
_ => Err(err).change_error(error::StorageError::FindError),
},
Ok(merchant) => Ok(merchant),
};

output
.map_err(|error| match error {
diesel::result::Error::NotFound => error::StorageError::NotFoundError,
_ => error::StorageError::FindError,
})
.map_err(error::ContainerError::from)
.map_err(From::from)
.and_then(|inner| Ok(inner.decrypt(key)?))
}
Expand Down Expand Up @@ -142,14 +148,17 @@ impl LockerInterface for Storage {
.get_result(&mut conn)
.await;

output
.map_err(|error| match error {
diesel::result::Error::NotFound => error::StorageError::NotFoundError,
_ => error::StorageError::FindError,
})
.map_err(error::ContainerError::from)
.map_err(From::from)
.map(|inner| inner.into())
let output = match output {
Err(err) => match err {
diesel::result::Error::NotFound => {
Err(err).change_error(error::StorageError::NotFoundError)
}
_ => Err(err).change_error(error::StorageError::FindError),
},
Ok(locker) => Ok(locker),
};

output.map_err(From::from).map(From::from)
}

async fn find_by_hash_id_merchant_id_customer_id(
Expand Down Expand Up @@ -388,13 +397,18 @@ impl super::EntityInterface for Storage {
.filter(schema::entity::entity_id.eq(entity_id))
.get_result(&mut conn)
.await;
output
.map_err(|error| match error {
diesel::result::Error::NotFound => error::StorageError::NotFoundError,
_ => error::StorageError::FindError,
})
.map_err(error::ContainerError::from)
.map_err(From::from)

let output = match output {
Err(err) => match err {
diesel::result::Error::NotFound => {
Err(err).change_error(error::StorageError::NotFoundError)
}
_ => Err(err).change_error(error::StorageError::FindError),
},
Ok(entity) => Ok(entity),
};

output.map_err(From::from)
}

async fn insert_entity(
Expand Down
Loading

0 comments on commit e2c964c

Please sign in to comment.