Thomas Schwotzer edited this page Aug 31, 2021 · 11 revisions

ASAP has grown into a fairly large project. Along the way, a number of small utility classes were implemented. Some of them may be useful in your application, so feel free to use them. Why implement such little things over and over again?

Extra data

We adopted a concept from Android: extra data.

ASAPPeer asapPeer = ...; // see previous sections
asapPeer.putExtra("KEY", "aValue".getBytes());
byte[] value = asapPeer.getExtra("KEY");

Extra data are stored with an ASAPPeer in persistent storage and can be retrieved after a system relaunch.
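
Since values are plain byte arrays, strings have to be encoded before they are stored and decoded after they are read. The following is a minimal sketch of that round trip. It does not use ASAPPeer: `ExtraDataSketch` is a hypothetical in-memory stand-in, and returning null for an unknown key is an assumption of this sketch, not documented ASAPPeer behaviour.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the extra-data part of ASAPPeer; it keeps
// values in memory only and is NOT the library's implementation.
class ExtraDataSketch {
    private final Map<String, byte[]> extras = new HashMap<>();

    void putExtra(String key, byte[] value) {
        // store a copy so later changes to the caller's array do not leak in
        extras.put(key, value.clone());
    }

    byte[] getExtra(String key) {
        // assumption of this sketch: unknown keys yield null
        byte[] value = extras.get(key);
        return value == null ? null : value.clone();
    }

    // encode with an explicit charset instead of the platform default
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ExtraDataSketch peer = new ExtraDataSketch();
        peer.putExtra("KEY", encode("aValue"));
        System.out.println(decode(peer.getExtra("KEY")));
    }
}
```

Using the same explicit charset on both sides keeps the stored value readable even if the platform default charset changes between launches.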

AlarmClock

Distributed systems often need timeouts. A message is sent to another peer, and we expect a result within a given time frame. What would we do in real life? Go to bed and set the alarm. Meet class AlarmClock.

class YourClass implements AlarmClockListener {
... 
    // define constants
    public static final int TASK_0 = 0;
    public static final int TASK_1 = 1;

    // implement AlarmClockListener - what happens after alarm
    public void alarmClockRinging(int yourKey) {
        switch (yourKey) {
            case TASK_0: /* do something */; break;
            case TASK_1: /* do something else */; break;
        }
    }

...
    long timeout0 = 1000;
    // set alarm - your class will be called in 1000 milliseconds
    new AlarmClock(timeout0, TASK_0, this).start();
    // another alarm - 5 seconds
    new AlarmClock(5000, TASK_1, this).start();
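
To make the callback pattern concrete, here is a minimal sketch of how such an alarm could work: a thread sleeps for the timeout and then calls the listener with the caller's key. The names `AlarmClockSketch`, `RingListenerSketch` and `ringOnce` are hypothetical and only illustrate the idea; the AlarmClock class in ASAP may be implemented differently.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical names: these mirror the idea of AlarmClock and
// AlarmClockListener but are not the ASAP classes themselves.
interface RingListenerSketch {
    void ringing(int key);
}

class AlarmClockSketch extends Thread {
    private final long timeoutMillis;
    private final int key;
    private final RingListenerSketch listener;

    AlarmClockSketch(long timeoutMillis, int key, RingListenerSketch listener) {
        this.timeoutMillis = timeoutMillis;
        this.key = key;
        this.listener = listener;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(timeoutMillis);   // wait for the timeout
            listener.ringing(key);         // then call back with the caller's key
        } catch (InterruptedException e) {
            // interrupted before the timeout: treat it as a cancelled alarm
            Thread.currentThread().interrupt();
        }
    }

    // Starts one alarm and returns the key that rang, or -1 if nothing rang in time.
    static int ringOnce(long timeoutMillis, int key) {
        CountDownLatch rang = new CountDownLatch(1);
        int[] received = { -1 };
        new AlarmClockSketch(timeoutMillis, key, k -> {
            received[0] = k;
            rang.countDown();
        }).start();
        try {
            boolean ok = rang.await(timeoutMillis + 2000, TimeUnit.MILLISECONDS);
            return ok ? received[0] : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }
}
```

Note that the listener runs on the alarm's thread, not on the thread that set the alarm, so shared state touched in the callback needs the usual synchronization.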

PeerIDHelper

An ASAP peer needs a unique identifier. If you do not get this identifier from somewhere else, feel free to use PeerIDHelper.

String id_0 = PeerIDHelper.createUniqueID();
String id_1 = PeerIDHelper.createUniqueID();

if(PeerIDHelper.sameId(id_0, id_1))
    System.out.print("something is very wrong here");

if(!PeerIDHelper.sameId(id_0, id_0))
    System.out.print("also wrong");

Each call creates another id. Ids can be compared.
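
If you ever need the same behaviour without the library, a comparable helper is easy to sketch. The version below is an assumption-laden illustration: `PeerIdSketch` is a hypothetical class that uses random UUIDs and plain string equality, and PeerIDHelper may generate and compare ids differently.

```java
import java.util.UUID;

// Hypothetical sketch; PeerIDHelper may generate and compare ids differently.
class PeerIdSketch {
    static String createUniqueID() {
        // a random (version 4) UUID; collisions are extremely unlikely
        return UUID.randomUUID().toString();
    }

    static boolean sameId(String a, String b) {
        // compare by content, not by object reference
        return a.equals(b);
    }

    public static void main(String[] args) {
        String id0 = createUniqueID();
        String id1 = createUniqueID();
        System.out.println(sameId(id0, id1)); // two fresh ids should differ
        System.out.println(sameId(id0, id0));
    }
}
```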

StreamLinks

Some tricky things first came up when implementing the ASAPHub. The same problem came up in another corner of the project, which is why the package streams ended up in this core library.

Here was the task: There is a process. This process has two connections (e.g. TCP connections) to two other processes (maybe on other machines). Each connection consists of an output and an input stream. Now, this process must connect both processes. It shall act as a mediator. Why? Have a look at our ASAPHub project for an answer. How do we do this? Meet class StreamPairLink.

// we have two streams to one process
InputStream isA = ...;
OutputStream osA = ...;
// we have two streams to another process
InputStream isB = ...;
OutputStream osB = ...;

// we can create stream pairs
StreamPair pairA = new StreamPairImpl(isA, osA);
StreamPair pairB = new StreamPairImpl(isB, osB);

// we can link them
StreamPairLink link = new StreamPairLink(
    pairA,
    "connection to A", // makes debugging messages readable
    pairB, "connection to B");

The link object is a thread. It is already started by the constructor. Now both processes can communicate through this process. What happens if one process closes a channel? We close the other stream pair as well. The result: process A closes its socket. The mediator process (the link object) is informed by an IOException. It closes the socket to B, which is informed as well.
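
The mediator behaviour described above can be sketched with one pump thread per direction: each pump copies bytes from one side's input to the other side's output and, when the source ends or fails, closes both ends so the close is passed on. `PumpSketch`, `link` and `forward` are hypothetical names for illustration only; StreamPairLink itself may be implemented differently.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical sketch of one direction of a link.
class PumpSketch extends Thread {
    private final InputStream source;
    private final OutputStream sink;

    PumpSketch(InputStream source, OutputStream sink) {
        this.source = source;
        this.sink = sink;
    }

    @Override
    public void run() {
        byte[] buffer = new byte[1024];
        try {
            int n;
            // read() returns -1 once the source side has closed
            while ((n = source.read(buffer)) != -1) {
                sink.write(buffer, 0, n);
                sink.flush();
            }
        } catch (IOException e) {
            // a broken connection ends the pump just like a regular close
        } finally {
            closeQuietly(source);
            closeQuietly(sink);   // pass the close on to the other side
        }
    }

    private static void closeQuietly(Closeable c) {
        try { c.close(); } catch (IOException ignored) { }
    }

    // Links A and B in both directions: A's input feeds B's output and vice versa.
    static PumpSketch[] link(InputStream isA, OutputStream osA,
                             InputStream isB, OutputStream osB) {
        PumpSketch aToB = new PumpSketch(isA, osB);
        PumpSketch bToA = new PumpSketch(isB, osA);
        aToB.start();
        bToA.start();
        return new PumpSketch[] { aToB, bToA };
    }

    // Demo helper: pumps the given bytes through one direction and returns what arrived.
    static byte[] forward(byte[] data) {
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        PumpSketch pump = new PumpSketch(new ByteArrayInputStream(data), received);
        pump.start();
        try {
            pump.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.toByteArray();
    }
}
```

With real sockets, closing either stream of a socket closes the socket itself, which is how the other peer learns that the link is gone.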

StreamPairWrapper

It became more complex. Same setting: a process in the middle is connected to two others, A and B, e.g. with a socket. Now we want to lend that connection, on either side, to another process. This process writes and reads data. And here is the challenge: once it is done, it closes its connection.

The socket would then be closed on both sides, A and B. But we did not want the socket to be killed. We want to keep using it after that process has stopped. How can we solve this problem? Meet class StreamPairWrapper.

// we have two streams to one process
InputStream isA = ...;
OutputStream osA = ...;
// we have two streams to another process
InputStream isB = ...;
OutputStream osB = ...;

// we can create stream pairs
StreamPair pairA = new StreamPairWrapper(isA, osA);
StreamPair pairB = new StreamPairWrapper(isB, osB);

// we can link them
StreamPairLink link = new StreamPairLink(pairA, "A", pairB, "B");

// somewhere in your code - e.g. after a timeout
pairA.close();

What happens? pairA is a wrapped stream pair. The close() does not close isA or osA. Each subsequent read or write call on pairA produces an IOException. It behaves as if closed, but the original connection is still there. We used that wrapper on both sides of the channel. We found it helpful.
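
The wrapping idea can be sketched for the input half as follows: close() only sets a flag, and every later read fails with an IOException while the wrapped stream stays usable. `DetachableInputStreamSketch` and `demo` are hypothetical names; StreamPairWrapper itself may be implemented differently, and an output wrapper would follow the same pattern.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch of the input half of such a wrapper.
class DetachableInputStreamSketch extends InputStream {
    private final InputStream wrapped;
    private volatile boolean closed = false;

    DetachableInputStreamSketch(InputStream wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public int read() throws IOException {
        if (closed) throw new IOException("wrapper closed");
        return wrapped.read();
    }

    @Override
    public void close() {
        // only mark the wrapper as closed; the wrapped stream stays open
        closed = true;
    }

    // Demo helper: closes the wrapper, then checks both sides. Returns true if
    // the wrapper refuses reads while the original still delivers data.
    static boolean demo() {
        InputStream original = new ByteArrayInputStream(new byte[] { 1, 2, 3 });
        DetachableInputStreamSketch borrowed = new DetachableInputStreamSketch(original);
        try {
            if (borrowed.read() != 1) return false;
            borrowed.close();
            boolean refused = false;
            try {
                borrowed.read();
            } catch (IOException expected) {
                refused = true;
            }
            return refused && original.read() == 2;
        } catch (IOException e) {
            return false;
        }
    }
}
```

One limitation of this sketch: a read that is already blocked inside the wrapped stream when close() is called is not interrupted; only calls made after close() fail.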