Skip to content

Commit

Permalink
Refactoring for stability and message handling
Browse files Browse the repository at this point in the history
  • Loading branch information
mondain committed Jul 9, 2016
1 parent 640499d commit 10f16a6
Show file tree
Hide file tree
Showing 15 changed files with 240 additions and 278 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.0.8-M4</version>
<version>1.0.8-M5</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-server-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class AbstractMessage implements IMessage {

protected String messageType;

protected Map<?, ?> extraHeaders = null;
protected Map<?, ?> extraHeaders;

/** {@inheritDoc} */
public String getMessageID() {
Expand Down
72 changes: 10 additions & 62 deletions src/main/java/org/red5/server/messaging/AbstractPipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,7 @@ public abstract class AbstractPipe implements IPipe {
* Consumer
* @param paramMap
* Parameters passed with connection, used in concrete pipe implementations
* @return <pre>
* true
* </pre>
*
* if consumer was added,
*
* <pre>
* false
* </pre>
*
* otherwise
* @return true if consumer was added, false otherwise
*/
public boolean subscribe(IConsumer consumer, Map<String, Object> paramMap) {
// pipe is possibly used by dozens of threads at once (like many subscribers for one server stream)
Expand All @@ -87,7 +77,6 @@ public boolean subscribe(IConsumer consumer, Map<String, Object> paramMap) {
if (success && consumer instanceof IPipeConnectionListener) {
listeners.addIfAbsent((IPipeConnectionListener) consumer);
}

return success;
}

Expand All @@ -98,17 +87,7 @@ public boolean subscribe(IConsumer consumer, Map<String, Object> paramMap) {
* Provider
* @param paramMap
* Parameters passed with connection, used in concrete pipe implementations
* @return <pre>
* true
* </pre>
*
* if provider was added,
*
* <pre>
* false
* </pre>
*
* otherwise
* @return true if provider was added, false otherwise
*/
public boolean subscribe(IProvider provider, Map<String, Object> paramMap) {
boolean success = providers.addIfAbsent(provider);
Expand All @@ -124,21 +103,11 @@ public boolean subscribe(IProvider provider, Map<String, Object> paramMap) {
*
* @param provider
* Provider that should be removed
* @return <pre>
* true
* </pre>
*
* on success,
*
* <pre>
* false
* </pre>
*
* otherwise
* @return true on success, false otherwise
*/
public boolean unsubscribe(IProvider provider) {
if (providers.remove(provider)) {
fireProviderConnectionEvent(provider, PipeConnectionEvent.PROVIDER_DISCONNECT, null);
fireProviderConnectionEvent(provider, PipeConnectionEvent.EventType.PROVIDER_DISCONNECT, null);
listeners.remove(provider);
return true;
}
Expand All @@ -150,21 +119,11 @@ public boolean unsubscribe(IProvider provider) {
*
* @param consumer
* Consumer that should be removed
* @return <pre>
* true
* </pre>
*
* on success,
*
* <pre>
* false
* </pre>
*
* otherwise
* @return true on success, false otherwise
*/
public boolean unsubscribe(IConsumer consumer) {
if (consumers.remove(consumer)) {
fireConsumerConnectionEvent(consumer, PipeConnectionEvent.CONSUMER_DISCONNECT, null);
fireConsumerConnectionEvent(consumer, PipeConnectionEvent.EventType.CONSUMER_DISCONNECT, null);
listeners.remove(consumer);
return true;
}
Expand Down Expand Up @@ -275,15 +234,8 @@ public List<IConsumer> getConsumers() {
* @param paramMap
* Parameters passed with connection
*/
protected void fireConsumerConnectionEvent(IConsumer consumer, int type, Map<String, Object> paramMap) {
// Create event object
PipeConnectionEvent event = new PipeConnectionEvent(this);
// Fill it up
event.setConsumer(consumer);
event.setType(type);
event.setParamMap(paramMap);
// Fire it
firePipeConnectionEvent(event);
protected void fireConsumerConnectionEvent(IConsumer consumer, PipeConnectionEvent.EventType type, Map<String, Object> paramMap) {
firePipeConnectionEvent(PipeConnectionEvent.build(this, type, consumer, paramMap));
}

/**
Expand All @@ -296,12 +248,8 @@ protected void fireConsumerConnectionEvent(IConsumer consumer, int type, Map<Str
* @param paramMap
* Parameters passed with connection
*/
protected void fireProviderConnectionEvent(IProvider provider, int type, Map<String, Object> paramMap) {
PipeConnectionEvent event = new PipeConnectionEvent(this);
event.setProvider(provider);
event.setType(type);
event.setParamMap(paramMap);
firePipeConnectionEvent(event);
protected void fireProviderConnectionEvent(IProvider provider, PipeConnectionEvent.EventType type, Map<String, Object> paramMap) {
firePipeConnectionEvent(PipeConnectionEvent.build(this, type, provider, paramMap));
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/red5/server/messaging/IPassive.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@
* @author Steven Gong ([email protected])
*/
public interface IPassive {

public static final String KEY = IPassive.class.getName();

}
2 changes: 2 additions & 0 deletions src/main/java/org/red5/server/messaging/IPipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* @author Steven Gong ([email protected])
*/
public interface IPipe extends IMessageInput, IMessageOutput {

/**
* Add connection event listener to pipe
*
Expand All @@ -42,4 +43,5 @@ public interface IPipe extends IMessageInput, IMessageOutput {
* Connection event listener
*/
void removePipeConnectionListener(IPipeConnectionListener listener);

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public boolean subscribe(IConsumer consumer, Map<String, Object> paramMap) {
log.debug("Consumer subscribe{} {} params: {}", new Object[] { (success ? "d" : " failed"), consumer, paramMap });
}
if (success) {
fireConsumerConnectionEvent(consumer, PipeConnectionEvent.CONSUMER_CONNECT_PUSH, paramMap);
fireConsumerConnectionEvent(consumer, PipeConnectionEvent.EventType.CONSUMER_CONNECT_PUSH, paramMap);
}
return success;
} else {
Expand All @@ -70,7 +70,7 @@ public boolean subscribe(IProvider provider, Map<String, Object> paramMap) {
log.debug("Provider subscribe{} {} params: {}", new Object[] { (success ? "d" : " failed"), provider, paramMap });
}
if (success) {
fireProviderConnectionEvent(provider, PipeConnectionEvent.PROVIDER_CONNECT_PUSH, paramMap);
fireProviderConnectionEvent(provider, PipeConnectionEvent.EventType.PROVIDER_CONNECT_PUSH, paramMap);
}
return success;
}
Expand All @@ -91,12 +91,11 @@ public IMessage pullMessage(long wait) {
* @param message
* the message to be pushed to consumers
* @throws IOException
* In case IOException of some sort is occured
* In case IOException of some sort is occurred
*/
public void pushMessage(IMessage message) throws IOException {
if (log.isDebugEnabled()) {
log.debug("pushMessage: {}", message);
log.debug("pushMessage - consumers: {}", consumers.size());
log.debug("pushMessage: {} to {} consumers", message, consumers.size());
}
for (IConsumer consumer : consumers) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@
import java.util.Map;

/**
* Out-of-band control message used by inter-components communication which are connected with pipes. Out-of-band data is a separate data stream used for specific purposes (in TCP it's referenced as "urgent data"), like lifecycle control.
* Out-of-band control message used by inter-components communication which are connected with pipes.
* Out-of-band data is a separate data stream used for specific purposes (in TCP it's referenced as "urgent data"), like lifecycle control.
*
* <tt>'Target'</tt> is used to represent the receiver who may be interested for receiving. It's a string of any form. XXX shall we design a standard form for Target, like "class.instance"?
* <tt>'Target'</tt> is used to represent the receiver who may be interested for receiving.
* It's a string of any form. XXX shall we design a standard form for Target, like "class.instance"?
*
* @author The Red5 Project
* @author Steven Gong ([email protected])
*/
public class OOBControlMessage implements Serializable {

private static final long serialVersionUID = -6037348177653934300L;

/**
Expand Down
Loading

0 comments on commit 10f16a6

Please sign in to comment.