Place mutex on Connection #48

Open
cts660 opened this issue Sep 9, 2023 · 5 comments

Comments

@cts660

cts660 commented Sep 9, 2023

I would like to use a connection to Memgraph in Rust as shared state across different functions. However, when I tried to place a mutex on the connection to share it safely, the compiler indicated that `Sync` is not implemented for `*mut rsmgclient::bindings::mg_session`. Is there any other way to safely share a Memgraph connection across multiple threads? Thank you!

@kkuriboh

You can create a wrapper and force it to be `Send` and `Sync`. It's just dangerous if the library is not properly handling the raw pointer inside.

struct SendSync<T>(T);

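// SAFETY: the compiler does not check these impls. They are only sound if the
// wrapped value is never actually accessed from two threads at the same time.
unsafe impl<T> Send for SendSync<T> {}
unsafe impl<T> Sync for SendSync<T> {}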

impl<T> SendSync<T> {
    fn new(value: T) -> Self {
        Self(value)
    }
}

impl<T> core::ops::Deref for SendSync<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

let connection = SendSync::new(connection);
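
A narrower variant of the wrapper above, as a rough sketch: since the original goal was to put the connection behind a mutex, it is enough to assert `Send` alone, because `Mutex<T>` is already `Sync` whenever `T: Send`. `RawHandle` and `SendOnly` are hypothetical stand-ins invented for this example, not rsmgclient types. Whether the underlying client tolerates being used from different threads, even one at a time, is an assumption that this thread does not confirm.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a handle that wraps a raw pointer and is therefore
/// neither `Send` nor `Sync` by default.
struct RawHandle {
    ptr: *mut u64,
}

impl RawHandle {
    fn new() -> Self {
        RawHandle { ptr: Box::into_raw(Box::new(0)) }
    }

    fn increment(&mut self) -> u64 {
        // SAFETY: `ptr` came from `Box::into_raw` and is freed only in `drop`.
        unsafe {
            *self.ptr += 1;
            *self.ptr
        }
    }
}

impl Drop for RawHandle {
    fn drop(&mut self) {
        // SAFETY: `ptr` was created by `Box::into_raw` and is freed exactly once.
        unsafe { drop(Box::from_raw(self.ptr)) }
    }
}

/// Wrapper that asserts only `Send`, not `Sync`.
struct SendOnly<T>(T);

// SAFETY (assumption): sound only if the wrapped value has no thread affinity,
// i.e. any thread may use it as long as only one thread uses it at a time.
unsafe impl<T> Send for SendOnly<T> {}

/// Increments the shared handle from several threads, one at a time.
fn shared_increments(threads: usize) -> u64 {
    let shared = Arc::new(Mutex::new(SendOnly(RawHandle::new())));
    let workers: Vec<_> = (0..threads)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                // The lock gives exclusive access while the guard is alive.
                let mut guard = shared.lock().unwrap();
                guard.0.increment();
            })
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }
    // One more increment on the calling thread to read the final count.
    let mut guard = shared.lock().unwrap();
    guard.0.increment()
}
```

Dropping the `Sync` impl means the compiler still rejects any attempt to share a bare `&SendOnly<T>` between threads, so every access has to go through the lock.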

@katarinasupe

Hi @cts660, did the above tip help you? Did you manage to safely share a MemGraph connection across multiple threads?

@magnusart

magnusart commented Oct 10, 2024

I have the same issue when trying to use Memgraph in a database connection pool. Is sharing a connection not a recommended usage?

@gitbuda
Member

gitbuda commented Oct 11, 2024

Hi @magnusart! Yes, in the current implementation, a single connection only works in a single-threaded context (that is all we have focused on so far). There should be support for multi-threaded and/or async execution, but that is yet to be implemented in rsmgclient. There is a plan to get there, but it's not an immediate priority. I guess there are a few options for you:

  • it should be possible to implement a wrapper on your app side to make everything thread-safe; running a single connection under a thread (each connection has a certain state and that has to be managed)
  • feel free to contribute to the rsmgclient because that's really where the functionality should be; I'll try to help on that side as much as possible (review, merge, maybe contribute something as well) + Memgraph has some awesome swag to give away (to start with 😃)
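
The first option (one connection owned by one thread) could look roughly like the sketch below. It is only an illustration: `FakeConnection`, `Request`, and `ConnectionHandle` are hypothetical names, and `FakeConnection` stands in for the real connection type so the example compiles on its own. The connection is created inside the worker thread and never leaves it, so nothing has to be marked `Send` or `Sync` by hand. Other threads talk to it through channels.

```rust
use std::marker::PhantomData;
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a connection type that is neither `Send` nor `Sync`
/// (the raw-pointer marker makes the compiler treat it that way).
struct FakeConnection {
    executed: usize,
    _not_send: PhantomData<*mut ()>,
}

impl FakeConnection {
    fn connect() -> Self {
        FakeConnection { executed: 0, _not_send: PhantomData }
    }

    /// Pretends to run a query and returns a description of what it did.
    fn execute(&mut self, query: &str) -> String {
        self.executed += 1;
        format!("#{} {}", self.executed, query)
    }
}

/// A request carries the query text plus a channel for sending the reply back.
struct Request {
    query: String,
    reply: mpsc::Sender<String>,
}

/// Cloneable handle that any thread can use; it only holds a channel sender.
#[derive(Clone)]
struct ConnectionHandle {
    requests: mpsc::Sender<Request>,
}

impl ConnectionHandle {
    /// Spawns the worker thread. The connection is created inside that thread
    /// and stays there for its whole lifetime.
    fn spawn() -> Self {
        let (requests, inbox) = mpsc::channel::<Request>();
        thread::spawn(move || {
            let mut conn = FakeConnection::connect();
            // The loop ends once every handle (sender) has been dropped.
            for req in inbox {
                let result = conn.execute(&req.query);
                // Ignore the error if the caller stopped waiting.
                let _ = req.reply.send(result);
            }
        });
        ConnectionHandle { requests }
    }

    /// Sends a query to the worker and blocks until the reply arrives.
    /// Returns `None` if the worker thread is gone.
    fn execute(&self, query: &str) -> Option<String> {
        let (reply, response) = mpsc::channel();
        let request = Request { query: query.to_string(), reply };
        self.requests.send(request).ok()?;
        response.recv().ok()
    }
}
```

Calling `ConnectionHandle::spawn()` once and cloning the handle into each thread serializes all queries through the single worker. A real version would also need to send back typed rows and errors instead of a `String`, and to handle transaction state across requests.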

@magnusart

I tried to create a quick and dirty connection pool using bb8 and the code from @kkuriboh. It's not clear to me whether this is safe, given the caveat @kkuriboh raised.

My Rust-fu does not go deep enough to understand what kind of risks I am taking here. Is checking the statuses in `is_valid` and `has_broken` enough?

But it seems to work when naively trying it out.

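// `ServerAddress` and `AppError` are types from my own application.
use async_trait::async_trait;
use rsmgclient::{ConnectParams, Connection};

struct SendSync<T>(T);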

unsafe impl<T> Send for SendSync<T> {}
unsafe impl<T> Sync for SendSync<T> {}

impl<T> SendSync<T> {
    fn new(value: T) -> Self {
        Self(value)
    }
}

impl<T> core::ops::Deref for SendSync<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T> core::ops::DerefMut for SendSync<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

#[async_trait]
impl bb8::ManageConnection for ServerAddress {
    type Connection = SendSync<Connection>;
    type Error = AppError;

    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
        let conn_params = match self.port {
            Some(port) => ConnectParams {
                host: Some(self.domain.to_string()),
                port,
                ..ConnectParams::default()
            },
            None => ConnectParams {
                host: Some(self.domain.to_string()),
                ..ConnectParams::default()
            },
        };

        let connection =
            Connection::connect(&conn_params).map_err(AppError::CreateSystemMgError)?;

        let connection = SendSync::new(connection);

        Ok(connection)
    }

    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
        match conn.status() {
            rsmgclient::ConnectionStatus::Ready => Ok(()),
            _ => Err(AppError::MgConnectionNotReady),
        }
    }

    fn has_broken(&self, conn: &mut Self::Connection) -> bool {
        matches!(conn.status(), rsmgclient::ConnectionStatus::Closed)
    }
}

5 participants