Skip to content

Commit

Permalink
impl and expose IFabricStatefulServicePartition3 apis
Browse files Browse the repository at this point in the history
  • Loading branch information
youyuanwu committed Nov 12, 2024
1 parent c7a9b87 commit c7b1c76
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 52 deletions.
31 changes: 3 additions & 28 deletions crates/libs/core/src/runtime/stateful.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
// ------------------------------------------------------------

// stateful contains rs definition of stateful traits that user needs to implement
use mssf_com::FabricRuntime::IFabricStatefulServicePartition;

use crate::sync::CancellationToken;
use crate::types::{LoadMetric, LoadMetricListRef, ReplicaRole};
use crate::types::ReplicaRole;

use crate::types::{Epoch, OpenMode, ReplicaInformation, ReplicaSetConfig, ReplicaSetQuarumMode};

use super::stateful_proxy::StatefulServicePartition;

/// Represents a stateful service factory that is responsible for creating replicas
/// of a specific type of stateful service. Stateful service factories are registered with
/// the FabricRuntime by service hosts via register_stateful_service_factory().
Expand Down Expand Up @@ -64,32 +65,6 @@ pub trait LocalStatefulServiceReplica: Send + Sync + 'static {
fn abort(&self);
}

#[derive(Debug, Clone)]
pub struct StatefulServicePartition {
com_impl: IFabricStatefulServicePartition,
}

impl StatefulServicePartition {
pub fn get_com(&self) -> &IFabricStatefulServicePartition {
&self.com_impl
}

/// Reports load for the current replica in the partition.
pub fn report_load(&self, metrics: &[LoadMetric]) -> crate::Result<()> {
let metrics_ref = LoadMetricListRef::from_slice(metrics);
let raw = metrics_ref.as_raw_slice();
unsafe { self.com_impl.ReportLoad(raw) }
}
}

impl From<&IFabricStatefulServicePartition> for StatefulServicePartition {
fn from(e: &IFabricStatefulServicePartition) -> Self {
StatefulServicePartition {
com_impl: e.clone(),
}
}
}

/// TODO: replicator has no public documentation
#[trait_variant::make(Replicator: Send)]
pub trait LocalReplicator: Send + Sync + 'static {
Expand Down
15 changes: 9 additions & 6 deletions crates/libs/core/src/runtime/stateful_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use std::sync::Arc;

use crate::{Interface, HSTRING};
use crate::{runtime::stateful_proxy::StatefulServicePartition, Interface, HSTRING};
use tracing::info;
use windows_core::implement;

Expand All @@ -20,8 +20,8 @@ use mssf_com::{
IFabricPrimaryReplicator, IFabricPrimaryReplicator_Impl, IFabricReplicator,
IFabricReplicatorCatchupSpecificQuorum, IFabricReplicatorCatchupSpecificQuorum_Impl,
IFabricReplicator_Impl, IFabricStatefulServiceFactory, IFabricStatefulServiceFactory_Impl,
IFabricStatefulServicePartition, IFabricStatefulServiceReplica,
IFabricStatefulServiceReplica_Impl,
IFabricStatefulServicePartition, IFabricStatefulServicePartition3,
IFabricStatefulServiceReplica, IFabricStatefulServiceReplica_Impl,
},
FabricTypes::{
FABRIC_EPOCH, FABRIC_REPLICA_INFORMATION, FABRIC_REPLICA_OPEN_MODE, FABRIC_REPLICA_ROLE,
Expand All @@ -30,7 +30,6 @@ use mssf_com::{
};

use crate::{
runtime::stateful::StatefulServicePartition,
strings::HSTRINGWrap,
sync::BridgeContext3,
types::{Epoch, OpenMode, ReplicaInformation, ReplicaRole, ReplicaSetConfig},
Expand Down Expand Up @@ -543,14 +542,18 @@ where
let inner = self.inner.clone();
let rt_cp = self.rt.clone();
let openmode2: OpenMode = openmode.into();
let partition2: StatefulServicePartition = partition.unwrap().into();
let com_partition = partition
.unwrap()
.cast::<IFabricStatefulServicePartition3>()
.expect("cannot query interface");
let partition = StatefulServicePartition::from(&com_partition);
info!(
"IFabricStatefulReplicaBridge::BeginOpen: mode {:?}",
openmode2
);
let (ctx, token) = BridgeContext3::make(callback);
ctx.spawn(&self.rt, async move {
inner.open(openmode2, &partition2, token).await.map(|s| {
inner.open(openmode2, &partition, token).await.map(|s| {
let bridge: IFabricPrimaryReplicator =
IFabricPrimaryReplicatorBridge::create(s, rt_cp).into();
bridge.clone().cast::<IFabricReplicator>().unwrap()
Expand Down
104 changes: 99 additions & 5 deletions crates/libs/core/src/runtime/stateful_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@ use std::ffi::c_void;

use mssf_com::FabricRuntime::{
IFabricPrimaryReplicator, IFabricReplicator, IFabricReplicatorCatchupSpecificQuorum,
IFabricStatefulServiceReplica,
IFabricStatefulServicePartition3, IFabricStatefulServiceReplica,
};
use tracing::info;
use windows_core::{Interface, HSTRING};

use crate::{
error::FabricErrorCode,
strings::HSTRINGWrap,
sync::{fabric_begin_end_proxy2, CancellationToken},
types::ReplicaRole,
types::{
FaultType, LoadMetric, LoadMetricListRef, MoveCost, ReplicaRole,
ServicePartitionAccessStatus, ServicePartitionInformation,
},
};

use super::stateful::{
PrimaryReplicator, Replicator, StatefulServicePartition, StatefulServiceReplica,
};
use super::stateful::{PrimaryReplicator, Replicator, StatefulServiceReplica};
use crate::types::{Epoch, OpenMode, ReplicaInformation, ReplicaSetConfig, ReplicaSetQuarumMode};

pub struct StatefulServiceReplicaProxy {
Expand Down Expand Up @@ -306,3 +308,95 @@ impl PrimaryReplicator for PrimaryReplicatorProxy {
unsafe { self.com_impl.RemoveReplica(replicaid) }
}
}

/// Proxy COM object IFabricStatefulServicePartition3
#[derive(Debug, Clone)]
pub struct StatefulServicePartition {
com_impl: IFabricStatefulServicePartition3,
}

impl StatefulServicePartition {
pub fn get_com(&self) -> &IFabricStatefulServicePartition3 {
&self.com_impl
}

/// Provides access to the ServicePartitionInformation of the service, which contains the partition type and ID.
pub fn get_partition_information(&self) -> crate::Result<ServicePartitionInformation> {
unsafe { self.com_impl.GetPartitionInfo()?.as_ref() }
.ok_or(FabricErrorCode::E_POINTER.into())
.map(ServicePartitionInformation::from)
}

/// Used to check the readiness of the replica in regard to read operations.
/// The ReadStatus should be checked before the replica is servicing a customer request that is a read operation.
pub fn get_read_status(&self) -> crate::Result<ServicePartitionAccessStatus> {
unsafe { self.com_impl.GetReadStatus() }.map(ServicePartitionAccessStatus::from)
}

/// Used to check the readiness of the partition in regard to write operations.
/// The WriteStatus should be checked before the replica services a customer request that is a write operation.
pub fn get_write_status(&self) -> crate::Result<ServicePartitionAccessStatus> {
unsafe { self.com_impl.GetWriteStatus() }.map(ServicePartitionAccessStatus::from)
}

/// TODO: not implemented
/// Creates a FabricReplicator with the specified settings and returns it to the replica.
pub fn create_replicator(&self) -> crate::Result<()> {
Err(FabricErrorCode::E_NOTIMPL.into())
}

/// Reports load for the current replica in the partition.
/// Remarks:
/// The reported metrics should correspond to those that are provided in the ServiceLoadMetricDescription
/// as a part of the ServiceDescription that is used to create the service. Load metrics that are not
/// present in the description are ignored. Reporting custom metrics allows Service Fabric to balance
/// services that are based on additional custom information.
pub fn report_load(&self, metrics: &[LoadMetric]) -> crate::Result<()> {
let metrics_ref = LoadMetricListRef::from_slice(metrics);
let raw = metrics_ref.as_raw_slice();
unsafe { self.com_impl.ReportLoad(raw) }
}

/// Enables the replica to report a fault to the runtime and indicates that it has encountered
/// an error from which it cannot recover and must either be restarted or removed.
pub fn report_fault(&self, fault_type: FaultType) -> crate::Result<()> {
unsafe { self.com_impl.ReportFault(fault_type.into()) }
}

/// Reports the move cost for a replica.
/// Remarks:
/// Services can report move cost of a replica using this method.
/// While the Service Fabric Resource Balances searches for the best balance in the cluster,
/// it examines both load information and move cost of each replica.
/// Resource balances will prefer to move replicas with lower cost in order to achieve balance.
pub fn report_move_cost(&self, move_cost: MoveCost) -> crate::Result<()> {
unsafe { self.com_impl.ReportMoveCost(move_cost.into()) }
}

/// Remarks:
/// The health information describes the report details, like the source ID, the property,
/// the health state and other relevant details. The partition uses an internal health client
/// to send the reports to the health store. The client optimizes messages to Health Manager
/// by batching reports per a configured duration (Default: 30 seconds). If the report has high priority,
/// you can specify send options to send it immediately.
/// TODO: not yet implemented
/// Reports current partition health.
pub fn report_partition_health(&self) -> crate::Result<()> {
Err(FabricErrorCode::E_NOTIMPL.into())
}

/// TODO: not yet implemented
/// Reports health on the current stateful service replica of the partition.
pub fn report_replica_health(&self) -> crate::Result<()> {
Err(FabricErrorCode::E_NOTIMPL.into())
}
}

impl From<&IFabricStatefulServicePartition3> for StatefulServicePartition {
fn from(e: &IFabricStatefulServicePartition3) -> Self {
StatefulServicePartition {
com_impl: e.clone(),
}
}
}
39 changes: 38 additions & 1 deletion crates/libs/core/src/types/common/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
//! Module for handling fabric metrics
use crate::{HSTRING, PCWSTR};
use mssf_com::FabricTypes::FABRIC_LOAD_METRIC;
use mssf_com::FabricTypes::{
FABRIC_LOAD_METRIC, FABRIC_MOVE_COST, FABRIC_MOVE_COST_HIGH, FABRIC_MOVE_COST_LOW,
FABRIC_MOVE_COST_MEDIUM, FABRIC_MOVE_COST_ZERO,
};
use std::marker::PhantomData;

/// FABRIC_LOAD_METRIC
Expand Down Expand Up @@ -53,3 +56,37 @@ impl<'a> LoadMetricListRef<'a> {
self.metrics.as_slice()
}
}

#[derive(Debug, Clone, PartialEq)]
pub enum MoveCost {
Zero,
Low,
Medium,
High,
// VeryHigh,
}

impl From<FABRIC_MOVE_COST> for MoveCost {
fn from(value: FABRIC_MOVE_COST) -> Self {
match value {
FABRIC_MOVE_COST_ZERO => Self::Zero,
FABRIC_MOVE_COST_LOW => Self::Low,
FABRIC_MOVE_COST_MEDIUM => Self::Medium,
FABRIC_MOVE_COST_HIGH => Self::High,
// Not supported in rust yet
// FABRIC_MOVE_COST_VERYHIGH =>Self::VeryHigh,
_ => Self::Zero,
}
}
}

impl From<MoveCost> for FABRIC_MOVE_COST {
fn from(value: MoveCost) -> Self {
match value {
MoveCost::Zero => FABRIC_MOVE_COST_ZERO,
MoveCost::Low => FABRIC_MOVE_COST_LOW,
MoveCost::Medium => FABRIC_MOVE_COST_MEDIUM,
MoveCost::High => FABRIC_MOVE_COST_HIGH,
}
}
}
35 changes: 33 additions & 2 deletions crates/libs/core/src/types/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ mod metrics;
pub use metrics::*;

use mssf_com::FabricTypes::{
FABRIC_HEALTH_STATE, FABRIC_HEALTH_STATE_ERROR, FABRIC_HEALTH_STATE_INVALID,
FABRIC_HEALTH_STATE_OK, FABRIC_HEALTH_STATE_UNKNOWN, FABRIC_HEALTH_STATE_WARNING,
FABRIC_FAULT_TYPE, FABRIC_FAULT_TYPE_INVALID, FABRIC_FAULT_TYPE_PERMANENT,
FABRIC_FAULT_TYPE_TRANSIENT, FABRIC_HEALTH_STATE, FABRIC_HEALTH_STATE_ERROR,
FABRIC_HEALTH_STATE_INVALID, FABRIC_HEALTH_STATE_OK, FABRIC_HEALTH_STATE_UNKNOWN,
FABRIC_HEALTH_STATE_WARNING,
};

// FABRIC_HEALTH_STATE
Expand All @@ -38,3 +40,32 @@ impl From<&FABRIC_HEALTH_STATE> for HealthState {
}
}
}

// FABRIC_FAULT_TYPE
#[derive(Debug, Clone, PartialEq)]
pub enum FaultType {
Invalid,
Permanent,
Transient,
}

impl From<FABRIC_FAULT_TYPE> for FaultType {
fn from(value: FABRIC_FAULT_TYPE) -> Self {
match value {
FABRIC_FAULT_TYPE_INVALID => Self::Invalid,
FABRIC_FAULT_TYPE_PERMANENT => Self::Permanent,
FABRIC_FAULT_TYPE_TRANSIENT => Self::Transient,
_ => Self::Invalid,
}
}
}

impl From<FaultType> for FABRIC_FAULT_TYPE {
fn from(value: FaultType) -> Self {
match value {
FaultType::Invalid => FABRIC_FAULT_TYPE_INVALID,
FaultType::Permanent => FABRIC_FAULT_TYPE_PERMANENT,
FaultType::Transient => FABRIC_FAULT_TYPE_TRANSIENT,
}
}
}
65 changes: 65 additions & 0 deletions crates/libs/core/src/types/common/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
use crate::{GUID, HSTRING};
use mssf_com::FabricTypes::{
FABRIC_INT64_RANGE_PARTITION_INFORMATION, FABRIC_NAMED_PARTITION_INFORMATION,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS, FABRIC_SERVICE_PARTITION_ACCESS_STATUS_GRANTED,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_INVALID,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NOT_PRIMARY,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NO_WRITE_QUORUM,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_RECONFIGURATION_PENDING,
FABRIC_SERVICE_PARTITION_INFORMATION, FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE,
FABRIC_SERVICE_PARTITION_KIND_INVALID, FABRIC_SERVICE_PARTITION_KIND_NAMED,
FABRIC_SERVICE_PARTITION_KIND_SINGLETON, FABRIC_SINGLETON_PARTITION_INFORMATION,
Expand Down Expand Up @@ -97,3 +102,63 @@ impl From<&FABRIC_SERVICE_PARTITION_INFORMATION> for ServicePartitionInformation
}
}
}

/// FABRIC_SERVICE_PARTITION_ACCESS_STATUS
/// Remarks:
/// PartitionAccessStatus is used to check that a read or write operation is allowed.
/// When service replicas handle a client request, they should verify that the system is
/// in a state that allows processing. By checking the ReadStatus or WriteStatus as appropriate,
/// the replica can be notified of conditions that prevent correct operation.
/// Note that write operations might still see an exception from the replicator for one of these
/// conditions, because the condition might change between the WriteStatus check and the call
/// to StateReplicator.Replicate() (Not yet supported in mssf).
#[derive(Debug, Clone, PartialEq)]
pub enum ServicePartitionAccessStatus {
Invalid,
/// Indicates that the read or write operation access is granted and the operation is allowed.
Granted,
/// Indicates that the client should try again later, because a reconfiguration is in progress.
/// After the reconfiguration is completed, a new status is returned that gives further instructions.
/// The client should retry the operation at this replica
ReconfigurationPending,
/// Indicates that this client request was received by a replica that is not a Primary replica.
/// The read or write operation cannot be performed at this replica.
/// The client should attempt to use the naming service to identify the correct primary replica.
NotPrimary,
/// Indicates that no write quorum is available and, therefore, no write operation can be accepted.
/// The client should retry the operation at this replica.
NoWriteQuorum,
}

impl From<FABRIC_SERVICE_PARTITION_ACCESS_STATUS> for ServicePartitionAccessStatus {
fn from(value: FABRIC_SERVICE_PARTITION_ACCESS_STATUS) -> Self {
match value {
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_INVALID => Self::Invalid,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_GRANTED => Self::Granted,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NOT_PRIMARY => Self::NotPrimary,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NO_WRITE_QUORUM => Self::NoWriteQuorum,
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_RECONFIGURATION_PENDING => {
Self::ReconfigurationPending
}
_ => Self::Invalid,
}
}
}

impl From<ServicePartitionAccessStatus> for FABRIC_SERVICE_PARTITION_ACCESS_STATUS {
fn from(value: ServicePartitionAccessStatus) -> Self {
match value {
ServicePartitionAccessStatus::Invalid => FABRIC_SERVICE_PARTITION_ACCESS_STATUS_INVALID,
ServicePartitionAccessStatus::Granted => FABRIC_SERVICE_PARTITION_ACCESS_STATUS_GRANTED,
ServicePartitionAccessStatus::ReconfigurationPending => {
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_RECONFIGURATION_PENDING
}
ServicePartitionAccessStatus::NotPrimary => {
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NOT_PRIMARY
}
ServicePartitionAccessStatus::NoWriteQuorum => {
FABRIC_SERVICE_PARTITION_ACCESS_STATUS_NO_WRITE_QUORUM
}
}
}
}
Loading

0 comments on commit c7b1c76

Please sign in to comment.