Send to Event Hub hangs if idle for 30+ minute #200

Open
ciaran-conditionalai opened this issue Apr 9, 2023 · 18 comments

@ciaran-conditionalai

Hi,

I've been running some tests based on the Event Hubs simple sender example. After a simulated idle period of 30+ minutes (a 20-minute idle worked fine), the second send call in the test pasted below hangs. Immediately before that call, both the connection and the session report that they are not closed or ended. Ideally, the send should return an error instead of hanging. Details follow.

Dependencies

[dependencies]
fe2o3-amqp = { version = "0.8.20", features = ["rustls"] }
tokio = {version = "1.27.0", features = ["net", "rt", "rt-multi-thread", "macros"] }

Test to Reproduce

use std::env;

use dotenv::dotenv;
use fe2o3_amqp::{
    connection::ConnectionHandle,
    sasl_profile::SaslProfile,
    session::SessionHandle,
    types::{
        messaging::{Message, Properties},
        primitives::Binary,
    },
    Connection, Sender, Session,
};
use tokio::time::{sleep, Duration};

// https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-troubleshoot
// cargo test test_send_after_idle_for_30_mins -- --nocapture
#[tokio::test]
pub async fn test_send_after_idle_for_30_mins() {
    println!("Start");

    dotenv().ok();

    let hostname = env::var("HOST_NAME").unwrap();
    let sa_key_name = env::var("SHARED_ACCESS_KEY_NAME").unwrap();
    let sa_key_value = env::var("SHARED_ACCESS_KEY_VALUE").unwrap();
    let event_hub_name = env::var("EVENT_HUB_NAME").unwrap();

    let url = format!("amqps://{}", hostname);
    let mut connection = Connection::builder()
        .container_id("rust-connection-1")
        .alt_tls_establishment(true) // EventHubs uses alternative TLS establishment
        .sasl_profile(SaslProfile::Plain {
            username: sa_key_name,
            password: sa_key_value,
        })
        .open(&url[..])
        .await
        .unwrap();
    let mut session = Session::begin(&mut connection).await.unwrap();
    let mut sender = Sender::attach(&mut session, "rust-simple-sender", event_hub_name)
        .await
        .unwrap();

    println!("Before first send message");
    send(&connection, &session, &mut sender, "test message 1").await;

    println!("Before simulate idling for 30mins");
    sleep(Duration::from_secs(60 * 30)).await;

    println!("Before second send message");
    send(&connection, &session, &mut sender, "test message 2").await;

    println!("Before close, end, close");
    sender.close().await.unwrap();
    session.end().await.unwrap();
    connection.close().await.unwrap();

    println!("Finished");
}

async fn send(
    connection: &ConnectionHandle<()>,
    session: &SessionHandle<()>,
    sender: &mut Sender,
    data: &str,
) {
    println!(
        "connection.closed={}, session.ended={}",
        connection.is_closed(),
        session.is_ended()
    );

    let message = Message::builder()
        .properties(
            Properties::builder()
                .group_id(String::from("send_to_event_hub"))
                .build(),
        )
        .data(Binary::from(data))
        .build();
    let outcome = sender.send(message).await.unwrap();
    outcome.accepted_or_else(|outcome| outcome).unwrap();
}

@minghuaw
Owner

I will look into it

@minghuaw
Owner

My initial test actually ended up giving me an IllegalState error after sleeping for 40 minutes. I will investigate further.

@ciaran-conditionalai
Author

Sorry, hit the wrong button. I am running the test again with trace output enabled to see if I can get any additional information for you. One thing I have noticed is that just before the idle timeout ends, 4 requests are successfully processed by the event hub. What they are I don't know yet.

@minghuaw
Owner

@ciaran-conditionalai I found the problem. Event Hubs force-detaches a link that has been idle for 30 minutes. Below is the Detach frame received from Event Hubs:

frame=Detach { handle: Handle(0), closed: true, error: Some(Error { condition: LinkError(DetachForced), description: Some("The link 'G29:140866372:rust-simple-sender' is force detached. Code: ServerError. Details: AmqpEventHubPublisher.IdleTimerExpired: Idle timeout: 00:30:00. TrackingId:af1ffc0b0000a4ba0064c6a364339590_G29_B53, SystemTracker:fe2o3-amqp-event-hubs-example:eventhub:test-example~10922, Timestamp:2023-04-10T05:30:24"), info: None }) }
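The description in that frame carries the limit as `Idle timeout: 00:30:00`, which matches the `sleep(Duration::from_secs(60 * 30))` in the reproduction. As a minimal sketch (std only, no fe2o3-amqp types), the value can be pulled out of the description text and compared with a planned idle period. The helper names `parse_hms` and `idle_timeout_from_description` are hypothetical, and the description format is only what this one frame shows, so treat the parsing as an assumption rather than a stable contract.

```rust
use std::time::Duration;

/// Parses an "hh:mm:ss" string into a Duration.
/// Returns None for malformed input or out-of-range minutes/seconds.
fn parse_hms(text: &str) -> Option<Duration> {
    let mut parts = text.trim().split(':');
    let hours: u64 = parts.next()?.parse().ok()?;
    let minutes: u64 = parts.next()?.parse().ok()?;
    let seconds: u64 = parts.next()?.parse().ok()?;
    if parts.next().is_some() || minutes >= 60 || seconds >= 60 {
        return None;
    }
    Some(Duration::from_secs(hours * 3600 + minutes * 60 + seconds))
}

/// Extracts the value that follows "Idle timeout: " in a Detach description.
/// Assumption: the value ends at the next '.', as in the frame quoted above.
fn idle_timeout_from_description(description: &str) -> Option<Duration> {
    const MARKER: &str = "Idle timeout: ";
    let start = description.find(MARKER)? + MARKER.len();
    let rest = &description[start..];
    let end = rest.find('.').unwrap_or(rest.len());
    parse_hms(&rest[..end])
}
```

Comparing the parsed limit with the idle period a workload expects gives a quick check on whether a keep-alive send or a recovery path is needed at all.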

@minghuaw
Owner

I guess one thing you can do is detach and then re-attach the link if there is a LinkStateError. This method may be helpful: https://docs.rs/fe2o3-amqp/0.8.20/fe2o3_amqp/link/sender/struct.Sender.html#method.detach_then_resume_on_session

@ciaran-conditionalai
Author

Interesting: with trace logging enabled, the test does produce the expected error for this scenario and panics:

thread 'test_send_after_idle_for_30_mins' panicked at 'called `Result::unwrap()` on an `Err` value: LinkStateError(RemoteClosedWithError(Error { condition: LinkError(DetachForced), description: Some("Idle link tracker, link rust-simple-sender has been idle for 1800000ms TrackingId:ccae0df3-9e98-4969-8cec-ef6c8c5707fa_G14, SystemTracker:coreilly-dev-eventhub-ns:EventHub:dev_coreilly, Timestamp:2023-04-10T05:30:07"), info: None }))'

However, I do still see the test hang when no logging output is being generated. So adding recovery code around the LinkStateError looks like the right thing to do, but it likely won't get called because of the hang.
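One caller-side way to make sure recovery code is reached even when a call never returns is to put a deadline around it. The sketch below shows the idea with the standard library only: the operation runs on a worker thread and the caller waits with `recv_timeout`. The names `run_with_deadline` and `GuardError` are hypothetical. In the async test above, the usual tool would be `tokio::time::timeout` wrapped around the send future (which needs tokio's `time` feature); this thread-based version is a stand-in so the example has no dependencies.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq, Eq)]
enum GuardError {
    /// The operation did not finish before the deadline.
    TimedOut,
    /// The worker ended without producing a value (for example, it panicked).
    WorkerGone,
}

/// Runs `operation` on a worker thread and waits at most `deadline` for it.
/// Note: on timeout the worker thread is left running; it is not cancelled.
fn run_with_deadline<T, F>(deadline: Duration, operation: F) -> Result<T, GuardError>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The receiver may already be gone if the caller timed out.
        let _ = tx.send(operation());
    });
    match rx.recv_timeout(deadline) {
        Ok(value) => Ok(value),
        Err(mpsc::RecvTimeoutError::Timeout) => Err(GuardError::TimedOut),
        Err(mpsc::RecvTimeoutError::Disconnected) => Err(GuardError::WorkerGone),
    }
}
```

A `TimedOut` result can then be treated like a link error and routed into whatever recovery path the application uses.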

@minghuaw
Owner

That is interesting. In my test cases the link never hung, whether or not logging (with either tracing or log) was enabled.

Another thing I found in the log is that the session will be forced to close after the link is forced to close. And then the AMQP connection will be considered inactive after all its sessions/links are closed, and the connection will be closed after the connection is inactive for 300000 milliseconds.

So you may need to recover all the way from the connection if there is only one link on that connection.
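The layering described above (link, then session, then connection) can be sketched as a small decision function that picks the outermost layer that needs rebuilding. This is std-only illustration code: `RecoveryScope`, `LocalState`, and both functions are hypothetical names, not fe2o3-amqp APIs. The two constants come from the logs quoted in this thread. Adding them together assumes the connection's inactivity timer starts at about the moment the link is force-detached, which the thread does not state precisely.

```rust
use std::time::Duration;

/// Link idle limit reported in the Detach frame (00:30:00).
const LINK_IDLE_LIMIT: Duration = Duration::from_secs(30 * 60);
/// Connection inactivity limit reported in the log (300000 ms).
const CONNECTION_IDLE_LIMIT: Duration = Duration::from_millis(300_000);

/// The outermost layer that must be rebuilt before sending can resume.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum RecoveryScope {
    NotNeeded,
    Link,
    Session,
    Connection,
}

/// Hypothetical snapshot of what the application knows about each layer.
struct LocalState {
    connection_closed: bool,
    session_ended: bool,
    link_detached: bool,
}

/// A closed outer layer implies that everything inside it must be rebuilt too,
/// so the checks run from the outside in.
fn recovery_scope(state: &LocalState) -> RecoveryScope {
    if state.connection_closed {
        RecoveryScope::Connection
    } else if state.session_ended {
        RecoveryScope::Session
    } else if state.link_detached {
        RecoveryScope::Link
    } else {
        RecoveryScope::NotNeeded
    }
}

/// Rough lower bound on idle time before the connection itself is closed,
/// under the assumption stated in the lead-in.
fn earliest_connection_close() -> Duration {
    LINK_IDLE_LIMIT + CONNECTION_IDLE_LIMIT
}
```

Under that assumption the connection would go away after roughly 35 minutes of idling. Note that the original report shows `is_closed()` and `is_ended()` returning false just before the hanging send, so the flags in `LocalState` would have to be driven by errors actually observed, not only by those getters.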

@minghuaw
Owner

@ciaran-conditionalai FYI, I am currently working on an AMQP 1.0 based Event Hubs SDK for Rust (Azure/azure-sdk-for-rust#1260). Though the producer client API already works, I haven't implemented auto-recovery for the producer client yet, unfortunately. I was planning to work on recovery after implementing the consumer client, but now I may prioritize auto-recovery.

@ciaran-conditionalai
Author

@minghuaw thanks for the link, I'd just started on trying to write similar event hub producer/consumer clients myself based on the Java SDK, which I've used previously.

@minghuaw
Owner

@ciaran-conditionalai I was wondering if you have any suggestions for issue #40? The sender is kind of lazy: when it is forced to close after inactivity, it won't automatically reply to the remote Detach unless the sender tries to send something or detaches/closes itself.

@ciaran-conditionalai
Author

I am not overly familiar with the underlying protocol, but my thought would be that the sender should be actively listening (blocking on a separate thread) for protocol control signals sent from the remote. It might be worth digging into the Java AMQP library, as it most likely addresses this (I have used it in production and it handles these idle scenarios with little issue, i.e. the setup is bursts of streamed data).
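The "listen on a separate thread" idea can be sketched with the standard library alone. A background thread blocks on a channel of control signals and records a remote Detach, so that the next `send` fails fast instead of waiting on a dead link. Everything here (`ControlSignal`, `WatchedLink`, `SendError`) is a hypothetical model for illustration and not how fe2o3-amqp is structured internally. In an async library the same role would more likely be played by a spawned task than an OS thread.

```rust
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical control signal from the remote peer.
#[derive(Debug)]
enum ControlSignal {
    Detach { reason: String },
}

#[derive(Debug, PartialEq, Eq)]
enum SendError {
    LinkDetached(String),
}

/// A toy link whose detach state is maintained by a background listener.
struct WatchedLink {
    detach_reason: Arc<Mutex<Option<String>>>,
    listener: Option<JoinHandle<()>>,
    sent: Vec<String>,
}

impl WatchedLink {
    fn new(control: Receiver<ControlSignal>) -> Self {
        let detach_reason = Arc::new(Mutex::new(None));
        let shared = Arc::clone(&detach_reason);
        let listener = thread::spawn(move || {
            // Blocks until a signal arrives or every sender is dropped.
            for signal in control {
                match signal {
                    ControlSignal::Detach { reason } => {
                        *shared.lock().unwrap() = Some(reason);
                        break;
                    }
                }
            }
        });
        WatchedLink { detach_reason, listener: Some(listener), sent: Vec::new() }
    }

    /// Fails immediately if the listener has already seen a Detach.
    fn send(&mut self, payload: &str) -> Result<(), SendError> {
        if let Some(reason) = self.detach_reason.lock().unwrap().clone() {
            return Err(SendError::LinkDetached(reason));
        }
        self.sent.push(payload.to_string());
        Ok(())
    }

    /// Waits for the listener thread to finish (used to make tests deterministic).
    fn wait_for_listener(&mut self) {
        if let Some(handle) = self.listener.take() {
            let _ = handle.join();
        }
    }
}
```

The design choice is that the remote's Detach is observed when it arrives rather than when the application next touches the link, which is the behavior the "lazy sender" comment above says is missing.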

@minghuaw
Owner

Thanks for your feedback. I have discovered another behavior of Event Hubs. It doesn't allow detaching then re-attaching the same link. So when a link/session/connection is closed due to inactivity, you would actually need to create entirely new links.

@minghuaw
Owner

> It doesn't allow detaching then re-attaching the same link. So upon closed link/session/connection due to inactivity, you would actually need to create entirely new links.

This is probably my fault. I didn't do CBS auth before re-attaching.
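Whichever of the two readings above turns out to be right, a recovery path that authorizes first and then attaches a fresh link covers both. The following is a minimal, std-only sketch of that "authorize, attach new, retry once" flow. The traits `MessageLink` and `LinkFactory`, the `LinkError` enum, and the in-memory `FakeLink`/`FakeFactory` are all hypothetical stand-ins. In real code `authorize` would correspond to the CBS step and `attach_new` to attaching a new sender, neither of which is shown here.

```rust
#[derive(Debug, PartialEq, Eq)]
enum LinkError {
    DetachForced,
    AuthFailed,
}

/// Hypothetical abstraction over a sending link.
trait MessageLink {
    fn send(&mut self, payload: &str) -> Result<(), LinkError>;
}

/// Hypothetical abstraction over "authorize, then create a new link".
trait LinkFactory {
    type Link: MessageLink;
    fn authorize(&mut self) -> Result<(), LinkError>;
    fn attach_new(&mut self) -> Result<Self::Link, LinkError>;
}

/// Sends once; on a forced detach, authorizes, swaps in a new link, retries once.
fn send_with_recovery<F: LinkFactory>(
    factory: &mut F,
    link: &mut F::Link,
    payload: &str,
) -> Result<(), LinkError> {
    match link.send(payload) {
        Err(LinkError::DetachForced) => {
            factory.authorize()?;
            *link = factory.attach_new()?;
            link.send(payload)
        }
        other => other,
    }
}

/// In-memory stand-in for a link, used only to exercise the flow.
#[derive(Default)]
struct FakeLink {
    detached: bool,
    delivered: Vec<String>,
}

impl MessageLink for FakeLink {
    fn send(&mut self, payload: &str) -> Result<(), LinkError> {
        if self.detached {
            return Err(LinkError::DetachForced);
        }
        self.delivered.push(payload.to_string());
        Ok(())
    }
}

/// In-memory stand-in for the authorize/attach steps.
#[derive(Default)]
struct FakeFactory {
    fail_auth: bool,
    authorized: u32,
    attached: u32,
}

impl LinkFactory for FakeFactory {
    type Link = FakeLink;

    fn authorize(&mut self) -> Result<(), LinkError> {
        if self.fail_auth {
            return Err(LinkError::AuthFailed);
        }
        self.authorized += 1;
        Ok(())
    }

    fn attach_new(&mut self) -> Result<FakeLink, LinkError> {
        self.attached += 1;
        Ok(FakeLink::default())
    }
}
```

The retry is limited to one attempt on purpose: if the fresh link also fails, the error is returned so the caller can escalate to session or connection recovery.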

@ciaran-conditionalai
Author

ciaran-conditionalai commented Apr 11, 2023 via email

@minghuaw
Owner

minghuaw commented Apr 11, 2023

@ciaran-conditionalai I have briefly tested auto-recovery on the recent commit Azure/azure-sdk-for-rust@7e881fa (in this branch: https://github.com/minghuaw/azure-sdk-for-rust/tree/eventhubs_over_amqp), which you could give a try (I haven't added any documentation yet). I have tested both inactivity and manually turning off my router. It seems to work fine so far.

The receiver client has not been implemented yet. However, it doesn't seem like Event Hubs enforces the same inactivity rule for receivers anyway.

@ciaran-conditionalai
Author

ciaran-conditionalai commented Apr 14, 2023 via email

@minghuaw
Owner

@ciaran-conditionalai I have published the initial release of the Event Hubs SDK on crates.io (https://crates.io/crates/azeventhubs). Both EventHubProducerClient and EventHubConsumerClient are implemented. The processor APIs have not been implemented yet.

@ciaran-conditionalai
Author

ciaran-conditionalai commented Apr 25, 2023 via email
