diff --git a/CMakeLists.txt b/CMakeLists.txt index 37ad5f02..32c34b42 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -60,9 +60,9 @@ num_option(ONLY_PUBSUB_API "Only pubsub API" OFF) num_option(USE_PROXY "Use proxy" ON) num_option(USE_GZIP_COMPRESSION "Use gzip compression" ON) num_option(RECEIVE_GZIP_RESPONSE "Use gzip decompression" ON) -num_option(USE_RETRY_CONFIGURATION "Use requests retry" ON) +num_option(USE_RETRY_CONFIGURATION "Use requests retry" OFF) num_option(USE_SUBSCRIBE_V2 "Use subscribe v2" ON) -num_option(USE_SUBSCRIBE_EVENT_ENGINE "Use Subscribe Event Engine" ON) +num_option(USE_SUBSCRIBE_EVENT_ENGINE "Use Subscribe Event Engine" OFF) num_option(USE_ADVANCED_HISTORY "Use advanced history" ON) num_option(USE_OBJECTS_API "Use objects API" ON) num_option(USE_AUTO_HEARTBEAT "Use auto heartbeat" ON) @@ -529,8 +529,7 @@ set(SOURCEFILES ${LIB_SOURCEFILES} ${OS_SOURCEFILES} ${FEATURE_SOURCEFILES} - ${INTF_SOURCEFILES} - core/samples/subscribe_event_engine_sample.c) + ${INTF_SOURCEFILES}) if(NOT ESP_PLATFORM) if(${SHARED_LIB}) @@ -708,9 +707,13 @@ if(${EXAMPLES}) pubnub_callback_subloop_sample subscribe_publish_callback_sample subscribe_publish_from_callback - subscribe_event_engine_sample publish_callback_subloop_sample publish_queue_callback_subloop) + if(${USE_SUBSCRIBE_EVENT_ENGINE}) + set(EXAMPLE_LIST + subscribe_event_engine_sample + ${EXAMPLE_LIST}) + endif() if (WITH_CPP) set(CPP_EXAMPLE_LIST subscribe_publish_callback_sample # Only supports callback! diff --git a/core/pbcc_event_engine.c b/core/pbcc_event_engine.c index 0b732d75..e41535c1 100644 --- a/core/pbcc_event_engine.c +++ b/core/pbcc_event_engine.c @@ -73,6 +73,8 @@ typedef enum { /** Business logic invocation definition. */ struct pbcc_ee_invocation { + /** A specific Event Engine invocation type. */ + int type; /** * @brief Whether invocation should be processed immediately without * addition to the queue or not. @@ -198,10 +200,15 @@ static void pbcc_ee_invocation_exec_(pbcc_ee_invocation_t* invocation); * execution. * @param invocation Pointer to the invocation which has been used to execute * effect. + * @param paused Whether `invocation` execution has been paused or not. + * Paused invocation will reset its execution status to + * 'CREATED'. When paused, then the Event Engine should be + * told when the next invocation should be processed. */ static void pbcc_ee_effect_completion_( pbcc_event_engine_t* ee, - pbcc_ee_invocation_t* invocation); + pbcc_ee_invocation_t* invocation, + bool paused); // ---------------------------------------------- @@ -234,13 +241,11 @@ void pbcc_ee_free(pbcc_event_engine_t** ee) { if (NULL == ee || NULL == *ee) { return; } - // pubnub_mutex_lock((*ee)->mutw); - printf("~~~~~~~~ pbcc_ee_free acquired lock (%p). LOCKED? %s\n", ee, pubnub_mutex_lock((*ee)->mutw) ? "YES" : "NO"); + pubnub_mutex_lock((*ee)->mutw); if (NULL != (*ee)->invocations) { pbarray_free(&(*ee)->invocations); } if (NULL != (*ee)->current_state) pbcc_ee_state_free(&(*ee)->current_state); - // pubnub_mutex_unlock((*ee)->mutw); - printf("~~~~~~~~ pbcc_ee_free released lock (%p). UNLOCKED? %s\n", ee, pubnub_mutex_unlock((*ee)->mutw) ? "YES" : "NO"); + pubnub_mutex_unlock((*ee)->mutw); pubnub_mutex_destroy((*ee)->mutw); free(*ee); *ee = NULL; @@ -248,19 +253,9 @@ void pbcc_ee_free(pbcc_event_engine_t** ee) pbcc_ee_state_t* pbcc_ee_current_state(pbcc_event_engine_t* ee) { - printf("~~~~~~~~ pbcc_ee_current_state (%p)\n", ee); - const bool locked = pbpal_mutex_trylock(ee->mutw); - if(locked) { - printf("~~~~~~~~ pbcc_ee_current_state acquired lock (%p)\n", ee); - } else { - printf("~~~~~~~~ pbcc_ee_current_state CAN'T acquire lock. Already locked (%p)\n", ee); - } - // pubnub_mutex_lock(ee->mutw); + pubnub_mutex_lock(ee->mutw); pbcc_ee_state_t* state = pbcc_ee_state_copy_(ee->current_state); - if (locked) { - // pubnub_mutex_unlock(ee->mutw); - printf("~~~~~~~~ pbcc_ee_current_state released lock (%p). UNLOCKED? %s\n", ee, pubnub_mutex_unlock(ee->mutw) ? "YES" : "NO"); - } + pubnub_mutex_unlock(ee->mutw); return state; } @@ -270,12 +265,8 @@ enum pubnub_res pbcc_ee_handle_event( pbcc_ee_event_t* event) { PUBNUB_ASSERT_OPT(NULL != ee->current_state); - printf("~~~~ pbcc_ee_handle_event (event type: %d)\n", event->type); - pthread_t thread_id = pthread_self(); - printf("Current thread ID: %lu\n", (unsigned long)thread_id); - // pubnub_mutex_lock(ee->mutw); - printf("~~~~~~~~ pbcc_ee_handle_event acquired lock (%p). LOCKED? %s\n", ee, pubnub_mutex_lock(ee->mutw) ? "YES" : "NO"); + pubnub_mutex_lock(ee->mutw); pbcc_ee_state_t* state = pbcc_ee_state_copy_(ee->current_state); pbcc_ee_transition_t* transition = ee->current_state->transition( ee, @@ -286,8 +277,7 @@ enum pubnub_res pbcc_ee_handle_event( if (NULL == transition) { PUBNUB_LOG_ERROR("pbcc_ee_handle_event: failed to allocate memory\n"); - // pubnub_mutex_unlock(ee->mutw); - printf("~~~~~~~~ pbcc_ee_handle_event released lock (%p). UNLOCKED? %s\n", ee, pubnub_mutex_unlock(ee->mutw) ? "YES" : "NO"); + pubnub_mutex_unlock(ee->mutw); return PNR_OUT_OF_MEMORY; } @@ -298,8 +288,7 @@ enum pubnub_res pbcc_ee_handle_event( */ if (NULL == transition->target_state) { pbcc_ee_transition_free(&transition); - // pubnub_mutex_unlock(ee->mutw); - printf("~~~~~~~~ pbcc_ee_handle_event released lock (%p). UNLOCKED? %s\n", ee, pubnub_mutex_unlock(ee->mutw) ? "YES" : "NO"); + pubnub_mutex_unlock(ee->mutw); return PNR_OK; } @@ -310,32 +299,39 @@ enum pubnub_res pbcc_ee_handle_event( if (NULL == transition->invocations) { pbcc_ee_current_state_set_(ee, transition->target_state); pbcc_ee_transition_free(&transition); - // pubnub_mutex_unlock(ee->mutw); - printf("~~~~~~~~ pbcc_ee_handle_event released lock (%p). UNLOCKED? %s\n", ee, pubnub_mutex_unlock(ee->mutw) ? "YES" : "NO"); + pubnub_mutex_unlock(ee->mutw); + return PNR_OK; } /** * Merging transition invocations into Event Engine invocations queue. */ - enum pubnub_res rslt = PNR_OK; + enum pubnub_res rslt = PNR_OK; pbarray_t* invocations = transition->invocations; - const size_t count = pbarray_count(invocations); - const pbarray_res merge_rslt = pbarray_merge(ee->invocations, invocations); - bool executing_immediate = false; + const size_t count = pbarray_count(invocations); + const pbarray_res merge_rslt = pbarray_merge(ee->invocations, invocations); if (PBAR_OK != merge_rslt) { rslt = PBTT_ADD_MEMBERS == merge_rslt ? PNR_OUT_OF_MEMORY : PNR_INVALID_PARAMETERS; } - /** Restore previous state if new invocations schedule failed. */ - if (PNR_OK != rslt) { - pbarray_subtract(ee->invocations, invocations, true); + if (PNR_OK != rslt || 0 == count) { + if (0 != count) + pbarray_subtract(ee->invocations, invocations, true); + if (PNR_OK == rslt) + pbcc_ee_current_state_set_(ee, transition->target_state); + pubnub_mutex_unlock(ee->mutw); + + pbcc_ee_transition_free(&transition); + pbcc_ee_process_next_invocation(ee); + + return PNR_OK; } - else { pbcc_ee_current_state_set_(ee, transition->target_state); } - // pubnub_mutex_unlock(ee->mutw); - printf("~~~~~~~~ pbcc_ee_handle_event released lock (%p). UNLOCKED? %s\n", ee, pubnub_mutex_unlock(ee->mutw) ? "YES" : "NO"); + + pbcc_ee_current_state_set_(ee, transition->target_state); + pubnub_mutex_unlock(ee->mutw); for (size_t i = 0; i < count; ++i) { pbcc_ee_invocation_t* inv = (pbcc_ee_invocation_t*) @@ -345,46 +341,48 @@ enum pubnub_res pbcc_ee_handle_event( * Event Engine invocations queue from transition. */ pbref_counter_increment(inv->counter); + } - if (!inv->immediate) { continue; } + /** Check whether any immediate */ + pbcc_ee_invocation_t* inv = + (pbcc_ee_invocation_t*)pbarray_first(invocations); + if (inv->immediate) { pbcc_ee_invocation_exec_(inv); } + else { pbcc_ee_process_next_invocation(ee); } - pbcc_ee_invocation_exec_(inv); - executing_immediate = true; - } pbcc_ee_transition_free(&transition); - if (!executing_immediate) { pbcc_ee_process_next_invocation(ee, false); } - return rslt; } -void pbcc_ee_process_next_invocation( - pbcc_event_engine_t* ee, - const bool remove_previous) +size_t pbcc_ee_process_next_invocation(pbcc_event_engine_t* ee) { - // pubnub_mutex_lock(ee->mutw); - printf("~~~~~~~~ pbcc_ee_process_next_invocation acquired lock (%p). LOCKED? %s\n", ee, pubnub_mutex_lock(ee->mutw) ? "YES" : "NO"); - if (remove_previous && pbarray_count(ee->invocations) > 0) { - pbcc_ee_invocation_t* inv = (pbcc_ee_invocation_t*) - pbarray_first(ee->invocations); - /** Remove invocation only if it has running state. */ - if (PBCC_EE_INVOCATION_RUNNING == inv->status) { - inv->status = PBCC_EE_INVOCATION_COMPLETED; - pbarray_remove_element_at(ee->invocations, 0); - } - } + pubnub_mutex_lock(ee->mutw); + const size_t count = pbarray_count(ee->invocations); - if (0 == pbarray_count(ee->invocations)) { - // pubnub_mutex_unlock(ee->mutw); - printf("~~~~~~~~ pbcc_ee_process_next_invocation released lock (%p). UNLOCKED? %s\n", ee, pubnub_mutex_unlock(ee->mutw) ? "YES" : "NO"); - return; + if (0 == count) { + pubnub_mutex_unlock(ee->mutw); + return count; } + pbcc_ee_invocation_t* invocation = (pbcc_ee_invocation_t*) pbarray_first(ee->invocations); - // pubnub_mutex_unlock(ee->mutw); - printf("~~~~~~~~ pbcc_ee_process_next_invocation released lock (%p). UNLOCKED? %s\n", ee, pubnub_mutex_unlock(ee->mutw) ? "YES" : "NO"); - + pubnub_mutex_unlock(ee->mutw); pbcc_ee_invocation_exec_(invocation); + + return count; +} + +void pbcc_ee_handle_effect_completion( + pbcc_event_engine_t* ee, + pbcc_ee_invocation_t* invocation) +{ + /** Check whether invocation is running or not. */ + if (PBCC_EE_INVOCATION_RUNNING != invocation->status) { return; } + + pubnub_mutex_lock(ee->mutw); + invocation->status = PBCC_EE_INVOCATION_COMPLETED; + pbarray_remove(ee->invocations, (void**)&invocation, true); + pubnub_mutex_unlock(ee->mutw); } pbcc_ee_data_t* pbcc_ee_data_alloc( @@ -409,7 +407,7 @@ void pbcc_ee_data_free(pbcc_ee_data_t* data) } } -const void* pbcc_ee_data_value(const pbcc_ee_data_t* data) +void* pbcc_ee_data_value(pbcc_ee_data_t* data) { if (NULL == data) { return NULL; } return data->data; @@ -502,6 +500,7 @@ void pbcc_ee_event_free(pbcc_ee_event_t** event) } pbcc_ee_invocation_t* pbcc_ee_invocation_alloc( + int type, const pbcc_ee_effect_function_t effect, pbcc_ee_data_t* data, const bool immediate) @@ -512,10 +511,33 @@ pbcc_ee_invocation_t* pbcc_ee_invocation_alloc( invocation->status = PBCC_EE_INVOCATION_CREATED; invocation->effect = effect; invocation->data = pbcc_ee_data_copy(data); + invocation->type = type; return invocation; } +void pbcc_ee_invocation_cancel_by_type(pbcc_event_engine_t* ee, int type) +{ + pubnub_mutex_lock(ee->mutw); + const size_t count = pbarray_count(ee->invocations); + if (0 == count) { + pubnub_mutex_unlock(ee->mutw); + return; + } + + for (size_t i = 0; i < count; i++) { + const pbcc_ee_invocation_t* invocation = + pbarray_element_at(ee->invocations, i); + + if (invocation->status == PBCC_EE_INVOCATION_RUNNING) { continue; } + if (type == invocation->type) { + pbarray_remove(ee->invocations, (void**)&invocation, false); + break; + } + } + pubnub_mutex_unlock(ee->mutw); +} + void pbcc_ee_invocation_free(pbcc_ee_invocation_t* invocation) { if (NULL == invocation) { return; } @@ -533,19 +555,18 @@ pbcc_ee_transition_t* pbcc_ee_transition_alloc( { PUBNUB_ASSERT_OPT(NULL != ee->current_state); - // pubnub_mutex_lock(ee->mutw); - printf("~~~~~~~~ pbcc_ee_transition_alloc acquired lock (%p). LOCKED? %s\n", ee, pubnub_mutex_lock(ee->mutw) ? "YES" : "NO"); + pubnub_mutex_lock(ee->mutw); const pbcc_ee_state_t* current_state = ee->current_state; PBCC_ALLOCATE_TYPE(transition, pbcc_ee_transition_t, true, NULL); transition->target_state = pbcc_ee_state_copy_(state); /** Gather transition invocations list. */ size_t invocations_count = 0; - if (NULL != current_state->on_exit_invocations) + if (NULL != state && NULL != current_state->on_exit_invocations) invocations_count += pbarray_count(current_state->on_exit_invocations); - if (NULL != invocations) + if (NULL != state && NULL != invocations) invocations_count += pbarray_count(invocations); - if (NULL != state->on_enter_invocations) + if (NULL != state && NULL != state->on_enter_invocations) invocations_count += pbarray_count(state->on_enter_invocations); if (invocations_count > 0) { @@ -556,26 +577,32 @@ pbcc_ee_transition_t* pbcc_ee_transition_alloc( pbcc_ee_invocation_free); pbarray_t* on_exit = current_state->on_exit_invocations; - pbarray_t* on_enter = state->on_enter_invocations; + pbarray_t* on_enter = + NULL != state ? state->on_enter_invocations : NULL; + /** + * Attention: `invocations` should be called before `on_exit` because + * in most of the cases `on_exit` cancels previous request and it will + * reset all read buffers making it impossible to process data received + * for current state. + */ if (NULL == transition->invocations || - !pbcc_ee_transition_add_invocations_(transition, on_exit) || !pbcc_ee_transition_add_invocations_(transition, invocations) || + !pbcc_ee_transition_add_invocations_(transition, on_exit) || !pbcc_ee_transition_add_invocations_(transition, on_enter)) { PUBNUB_LOG_ERROR( "pbcc_ee_transition_alloc: failed to allocate memory " "for transition invocations\n"); if (NULL != invocations) { pbarray_free(&invocations); } pbcc_ee_transition_free(&transition); - // pubnub_mutex_unlock(ee->mutw); - printf("~~~~~~~~ pbcc_ee_transition_alloc released lock (%p). UNLOCKED? %s\n", ee, pubnub_mutex_unlock(ee->mutw) ? "YES" : "NO"); + pubnub_mutex_unlock(ee->mutw); + return NULL; } } else { transition->invocations = NULL; } if (NULL != invocations) { pbarray_free(&invocations); } - // pubnub_mutex_lock(ee->mutw); - printf("~~~~~~~~ pbcc_ee_transition_alloc released lock (%p). UNLOCKED? %s\n", ee, pubnub_mutex_lock(ee->mutw) ? "YES" : "NO"); + pubnub_mutex_unlock(ee->mutw); return transition; } @@ -666,16 +693,18 @@ void pbcc_ee_invocation_exec_(pbcc_ee_invocation_t* invocation) void pbcc_ee_effect_completion_( pbcc_event_engine_t* ee, - pbcc_ee_invocation_t* invocation) + pbcc_ee_invocation_t* invocation, + const bool paused) { /** Check whether invocation is running or not. */ if (PBCC_EE_INVOCATION_RUNNING != invocation->status) { return; } - // pubnub_mutex_lock(ee->mutw); - printf("~~~~~~~~ pbcc_ee_effect_completion_ acquired lock (%p). UNLOCKED? %s\n", ee, pubnub_mutex_lock(ee->mutw) ? "YES" : "NO"); - invocation->status = PBCC_EE_INVOCATION_COMPLETED; - pbarray_remove(ee->invocations, (void**)&invocation, true); - // pubnub_mutex_unlock(ee->mutw); - printf("~~~~~~~~ pbcc_ee_effect_completion_ released lock (%p). UNLOCKED? %s\n", ee, pubnub_mutex_unlock(ee->mutw) ? "YES" : "NO"); - pbcc_ee_process_next_invocation(ee, false); + /** Check whether called effect asked to pospone its execution or not. */ + if (paused) { + invocation->status = PBCC_EE_INVOCATION_CREATED; + return; + } + + pbcc_ee_handle_effect_completion(ee, invocation); + pbcc_ee_process_next_invocation(ee); } \ No newline at end of file diff --git a/core/pbcc_event_engine.h b/core/pbcc_event_engine.h index 111ad898..b51ad844 100644 --- a/core/pbcc_event_engine.h +++ b/core/pbcc_event_engine.h @@ -88,10 +88,15 @@ typedef void (*pbcc_ee_data_free_function_t)(void* data); * execution. * @param invocation Pointer to the invocation which has been used to execute * effect. + * @param paused Whether `invocation` execution has been paused or not. + * Paused invocation will reset its execution status to + * 'CREATED'. When paused, then the Event Engine should be + * told when the next invocation should be processed. */ typedef void (*pbcc_ee_effect_completion_function_t)( pbcc_event_engine_t* ee, - pbcc_ee_invocation_t* invocation); + pbcc_ee_invocation_t* invocation, + bool paused); /** * @brief Invocation effect function definition. @@ -105,6 +110,7 @@ typedef void (*pbcc_ee_effect_completion_function_t)( * into function for processing. * @param cb Pointer to the effect execution completion callback * function. + * @return Effect execution result. */ typedef void (*pbcc_ee_effect_function_t)( const pbcc_ee_invocation_t* invocation, @@ -183,14 +189,21 @@ enum pubnub_res pbcc_ee_handle_event( * * @param ee Pointer to the Event Engine which should dequeue * and process next invocation. - * @param remove_previous Whether current first element should be removed from - * the list or not.
\b Important: This flag can be - * in case of asynchronous operations when effect - * execution callback can't be called in place. + * @return Number of enqueued effect invocations (ongoing and just created). + */ +size_t pbcc_ee_process_next_invocation(pbcc_event_engine_t* ee); + +/** + * @brief Handle effect invocation completion. + * + * @param ee Pointer to the event engine which enqueued effect + * execution. + * @param invocation Pointer to the invocation which has been used to execute + * effect. */ -void pbcc_ee_process_next_invocation( +void pbcc_ee_handle_effect_completion( pbcc_event_engine_t* ee, - bool remove_previous); + pbcc_ee_invocation_t* invocation); /** * @brief Create sharable user-provided data object. @@ -224,7 +237,7 @@ void pbcc_ee_data_free(pbcc_ee_data_t* data); * should be retrieved. * @return Pointer to the user-provided value. */ -const void* pbcc_ee_data_value(const pbcc_ee_data_t* data); +void* pbcc_ee_data_value(pbcc_ee_data_t* data); /** * @brief Create a copy of existing data. @@ -367,6 +380,7 @@ void pbcc_ee_event_free(pbcc_ee_event_t** event); * @note Memory allocated for `data` will be managed by the Event Engine if * `data` destructor provided (`data_free`). * + * @param type A specific Event Engine invocation type. * @param effect Function which will be called by dispatcher during * invocation handling. * @param data Pointer to the data which has been associated by business @@ -379,10 +393,22 @@ void pbcc_ee_event_free(pbcc_ee_event_t** event); * the `pbcc_ee_invocation_free` to avoid a memory leak. */ pbcc_ee_invocation_t* pbcc_ee_invocation_alloc( + int type, pbcc_ee_effect_function_t effect, pbcc_ee_data_t* data, bool immediate); +/** + * @brief Cancel next non-running invocation of specified type. + * + * The first invocation which is currently not active and matches the specified + * type will be dequeued and freed. + * + * @param ee Pointer to the Event Engine which enqueue scheduled invocations. + * @param type A type of invocation which should be cancelled. + */ +void pbcc_ee_invocation_cancel_by_type(pbcc_event_engine_t* ee, int type); + /** * @brief Clean up resources used by the invocation object. * diff --git a/core/pbcc_subscribe_event_engine.c b/core/pbcc_subscribe_event_engine.c index cb02c1fa..5370f72e 100644 --- a/core/pbcc_subscribe_event_engine.c +++ b/core/pbcc_subscribe_event_engine.c @@ -43,21 +43,25 @@ * Updated list of subscribables will be used to trigger proper event for * subscription loop. * - * @param ee Pointer to the Subscribe Event Engine, which should transit to - * `receiving` state. - * @param cursor Pointer to the subscription cursor to be used with subscribe - * REST API. The SDK will try to catch up on missed messages if a - * cursor with older PubNub high-precision timetoken has been - * provided. Pass `NULL` to keep using cursor received from the - * previous subscribe REST API response. - * @param update Whether list of subscribable objects should be updated or not - * (for unsubscribe, it updated beforehand). + * @param ee Pointer to the Subscribe Event Engine, which should transit + * to `receiving` state. + * @param cursor Pointer to the subscription cursor to be used with + * subscribe REST API. The SDK will try to catch up on missed + * messages if a cursor with older PubNub high-precision + * timetoken has been provided. Pass `NULL` to keep using + * cursor received from the previous subscribe REST API + * response. + * @param update Whether list of subscribable objects should be updated or + * not (for unsubscribe, it updated beforehand). + * @param sent_by_ee Whether event has been sent by Subscribe Event Engine or + * not. * @return Result of subscribe enqueue transaction. */ static enum pubnub_res pbcc_subscribe_ee_subscribe_( pbcc_subscribe_ee_t* ee, const pubnub_subscribe_cursor_t* cursor, - bool update); + bool update, + bool sent_by_ee); /** * @brief Transit to `handshaking` / `receiving` or `unsubscribed` state. @@ -77,6 +81,18 @@ static enum pubnub_res pbcc_subscribe_ee_unsubscribe_( pbcc_subscribe_ee_t* ee, pbhash_set_t* subscribables); +/** + * @brief Perform postponed `unsubscribe` / `leave` operation. + * + * Subscribe Event Engine may save channels and groups from which it should + * unsubscribe for later time because there is ongoing request. Saved + * information will be used in this function to complete `leave` operation. + * + * @param ee Pointer to the Subscribe Event Engine, which may have information for `leave` request. + * @return `true` in case if `leave` request sending has been scheduled. + */ +static bool pbcc_subscribe_ee_postponed_unsubscribe_(pbcc_subscribe_ee_t* ee); + /** * @brief Actualize list of subscribable objects. * @@ -91,19 +107,15 @@ static enum pubnub_res pbcc_subscribe_ee_unsubscribe_( * another subscription object from subscription set tried to do the same. * As a result, the object may be removed from the subscription loop. * - * @param ee Pointer to the Subscribe Event Engine for which - * list of subscribable objects should be updated. - * @param excluded_subscribables Pointer to the set of subscribables which - * shouldn't be part of resulting subscribables - * list. + * @param ee Pointer to the Subscribe Event Engine for which list of + * subscribable objects should be updated. * @return Result of subscribable list update. * * @see pubnub_subscription_t * @see pubnub_subscription_set_t */ static enum pubnub_res pbcc_subscribe_ee_update_subscribables_( - const pbcc_subscribe_ee_t* ee, - pbhash_set_t* excluded_subscribables); + const pbcc_subscribe_ee_t* ee); /** * @brief Update Subscribe Event Engine list of subscribable objects. @@ -158,6 +170,20 @@ static void pbcc_subscribe_callback_( enum pubnub_res result, const void* user_data); +/** + * @brief Create string from the `array` elements. + * + * @param array Pointer to the array with string elements which should be + * joined by the separator. + * @param separator Pointer to the string which should be used to join elements. + * @return Pointer to the string from the `array` elements or `NULL` in case of + * insufficient memory error. The returned pointer must be passed to the + * `free` to avoid a memory leak. + */ +static char* pbcc_subscribe_ee_joined_array_elements_( + pbarray_t* array, + const char* separator); + // ---------------------------------------------- // Functions @@ -178,6 +204,17 @@ pbcc_subscribe_ee_t* pbcc_subscribe_ee_alloc(pubnub_t* pb) PBARRAY_RESIZE_BALANCED, PBARRAY_GENERIC_CONTENT_TYPE, (pbarray_element_free)pubnub_subscription_free_); + ee->leave_channels = pbarray_alloc( + SUBSCRIBABLE_LENGTH, + PBARRAY_RESIZE_BALANCED, + PBARRAY_CHAR_CONTENT_TYPE, + (pbarray_element_free)free); + ee->leave_channel_groups = pbarray_alloc( + SUBSCRIBABLE_LENGTH, + PBARRAY_RESIZE_BALANCED, + PBARRAY_CHAR_CONTENT_TYPE, + (pbarray_element_free)free); + ee->current_transaction = PBTT_NONE; pubnub_mutex_init(ee->mutw); if (NULL == ee->subscriptions) { @@ -192,6 +229,18 @@ pbcc_subscribe_ee_t* pbcc_subscribe_ee_alloc(pubnub_t* pb) pbcc_subscribe_ee_free(&ee); return NULL; } + if (NULL == ee->leave_channels) { + PUBNUB_LOG_ERROR("pbcc_subscribe_ee_alloc: failed to allocate memory " + "for unsubscribed channels\n"); + pbcc_subscribe_ee_free(&ee); + return NULL; + } + if (NULL == ee->leave_channel_groups) { + PUBNUB_LOG_ERROR("pbcc_subscribe_ee_alloc: failed to allocate memory " + "for unsubscribed channel groups\n"); + pbcc_subscribe_ee_free(&ee); + return NULL; + } /** Prepare storage for computed list of subscribable objects. */ ee->subscribables = pbhash_set_alloc( @@ -206,7 +255,7 @@ pbcc_subscribe_ee_t* pbcc_subscribe_ee_alloc(pubnub_t* pb) } ee->filter_expr = NULL; ee->heartbeat = PUBNUB_DEFAULT_PRESENCE_HEARTBEAT_VALUE; - ee->status = SUBSCRIPTION_STATUS_DISCONNECTED; + ee->status = PNSS_SUBSCRIPTION_STATUS_DISCONNECTED; ee->pb = pb; /** Setup event engine */ @@ -244,6 +293,9 @@ void pbcc_subscribe_ee_free(pbcc_subscribe_ee_t** ee) if (NULL != (*ee)->subscriptions) { pbarray_free(&(*ee)->subscriptions); } if (NULL != (*ee)->subscribables) pbhash_set_free(&(*ee)->subscribables); + if (NULL != (*ee)->leave_channel_groups) + pbarray_free(&(*ee)->leave_channel_groups); + if (NULL != (*ee)->leave_channels) { pbarray_free(&(*ee)->leave_channels); } if (NULL != (*ee)->filter_expr) { free((*ee)->filter_expr); } if (NULL != (*ee)->ee) { pbcc_ee_free(&(*ee)->ee); } if (NULL != (*ee)->event_listener) @@ -263,10 +315,8 @@ pbcc_event_listener_t* pbcc_subscribe_ee_event_listener( pbcc_ee_data_t* pbcc_subscribe_ee_current_state_context( const pbcc_subscribe_ee_t* ee) { - printf("~~~~~~~~ pbcc_subscribe_ee_current_state_context asking current context\n"); pbcc_ee_state_t* current_state = pbcc_ee_current_state(ee->ee); const pbcc_ee_data_t* data = NULL; - printf("~~~~~~~~ pbcc_subscribe_ee_current_state_context asking current context done\n"); if (NULL != current_state) { data = pbcc_ee_state_data(current_state); } pbcc_ee_state_free(¤t_state); @@ -315,7 +365,11 @@ enum pubnub_res pbcc_subscribe_ee_subscribe_with_subscription( return PNR_OUT_OF_MEMORY; } - const enum pubnub_res rslt = pbcc_subscribe_ee_subscribe_(ee, cursor, true); + const enum pubnub_res rslt = pbcc_subscribe_ee_subscribe_( + ee, + cursor, + true, + false); pubnub_mutex_unlock(ee->mutw); return rslt; @@ -374,7 +428,11 @@ enum pubnub_res pbcc_subscribe_ee_subscribe_with_subscription_set( return PNR_OUT_OF_MEMORY; } - const enum pubnub_res rslt = pbcc_subscribe_ee_subscribe_(ee, cursor, true); + const enum pubnub_res rslt = pbcc_subscribe_ee_subscribe_( + ee, + cursor, + true, + false); pubnub_mutex_unlock(ee->mutw); return rslt; @@ -428,7 +486,7 @@ enum pubnub_res pbcc_subscribe_ee_change_subscription_with_subscription_set( pubnub_mutex_lock(ee->mutw); enum pubnub_res rslt; - if (added) { rslt = pbcc_subscribe_ee_subscribe_(ee, NULL, true); } + if (added) { rslt = pbcc_subscribe_ee_subscribe_(ee, NULL, true, false); } else { const pubnub_subscription_options_t options = *(pubnub_subscription_options_t*)set; @@ -519,6 +577,36 @@ enum pubnub_res pbcc_subscribe_ee_unsubscribe_all(pbcc_subscribe_ee_t* ee) pbarray_remove_all(ee->subscription_sets); pbarray_remove_all(ee->subscriptions); + /** + * Update user presence information for channels which user actually + * left. + */ + if ((NULL != ch && 0 != strlen(ch)) || + (NULL != cg && 0 != strlen(cg))) { + pubnub_mutex_lock(ee->pb->monitor); + if (!pbnc_can_start_transaction(ee->pb)) { + pubnub_mutex_unlock(ee->pb->monitor); + /** Using array to handle consequencive call to unsubscribe. */ + pubnub_mutex_lock(ee->mutw); + if (NULL != ch && strlen(ch) > 0) + pbarray_add(ee->leave_channels, ch); + if (NULL != cg && strlen(cg) > 0) + pbarray_add(ee->leave_channel_groups, cg); + pubnub_mutex_unlock(ee->mutw); + } + else { + pubnub_mutex_unlock(ee->pb->monitor); + + pubnub_mutex_lock(ee->mutw); + ee->current_transaction = PBTT_LEAVE; + pubnub_mutex_unlock(ee->mutw); + + pubnub_leave(ee->pb, + 0 == strlen(ch) ? NULL : ch, + 0 == strlen(cg) ? NULL : cg); + } + } + rslt = pbcc_ee_handle_event(ee->ee, event); } } @@ -530,6 +618,22 @@ enum pubnub_res pbcc_subscribe_ee_unsubscribe_all(pbcc_subscribe_ee_t* ee) return rslt; } +void pbcc_subscribe_ee_handle_subscribe_error( + pbcc_subscribe_ee_t* ee, + const enum pubnub_res error) +{ + pbcc_ee_event_t* event = NULL; + pbcc_ee_state_t* state_object = pbcc_ee_current_state(ee->ee); + + if (SUBSCRIBE_EE_STATE_HANDSHAKING == pbcc_ee_state_type(state_object)) + event = pbcc_handshake_failure_event_alloc(ee, error); + else if (SUBSCRIBE_EE_STATE_RECEIVING == pbcc_ee_state_type(state_object)) + event = pbcc_receive_failure_event_alloc(ee, error); + + if (NULL != event) { pbcc_ee_handle_event(ee->ee, event); } + if (NULL != state_object) { pbcc_ee_state_free(&state_object); } +} + pubnub_subscription_t** pbcc_subscribe_ee_subscriptions( pbcc_subscribe_ee_t* ee, size_t* count) @@ -565,30 +669,38 @@ void pbcc_subscribe_callback_( const void* user_data ) { - printf( - "~~~~~~~~:::::::>>>> CALLBACK FIRED (RESULT: %s | TRANSACTION: %d)\n", - pubnub_res_2_string(result), - trans); - pthread_t thread_id = pthread_self(); - printf("Current thread ID: %lu\n", (unsigned long)thread_id); pbcc_subscribe_ee_t* ee = (pbcc_subscribe_ee_t*)user_data; /** - * Asynchronously cancelled subscription and presence leave shouldn't - * trigger any events. + * Checking whether a leave request should be performed if a previous + * (potentially subscribe) request has been cancelled or was another leave + * request. */ - if ((PBTT_SUBSCRIBE_V2 == trans && PNR_CANCELLED == result) || + if (PNR_CANCELLED == result || PBTT_HEARTBEAT == trans || PBTT_LEAVE == trans) { - /** Asynchronous cancellation mean that */ - pbcc_ee_process_next_invocation(ee->ee, PBTT_LEAVE != trans); - return; + pubnub_mutex_lock(ee->mutw); + if (NULL != ee->cancel_invocation) { + pbcc_ee_handle_effect_completion(ee->ee, ee->cancel_invocation); + ee->cancel_invocation = NULL; + } + if (PBTT_HEARTBEAT == trans || PBTT_LEAVE == trans) { + ee->current_transaction = PBTT_NONE; + /** Flush read buffer. */ + pubnub_get(ee->pb); + } + pubnub_mutex_unlock(ee->mutw); + + if (pbcc_subscribe_ee_postponed_unsubscribe_(ee)) { return; } + + /** + * Leave or heartbeat request successfully processed. Trying schedule + * any pending effect invocations. + */ + if (0 != pbcc_ee_process_next_invocation(ee->ee)) { return; } } - PUBNUB_ASSERT_OPT(trans == PBTT_SUBSCRIBE_V2); const bool error = PNR_OK != result; - printf("~~~~~~~~ pbcc_subscribe_callback_ asking current context\n"); pbcc_ee_state_t* state_object = pbcc_ee_current_state(ee->ee); - printf("~~~~~~~~ pbcc_subscribe_callback_ asking current context done\n"); pbcc_ee_event_t* event = NULL; pubnub_subscribe_cursor_t cursor; cursor.region = 0; @@ -619,13 +731,14 @@ void pbcc_subscribe_callback_( enum pubnub_res pbcc_subscribe_ee_subscribe_( pbcc_subscribe_ee_t* ee, const pubnub_subscribe_cursor_t* cursor, - const bool update) + const bool update, + const bool sent_by_ee) { pbcc_ee_event_t* event = NULL; char* ch = NULL,* cg = NULL; enum pubnub_res rslt = PNR_OK; - if (update) { rslt = pbcc_subscribe_ee_update_subscribables_(ee, NULL); } + if (update) { rslt = pbcc_subscribe_ee_update_subscribables_(ee); } if (PNR_OK == rslt) { rslt = pbcc_subscribe_ee_subscribables_(ee->subscribables, &ch, @@ -649,13 +762,18 @@ enum pubnub_res pbcc_subscribe_ee_subscribe_( const bool restore = NULL != cursor && '0' != cursor->timetoken[0]; if (NULL == cursor || !restore) - event = pbcc_subscription_changed_event_alloc(ee, &ch, &cg); + event = pbcc_subscription_changed_event_alloc( + ee, + &ch, + &cg, + sent_by_ee); else event = pbcc_subscription_restored_event_alloc( ee, &ch, &cg, - *cursor); + *cursor, + sent_by_ee); if (NULL == event) { PUBNUB_LOG_ERROR( "pbcc_subscribe_ee_subscribe: failed to allocate memory for " @@ -679,22 +797,29 @@ enum pubnub_res pbcc_subscribe_ee_unsubscribe_( pbcc_subscribe_ee_t* ee, pbhash_set_t* subscribables) { - char* ch = NULL,* cg = NULL; - bool send_leave = false; - enum pubnub_res rslt; + char* ch = NULL,* cg = NULL; + bool send_leave = false; size_t count = 0; pubnub_subscribable_t** subs = (pubnub_subscribable_t**) pbhash_set_elements(subscribables, &count); if (NULL == subs) { return PNR_OUT_OF_MEMORY; } + /** + * After subscription or subscription set removal we need to update + * subscribables list. + */ + enum pubnub_res rslt = pbcc_subscribe_ee_update_subscribables_(ee); + if (PNR_OK != rslt) { + free(subs); + return rslt; + } + /** Removing subscribable which is not part of subscription loop. */ for (size_t i = 0; i < count; ++i) { pubnub_subscribable_t* sub = subs[i]; - printf("~~~~> SUBSCRIBABLE: %s\n", sub->id->ptr); - if (pbhash_set_contains(ee->subscribables, sub)) { - printf("~~~~> STILL USED: %s\n", sub->id->ptr); + if (pbhash_set_contains(ee->subscribables, sub->id->ptr)) { pbhash_set_remove(subscribables, (void**)&sub->id->ptr, (void**)&sub); @@ -703,62 +828,103 @@ enum pubnub_res pbcc_subscribe_ee_unsubscribe_( } free(subs); - if (PNR_OK == (rslt = pbcc_subscribe_ee_update_subscribables_( - ee, - subscribables))) { - pbcc_subscribe_ee_subscribables_( - subscribables, - &ch, - &cg, - false); - if (PNR_OK == rslt || PNR_INVALID_PARAMETERS == rslt) { - send_leave = PNR_OK == rslt; - rslt = PNR_OK; - } + rslt = pbcc_subscribe_ee_subscribables_(subscribables, &ch, &cg, false); + if (PNR_OK == rslt || PNR_INVALID_PARAMETERS == rslt) { + send_leave = PNR_OK == rslt; + rslt = PNR_OK; } /** * Update user presence information for channels which user actually * left. */ + bool sending_leave = false; if (PNR_OK == rslt && send_leave) { - printf("~~~~======> 1\n"); - // TODO: CLIENT MAY NOT BE READY TO PROCESS LEAVE. - pubnub_leave(ee->pb, - 0 == strlen(ch) ? NULL : ch, - 0 == strlen(cg) ? NULL : cg); - printf("~~~~======> 2"); + pubnub_mutex_lock(ee->pb->monitor); + if (!pbnc_can_start_transaction(ee->pb)) { + pubnub_mutex_unlock(ee->pb->monitor); + /** Using array to handle consequencive call to unsubscribe. */ + pubnub_mutex_lock(ee->mutw); + if (NULL != ch && strlen(ch) > 0) + pbarray_add(ee->leave_channels, ch); + if (NULL != cg && strlen(cg) > 0) + pbarray_add(ee->leave_channel_groups, cg); + pubnub_mutex_unlock(ee->mutw); + } + else { + pubnub_mutex_unlock(ee->pb->monitor); + + pubnub_mutex_lock(ee->mutw); + ee->current_transaction = PBTT_LEAVE; + pubnub_mutex_unlock(ee->mutw); + sending_leave = true; + + pubnub_leave(ee->pb, + 0 == strlen(ch) ? NULL : ch, + 0 == strlen(cg) ? NULL : cg); + } + } + + if (PNR_OK != rslt || !send_leave || sending_leave) { + if (NULL != ch) { free(ch); } + if (NULL != cg) { free(cg); } } /** Update subscription loop. */ - printf("~~~~======> 3"); - if (PNR_OK == rslt) { - printf("~~~~======> 4"); - rslt = pbcc_subscribe_ee_subscribe_(ee, NULL, false); - printf("~~~~======> 5"); + if (PNR_OK == rslt) + rslt = pbcc_subscribe_ee_subscribe_(ee, NULL, false, true); + + return rslt; +} + +bool pbcc_subscribe_ee_postponed_unsubscribe_(pbcc_subscribe_ee_t* ee) +{ + pubnub_mutex_lock(ee->mutw); + if (0 == pbarray_count(ee->leave_channels) && + 0 == pbarray_count(ee->leave_channel_groups)) { + pubnub_mutex_unlock(ee->mutw); + return false; } - if (NULL != ch) { - printf("~~~~======> 6"); - free(ch); - printf("~~~~======> 7"); + /** + * Checking whether there is another ongoing request and we need to wait + * till its completion to start another one. + */ + if (PBTT_NONE != ee->current_transaction) { + pubnub_mutex_unlock(ee->mutw); + return true; } - if (NULL != cg) { - printf("~~~~======> 8"); - free(cg); - printf("~~~~======> 9"); + + char* ch = + pbcc_subscribe_ee_joined_array_elements_(ee->leave_channels, ","); + char* cg = + pbcc_subscribe_ee_joined_array_elements_(ee->leave_channel_groups, ","); + + /** Cleaning up postponed channels and groups. */ + pbarray_remove_all(ee->leave_channels); + pbarray_remove_all(ee->leave_channel_groups); + + if (NULL == ch && NULL == cg) { + pubnub_mutex_unlock(ee->mutw); + return false; } - printf("~~~~======> 10"); - return rslt; + ee->current_transaction = PBTT_LEAVE; + pubnub_mutex_unlock(ee->mutw); + + pubnub_leave(ee->pb, + NULL == ch || 0 == strlen(ch) ? NULL : ch, + NULL == cg || 0 == strlen(cg) ? NULL : cg); + if (NULL != ch) { free(ch); } + if (NULL != cg) { free(cg); } + + return true; } enum pubnub_res pbcc_subscribe_ee_update_subscribables_( - const pbcc_subscribe_ee_t* ee, - pbhash_set_t* excluded_subscribables) + const pbcc_subscribe_ee_t* ee) { PUBNUB_ASSERT_OPT(NULL != ee); - pbarray_t* sets = ee->subscription_sets; pbarray_t* subs = ee->subscriptions; enum pubnub_res rslt = PNR_OK; @@ -782,9 +948,6 @@ enum pubnub_res pbcc_subscribe_ee_update_subscribables_( if (PNR_OK != rslt) { return rslt; } } - if (NULL != excluded_subscribables) - pbhash_set_subtract(ee->subscribables, excluded_subscribables); - return rslt; } @@ -871,6 +1034,7 @@ enum pubnub_res pbcc_subscribe_ee_subscribables_( } } + /** Extra byte for null-terminator. */ const size_t ch_len = ch_list_length + ch_count + 1; const size_t cg_len = cg_list_length + cg_count + 1; *channels = calloc(ch_len, sizeof(char)); @@ -909,7 +1073,7 @@ enum pubnub_res pbcc_subscribe_ee_subscribables_( sub->id->ptr); } else { - cg_offset += snprintf(*channels + cg_offset, + cg_offset += snprintf(*channel_groups + cg_offset, cg_len - cg_offset, "%s%s", cg_offset > 0 ? ",": "", @@ -921,4 +1085,34 @@ enum pubnub_res pbcc_subscribe_ee_subscribables_( return ch_list_length == 0 && cg_list_length == 0 ? PNR_INVALID_PARAMETERS : PNR_OK; +} + +char* pbcc_subscribe_ee_joined_array_elements_( + pbarray_t* array, + const char* separator) +{ + size_t offset = 0; + size_t count; + char** elements = (char**)pbarray_elements(array, &count); + if (NULL == elements) { return NULL; } + + /** Extra byte for null-terminator. */ + size_t len = (count > 0 ? (count - 1) * strlen(separator) : 0) + 1; + for (size_t i = 0; i < count; ++i) { len += strlen(elements[i]); } + + char* joined_str = malloc(len * sizeof(char)); + if (NULL == joined_str || 0 == count) { + free(elements); + return joined_str; + } + + for (int i = 0; i < count; ++i) { + offset += snprintf(joined_str + offset, + len - offset, + "%s%s", + offset > 0 ? separator : "", + elements[i]); + } + + return joined_str; } \ No newline at end of file diff --git a/core/pbcc_subscribe_event_engine.h b/core/pbcc_subscribe_event_engine.h index e580d097..992ba915 100644 --- a/core/pbcc_subscribe_event_engine.h +++ b/core/pbcc_subscribe_event_engine.h @@ -237,6 +237,17 @@ enum pubnub_res pbcc_subscribe_ee_reconnect( */ enum pubnub_res pbcc_subscribe_ee_unsubscribe_all(pbcc_subscribe_ee_t* ee); +/** + * @brief Handle Subscribe v2 request schedule error. + * + * @param ee Pointer to the Subscribe Event Engine, which should handle next + * subscription schedule error. + * @param error Subscribe operation call result (error). + */ +void pbcc_subscribe_ee_handle_subscribe_error( + pbcc_subscribe_ee_t* ee, + enum pubnub_res error); + /** * @brief Retrieve created subscriptions. * diff --git a/core/pbcc_subscribe_event_engine_effects.c b/core/pbcc_subscribe_event_engine_effects.c index 0be70c26..2dd0c499 100644 --- a/core/pbcc_subscribe_event_engine_effects.c +++ b/core/pbcc_subscribe_event_engine_effects.c @@ -1,13 +1,24 @@ /* -*- c-file-style:"stroustrup"; indent-tabs-mode: nil -*- */ #include "pbcc_subscribe_event_engine_effects.h" +#include + #include "core/pbcc_subscribe_event_engine_types.h" #include "core/pubnub_subscribe_v2.h" #include "core/pubnub_pubsubapi.h" +#include "core/pubnub_coreapi.h" #include "core/pubnub_assert.h" #include "pubnub_internal.h" +// ---------------------------------------------- +// Constants +// ---------------------------------------------- + +/** Maximum channel name length */ +#define PBCC_SUBSCRIBE_EE_CHANNEL_MAXIMUM_LENGTH 2100 + + // ---------------------------------------------- // Function prototypes // ---------------------------------------------- @@ -21,9 +32,8 @@ * information to perform Subscribe v2 call. * @param cb Pointer to the effect execution completion callback * function. - * @return Subscribe v2 call operation result. */ -static enum pubnub_res make_subscribe_request_( +static void make_subscribe_request_( pbcc_ee_invocation_t* invocation, pbcc_ee_data_t* context, pbcc_ee_effect_completion_function_t cb); @@ -33,23 +43,23 @@ static enum pubnub_res make_subscribe_request_( // Functions // ---------------------------------------------- -enum pubnub_res pbcc_subscribe_ee_handshake_effect( +void pbcc_subscribe_ee_handshake_effect( pbcc_ee_invocation_t* invocation, pbcc_ee_data_t* context, const pbcc_ee_effect_completion_function_t cb) { - return make_subscribe_request_(invocation, context, cb); + make_subscribe_request_(invocation, context, cb); } -enum pubnub_res pbcc_subscribe_ee_receive_effect( +void pbcc_subscribe_ee_receive_effect( pbcc_ee_invocation_t* invocation, pbcc_ee_data_t* context, const pbcc_ee_effect_completion_function_t cb) { - return make_subscribe_request_(invocation, context, cb); + make_subscribe_request_(invocation, context, cb); } -enum pubnub_res pbcc_subscribe_ee_emit_status_effect( +void pbcc_subscribe_ee_emit_status_effect( pbcc_ee_invocation_t* invocation, pbcc_ee_data_t* context, const pbcc_ee_effect_completion_function_t cb) @@ -67,17 +77,16 @@ enum pubnub_res pbcc_subscribe_ee_emit_status_effect( ctx->reason, pbcc_ee_data_value(ctx->channels), pbcc_ee_data_value(ctx->channel_groups)); - cb(subscribe_ee->ee, invocation); + cb(subscribe_ee->ee, invocation, false); pbcc_ee_data_free(context_copy); - - return PNR_OK; } -enum pubnub_res pbcc_subscribe_ee_emit_messages_effect( +void pbcc_subscribe_ee_emit_messages_effect( pbcc_ee_invocation_t* invocation, pbcc_ee_data_t* context, const pbcc_ee_effect_completion_function_t cb) { + char subscribable_name[PBCC_SUBSCRIBE_EE_CHANNEL_MAXIMUM_LENGTH]; pbcc_ee_data_t* context_copy = pbcc_ee_data_copy(context); const pbcc_subscribe_ee_context_t* ctx = pbcc_ee_data_value(context_copy); const pbcc_subscribe_ee_t* subscribe_ee = ctx->pb->core.subscribe_ee; @@ -85,13 +94,16 @@ enum pubnub_res pbcc_subscribe_ee_emit_messages_effect( for (struct pubnub_v2_message msg = pubnub_get_v2(pb); msg.payload.size > 0; msg = pubnub_get_v2(pb)) { - pbcc_event_listener_emit_message(subscribe_ee->event_listener, msg); + struct pubnub_char_mem_block subscribable; + if (msg.match_or_group.size) { subscribable = msg.match_or_group; } + else {subscribable = msg.channel;} + memcpy(subscribable_name, subscribable.ptr, subscribable.size); + subscribable_name[subscribable.size] = '\0'; + pbcc_event_listener_emit_message(subscribe_ee->event_listener, subscribable_name, msg); } - cb(subscribe_ee->ee, invocation); + cb(subscribe_ee->ee, invocation, false); pbcc_ee_data_free(context_copy); - - return PNR_OK; } void pbcc_subscribe_ee_cancel_effect( @@ -103,15 +115,32 @@ void pbcc_subscribe_ee_cancel_effect( pbcc_ee_data_t* context_copy = pbcc_ee_data_copy(context); const pbcc_subscribe_ee_context_t* ctx = pbcc_ee_data_value(context_copy); - const pbcc_subscribe_ee_t* subscribe_ee = ctx->pb->core.subscribe_ee; + pbcc_subscribe_ee_t* subscribe_ee = ctx->pb->core.subscribe_ee; PUBNUB_ASSERT(pb_valid_ctx_ptr(ctx->pb)); + /** + * Check whether there is any request is in progress and don't let it to be + * cancelled. + */ + pubnub_mutex_lock(subscribe_ee->mutw); + const enum pubnub_trans transaction = subscribe_ee->current_transaction; + if (PBTT_SUBSCRIBE_V2 != transaction && PBTT_NONE != transaction) { + pubnub_mutex_unlock(subscribe_ee->mutw); + pbcc_ee_data_free(context_copy); + cb(subscribe_ee->ee, invocation, true); + + return; + } + pubnub_mutex_unlock(subscribe_ee->mutw); + if (PN_CANCEL_FINISHED == pubnub_cancel(ctx->pb)) - cb(subscribe_ee->ee, invocation); + cb(subscribe_ee->ee, invocation, false); + else + subscribe_ee->cancel_invocation = invocation; pbcc_ee_data_free(context_copy); } -enum pubnub_res make_subscribe_request_( +void make_subscribe_request_( pbcc_ee_invocation_t* invocation, pbcc_ee_data_t* context, const pbcc_ee_effect_completion_function_t cb) @@ -119,27 +148,78 @@ enum pubnub_res make_subscribe_request_( PUBNUB_ASSERT_OPT(NULL != context); pbcc_ee_data_t* context_copy = pbcc_ee_data_copy(context); - const pbcc_subscribe_ee_context_t* ctx = pbcc_ee_data_value(context_copy); + pbcc_subscribe_ee_context_t* ctx = pbcc_ee_data_value(context_copy); + pbcc_subscribe_ee_t* subscribe_ee = ctx->pb->core.subscribe_ee; pubnub_t* pb = ctx->pb; + + /** + * Check whether there any request is in progress and postpone subscribe + * effect execution untill it will be completed. + */ + pubnub_mutex_lock(subscribe_ee->mutw); + if (PBTT_NONE != subscribe_ee->current_transaction) { + pubnub_mutex_unlock(subscribe_ee->mutw); + cb(subscribe_ee->ee, invocation, true); + pbcc_ee_data_free(context_copy); + + return; + } + + if (ctx->send_heartbeat) { + pubnub_mutex_lock(pb->monitor); + if (!pbnc_can_start_transaction(pb)) { + pubnub_mutex_unlock(pb->monitor); + + /** Postpone invocation because there is ongoing request. */ + cb(subscribe_ee->ee, invocation, true); + pbcc_ee_data_free(context_copy); + + return; + } + + pubnub_mutex_unlock(pb->monitor); + ctx->send_heartbeat = false; + subscribe_ee->current_transaction = PBTT_HEARTBEAT; + pubnub_heartbeat(subscribe_ee->pb, + pbcc_ee_data_value(ctx->channels), + pbcc_ee_data_value(ctx->channel_groups)); + pubnub_mutex_unlock(subscribe_ee->mutw); + + /** Postpone invocation because there is ongoing heratbeat request. */ + cb(subscribe_ee->ee, invocation, true); + pbcc_ee_data_free(context_copy); + + return; + } + pubnub_mutex_unlock(subscribe_ee->mutw); + /** Override timetoken stored for next subscription loop. */ pubnub_mutex_lock(pb->monitor); - size_t token_len = strlen(pb->core.timetoken); + size_t token_len = strlen(ctx->cursor.timetoken); memcpy(pb->core.timetoken, ctx->cursor.timetoken, token_len); pb->core.timetoken[token_len] = '\0'; if (ctx->cursor.region > 0) { pb->core.region = ctx->cursor.region; } pbpal_mutex_unlock(pb->monitor); struct pubnub_subscribe_v2_options opts = pubnub_subscribe_v2_defopts(); - opts.filter_expr = pb->core.subscribe_ee->filter_expr; + opts.filter_expr = subscribe_ee->filter_expr; opts.channel_group = pbcc_ee_data_value(ctx->channel_groups); - opts.heartbeat = pb->core.subscribe_ee->heartbeat; + opts.heartbeat = subscribe_ee->heartbeat; const enum pubnub_res rslt = pubnub_subscribe_v2( pb, pbcc_ee_data_value(ctx->channels), opts); - if (PNR_STARTED == rslt) { cb(pb->core.subscribe_ee->ee, invocation); } - pbcc_ee_data_free(context_copy); - return rslt; + /** + * Report effect invocation called or should be paused if not started. + * If other request in progress, after registered callback call it will + * trigger next invocation which will call paused once more. + */ + cb(subscribe_ee->ee, invocation, PNR_IN_PROGRESS == rslt); + + if (PNR_OK != rslt && PNR_STARTED != rslt && PNR_IN_PROGRESS != rslt) + pbcc_subscribe_ee_handle_subscribe_error(subscribe_ee, rslt); + + pbcc_ee_data_free(context_copy); } \ No newline at end of file diff --git a/core/pbcc_subscribe_event_engine_effects.h b/core/pbcc_subscribe_event_engine_effects.h index bda9f064..527cc0e9 100644 --- a/core/pbcc_subscribe_event_engine_effects.h +++ b/core/pbcc_subscribe_event_engine_effects.h @@ -31,9 +31,8 @@ * for initial subscribe (tt=0). * @param cb Pointer to the effect execution completion callback * function. - * @return Handshake effect execution (scheduling) operation result. */ -enum pubnub_res pbcc_subscribe_ee_handshake_effect( +void pbcc_subscribe_ee_handshake_effect( pbcc_ee_invocation_t* invocation, pbcc_ee_data_t* context, pbcc_ee_effect_completion_function_t cb); @@ -50,9 +49,8 @@ enum pubnub_res pbcc_subscribe_ee_handshake_effect( * for next subscription loop (tt!=0). * @param cb Pointer to the effect execution completion callback * function. - * @return Receive effect execution (scheduling) operation result. */ -enum pubnub_res pbcc_subscribe_ee_receive_effect( +void pbcc_subscribe_ee_receive_effect( pbcc_ee_invocation_t* invocation, pbcc_ee_data_t* context, pbcc_ee_effect_completion_function_t cb); @@ -67,9 +65,8 @@ enum pubnub_res pbcc_subscribe_ee_receive_effect( * change listeners. * @param cb Pointer to the effect execution completion callback * function. - * @return Status change emit operation result. */ -enum pubnub_res pbcc_subscribe_ee_emit_status_effect( +void pbcc_subscribe_ee_emit_status_effect( pbcc_ee_invocation_t* invocation, pbcc_ee_data_t* context, pbcc_ee_effect_completion_function_t cb); @@ -84,9 +81,8 @@ enum pubnub_res pbcc_subscribe_ee_emit_status_effect( * update listeners. * @param cb Pointer to the effect execution completion callback * function. - * @return Real-time updates emit operation result. */ -enum pubnub_res pbcc_subscribe_ee_emit_messages_effect( +void pbcc_subscribe_ee_emit_messages_effect( pbcc_ee_invocation_t* invocation, pbcc_ee_data_t* context, pbcc_ee_effect_completion_function_t cb); diff --git a/core/pbcc_subscribe_event_engine_events.c b/core/pbcc_subscribe_event_engine_events.c index 80c89e07..2ba8afe3 100644 --- a/core/pbcc_subscribe_event_engine_events.c +++ b/core/pbcc_subscribe_event_engine_events.c @@ -26,6 +26,8 @@ * subscription REST API loop call. * @param reason Subscription processing result code which * explain failure reason. + * @param sent_by_ee Whether event created per Event Engine request + * or not. * @return Pointer to requested `event` type, which will be processed by the * Subscribe Event Engine to get transition instructions from the * current state. @@ -36,7 +38,8 @@ static pbcc_ee_event_t* pbcc_subscribe_ee_event_alloc_( char** channels, char** channel_groups, const pubnub_subscribe_cursor_t* cursor, - const enum pubnub_res* reason); + const enum pubnub_res* reason, + bool sent_by_ee); /** * @brief Create Subscribe Event Engine context object. @@ -76,6 +79,8 @@ pbcc_subscribe_ee_context_t* pbcc_subscribe_ee_context_alloc_( * comma-separated channel groups which should be * used instead of `channel_groups` from provided * context. + * @param copy_subscribables Whether subscribables should be copied from + * the context or not. * @return Pointer to the ready to use Subscribe Event Engine context created * from the data of source context, or `NULL` in case of insufficient * memory error. @@ -86,7 +91,8 @@ pbcc_subscribe_ee_context_t* pbcc_subscribe_ee_context_copy_( pubnub_t* pb, const pbcc_subscribe_ee_context_t* ctx, char** channels, - char** channel_groups); + char** channel_groups, + bool copy_subscribables); /** * @brief Clean up resources used by subscription context. @@ -103,13 +109,15 @@ void pbcc_subscribe_ee_context_free_(pbcc_subscribe_ee_context_t* ctx); pbcc_ee_event_t* pbcc_subscription_changed_event_alloc( pbcc_subscribe_ee_t* ee, char** channels, - char** channel_groups) + char** channel_groups, + const bool sent_by_ee) { - if (NULL != channels && 0 == strlen(*channels)) { + if (NULL != channels && NULL != *channels && 0 == strlen(*channels)) { free(*channels); *channels = NULL; } - if (NULL != channel_groups && 0 == strlen(*channel_groups)) { + if (NULL != channel_groups && NULL != *channel_groups && + 0 == strlen(*channel_groups)) { free(*channel_groups); *channel_groups = NULL; } @@ -121,20 +129,23 @@ pbcc_ee_event_t* pbcc_subscription_changed_event_alloc( channels, channel_groups, NULL, - NULL); + NULL, + sent_by_ee); } pbcc_ee_event_t* pbcc_subscription_restored_event_alloc( pbcc_subscribe_ee_t* ee, char** channels, char** channel_groups, - const pubnub_subscribe_cursor_t cursor) + const pubnub_subscribe_cursor_t cursor, + const bool sent_by_ee) { - if (NULL != channels && 0 == strlen(*channels)) { + if (NULL != channels && NULL != *channels && 0 == strlen(*channels)) { free(*channels); *channels = NULL; } - if (NULL != channel_groups && 0 == strlen(*channel_groups)) { + if (NULL != channel_groups && NULL != *channel_groups && + 0 == strlen(*channel_groups)) { free(*channel_groups); *channel_groups = NULL; } @@ -146,7 +157,8 @@ pbcc_ee_event_t* pbcc_subscription_restored_event_alloc( channels, channel_groups, &cursor, - NULL); + NULL, + sent_by_ee); } pbcc_ee_event_t* pbcc_handshake_success_event_alloc( @@ -154,7 +166,13 @@ pbcc_ee_event_t* pbcc_handshake_success_event_alloc( const pubnub_subscribe_cursor_t cursor) { const pbcc_subscribe_ee_event type = SUBSCRIBE_EE_EVENT_HANDSHAKE_SUCCESS; - return pbcc_subscribe_ee_event_alloc_(ee, type, NULL, NULL, &cursor, NULL); + return pbcc_subscribe_ee_event_alloc_(ee, + type, + NULL, + NULL, + &cursor, + NULL, + true); } pbcc_ee_event_t* pbcc_handshake_failure_event_alloc( @@ -162,7 +180,13 @@ pbcc_ee_event_t* pbcc_handshake_failure_event_alloc( const enum pubnub_res reason) { const pbcc_subscribe_ee_event type = SUBSCRIBE_EE_EVENT_HANDSHAKE_FAILURE; - return pbcc_subscribe_ee_event_alloc_(ee, type, NULL, NULL, NULL, &reason); + return pbcc_subscribe_ee_event_alloc_(ee, + type, + NULL, + NULL, + NULL, + &reason, + true); } pbcc_ee_event_t* pbcc_receive_success_event_alloc( @@ -170,7 +194,13 @@ pbcc_ee_event_t* pbcc_receive_success_event_alloc( const pubnub_subscribe_cursor_t cursor) { const pbcc_subscribe_ee_event type = SUBSCRIBE_EE_EVENT_RECEIVE_SUCCESS; - return pbcc_subscribe_ee_event_alloc_(ee, type, NULL, NULL, &cursor, NULL); + return pbcc_subscribe_ee_event_alloc_(ee, + type, + NULL, + NULL, + &cursor, + NULL, + true); } pbcc_ee_event_t* pbcc_receive_failure_event_alloc( @@ -178,13 +208,25 @@ pbcc_ee_event_t* pbcc_receive_failure_event_alloc( const enum pubnub_res reason) { const pbcc_subscribe_ee_event type = SUBSCRIBE_EE_EVENT_RECEIVE_FAILURE; - return pbcc_subscribe_ee_event_alloc_(ee, type, NULL, NULL, NULL, &reason); + return pbcc_subscribe_ee_event_alloc_(ee, + type, + NULL, + NULL, + NULL, + &reason, + true); } pbcc_ee_event_t* pbcc_disconnect_event_alloc(pbcc_subscribe_ee_t* ee) { const pbcc_subscribe_ee_event type = SUBSCRIBE_EE_EVENT_DISCONNECT; - return pbcc_subscribe_ee_event_alloc_(ee, type, NULL, NULL, NULL, NULL); + return pbcc_subscribe_ee_event_alloc_(ee, + type, + NULL, + NULL, + NULL, + NULL, + true); } pbcc_ee_event_t* pbcc_reconnect_event_alloc( @@ -192,7 +234,13 @@ pbcc_ee_event_t* pbcc_reconnect_event_alloc( const pubnub_subscribe_cursor_t cursor) { const pbcc_subscribe_ee_event type = SUBSCRIBE_EE_EVENT_RECONNECT; - return pbcc_subscribe_ee_event_alloc_(ee, type, NULL, NULL, &cursor, NULL); + return pbcc_subscribe_ee_event_alloc_(ee, + type, + NULL, + NULL, + &cursor, + NULL, + true); } pbcc_ee_event_t* pbcc_unsubscribe_all_event_alloc( @@ -200,11 +248,12 @@ pbcc_ee_event_t* pbcc_unsubscribe_all_event_alloc( char** channels, char** channel_groups) { - if (NULL != channels && 0 == strlen(*channels)) { + if (NULL != channels && NULL != *channels && 0 == strlen(*channels)) { free(*channels); *channels = NULL; } - if (NULL != channel_groups && 0 == strlen(*channel_groups)) { + if (NULL != channel_groups && NULL != *channel_groups && + 0 == strlen(*channel_groups)) { free(*channel_groups); *channel_groups = NULL; } @@ -215,7 +264,8 @@ pbcc_ee_event_t* pbcc_unsubscribe_all_event_alloc( channels, channel_groups, NULL, - NULL); + NULL, + false); } pbcc_ee_event_t* pbcc_subscribe_ee_event_alloc_( @@ -224,18 +274,26 @@ pbcc_ee_event_t* pbcc_subscribe_ee_event_alloc_( char** channels, char** channel_groups, const pubnub_subscribe_cursor_t* cursor, - const enum pubnub_res* reason) + const enum pubnub_res* reason, + const bool sent_by_ee) { pubnub_mutex_lock(ee->mutw); pbcc_ee_data_t* current_state_data = pbcc_subscribe_ee_current_state_context(ee); const pbcc_subscribe_ee_context_t* current_context = pbcc_ee_data_value(current_state_data); + const bool copy_subscribables = + event != SUBSCRIBE_EE_EVENT_SUBSCRIPTION_CHANGED && + event != SUBSCRIBE_EE_EVENT_SUBSCRIPTION_RESTORED; + pbcc_subscribe_ee_context_t* ctx = pbcc_subscribe_ee_context_copy_( ee->pb, current_context, channels, - channel_groups); + channel_groups, + copy_subscribables); + /** For subscription change / restore we need to send heartbeat. */ + ctx->send_heartbeat = !copy_subscribables && !sent_by_ee; pbcc_ee_data_free(current_state_data); if (NULL == ctx) { @@ -289,6 +347,7 @@ pbcc_subscribe_ee_context_t* pbcc_subscribe_ee_context_alloc_( context->reason = PNR_OK; context->channels = NULL; context->channel_groups = NULL; + context->send_heartbeat = false; if (NULL != channels && NULL != *channels) context->channels = pbcc_ee_data_alloc(*channels, free); if (NULL != channel_groups && NULL != *channel_groups) { @@ -303,7 +362,8 @@ pbcc_subscribe_ee_context_t* pbcc_subscribe_ee_context_copy_( pubnub_t* pb, const pbcc_subscribe_ee_context_t* ctx, char** channels, - char** channel_groups) + char** channel_groups, + const bool copy_subscribables) { pbcc_subscribe_ee_context_t* context = pbcc_subscribe_ee_context_alloc_( pb, @@ -312,10 +372,12 @@ pbcc_subscribe_ee_context_t* pbcc_subscribe_ee_context_copy_( if (NULL == context) { return NULL; } if (NULL == ctx) { return context; } - if (NULL == context->channels && NULL != ctx->channels) - context->channels = pbcc_ee_data_copy(ctx->channels); - if (NULL == context->channel_groups && NULL != ctx->channel_groups) - context->channel_groups = pbcc_ee_data_copy(ctx->channel_groups); + if (copy_subscribables) { + if (NULL == context->channels && NULL != ctx->channels) + context->channels = pbcc_ee_data_copy(ctx->channels); + if (NULL == context->channel_groups && NULL != ctx->channel_groups) + context->channel_groups = pbcc_ee_data_copy(ctx->channel_groups); + } context->cursor = ctx->cursor; return context; diff --git a/core/pbcc_subscribe_event_engine_events.h b/core/pbcc_subscribe_event_engine_events.h index 65e20d25..a73c7e4b 100644 --- a/core/pbcc_subscribe_event_engine_events.h +++ b/core/pbcc_subscribe_event_engine_events.h @@ -39,14 +39,17 @@ * @param [in,out] channel_groups Pointer to the byte sting of comma-separated * channel groups from which PubNub client should * receive real-time updates. + * @param sent_by_ee Whether event has been sent by Subscribe Event + * Engine or not. * @return Pointer to the `Subscription change event`, which will be processed * by the Subscribe Event Engine to get transition instructions from the * current state. */ pbcc_ee_event_t* pbcc_subscription_changed_event_alloc( pbcc_subscribe_ee_t* ee, - char** channels, - char** channel_groups); + char** channels, + char** channel_groups, + bool sent_by_ee); /** * @brief Subscription restore / catchup event. @@ -70,15 +73,18 @@ pbcc_ee_event_t* pbcc_subscription_changed_event_alloc( * @param cursor Time token to which PubNub client should try * to restore subscription (catch up on missing * messages) loop. + * @param sent_by_ee Whether event has been sent by Subscribe Event + * Engine or not. * @return Pointer to the `Subscription restore event`, which will be processed * by the Subscribe Event Engine to get transition instructions from the * current state. */ pbcc_ee_event_t* pbcc_subscription_restored_event_alloc( - pbcc_subscribe_ee_t* ee, - char** channels, - char** channel_groups, - pubnub_subscribe_cursor_t cursor); + pbcc_subscribe_ee_t* ee, + char** channels, + char** channel_groups, + pubnub_subscribe_cursor_t cursor, + bool sent_by_ee); /** * @brief Initial subscription handshake success event. @@ -95,7 +101,7 @@ pbcc_ee_event_t* pbcc_subscription_restored_event_alloc( * instructions from the current state. */ pbcc_ee_event_t* pbcc_handshake_success_event_alloc( - pbcc_subscribe_ee_t* ee, + pbcc_subscribe_ee_t* ee, pubnub_subscribe_cursor_t cursor); /** @@ -115,7 +121,7 @@ pbcc_ee_event_t* pbcc_handshake_success_event_alloc( */ pbcc_ee_event_t* pbcc_handshake_failure_event_alloc( pbcc_subscribe_ee_t* ee, - enum pubnub_res reason); + enum pubnub_res reason); /** * @default Real-time updates receive success event. @@ -131,7 +137,7 @@ pbcc_ee_event_t* pbcc_handshake_failure_event_alloc( * instructions from the current state. */ pbcc_ee_event_t* pbcc_receive_success_event_alloc( - pbcc_subscribe_ee_t* ee, + pbcc_subscribe_ee_t* ee, pubnub_subscribe_cursor_t cursor); /** @@ -147,7 +153,7 @@ pbcc_ee_event_t* pbcc_receive_success_event_alloc( */ pbcc_ee_event_t* pbcc_receive_failure_event_alloc( pbcc_subscribe_ee_t* ee, - enum pubnub_res reason); + enum pubnub_res reason); /** * @brief Disconnect from real-time updates event. @@ -178,7 +184,7 @@ pbcc_ee_event_t* pbcc_disconnect_event_alloc(pbcc_subscribe_ee_t* ee); * current state. */ pbcc_ee_event_t* pbcc_reconnect_event_alloc( - pbcc_subscribe_ee_t* ee, + pbcc_subscribe_ee_t* ee, pubnub_subscribe_cursor_t cursor); /** @@ -208,8 +214,8 @@ pbcc_ee_event_t* pbcc_reconnect_event_alloc( */ pbcc_ee_event_t* pbcc_unsubscribe_all_event_alloc( pbcc_subscribe_ee_t* ee, - char** channels, - char** channel_groups); + char** channels, + char** channel_groups); #else // #if PUBNUB_USE_SUBSCRIBE_EVENT_ENGINE #error To use subscribe event engine API you must define PUBNUB_USE_SUBSCRIBE_EVENT_ENGINE=1 #endif // #if PUBNUB_USE_SUBSCRIBE_EVENT_ENGINE diff --git a/core/pbcc_subscribe_event_engine_states.c b/core/pbcc_subscribe_event_engine_states.c index c961e1ee..b9b80944 100644 --- a/core/pbcc_subscribe_event_engine_states.c +++ b/core/pbcc_subscribe_event_engine_states.c @@ -1,8 +1,10 @@ /* -*- c-file-style:"stroustrup"; indent-tabs-mode: nil -*- */ #include "pbcc_subscribe_event_engine_states.h" + #include "core/pbcc_subscribe_event_engine_transitions.h" #include "core/pbcc_subscribe_event_engine_effects.h" #include "core/pbcc_subscribe_event_engine_types.h" +#include "core/pubnub_assert.h" // ---------------------------------------------- @@ -17,7 +19,9 @@ pbcc_ee_state_t* pbcc_unsubscribed_state_alloc() pbcc_unsubscribed_state_transition_alloc); } -pbcc_ee_state_t* pbcc_handshaking_state_alloc(pbcc_ee_data_t* context) +pbcc_ee_state_t* pbcc_handshaking_state_alloc( + pbcc_event_engine_t* ee, + pbcc_ee_data_t* context) { pbcc_ee_state_t* state = pbcc_ee_state_alloc( SUBSCRIBE_EE_STATE_HANDSHAKING, @@ -26,10 +30,14 @@ pbcc_ee_state_t* pbcc_handshaking_state_alloc(pbcc_ee_data_t* context) if (NULL == state) { return NULL; } pbcc_ee_invocation_t* on_enter = pbcc_ee_invocation_alloc( + SUBSCRIBE_EE_INVOCATION_HANDSHAKE, (pbcc_ee_effect_function_t)pbcc_subscribe_ee_handshake_effect, context, false); + /** Make sure that there is no outdated subscribe call. */ + pbcc_ee_invocation_cancel_by_type(ee, SUBSCRIBE_EE_INVOCATION_HANDSHAKE); + if (NULL == on_enter || PNR_OK != pbcc_ee_state_add_on_enter_invocation(state, on_enter)) { if (NULL != on_enter) { pbcc_ee_invocation_free(on_enter); } @@ -38,6 +46,7 @@ pbcc_ee_state_t* pbcc_handshaking_state_alloc(pbcc_ee_data_t* context) } pbcc_ee_invocation_t* on_exit = pbcc_ee_invocation_alloc( + SUBSCRIBE_EE_INVOCATION_CANCEL, (pbcc_ee_effect_function_t)pbcc_subscribe_ee_cancel_effect, context, true); @@ -67,7 +76,9 @@ pbcc_ee_state_t* pbcc_handshake_stopped_state_alloc(pbcc_ee_data_t* context) pbcc_handshake_stopped_state_transition_alloc); } -pbcc_ee_state_t* pbcc_receiving_state_alloc(pbcc_ee_data_t* context) +pbcc_ee_state_t* pbcc_receiving_state_alloc( + pbcc_event_engine_t* ee, + pbcc_ee_data_t* context) { pbcc_ee_state_t* state = pbcc_ee_state_alloc( SUBSCRIBE_EE_STATE_RECEIVING, @@ -76,10 +87,14 @@ pbcc_ee_state_t* pbcc_receiving_state_alloc(pbcc_ee_data_t* context) if (NULL == state) { return NULL; } pbcc_ee_invocation_t* on_enter = pbcc_ee_invocation_alloc( + SUBSCRIBE_EE_INVOCATION_RECEIVE, (pbcc_ee_effect_function_t)pbcc_subscribe_ee_receive_effect, context, false); + /** Make sure that there is no outdated subscribe call. */ + pbcc_ee_invocation_cancel_by_type(ee, SUBSCRIBE_EE_INVOCATION_RECEIVE); + if (NULL == on_enter || PNR_OK != pbcc_ee_state_add_on_enter_invocation(state, on_enter)) { if (NULL != on_enter) { pbcc_ee_invocation_free(on_enter); } @@ -88,6 +103,7 @@ pbcc_ee_state_t* pbcc_receiving_state_alloc(pbcc_ee_data_t* context) } pbcc_ee_invocation_t* on_exit = pbcc_ee_invocation_alloc( + SUBSCRIBE_EE_INVOCATION_CANCEL, (pbcc_ee_effect_function_t)pbcc_subscribe_ee_cancel_effect, context, true); diff --git a/core/pbcc_subscribe_event_engine_states.h b/core/pbcc_subscribe_event_engine_states.h index df821070..58e79610 100644 --- a/core/pbcc_subscribe_event_engine_states.h +++ b/core/pbcc_subscribe_event_engine_states.h @@ -25,6 +25,7 @@ * State with which Subscribe Event Engine will be initialized and end up * when there will be no channels or groups to subscribe to. * + * @param ee Pointer to the Event Engine which new state should be created. * @return Pointer to the `Unsubscribed` state. */ pbcc_ee_state_t* pbcc_unsubscribed_state_alloc(void); @@ -36,13 +37,16 @@ pbcc_ee_state_t* pbcc_unsubscribed_state_alloc(void); * subscription loop time token (cursor) and notify channels / groups subscribes * about change in subscriber's presence. * + * @param ee Pointer to the Event Engine which new state should be created. * @param context Pointer to the context with updated Subscribe Event Engine * information which should be used for the next subscription * loop. * @return Pointer to the `Handshaking state`, which should be used as * Subscribe Event Engine state transition target state. */ -pbcc_ee_state_t* pbcc_handshaking_state_alloc(pbcc_ee_data_t* context); +pbcc_ee_state_t* pbcc_handshaking_state_alloc( + pbcc_event_engine_t* ee, + pbcc_ee_data_t* context); /** * @brief Initial subscription failed state. @@ -78,13 +82,16 @@ pbcc_ee_state_t* pbcc_handshake_stopped_state_alloc(pbcc_ee_data_t* context); * State, which is used by the Subscribe Event Engine to perform a long-poll * subscription loop to receive real-time updates. * + * @param ee Pointer to the Event Engine which new state should be created. * @param context Pointer to the context with list of channels / groups and time * token (cursor) received from previous subscription loop to * receive next real-time updates. * @return Pointer to the `Receiving real-time updates state`, which should be * used as Subscribe Event Engine state transition target state. */ -pbcc_ee_state_t* pbcc_receiving_state_alloc(pbcc_ee_data_t* context); +pbcc_ee_state_t* pbcc_receiving_state_alloc( + pbcc_event_engine_t* ee, + pbcc_ee_data_t* context); /** * @brief Real-time updates receive failed state. @@ -111,6 +118,7 @@ pbcc_ee_state_t* pbcc_receive_failed_state_alloc(pbcc_ee_data_t* context); * @param context Pointer to the context with list of channels / groups and time * token (cursor) received from previous subscription loop to * receive next real-time updates. + * receive next real-time updates. * @return Pointer to the `Receive real-time updates stopped state`, which * should be used as Subscribe Event Engine state transition target * state. diff --git a/core/pbcc_subscribe_event_engine_transitions.c b/core/pbcc_subscribe_event_engine_transitions.c index 1c07a0fc..4887154a 100644 --- a/core/pbcc_subscribe_event_engine_transitions.c +++ b/core/pbcc_subscribe_event_engine_transitions.c @@ -104,6 +104,7 @@ pbcc_ee_transition_t* pbcc_handshaking_state_transition_alloc( PBARRAY_GENERIC_CONTENT_TYPE, (pbarray_element_free)pbcc_ee_invocation_free); pbcc_ee_invocation_t* invocation = pbcc_ee_invocation_alloc( + SUBSCRIBE_EE_INVOCATION_EMIT_STATUS, (pbcc_ee_effect_function_t)pbcc_subscribe_ee_emit_status_effect, data, false); @@ -127,8 +128,8 @@ pbcc_ee_transition_t* pbcc_handshaking_state_transition_alloc( /** Update latest Subscribe Event Engine subscription status. */ pubnub_mutex_lock(subscribe_ee->mutw); subscribe_ee->status = SUBSCRIBE_EE_STATE_RECEIVING == target_state_type - ? SUBSCRIPTION_STATUS_CONNECTED - : SUBSCRIPTION_STATUS_CONNECTION_ERROR; + ? PNSS_SUBSCRIPTION_STATUS_CONNECTED + : PNSS_SUBSCRIPTION_STATUS_CONNECTION_ERROR; pubnub_mutex_unlock(subscribe_ee->mutw); } break; @@ -168,6 +169,7 @@ pbcc_ee_transition_t* pbcc_handshake_failed_state_transition_alloc( switch (event->type) { case SUBSCRIBE_EE_EVENT_SUBSCRIPTION_CHANGED: + case SUBSCRIBE_EE_EVENT_SUBSCRIPTION_RESTORED: { PUBNUB_ASSERT_OPT(NULL != data); const pbcc_subscribe_ee_context_t* context = pbcc_ee_data_value(data); @@ -175,14 +177,13 @@ pbcc_ee_transition_t* pbcc_handshake_failed_state_transition_alloc( pbcc_ee_data_value(context->channel_groups); const char* channels = pbcc_ee_data_value(context->channels); - if (NULL != context && 0 == strlen(channels) && - 0 == strlen(channel_groups)) { + if (NULL != context && (NULL == channels || 0 == strlen(channels)) && + (NULL == channel_groups || 0 == strlen(channel_groups))) { target_state_type = SUBSCRIBE_EE_STATE_UNSUBSCRIBED; - data = NULL; } else { target_state_type = SUBSCRIBE_EE_STATE_HANDSHAKING; } - break; - case SUBSCRIBE_EE_EVENT_SUBSCRIPTION_RESTORED: + } + break; case SUBSCRIBE_EE_EVENT_RECONNECT: target_state_type = SUBSCRIBE_EE_STATE_HANDSHAKING; break; @@ -255,24 +256,39 @@ pbcc_ee_transition_t* pbcc_receiving_state_transition_alloc( switch (event->type) { case SUBSCRIBE_EE_EVENT_SUBSCRIPTION_CHANGED: - case SUBSCRIBE_EE_EVENT_SUBSCRIPTION_RESTORED: - target_state_type = SUBSCRIBE_EE_STATE_RECEIVING; - status = SUBSCRIPTION_STATUS_SUBSCRIPTION_CHANGED; - break; + case SUBSCRIBE_EE_EVENT_SUBSCRIPTION_RESTORED: { + const pbcc_subscribe_ee_context_t* context = ( + pbcc_subscribe_ee_context_t*) + pbcc_ee_data_value(data); + const char* channel_groups = + pbcc_ee_data_value(context->channel_groups); + const char* channels = pbcc_ee_data_value(context->channels); + + if (NULL != context && (NULL == channels || 0 == strlen(channels)) && + (NULL == channel_groups || 0 == strlen(channel_groups))) { + target_state_type = SUBSCRIBE_EE_STATE_UNSUBSCRIBED; + status = PNSS_SUBSCRIPTION_STATUS_DISCONNECTED; + } + else { + target_state_type = SUBSCRIBE_EE_STATE_RECEIVING; + status = PNSS_SUBSCRIPTION_STATUS_SUBSCRIPTION_CHANGED; + } + } + break; case SUBSCRIBE_EE_EVENT_RECEIVE_SUCCESS: target_state_type = SUBSCRIBE_EE_STATE_RECEIVING; break; case SUBSCRIBE_EE_EVENT_RECEIVE_FAILURE: target_state_type = SUBSCRIBE_EE_STATE_RECEIVE_FAILED; - status = SUBSCRIPTION_STATUS_DISCONNECTED_UNEXPECTEDLY; + status = PNSS_SUBSCRIPTION_STATUS_DISCONNECTED_UNEXPECTEDLY; break; case SUBSCRIBE_EE_EVENT_DISCONNECT: target_state_type = SUBSCRIBE_EE_STATE_RECEIVE_STOPPED; - status = SUBSCRIPTION_STATUS_DISCONNECTED; + status = PNSS_SUBSCRIPTION_STATUS_DISCONNECTED; break; case SUBSCRIBE_EE_EVENT_UNSUBSCRIBE_ALL: target_state_type = SUBSCRIBE_EE_STATE_UNSUBSCRIBED; - status = SUBSCRIPTION_STATUS_DISCONNECTED; + status = PNSS_SUBSCRIPTION_STATUS_DISCONNECTED; break; default: data = NULL; @@ -297,6 +313,7 @@ pbcc_ee_transition_t* pbcc_receiving_state_transition_alloc( const pbcc_subscribe_ee_context_t* context = pbcc_ee_data_value(data); pbcc_subscribe_ee_t* subscribe_ee = context->pb->core.subscribe_ee; invocation = pbcc_ee_invocation_alloc( + SUBSCRIBE_EE_INVOCATION_EMIT_STATUS, (pbcc_ee_effect_function_t)pbcc_subscribe_ee_emit_status_effect, data, false); @@ -309,6 +326,7 @@ pbcc_ee_transition_t* pbcc_receiving_state_transition_alloc( if (SUBSCRIBE_EE_EVENT_RECEIVE_SUCCESS == event->type) { invocation = pbcc_ee_invocation_alloc( + SUBSCRIBE_EE_INVOCATION_EMIT_MESSAGE, (pbcc_ee_effect_function_t)pbcc_subscribe_ee_emit_messages_effect, data, false); @@ -339,7 +357,7 @@ pbcc_ee_transition_t* pbcc_receiving_state_transition_alloc( return pbcc_transition_alloc_(ee, target_state_type, - event->data, + data, invocations); } @@ -432,7 +450,7 @@ pbcc_ee_transition_t* pbcc_transition_alloc_( target_state = pbcc_unsubscribed_state_alloc(); break; case SUBSCRIBE_EE_STATE_HANDSHAKING: - target_state = pbcc_handshaking_state_alloc(state_context); + target_state = pbcc_handshaking_state_alloc(ee, state_context); break; case SUBSCRIBE_EE_STATE_HANDSHAKE_FAILED: target_state = pbcc_handshake_failed_state_alloc(state_context); @@ -441,7 +459,7 @@ pbcc_ee_transition_t* pbcc_transition_alloc_( target_state = pbcc_handshake_stopped_state_alloc(state_context); break; case SUBSCRIBE_EE_STATE_RECEIVING: - target_state = pbcc_receiving_state_alloc(state_context); + target_state = pbcc_receiving_state_alloc(ee, state_context); break; case SUBSCRIBE_EE_STATE_RECEIVE_FAILED: target_state = pbcc_receive_failed_state_alloc(state_context); @@ -451,7 +469,7 @@ pbcc_ee_transition_t* pbcc_transition_alloc_( break; default: PUBNUB_LOG_ERROR("pbcc_transition_alloc: unknown target state type\n"); - return NULL; + break; } if (NULL == target_state && SUBSCRIBE_EE_STATE_NONE != target_state_type) { diff --git a/core/pbcc_subscribe_event_engine_types.h b/core/pbcc_subscribe_event_engine_types.h index c3c4298a..e6f363c6 100644 --- a/core/pbcc_subscribe_event_engine_types.h +++ b/core/pbcc_subscribe_event_engine_types.h @@ -65,6 +65,20 @@ typedef enum { SUBSCRIBE_EE_STATE_RECEIVE_STOPPED, } pbcc_subscribe_ee_state; +/** Subscribe Event Engine events. */ +typedef enum { + /** Subscription list of channels / groups handshake invocation. */ + SUBSCRIBE_EE_INVOCATION_HANDSHAKE, + /** Receive real-time updates for list of channels / groups invocation. */ + SUBSCRIBE_EE_INVOCATION_RECEIVE, + /** Emit subscription change status invocation. */ + SUBSCRIBE_EE_INVOCATION_EMIT_STATUS, + /** Emit real-time update invocation. */ + SUBSCRIBE_EE_INVOCATION_EMIT_MESSAGE, + /** Cancel subscription invocation. */ + SUBSCRIBE_EE_INVOCATION_CANCEL, +} pbcc_subscribe_ee_invocation; + /** Subscribe event engine structure. */ struct pbcc_subscribe_ee { /** @@ -104,8 +118,33 @@ struct pbcc_subscribe_ee { char* filter_expr; /** Recent subscription status. */ pubnub_subscription_status status; - /** Subscribe Event Listener. */ + /** Pointer to the Subscribe Event Listener. */ pbcc_event_listener_t* event_listener; + /** + * @brief Pointer to the active subscription cancel invocation. + * + * PubNub context may need some time to complete cancellation operation and + * report to the callback + */ + pbcc_ee_invocation_t* cancel_invocation; + /** + * @brief Pointer to the list of channels to leave. + * + * @note Because of PubNub specific with single request at a time we need to + * wait when cancel invocation will complete to be able to send + * `leave` request for channels. + */ + pbarray_t* leave_channels; + /** + * @brief Pointer to the list of channel groups to leave. + * + * @note Because of PubNub specific with single request at a time we need to + * wait when cancel invocation will complete to be able to send + * `leave` request for channel groups. + */ + pbarray_t* leave_channel_groups; + /** Type of transaction which is currently in progress. */ + enum pubnub_trans current_transaction; /** * @brief Pointer to the Event Engine which handles all states and * transitions on events. @@ -142,6 +181,8 @@ typedef struct { pubnub_subscribe_cursor_t cursor; /** Previous request failure reason. */ enum pubnub_res reason; + /** Whether subscription requires heartbeat to be sent before subscribe. */ + bool send_heartbeat; /** * @brief Pointer to the PubNub context, which should be used for effects * execution by Subscribe Event Engine effects dispatcher. diff --git a/core/pbcc_subscribe_event_listener.c b/core/pbcc_subscribe_event_listener.c index dc81e7c7..6b72b304 100644 --- a/core/pbcc_subscribe_event_listener.c +++ b/core/pbcc_subscribe_event_listener.c @@ -471,6 +471,7 @@ void pbcc_event_listener_emit_status( void pbcc_event_listener_emit_message( pbcc_event_listener_t* listener, + const char* subscribable, const struct pubnub_v2_message message) { if (NULL == listener) { return; } @@ -485,7 +486,7 @@ void pbcc_event_listener_emit_message( // Notify subscribable object message listeners (if any has been registered). const pbcc_object_listener_t* object_listener = (pbcc_object_listener_t*) - pbhash_set_element(listener->listeners, message.channel.ptr); + pbhash_set_element(listener->listeners, subscribable); if (NULL != object_listener) { pbcc_event_listener_emit_message_(listener, object_listener->listeners, @@ -649,18 +650,18 @@ pubnub_subscribe_listener_type pbcc_message_type_to_listener_type_( { switch (type) { case pbsbPublished: - return LISTENER_ON_MESSAGE; + return PBSL_LISTENER_ON_MESSAGE; case pbsbSignal: - return LISTENER_ON_SIGNAL; + return PBSL_LISTENER_ON_SIGNAL; case pbsbAction: - return LISTENER_ON_MESSAGE_ACTION; + return PBSL_LISTENER_ON_MESSAGE_ACTION; case pbsbObjects: - return LISTENER_ON_OBJECTS; + return PBSL_LISTENER_ON_OBJECTS; case pbsbFiles: - return LISTENER_ON_FILES; + return PBSL_LISTENER_ON_FILES; } - return LISTENER_ON_MESSAGE; + return PBSL_LISTENER_ON_MESSAGE; } pbarray_t* pbcc_initialize_array_( diff --git a/core/pbcc_subscribe_event_listener.h b/core/pbcc_subscribe_event_listener.h index 4c914aee..37971723 100644 --- a/core/pbcc_subscribe_event_listener.h +++ b/core/pbcc_subscribe_event_listener.h @@ -160,9 +160,9 @@ enum pubnub_res pbcc_event_listener_remove_subscription_object_listener( * listeners for subscription status change event. * @param status New subscription status which should be sent the the * listeners. - * @param reason In case of `SUBSCRIPTION_STATUS_CONNECTION_ERROR` and - * `SUBSCRIPTION_STATUS_DISCONNECTED_UNEXPECTEDLY` may - * contain additional information about reasons of + * @param reason In case of `PNSS_SUBSCRIPTION_STATUS_CONNECTION_ERROR` + * and `PNSS_SUBSCRIPTION_STATUS_DISCONNECTED_UNEXPECTEDLY` + * may contain additional information about reasons of * failure. * @param channels Byte string with comma-separated / `NULL` channel * identifiers which have been used with recent operation. @@ -179,12 +179,16 @@ void pbcc_event_listener_emit_status( /** * @brief Notify listeners about new real-time update / message. * - * @param listener Pointer to the Event Listener which contains list of - * listeners for subscription real-time update / message event. + * @param listener Pointer to the Event Listener which contains list of + * listeners for subscription real-time update / message + * event. + * @param subscribable Pointer to the subscrbable entity for which `message` has + * been received. * @param message Received message which should be delivered to the listeners. */ void pbcc_event_listener_emit_message( pbcc_event_listener_t* listener, + const char* subscribable, struct pubnub_v2_message message); /** diff --git a/core/pbcc_subscribe_v2.c b/core/pbcc_subscribe_v2.c index 90b2e36f..9d424b1e 100644 --- a/core/pbcc_subscribe_v2.c +++ b/core/pbcc_subscribe_v2.c @@ -302,7 +302,7 @@ struct pubnub_v2_message pbcc_get_msg_v2(struct pbcc_context* p) else { rslt.message_type = pbsbPublished; } - + jpresult = pbjson_get_object_value(&el, "p", &found); if (jonmpOK == jpresult) { struct pbjson_elem titel; @@ -328,8 +328,8 @@ struct pubnub_v2_message pbcc_get_msg_v2(struct pbcc_context* p) } if (jonmpOK == pbjson_get_object_value(&el, "b", &found)) { - rslt.match_or_group.ptr = (char*)found.start; - rslt.match_or_group.size = found.end - found.start; + rslt.match_or_group.ptr = (char*)found.start + 1; + rslt.match_or_group.size = found.end - found.start - 2; } if (jonmpOK == pbjson_get_object_value(&el, "u", &found)) { diff --git a/core/pbpal_ntf_callback_admin.c b/core/pbpal_ntf_callback_admin.c index 3ec175e4..46a67e42 100644 --- a/core/pbpal_ntf_callback_admin.c +++ b/core/pbpal_ntf_callback_admin.c @@ -44,7 +44,6 @@ void pbntf_trans_outcome(pubnub_t* pb, enum pubnub_state state) PUBNUB_LOG_TRACE("pbntf_trans_outcome(pb=%p) calling callback:\n" "pb->trans = %d, pb->core.last_result=%d, pb->user_data=%p\n", pb, pb->trans, pb->core.last_result, pb->user_data); - printf("~~~~ pbntf_trans_outcome 1\n"); pb->cb(pb, pb->trans, pb->core.last_result, pb->user_data); } } diff --git a/core/pubnub_coreapi.c b/core/pubnub_coreapi.c index 2f2c49d6..5eec3c42 100644 --- a/core/pubnub_coreapi.c +++ b/core/pubnub_coreapi.c @@ -85,19 +85,13 @@ enum pubnub_res pubnub_leave(pubnub_t* p, const char* channel, const char* chann char const* prep_channel; char const* prep_channel_group; - printf("~~~~ pubnub_leave 1\n"); PUBNUB_ASSERT(pb_valid_ctx_ptr(p)); - printf("~~~~ pubnub_leave 2\n"); pubnub_mutex_lock(p->monitor); - printf("~~~~ pubnub_leave 3\n"); if (!pbnc_can_start_transaction(p)) { - printf("~~~~ pubnub_leave 4\n"); pubnub_mutex_unlock(p->monitor); - printf("~~~~ pubnub_leave 5\n"); return rslt; } - printf("~~~~ pubnub_leave 6\n"); check_if_default_channel_and_groups(p, channel, channel_group, diff --git a/core/pubnub_subscribe_event_engine.c b/core/pubnub_subscribe_event_engine.c index 56dcd5e5..d1ea0d78 100644 --- a/core/pubnub_subscribe_event_engine.c +++ b/core/pubnub_subscribe_event_engine.c @@ -240,10 +240,11 @@ pubnub_subscription_set_alloc_with_entities( if (NULL == pb) { pb = entity->pb; } if (NULL == sub || PNR_OUT_OF_MEMORY == pubnub_subscription_set_add(set, sub)) { - if (NULL != sub) { pubnub_subscription_free(&sub); } pubnub_subscription_set_free(&set); return NULL; } + /** Release extra `subscription` reference because `set` retained it. */ + if (NULL != sub) { pubnub_subscription_free(&sub); } } if (pb) { set->ee = pb->core.subscribe_ee; } @@ -459,8 +460,9 @@ pubnub_subscribe_cursor_t pubnub_subscribe_cursor(const char* timetoken) pubnub_subscribe_cursor_t cursor; if (NULL != timetoken) { - memcpy(cursor.timetoken, timetoken, sizeof(cursor.timetoken) - 1); - cursor.timetoken[sizeof(cursor.timetoken) - 1] = '\0'; + size_t token_len = strlen(timetoken); + memcpy(cursor.timetoken, timetoken, token_len); + cursor.timetoken[token_len] = '\0'; } else { cursor.timetoken[0] = '0'; @@ -476,7 +478,6 @@ enum pubnub_res pubnub_subscribe_with_subscription( const pubnub_subscribe_cursor_t* cursor) { if (NULL == sub) { return PNR_INVALID_PARAMETERS; } - printf("~~~~~ pubnub_subscribe_with_subscription: %p\n", sub->ee); const enum pubnub_res rslt = pbcc_subscribe_ee_subscribe_with_subscription( sub->ee, @@ -626,7 +627,7 @@ enum pubnub_res pubnub_subscription_set_add_( enum pubnub_res pubnub_subscription_set_remove_( const pubnub_subscription_set_t* set, pubnub_subscription_t** sub, - bool notify_change) + const bool notify_change) { const bool subscribed = set->subscribed; @@ -639,6 +640,15 @@ enum pubnub_res pubnub_subscription_set_remove_( return PNR_SUB_NOT_FOUND; } + /** Preventing `pbhash_set` (set->subscriptions) from freeing `sub`. */ + pubnub_subscription_t* stored_subscription = (pubnub_subscription_t*) + pbhash_set_element(set->subscriptions, (*sub)->entity->id.ptr); + bool same_object = stored_subscription == *sub; + subscription_reference_count_update_(stored_subscription, true); + pbhash_set_remove(set->subscriptions, + (void**)&stored_subscription->entity->id.ptr, + (void**)&stored_subscription); + enum pubnub_res rslt = PNR_OK; if (subscribed && notify_change) { /** Notify changes in active subscription set. */ @@ -649,9 +659,9 @@ enum pubnub_res pubnub_subscription_set_remove_( false); } - pbhash_set_remove(set->subscriptions, - (void**)&(*sub)->entity->id.ptr, - (void**)sub); + /** Trying to free up memory used for `sub`. */ + pubnub_subscription_free(&stored_subscription); + if (NULL == stored_subscription && same_object) { *sub = NULL; } return rslt; } @@ -808,7 +818,6 @@ void subscription_set_reference_count_update_( bool pubnub_subscription_set_subscription_free_(pubnub_subscription_t* sub) { - subscription_reference_count_update_(sub, false); return pubnub_subscription_free(&sub); } diff --git a/core/pubnub_subscribe_event_engine.h b/core/pubnub_subscribe_event_engine.h index 44e5f94a..c65ed4c8 100644 --- a/core/pubnub_subscribe_event_engine.h +++ b/core/pubnub_subscribe_event_engine.h @@ -646,7 +646,7 @@ PUBNUB_EXTERN enum pubnub_res pubnub_subscribe_with_subscription( /** * @brief Stop receiving real-time updates for subscription entity. * @code - * enum pubnub_res rslt = pubnub_unsubscribe_with_subscription(subscription); + * enum pubnub_res rslt = pubnub_unsubscribe_with_subscription(&subscription); * if (PNR_OK != rslt) { * // handle unsubscription error (mostly because of parameters error). * } diff --git a/core/pubnub_subscribe_event_engine_types.h b/core/pubnub_subscribe_event_engine_types.h index bf30fa68..b436cf41 100644 --- a/core/pubnub_subscribe_event_engine_types.h +++ b/core/pubnub_subscribe_event_engine_types.h @@ -87,21 +87,21 @@ typedef struct { /** PubNub subscription statuses. */ typedef enum { /** PubNub client subscribe and ready to receive real-time updates. */ - SUBSCRIPTION_STATUS_CONNECTED, + PNSS_SUBSCRIPTION_STATUS_CONNECTED, /** PubNub client were unable to subscribe to receive real-time updates. */ - SUBSCRIPTION_STATUS_CONNECTION_ERROR, + PNSS_SUBSCRIPTION_STATUS_CONNECTION_ERROR, /** PubNub client has been disconnected because of some error. */ - SUBSCRIPTION_STATUS_DISCONNECTED_UNEXPECTEDLY, + PNSS_SUBSCRIPTION_STATUS_DISCONNECTED_UNEXPECTEDLY, /** * @brief PubNub client has been intentionally temporarily disconnected from * the real-time updates. */ - SUBSCRIPTION_STATUS_DISCONNECTED, + PNSS_SUBSCRIPTION_STATUS_DISCONNECTED, /** * @brief PubNub client has been unsubscribed from all real-time update * sources. */ - SUBSCRIPTION_STATUS_SUBSCRIPTION_CHANGED + PNSS_SUBSCRIPTION_STATUS_SUBSCRIPTION_CHANGED } pubnub_subscription_status; #else // #if PUBNUB_USE_SUBSCRIBE_EVENT_ENGINE #error To use subscribe event engine API you must define PUBNUB_USE_SUBSCRIBE_EVENT_ENGINE=1 diff --git a/core/pubnub_subscribe_event_listener_types.h b/core/pubnub_subscribe_event_listener_types.h index 8d9cfe56..c2edb9f4 100644 --- a/core/pubnub_subscribe_event_listener_types.h +++ b/core/pubnub_subscribe_event_listener_types.h @@ -21,15 +21,15 @@ typedef enum { /** Listener to handle real-time messages. */ - LISTENER_ON_MESSAGE, + PBSL_LISTENER_ON_MESSAGE, /** Listener to handle real-time signals. */ - LISTENER_ON_SIGNAL, + PBSL_LISTENER_ON_SIGNAL, /** Listener to handle message action real-time updates. */ - LISTENER_ON_MESSAGE_ACTION, + PBSL_LISTENER_ON_MESSAGE_ACTION, /** Listener to handle App Context real-time updates. */ - LISTENER_ON_OBJECTS, + PBSL_LISTENER_ON_OBJECTS, /** Listener to handle real-time files sharing events. */ - LISTENER_ON_FILES + PBSL_LISTENER_ON_FILES } pubnub_subscribe_listener_type; /** PubNub subscription status change data object. */ @@ -38,9 +38,9 @@ typedef struct /** * @brief Error details in case of error. * - * In case of `SUBSCRIPTION_STATUS_CONNECTION_ERROR` and - * `SUBSCRIPTION_STATUS_DISCONNECTED_UNEXPECTEDLY` may contain additional - * information about reasons of failure. + * In case of `PNSS_SUBSCRIPTION_STATUS_CONNECTION_ERROR` and + * `PNSS_SUBSCRIPTION_STATUS_DISCONNECTED_UNEXPECTEDLY` may contain + * additional information about reasons of failure. */ const enum pubnub_res reason; /** diff --git a/core/samples/subscribe_event_engine_sample.c b/core/samples/subscribe_event_engine_sample.c index c6735cac..e7b747e4 100644 --- a/core/samples/subscribe_event_engine_sample.c +++ b/core/samples/subscribe_event_engine_sample.c @@ -1,51 +1,325 @@ /* -*- c-file-style:"stroustrup"; indent-tabs-mode: nil -*- */ -#include "pubnub_callback.h" +#include +#include +#include +#include + +#include "core/pubnub_subscribe_event_listener.h" #include "core/pubnub_subscribe_event_engine.h" #include "core/pubnub_helper.h" +#include "pubnub_callback.h" -#include -#include -#include -static void wait_seconds(double time_in_seconds) +// ---------------------------------------------- +// Function prototypes +// ---------------------------------------------- + +/** + * @brief Common message listener callback function. + * + * @param pb Pointer to the PubNub context which received real-time update. + * @param global Whether listener called from PubNub context or specific + * entity. + * @param message Received real-time update information. + */ +void subscribe_message_listener_( + const pubnub_t* pb, + bool global, + struct pubnub_v2_message message); + + +// ---------------------------------------------- +// Functions +// ---------------------------------------------- + +/** + * @brief Create string from the memory block. + * + * @param block Memory block with information required to create string. + * @return Pointer to the string or `NULL` in case if empty memory block has + * been provided. + */ +static char* string_from_mem_block(struct pubnub_char_mem_block block) { - time_t start = time(NULL); - double time_passed_in_seconds; + if (0 == block.size) { return NULL; } + + char* string = malloc(block.size + 1); + memcpy(string, block.ptr, block.size); + string[block.size] = '\0'; + + return string; +} + +/** + * @brief Translate PubNub-defined real-time update type (enum field) to + * human-readable format. + * + * @param type PubNub-defined real-time update type. + * @return Human-readable real-time update type. + */ +static char* message_type_2_string(const enum pubnub_message_type type) +{ + switch (type) { + case pbsbSignal: + return "signal"; + case pbsbPublished: + return "message / presence update"; + case pbsbAction: + return "action"; + case pbsbObjects: + return "app context"; + case pbsbFiles: + return "file"; + } + + return "unknown update"; +} + +/** + * @brief Temporarily pause app execution. + * + * @param time_in_seconds How long further execution should be postponed. + */ +static void wait_seconds(const double time_in_seconds) +{ + const time_t start = time(NULL); + double time_passed_in_seconds; do { time_passed_in_seconds = difftime(time(NULL), start); usleep(10); } while (time_passed_in_seconds < time_in_seconds); } +/** + * @brief Subscription status change listener callback. + * + * @param pb Pointer to the PubNub context for which subscription + * status has been changed. + * @param status Current subscription status. + * @param status_data Information from subscriber. + */ +void subscribe_status_change_listener( + const pubnub_t* pb, + const pubnub_subscription_status status, + const pubnub_subscription_status_data_t status_data) +{ + switch (status) { + case PNSS_SUBSCRIPTION_STATUS_CONNECTED: + printf("PubNub client connected to:\n"); + break; + case PNSS_SUBSCRIPTION_STATUS_CONNECTION_ERROR: + printf("PubNub client connection error: %s\n", + pubnub_res_2_string(status_data.reason)); + break; + case PNSS_SUBSCRIPTION_STATUS_DISCONNECTED_UNEXPECTEDLY: + printf("PubNub client disconnected unexpectedly with error: %s\n", + pubnub_res_2_string(status_data.reason)); + break; + case PNSS_SUBSCRIPTION_STATUS_DISCONNECTED: + printf("PubNub client disconnected:\n"); + break; + case PNSS_SUBSCRIPTION_STATUS_SUBSCRIPTION_CHANGED: + printf("PubNub client changed subscription:\n"); + break; + } + + if (NULL != status_data.channels) + printf("\t- channels: %s\n", status_data.channels); + if (NULL != status_data.channel_groups) + printf("\t- channel groups: %s\n", status_data.channel_groups); +} + +/** + * @brief Global message listener callback function. + * + * @param pb Pointer to the PubNub context which received real-time update. + * @param message Received real-time update information. + */ +void global_message_listener( + const pubnub_t* pb, + const struct pubnub_v2_message message) +{ + subscribe_message_listener_(pb, true, message); +} + +/** + * @brief Subscription / subscription set message listener callback function. + * + * @param pb Pointer to the PubNub context which received real-time update. + * @param message Received real-time update information. + */ +void subscribe_message_listener( + const pubnub_t* pb, + const struct pubnub_v2_message message) +{ + subscribe_message_listener_(pb, false, message); +} + +void subscribe_message_listener_( + const pubnub_t* pb, + const bool global, + const struct pubnub_v2_message message) +{ + char* uuid = string_from_mem_block(message.publisher); + char* ch = string_from_mem_block(message.channel); + char* msg = string_from_mem_block(message.payload); + char* tt = string_from_mem_block(message.tt); + char* type = message_type_2_string(message.message_type); + char client[100]; + + if (global) { + snprintf(client, sizeof(client), "PubNub (%p) received", pb); + } + else { snprintf(client, sizeof(client), "Received"); } + + printf("%s %s on '%s' at %s:\n", client, type, ch, tt); + if (NULL != uuid) { + printf("\t- publisher: %s\n", uuid); + free(uuid); + } + if (NULL != msg) { + printf("\t- message: %s\n", msg); + free(msg); + } + + free(type); + free(ch); + free(tt); +} + int main() { - const unsigned minutes_in_loop = 1; - const char* publish_key = getenv("PUBNUB_PUBLISH_KEY"); + /** Setup PubNub context. */ + const char* publish_key = getenv("PUBNUB_PUBLISH_KEY"); if (NULL == publish_key) { publish_key = "demo"; } const char* subscribe_key = getenv("PUBNUB_SUBSCRIBE_KEY"); if (NULL == subscribe_key) { subscribe_key = "demo"; } pubnub_t* pubnub = pubnub_alloc(); pubnub_init(pubnub, publish_key, subscribe_key); - pubnub_set_user_id(pubnub, "demo"); + pubnub_set_user_id(pubnub, "demo-user"); - pubnub_channel_t* channel = pubnub_channel_alloc(pubnub, "my_channel"); + /** Add subscription status change listener. */ + pubnub_subscribe_add_status_listener(pubnub, + subscribe_status_change_listener); + pubnub_subscribe_add_message_listener(pubnub, + PBSL_LISTENER_ON_MESSAGE, + global_message_listener); + + + /** + * Single subscription setup. + */ + pubnub_channel_t* channel = pubnub_channel_alloc( + pubnub, + "channel-test-history1"); pubnub_subscription_t* subscription = pubnub_subscription_alloc((pubnub_entity_t*)channel, NULL); + /** Subscription retained entity and it is safe to free */ + pubnub_entity_free((void**)&channel); - const enum pubnub_res rslt = pubnub_subscribe_with_subscription(subscription, NULL); - printf("~~~~> Subscribe result: %s\n", pubnub_res_2_string(rslt)); - // wait_seconds(minutes_in_loop * 60); - wait_seconds(minutes_in_loop * 5); - printf("~~~~> 9\n"); + /** Add messages listeners for subscription. */ + pubnub_subscribe_add_subscription_listener(subscription, + PBSL_LISTENER_ON_MESSAGE, + global_message_listener); + printf("Subscribing with subscription...\n"); + /** Subscribe using subscription. */ + enum pubnub_res rslt = pubnub_subscribe_with_subscription( + subscription, + NULL); + printf("Subscribe with subscription result: %s\n", + pubnub_res_2_string(rslt)); - pubnub_unsubscribe_with_subscription(&subscription); - printf("~~~~> 10\n"); + + /** + * Subscription set setup. + */ + pubnub_channel_t* channel_mx1 = pubnub_channel_alloc( + pubnub, + "channel-test-history1"); + pubnub_channel_t* channel_mx2 = pubnub_channel_alloc( + pubnub, + "channel-test-history2"); + pubnub_channel_group_t* group_mx3 = pubnub_channel_group_alloc( + pubnub, + "my-channel-group"); + pubnub_entity_t** entities = malloc(3 * sizeof(pubnub_entity_t*)); + entities[0] = (pubnub_entity_t*)channel_mx1; + entities[1] = (pubnub_entity_t*)channel_mx2; + entities[2] = (pubnub_entity_t*)group_mx3; + pubnub_subscription_set_t* set = + pubnub_subscription_set_alloc_with_entities(entities, 3, NULL); + /** Unredlying subscriptions in set retained entities and it is safe to free */ + pubnub_entity_free((void**)&channel_mx1); + pubnub_entity_free((void**)&channel_mx2); + pubnub_entity_free((void**)&group_mx3); + free(entities); + + /** Add messages listeners for subscription set. */ + pubnub_subscribe_add_subscription_set_listener( + set, + PBSL_LISTENER_ON_MESSAGE, + subscribe_message_listener); + printf("Subscribing with subscription set...\n"); + rslt = pubnub_subscribe_with_subscription_set( + set, + NULL); + printf("Subscribe with subscription set result: %s\n", + pubnub_res_2_string(rslt)); + + /** Wait for messages published to one of the channels (manual). */ + wait_seconds(60); + + + /** + * Remove subscription from subscription set (which is equal to unsubscribe + * on actively subscribed set). + */ + size_t count = 0; + pubnub_subscription_t** subs = pubnub_subscription_set_subscriptions( + set, + &count); + /** + * Removing first subscription in the list which will be `entities[0]`. + * + * Important: Actual unsubscribe won't happen because there exist separate + * subscription for the same entity for which subscription will be removed + * from set. + */ + printf("Removing subscription from subscription set...\n"); + rslt = pubnub_subscription_set_remove(set, &subs[0]); + printf("Subscription remove result: %s\n", + pubnub_res_2_string(rslt)); + free(subs); + + + /** + * Unsubscribe using subscription. + */ + printf("Unsubscribing from subscription...\n"); + rslt = pubnub_unsubscribe_with_subscription(&subscription); + printf("Unsubscribe with subscription result: %s\n", + pubnub_res_2_string(rslt)); pubnub_subscription_free(&subscription); - pubnub_entity_free((void**)&channel); - printf("~~~~> 11\n"); + + /** Sleep a bit before compltion of all stuff. */ + wait_seconds(3); + + + /** + * Unsubscribe from everything. + */ + printf("Unsubscribing from all...\n"); + rslt = pubnub_unsubscribe_all(pubnub); + printf("Unsubscribe from all result: %s\n", + pubnub_res_2_string(rslt)); + + /** Giving some time to complete unsubscribe. */ + wait_seconds(3); + + if (NULL != set) { pubnub_subscription_set_free(&set); } puts("Pubnub subscribe event engine demo is over."); diff --git a/freertos/pubnub_ntf_callback_freertos.c b/freertos/pubnub_ntf_callback_freertos.c index 7f3e1ca9..5d7d9e7b 100644 --- a/freertos/pubnub_ntf_callback_freertos.c +++ b/freertos/pubnub_ntf_callback_freertos.c @@ -318,7 +318,6 @@ void pbntf_trans_outcome(pubnub_t *pb) } #endif // #if PUBNUB_USE_RETRY_CONFIGURATION if (pb->cb != NULL) { - printf("~~~~ pbntf_trans_outcome 2\n"); pb->cb(pb, pb->trans, pb->core.last_result, pb->user_data); } } diff --git a/lib/pbarray.c b/lib/pbarray.c index 123c399a..cb34e974 100644 --- a/lib/pbarray.c +++ b/lib/pbarray.c @@ -243,6 +243,8 @@ pbarray_t* pbarray_copy(pbarray_t* array) size_t pbarray_count(pbarray_t* array) { + if (NULL == array) { return 0; } + pubnub_mutex_lock(array->mutw); const size_t count = array->count; pubnub_mutex_unlock(array->mutw); diff --git a/lib/sockets/pbpal_ntf_callback_poller_poll.c b/lib/sockets/pbpal_ntf_callback_poller_poll.c index f36b9d86..cf0f744d 100644 --- a/lib/sockets/pbpal_ntf_callback_poller_poll.c +++ b/lib/sockets/pbpal_ntf_callback_poller_poll.c @@ -106,7 +106,7 @@ void pbpal_ntf_callback_remove_socket(struct pbpal_poll_data* data, pubnub_t* pb } } PUBNUB_LOG_DEBUG( - "pbpal_ntf_callback_remove_socket(pb=%p) sockt=%d: Not Found!", pb, sockt); + "pbpal_ntf_callback_remove_socket(pb=%p) sockt=%d: Not Found!\n", pb, sockt); } @@ -127,7 +127,7 @@ void pbpal_ntf_callback_update_socket(struct pbpal_poll_data* data, pubnub_t* pb } } PUBNUB_LOG_WARNING( - "pbpal_ntf_callback_update_socket(pb=%p) sockt=%d: Not Found!", pb, sockt); + "pbpal_ntf_callback_update_socket(pb=%p) sockt=%d: Not Found!\n", pb, sockt); } @@ -140,7 +140,7 @@ int pbpal_ntf_watch_out_events(struct pbpal_poll_data* data, pubnub_t* pbp) return 0; } } - PUBNUB_LOG_WARNING("pbpal_ntf_watch_out_events(pbp=%p): Not Found!", pbp); + PUBNUB_LOG_WARNING("pbpal_ntf_watch_out_events(pbp=%p): Not Found!\n", pbp); return -1; } @@ -154,7 +154,7 @@ int pbpal_ntf_watch_in_events(struct pbpal_poll_data* data, pubnub_t* pbp) return 0; } } - PUBNUB_LOG_WARNING("pbpal_ntf_watch_in_events(pbp=%p): Not Found!", pbp); + PUBNUB_LOG_WARNING("pbpal_ntf_watch_in_events(pbp=%p): Not Found!\n", pbp); return -1; } diff --git a/microchip_harmony/pubnub_ntf_harmony.c b/microchip_harmony/pubnub_ntf_harmony.c index 547c5ab8..4623dd06 100644 --- a/microchip_harmony/pubnub_ntf_harmony.c +++ b/microchip_harmony/pubnub_ntf_harmony.c @@ -152,7 +152,6 @@ void pbntf_lost_socket(pubnub_t *pb, pb_socket_t socket) void pbntf_trans_outcome(pubnub_t *pb) { - printf("~~~~~~=====>>> 4\n"); PBNTF_TRANS_OUTCOME_COMMON(pb); #if PUBNUB_USE_RETRY_CONFIGURATION if (NULL != pb->core.retry_configuration && @@ -176,7 +175,6 @@ void pbntf_trans_outcome(pubnub_t *pb) } #endif // #if PUBNUB_USE_RETRY_CONFIGURATION if (pb->cb != NULL) { - printf("~~~~ pbntf_trans_outcome 3\n"); pb->cb(pb, pb->trans, pb->core.last_result, pb->user_data); } } diff --git a/posix/posix.mk b/posix/posix.mk index d3d33e1f..80cc008a 100644 --- a/posix/posix.mk +++ b/posix/posix.mk @@ -124,9 +124,10 @@ OBJFILES += monotonic_clock_get_time_posix.o LDLIBS=-lrt -lpthread endif -CFLAGS =-g -Wall -D PUBNUB_THREADSAFE -D PUBNUB_LOG_LEVEL=PUBNUB_LOG_LEVEL_DEBUG -D PUBNUB_ONLY_PUBSUB_API=$(ONLY_PUBSUB_API) -D PUBNUB_PROXY_API=$(USE_PROXY) -D PUBNUB_USE_RETRY_CONFIGURATION=$(USE_RETRY_CONFIGURATION) -D PUBNUB_USE_GZIP_COMPRESSION=$(USE_GZIP_COMPRESSION) -D PUBNUB_RECEIVE_GZIP_RESPONSE=$(RECEIVE_GZIP_RESPONSE) -D PUBNUB_USE_SUBSCRIBE_V2=$(USE_SUBSCRIBE_V2) -D PUBNUB_USE_SUBSCRIBE_EVENT_ENGINE=$(USE_SUBSCRIBE_EVENT_ENGINE) -D PUBNUB_USE_OBJECTS_API=$(USE_OBJECTS_API) -D PUBNUB_USE_ACTIONS_API=$(USE_ACTIONS_API) -D PUBNUB_USE_AUTO_HEARTBEAT=$(USE_AUTO_HEARTBEAT) -D PUBNUB_USE_GRANT_TOKEN_API=$(USE_GRANT_TOKEN) -D PUBNUB_USE_REVOKE_TOKEN_API=$(USE_REVOKE_TOKEN) -D PUBNUB_USE_FETCH_HISTORY=$(USE_FETCH_HISTORY) -fsanitize=thread +CFLAGS =-g -Wall -D PUBNUB_THREADSAFE -D PUBNUB_LOG_LEVEL=PUBNUB_LOG_LEVEL_WARNING -D PUBNUB_ONLY_PUBSUB_API=$(ONLY_PUBSUB_API) -D PUBNUB_PROXY_API=$(USE_PROXY) -D PUBNUB_USE_RETRY_CONFIGURATION=$(USE_RETRY_CONFIGURATION) -D PUBNUB_USE_GZIP_COMPRESSION=$(USE_GZIP_COMPRESSION) -D PUBNUB_RECEIVE_GZIP_RESPONSE=$(RECEIVE_GZIP_RESPONSE) -D PUBNUB_USE_SUBSCRIBE_V2=$(USE_SUBSCRIBE_V2) -D PUBNUB_USE_SUBSCRIBE_EVENT_ENGINE=$(USE_SUBSCRIBE_EVENT_ENGINE) -D PUBNUB_USE_OBJECTS_API=$(USE_OBJECTS_API) -D PUBNUB_USE_ACTIONS_API=$(USE_ACTIONS_API) -D PUBNUB_USE_AUTO_HEARTBEAT=$(USE_AUTO_HEARTBEAT) -D PUBNUB_USE_GRANT_TOKEN_API=$(USE_GRANT_TOKEN) -D PUBNUB_USE_REVOKE_TOKEN_API=$(USE_REVOKE_TOKEN) -D PUBNUB_USE_FETCH_HISTORY=$(USE_FETCH_HISTORY) # -g # enables debugging, remove to get a smaller executable -# -fsanitize-address # Use AddressSanitizer +# -fsanitize=address # Use AddressSanitizer +# -fsanitize=thread # Use ThreadSanitizer: INCLUDES=-I .. -I .