diff --git a/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java b/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java index efa01a72..766cf9ef 100644 --- a/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java +++ b/src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java @@ -12,9 +12,12 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.mina.core.session.IoSession; import org.red5.io.object.StreamAction; +import org.red5.server.api.Red5; import org.red5.server.api.event.IEventDispatcher; import org.red5.server.api.service.IPendingServiceCall; import org.red5.server.api.service.IPendingServiceCallback; @@ -37,6 +40,7 @@ import org.red5.server.net.rtmp.message.Packet; import org.red5.server.net.rtmp.status.StatusCodes; import org.red5.server.so.SharedObjectMessage; +import org.red5.server.stream.ClientBroadcastStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +48,8 @@ * Base class for all RTMP handlers. * * @author The Red5 Project + * @author Andy Shaules + * @author Paul Gregoire */ public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, StatusCodes { @@ -99,17 +105,14 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception // log.trace("Marking message as originating from a Live source"); message.setSourceType(Constants.SOURCE_TYPE_LIVE); // NOTE: If we respond to "publish" with "NetStream.Publish.BadName", - // the client sends a few stream packets before stopping. We need to ignore them + // the client sends a few stream packets before stopping; we need to ignore them. if (stream != null) { - recvDispatchExecutor.submit(() -> { - try { - Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s", conn.getSessionId())); - ((IEventDispatcher) stream).dispatchEvent(message); - message.release(); - } catch (Exception e) { - log.warn("Exception on Media dispatch", e); - } - }); + EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME); + if (epeo == null && stream != null) { + epeo = new EnsuresPacketExecutionOrder((ClientBroadcastStream) stream, conn); + conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo); + } + epeo.addPacket(message); } break; case TYPE_FLEX_SHARED_OBJECT: @@ -131,15 +134,12 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception case TYPE_FLEX_STREAM_SEND: if (((Notify) message).getData() != null && stream != null) { // Stream metadata - recvDispatchExecutor.submit(() -> { - try { - Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s", conn.getSessionId())); - ((IEventDispatcher) stream).dispatchEvent(message); - message.release(); - } catch (Exception e) { - log.warn("Exception on Notify dispatch", e); - } - }); + EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME); + if (epeo == null) { + epeo = new EnsuresPacketExecutionOrder((ClientBroadcastStream) stream, conn); + conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo); + } + epeo.addPacket(message); } else { onCommand(conn, channel, header, (Notify) message); } @@ -364,4 +364,65 @@ protected void onStreamBytesRead(RTMPConnection conn, Channel channel, Header so */ protected abstract void onSharedObject(RTMPConnection conn, Channel channel, Header source, SharedObjectMessage message); + /** + * Class ensures a stream's event dispatching occurs on only one core at any one time. Eliminates thread racing internal to ClientBroadcastStream + * and keeps all incoming events in order. + */ + private static class EnsuresPacketExecutionOrder implements Runnable { + + public final static String ATTRIBUTE_NAME = "EnsuresPacketExecutionOrder"; + + private LinkedBlockingQueue events = new LinkedBlockingQueue<>(); + + private AtomicBoolean state = new AtomicBoolean(); + + private final ClientBroadcastStream stream; + + private final RTMPConnection conn; + + private int iter; + + public EnsuresPacketExecutionOrder(ClientBroadcastStream stream, RTMPConnection conn) { + this.stream = stream; + this.conn = conn; + } + + /** + * Add packet to the stream's incoming queue. + * @param packet + */ + public void addPacket(IRTMPEvent packet) { + events.offer(packet); + if (state.compareAndSet(false, true)) { + recvDispatchExecutor.submit(this); + } + } + + public void run() { + // use int to identify different thread instance + Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s-%d", conn.getSessionId(), iter++)); + iter &= 7; + // always set connection local on dispatch threads + Red5.setConnectionLocal(conn); + // we were created for a reason, grab the event + IRTMPEvent message = events.poll(); + // null check just in case queue was drained before we woke + if (message != null) { + // dispatch to stream + stream.dispatchEvent(message); + // release / clean up + message.release(); + } + // set null before resubmit + Red5.setConnectionLocal(null); + // resubmit for another go if we have more + if (!events.isEmpty()) { + recvDispatchExecutor.submit(this); + } else { + state.set(false); + } + // resubmitting rather than looping until empty plays nice with other threads + } + } + }