Skip to content

Commit

Permalink
Merge pull request #142 from cableramki/master
Browse files Browse the repository at this point in the history
Added downstream support to pass through other types of WRP messages.
  • Loading branch information
schmidtw authored Nov 17, 2017
2 parents b83392f + 0003bc4 commit afbb5c5
Show file tree
Hide file tree
Showing 3 changed files with 427 additions and 68 deletions.
174 changes: 106 additions & 68 deletions src/downstream.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,92 +69,130 @@ void listenerOnMessage(void * msg, size_t msgSize)
msgType = message->msg_type;
ParodusInfo("msgType received:%d\n", msgType);

if(message->msg_type == WRP_MSG_TYPE__AUTH)
switch( message->msg_type )
{
ParodusInfo("Authorization Status received with Status code :%d\n", message->u.auth.status);
}

if(message->msg_type == WRP_MSG_TYPE__REQ)
{
ParodusPrint("numOfClients registered is %d\n", get_numOfClients());
int ret = validate_partner_id(message, NULL);
if(ret < 0)
case WRP_MSG_TYPE__AUTH:
{
response = cJSON_CreateObject();
cJSON_AddNumberToObject(response, "statusCode", 430);
cJSON_AddStringToObject(response, "message", "Invalid partner_id");
ParodusInfo("Authorization Status received with Status code :%d\n", message->u.auth.status);
break;
}

if((message->u.req.dest !=NULL) && (ret >= 0))
case WRP_MSG_TYPE__EVENT:
case WRP_MSG_TYPE__REQ:
case WRP_MSG_TYPE__CREATE:
case WRP_MSG_TYPE__UPDATE:
case WRP_MSG_TYPE__RETREIVE:
case WRP_MSG_TYPE__DELETE:
{
destVal = message->u.req.dest;
strtok(destVal , "/");
parStrncpy(dest,strtok(NULL , "/"), sizeof(dest));
ParodusInfo("Received downstream dest as :%s and transaction_uuid :%s\n", dest, message->u.req.transaction_uuid);
temp = get_global_node();
//Checking for individual clients & Sending to each client

while (NULL != temp)
ParodusPrint("numOfClients registered is %d\n", get_numOfClients());
int ret = validate_partner_id(message, NULL);
if(ret < 0)
{
ParodusPrint("node is pointing to temp->service_name %s \n",temp->service_name);
// Sending message to registered clients
if( strcmp(dest, temp->service_name) == 0)
{
ParodusPrint("sending to nanomsg client %s\n", dest);
bytes = nn_send(temp->sock, recivedMsg, msgSize, 0);
ParodusInfo("sent downstream message to reg_client '%s'\n",temp->url);
ParodusPrint("downstream bytes sent:%d\n", bytes);
destFlag =1;
break;
}
ParodusPrint("checking the next item in the list\n");
temp= temp->next;
}
response = cJSON_CreateObject();
cJSON_AddNumberToObject(response, "statusCode", 430);
cJSON_AddStringToObject(response, "message", "Invalid partner_id");
}

//if any unknown dest received sending error response to server
if(destFlag ==0)
destVal = ((WRP_MSG_TYPE__EVENT == msgType) ? message->u.event.dest :
((WRP_MSG_TYPE__REQ == msgType) ? message->u.req.dest : message->u.crud.dest));
if( (destVal != NULL) && (ret >= 0) )
{
ParodusError("Unknown dest:%s\n", dest);
response = cJSON_CreateObject();
cJSON_AddNumberToObject(response, "statusCode", 531);
cJSON_AddStringToObject(response, "message", "Service Unavailable");
}
}
strtok(destVal , "/");
parStrncpy(dest,strtok(NULL , "/"), sizeof(dest));
ParodusInfo("Received downstream dest as :%s and transaction_uuid :%s\n", dest,
((WRP_MSG_TYPE__REQ == msgType) ? message->u.req.transaction_uuid :
((WRP_MSG_TYPE__EVENT == msgType) ? "NA" : message->u.crud.transaction_uuid)));
temp = get_global_node();
//Checking for individual clients & Sending to each client

if(destFlag == 0 || ret < 0)
{
resp_msg = (wrp_msg_t *)malloc(sizeof(wrp_msg_t));
memset(resp_msg, 0, sizeof(wrp_msg_t));
while (NULL != temp)
{
ParodusPrint("node is pointing to temp->service_name %s \n",temp->service_name);
// Sending message to registered clients
if( strcmp(dest, temp->service_name) == 0)
{
ParodusPrint("sending to nanomsg client %s\n", dest);
bytes = nn_send(temp->sock, recivedMsg, msgSize, 0);
ParodusInfo("sent downstream message to reg_client '%s'\n",temp->url);
ParodusPrint("downstream bytes sent:%d\n", bytes);
destFlag =1;
break;
}
ParodusPrint("checking the next item in the list\n");
temp= temp->next;
}

resp_msg ->msg_type = msgType;
resp_msg ->u.req.source = message->u.req.dest;
resp_msg ->u.req.dest = message->u.req.source;
resp_msg ->u.req.transaction_uuid=message->u.req.transaction_uuid;
//if any unknown dest received sending error response to server
if(destFlag ==0)
{
ParodusError("Unknown dest:%s\n", dest);
response = cJSON_CreateObject();
cJSON_AddNumberToObject(response, "statusCode", 531);
cJSON_AddStringToObject(response, "message", "Service Unavailable");
}
}

if(response != NULL)
if( (WRP_MSG_TYPE__EVENT != msgType) &&
((destFlag == 0) || (ret < 0)) )
{
str = cJSON_PrintUnformatted(response);
ParodusInfo("Payload Response: %s\n", str);
resp_msg = (wrp_msg_t *)malloc(sizeof(wrp_msg_t));
memset(resp_msg, 0, sizeof(wrp_msg_t));

resp_msg ->u.req.payload = (void *)str;
resp_msg ->u.req.payload_size = strlen(str);
resp_msg ->msg_type = msgType;
if( WRP_MSG_TYPE__REQ == msgType )
{
resp_msg ->u.req.source = message->u.req.dest;
resp_msg ->u.req.dest = message->u.req.source;
resp_msg ->u.req.transaction_uuid=message->u.req.transaction_uuid;
}
else
{
resp_msg ->u.crud.source = message->u.crud.dest;
resp_msg ->u.crud.dest = message->u.crud.source;
resp_msg ->u.crud.transaction_uuid = message->u.crud.transaction_uuid;
resp_msg ->u.crud.path = message->u.crud.path;
}

ParodusPrint("msgpack encode\n");
resp_size = wrp_struct_to( resp_msg, WRP_BYTES, &resp_bytes );
if(resp_size > 0)
if(response != NULL)
{
size = (size_t) resp_size;
sendUpstreamMsgToServer(&resp_bytes, size);
str = cJSON_PrintUnformatted(response);
ParodusInfo("Payload Response: %s\n", str);

if( WRP_MSG_TYPE__REQ == msgType )
{
resp_msg ->u.req.payload = (void *)str;
resp_msg ->u.req.payload_size = strlen(str);
}
else
{
resp_msg ->u.crud.payload = (void *)str;
resp_msg ->u.crud.payload_size = strlen(str);
}

ParodusPrint("msgpack encode\n");
resp_size = wrp_struct_to( resp_msg, WRP_BYTES, &resp_bytes );
if(resp_size > 0)
{
size = (size_t) resp_size;
sendUpstreamMsgToServer(&resp_bytes, size);
}
free(str);
cJSON_Delete(response);
free(resp_bytes);
resp_bytes = NULL;
}
free(str);
cJSON_Delete(response);
free(resp_bytes);
resp_bytes = NULL;
free(resp_msg);
ParodusPrint("free for downstream decoded msg\n");
wrp_free_struct(message);
}
free(resp_msg);
ParodusPrint("free for downstream decoded msg\n");
wrp_free_struct(message);
break;
}

case WRP_MSG_TYPE__SVC_REGISTRATION:
case WRP_MSG_TYPE__SVC_ALIVE:
case WRP_MSG_TYPE__UNKNOWN:
default:
break;
}
}
else
Expand Down
7 changes: 7 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,13 @@ add_test(NAME test_downstream COMMAND ${MEMORY_CHECK} ./test_downstream)
add_executable(test_downstream test_downstream.c ../src/downstream.c ../src/string_helpers.c)
target_link_libraries (test_downstream -lcmocka ${PARODUS_COMMON_LIBS} )

#-------------------------------------------------------------------------------
# test_downstream_more
#-------------------------------------------------------------------------------
add_test(NAME test_downstream_more COMMAND ${MEMORY_CHECK} ./test_downstream_more)
add_executable(test_downstream_more test_downstream_more.c ../src/downstream.c ../src/string_helpers.c)
target_link_libraries (test_downstream_more -lcmocka ${PARODUS_COMMON_LIBS} )

#-------------------------------------------------------------------------------
# test_thread_tasks
#-------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit afbb5c5

Please sign in to comment.