Skip to content

Commit

Permalink
Fix for event dispatching and update for connection property exposure…
Browse files Browse the repository at this point in the history
… to extension
  • Loading branch information
mondain committed Jan 19, 2022
1 parent 85d2998 commit e2a4cd5
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 13 deletions.
9 changes: 4 additions & 5 deletions src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.red5.server.net.rtmp.message.Packet;
import org.red5.server.net.rtmp.status.StatusCodes;
import org.red5.server.so.SharedObjectMessage;
import org.red5.server.stream.ClientBroadcastStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -109,7 +108,7 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception
if (stream != null) {
EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME);
if (epeo == null && stream != null) {
epeo = new EnsuresPacketExecutionOrder((ClientBroadcastStream) stream, conn);
epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn);
conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo);
}
epeo.addPacket(message);
Expand All @@ -136,7 +135,7 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception
// Stream metadata
EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME);
if (epeo == null) {
epeo = new EnsuresPacketExecutionOrder((ClientBroadcastStream) stream, conn);
epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn);
conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo);
}
epeo.addPacket(message);
Expand Down Expand Up @@ -376,13 +375,13 @@ private static class EnsuresPacketExecutionOrder implements Runnable {

private AtomicBoolean state = new AtomicBoolean();

private final ClientBroadcastStream stream;
private final IEventDispatcher stream;

private final RTMPConnection conn;

private int iter;

public EnsuresPacketExecutionOrder(ClientBroadcastStream stream, RTMPConnection conn) {
public EnsuresPacketExecutionOrder(IEventDispatcher stream, RTMPConnection conn) {
this.stream = stream;
this.conn = conn;
}
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/org/red5/server/net/rtmp/RTMPConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,31 +185,31 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
*
* @see org.red5.server.net.rtmp.Channel
*/
private transient ConcurrentMap<Integer, Channel> channels = new ConcurrentHashMap<>(channelsInitalCapacity, 0.9f, channelsConcurrencyLevel);
protected transient ConcurrentMap<Integer, Channel> channels = new ConcurrentHashMap<>(channelsInitalCapacity, 0.9f, channelsConcurrencyLevel);

/**
* Queues of tasks for every channel
*
* @see org.red5.server.net.rtmp.ReceivedMessageTaskQueue
*/
private final transient ConcurrentMap<Integer, ReceivedMessageTaskQueue> tasksByStreams = new ConcurrentHashMap<>(streamsInitalCapacity, 0.9f, streamsConcurrencyLevel);
protected final transient ConcurrentMap<Integer, ReceivedMessageTaskQueue> tasksByStreams = new ConcurrentHashMap<>(streamsInitalCapacity, 0.9f, streamsConcurrencyLevel);

/**
* Client streams
*
* @see org.red5.server.api.stream.IClientStream
*/
private transient ConcurrentMap<Number, IClientStream> streams = new ConcurrentHashMap<>(streamsInitalCapacity, 0.9f, streamsConcurrencyLevel);
protected transient ConcurrentMap<Number, IClientStream> streams = new ConcurrentHashMap<>(streamsInitalCapacity, 0.9f, streamsConcurrencyLevel);

/**
* Reserved stream ids. Stream id's directly relate to individual NetStream instances.
*/
private transient Set<Number> reservedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Number, Boolean>(reservedStreamsInitalCapacity, 0.9f, reservedStreamsConcurrencyLevel));
protected transient Set<Number> reservedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Number, Boolean>(reservedStreamsInitalCapacity, 0.9f, reservedStreamsConcurrencyLevel));

/**
* Transaction identifier for remote commands.
*/
private AtomicInteger transactionId = new AtomicInteger(1);
protected AtomicInteger transactionId = new AtomicInteger(1);

/**
* Hash map that stores pending calls and ids as pairs.
Expand Down Expand Up @@ -335,7 +335,7 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
/**
* Timestamp generator
*/
private final AtomicInteger timer = new AtomicInteger(0);
protected final AtomicInteger timer = new AtomicInteger(0);

/**
* Closing flag
Expand Down Expand Up @@ -559,8 +559,8 @@ private void startRoundTripMeasurement() {
}
}
} else {
// reducing from error to debug as its not all that important of a message these days to have such promotion
log.debug("startRoundTripMeasurement cannot be executed due to missing scheduler. This can happen if a connection drops before handshake is complete");
// reducing from error to trace as its not all that important of a message these days to have such promotion
log.trace("startRoundTripMeasurement not enabled. If RTMP, can occur when lost before handshake is complete");
}
}

Expand Down

0 comments on commit e2a4cd5

Please sign in to comment.