Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE#903] Reset telemetry bidirectional stream on heartbeat timeout #906

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl Client {

for session in sessions.unwrap() {
let peer = session.peer().to_string();
let response = Self::heart_beat_inner(session, &group, &namespace, &client_type).await;
let response = Self::heartbeat_inner(session, &group, &namespace, &client_type).await;
if response.is_err() {
error!(
logger,
Expand Down Expand Up @@ -322,7 +322,7 @@ impl Client {
.await
}

async fn heart_beat_inner<T: RPCClient + 'static>(
async fn heartbeat_inner<T: RPCClient + 'static>(
mut rpc_client: T,
group: &Option<String>,
namespace: &str,
Expand Down Expand Up @@ -731,7 +731,7 @@ impl SessionManager {
) -> Result<Session, ClientError> {
let mut session_map = self.session_map.lock().await;
let endpoint_url = endpoints.endpoint_url().to_string();
return if session_map.contains_key(&endpoint_url) {
if session_map.contains_key(&endpoint_url) {
Ok(session_map.get(&endpoint_url).unwrap().shadow_session())
} else {
let mut session = Session::new(
Expand All @@ -745,7 +745,7 @@ impl SessionManager {
let shadow_session = session.shadow_session();
session_map.insert(endpoint_url.clone(), session);
Ok(shadow_session)
};
}
}

pub(crate) async fn get_all_sessions(&self) -> Result<Vec<Session>, ClientError> {
Expand Down Expand Up @@ -1123,7 +1123,7 @@ pub(crate) mod tests {
.return_once(|_| Box::pin(futures::future::ready(response)));

let send_result =
Client::heart_beat_inner(mock, &Some("group".to_string()), "", &ClientType::Producer)
Client::heartbeat_inner(mock, &Some("group".to_string()), "", &ClientType::Producer)
.await;
assert!(send_result.is_ok());
}
Expand Down
3 changes: 3 additions & 0 deletions rust/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ pub enum ErrorKind {
#[error("Failed to receive message via channel")]
ChannelReceive,

#[error("Failed to receive RPC response before timeout elapsed")]
RpcTimeout,

#[error("Unknown error")]
Unknown,
}
Expand Down
71 changes: 34 additions & 37 deletions rust/src/model/message_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,43 +23,40 @@ use once_cell::sync::Lazy;
use parking_lot::Mutex;
use time::{Date, OffsetDateTime, PrimitiveDateTime, Time};

/**
* The codec for the message-id.
*
* <p>Codec here provides the following two functions:
* 1. Provide decoding function of message-id of all versions above v0.
* 2. Provide a generator of message-id of v1 version.
*
* <p>The message-id of versions above V1 consists of 17 bytes in total. The first two bytes represent the version
* number. For V1, these two bytes are 0x0001.
*
* <h3>V1 message id example</h3>
*
* <pre>
* ┌──┬────────────┬────┬────────┬────────┐
* │01│56F7E71C361B│21BC│024CCDBE│00000000│
* └──┴────────────┴────┴────────┴────────┘
* </pre>
*
* <h3>V1 version message id generation rules</h3>
*
* <pre>
* process id(lower 2bytes)
* ▲
* mac address(lower 6bytes) │ sequence number(big endian)
* ▲ │ ▲ (4bytes)
* │ │ │
* ┌─────┴─────┐ ┌┴┐ ┌───┐ ┌─┴─┐
* 0x01+ │ 6 │ │2│ │ 4 │ │ 4 │
* └───────────┘ └─┘ └─┬─┘ └───┘
* │
* ▼
* seconds since 2021-01-01 00:00:00(UTC+0)
* (lower 4bytes)
* </pre>
*/

// inspired by https://github.com/messense/rocketmq-rs
/// The codec for the message-id.
///
/// 1. Provide decoding function of message-id of all versions above v0.
/// 2. Provide a generator of message-id of v1 version.
///
/// The message-id of versions above V1 consists of 17 bytes in total. The first two bytes represent the version
/// number. For V1, these two bytes are 0x0001.
///
/// <h3>V1 message id example</h3>
///
/// <pre>
/// ┌──┬────────────┬────┬────────┬────────┐
/// │01│56F7E71C361B│21BC│024CCDBE│00000000│
/// └──┴────────────┴────┴────────┴────────┘
/// </pre>
///
/// <h3>V1 version message id generation rules</h3>
///
/// <pre>
/// process id(lower 2bytes)
/// ▲
/// mac address(lower 6bytes) │ sequence number(big endian)
/// ▲ │ ▲ (4bytes)
/// │ │ │
/// ┌─────┴─────┐ ┌┴┐ ┌───┐ ┌─┴─┐
/// 0x01+ │ 6 │ │2│ │ 4 │ │ 4 │
/// └───────────┘ └─┘ └─┬─┘ └───┘
/// │
/// ▼
/// seconds since 2021-01-01 00:00:00(UTC+0)
/// (lower 4bytes)
/// </pre>
///
/// inspired by https://github.com/messense/rocketmq-rs
pub(crate) static UNIQ_ID_GENERATOR: Lazy<Mutex<UniqueIdGenerator>> = Lazy::new(|| {
let mut wtr = Vec::new();
wtr.write_u8(1).unwrap();
Expand Down
93 changes: 79 additions & 14 deletions rust/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use async_trait::async_trait;
use mockall::{automock, mock};
use ring::hmac;
use slog::{debug, error, info, o, Logger};
use slog::{debug, error, info, o, warn, Logger};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tokio::sync::{mpsc, oneshot};
Expand Down Expand Up @@ -109,6 +111,9 @@ pub(crate) struct Session {
option: ClientOption,
endpoints: Endpoints,
stub: MessagingServiceClient<Channel>,
settings: Option<TelemetryCommand>,
telemetry_command_tx: Option<mpsc::Sender<Command>>,
telemetry_stream_epoch: Arc<AtomicUsize>,
telemetry_tx: Option<mpsc::Sender<TelemetryCommand>>,
shutdown_tx: Option<oneshot::Sender<()>>,
}
Expand All @@ -126,6 +131,9 @@ impl Session {
option: self.option.clone(),
endpoints: self.endpoints.clone(),
stub: self.stub.clone(),
settings: self.settings.clone(),
telemetry_command_tx: self.telemetry_command_tx.clone(),
telemetry_stream_epoch: Arc::clone(&self.telemetry_stream_epoch),
telemetry_tx: self.telemetry_tx.clone(),
shutdown_tx: None,
}
Expand Down Expand Up @@ -178,6 +186,9 @@ impl Session {
endpoints: endpoints.clone(),
client_id,
stub,
settings: None,
telemetry_command_tx: None,
telemetry_stream_epoch: Arc::new(AtomicUsize::new(0)),
telemetry_tx: None,
shutdown_tx: None,
})
Expand Down Expand Up @@ -287,6 +298,14 @@ impl Session {
settings: TelemetryCommand,
telemetry_command_tx: mpsc::Sender<Command>,
) -> Result<(), ClientError> {
if self.settings.is_none() {
self.settings = Some(settings.clone());
}

if self.telemetry_command_tx.is_none() {
self.telemetry_command_tx = Some(telemetry_command_tx.clone());
}

let (tx, rx) = mpsc::channel(16);
tx.send(settings).await.map_err(|e| {
ClientError::new(
Expand All @@ -307,15 +326,30 @@ impl Session {
)
.set_source(e)
})?;

let logger = self.logger.clone();
let epoch = self.telemetry_stream_epoch.load(Ordering::Relaxed);
info!(
logger,
"Started telemetry bidirectional stream, stream-epoch={}", epoch
);
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
self.shutdown_tx = Some(shutdown_tx);

let logger = self.logger.clone();
let stream_epoch = Arc::clone(&self.telemetry_stream_epoch);
tokio::spawn(async move {
let mut stream = response.into_inner();
let mut interval = tokio::time::interval(Duration::from_secs(3));
loop {
tokio::select! {
_ = interval.tick() => {
// If the bidirectional stream has been deprecated, finish this daemon task
// immediately.
let latest_stream_epoch = stream_epoch.load(Ordering::Relaxed);
if latest_stream_epoch != epoch {
info!(logger, "Telemetry bidirectional stream epoch has changed: {} --> {}",
epoch, latest_stream_epoch);
break;
}
}
message = stream.message() => {
match message {
Ok(Some(item)) => {
Expand Down Expand Up @@ -371,6 +405,18 @@ impl Session {
}
Ok(())
}

async fn reset_telemetry_stream(&mut self) -> Result<(), ClientError> {
if let Some((settings, tx)) = self
.settings
.as_ref()
.zip(self.telemetry_command_tx.as_ref())
{
self.telemetry_stream_epoch.fetch_add(1, Ordering::Relaxed);
self.start(settings.clone(), tx.clone()).await?;
}
Ok(())
}
}

#[async_trait]
Expand All @@ -396,15 +442,34 @@ impl RPCClient for Session {
request: HeartbeatRequest,
) -> Result<HeartbeatResponse, ClientError> {
let request = self.sign(request);
let response = self.stub.heartbeat(request).await.map_err(|e| {
ClientError::new(
ErrorKind::ClientInternal,
"send rpc heartbeat failed",
OPERATION_HEARTBEAT,
)
.set_source(e)
})?;
Ok(response.into_inner())
let heartbeat_future = self.stub.heartbeat(request);
let future = tokio::time::timeout(self.option.timeout, heartbeat_future);
match future.await {
Ok(res) => {
let response = res.map_err(|e| {
ClientError::new(
ErrorKind::ClientInternal,
"send rpc heartbeat failed",
OPERATION_HEARTBEAT,
)
.set_source(e)
})?;
Ok(response.into_inner())
}
Err(elapsed) => {
warn!(
self.logger,
"Heartbeat RPC timed out, reset telemetry bidirectional stream"
);
self.reset_telemetry_stream().await?;
Err(ClientError::new(
ErrorKind::RpcTimeout,
"Heartbeat RPC timed out",
OPERATION_HEARTBEAT,
)
.set_source(elapsed))
}
}
}

async fn send_message(
Expand Down
Loading