Skip to content

Commit

Permalink
make worker message send non-blocking + use tokio spawn instead of sy…
Browse files Browse the repository at this point in the history
…stem thread
  • Loading branch information
kwannoel committed Nov 1, 2024
1 parent 0960678 commit 5869213
Showing 1 changed file with 18 additions and 22 deletions.
40 changes: 18 additions & 22 deletions sqlx-sqlite/src/connection/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::borrow::Cow;
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

use futures_channel::oneshot;
use futures_intrusive::sync::{Mutex, MutexGuard};
Expand Down Expand Up @@ -79,9 +78,7 @@ impl ConnectionWorker {
pub(crate) async fn establish(params: EstablishParams) -> Result<Self, Error> {
let (establish_tx, establish_rx) = oneshot::channel();

thread::Builder::new()
.name(params.thread_name.clone())
.spawn(move || {
tokio::spawn(async move {
let (command_tx, command_rx) = flume::bounded(params.command_channel_size);

let conn = match params.establish() {
Expand Down Expand Up @@ -116,10 +113,11 @@ impl ConnectionWorker {
// would rollback an already completed transaction.
let mut ignore_next_start_rollback = false;

for (cmd, span) in command_rx {
while let Ok((cmd, span)) = command_rx.recv_async().await {
let _guard = span.enter();
match cmd {
Command::Prepare { query, tx } => {
// TODO(kwannoel): Make this async?
tx.send(prepare(&mut conn, &query).map(|prepared| {
update_cached_statements_size(
&conn,
Expand All @@ -130,6 +128,7 @@ impl ConnectionWorker {
.ok();
}
Command::Describe { query, tx } => {
// TODO(kwannoel): Make this async?
tx.send(describe(&mut conn, &query)).ok();
}
Command::Execute {
Expand All @@ -143,15 +142,15 @@ impl ConnectionWorker {
{
Ok(iter) => iter,
Err(e) => {
tx.send(Err(e)).ok();
tx.send_async(Err(e)).await.ok();
continue;
}
};

match limit {
None => {
for res in iter {
if tx.send(res).is_err() {
if tx.send_async(res).is_err() {
break;
}
}
Expand All @@ -166,12 +165,12 @@ impl ConnectionWorker {
rows_returned += 1;
if rows_returned >= limit {
drop(iter);
let _ = tx.send(res);
let _ = tx.send_async(res);
break;
}
}
}
if tx.send(res).is_err() {
if tx.send_async(res).is_err() {
break;
}
}
Expand All @@ -190,7 +189,7 @@ impl ConnectionWorker {
});
let res_ok = res.is_ok();

if tx.blocking_send(res).is_err() && res_ok {
if tx.send(res).await.is_err() && res_ok {
// The BEGIN was processed but not acknowledged. This means no
// `Transaction` was created and so there is no way to commit /
// rollback this transaction. We need to roll it back
Expand Down Expand Up @@ -224,7 +223,7 @@ impl ConnectionWorker {
};
let res_ok = res.is_ok();

if tx.blocking_send(res).is_err() && res_ok {
if tx.send(res).await.is_err() && res_ok {
// The COMMIT was processed but not acknowledged. This means that
// the `Transaction` doesn't know it was committed and will try to
// rollback on drop. We need to ignore that rollback.
Expand Down Expand Up @@ -252,7 +251,7 @@ impl ConnectionWorker {
let res_ok = res.is_ok();

if let Some(tx) = tx {
if tx.blocking_send(res).is_err() && res_ok {
if tx.send(res).await.is_err() && res_ok {
// The ROLLBACK was processed but not acknowledged. This means
// that the `Transaction` doesn't know it was rolled back and
// will try to rollback again on drop. We need to ignore that
Expand All @@ -268,7 +267,7 @@ impl ConnectionWorker {
}
Command::UnlockDb => {
drop(conn);
conn = futures_executor::block_on(shared.conn.lock());
conn = shared.conn.lock().await;
}
Command::Ping { tx } => {
tx.send(()).ok();
Expand All @@ -283,7 +282,7 @@ impl ConnectionWorker {
}
}
}
})?;
});

establish_rx.await.map_err(|_| Error::WorkerCrashed)?
}
Expand Down Expand Up @@ -408,12 +407,13 @@ impl ConnectionWorker {
pub(crate) fn shutdown(&mut self) -> impl Future<Output = Result<(), Error>> {
let (tx, rx) = oneshot::channel();

let send_res = self
.command_tx
.send((Command::Shutdown { tx }, Span::current()))
.map_err(|_| Error::WorkerCrashed);
let command_tx = self.command_tx.clone();

async move {
let send_res = command_tx
.send_async((Command::Shutdown { tx }, Span::current()))
.await
.map_err(|_| Error::WorkerCrashed);
send_res?;

// wait for the response
Expand Down Expand Up @@ -471,10 +471,6 @@ mod rendezvous_oneshot {
self.inner.send((value, ack_tx)).map_err(|_| Canceled)?;
ack_rx.await
}

pub fn blocking_send(self, value: T) -> Result<(), Canceled> {
futures_executor::block_on(self.send(value))
}
}

pub struct Receiver<T> {
Expand Down

0 comments on commit 5869213

Please sign in to comment.