How should client connect to a channel. #4

Open
mrkraimer opened this issue Mar 30, 2016 · 0 comments

channelConnect

This issue discusses how pvaClient (both C++ and Java) should connect to a channel and,
in particular, how to manage locking.

Connection involves the following methods.

  • channelCreated - pvAccess callback
  • channelStateChange - pvAccess callback
  • issueConnect - pvaClient method
  • waitConnect - pvaClient method

issueConnect, via a channelProvider, makes a call to createChannel.
As a result of this call channelCreated is always called and channelStateChange may be called.
The channel may or may not already be connected when channelCreated is called.

Sometime after issueConnect returns, waitConnect is called.
The connection may already be complete; if not, pvaClient must wait until channelCreated or channelStateChange
is called and reports either success or failure.

Both pvAccess and pvaClient have internal variables that must be protected by a locking mechanism.

I think the logic is similar for both the Java and C++ implementations of pvaClient, except for waitConnect.

The Java implementation is:

public Status waitConnect(double timeout)
{
    if(isDestroyed) throw new RuntimeException("pvaClientChannel was destroyed");
    if(channel.isConnected()) return channelConnectStatus;
    lock.lock();
    try {
        long nano = (long)(timeout*1e9);
        waitForConnect.awaitNanos(nano);
    } catch(InterruptedException e) {
        Status status = statusCreate.createStatus(StatusType.ERROR,e.getMessage(), e.fillInStackTrace());
        return status;
    } finally {
        lock.unlock();
    }
    return channelConnectStatus;
}

NOTES

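  • channel.isConnected() is called without the lock held. If it were called with the lock held, a deadly embrace (deadlock) could result.
  • channel.isConnected() is a synchronized method in pvAccessJava.
  • It looks like an unnecessary wait can happen if the connection callback occurs between the isConnected() check and lock.lock(): the signal is sent before waitConnect starts waiting, so the wait can run until the timeout.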
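One possible way to close the window described in the last note, offered as a minimal sketch rather than as the pvaClient implementation: wait on a flag that pvaClient itself owns and that is only read and written under pvaClient's own lock, instead of calling channel.isConnected(). The class ConnectGate, the flag connected, and the method onConnected are hypothetical names introduced for illustration; in the real code the flag would correspond to connectState and the callback to channelCreated or channelStateChange.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the connect bookkeeping in PvaClientChannel. */
final class ConnectGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition waitForConnect = lock.newCondition();
    // Written only by the callback, read only by waitConnect, always under the lock.
    private boolean connected = false;

    /** Stands in for the connection callback (assumed to run on another thread). */
    void onConnected() {
        lock.lock();
        try {
            connected = true;
            waitForConnect.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Returns true if the flag was set before the timeout expired. */
    boolean waitConnect(double timeoutSeconds) {
        long remaining = (long) (timeoutSeconds * 1e9);
        lock.lock();
        try {
            // The flag is checked under the same lock the callback uses, so a
            // callback that ran before this point is seen here and is not missed.
            while (!connected) {
                if (remaining <= 0) return false;
                // awaitNanos releases the lock while waiting and returns the time left.
                remaining = waitForConnect.awaitNanos(remaining);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ConnectGate gate = new ConnectGate();
        gate.onConnected();                        // callback arrives before the wait
        System.out.println(gate.waitConnect(1.0)); // prints "true" without waiting
    }
}
```

Because the sketch never calls into pvAccess while holding the lock, it sidesteps the deadly embrace from the first note. The loop also tolerates spurious wakeups, which a single awaitNanos call does not.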

The C++ implementation is:

Status PvaClientChannel::waitConnect(double timeout)
{
    {
        Lock xx(mutex);
        if(isDestroyed) throw std::runtime_error("pvaClientChannel was destroyed");
        if(channel->isConnected()) return Status::Ok;
    }
    waitForConnect.wait(timeout);
    return channelConnectStatus;
}

NOTES:

  • channel->isConnected() is called with the lock held.
  • waitForConnect.wait(timeout) is called with no lock held.
  • It looks like an unnecessary wait can happen if the connection callback occurs between the mutex being unlocked and the call to waitForConnect.wait.

NOTE: locking in Java is NOT like locking in C++.
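One difference that may matter here, sketched under assumptions: a Java Condition does not remember a signal sent while no thread is waiting, whereas a latch-style primitive does. Whether the C++ waitForConnect object behaves like a latch is an assumption and is not confirmed in this issue. The class LatchDemo and the method signalThenWait are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration; not part of pvaClient. */
final class LatchDemo {
    /** Signals first, then waits; returns whether the wait saw the signal. */
    static boolean signalThenWait(long timeoutMillis) {
        CountDownLatch connectedLatch = new CountDownLatch(1);
        connectedLatch.countDown();   // the "callback" fires before anyone waits
        try {
            // The latch remembers the countDown, so await returns true immediately.
            return connectedLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(signalThenWait(1000)); // prints "true"
    }
}
```

A CountDownLatch is one-shot, so it would only cover the first connection; a channel that disconnects and reconnects would need a fresh latch or a flag guarded by a lock.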

SO WHAT TO DO?

Matej,

Any ideas?

The following are the Java and C++ implementations of the methods discussed above.

pvaClientJava

public void channelCreated(Status status, Channel channel) {
    lock.lock();
    try {
        if(isDestroyed) throw new RuntimeException("pvaClientChannel was destroyed");
        if(status.isOK()) {
            this.channel = channel;
            if(channel.isConnected()) {
                boolean waitingForConnect = false;
                if(connectState==ConnectState.connectActive) waitingForConnect = true;
                connectState = ConnectState.connected;
                channelConnectStatus = statusCreate.getStatusOK();
                if(waitingForConnect) waitForConnect.signal();
            }
            return;
        }
    } finally {
        lock.unlock();
    }
    System.err.println("PvaClientChannel::channelCreated status "
            + status.getMessage() + " why??");
}

public void channelStateChange(Channel channel,ConnectionState connectionState) {
    lock.lock();
    try {
        if(isDestroyed) return;
        boolean waitingForConnect = false;
        if(connectState==ConnectState.connectActive) waitingForConnect = true;
        if(connectionState!=ConnectionState.CONNECTED) {
            String message = channelName + " connection state " + connectionState.name();
            message(message,MessageType.error);
            channelConnectStatus = statusCreate.createStatus(StatusType.ERROR,message,null);
            connectState = ConnectState.notConnected;
        } else {
            connectState = ConnectState.connected;
            channelConnectStatus = statusCreate.getStatusOK();
        }   
        if(waitingForConnect) waitForConnect.signal();
    } finally {
        lock.unlock();
    }
}

public void issueConnect()
{
    lock.lock();
    try {
        if(isDestroyed) throw new RuntimeException("pvaClientChannel was destroyed");
        if(connectState==ConnectState.connected) return;
        if(connectState!=ConnectState.connectIdle) {
            throw new RuntimeException("pvaClientChannel already connected");
        }
        channelConnectStatus = statusCreate.createStatus(
                StatusType.ERROR,
                getChannelName() + " createChannel failed",null);
        connectState = ConnectState.connectActive;  
    } finally {
        lock.unlock();
    }
    ChannelProvider provider = ChannelProviderRegistryFactory
            .getChannelProviderRegistry().getProvider(providerName);
    if(provider==null) {
        String mess = getChannelName() + " provider " + providerName + " not registered";
        throw new RuntimeException(mess);
    }
    channel = provider.createChannel(channelName, this, ChannelProvider.PRIORITY_DEFAULT);
    if(channel==null) {
        String mess = getChannelName() + " channelCreate failed ";
        throw new RuntimeException(mess);
    }
}


public Status waitConnect(double timeout)
{
    if(isDestroyed) throw new RuntimeException("pvaClientChannel was destroyed");
    if(channel.isConnected()) return channelConnectStatus;
    lock.lock();
    try {
        long nano = (long)(timeout*1e9);
        waitForConnect.awaitNanos(nano);
    } catch(InterruptedException e) {
        Status status = statusCreate.createStatus(StatusType.ERROR,e.getMessage(), e.fillInStackTrace());
        return status;
    } finally {
        lock.unlock();
    }
    return channelConnectStatus;
}

pvaClientCPP

void PvaClientChannel::channelCreated(const Status& status, Channel::shared_pointer const & channel)
{
    Lock xx(mutex);
    if(isDestroyed) throw std::runtime_error("pvaClientChannel was destroyed");
    if(status.isOK()) {
        this->channel = channel;
        if(channel->isConnected()) {
             bool waitingForConnect = false;
             if(connectState==connectActive) waitingForConnect = true;
             connectState = connected;
             channelConnectStatus = Status::Ok;
             if(waitingForConnect) waitForConnect.signal();
        }
        return;
    }
    cout << "PvaClientChannel::channelCreated status " << status.getMessage() << " why??\n";
}

void PvaClientChannel::channelStateChange(
    Channel::shared_pointer const & channel,
    Channel::ConnectionState connectionState)
{
    Lock xx(mutex);
    if(isDestroyed) return;
    bool waitingForConnect = false;
    if(connectState==connectActive) waitingForConnect = true;
    if(connectionState!=Channel::CONNECTED) {
         string mess(channelName +
             " connection state " + Channel::ConnectionStateNames[connectionState]);
         message(mess,errorMessage);
         channelConnectStatus = Status(Status::STATUSTYPE_ERROR,mess);
         connectState = notConnected;
    } else {
         connectState = connected;
         channelConnectStatus = Status::Ok;
    }
    if(waitingForConnect) waitForConnect.signal();
}

void PvaClientChannel::issueConnect()
{
    {
        Lock xx(mutex);
        if(isDestroyed) throw std::runtime_error("pvaClientChannel was destroyed");
        if(connectState!=connectIdle) {
           throw std::runtime_error("pvaClientChannel already connected");
        }

        channelConnectStatus = Status(
             Status::STATUSTYPE_ERROR,
             getChannelName() + " createChannel failed");
        connectState = connectActive;
    }
    ChannelProviderRegistry::shared_pointer reg = getChannelProviderRegistry();
    ChannelProvider::shared_pointer provider = reg->getProvider(providerName);
    if(!provider) {
        throw std::runtime_error(getChannelName() + " provider " + providerName + " not registered");
    }
    channelRequester = ChannelRequester::shared_pointer(new ChannelRequesterImpl(this));
    channel = provider->createChannel(channelName,channelRequester,ChannelProvider::PRIORITY_DEFAULT);
    if(!channel) {
         throw std::runtime_error(getChannelName() + " channelCreate failed ");
    }
}

Status PvaClientChannel::waitConnect(double timeout)
{
    {
        Lock xx(mutex);
        if(isDestroyed) throw std::runtime_error("pvaClientChannel was destroyed");
        if(channel->isConnected()) return Status::Ok;
    }
    waitForConnect.wait(timeout);
    return channelConnectStatus;
}