From e2a4cd5d2eeeed6a9e02668fdd3f386f4b1adaee Mon Sep 17 00:00:00 2001 From: Paul Gregoire Date: Wed, 19 Jan 2022 08:01:09 -0800 Subject: [PATCH] Fix for event dispatching and update for connection property exposure to extension --- .../red5/server/net/rtmp/BaseRTMPHandler.java | 9 ++++----- .../org/red5/server/net/rtmp/RTMPConnection.java | 16 ++++++++-------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java b/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java index 766cf9ef..3b41d1b4 100644 --- a/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java +++ b/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java @@ -40,7 +40,6 @@ import org.red5.server.net.rtmp.message.Packet; import org.red5.server.net.rtmp.status.StatusCodes; import org.red5.server.so.SharedObjectMessage; -import org.red5.server.stream.ClientBroadcastStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,7 +108,7 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception if (stream != null) { EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME); if (epeo == null && stream != null) { - epeo = new EnsuresPacketExecutionOrder((ClientBroadcastStream) stream, conn); + epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn); conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo); } epeo.addPacket(message); @@ -136,7 +135,7 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception // Stream metadata EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME); if (epeo == null) { - epeo = new EnsuresPacketExecutionOrder((ClientBroadcastStream) stream, conn); + epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn); conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo); } epeo.addPacket(message); @@ -376,13 +375,13 @@ private static class EnsuresPacketExecutionOrder implements Runnable { private AtomicBoolean state = new AtomicBoolean(); - private final ClientBroadcastStream stream; + private final IEventDispatcher stream; private final RTMPConnection conn; private int iter; - public EnsuresPacketExecutionOrder(ClientBroadcastStream stream, RTMPConnection conn) { + public EnsuresPacketExecutionOrder(IEventDispatcher stream, RTMPConnection conn) { this.stream = stream; this.conn = conn; } diff --git a/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java b/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java index 33864329..10c81a45 100755 --- a/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java +++ b/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java @@ -185,31 +185,31 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa * * @see org.red5.server.net.rtmp.Channel */ - private transient ConcurrentMap channels = new ConcurrentHashMap<>(channelsInitalCapacity, 0.9f, channelsConcurrencyLevel); + protected transient ConcurrentMap channels = new ConcurrentHashMap<>(channelsInitalCapacity, 0.9f, channelsConcurrencyLevel); /** * Queues of tasks for every channel * * @see org.red5.server.net.rtmp.ReceivedMessageTaskQueue */ - private final transient ConcurrentMap tasksByStreams = new ConcurrentHashMap<>(streamsInitalCapacity, 0.9f, streamsConcurrencyLevel); + protected final transient ConcurrentMap tasksByStreams = new ConcurrentHashMap<>(streamsInitalCapacity, 0.9f, streamsConcurrencyLevel); /** * Client streams * * @see org.red5.server.api.stream.IClientStream */ - private transient ConcurrentMap streams = new ConcurrentHashMap<>(streamsInitalCapacity, 0.9f, streamsConcurrencyLevel); + protected transient ConcurrentMap streams = new ConcurrentHashMap<>(streamsInitalCapacity, 0.9f, streamsConcurrencyLevel); /** * Reserved stream ids. Stream id's directly relate to individual NetStream instances. */ - private transient Set reservedStreams = Collections.newSetFromMap(new ConcurrentHashMap(reservedStreamsInitalCapacity, 0.9f, reservedStreamsConcurrencyLevel)); + protected transient Set reservedStreams = Collections.newSetFromMap(new ConcurrentHashMap(reservedStreamsInitalCapacity, 0.9f, reservedStreamsConcurrencyLevel)); /** * Transaction identifier for remote commands. */ - private AtomicInteger transactionId = new AtomicInteger(1); + protected AtomicInteger transactionId = new AtomicInteger(1); /** * Hash map that stores pending calls and ids as pairs. @@ -335,7 +335,7 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa /** * Timestamp generator */ - private final AtomicInteger timer = new AtomicInteger(0); + protected final AtomicInteger timer = new AtomicInteger(0); /** * Closing flag @@ -559,8 +559,8 @@ private void startRoundTripMeasurement() { } } } else { - // reducing from error to debug as its not all that important of a message these days to have such promotion - log.debug("startRoundTripMeasurement cannot be executed due to missing scheduler. This can happen if a connection drops before handshake is complete"); + // reducing from error to trace as its not all that important of a message these days to have such promotion + log.trace("startRoundTripMeasurement not enabled. If RTMP, can occur when lost before handshake is complete"); } }