From 668ec3469fdd36655dc6a76eb48476a4481b4c30 Mon Sep 17 00:00:00 2001 From: khuzema786 Date: Thu, 28 Nov 2024 03:34:08 +0530 Subject: [PATCH] backend/feat: Bus Tracking --- Cargo.lock | 71 ++-- crates/location_tracking_service/Cargo.toml | 7 +- .../src/common/types.rs | 35 +- .../src/domain/action/internal/location.rs | 32 ++ .../src/domain/action/internal/ride.rs | 10 +- .../src/domain/action/ui/location.rs | 344 +++++++++++------- .../src/domain/api/internal/location.rs | 10 + .../src/domain/api/mod.rs | 3 +- .../src/domain/types/internal/location.rs | 16 + .../src/domain/types/internal/ride.rs | 1 + .../src/kafka/types.rs | 3 +- .../src/redis/commands.rs | 68 +++- .../src/redis/keys.rs | 8 + crates/tests/Cargo.toml | 2 +- .../dev/location_tracking_service.dhall | 4 +- flake.lock | 48 +-- 16 files changed, 415 insertions(+), 247 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 25857d1b..20f8f53f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -354,15 +354,9 @@ dependencies = [ [[package]] name = "arc-swap" -version = "1.6.0" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" - -[[package]] -name = "arcstr" -version = "1.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f907281554a3d0312bb7aab855a8e0ef6cbf1614d06de54105039ca8b34460e" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" [[package]] name = "arrayvec" @@ -552,9 +546,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.5.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" [[package]] name = "bytes-utils" @@ -703,6 +697,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.18" @@ -967,31 +970,42 @@ dependencies = [ [[package]] name = "fred" -version = "6.3.2" +version = "9.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a15cc18b56395b8b15ffcdcea7fe8586e3a3ccb3d9dc3b9408800d9814efb08e" +checksum = "3cdd5378252ea124b712e0ac55147d26ae3af575883b34b8423091a4c719606b" dependencies = [ "arc-swap", - "arcstr", "async-trait", "bytes", "bytes-utils", - "cfg-if", + "crossbeam-queue", "float-cmp", + "fred-macros", "futures", - "lazy_static", "log", "parking_lot", "rand", "redis-protocol", "semver", - "sha-1", + "socket2", "tokio", "tokio-stream", "tokio-util", "tracing", "tracing-futures", "url", + "urlencoding", +] + +[[package]] +name = "fred-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1458c6e22d36d61507034d5afecc64f105c1d39712b7ac6ec3b352c423f715cc" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.43", ] [[package]] @@ -1615,7 +1629,7 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" [[package]] name = "macros" version = "0.1.0" -source = "git+https://github.com/nammayatri/shared-kernel-rs?rev=2d6cf2e#2d6cf2e7fc4af33482ead486aa8b3a96f0598d54" +source = "git+https://github.com/nammayatri/shared-kernel-rs?rev=f8d80ea#f8d80ea95464bb072cd1f33c28de71e2eb065102" dependencies = [ "proc-macro2", "quote", @@ -2213,9 +2227,9 @@ dependencies = [ [[package]] name = "redis-protocol" -version = "4.1.0" +version = "5.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c31deddf734dc0a39d3112e73490e88b61a05e83e074d211f348404cee4d2c6" +checksum = "65deb7c9501fbb2b6f812a30d59c0253779480853545153a51d8e9e444ddc99f" dependencies = [ "bytes", "bytes-utils", @@ -2502,17 +2516,6 @@ dependencies = [ "serde", ] -[[package]] -name = "sha-1" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest", -] - [[package]] name = "sha1" version = "0.10.6" @@ -2547,7 +2550,7 @@ dependencies = [ [[package]] name = "shared" version = "0.1.0" -source = "git+https://github.com/nammayatri/shared-kernel-rs?rev=2d6cf2e#2d6cf2e7fc4af33482ead486aa8b3a96f0598d54" +source = "git+https://github.com/nammayatri/shared-kernel-rs?rev=f8d80ea#f8d80ea95464bb072cd1f33c28de71e2eb065102" dependencies = [ "actix-web", "actix-web-prom", @@ -3179,6 +3182,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "uuid" version = "1.6.1" diff --git a/crates/location_tracking_service/Cargo.toml b/crates/location_tracking_service/Cargo.toml index 9bb37609..cfb565c6 100644 --- a/crates/location_tracking_service/Cargo.toml +++ b/crates/location_tracking_service/Cargo.toml @@ -19,7 +19,7 @@ serde_dhall = "0.12.1" chrono = { version = "0.4", features = ["serde"] } log = "0.4.14" tokio = "1.29.1" -fred = { version = "6.0.0", features = ["metrics", "partial-tracing"] } +fred = { version = "9.4.0", features = ["metrics", "partial-tracing", "i-geo", "i-cluster", "i-client"] } reqwest = {version = "0.11.18", features = ["json"]} futures = "0.3.28" rand = "0.8.5" @@ -43,8 +43,9 @@ tracing-appender = "0.2.2" tracing-subscriber = { version = "0.3.16", features = ["env-filter", "registry", "json"] } prometheus = { version = "0.13.3", features = ["process"] } -shared = { git = "https://github.com/nammayatri/shared-kernel-rs", rev = "2d6cf2e" } -macros = { git = "https://github.com/nammayatri/shared-kernel-rs", rev = "2d6cf2e" } +shared = { git = "https://github.com/nammayatri/shared-kernel-rs", rev = "f8d80ea" } +# shared = { version = "0.1.0", path = "/Users/khuzema.khomosi/Documents/shared-kernel-rs/crates/shared" } +macros = { git = "https://github.com/nammayatri/shared-kernel-rs", rev = "f8d80ea" } [dev-dependencies] pprof = { version = "0.12", features = ["flamegraph"] } diff --git a/crates/location_tracking_service/src/common/types.rs b/crates/location_tracking_service/src/common/types.rs index 65719f10..a8151b25 100644 --- a/crates/location_tracking_service/src/common/types.rs +++ b/crates/location_tracking_service/src/common/types.rs @@ -1,5 +1,3 @@ -use std::collections::VecDeque; - /* Copyright 2022-23, Juspay India Pvt Ltd This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program @@ -12,6 +10,7 @@ use fred::types::GeoValue; use geo::MultiPolygon; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; use strum_macros::{Display, EnumIter, EnumString}; #[derive(Deserialize, Serialize, Clone, Debug, Eq, PartialEq)] @@ -99,6 +98,22 @@ pub enum VehicleType { #[strum(serialize = "AMBULANCE_VENTILATOR")] #[serde(rename = "AMBULANCE_VENTILATOR")] AmbulanceVentilator, + #[strum(serialize = "BUS_AC")] + #[serde(rename = "BUS_AC")] + BusAc, + #[strum(serialize = "BUS_NON_AC")] + #[serde(rename = "BUS_NON_AC")] + BusNonAc, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Display, Eq, PartialEq)] +#[serde(rename_all = "camelCase")] +pub enum RideInfo { + #[serde(rename_all = "camelCase")] + Bus { + route_code: String, + bus_number: String, + }, } #[derive(Debug, Clone, EnumString, Display, Serialize, Deserialize, Eq, Hash, PartialEq)] @@ -150,9 +165,7 @@ pub struct MultiPolygonBody { pub struct RideDetails { pub ride_id: RideId, pub ride_status: RideStatus, - pub vehicle_number: Option, - pub ride_start_otp: Option, - pub estimated_pickup_distance: Option, + pub ride_info: Option, } #[derive(Deserialize, Serialize, Clone, Debug, Eq, PartialEq)] @@ -211,3 +224,15 @@ pub struct DriverAllDetails { pub struct RideBookingDetails { pub driver_last_known_location: DriverLastKnownLocation, } + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct VehicleTrackingInfo { + pub start_time: Option, + pub schedule_relationship: Option, + pub trip_id: Option, + pub latitude: Latitude, + pub longitude: Longitude, + pub speed: Option, + pub timestamp: TimeStamp, +} diff --git a/crates/location_tracking_service/src/domain/action/internal/location.rs b/crates/location_tracking_service/src/domain/action/internal/location.rs index 90f3cfc6..3bde2d9f 100644 --- a/crates/location_tracking_service/src/domain/action/internal/location.rs +++ b/crates/location_tracking_service/src/domain/action/internal/location.rs @@ -6,6 +6,8 @@ the GNU Affero General Public License along with this program. If not, see . */ #![allow(clippy::all)] +use std::collections::HashMap; + use crate::tools::error::AppError; use crate::{ common::{ @@ -207,3 +209,33 @@ pub async fn driver_block_till( }; Ok(APISuccess::default()) } + +#[macros::measure_duration] +pub async fn track_vehicles( + data: Data, + request_body: TrackVehicleRequest, +) -> Result, AppError> { + let track_vehicle_info = match request_body { + TrackVehicleRequest::RouteCode(route_code) => { + get_route_location(&data.redis, &route_code).await? + } + TrackVehicleRequest::TripCodes(trip_codes) => { + let mut track_vehicles_info = HashMap::new(); + for trip_code in trip_codes { + let location = get_trip_location(&data.redis, &trip_code).await?; + for (vehicle_number, vehicle_info) in location.into_iter() { + track_vehicles_info.insert(vehicle_number, vehicle_info); + } + } + track_vehicles_info + } + }; + + Ok(track_vehicle_info + .into_iter() + .map(|(vehicle_number, vehicle_info)| TrackVehicleResponse { + vehicle_number, + vehicle_info, + }) + .collect()) +} diff --git a/crates/location_tracking_service/src/domain/action/internal/ride.rs b/crates/location_tracking_service/src/domain/action/internal/ride.rs index 583e436e..fcc3f571 100644 --- a/crates/location_tracking_service/src/domain/action/internal/ride.rs +++ b/crates/location_tracking_service/src/domain/action/internal/ride.rs @@ -24,9 +24,7 @@ pub async fn ride_create( &request_body.driver_id, ride_id.to_owned(), RideStatus::NEW, - Some(request_body.vehicle_number), - Some(request_body.ride_start_otp), - Some(request_body.estimated_pickup_distance), + None, ) .await?; @@ -51,9 +49,7 @@ pub async fn ride_start( &request_body.driver_id, ride_id, RideStatus::INPROGRESS, - None, - None, - None, + request_body.ride_info, ) .await?; @@ -138,8 +134,6 @@ pub async fn ride_details( request_body.ride_id.to_owned(), request_body.ride_status, None, - None, - None, ) .await?; diff --git a/crates/location_tracking_service/src/domain/action/ui/location.rs b/crates/location_tracking_service/src/domain/action/ui/location.rs index fbe0ce16..4ef9d350 100644 --- a/crates/location_tracking_service/src/domain/action/ui/location.rs +++ b/crates/location_tracking_service/src/domain/action/ui/location.rs @@ -101,8 +101,8 @@ pub async fn update_driver_location( let (driver_id, merchant_id, merchant_operating_city_id) = if var("DEV").is_ok() { ( DriverId(token.to_owned().inner()), - MerchantId(token.to_owned().inner()), - MerchantOperatingCityId(token.to_owned().inner()), + MerchantId("dev".to_string()), + MerchantOperatingCityId("dev".to_string()), ) } else { get_driver_id_from_authentication( @@ -237,6 +237,11 @@ async fn process_driver_locations( .as_ref() .map(|ride_details| ride_details.ride_status.to_owned()); + let driver_ride_info = driver_ride_details + .as_ref() + .map(|ride_details| ride_details.ride_info.to_owned()) + .flatten(); + let TimeStamp(latest_driver_location_ts) = latest_driver_location.ts; let current_ts = Utc::now(); let latest_driver_location_ts = if latest_driver_location_ts > current_ts { @@ -249,8 +254,6 @@ async fn process_driver_locations( TimeStamp(latest_driver_location_ts) }; - let mut all_tasks: Vec>>>> = Vec::new(); - // let travelled_distance = if driver_ride_status == Some(RideStatus::NEW) // || driver_ride_status == Some(RideStatus::INPROGRESS) // { @@ -273,160 +276,221 @@ async fn process_driver_locations( // Meters(0) // }; - let (stop_detected, stop_detection) = detect_stop( - driver_ride_status.as_ref(), - driver_location_details - .as_ref() - .map(|driver_location_details| driver_location_details.stop_detection.to_owned()) - .flatten(), - DriverLocation { - location: latest_driver_location.pt.to_owned(), - timestamp: latest_driver_location.ts, - }, - latest_driver_location.v, - &data.stop_detection, - ); + let (locations, stop_detected) = match driver_ride_info { + Some(RideInfo::Bus { + route_code, + bus_number, + }) => { + let mut all_tasks: Vec>>>> = + Vec::new(); - let is_blacklist_for_special_zone = is_blacklist_for_special_zone( - &merchant_id, - &data.blacklist_merchants, - &latest_driver_location.pt.lat, - &latest_driver_location.pt.lon, - &data.blacklist_polygon, - ); + let set_route_location = async { + set_route_location( + &data.redis, + &route_code, + &bus_number, + &latest_driver_location.pt, + &latest_driver_location.v, + &latest_driver_location_ts, + ) + .await?; + Ok(()) + }; + all_tasks.push(Box::pin(set_route_location)); - if !is_blacklist_for_special_zone { - let send_driver_location_to_drainer = async { - let _ = &data - .sender - .send(( - Dimensions { - merchant_id: merchant_id.to_owned(), - city: city.to_owned(), - vehicle_type: vehicle_type.to_owned(), - created_at: Utc::now(), - }, - latest_driver_location.pt.lat, - latest_driver_location.pt.lon, - latest_driver_location_ts.to_owned(), - driver_id.to_owned(), - )) - .await - .map_err(|err| AppError::DrainerPushFailed(err.to_string()))?; - Ok(()) - }; - all_tasks.push(Box::pin(send_driver_location_to_drainer)); - } else { - warn!( - "Skipping GEOADD for special zone ({:?}) Driver Id : {:?}, Merchant Id : {:?}", - latest_driver_location.pt, driver_id, merchant_id - ); - } + let set_driver_last_location_update = async { + set_driver_last_location_update( + &data.redis, + &data.last_location_timstamp_expiry, + &driver_id, + &merchant_id, + &latest_driver_location.pt, + &latest_driver_location_ts, + &None, + None, + &driver_ride_status, + ) + .await?; + Ok(()) + }; + all_tasks.push(Box::pin(set_driver_last_location_update)); - let locations = if let Some(RideStatus::INPROGRESS) = driver_ride_status.as_ref() { - let locations = get_filtered_driver_locations( - driver_location_details - .map(|driver_location_details| driver_location_details.driver_last_known_location) - .as_ref(), - locations, - data.min_location_accuracy, - data.driver_location_accuracy_buffer, - ); - if !locations.is_empty() { - if locations.len() > data.batch_size as usize { - warn!( - "On Ride Way points more than {} points after filtering => {} points", - data.batch_size, - locations.len() - ); - } - locations - } else { join_all(all_tasks) .await .into_iter() .try_for_each(Result::from)?; - return Ok(()); - } - } else { - locations - }; - let set_driver_last_location_update = async { - set_driver_last_location_update( - &data.redis, - &data.last_location_timstamp_expiry, - &driver_id, - &merchant_id, - &latest_driver_location.pt, - &latest_driver_location_ts, - &None::, - stop_detection, - &driver_ride_status, - // travelled_distance.to_owned(), - ) - .await?; - Ok(()) - }; - all_tasks.push(Box::pin(set_driver_last_location_update)); - - if let (Some(RideStatus::INPROGRESS), Some(ride_id)) = - (driver_ride_status.as_ref(), driver_ride_id.as_ref()) - { - let geo_entries = locations - .iter() - .map(|loc| Point { - lat: loc.pt.lat, - lon: loc.pt.lon, - }) - .collect::>(); - - let on_ride_driver_locations_count = - get_on_ride_driver_locations_count(&data.redis, &driver_id.clone(), &merchant_id) - .await?; - - if on_ride_driver_locations_count + geo_entries.len() as i64 > data.batch_size { - let mut on_ride_driver_locations = get_on_ride_driver_locations_and_delete( - &data.redis, - &driver_id, + (locations, None) + } + None => { + let mut all_tasks: Vec>>>> = + Vec::new(); + + let (stop_detected, stop_detection) = detect_stop( + driver_ride_status.as_ref(), + driver_location_details + .as_ref() + .map(|driver_location_details| { + driver_location_details.stop_detection.to_owned() + }) + .flatten(), + DriverLocation { + location: latest_driver_location.pt.to_owned(), + timestamp: latest_driver_location.ts, + }, + latest_driver_location.v, + &data.stop_detection, + ); + + let is_blacklist_for_special_zone = is_blacklist_for_special_zone( &merchant_id, - on_ride_driver_locations_count, - ) - .await?; - on_ride_driver_locations.extend(geo_entries); - - let bulk_location_update_dobpp = async { - bulk_location_update_dobpp( - &data.bulk_location_callback_url, - ride_id.to_owned(), - driver_id.to_owned(), - on_ride_driver_locations, - ) - .await - .map_err(|err| AppError::DriverBulkLocationUpdateFailed(err.message()))?; - Ok(()) + &data.blacklist_merchants, + &latest_driver_location.pt.lat, + &latest_driver_location.pt.lon, + &data.blacklist_polygon, + ); + + if !is_blacklist_for_special_zone { + let send_driver_location_to_drainer = async { + let _ = &data + .sender + .send(( + Dimensions { + merchant_id: merchant_id.to_owned(), + city: city.to_owned(), + vehicle_type: vehicle_type.to_owned(), + created_at: Utc::now(), + }, + latest_driver_location.pt.lat, + latest_driver_location.pt.lon, + latest_driver_location_ts.to_owned(), + driver_id.to_owned(), + )) + .await + .map_err(|err| AppError::DrainerPushFailed(err.to_string()))?; + Ok(()) + }; + all_tasks.push(Box::pin(send_driver_location_to_drainer)); + } else { + warn!( + "Skipping GEOADD for special zone ({:?}) Driver Id : {:?}, Merchant Id : {:?}", + latest_driver_location.pt, driver_id, merchant_id + ); + } + + let locations = if let Some(RideStatus::INPROGRESS) = driver_ride_status.as_ref() { + let locations = get_filtered_driver_locations( + driver_location_details + .map(|driver_location_details| { + driver_location_details.driver_last_known_location + }) + .as_ref(), + locations, + data.min_location_accuracy, + data.driver_location_accuracy_buffer, + ); + if !locations.is_empty() { + if locations.len() > data.batch_size as usize { + warn!( + "On Ride Way points more than {} points after filtering => {} points", + data.batch_size, + locations.len() + ); + } + locations + } else { + join_all(all_tasks) + .await + .into_iter() + .try_for_each(Result::from)?; + return Ok(()); + } + } else { + locations }; - all_tasks.push(Box::pin(bulk_location_update_dobpp)); - } else { - let push_on_ride_driver_locations = async { - push_on_ride_driver_locations( + + let set_driver_last_location_update = async { + set_driver_last_location_update( &data.redis, + &data.last_location_timstamp_expiry, &driver_id, &merchant_id, - geo_entries, - &data.redis_expiry, + &latest_driver_location.pt, + &latest_driver_location_ts, + &None::, + stop_detection, + &driver_ride_status, + // travelled_distance.to_owned(), ) .await?; Ok(()) }; - all_tasks.push(Box::pin(push_on_ride_driver_locations)); - } - } + all_tasks.push(Box::pin(set_driver_last_location_update)); + + if let (Some(RideStatus::INPROGRESS), Some(ride_id)) = + (driver_ride_status.as_ref(), driver_ride_id.as_ref()) + { + let geo_entries = locations + .iter() + .map(|loc| Point { + lat: loc.pt.lat, + lon: loc.pt.lon, + }) + .collect::>(); - join_all(all_tasks) - .await - .into_iter() - .try_for_each(Result::from)?; + let on_ride_driver_locations_count = get_on_ride_driver_locations_count( + &data.redis, + &driver_id.clone(), + &merchant_id, + ) + .await?; + + if on_ride_driver_locations_count + geo_entries.len() as i64 > data.batch_size { + let mut on_ride_driver_locations = get_on_ride_driver_locations_and_delete( + &data.redis, + &driver_id, + &merchant_id, + on_ride_driver_locations_count, + ) + .await?; + on_ride_driver_locations.extend(geo_entries); + + let bulk_location_update_dobpp = async { + bulk_location_update_dobpp( + &data.bulk_location_callback_url, + ride_id.to_owned(), + driver_id.to_owned(), + on_ride_driver_locations, + ) + .await + .map_err(|err| AppError::DriverBulkLocationUpdateFailed(err.message()))?; + Ok(()) + }; + all_tasks.push(Box::pin(bulk_location_update_dobpp)); + } else { + let push_on_ride_driver_locations = async { + push_on_ride_driver_locations( + &data.redis, + &driver_id, + &merchant_id, + geo_entries, + &data.redis_expiry, + ) + .await?; + Ok(()) + }; + all_tasks.push(Box::pin(push_on_ride_driver_locations)); + } + } + + join_all(all_tasks) + .await + .into_iter() + .try_for_each(Result::from)?; + + (locations, stop_detected) + } + }; Arbiter::current().spawn(async move { // if driver_ride_status == Some(RideStatus::NEW) { diff --git a/crates/location_tracking_service/src/domain/api/internal/location.rs b/crates/location_tracking_service/src/domain/api/internal/location.rs index 896a15f1..5cd18c1e 100644 --- a/crates/location_tracking_service/src/domain/api/internal/location.rs +++ b/crates/location_tracking_service/src/domain/api/internal/location.rs @@ -53,3 +53,13 @@ async fn driver_block_till( Ok(Json(location::driver_block_till(data, request_body).await?)) } + +#[get("/internal/trackVehicles")] +async fn track_vehicles( + data: Data, + param_obj: Json, +) -> Result, AppError> { + let request_body = param_obj.into_inner(); + + Ok(Json(location::track_vehicles(data, request_body).await?)) +} diff --git a/crates/location_tracking_service/src/domain/api/mod.rs b/crates/location_tracking_service/src/domain/api/mod.rs index 5a4e0df7..7ac2b8d6 100644 --- a/crates/location_tracking_service/src/domain/api/mod.rs +++ b/crates/location_tracking_service/src/domain/api/mod.rs @@ -21,5 +21,6 @@ pub fn handler(config: &mut ServiceConfig) { .service(internal::ride::ride_details) .service(internal::ride::get_driver_locations) .service(internal::location::get_drivers_location) - .service(internal::location::driver_block_till); + .service(internal::location::driver_block_till) + .service(internal::location::track_vehicles); } diff --git a/crates/location_tracking_service/src/domain/types/internal/location.rs b/crates/location_tracking_service/src/domain/types/internal/location.rs index 39371ead..afc1f955 100644 --- a/crates/location_tracking_service/src/domain/types/internal/location.rs +++ b/crates/location_tracking_service/src/domain/types/internal/location.rs @@ -48,3 +48,19 @@ pub struct DriverBlockTillRequest { pub driver_id: DriverId, pub block_till: TimeStamp, } + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub enum TrackVehicleRequest { + RouteCode(String), + TripCodes(Vec), +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct TrackVehicleResponse { + pub vehicle_number: String, + pub vehicle_info: VehicleTrackingInfo, +} + +pub type TrackVehiclesResponse = Vec; diff --git a/crates/location_tracking_service/src/domain/types/internal/ride.rs b/crates/location_tracking_service/src/domain/types/internal/ride.rs index 82346c5e..aaf41700 100644 --- a/crates/location_tracking_service/src/domain/types/internal/ride.rs +++ b/crates/location_tracking_service/src/domain/types/internal/ride.rs @@ -24,6 +24,7 @@ pub struct RideCreateRequest { pub struct RideStartRequest { pub merchant_id: MerchantId, pub driver_id: DriverId, + pub ride_info: Option, } #[derive(Serialize, Debug)] diff --git a/crates/location_tracking_service/src/kafka/types.rs b/crates/location_tracking_service/src/kafka/types.rs index 219ec5c3..15b87b2d 100644 --- a/crates/location_tracking_service/src/kafka/types.rs +++ b/crates/location_tracking_service/src/kafka/types.rs @@ -38,5 +38,6 @@ pub struct LocationUpdate { pub vehicle_variant: VehicleType, pub is_stop_detected: Option, pub stop_lat: Option, - pub stop_lon: Option, // pub travelled_distance: Meters, + pub stop_lon: Option, + // pub travelled_distance: Meters, } diff --git a/crates/location_tracking_service/src/redis/commands.rs b/crates/location_tracking_service/src/redis/commands.rs index 24fd54de..f5878941 100644 --- a/crates/location_tracking_service/src/redis/commands.rs +++ b/crates/location_tracking_service/src/redis/commands.rs @@ -5,13 +5,14 @@ or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ -use crate::common::{types::*, utils::cat_maybes}; +use crate::common::types::*; use crate::redis::keys::*; use crate::tools::error::AppError; use fred::types::{GeoPosition, GeoUnit, SortOrder}; use futures::Future; use rustc_hash::FxHashSet; use shared::redis::types::{RedisConnectionPool, Ttl}; +use std::collections::HashMap; use tracing::{error, info}; /// Sets the ride details (i.e, rideId and rideStatus) to the Redis store. @@ -26,9 +27,7 @@ use tracing::{error, info}; /// * `driver_id` - The ID of the driver. /// * `ride_id` - The ID of the ride. /// * `ride_status` - The current status of the ride. -/// * `vehicle_number` - Driver's vehicle number. -/// * `ride_start_otp` - The otp to start the ride. -/// * `estimated_pickup_distance` - The estimated pickup distance from driver's current location to customer's pickup location. +/// * `ride_info` - Ride related details based on vehicle category. /// /// # Returns /// * A Result indicating the success or failure of the operation. @@ -40,16 +39,12 @@ pub async fn set_ride_details( driver_id: &DriverId, ride_id: RideId, ride_status: RideStatus, - vehicle_number: Option, - ride_start_otp: Option, - estimated_pickup_distance: Option, + ride_info: Option, ) -> Result<(), AppError> { let ride_details = RideDetails { ride_id, ride_status, - vehicle_number, - ride_start_otp, - estimated_pickup_distance, + ride_info, }; redis .set_key( @@ -208,7 +203,7 @@ pub async fn get_drivers_within_radius( .map(|bucket_idx| driver_loc_bucket_key(merchant_id, city, vehicle, &(bucket - bucket_idx))) .collect(); - let nearby_drivers = redis + let nearby_drivers: Vec<(DriverId, Point)> = redis .mgeo_search( bucket_keys, GeoPosition::from((lon, lat)), @@ -216,9 +211,7 @@ pub async fn get_drivers_within_radius( SortOrder::Asc, ) .await - .map_err(|err| AppError::InternalError(err.to_string()))?; - - let nearby_drivers: Vec<(DriverId, Point)> = cat_maybes(nearby_drivers) + .map_err(|err| AppError::InternalError(err.to_string()))? .into_iter() .map(|(driver_id, point)| { ( @@ -640,3 +633,50 @@ pub async fn push_drainer_driver_location( .await .map_err(|err| AppError::InternalError(err.to_string())) } + +pub async fn set_route_location( + redis: &RedisConnectionPool, + route_code: &str, + vehicle_number: &String, + location: &Point, + speed: &Option, + timestamp: &TimeStamp, +) -> Result<(), AppError> { + let vehicle_tracking_info = VehicleTrackingInfo { + schedule_relationship: None, + start_time: None, + trip_id: None, + latitude: location.lat, + longitude: location.lon, + speed: *speed, + timestamp: *timestamp, + }; + redis + .set_hash_fields_with_hashmap_expiry( + &driver_loc_based_on_route_key(route_code), + vec![(vehicle_number.to_owned(), vehicle_tracking_info)], + 86400, + ) + .await + .map_err(|err| AppError::InternalError(err.to_string())) +} + +pub async fn get_route_location( + redis: &RedisConnectionPool, + route_code: &str, +) -> Result, AppError> { + redis + .get_all_hash_fields(&driver_loc_based_on_route_key(route_code)) + .await + .map_err(|err| AppError::InternalError(err.to_string())) +} + +pub async fn get_trip_location( + redis: &RedisConnectionPool, + trip_code: &str, +) -> Result, AppError> { + redis + .get_all_hash_fields(&driver_loc_based_on_trip_key(trip_code)) + .await + .map_err(|err| AppError::InternalError(err.to_string())) +} diff --git a/crates/location_tracking_service/src/redis/keys.rs b/crates/location_tracking_service/src/redis/keys.rs index c8f9d529..d0864b18 100644 --- a/crates/location_tracking_service/src/redis/keys.rs +++ b/crates/location_tracking_service/src/redis/keys.rs @@ -172,3 +172,11 @@ pub fn driver_loc_bucket_key( ) -> String { format!("lts:dl:off_ride:loc:{merchant_id}:{vehicle_type}:{city}:{bucket}") } + +pub fn driver_loc_based_on_route_key(route_code: &str) -> String { + format!("route:{route_code}") +} + +pub fn driver_loc_based_on_trip_key(trip_id: &str) -> String { + format!("trip:{trip_id}") +} diff --git a/crates/tests/Cargo.toml b/crates/tests/Cargo.toml index 26559fc9..0f620e31 100644 --- a/crates/tests/Cargo.toml +++ b/crates/tests/Cargo.toml @@ -11,5 +11,5 @@ serde_dhall = "0.12.1" tokio-stream = "0.1" tonic = "0.10.2" -shared = { git = "https://github.com/nammayatri/shared-kernel-rs", rev = "2d6cf2e" } +shared = { git = "https://github.com/nammayatri/shared-kernel-rs", rev = "f8d80ea" } location_tracking_service = { version = "0.1.0", path = "../location_tracking_service" } \ No newline at end of file diff --git a/dhall-configs/dev/location_tracking_service.dhall b/dhall-configs/dev/location_tracking_service.dhall index 5565a2d9..76e2bdfa 100644 --- a/dhall-configs/dev/location_tracking_service.dhall +++ b/dhall-configs/dev/location_tracking_service.dhall @@ -30,7 +30,7 @@ let kafka_cfg = { let LogLevel = < TRACE | DEBUG | INFO | WARN | ERROR | OFF > let logger_cfg = { - level = LogLevel.INFO, + level = LogLevel.DEBUG, log_to_file = False } @@ -64,7 +64,7 @@ in { location_update_interval = 60, driver_location_update_topic = "location-updates", batch_size = 100, - bucket_size = 30, + bucket_size = 300, nearby_bucket_threshold = 4, blacklist_merchants = ["favorit0-0000-0000-0000-00000favorit"], request_timeout = 9000, diff --git a/flake.lock b/flake.lock index 569e05c7..fa34ada4 100644 --- a/flake.lock +++ b/flake.lock @@ -109,24 +109,6 @@ "type": "github" } }, - "flake-utils_3": { - "inputs": { - "systems": "systems_3" - }, - "locked": { - "lastModified": 1681202837, - "narHash": "sha256-H+Rh19JDwRtpVPAWp64F+rlEtxUWBAQW28eAi3SRSzg=", - "owner": "numtide", - "repo": "flake-utils", - "rev": "cfacdce06f30d2b68473a46042957675eebb3401", - "type": "github" - }, - "original": { - "owner": "numtide", - "repo": "flake-utils", - "type": "github" - } - }, "gitignore": { "inputs": { "nixpkgs": [ @@ -216,11 +198,11 @@ }, "nixpkgs_3": { "locked": { - "lastModified": 1681358109, - "narHash": "sha256-eKyxW4OohHQx9Urxi7TQlFBTDWII+F+x2hklDOQPB50=", + "lastModified": 1728538411, + "narHash": "sha256-f0SBJz1eZ2yOuKUr5CA9BHULGXVSn6miBuUWdTyhUhU=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "96ba1c52e54e74c3197f4d43026b3f3d92e83ff9", + "rev": "b69de56fac8c2b6f8fd27f2eca01dcda8e0a4221", "type": "github" }, "original": { @@ -292,7 +274,7 @@ "process-compose-flake": "process-compose-flake", "rust-overlay": "rust-overlay_2", "services-flake": "services-flake", - "systems": "systems_4", + "systems": "systems_3", "treefmt-nix": "treefmt-nix" } }, @@ -323,15 +305,14 @@ }, "rust-overlay_2": { "inputs": { - "flake-utils": "flake-utils_3", "nixpkgs": "nixpkgs_3" }, "locked": { - "lastModified": 1690942540, - "narHash": "sha256-eafSSO3Y+/TFuy+CHKyolYfGvC33IAWNx4W2NA7LfZM=", + "lastModified": 1731897198, + "narHash": "sha256-Ou7vLETSKwmE/HRQz4cImXXJBr/k9gp4J4z/PF8LzTE=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "aa3994f054038262df55122dfa552b9eab71a994", + "rev": "0be641045af6d8666c11c2c40e45ffc9667839b5", "type": "github" }, "original": { @@ -400,21 +381,6 @@ "type": "github" } }, - "systems_4": { - "locked": { - "lastModified": 1681028828, - "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", - "owner": "nix-systems", - "repo": "default", - "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", - "type": "github" - }, - "original": { - "owner": "nix-systems", - "repo": "default", - "type": "github" - } - }, "treefmt-nix": { "inputs": { "nixpkgs": "nixpkgs_4"