Skip to content

Commit

Permalink
Added customMessageType to Publish, Signal, Subscribe, History, File.
Browse files Browse the repository at this point in the history
  • Loading branch information
marcin-cebo committed Oct 28, 2024
1 parent 628bb44 commit d93ca77
Show file tree
Hide file tree
Showing 64 changed files with 788 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import com.pubnub.api.java.v2.PNConfiguration
import com.pubnub.api.java.v2.callbacks.EventEmitter
import com.pubnub.api.java.v2.callbacks.StatusEmitter
import com.pubnub.api.java.v2.endpoints.pubsub.PublishBuilder
import com.pubnub.api.java.v2.endpoints.pubsub.SignalBuilder
import com.pubnub.api.java.v2.entities.Channel
import com.pubnub.api.java.v2.entities.ChannelGroup
import com.pubnub.api.java.v2.entities.ChannelMetadata
Expand Down Expand Up @@ -183,6 +184,10 @@ interface PubNub : EventEmitter, StatusEmitter {
* if more than 100 messages meet the timetoken values.
*
*/
@Deprecated(
level = DeprecationLevel.WARNING,
message = "Use fetchMessages() instead",
)
fun history(): History

/**
Expand Down Expand Up @@ -319,6 +324,11 @@ interface PubNub : EventEmitter, StatusEmitter {
* The message argument can contain any JSON serializable data, including: Objects, Arrays, Integers and Strings.
* Data should not contain special Java/Kotlin classes or functions as these will not serialize.
* String content can include any single-byte or multi-byte UTF-8 character.
*
* @param message The payload
* @param channel The channel to publish message to.
*
* @return [PublishBuilder]
*/
fun publish(message: Any, channel: String): PublishBuilder

Expand Down Expand Up @@ -360,8 +370,13 @@ interface PubNub : EventEmitter, StatusEmitter {
* By default, signals are limited to a message payload size of 30 bytes.
* This limit applies only to the payload, and not to the URI or headers.
* If you require a larger payload size, please [contact support](mailto:[email protected]).
*
* @param message The payload
* @param channel The channel to signal message to.
*
* @return [SignalBuilder]
*/
fun signal(message: Any, channel: String): com.pubnub.api.endpoints.pubsub.Signal
fun signal(message: Any, channel: String): SignalBuilder

/**
* Send a signal to all subscribers of a channel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,6 @@ public interface FetchMessages extends Endpoint<PNFetchMessagesResult> {
FetchMessages includeMessageType(boolean includeMessageType);

FetchMessages includeUUID(boolean includeUUID);

FetchMessages includeCustomMessageType(boolean includeCustomMessageType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ public interface Publish extends Endpoint<PNPublishResult> {
Publish replicate(boolean replicate);

Publish ttl(Integer ttl);

Publish customMessageType(String type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,63 @@
import com.pubnub.api.java.endpoints.Endpoint;
import com.pubnub.api.models.consumer.PNPublishResult;

/**
* Interface representing a builder for configuring a publish operation.
* This interface extends {@link Endpoint} to provide a fluent API for setting various parameters
* for the publish request.
*/
public interface PublishBuilder extends Endpoint<PNPublishResult> {
/**
* Specifies whether the message should be stored in the history of the channel.
*
* @param shouldStore Boolean indicating whether to store the message (true) or not (false). If not specified, then the history configuration of the key is used.
* @return The current instance of {@code PublishBuilder} for method chaining.
*/
PublishBuilder shouldStore(Boolean shouldStore);

/**
* Configures the publish request to use the POST HTTP method instead of GET.
*
* @param usePOST Boolean indicating whether to use POST (true) or GET (false) for the request. Default is `false`
* @return The current instance of {@code PublishBuilder} for method chaining.
*/
PublishBuilder usePOST(boolean usePOST);

/**
* Sets the metadata to be sent along with the message.
* Metadata can be any custom object.
*
* @param meta Metadata object which can be used with the filtering ability.
* @return The current instance of {@code PublishBuilder} for method chaining.
*/
PublishBuilder meta(Object meta);


/**
* Specifies whether the message should be replicated across datacenters.
*
* @param replicate Boolean indicating whether to replicate the message (true) or not (false). Default is true.
* @return The current instance of {@code PublishBuilder} for method chaining.
*/
PublishBuilder replicate(boolean replicate);

/**
* Sets the time-to-live (TTL) in Message Persistence.
* If shouldStore = true, and ttl = 0, the message is stored with no expiry time.
* If shouldStore = true and ttl = X (X is an Integer value), the message is stored with an expiry time of X hours.
* If shouldStore = false, the ttl parameter is ignored.
* If ttl is not specified, then expiration of the message defaults back to the expiry value for the key.
*
* @param ttl The TTL value in minutes for the message.
* @return The current instance of {@code PublishBuilder} for method chaining.
*/
PublishBuilder ttl(Integer ttl);

/**
* Specifies a custom message type for the message.
*
* @param customMessageType The custom message type as a string.
* @return The current instance of {@code PublishBuilder} for method chaining.
*/
PublishBuilder customMessageType(String customMessageType);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
package com.pubnub.api.java.v2.endpoints.pubsub;

import com.pubnub.api.java.endpoints.pubsub.Signal;
import com.pubnub.api.java.endpoints.Endpoint;
import com.pubnub.api.models.consumer.PNPublishResult;

public interface SignalBuilder extends Signal {
/**
* Interface representing a builder for configuring a signal operation.
* This interface extends {@link Endpoint} to provide a fluent API for setting parameters
* for the signal request.
*/
public interface SignalBuilder extends Endpoint<PNPublishResult> {
/**
* Specifies a custom message type for the signal.
*
* @param customMessageType The custom message type as a string.
* @return The current instance of {@code SignalBuilder} for method chaining.
*/
SignalBuilder customMessageType(String customMessageType);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.pubnub.api.java.v2.entities

import com.pubnub.api.endpoints.pubsub.Signal
import com.pubnub.api.java.v2.endpoints.pubsub.PublishBuilder
import com.pubnub.api.java.v2.endpoints.pubsub.SignalBuilder
import com.pubnub.api.java.v2.subscriptions.Subscription
import com.pubnub.api.v2.subscriptions.SubscriptionOptions

Expand Down Expand Up @@ -90,6 +90,7 @@ interface Channel : Subscribable {
* - If `shouldStore = false`, the `ttl` parameter is ignored.
* - If ttl isn't specified, then expiration of the message defaults
* back to the expiry value for the key.
* @param customMessageType The custom type associated with the message.
*/
fun publish(message: Any): PublishBuilder

Expand All @@ -101,8 +102,9 @@ interface Channel : Subscribable {
* If you require a larger payload size, please [contact support](mailto:[email protected]).
*
* @param message The payload which will be serialized and sent.
* @param customMessageType The custom type associated with the message.
*/
fun signal(message: Any): Signal
fun signal(message: Any): SignalBuilder

/**
* Send a message to PubNub Functions Event Handlers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,14 @@ public void testHistorySingleChannel_Meta_Timetoken() throws PubNubException {
@Test
public void testFetchSingleChannel() throws PubNubException {
final String expectedChannelName = random();
final String customMessageType = "myType_" + random();

publishMixed(pubNub, 10, expectedChannelName);
publishMixed(pubNub, 10, expectedChannelName, customMessageType);

final PNFetchMessagesResult fetchMessagesResult = pubNub.fetchMessages()
.channels(Collections.singletonList(expectedChannelName))
.maximumPerChannel(25)
.includeCustomMessageType(true)
.sync();

pause(3);
Expand All @@ -137,6 +139,7 @@ public void testFetchSingleChannel() throws PubNubException {
assertNotNull(messageItem.getTimetoken());
assertNull(messageItem.getMeta());
assertNull(messageItem.getActions());
assertEquals(customMessageType, messageItem.getCustomMessageType());
}

}
Expand Down Expand Up @@ -168,8 +171,9 @@ public void testFetchSingleChannel_Meta() throws PubNubException {
@Test
public void testFetchSingleChannel_Actions() throws PubNubException {
final String expectedChannelName = random();
final String customMessageType = "myType_" + random();

final List<PNPublishResult> results = publishMixed(pubNub, 120, expectedChannelName);
final List<PNPublishResult> results = publishMixed(pubNub, 120, expectedChannelName, customMessageType);

pubNub.addMessageAction()
.channel(expectedChannelName)
Expand All @@ -184,6 +188,7 @@ public void testFetchSingleChannel_Actions() throws PubNubException {
.maximumPerChannel(25)
.includeMessageActions(true)
.includeMeta(false)
.includeCustomMessageType(true)
.sync();

assert fetchMessagesResult != null;
Expand All @@ -196,6 +201,7 @@ public void testFetchSingleChannel_Actions() throws PubNubException {
} else {
assertTrue(messageItem.getActions().isEmpty());
}
assertEquals(customMessageType, messageItem.getCustomMessageType());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.pubnub.api.java.callbacks.SubscribeCallback;
import com.pubnub.api.java.v2.PNConfiguration;
import com.pubnub.api.java.v2.PNConfigurationOverride;
import com.pubnub.api.java.v2.callbacks.StatusListener;
import com.pubnub.api.java.v2.entities.Channel;
import com.pubnub.api.java.v2.subscriptions.Subscription;
import com.pubnub.api.models.consumer.PNPublishResult;
Expand All @@ -31,6 +32,11 @@
import org.junit.Test;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -58,8 +64,9 @@ public void testPublishMessage() throws PubNubException {
final AtomicBoolean success = new AtomicBoolean();
final String expectedChannel = randomChannel();
final JsonObject messagePayload = generateMessage(pubNub);
final String customMessageType = "myType";

pubNub.publish(messagePayload, expectedChannel).shouldStore(true).usePOST(true).async((result) -> {
pubNub.publish(messagePayload, expectedChannel).shouldStore(true).usePOST(true).customMessageType(customMessageType).async((result) -> {
assertFalse(result.isFailure());
success.set(true);
});
Expand All @@ -70,6 +77,7 @@ public void testPublishMessage() throws PubNubException {
pubNub.publish()
.message(messagePayload)
.channel(expectedChannel)
.customMessageType(customMessageType)
.async((result) -> {
assertFalse(result.isFailure());
success.set(true);
Expand Down Expand Up @@ -702,4 +710,35 @@ public void message(@NotNull PubNub pubnub, @NotNull PNMessageResult pnMessageRe
.with()
.untilAtomic(count, IsEqual.equalTo(2));
}

@Test
public void testPublishMessageWithCustomMessageType() throws InterruptedException, ExecutionException, TimeoutException, PubNubException {
String expectedChannelName = randomChannel();
Channel channel = pubNub.channel(expectedChannelName);
String expectedCustomMessageType = "thisIsType";
CountDownLatch connected = new CountDownLatch(1);
CompletableFuture receivedMessageFuture = new CompletableFuture<PNMessageResult>();

Subscription subscription = channel.subscription();
subscription.setOnMessage(pnMessageResult -> receivedMessageFuture.complete(pnMessageResult));
pubNub.addListener(new StatusListener() {
@Override
public void status(@NotNull PubNub pubnub, @NotNull PNStatus status) {
if (status.getCategory() == PNStatusCategory.PNConnectedCategory &&
status.getAffectedChannels().contains(expectedChannelName)
) {
connected.countDown();
}
}
});

subscription.subscribe();
assertTrue(connected.await(10, TimeUnit.SECONDS));

channel.publish("message").customMessageType(expectedCustomMessageType).sync();

PNMessageResult message = (PNMessageResult) receivedMessageFuture.get(10, TimeUnit.SECONDS);

assertEquals(expectedCustomMessageType, message.getCustomMessageType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ public void testPublishSignalMessageAsync() {
public void testPublishSignalMessageSync() throws PubNubException {
final String expectedChannel = randomChannel();
final String expectedPayload = RandomGenerator.newValue(5);
final String customMessageType = "mytype";

final PNPublishResult signalResult = pubNub.signal(expectedPayload, expectedChannel).sync();
final PNPublishResult signalResult = pubNub.signal(expectedPayload, expectedChannel).customMessageType(customMessageType).sync();

assertNotNull(signalResult);
}
Expand All @@ -68,6 +69,7 @@ public void testReceiveSignalMessage() {

final String expectedChannel = randomChannel();
final String expectedPayload = RandomGenerator.newValue(5);
final String expectedCustomMessageType = "myCustomType";

final com.pubnub.api.java.PubNub observerClient = getPubNub();

Expand All @@ -78,9 +80,8 @@ public void testReceiveSignalMessage() {
public void status(@NotNull com.pubnub.api.java.PubNub pubnub, @NotNull PNStatus status) {
if (status.getCategory() == PNStatusCategory.PNConnectedCategory) {
if (status.getAffectedChannels().contains(expectedChannel)) {
pubNub.signal()
.message(expectedPayload)
.channel(expectedChannel)
pubNub.signal(expectedPayload, expectedChannel)
.customMessageType(expectedCustomMessageType)
.async((result) -> {
assertFalse(result.isFailure());
assertNotNull(result);
Expand All @@ -94,6 +95,7 @@ public void signal(@NotNull com.pubnub.api.java.PubNub pubnub, @NotNull PNSignal
assertEquals(pubNub.getConfiguration().getUserId().getValue(), signal.getPublisher());
assertEquals(expectedChannel, signal.getChannel());
assertEquals(expectedPayload, new Gson().fromJson(signal.getMessage(), String.class));
assertEquals(expectedCustomMessageType, signal.getCustomMessageType());
success.set(true);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,21 @@ public static String randomChannel() {
// }

public static List<PNPublishResult> publishMixed(PubNub pubnub, int count, String channel) {
String customMessageType = null;
return publishMixed(pubnub, count, channel, customMessageType);
}

public static List<PNPublishResult> publishMixed(PubNub pubnub, int count, String channel, String customMessageType) {
final List<PNPublishResult> list = new ArrayList<>();
for (int i = 0; i < count; i++) {
final Publish publishBuilder = pubnub.publish()
.channel(channel)
.message(String.valueOf(i).concat("_msg"))
.shouldStore(true);
if(customMessageType != null){
publishBuilder.customMessageType(customMessageType);
}

if (i % 2 == 0) {
publishBuilder.meta(generateMap());
} else if (i % 3 == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import com.pubnub.api.java.v2.PNConfiguration
import com.pubnub.api.java.v2.callbacks.EventListener
import com.pubnub.api.java.v2.callbacks.StatusListener
import com.pubnub.api.java.v2.endpoints.pubsub.PublishBuilder
import com.pubnub.api.java.v2.endpoints.pubsub.SignalBuilder
import com.pubnub.internal.PubNubImpl
import com.pubnub.internal.java.builder.PresenceBuilderImpl
import com.pubnub.internal.java.builder.SubscribeBuilderImpl
Expand Down Expand Up @@ -218,8 +219,8 @@ open class PubNubForJavaImpl(configuration: PNConfiguration) :
return SignalImpl(this)
}

override fun signal(message: Any, channel: String): com.pubnub.api.endpoints.pubsub.Signal {
return super.signal(channel, message)
override fun signal(message: Any, channel: String): SignalBuilder {
return SignalImpl(this).message(message).channel(channel) as SignalBuilder
}

override fun listChannelsForChannelGroup(): AllChannelsChannelGroup {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class FetchMessagesImpl extends PassthroughEndpoint<PNFetchMessagesResult
private boolean includeMessageActions;
private boolean includeMessageType = true;
private boolean includeUUID = true;
private boolean includeCustomMessageType = false;

public FetchMessagesImpl(PubNub pubnub) {
super(pubnub);
Expand All @@ -49,7 +50,8 @@ protected Endpoint<PNFetchMessagesResult> createRemoteAction() {
includeUUID,
includeMeta,
includeMessageActions,
includeMessageType
includeMessageType,
includeCustomMessageType
);
}

Expand Down
Loading

0 comments on commit d93ca77

Please sign in to comment.