Skip to content

Commit

Permalink
feat: implement new reliability and recovery v2 for streaming endpoin…
Browse files Browse the repository at this point in the history
…ts (#231)

* init

* fixing params

* remove useless comments
  • Loading branch information
redouane59 authored Jun 29, 2021
1 parent d5b7d11 commit 5b6957f
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 70 deletions.
22 changes: 20 additions & 2 deletions src/main/java/io/github/redouane59/twitter/ITwitterClientV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public interface ITwitterClientV2 {
* @return a list of users who are following the targeted user
*/
UserList getFollowers(String userId, AdditionalParameters additionalParameters);

/**
* Get a list of the user following limited to 1000 results calling https://api.twitter.com/2/users/:id/following
*
Expand Down Expand Up @@ -144,15 +144,24 @@ public interface ITwitterClientV2 {
*/
TweetList searchAllTweets(String query, AdditionalParameters additionalParameters);

/**
* Stream using previous set up filters calling https://api.twitter.com/2/tweets/search/stream
*/
Future<Response> startFilteredStream(Consumer<Tweet> tweet);

/**
* Stream using previous set up filters calling https://api.twitter.com/2/tweets/search/stream
*/
Future<Response> startFilteredStream(IAPIEventListener listener);

/**
* Stream using previous set up filters calling https://api.twitter.com/2/tweets/search/stream
*
* @param backfillMinutes By passing this parameter, you can recover up to five minutes worth of data that you might have missed during a
* disconnection. The backfilled Tweets will automatically flow through a reconnected stream, with older Tweets generally being delivered before any
* newly matching Tweets. You must include a whole number between 1 and 5 as the value to this parameter.
*/
Future<Response> startFilteredStream(Consumer<Tweet> tweet);
Future<Response> startFilteredStream(IAPIEventListener listener, int backfillMinutes);

/**
* Stops the filtered stream with the result of the startFilteredStream. It'll close the socket opened.
Expand Down Expand Up @@ -210,6 +219,15 @@ public interface ITwitterClientV2 {
*/
Future<Response> startSampledStream(IAPIEventListener listener);

/**
* Stream about 1% of all tweets calling https://api.twitter.com/2/tweets/sample/stream
*
* @param backfillMinutes By passing this parameter, you can recover up to five minutes worth of data that you might have missed during a
* disconnection. The backfilled Tweets will automatically flow through a reconnected stream, with older Tweets generally being delivered before any
* newly matching Tweets. You must include a whole number between 1 and 5 as the value to this parameter.
*/
Future<Response> startSampledStream(IAPIEventListener listener, int backfillMinutes);

/**
* Get the most recent Tweets posted by the user calling https://api.twitter.com/2/users/:id/tweets
*
Expand Down
49 changes: 41 additions & 8 deletions src/main/java/io/github/redouane59/twitter/TwitterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public class TwitterClient implements ITwitterClientV1, ITwitterClientV2, ITwitt
private static final String NEXT = "next";
private static final String PAGINATION_TOKEN = "pagination_token";
private static final String PINNED_TWEET_ID = "pinned_tweet_id";
private static final String BACKFILL_MINUTES = "backfill_minutes";
private static final String[] DEFAULT_VALID_CREDENTIALS_FILE_NAMES = {"test-twitter-credentials.json",
"twitter-credentials.json"};

Expand Down Expand Up @@ -671,14 +672,30 @@ public List<Tweet> searchForTweetsArchive(String query, LocalDateTime fromDate,

@Override
public Future<Response> startFilteredStream(Consumer<Tweet> consumer) {
String url = this.urlHelper.getFilteredStreamUrl();
return this.requestHelperV2.getAsyncRequest(url, consumer);
String url = this.urlHelper.getFilteredStreamUrl();
Map<String, String> parameters = new HashMap<>();
parameters.put(EXPANSION, ALL_EXPANSIONS);
parameters.put(TWEET_FIELDS, ALL_TWEET_FIELDS);
parameters.put(USER_FIELDS, ALL_USER_FIELDS);
return this.requestHelperV2.getAsyncRequest(url, parameters, consumer);
}

@Override
public Future<Response> startFilteredStream(IAPIEventListener listener) {
String url = this.urlHelper.getFilteredStreamUrl();
return this.requestHelperV2.getAsyncRequest(url, listener);
return this.startFilteredStream(listener, 0);
}

@Override
public Future<Response> startFilteredStream(IAPIEventListener listener, int backfillMinutes) {
String url = this.urlHelper.getFilteredStreamUrl();
Map<String, String> parameters = new HashMap<>();
parameters.put(EXPANSION, ALL_EXPANSIONS);
parameters.put(TWEET_FIELDS, ALL_TWEET_FIELDS);
parameters.put(USER_FIELDS, ALL_USER_FIELDS);
if (backfillMinutes > 0) {
parameters.put(BACKFILL_MINUTES, String.valueOf(backfillMinutes));
}
return this.requestHelperV2.getAsyncRequest(url, parameters, listener);
}

@Override
Expand Down Expand Up @@ -738,14 +755,30 @@ public StreamMeta deleteFilteredStreamRuletag(String ruleTag) {

@Override
public Future<Response> startSampledStream(Consumer<Tweet> consumer) {
String url = this.urlHelper.getSampledStreamUrl();
return this.requestHelperV2.getAsyncRequest(url, consumer);
String url = this.urlHelper.getSampledStreamUrl();
Map<String, String> parameters = new HashMap<>();
parameters.put(EXPANSION, ALL_EXPANSIONS);
parameters.put(TWEET_FIELDS, ALL_TWEET_FIELDS);
parameters.put(USER_FIELDS, ALL_USER_FIELDS);
return this.requestHelperV2.getAsyncRequest(url, parameters, consumer);
}

@Override
public Future<Response> startSampledStream(IAPIEventListener listener) {
String url = this.urlHelper.getSampledStreamUrl();
return this.requestHelperV2.getAsyncRequest(url, listener);
return this.startSampledStream(listener, 0);
}

@Override
public Future<Response> startSampledStream(IAPIEventListener listener, int backfillMinutes) {
String url = this.urlHelper.getSampledStreamUrl();
Map<String, String> parameters = new HashMap<>();
parameters.put(EXPANSION, ALL_EXPANSIONS);
parameters.put(TWEET_FIELDS, ALL_TWEET_FIELDS);
parameters.put(USER_FIELDS, ALL_USER_FIELDS);
if (backfillMinutes > 0) {
parameters.put(BACKFILL_MINUTES, String.valueOf(backfillMinutes));
}
return this.requestHelperV2.getAsyncRequest(url, parameters, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ public <T> Optional<T> getRequestWithParameters(String url, Map<String, String>
return makeRequest(Verb.GET, url, parameters, null, true, classType);
}

// @todo add parameters arguments
public Future<Response> getAsyncRequest(String url, Consumer<Tweet> consumer) {
public Future<Response> getAsyncRequest(String url, Map<String, String> parameters, Consumer<Tweet> consumer) {
// All the stream are handled internally with an IAPIEventListener.
IAPIEventListener listener = new IAPIEventListener() {

Expand All @@ -68,14 +67,24 @@ public void onStreamEnded(Exception e) {

};

return getAsyncRequest(url, listener, TweetV2.class);
return getAsyncRequest(url, parameters, listener, TweetV2.class);
}

public Future<Response> getAsyncRequest(String url, IAPIEventListener listener) {
return getAsyncRequest(url, listener, TweetV2.class);
public Future<Response> getAsyncRequest(String url, Map<String, String> parameters, IAPIEventListener listener) {
return getAsyncRequest(url, parameters, listener, TweetV2.class);
}

public <T> Future<Response> getAsyncRequest(String url, IAPIEventListener listener, final Class<? extends T> targetClass) {
public <T> Future<Response> getAsyncRequest(String url,
Map<String, String> parameters,
IAPIEventListener listener,
final Class<? extends T> targetClass) {
if (parameters != null) {
url += parameters.entrySet().stream()
.map(p -> p.getKey() + "=" + p.getValue())
.reduce((p1, p2) -> p1 + "&" + p2)
.map(s -> "?" + s)
.orElse("");
}
OAuthRequest request = new OAuthRequest(Verb.GET, url);
signRequest(request);
return getService().execute(request, new OAuthAsyncRequestCallback<Response>() {
Expand Down
34 changes: 3 additions & 31 deletions src/main/java/io/github/redouane59/twitter/helpers/URLHelper.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.github.redouane59.twitter.helpers;

import io.github.redouane59.twitter.TwitterClient;
import io.github.redouane59.twitter.dto.tweet.MediaCategory;
import lombok.Getter;

Expand Down Expand Up @@ -50,6 +49,8 @@ public class URLHelper {
@Getter
private final String filteredStreamUrl = "https://api.twitter.com/2/tweets/search/stream";
@Getter
private final String sampledStreamUrl = "https://api.twitter.com/2/tweets/sample/stream";
@Getter
private final String tweetsCountUrl = "https://api.twitter.com/2/tweets/counts/recent";
@Getter
private final String tweetsCountAllUrl = "https://api.twitter.com/2/tweets/counts/all";
Expand All @@ -69,7 +70,6 @@ public class URLHelper {
private final String likeUrl = "https://api.twitter.com/2/users/:id/likes";
private final String unlikeUrl = "https://api.twitter.com/2/users/:userId/likes/:tweetId";
private final String hideUrl = "https://api.twitter.com/2/tweets/:id/hidden";
private final String sampledStreamUrl = "https://api.twitter.com/2/tweets/sample/stream";
private final String userTimelineUrl = "https://api.twitter.com/2/users/:id/tweets";
private final String userMentionsUrl = "https://api.twitter.com/2/users/:id/mentions";
private final String blockUserUrl = "https://api.twitter.com/2/users/:id/blocking";
Expand Down Expand Up @@ -174,35 +174,7 @@ public String getFavoriteTweetsUrl(String userId, String maxId) {
public String getHideReplyUrl(final String tweetId) {
return hideUrl.replace(idVariable, tweetId);
}

// @todo to improve
public String getFilteredStreamUrl() {
return filteredStreamUrl
+ "?"
+ TwitterClient.EXPANSION
+ TwitterClient.ALL_EXPANSIONS
+ "&"
+ TwitterClient.TWEET_FIELDS
+ TwitterClient.ALL_TWEET_FIELDS
+ "&"
+ TwitterClient.USER_FIELDS
+ TwitterClient.ALL_USER_FIELDS;
}

// @todo to improve
public String getSampledStreamUrl() {
return sampledStreamUrl
+ "?"
+ TwitterClient.EXPANSION
+ TwitterClient.ALL_EXPANSIONS
+ "&"
+ TwitterClient.TWEET_FIELDS
+ TwitterClient.ALL_TWEET_FIELDS
+ "&"
+ TwitterClient.USER_FIELDS
+ TwitterClient.ALL_USER_FIELDS;
}


public String getUserTimelineUrl(String userId) {
return userTimelineUrl.replace(idVariable, userId);
}
Expand Down
25 changes: 2 additions & 23 deletions src/test/java/io/github/redouane59/twitter/unit/UrlHelperTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.github.redouane59.twitter.TwitterClient;
import io.github.redouane59.twitter.dto.tweet.MediaCategory;
import io.github.redouane59.twitter.helpers.URLHelper;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -163,32 +162,12 @@ public void testRetrieveFilteredStreamRulesUrl() {

@Test
public void testFilteredStreamUrl() {
assertEquals(
"https://api.twitter.com/2/tweets/search/stream?"
+ TwitterClient.EXPANSION
+ TwitterClient.ALL_EXPANSIONS
+ "&"
+ TwitterClient.TWEET_FIELDS
+ TwitterClient.ALL_TWEET_FIELDS
+ "&"
+ TwitterClient.USER_FIELDS
+ TwitterClient.ALL_USER_FIELDS,
urlHelper.getFilteredStreamUrl());
assertEquals("https://api.twitter.com/2/tweets/search/stream", urlHelper.getFilteredStreamUrl());
}

@Test
public void testSampledStreamUrl() {
assertEquals(
"https://api.twitter.com/2/tweets/sample/stream?"
+ TwitterClient.EXPANSION
+ TwitterClient.ALL_EXPANSIONS
+ "&"
+ TwitterClient.TWEET_FIELDS
+ TwitterClient.ALL_TWEET_FIELDS
+ "&"
+ TwitterClient.USER_FIELDS
+ TwitterClient.ALL_USER_FIELDS,
urlHelper.getSampledStreamUrl());
assertEquals("https://api.twitter.com/2/tweets/sample/stream", urlHelper.getSampledStreamUrl());
}

@Test
Expand Down

0 comments on commit 5b6957f

Please sign in to comment.