Skip to content

Commit

Permalink
Added connection type for null or unknown
Browse files Browse the repository at this point in the history
  • Loading branch information
mondain committed Mar 24, 2017
1 parent 2ed448e commit 9ec2443
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 37 deletions.
6 changes: 5 additions & 1 deletion src/main/java/org/red5/server/BaseConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,11 @@ public BaseConnection() {
@ConstructorProperties({ "type" })
public BaseConnection(String type) {
log.debug("New BaseConnection - type: {}", type);
this.type = IConnection.Type.valueOf(type.toUpperCase());
if (type != null) {
this.type = IConnection.Type.valueOf(type.toUpperCase());
} else {
this.type = IConnection.Type.UNKNOWN;
}
this.sessionId = RandomStringUtils.randomAlphanumeric(13).toUpperCase();
log.debug("Generated session id: {}", sessionId);
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/red5/server/api/IConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public static enum Duty {
public static enum Type {
PERSISTENT, // Persistent connection type, eg RTMP
POLLING, // Polling connection type, eg RTMPT
TRANSIENT // Transient connection type, eg Remoting, HTTP, etc
TRANSIENT, // Transient connection type, eg Remoting, HTTP, etc
UNKNOWN // all others not matching known types
};

/**
Expand Down
72 changes: 37 additions & 35 deletions src/main/java/org/red5/server/stream/ClientBroadcastStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ private void checkSendNotifications(IEvent event) {
* Closes stream, unsubscribes provides, sends stoppage notifications and broadcast close notification.
*/
public void close() {
log.debug("Stream close: {}", publishedName);
//log.debug("Stream close: {}", publishedName);
if (closed) {
log.debug("{} already closed", publishedName);
//log.debug("{} already closed", publishedName);
return;
}
closed = true;
Expand Down Expand Up @@ -247,6 +247,7 @@ public void dispatchEvent(IEvent event) {
return;
}
int eventTime = rtmpEvent.getTimestamp();
/*
if (log.isTraceEnabled()) {
// If this is first packet save its timestamp; expect it is
// absolute? no matter: it's never used!
Expand All @@ -257,6 +258,7 @@ public void dispatchEvent(IEvent event) {
log.trace(String.format("CBS=@%08x: rtmpEvent=%s creation=%s firstPacketTime=%d timestamp=%d", System.identityHashCode(this), rtmpEvent.getClass().getSimpleName(), creationTime, firstPacketTime, eventTime));
}
}
*/
//get the buffer only once per call
IoBuffer buf = null;
if (rtmpEvent instanceof IStreamData && (buf = ((IStreamData<?>) rtmpEvent).getData()) != null) {
Expand All @@ -270,7 +272,7 @@ public void dispatchEvent(IEvent event) {
}
//log.trace("Stream codec info: {}", info);
if (rtmpEvent instanceof AudioData) {
log.trace("Audio: {}", eventTime);
//log.trace("Audio: {}", eventTime);
IAudioStreamCodec audioStreamCodec = null;
if (checkAudioCodec) {
// dont try to read codec info from 0 length audio packets
Expand All @@ -291,7 +293,7 @@ public void dispatchEvent(IEvent event) {
info.setHasAudio(true);
}
} else if (rtmpEvent instanceof VideoData) {
log.trace("Video: {}", eventTime);
//log.trace("Video: {}", eventTime);
IVideoStreamCodec videoStreamCodec = null;
if (checkVideoCodec) {
videoStreamCodec = VideoCodecFactory.getVideoCodec(buf);
Expand All @@ -309,20 +311,20 @@ public void dispatchEvent(IEvent event) {
info.setHasVideo(true);
}
} else if (rtmpEvent instanceof Invoke) {
Invoke invokeEvent = (Invoke) rtmpEvent;
log.debug("Invoke action: {}", invokeEvent.getAction());
//Invoke invokeEvent = (Invoke) rtmpEvent;
//log.debug("Invoke action: {}", invokeEvent.getAction());
// event / stream listeners will not be notified of invokes
return;
} else if (rtmpEvent instanceof Notify) {
Notify notifyEvent = (Notify) rtmpEvent;
String action = notifyEvent.getAction();
if (log.isDebugEnabled()) {
log.debug("Notify action: {}", action);
}
//if (log.isDebugEnabled()) {
//log.debug("Notify action: {}", action);
//}
if ("onMetaData".equals(action)) {
// store the metadata
try {
log.debug("Setting metadata");
//log.debug("Setting metadata");
setMetaData(notifyEvent.duplicate());
} catch (Exception e) {
log.warn("Metadata could not be duplicated for this stream", e);
Expand Down Expand Up @@ -364,7 +366,7 @@ public void dispatchEvent(IEvent event) {
break;
default:
// ignored event
log.debug("Ignoring event: {}", event.getType());
//log.debug("Ignoring event: {}", event.getType());
}
} else {
log.debug("Event was of wrong type or stream is closed ({})", closed);
Expand Down Expand Up @@ -407,7 +409,7 @@ public IProvider getProvider() {
* Name that used for publishing. Set at client side when begin to broadcast with NetStream#publish.
*/
public void setPublishedName(String name) {
log.debug("setPublishedName: {}", name);
//log.debug("setPublishedName: {}", name);
// a publish name of "false" is a special case, used when stopping a stream
if (StringUtils.isNotEmpty(name) && !"false".equals(name)) {
this.publishedName = name;
Expand Down Expand Up @@ -568,37 +570,37 @@ public void onOOBControlMessage(IMessageComponent source, IPipe pipe, OOBControl
public void onPipeConnectionEvent(PipeConnectionEvent event) {
switch (event.getType()) {
case PROVIDER_CONNECT_PUSH:
log.debug("Provider connect");
//log.debug("Provider connect");
if (event.getProvider() == this && event.getSource() != connMsgOut && (event.getParamMap() == null || !event.getParamMap().containsKey("record"))) {
livePipe = (IPipe) event.getSource();
log.debug("Provider: {}", livePipe.getClass().getName());
//log.debug("Provider: {}", livePipe.getClass().getName());
for (IConsumer consumer : livePipe.getConsumers()) {
subscriberStats.increment();
}
}
break;
case PROVIDER_DISCONNECT:
log.debug("Provider disconnect");
if (log.isDebugEnabled() && livePipe != null) {
log.debug("Provider: {}", livePipe.getClass().getName());
}
//log.debug("Provider disconnect");
//if (log.isDebugEnabled() && livePipe != null) {
//log.debug("Provider: {}", livePipe.getClass().getName());
//}
if (livePipe == event.getSource()) {
livePipe = null;
}
break;
case CONSUMER_CONNECT_PUSH:
log.debug("Consumer connect");
//log.debug("Consumer connect");
IPipe pipe = (IPipe) event.getSource();
if (log.isDebugEnabled() && pipe != null) {
log.debug("Consumer: {}", pipe.getClass().getName());
}
//if (log.isDebugEnabled() && pipe != null) {
//log.debug("Consumer: {}", pipe.getClass().getName());
//}
if (livePipe == pipe) {
notifyChunkSize();
}
subscriberStats.increment();
break;
case CONSUMER_DISCONNECT:
log.debug("Consumer disconnect: {}", event.getSource().getClass().getName());
//log.debug("Consumer disconnect: {}", event.getSource().getClass().getName());
subscriberStats.decrement();
break;
default:
Expand Down Expand Up @@ -627,7 +629,7 @@ public void pushMessage(IPipe pipe, IMessage message) {
* File could not be created/written to
*/
public void saveAs(String name, boolean isAppend) throws IOException {
log.debug("SaveAs - name: {} append: {}", name, isAppend);
//log.debug("SaveAs - name: {} append: {}", name, isAppend);
// get connection to check if client is still streaming
IStreamCapableConnection conn = getConnection();
if (conn == null) {
Expand All @@ -639,24 +641,24 @@ public void saveAs(String name, boolean isAppend) throws IOException {
//IRecordingListener listener = (IRecordingListener) ScopeUtils.getScopeService(conn.getScope(), IRecordingListener.class, RecordingListener.class, false);
// create a recording listener
IRecordingListener listener = new RecordingListener();
log.debug("Created: {}", listener);
//log.debug("Created: {}", listener);
// initialize the listener
if (listener.init(conn, name, isAppend)) {
// get decoder info if it exists for the stream
IStreamCodecInfo codecInfo = getCodecInfo();
log.debug("Codec info: {}", codecInfo);
//log.debug("Codec info: {}", codecInfo);
if (codecInfo instanceof StreamCodecInfo) {
StreamCodecInfo info = (StreamCodecInfo) codecInfo;
IVideoStreamCodec videoCodec = info.getVideoCodec();
log.debug("Video codec: {}", videoCodec);
//log.debug("Video codec: {}", videoCodec);
if (videoCodec != null) {
//check for decoder configuration to send
IoBuffer config = videoCodec.getDecoderConfiguration();
if (config != null) {
log.debug("Decoder configuration is available for {}", videoCodec.getName());
//log.debug("Decoder configuration is available for {}", videoCodec.getName());
VideoData videoConf = new VideoData(config.asReadOnlyBuffer());
try {
log.debug("Setting decoder configuration for recording");
//log.debug("Setting decoder configuration for recording");
listener.getFileConsumer().setVideoDecoderConfiguration(videoConf);
} finally {
videoConf.release();
Expand All @@ -666,15 +668,15 @@ public void saveAs(String name, boolean isAppend) throws IOException {
log.debug("Could not initialize stream output, videoCodec is null.");
}
IAudioStreamCodec audioCodec = info.getAudioCodec();
log.debug("Audio codec: {}", audioCodec);
//log.debug("Audio codec: {}", audioCodec);
if (audioCodec != null) {
//check for decoder configuration to send
IoBuffer config = audioCodec.getDecoderConfiguration();
if (config != null) {
log.debug("Decoder configuration is available for {}", audioCodec.getName());
//log.debug("Decoder configuration is available for {}", audioCodec.getName());
AudioData audioConf = new AudioData(config.asReadOnlyBuffer());
try {
log.debug("Setting decoder configuration for recording");
//log.debug("Setting decoder configuration for recording");
listener.getFileConsumer().setAudioDecoderConfiguration(audioConf);
} finally {
audioConf.release();
Expand Down Expand Up @@ -827,7 +829,7 @@ private void sendStartNotifications(IEventListener source) {
* Starts stream, creates pipes, connects
*/
public void start() {
log.info("Stream start: {}", publishedName);
//log.info("Stream start: {}", publishedName);
checkVideoCodec = true;
checkAudioCodec = true;
firstPacketTime = -1;
Expand All @@ -850,7 +852,7 @@ public void startPublishing() {
sendStartNotifications(Red5.getConnectionLocal());
// force recording if set
if (automaticRecording) {
log.debug("Starting automatic recording of {}", publishedName);
//log.debug("Starting automatic recording of {}", publishedName);
try {
saveAs(publishedName, false);
} catch (Exception e) {
Expand All @@ -861,7 +863,7 @@ public void startPublishing() {

/** {@inheritDoc} */
public void stop() {
log.info("Stream stop: {}", publishedName);
//log.info("Stream stop: {}", publishedName);
stopRecording();
close();
}
Expand Down

0 comments on commit 9ec2443

Please sign in to comment.