Skip to content

Commit

Permalink
Merge branch 'master' of github.com:Red5/red5-server-common
Browse files Browse the repository at this point in the history
  • Loading branch information
mondain committed Jan 18, 2022
2 parents eb112fc + 922554b commit 85d2998
Showing 1 changed file with 80 additions and 19 deletions.
99 changes: 80 additions & 19 deletions src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.mina.core.session.IoSession;
import org.red5.io.object.StreamAction;
import org.red5.server.api.Red5;
import org.red5.server.api.event.IEventDispatcher;
import org.red5.server.api.service.IPendingServiceCall;
import org.red5.server.api.service.IPendingServiceCallback;
Expand All @@ -37,13 +40,16 @@
import org.red5.server.net.rtmp.message.Packet;
import org.red5.server.net.rtmp.status.StatusCodes;
import org.red5.server.so.SharedObjectMessage;
import org.red5.server.stream.ClientBroadcastStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Base class for all RTMP handlers.
*
* @author The Red5 Project
* @author Andy Shaules
* @author Paul Gregoire
*/
public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, StatusCodes {

Expand Down Expand Up @@ -99,17 +105,14 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception
// log.trace("Marking message as originating from a Live source");
message.setSourceType(Constants.SOURCE_TYPE_LIVE);
// NOTE: If we respond to "publish" with "NetStream.Publish.BadName",
// the client sends a few stream packets before stopping. We need to ignore them
// the client sends a few stream packets before stopping; we need to ignore them.
if (stream != null) {
recvDispatchExecutor.submit(() -> {
try {
Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s", conn.getSessionId()));
((IEventDispatcher) stream).dispatchEvent(message);
message.release();
} catch (Exception e) {
log.warn("Exception on Media dispatch", e);
}
});
EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME);
if (epeo == null && stream != null) {
epeo = new EnsuresPacketExecutionOrder((ClientBroadcastStream) stream, conn);
conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo);
}
epeo.addPacket(message);
}
break;
case TYPE_FLEX_SHARED_OBJECT:
Expand All @@ -131,15 +134,12 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception
case TYPE_FLEX_STREAM_SEND:
if (((Notify) message).getData() != null && stream != null) {
// Stream metadata
recvDispatchExecutor.submit(() -> {
try {
Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s", conn.getSessionId()));
((IEventDispatcher) stream).dispatchEvent(message);
message.release();
} catch (Exception e) {
log.warn("Exception on Notify dispatch", e);
}
});
EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME);
if (epeo == null) {
epeo = new EnsuresPacketExecutionOrder((ClientBroadcastStream) stream, conn);
conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo);
}
epeo.addPacket(message);
} else {
onCommand(conn, channel, header, (Notify) message);
}
Expand Down Expand Up @@ -364,4 +364,65 @@ protected void onStreamBytesRead(RTMPConnection conn, Channel channel, Header so
*/
protected abstract void onSharedObject(RTMPConnection conn, Channel channel, Header source, SharedObjectMessage message);

/**
* Class ensures a stream's event dispatching occurs on only one core at any one time. Eliminates thread racing internal to ClientBroadcastStream
* and keeps all incoming events in order.
*/
private static class EnsuresPacketExecutionOrder implements Runnable {

public final static String ATTRIBUTE_NAME = "EnsuresPacketExecutionOrder";

private LinkedBlockingQueue<IRTMPEvent> events = new LinkedBlockingQueue<>();

private AtomicBoolean state = new AtomicBoolean();

private final ClientBroadcastStream stream;

private final RTMPConnection conn;

private int iter;

public EnsuresPacketExecutionOrder(ClientBroadcastStream stream, RTMPConnection conn) {
this.stream = stream;
this.conn = conn;
}

/**
* Add packet to the stream's incoming queue.
* @param packet
*/
public void addPacket(IRTMPEvent packet) {
events.offer(packet);
if (state.compareAndSet(false, true)) {
recvDispatchExecutor.submit(this);
}
}

public void run() {
// use int to identify different thread instance
Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s-%d", conn.getSessionId(), iter++));
iter &= 7;
// always set connection local on dispatch threads
Red5.setConnectionLocal(conn);
// we were created for a reason, grab the event
IRTMPEvent message = events.poll();
// null check just in case queue was drained before we woke
if (message != null) {
// dispatch to stream
stream.dispatchEvent(message);
// release / clean up
message.release();
}
// set null before resubmit
Red5.setConnectionLocal(null);
// resubmit for another go if we have more
if (!events.isEmpty()) {
recvDispatchExecutor.submit(this);
} else {
state.set(false);
}
// resubmitting rather than looping until empty plays nice with other threads
}
}

}

0 comments on commit 85d2998

Please sign in to comment.