Skip to content

Commit

Permalink
Refactored AMF decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
mondain committed Aug 10, 2016
1 parent f732ce2 commit 6d7791d
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 106 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.0.8-M6</version>
<version>1.0.8-M7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-server-common</artifactId>
Expand Down
195 changes: 111 additions & 84 deletions src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -47,7 +48,6 @@
import org.red5.server.net.rtmp.event.ChunkSize;
import org.red5.server.net.rtmp.event.ClientBW;
import org.red5.server.net.rtmp.event.FlexMessage;
import org.red5.server.net.rtmp.event.FlexStreamSend;
import org.red5.server.net.rtmp.event.IRTMPEvent;
import org.red5.server.net.rtmp.event.Invoke;
import org.red5.server.net.rtmp.event.Notify;
Expand Down Expand Up @@ -115,8 +115,8 @@ public List<Object> decodeBuffer(RTMPConnection conn, IoBuffer buffer) {
if (!conn.getSessionId().equals(state.getSessionId())) {
log.warn("Session decode overlap: {} != {}", conn.getSessionId(), state.getSessionId());
}
while (buffer.hasRemaining()) {
final int remaining = buffer.remaining();
int remaining;
while ((remaining = buffer.remaining()) > 0) {
if (state.canStartDecoding(remaining)) {
log.trace("Can start decoding");
state.startDecoding();
Expand Down Expand Up @@ -461,19 +461,6 @@ public IRTMPEvent decodeMessage(RTMPConnection conn, Header header, IoBuffer in)
IRTMPEvent message;
byte dataType = header.getDataType();
switch (dataType) {
case TYPE_INVOKE:
message = decodeInvoke(conn.getEncoding(), in);
break;
case TYPE_NOTIFY:
if (log.isTraceEnabled()) {
log.trace("Sending notify on stream id: {}", header.getStreamId());
}
if (header.getStreamId().doubleValue() == 0.0d) {
message = decodeNotify(conn.getEncoding(), in, header);
} else {
message = decodeStreamMetadata(in);
}
break;
case TYPE_AUDIO_DATA:
message = decodeAudioData(in);
message.setSourceType(Constants.SOURCE_TYPE_LIVE);
Expand All @@ -494,8 +481,27 @@ public IRTMPEvent decodeMessage(RTMPConnection conn, Header header, IoBuffer in)
case TYPE_FLEX_MESSAGE:
message = decodeFlexMessage(in);
break;
case TYPE_INVOKE:
message = decodeInvoke(conn.getEncoding(), in);
break;
case TYPE_FLEX_STREAM_SEND:
message = decodeFlexStreamSend(in);
if (log.isTraceEnabled()) {
log.trace("Decoding flex stream send on stream id: {}", header.getStreamId());
}
// skip first byte
in.get();
// decode stream data; slice from the current position
message = decodeStreamData(in.slice());
break;
case TYPE_NOTIFY:
if (log.isTraceEnabled()) {
log.trace("Decoding notify on stream id: {}", header.getStreamId());
}
if (header.getStreamId().doubleValue() == 0.0d) {
message = decodeNotify(conn.getEncoding(), in, header);
} else {
message = decodeStreamData(in);
}
break;
case TYPE_PING:
message = decodePing(in);
Expand Down Expand Up @@ -859,39 +865,38 @@ public VideoData decodeVideoData(IoBuffer in) {
}

/**
* Decodes stream meta data, to include onMetaData, onCuePoint, and onFI.
* Decodes stream data, to include onMetaData, onCuePoint, and onFI.
*
* @param in
* input buffer
* @param in input buffer
* @return Notify
*/
@SuppressWarnings("unchecked")
public Notify decodeStreamMetadata(IoBuffer in) {
public Notify decodeStreamData(IoBuffer in) {
if (log.isDebugEnabled()) {
log.debug("decodeStreamMetadata");
log.debug("decodeStreamData");
}
// our result is a notify
Notify ret = null;
// check the encoding, if its AMF3 check to see if first byte is set to AMF0
Encoding encoding = ((RTMPConnection) Red5.getConnectionLocal()).getEncoding();
log.trace("Encoding: {}", encoding);
// set mark
in.mark();
// create input
Input input = null;
// check the encoding, if its AMF3 check to see if first byte is set to AMF0
byte amfVersion = 0x00;
if (encoding == Encoding.AMF3) {
in.mark();
amfVersion = in.get();
in.reset();
}
// make a pre-emptive copy of the incoming buffer here to prevent issues that occur fairly often
IoBuffer copy = in.duplicate();
if (encoding == Encoding.AMF0 || amfVersion != AMF.TYPE_AMF3_OBJECT) {
input = new org.red5.io.amf.Input(copy);
log.trace("Decoding using AMF3");
input = new org.red5.io.amf3.Input(in);
} else {
org.red5.io.amf3.Input.RefStorage refStorage = new org.red5.io.amf3.Input.RefStorage();
input = new org.red5.io.amf3.Input(copy, refStorage);
log.trace("Decoding using AMF0");
input = new org.red5.io.amf.Input(in);
}
//get the first datatype
byte dataType = input.readDataType();
log.debug("Data type: {}", dataType);
if (dataType == DataTypes.CORE_STRING) {
String setData = input.readString();
if ("@setDataFrame".equals(setData)) {
String action = input.readString();
if ("@setDataFrame".equals(action)) {
// get the second datatype
byte dataType2 = input.readDataType();
log.debug("Dataframe method type: {}", dataType2);
Expand All @@ -901,57 +906,90 @@ public Notify decodeStreamMetadata(IoBuffer in) {
log.debug("Dataframe params type: {}", object);
Map<Object, Object> params;
if (object == DataTypes.CORE_MAP) {
// the params are sent as a Mixed-Array. Required to support the RTMP publish provided by ffmpeg/xuggler
// the params are sent as a Mixed-Array. Required to support the RTMP publish provided by ffmpeg
params = (Map<Object, Object>) input.readMap();
} else if (object == DataTypes.CORE_ARRAY) {
params = (Map<Object, Object>) input.readArray(Object[].class);
//params = new HashMap<>();
//Object[] arr = (Object[]) input.readArray(Object[].class);
//for (int a = 0; a < arr.length; a++) {
// params.put(a, arr[a]);
//}
} else {
// read the params as a standard object
params = (Map<Object, Object>) input.readObject();
try {
// read the params as a standard object
params = (Map<Object, Object>) input.readObject();
} catch (Exception e) {
log.warn("Dataframe decode error", e);
params = Collections.EMPTY_MAP;
}
}
if (log.isDebugEnabled()) {
log.debug("Dataframe: {} params: {}", onCueOrOnMeta, params.toString());
}
log.debug("Dataframe: {} params: {}", onCueOrOnMeta, params.toString());
IoBuffer buf = IoBuffer.allocate(1024);
IoBuffer buf = IoBuffer.allocate(128);
buf.setAutoExpand(true);
Output out = new Output(buf);
out.writeString(onCueOrOnMeta);
out.writeMap(params);
buf.flip();
Notify ret = new Notify(buf);
// instance a notify
ret = new Notify(buf);
// set the action
ret.setAction(onCueOrOnMeta);
return ret;
} else if ("onFI".equals(setData)) {
// the onFI request contains 2 items relative to the publishing client application
// sd = system date (12-07-2011)
// st = system time (09:11:33.387)
byte object = input.readDataType();
log.debug("onFI params type: {}", object);
if (log.isDebugEnabled()) {
Map<Object, Object> params;
if (object == DataTypes.CORE_MAP) {
// the params are sent as a Mixed-Array
params = (Map<Object, Object>) input.readMap();
} else {
// read the params as a standard object
params = (Map<Object, Object>) input.readObject();
}
log.debug("onFI params: {}", params.toString());
}
} else {
log.info("Stream send: {}", setData);
if (log.isDebugEnabled()) {
if ("onFI".equals(action)) {
// the onFI request contains 2 items relative to the publishing client application
// sd = system date (12-07-2011)
// st = system time (09:11:33.387)
byte object = input.readDataType();
log.debug("Params type: {}", object);
if (object == DataTypes.CORE_MAP) {
Map<Object, Object> params = (Map<Object, Object>) input.readMap();
log.debug("Params: {}", params.toString());
} else {
log.debug("The unknown request was did not provide a parameter map");
log.debug("onFI params type: {}", object);
if (log.isDebugEnabled()) {
Map<Object, Object> params;
if (object == DataTypes.CORE_MAP) {
// the params are sent as a Mixed-Array
params = (Map<Object, Object>) input.readMap();
} else {
// read the params as a standard object
params = (Map<Object, Object>) input.readObject();
}
log.debug("onFI params: {}", params.toString());
}
} else {
log.info("Stream send: {}", action);
if (log.isDebugEnabled()) {
byte object = input.readDataType();
log.debug("Params type: {}", object);
if (object == DataTypes.CORE_MAP) {
Map<Object, Object> params = (Map<Object, Object>) input.readMap();
log.debug("Params: {}", params.toString());
} else if (object == DataTypes.CORE_ARRAY) {
Map<Object, Object> params = (Map<Object, Object>) input.readArray(Object[].class);
log.debug("Params: {}", params);
} else if (object == DataTypes.CORE_STRING) {
String str = input.readString();
log.debug("Params: {}", str);
} else {
log.debug("Stream send did not provide a parameter map");
//Map<Object, Object> params = (Map<Object, Object>) input.readObject();
//log.debug("Params: {}", params.toString());
}
}
}
// go back to the beginning
in.reset();
// instance a notify
ret = new Notify(in.asReadOnlyBuffer());
// set the action
ret.setAction(action);
}
Notify toReturn = new Notify(in.asReadOnlyBuffer());
toReturn.setAction(setData);
return toReturn;
} else {
// go back to the beginning
in.reset();
// instance a notify
ret = new Notify(in.asReadOnlyBuffer());
}
return new Notify(in.asReadOnlyBuffer());
return ret;
}

/**
Expand All @@ -978,7 +1016,7 @@ public FlexMessage decodeFlexMessage(IoBuffer in) {
msg.setTransactionId(transactionId);
Object[] params = new Object[] {};
if (in.hasRemaining()) {
ArrayList<Object> paramList = new ArrayList<Object>();
ArrayList<Object> paramList = new ArrayList<>();
final Object obj = Deserializer.deserialize(input, Object.class);
if (obj != null) {
paramList.add(obj);
Expand Down Expand Up @@ -1025,17 +1063,6 @@ public FlexMessage decodeFlexMessage(IoBuffer in) {
return msg;
}

public Notify decodeFlexStreamSend(IoBuffer in) {
if (log.isDebugEnabled()) {
log.debug("decodeFlexStreamSend");
}
// remove the first byte
in.get();
// copy remaining to pos 0, reset mark, move to pos 0
in.compact();
return new FlexStreamSend(in.asReadOnlyBuffer());
}

/**
* Sets whether or not a header error on any channel should result in a closed connection.
*
Expand Down
5 changes: 1 addition & 4 deletions src/main/java/org/red5/server/net/rtmp/event/Notify.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,7 @@ public void setConnectionParams(Map<String, Object> connectionParams) {
/** {@inheritDoc} */
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Notify: ").append(call);
return sb.toString();
return String.format("Notify: %s", call);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -273,7 +271,6 @@ public Notify duplicate() throws IOException, ClassNotFoundException {

public void setAction(String onCueOrOnMeta) {
this.action = onCueOrOnMeta;

}

public String getAction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public class RecordingListener implements IRecordingListener {
/**
* Queue to hold incoming stream event packets.
*/
private final BlockingQueue<CachedEvent> queue = new LinkedBlockingQueue<CachedEvent>(8192);
private final BlockingQueue<CachedEvent> queue = new LinkedBlockingQueue<>(8192);


/**
Expand Down
21 changes: 8 additions & 13 deletions src/main/java/org/red5/server/stream/consumer/FileConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -172,7 +173,7 @@ public class FileConsumer implements Constants, IPushableConsumer, IPipeConnecti
*/
private volatile Future<?> writerFuture;

private volatile boolean gotVideoKeyFrame;
private AtomicBoolean gotVideoKeyFrame = new AtomicBoolean(false);

/**
* Default ctor
Expand Down Expand Up @@ -223,16 +224,16 @@ public void pushMessage(IPipe pipe, IMessage message) throws IOException {
// if we're dealing with a FlexStreamSend IRTMPEvent, this avoids
// relative timestamp calculations
if (!(msg instanceof FlexStreamSend)) {
log.trace("Not FlexStreamSend type");
//log.trace("Not FlexStreamSend type");
lastTimestamp = timestamp;
}
// ensure that our first video frame written is a key frame
if (msg instanceof VideoData) {
if (!gotVideoKeyFrame) {
if (!gotVideoKeyFrame.get()) {
VideoData video = (VideoData) msg;
if (video.getFrameType() == FrameType.KEYFRAME) {
log.debug("Got our first keyframe");
gotVideoKeyFrame = true;
gotVideoKeyFrame.set(true);
} else {
// skip this frame bail out
log.debug("Skipping video data since keyframe has not been written yet");
Expand Down Expand Up @@ -283,6 +284,8 @@ public void pushMessage(IPipe pipe, IMessage message) throws IOException {
}
} else if (message instanceof ResetMessage) {
startTimestamp = -1;
} else if (log.isDebugEnabled()) {
log.debug("Ignoring pushed message: {}", message);
}
}

Expand Down Expand Up @@ -404,6 +407,7 @@ public void onOOBControlMessage(IMessageComponent source, IPipe pipe, OOBControl
* @param event
* Pipe connection event
*/
@SuppressWarnings("incomplete-switch")
public void onPipeConnectionEvent(PipeConnectionEvent event) {
switch (event.getType()) {
case CONSUMER_CONNECT_PUSH:
Expand All @@ -414,15 +418,6 @@ public void onPipeConnectionEvent(PipeConnectionEvent event) {
}
}
break;
case CONSUMER_DISCONNECT:
if (event.getConsumer() != this) {
break;
}
case PROVIDER_DISCONNECT:
// we only support one provider at a time so releasing when provider disconnects uninit();
break;
default:
break;
}
}

Expand Down
Loading

0 comments on commit 6d7791d

Please sign in to comment.