From be990b0953981fc555a2fa7fa8a57f1d23e57dc7 Mon Sep 17 00:00:00 2001 From: Paul Gregoire Date: Fri, 24 Mar 2017 17:20:15 -0700 Subject: [PATCH] Added property change events for CBS. Update for 1.0.9-M5 tag --- pom.xml | 2 +- .../java/org/red5/server/BaseConnection.java | 12 +++-- .../red5/server/api/stream/StreamState.java | 2 +- .../red5/server/stream/AbstractStream.java | 47 ++++++++++++++++++- .../server/stream/ClientBroadcastStream.java | 8 +++- 5 files changed, 63 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index 2239d8d8..d526455d 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ org.red5 red5-parent - 1.0.9-M4 + 1.0.9-M5 4.0.0 red5-server-common diff --git a/src/main/java/org/red5/server/BaseConnection.java b/src/main/java/org/red5/server/BaseConnection.java index f951c549..308804ac 100644 --- a/src/main/java/org/red5/server/BaseConnection.java +++ b/src/main/java/org/red5/server/BaseConnection.java @@ -123,7 +123,7 @@ public abstract class BaseConnection extends AttributeStore implements IConnecti /** * Set of basic scopes. The scopes may be of shared object or broadcast stream type. */ - protected transient CopyOnWriteArraySet basicScopes = new CopyOnWriteArraySet(); + protected transient CopyOnWriteArraySet basicScopes = new CopyOnWriteArraySet<>(); /** * Is the connection closed? @@ -133,7 +133,7 @@ public abstract class BaseConnection extends AttributeStore implements IConnecti /** * Listeners */ - protected transient CopyOnWriteArrayList connectionListeners = new CopyOnWriteArrayList(); + protected transient CopyOnWriteArrayList connectionListeners = new CopyOnWriteArrayList<>(); /** * Used to protect mulit-threaded operations on write @@ -141,7 +141,7 @@ public abstract class BaseConnection extends AttributeStore implements IConnecti private final transient Semaphore writeLock = new Semaphore(1, true); // Support for stream ids - private transient ThreadLocal streamLocal = new ThreadLocal(); + private transient ThreadLocal streamLocal = new ThreadLocal<>(); /** * Creates a new persistent base connection @@ -191,7 +191,11 @@ public BaseConnection(String type) { public BaseConnection(String type, String host, String remoteAddress, int remotePort, String path, String sessionId, Map params) { log.debug("New BaseConnection - type: {} host: {} remoteAddress: {} remotePort: {} path: {} sessionId: {}", new Object[] { type, host, remoteAddress, remotePort, path, sessionId }); log.debug("Params: {}", params); - this.type = IConnection.Type.valueOf(type.toUpperCase()); + if (type != null) { + this.type = IConnection.Type.valueOf(type.toUpperCase()); + } else { + this.type = IConnection.Type.UNKNOWN; + } this.host = host; this.remoteAddress = remoteAddress; this.remoteAddresses = new ArrayList(1); diff --git a/src/main/java/org/red5/server/api/stream/StreamState.java b/src/main/java/org/red5/server/api/stream/StreamState.java index 218e211c..787512a9 100644 --- a/src/main/java/org/red5/server/api/stream/StreamState.java +++ b/src/main/java/org/red5/server/api/stream/StreamState.java @@ -25,6 +25,6 @@ */ public enum StreamState { - INIT, UNINIT, OPEN, CLOSED, STARTED, STOPPED, PLAYING, PAUSED, RESUMED, END, SEEK; + INIT, UNINIT, OPEN, CLOSED, STARTED, STOPPED, PUBLISHING, PLAYING, PAUSED, RESUMED, END, SEEK; } diff --git a/src/main/java/org/red5/server/stream/AbstractStream.java b/src/main/java/org/red5/server/stream/AbstractStream.java index 8c5a9a88..5bc78a8c 100644 --- a/src/main/java/org/red5/server/stream/AbstractStream.java +++ b/src/main/java/org/red5/server/stream/AbstractStream.java @@ -18,6 +18,9 @@ package org.red5.server.stream; +import java.beans.PropertyChangeEvent; +import java.beans.PropertyChangeListener; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicReference; @@ -62,6 +65,11 @@ public abstract class AbstractStream implements IStream { */ private IScope scope; + /** + * Contains {@link PropertyChangeListener}s registered with this stream and following its changes of state. + */ + private CopyOnWriteArrayList stateListeners = new CopyOnWriteArrayList<>(); + /** * Timestamp the stream was created. */ @@ -72,6 +80,41 @@ public abstract class AbstractStream implements IStream { */ protected final transient Semaphore lock = new Semaphore(1, true); + /** + * Creates a new {@link PropertyChangeEvent} and delivers it to all currently registered state listeners. + * + * @param oldState + * the {@link StreamState} we had before the change + * @param newState + * the {@link StreamState} we had after the change + */ + protected void fireStateChange(StreamState oldState, StreamState newState) { + final PropertyChangeEvent evt = new PropertyChangeEvent(this, "StreamState", oldState, newState); + for (PropertyChangeListener listener : stateListeners) { + listener.propertyChange(evt); + } + } + + /** + * Adds to the list of listeners tracking changes of the {@link StreamState} of this stream. + * + * @param listener the listener to register + */ + public void addStateChangeListener(PropertyChangeListener listener) { + if (!stateListeners.contains(listener)) { + stateListeners.add(listener); + } + } + + /** + * Removes from the list of listeners tracking changes of the {@link StreamState} of this stream. + * + * @param listener the listener to remove + */ + public void removeStateChangeListener(PropertyChangeListener listener) { + stateListeners.remove(listener); + } + /** * Return stream name. * @@ -184,12 +227,14 @@ public StreamState getState() { * stream state */ public void setState(StreamState state) { - if (!this.state.equals(state)) { + StreamState oldState = this.state; + if (!oldState.equals(state)) { try { lock.acquireUninterruptibly(); this.state = state; } finally { lock.release(); + fireStateChange(oldState, state); } } } diff --git a/src/main/java/org/red5/server/stream/ClientBroadcastStream.java b/src/main/java/org/red5/server/stream/ClientBroadcastStream.java index e9ac0993..e43a805b 100644 --- a/src/main/java/org/red5/server/stream/ClientBroadcastStream.java +++ b/src/main/java/org/red5/server/stream/ClientBroadcastStream.java @@ -52,6 +52,7 @@ import org.red5.server.api.stream.IStreamCapableConnection; import org.red5.server.api.stream.IStreamListener; import org.red5.server.api.stream.IStreamPacket; +import org.red5.server.api.stream.StreamState; import org.red5.server.jmx.mxbeans.ClientBroadcastStreamMXBean; import org.red5.server.messaging.IConsumer; import org.red5.server.messaging.IFilter; @@ -225,6 +226,7 @@ public void close() { } // deregister with jmx unregisterJMX(); + setState(StreamState.CLOSED); } /** @@ -500,7 +502,7 @@ private void notifyRecordingStop() { try { handler.streamRecordStop(this); } catch (Throwable t) { - log.error("Error in notifyBroadcastClose", t); + log.error("Error in notifyRecordingStop", t); } } } @@ -711,6 +713,7 @@ private void sendPublishStartNotify() { StatusMessage startMsg = new StatusMessage(); startMsg.setBody(publishStatus); pushMessage(startMsg); + setState(StreamState.PUBLISHING); } /** @@ -724,6 +727,7 @@ private void sendPublishStopNotify() { StatusMessage stopMsg = new StatusMessage(); stopMsg.setBody(stopStatus); pushMessage(stopMsg); + setState(StreamState.STOPPED); } /** @@ -843,6 +847,7 @@ public void start() { } else { log.warn("Subscribe failed"); } + setState(StreamState.STARTED); } /** {@inheritDoc} */ @@ -864,6 +869,7 @@ public void startPublishing() { /** {@inheritDoc} */ public void stop() { //log.info("Stream stop: {}", publishedName); + setState(StreamState.STOPPED); stopRecording(); close(); }