From f6242a663ce1e0beffffa9ca16901719ad7e0dc2 Mon Sep 17 00:00:00 2001 From: Andy--S Date: Tue, 18 Jan 2022 00:44:59 -0800 Subject: [PATCH 1/2] Ensures a stream's event dispatching occurs on only one core at any one time. Eliminates thread racing internal to ClientBroadcastStream and keeps all incoming events in order. --- .../red5/server/net/rtmp/BaseRTMPHandler.java | 112 +++++++++++++++--- 1 file changed, 94 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java b/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java index efa01a72..307699a0 100644 --- a/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java +++ b/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java @@ -12,9 +12,12 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.mina.core.session.IoSession; import org.red5.io.object.StreamAction; +import org.red5.server.api.Red5; import org.red5.server.api.event.IEventDispatcher; import org.red5.server.api.service.IPendingServiceCall; import org.red5.server.api.service.IPendingServiceCallback; @@ -37,6 +40,7 @@ import org.red5.server.net.rtmp.message.Packet; import org.red5.server.net.rtmp.status.StatusCodes; import org.red5.server.so.SharedObjectMessage; +import org.red5.server.stream.ClientBroadcastStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,6 +80,7 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception final Number streamId = header.getStreamId(); final Channel channel = conn.getChannel(header.getChannelId()); final IClientStream stream = conn.getStreamById(streamId); + if (isTrace) { log.trace("Message received - header: {}", header); } @@ -101,15 +106,23 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception // NOTE: If we respond to "publish" with "NetStream.Publish.BadName", // the client sends a few stream packets before stopping. We need to ignore them if (stream != null) { - recvDispatchExecutor.submit(() -> { - try { - Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s", conn.getSessionId())); - ((IEventDispatcher) stream).dispatchEvent(message); - message.release(); - } catch (Exception e) { - log.warn("Exception on Media dispatch", e); - } - }); + + EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME); + if (epeo == null && stream != null) { + epeo = new EnsuresPacketExecutionOrder((ClientBroadcastStream) stream, conn); + conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo); + } + epeo.addPacket(message); + + // recvDispatchExecutor.submit(() -> { + // try { + // Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s", conn.getSessionId())); + // ((IEventDispatcher) stream).dispatchEvent(message); + // message.release(); + // } catch (Exception e) { + // log.warn("Exception on Media dispatch", e); + // } + // }); } break; case TYPE_FLEX_SHARED_OBJECT: @@ -131,15 +144,13 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception case TYPE_FLEX_STREAM_SEND: if (((Notify) message).getData() != null && stream != null) { // Stream metadata - recvDispatchExecutor.submit(() -> { - try { - Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s", conn.getSessionId())); - ((IEventDispatcher) stream).dispatchEvent(message); - message.release(); - } catch (Exception e) { - log.warn("Exception on Notify dispatch", e); - } - }); + EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME); + if (epeo == null) { + epeo = new EnsuresPacketExecutionOrder((ClientBroadcastStream) stream, conn); + conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo); + } + epeo.addPacket(message); + } else { onCommand(conn, channel, header, (Notify) message); } @@ -364,4 +375,69 @@ protected void onStreamBytesRead(RTMPConnection conn, Channel channel, Header so */ protected abstract void onSharedObject(RTMPConnection conn, Channel channel, Header source, SharedObjectMessage message); + /** + * Class ensures a stream's event dispatching occurs on only one core at any one time. Eliminates thread racing internal to ClientBroadcastStream + * and keeps all incoming events in order. + * @author Andy Shaules + * + */ + private static class EnsuresPacketExecutionOrder implements Runnable { + + public final static String ATTRIBUTE_NAME = "EnsuresPacketExecutionOrder"; + + public LinkedBlockingQueue events = new LinkedBlockingQueue(); + + public AtomicBoolean state = new AtomicBoolean(); + + public ClientBroadcastStream stream; + + private RTMPConnection conn; + + private int iter; + + public EnsuresPacketExecutionOrder(ClientBroadcastStream str, RTMPConnection conn) { + stream = str; + this.conn = conn; + } + + /** + * Add packet to the stream's incoming queue. + * @param packet + */ + public void addPacket(IRTMPEvent packet) { + + events.offer(packet); + + if (state.compareAndSet(false, true)) { + recvDispatchExecutor.submit(this); + } + } + + public void run() { + + //use int to identify different thread instance. + Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s-%d", conn.getSessionId(), iter++)); + iter &= 7; + //Always set connection local on dispatch threads. + Red5.setConnectionLocal(conn); + + //We were created for a reason. Grab the event. + IRTMPEvent message = events.poll(); + //null check incase queue was drained before we woke. + if (message != null) { + stream.dispatchEvent(message); + message.release(); + } + //set null before resubmit. + Red5.setConnectionLocal(null); + //Resubmit for another go if we have more. + if (!events.isEmpty()) { + recvDispatchExecutor.submit(this); + } else { + state.set(false); + } + //resubmitting rather than looping until empty plays nice with other threads. + } + } + } From 66e79edc8222df2f326bd5ce736e14b00398cd3b Mon Sep 17 00:00:00 2001 From: Paul Gregoire Date: Tue, 18 Jan 2022 10:02:10 -0800 Subject: [PATCH 2/2] Reworked just a tad --- .../red5/server/net/rtmp/BaseRTMPHandler.java | 53 +++++++------------ 1 file changed, 19 insertions(+), 34 deletions(-) diff --git a/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java b/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java index 307699a0..766cf9ef 100644 --- a/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java +++ b/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java @@ -48,6 +48,8 @@ * Base class for all RTMP handlers. * * @author The Red5 Project + * @author Andy Shaules + * @author Paul Gregoire */ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, StatusCodes { @@ -80,7 +82,6 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception final Number streamId = header.getStreamId(); final Channel channel = conn.getChannel(header.getChannelId()); final IClientStream stream = conn.getStreamById(streamId); - if (isTrace) { log.trace("Message received - header: {}", header); } @@ -104,25 +105,14 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception // log.trace("Marking message as originating from a Live source"); message.setSourceType(Constants.SOURCE_TYPE_LIVE); // NOTE: If we respond to "publish" with "NetStream.Publish.BadName", - // the client sends a few stream packets before stopping. We need to ignore them + // the client sends a few stream packets before stopping; we need to ignore them. if (stream != null) { - EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME); if (epeo == null && stream != null) { epeo = new EnsuresPacketExecutionOrder((ClientBroadcastStream) stream, conn); conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo); } epeo.addPacket(message); - - // recvDispatchExecutor.submit(() -> { - // try { - // Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s", conn.getSessionId())); - // ((IEventDispatcher) stream).dispatchEvent(message); - // message.release(); - // } catch (Exception e) { - // log.warn("Exception on Media dispatch", e); - // } - // }); } break; case TYPE_FLEX_SHARED_OBJECT: @@ -150,7 +140,6 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo); } epeo.addPacket(message); - } else { onCommand(conn, channel, header, (Notify) message); } @@ -377,26 +366,24 @@ protected void onStreamBytesRead(RTMPConnection conn, Channel channel, Header so /** * Class ensures a stream's event dispatching occurs on only one core at any one time. Eliminates thread racing internal to ClientBroadcastStream - * and keeps all incoming events in order. - * @author Andy Shaules - * + * and keeps all incoming events in order. */ private static class EnsuresPacketExecutionOrder implements Runnable { public final static String ATTRIBUTE_NAME = "EnsuresPacketExecutionOrder"; - public LinkedBlockingQueue events = new LinkedBlockingQueue(); + private LinkedBlockingQueue events = new LinkedBlockingQueue<>(); - public AtomicBoolean state = new AtomicBoolean(); + private AtomicBoolean state = new AtomicBoolean(); - public ClientBroadcastStream stream; + private final ClientBroadcastStream stream; - private RTMPConnection conn; + private final RTMPConnection conn; private int iter; - public EnsuresPacketExecutionOrder(ClientBroadcastStream str, RTMPConnection conn) { - stream = str; + public EnsuresPacketExecutionOrder(ClientBroadcastStream stream, RTMPConnection conn) { + this.stream = stream; this.conn = conn; } @@ -405,38 +392,36 @@ public EnsuresPacketExecutionOrder(ClientBroadcastStream str, RTMPConnection con * @param packet */ public void addPacket(IRTMPEvent packet) { - events.offer(packet); - if (state.compareAndSet(false, true)) { recvDispatchExecutor.submit(this); } } public void run() { - - //use int to identify different thread instance. + // use int to identify different thread instance Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s-%d", conn.getSessionId(), iter++)); iter &= 7; - //Always set connection local on dispatch threads. + // always set connection local on dispatch threads Red5.setConnectionLocal(conn); - - //We were created for a reason. Grab the event. + // we were created for a reason, grab the event IRTMPEvent message = events.poll(); - //null check incase queue was drained before we woke. + // null check just in case queue was drained before we woke if (message != null) { + // dispatch to stream stream.dispatchEvent(message); + // release / clean up message.release(); } - //set null before resubmit. + // set null before resubmit Red5.setConnectionLocal(null); - //Resubmit for another go if we have more. + // resubmit for another go if we have more if (!events.isEmpty()) { recvDispatchExecutor.submit(this); } else { state.set(false); } - //resubmitting rather than looping until empty plays nice with other threads. + // resubmitting rather than looping until empty plays nice with other threads } }