Skip to content

Commit

Permalink
Example: inspect read write status (#103)
Browse files Browse the repository at this point in the history
Utilizes and inspects partition read write status in the example
stateful app.
Changes mssf-core tracing to debug. App usually has info traces already,
of the same content.
  • Loading branch information
youyuanwu authored Dec 2, 2024
1 parent 7d1ad3a commit a0665c1
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 88 deletions.
56 changes: 28 additions & 28 deletions crates/libs/core/src/runtime/stateful_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use std::sync::Arc;

use crate::{runtime::stateful_proxy::StatefulServicePartition, Interface, HSTRING};
use tracing::info;
use tracing::debug;
use windows_core::implement;

use mssf_com::{
Expand Down Expand Up @@ -78,7 +78,7 @@ where
partitionid: &crate::GUID,
replicaid: i64,
) -> crate::Result<IFabricStatefulServiceReplica> {
info!("StatefulServiceFactoryBridge::CreateReplica");
debug!("StatefulServiceFactoryBridge::CreateReplica");
let p_servicename = crate::PCWSTR::from_raw(servicename.0);
let h_servicename = HSTRING::from_wide(unsafe { p_servicename.as_wide() }).unwrap();
let h_servicetypename = HSTRING::from_wide(unsafe { servicetypename.as_wide() }).unwrap();
Expand Down Expand Up @@ -148,7 +148,7 @@ where
&self,
callback: ::core::option::Option<&super::IFabricAsyncOperationCallback>,
) -> crate::Result<super::IFabricAsyncOperationContext> {
info!("IFabricReplicatorBridge::BeginOpen");
debug!("IFabricReplicatorBridge::BeginOpen");
let inner = self.inner.clone();
let (ctx, token) = BridgeContext3::make(callback);
ctx.spawn(&self.rt, async move {
Expand All @@ -163,7 +163,7 @@ where
&self,
context: ::core::option::Option<&super::IFabricAsyncOperationContext>,
) -> crate::Result<IFabricStringResult> {
info!("IFabricReplicatorBridge::EndOpen");
debug!("IFabricReplicatorBridge::EndOpen");
BridgeContext3::result(context)?
}

Expand All @@ -177,7 +177,7 @@ where
let inner = self.inner.clone();
let epoch2: Epoch = unsafe { epoch.as_ref().unwrap().into() };
let role2: ReplicaRole = (&role).into();
info!(
debug!(
"IFabricReplicatorBridge::BeginChangeRole epoch {:?}, role {:?}",
epoch2, role2
);
Expand All @@ -192,7 +192,7 @@ where
&self,
context: ::core::option::Option<&super::IFabricAsyncOperationContext>,
) -> crate::Result<()> {
info!("IFabricReplicatorBridge::EndChangeRole");
debug!("IFabricReplicatorBridge::EndChangeRole");
BridgeContext3::result(context)?
}

Expand All @@ -204,7 +204,7 @@ where
) -> crate::Result<super::IFabricAsyncOperationContext> {
let inner = self.inner.clone();
let epoch2: Epoch = unsafe { epoch.as_ref().unwrap().into() };
info!(
debug!(
"IFabricReplicatorBridge::BeginUpdateEpoch epoch {:?}",
epoch2
);
Expand All @@ -219,15 +219,15 @@ where
&self,
context: ::core::option::Option<&super::IFabricAsyncOperationContext>,
) -> crate::Result<()> {
info!("IFabricReplicatorBridge::BeginUpdateEpoch");
debug!("IFabricReplicatorBridge::BeginUpdateEpoch");
BridgeContext3::result(context)?
}

fn BeginClose(
&self,
callback: ::core::option::Option<&super::IFabricAsyncOperationCallback>,
) -> crate::Result<super::IFabricAsyncOperationContext> {
info!("IFabricReplicatorBridge::BeginClose");
debug!("IFabricReplicatorBridge::BeginClose");
let inner = self.inner.clone();
let (ctx, token) = BridgeContext3::make(callback);
ctx.spawn(&self.rt, async move { inner.close(token).await })
Expand All @@ -237,24 +237,24 @@ where
&self,
context: ::core::option::Option<&super::IFabricAsyncOperationContext>,
) -> crate::Result<()> {
info!("IFabricReplicatorBridge::EndClose");
debug!("IFabricReplicatorBridge::EndClose");
BridgeContext3::result(context)?
}

fn Abort(&self) {
info!("IFabricReplicatorBridge::Abort");
debug!("IFabricReplicatorBridge::Abort");
self.inner.abort();
}

fn GetCurrentProgress(&self) -> crate::Result<i64> {
let lsn = self.inner.get_current_progress();
info!("IFabricReplicatorBridge::GetCurrentProgress: {:?}", lsn);
debug!("IFabricReplicatorBridge::GetCurrentProgress: {:?}", lsn);
lsn
}

fn GetCatchUpCapability(&self) -> crate::Result<i64> {
let lsn = self.inner.get_catch_up_capability();
info!("IFabricReplicatorBridge::GetCatchUpCapability: {:?}", lsn);
debug!("IFabricReplicatorBridge::GetCatchUpCapability: {:?}", lsn);
lsn
}
}
Expand Down Expand Up @@ -395,7 +395,7 @@ where
&self,
callback: ::core::option::Option<&super::IFabricAsyncOperationCallback>,
) -> crate::Result<super::IFabricAsyncOperationContext> {
info!("IFabricPrimaryReplicatorBridge::BeginOnDataLoss");
debug!("IFabricPrimaryReplicatorBridge::BeginOnDataLoss");
let inner = self.inner.clone();

let (ctx, token) = BridgeContext3::make(callback);
Expand All @@ -406,7 +406,7 @@ where
&self,
context: ::core::option::Option<&super::IFabricAsyncOperationContext>,
) -> crate::Result<u8> {
info!("IFabricPrimaryReplicatorBridge::EndOnDataLoss");
debug!("IFabricPrimaryReplicatorBridge::EndOnDataLoss");
BridgeContext3::result(context)?
}

Expand All @@ -418,7 +418,7 @@ where
) -> crate::Result<()> {
let cc = ReplicaSetConfig::from(unsafe { currentconfiguration.as_ref().unwrap() });
let pc = ReplicaSetConfig::from(unsafe { previousconfiguration.as_ref().unwrap() });
info!("IFabricPrimaryReplicatorBridge::UpdateCatchUpReplicaSetConfiguration: curr {:?}, prev {:?}", cc, pc);
debug!("IFabricPrimaryReplicatorBridge::UpdateCatchUpReplicaSetConfiguration: curr {:?}, prev {:?}", cc, pc);
self.inner
.update_catch_up_replica_set_configuration(&cc, &pc)
}
Expand All @@ -429,7 +429,7 @@ where
callback: ::core::option::Option<&super::IFabricAsyncOperationCallback>,
) -> crate::Result<super::IFabricAsyncOperationContext> {
let catchupmode = catchupmode.into();
info!(
debug!(
"IFabricPrimaryReplicatorBridge::BeginWaitForCatchUpQuorum: mode {:?}",
catchupmode
);
Expand All @@ -444,7 +444,7 @@ where
&self,
context: ::core::option::Option<&super::IFabricAsyncOperationContext>,
) -> crate::Result<()> {
info!("IFabricPrimaryReplicatorBridge::BeginWaitForCatchUpQuorum");
debug!("IFabricPrimaryReplicatorBridge::BeginWaitForCatchUpQuorum");
BridgeContext3::result(context)?
}

Expand All @@ -454,7 +454,7 @@ where
currentconfiguration: *const FABRIC_REPLICA_SET_CONFIGURATION,
) -> crate::Result<()> {
let c = ReplicaSetConfig::from(unsafe { currentconfiguration.as_ref() }.unwrap());
info!(
debug!(
"IFabricPrimaryReplicatorBridge::UpdateCurrentReplicaSetConfiguration {:?}",
c
);
Expand All @@ -469,7 +469,7 @@ where
) -> crate::Result<super::IFabricAsyncOperationContext> {
let inner = self.inner.clone();
let r = ReplicaInformation::from(unsafe { replica.as_ref().unwrap() });
info!("IFabricPrimaryReplicatorBridge::BeginBuildReplica: {:?}", r);
debug!("IFabricPrimaryReplicatorBridge::BeginBuildReplica: {:?}", r);
let (ctx, token) = BridgeContext3::make(callback);
ctx.spawn(
&self.rt,
Expand All @@ -481,12 +481,12 @@ where
&self,
context: ::core::option::Option<&super::IFabricAsyncOperationContext>,
) -> crate::Result<()> {
info!("IFabricPrimaryReplicatorBridge::EndBuildReplica");
debug!("IFabricPrimaryReplicatorBridge::EndBuildReplica");
BridgeContext3::result(context)?
}

fn RemoveReplica(&self, replicaid: i64) -> crate::Result<()> {
info!("IFabricPrimaryReplicatorBridge::RemoveReplica: replicaid {replicaid}");
debug!("IFabricPrimaryReplicatorBridge::RemoveReplica: replicaid {replicaid}");
self.inner.remove_replica(replicaid)
}
}
Expand Down Expand Up @@ -546,7 +546,7 @@ where
.cast::<IFabricStatefulServicePartition3>()
.expect("cannot query interface");
let partition = StatefulServicePartition::from(&com_partition);
info!(
debug!(
"IFabricStatefulReplicaBridge::BeginOpen: mode {:?}",
openmode2
);
Expand All @@ -564,7 +564,7 @@ where
&self,
context: ::core::option::Option<&super::IFabricAsyncOperationContext>,
) -> crate::Result<IFabricReplicator> {
info!("IFabricStatefulReplicaBridge::EndOpen");
debug!("IFabricStatefulReplicaBridge::EndOpen");
BridgeContext3::result(context)?
}

Expand All @@ -575,7 +575,7 @@ where
) -> crate::Result<super::IFabricAsyncOperationContext> {
let inner = self.inner.clone();
let newrole2: ReplicaRole = (&newrole).into();
info!(
debug!(
"IFabricStatefulReplicaBridge::BeginChangeRole: {:?}",
newrole2
);
Expand All @@ -592,15 +592,15 @@ where
&self,
context: ::core::option::Option<&super::IFabricAsyncOperationContext>,
) -> crate::Result<IFabricStringResult> {
info!("IFabricStatefulReplicaBridge::EndChangeRole");
debug!("IFabricStatefulReplicaBridge::EndChangeRole");
BridgeContext3::result(context)?
}

fn BeginClose(
&self,
callback: ::core::option::Option<&super::IFabricAsyncOperationCallback>,
) -> crate::Result<super::IFabricAsyncOperationContext> {
info!("IFabricStatefulReplicaBridge::BeginClose");
debug!("IFabricStatefulReplicaBridge::BeginClose");
let inner = self.inner.clone();
let (ctx, token) = BridgeContext3::make(callback);
ctx.spawn(&self.rt, async move { inner.close(token).await })
Expand All @@ -610,7 +610,7 @@ where
&self,
context: ::core::option::Option<&super::IFabricAsyncOperationContext>,
) -> crate::Result<()> {
info!("IFabricStatefulReplicaBridge::EndClose");
debug!("IFabricStatefulReplicaBridge::EndClose");
BridgeContext3::result(context)?
}

Expand Down
36 changes: 18 additions & 18 deletions crates/libs/core/src/runtime/stateful_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use mssf_com::FabricRuntime::{
IFabricPrimaryReplicator, IFabricReplicator, IFabricReplicatorCatchupSpecificQuorum,
IFabricStatefulServicePartition3, IFabricStatefulServiceReplica,
};
use tracing::info;
use tracing::debug;
use windows_core::{Interface, HSTRING};

use crate::{
Expand Down Expand Up @@ -45,7 +45,7 @@ impl StatefulServiceReplica for StatefulServiceReplicaProxy {
partition: &StatefulServicePartition,
cancellation_token: CancellationToken,
) -> crate::Result<impl PrimaryReplicator> {
info!("StatefulServiceReplicaProxy::open with mode {:?}", openmode);
debug!("StatefulServiceReplicaProxy::open with mode {:?}", openmode);
let com1 = &self.com_impl;
let com2 = self.com_impl.clone();
let rx = fabric_begin_end_proxy2(
Expand Down Expand Up @@ -79,7 +79,7 @@ impl StatefulServiceReplica for StatefulServiceReplicaProxy {
cancellation_token: CancellationToken,
) -> crate::Result<HSTRING> {
// replica address
info!("StatefulServiceReplicaProxy::change_role {:?}", newrole);
debug!("StatefulServiceReplicaProxy::change_role {:?}", newrole);
let com1 = &self.com_impl;
let com2 = self.com_impl.clone();
let rx = fabric_begin_end_proxy2(
Expand All @@ -91,7 +91,7 @@ impl StatefulServiceReplica for StatefulServiceReplicaProxy {
Ok(HSTRINGWrap::from(&addr).into())
}
async fn close(&self, cancellation_token: CancellationToken) -> crate::Result<()> {
info!("StatefulServiceReplicaProxy::close");
debug!("StatefulServiceReplicaProxy::close");
let com1 = &self.com_impl;
let com2 = self.com_impl.clone();
let rx = fabric_begin_end_proxy2(
Expand All @@ -102,7 +102,7 @@ impl StatefulServiceReplica for StatefulServiceReplicaProxy {
rx.await?
}
fn abort(&self) {
info!("StatefulServiceReplicaProxy::abort");
debug!("StatefulServiceReplicaProxy::abort");
unsafe { self.com_impl.Abort() }
}
}
Expand All @@ -119,7 +119,7 @@ impl ReplicatorProxy {

impl Replicator for ReplicatorProxy {
async fn open(&self, cancellation_token: CancellationToken) -> crate::Result<HSTRING> {
info!("ReplicatorProxy::open");
debug!("ReplicatorProxy::open");
// replicator address
let com1 = &self.com_impl;
let com2 = self.com_impl.clone();
Expand All @@ -132,7 +132,7 @@ impl Replicator for ReplicatorProxy {
Ok(HSTRINGWrap::from(&addr).into())
}
async fn close(&self, cancellation_token: CancellationToken) -> crate::Result<()> {
info!("ReplicatorProxy::close");
debug!("ReplicatorProxy::close");
let com1 = &self.com_impl;
let com2 = self.com_impl.clone();
let rx = fabric_begin_end_proxy2(
Expand All @@ -148,7 +148,7 @@ impl Replicator for ReplicatorProxy {
role: &ReplicaRole,
cancellation_token: CancellationToken,
) -> crate::Result<()> {
info!("ReplicatorProxy::change_role");
debug!("ReplicatorProxy::change_role");
let com1 = &self.com_impl;
let com2 = self.com_impl.clone();
let rx = fabric_begin_end_proxy2(
Expand All @@ -163,7 +163,7 @@ impl Replicator for ReplicatorProxy {
epoch: &Epoch,
cancellation_token: CancellationToken,
) -> crate::Result<()> {
info!("ReplicatorProxy::update_epoch");
debug!("ReplicatorProxy::update_epoch");
let com1 = &self.com_impl;
let com2 = self.com_impl.clone();
let rx = fabric_begin_end_proxy2(
Expand All @@ -174,15 +174,15 @@ impl Replicator for ReplicatorProxy {
rx.await?
}
fn get_current_progress(&self) -> crate::Result<i64> {
info!("ReplicatorProxy::get_current_progress");
debug!("ReplicatorProxy::get_current_progress");
unsafe { self.com_impl.GetCurrentProgress() }
}
fn get_catch_up_capability(&self) -> crate::Result<i64> {
info!("ReplicatorProxy::get_catch_up_capability");
debug!("ReplicatorProxy::get_catch_up_capability");
unsafe { self.com_impl.GetCatchUpCapability() }
}
fn abort(&self) {
info!("ReplicatorProxy::abort");
debug!("ReplicatorProxy::abort");
unsafe { self.com_impl.Abort() }
}
}
Expand Down Expand Up @@ -236,7 +236,7 @@ impl Replicator for PrimaryReplicatorProxy {

impl PrimaryReplicator for PrimaryReplicatorProxy {
async fn on_data_loss(&self, cancellation_token: CancellationToken) -> crate::Result<u8> {
info!("PrimaryReplicatorProxy::on_data_loss");
debug!("PrimaryReplicatorProxy::on_data_loss");
let com1 = &self.com_impl;
let com2 = self.com_impl.clone();
let rx = fabric_begin_end_proxy2(
Expand All @@ -251,7 +251,7 @@ impl PrimaryReplicator for PrimaryReplicatorProxy {
currentconfiguration: &ReplicaSetConfig,
previousconfiguration: &ReplicaSetConfig,
) -> crate::Result<()> {
info!("PrimaryReplicatorProxy::update_catch_up_replica_set_configuration");
debug!("PrimaryReplicatorProxy::update_catch_up_replica_set_configuration");
let cc_view = currentconfiguration.get_view();
let pc_view = previousconfiguration.get_view();
unsafe {
Expand All @@ -264,7 +264,7 @@ impl PrimaryReplicator for PrimaryReplicatorProxy {
catchupmode: ReplicaSetQuorumMode,
cancellation_token: CancellationToken,
) -> crate::Result<()> {
info!("PrimaryReplicatorProxy::wait_for_catch_up_quorum: catchupmode {catchupmode:?}");
debug!("PrimaryReplicatorProxy::wait_for_catch_up_quorum: catchupmode {catchupmode:?}");
let com1 = &self.com_impl;
let com2 = self.com_impl.clone();
let rx = fabric_begin_end_proxy2(
Expand All @@ -278,7 +278,7 @@ impl PrimaryReplicator for PrimaryReplicatorProxy {
&self,
currentconfiguration: &ReplicaSetConfig,
) -> crate::Result<()> {
info!("PrimaryReplicatorProxy::update_current_replica_set_configuration");
debug!("PrimaryReplicatorProxy::update_current_replica_set_configuration");
unsafe {
self.com_impl
.UpdateCurrentReplicaSetConfiguration(currentconfiguration.get_view().get_raw())
Expand All @@ -289,7 +289,7 @@ impl PrimaryReplicator for PrimaryReplicatorProxy {
replica: &ReplicaInformation,
cancellation_token: CancellationToken,
) -> crate::Result<()> {
info!("PrimaryReplicatorProxy::build_replica");
debug!("PrimaryReplicatorProxy::build_replica");
let com1 = &self.com_impl;
let com2 = self.com_impl.clone();
let rx = fabric_begin_end_proxy2(
Expand All @@ -304,7 +304,7 @@ impl PrimaryReplicator for PrimaryReplicatorProxy {
rx.await?
}
fn remove_replica(&self, replicaid: i64) -> crate::Result<()> {
info!("PrimaryReplicatorProxy::remove_replica");
debug!("PrimaryReplicatorProxy::remove_replica");
unsafe { self.com_impl.RemoveReplica(replicaid) }
}
}
Expand Down
Loading

0 comments on commit a0665c1

Please sign in to comment.