Skip to content

Commit

Permalink
Added property change events for CBS. Update for 1.0.9-M5 tag
Browse files Browse the repository at this point in the history
  • Loading branch information
mondain committed Mar 25, 2017
1 parent 9ec2443 commit be990b0
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.0.9-M4</version>
<version>1.0.9-M5</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-server-common</artifactId>
Expand Down
12 changes: 8 additions & 4 deletions src/main/java/org/red5/server/BaseConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public abstract class BaseConnection extends AttributeStore implements IConnecti
/**
* Set of basic scopes. The scopes may be of shared object or broadcast stream type.
*/
protected transient CopyOnWriteArraySet<IBasicScope> basicScopes = new CopyOnWriteArraySet<IBasicScope>();
protected transient CopyOnWriteArraySet<IBasicScope> basicScopes = new CopyOnWriteArraySet<>();

/**
* Is the connection closed?
Expand All @@ -133,15 +133,15 @@ public abstract class BaseConnection extends AttributeStore implements IConnecti
/**
* Listeners
*/
protected transient CopyOnWriteArrayList<IConnectionListener> connectionListeners = new CopyOnWriteArrayList<IConnectionListener>();
protected transient CopyOnWriteArrayList<IConnectionListener> connectionListeners = new CopyOnWriteArrayList<>();

/**
* Used to protect mulit-threaded operations on write
*/
private final transient Semaphore writeLock = new Semaphore(1, true);

// Support for stream ids
private transient ThreadLocal<Number> streamLocal = new ThreadLocal<Number>();
private transient ThreadLocal<Number> streamLocal = new ThreadLocal<>();

/**
* Creates a new persistent base connection
Expand Down Expand Up @@ -191,7 +191,11 @@ public BaseConnection(String type) {
public BaseConnection(String type, String host, String remoteAddress, int remotePort, String path, String sessionId, Map<String, Object> params) {
log.debug("New BaseConnection - type: {} host: {} remoteAddress: {} remotePort: {} path: {} sessionId: {}", new Object[] { type, host, remoteAddress, remotePort, path, sessionId });
log.debug("Params: {}", params);
this.type = IConnection.Type.valueOf(type.toUpperCase());
if (type != null) {
this.type = IConnection.Type.valueOf(type.toUpperCase());
} else {
this.type = IConnection.Type.UNKNOWN;
}
this.host = host;
this.remoteAddress = remoteAddress;
this.remoteAddresses = new ArrayList<String>(1);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/red5/server/api/stream/StreamState.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@
*/
public enum StreamState {

INIT, UNINIT, OPEN, CLOSED, STARTED, STOPPED, PLAYING, PAUSED, RESUMED, END, SEEK;
INIT, UNINIT, OPEN, CLOSED, STARTED, STOPPED, PUBLISHING, PLAYING, PAUSED, RESUMED, END, SEEK;

}
47 changes: 46 additions & 1 deletion src/main/java/org/red5/server/stream/AbstractStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.red5.server.stream;

import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -62,6 +65,11 @@ public abstract class AbstractStream implements IStream {
*/
private IScope scope;

/**
* Contains {@link PropertyChangeListener}s registered with this stream and following its changes of state.
*/
private CopyOnWriteArrayList<PropertyChangeListener> stateListeners = new CopyOnWriteArrayList<>();

/**
* Timestamp the stream was created.
*/
Expand All @@ -72,6 +80,41 @@ public abstract class AbstractStream implements IStream {
*/
protected final transient Semaphore lock = new Semaphore(1, true);

/**
* Creates a new {@link PropertyChangeEvent} and delivers it to all currently registered state listeners.
*
* @param oldState
* the {@link StreamState} we had before the change
* @param newState
* the {@link StreamState} we had after the change
*/
protected void fireStateChange(StreamState oldState, StreamState newState) {
final PropertyChangeEvent evt = new PropertyChangeEvent(this, "StreamState", oldState, newState);
for (PropertyChangeListener listener : stateListeners) {
listener.propertyChange(evt);
}
}

/**
* Adds to the list of listeners tracking changes of the {@link StreamState} of this stream.
*
* @param listener the listener to register
*/
public void addStateChangeListener(PropertyChangeListener listener) {
if (!stateListeners.contains(listener)) {
stateListeners.add(listener);
}
}

/**
* Removes from the list of listeners tracking changes of the {@link StreamState} of this stream.
*
* @param listener the listener to remove
*/
public void removeStateChangeListener(PropertyChangeListener listener) {
stateListeners.remove(listener);
}

/**
* Return stream name.
*
Expand Down Expand Up @@ -184,12 +227,14 @@ public StreamState getState() {
* stream state
*/
public void setState(StreamState state) {
if (!this.state.equals(state)) {
StreamState oldState = this.state;
if (!oldState.equals(state)) {
try {
lock.acquireUninterruptibly();
this.state = state;
} finally {
lock.release();
fireStateChange(oldState, state);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.red5.server.api.stream.IStreamCapableConnection;
import org.red5.server.api.stream.IStreamListener;
import org.red5.server.api.stream.IStreamPacket;
import org.red5.server.api.stream.StreamState;
import org.red5.server.jmx.mxbeans.ClientBroadcastStreamMXBean;
import org.red5.server.messaging.IConsumer;
import org.red5.server.messaging.IFilter;
Expand Down Expand Up @@ -225,6 +226,7 @@ public void close() {
}
// deregister with jmx
unregisterJMX();
setState(StreamState.CLOSED);
}

/**
Expand Down Expand Up @@ -500,7 +502,7 @@ private void notifyRecordingStop() {
try {
handler.streamRecordStop(this);
} catch (Throwable t) {
log.error("Error in notifyBroadcastClose", t);
log.error("Error in notifyRecordingStop", t);
}
}
}
Expand Down Expand Up @@ -711,6 +713,7 @@ private void sendPublishStartNotify() {
StatusMessage startMsg = new StatusMessage();
startMsg.setBody(publishStatus);
pushMessage(startMsg);
setState(StreamState.PUBLISHING);
}

/**
Expand All @@ -724,6 +727,7 @@ private void sendPublishStopNotify() {
StatusMessage stopMsg = new StatusMessage();
stopMsg.setBody(stopStatus);
pushMessage(stopMsg);
setState(StreamState.STOPPED);
}

/**
Expand Down Expand Up @@ -843,6 +847,7 @@ public void start() {
} else {
log.warn("Subscribe failed");
}
setState(StreamState.STARTED);
}

/** {@inheritDoc} */
Expand All @@ -864,6 +869,7 @@ public void startPublishing() {
/** {@inheritDoc} */
public void stop() {
//log.info("Stream stop: {}", publishedName);
setState(StreamState.STOPPED);
stopRecording();
close();
}
Expand Down

0 comments on commit be990b0

Please sign in to comment.