Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

After fanatids review #356

Merged
merged 2 commits into from
Mar 12, 2024
Merged

Conversation

godmodegalactus
Copy link
Collaborator

No description provided.

static ref TX_TIMED_OUT: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_tx_timeout", "Number of transactions that timeout")).unwrap();
pub static ref TXS_IN_CHANNEL: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_txs_in_channel", "Transactions in channel")).unwrap();

}

// making 250 as sleep time will effectively make lite rpc send
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is no longer needed with new tpu client.

}
let mut instant = Instant::now();
let mut notifications = vec![];
while let Some(transaction_info) = recv.recv().await {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can lock the process for an unknown time, better apply timeout + try_recv
something like this

loop {
    let deadline = Instant::now() + INTERVAL_PER_BATCH_IN_MS;
    let mut notifications = vec![];
    loop {
        let maybe_transaction_info = if deadline.elapsed().is_zer() {
            match tokio::time::timeout_at(deadline, recv.recv()).await {
                Ok(Some(tx_info)) => Some(tx_info),
                Ok(None) => bail!("Tx sender service stopped"),
                Err(_error) => None,
            }
        } else {
            match recv.try_recv() {
                tokio::sync::mpsc::error::TryRecvError::Empty => None,
                tokio::sync::mpsc::error::TryRecvError::Disconnected => bail!("Tx sender service stopped")
            }
        };

        if let Some(transaction_info) = maybe_transaction_info {
            self.forward_txs(&transaction_info).await;

            // send transaction notifications in batches
            if let Some(notifier) = &notifier {
                let forwarded_slot = self.data_cache.slot_cache.get_current_slot();
                let forwarded_local_time = Utc::now();
                let tx_notification = TransactionNotification {
                    signature: transaction_info.signature,
                    recent_slot: transaction_info.slot,
                    forwarded_slot,
                    forwarded_local_time,
                    processed_slot: None,
                    cu_consumed: None,
                    cu_requested: None,
                    quic_response: 0,
                };
                notifications.push(tx_notification);
            }
        } else {
            // send transaction notifications in batches
            if let Some(notifier) = &notifier {
                // send notification for sent transactions
                let _ = notifier.send(NotificationMsg::TxNotificationMsg(notifications));
            }
            break;
        }
    }
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have addressed your concerns using tokio select, it seems more readable for me.

core/src/structures/rotating_queue.rs Outdated Show resolved Hide resolved
@godmodegalactus godmodegalactus requested a review from fanatid March 11, 2024 18:21
Copy link

@fanatid fanatid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

instant = Instant::now();
// send notification for sent transactions
let _ = notifier.send(NotificationMsg::TxNotificationMsg(
notifications.drain(..).collect_vec(),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::mem::take does not need iterator, but maybe resulted generated code would be same

@godmodegalactus godmodegalactus merged commit ce215e0 into main Mar 12, 2024
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants