From 7f33456749bcfd34c541721746a5c7c9703ba6e1 Mon Sep 17 00:00:00 2001 From: Felipe Ortiz Date: Tue, 29 Mar 2016 12:27:17 -0300 Subject: [PATCH 01/13] Fix include in ConnectionInfo --- src/lib/rest/ConnectionInfo.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib/rest/ConnectionInfo.cpp b/src/lib/rest/ConnectionInfo.cpp index 7b0cd427f5..278b05546a 100644 --- a/src/lib/rest/ConnectionInfo.cpp +++ b/src/lib/rest/ConnectionInfo.cpp @@ -27,7 +27,7 @@ #include "common/string.h" #include "common/globals.h" -#include "rest/uriParamFormat.h" +#include "rest/uriParamNames.h" #include "rest/ConnectionInfo.h" From 71cd2061313efbcff0e8e0ec7f984f2d33ae7ea5 Mon Sep 17 00:00:00 2001 From: Felipe Ortiz Date: Tue, 29 Mar 2016 12:28:20 -0300 Subject: [PATCH 02/13] Add a little class to set constants for Orion WS --- src/lib/orion_websocket/CMakeLists.txt | 2 ++ src/lib/orion_websocket/constants.cpp | 35 +++++++++++++++++++++ src/lib/orion_websocket/constants.h | 43 ++++++++++++++++++++++++++ 3 files changed, 80 insertions(+) create mode 100644 src/lib/orion_websocket/constants.cpp create mode 100644 src/lib/orion_websocket/constants.h diff --git a/src/lib/orion_websocket/CMakeLists.txt b/src/lib/orion_websocket/CMakeLists.txt index ead7749a13..91b00a0743 100644 --- a/src/lib/orion_websocket/CMakeLists.txt +++ b/src/lib/orion_websocket/CMakeLists.txt @@ -24,12 +24,14 @@ SET (SOURCES ws.cpp connection_manager.cpp parser.cpp + constants.cpp ) SET (HEADERS ws.h connection_manager.h parser.h + constants.h ) diff --git a/src/lib/orion_websocket/constants.cpp b/src/lib/orion_websocket/constants.cpp new file mode 100644 index 0000000000..44b43e0c4f --- /dev/null +++ b/src/lib/orion_websocket/constants.cpp @@ -0,0 +1,35 @@ +/* +* +* Copyright 2016 Telefonica Investigacion y Desarrollo, S.A.U +* +* This file is part of Orion Context Broker. +* +* Orion Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* iot_support at tid dot es +* +* Author: Felipe Ortiz +*/ + + +#include "constants.h" + +const int WSConstants::Port = 9010; +const int WSConstants::Pooling = 50; +const size_t WSConstants::DataSize = 128; +// Ok, this is can be a macro, but I prefer +// to be consistent +const std::string WSConstants::ProtocolName = "orion-ws"; +const std::string WSConstants::Scheme = "ws://"; diff --git a/src/lib/orion_websocket/constants.h b/src/lib/orion_websocket/constants.h new file mode 100644 index 0000000000..1cd55fe904 --- /dev/null +++ b/src/lib/orion_websocket/constants.h @@ -0,0 +1,43 @@ +/* +* +* Copyright 2016 Telefonica Investigacion y Desarrollo, S.A.U +* +* This file is part of Orion Context Broker. +* +* Orion Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* iot_support at tid dot es +* +* Author: Felipe Ortiz +*/ + + +#ifndef WS_CONSTANTS_H +#define WS_CONSTANTS_H + +#include + + +class WSConstants +{ +public: + static const int Port; + static const int Pooling; + static const size_t DataSize; + static const std::string ProtocolName; + static const std::string Scheme; +}; + +#endif From b602536af465e0396150b75236a637718e7c9c43 Mon Sep 17 00:00:00 2001 From: Felipe Ortiz Date: Tue, 29 Mar 2016 12:29:09 -0300 Subject: [PATCH 03/13] Modify ws.cpp to use constants in constants.h --- src/lib/orion_websocket/ws.cpp | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/lib/orion_websocket/ws.cpp b/src/lib/orion_websocket/ws.cpp index fece6bca6d..3c558a0e45 100644 --- a/src/lib/orion_websocket/ws.cpp +++ b/src/lib/orion_websocket/ws.cpp @@ -24,6 +24,7 @@ */ #include "ws.h" +#include "constants.h" #include "connection_manager.h" #include "parser.h" @@ -141,19 +142,6 @@ static int wsCallback(lws * ws, return 0; } -static struct lws_protocols protocols[] = { - { - "orion-ws", - wsCallback, - sizeof(data), - 128, - 1, - NULL - }, - { - NULL, NULL, 0, 0, 0, 0 - } -}; pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; @@ -174,7 +162,7 @@ void *runWS(void *ptr) pthread_mutex_unlock(&mtx); pthread_mutex_lock(&mtx); - lws_service(ws->ctx, 50); + lws_service(ws->ctx, WSConstants::Pooling); pthread_mutex_unlock(&mtx); } return 0; @@ -182,8 +170,22 @@ void *runWS(void *ptr) orion_websocket *orion_websocket_new(RestService *serv) { + static struct lws_protocols protocols[] = { + { + WSConstants::ProtocolName.c_str(), + wsCallback, + sizeof(data), + WSConstants::DataSize, + 1, + NULL + }, + { + NULL, NULL, 0, 0, 0, 0 + } + }; + struct lws_context_creation_info info = { - 9010, + WSConstants::Port, NULL, protocols, lws_get_internal_extensions(), From 7422ecd35961098cfd181314f8a122c212c9292b Mon Sep 17 00:00:00 2001 From: Felipe Ortiz Date: Tue, 5 Apr 2016 11:19:37 -0300 Subject: [PATCH 04/13] Remove connection_manager Removed because the cost of reuse a ConnectionInfo instance is not a real advantage. --- src/lib/orion_websocket/CMakeLists.txt | 4 +- .../orion_websocket/connection_manager.cpp | 100 ------------------ src/lib/orion_websocket/connection_manager.h | 36 ------- src/lib/orion_websocket/ws.cpp | 24 ++--- src/lib/rest/ConnectionInfo.cpp | 26 ----- src/lib/rest/ConnectionInfo.h | 1 - 6 files changed, 10 insertions(+), 181 deletions(-) delete mode 100644 src/lib/orion_websocket/connection_manager.cpp delete mode 100644 src/lib/orion_websocket/connection_manager.h diff --git a/src/lib/orion_websocket/CMakeLists.txt b/src/lib/orion_websocket/CMakeLists.txt index 91b00a0743..56371482cc 100644 --- a/src/lib/orion_websocket/CMakeLists.txt +++ b/src/lib/orion_websocket/CMakeLists.txt @@ -22,16 +22,16 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.6) SET (SOURCES ws.cpp - connection_manager.cpp parser.cpp constants.cpp + wsNotify.cpp ) SET (HEADERS ws.h - connection_manager.h parser.h constants.h + wsNotify.h ) diff --git a/src/lib/orion_websocket/connection_manager.cpp b/src/lib/orion_websocket/connection_manager.cpp deleted file mode 100644 index 55240b2d91..0000000000 --- a/src/lib/orion_websocket/connection_manager.cpp +++ /dev/null @@ -1,100 +0,0 @@ -/* -* -* Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U -* -* This file is part of Orion Context Broker. -* -* Orion Context Broker is free software: you can redistribute it and/or -* modify it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* Orion Context Broker is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero -* General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with Orion Context Broker. If not, see http://www.gnu.org/licenses/. -* -* For those usages not covered by this license please contact with -* iot_support at tid dot es -* -* Author: Felipe Ortiz -*/ - -#include "parser.h" -#include "connection_manager.h" -#include "rest/ConnectionInfo.h" -#include "common/Format.h" -#include "rest/HttpHeaders.h" - -#include -#include -#include - -// Max possible code (int size) -#define MAX_CID 32768 - -static pthread_mutex_t mtx_cid = PTHREAD_MUTEX_INITIALIZER; -static pthread_mutex_t mtx_conn = PTHREAD_MUTEX_INITIALIZER; - -static int cidVec[MAX_CID]; -static std::map connections; - -int connection_manager_get_cid() -{ - pthread_mutex_lock(&mtx_cid); - for (int i = 0; i < MAX_CID; ++i) - { - if (cidVec[i] == 1) - continue; - cidVec[i] = 1; - pthread_mutex_unlock(&mtx_cid); - return i; - } - pthread_mutex_unlock(&mtx_cid); - return -1; -} - -ConnectionInfo *connection_manager_get(int cid, const char *message) -{ - std::string url; - std::string verb; - std::string payload; - - pthread_mutex_lock(&mtx_conn); - std::map::iterator it; - it = connections.find(cid); - if (it != connections.end()) - { - pthread_mutex_unlock(&mtx_conn); - - it->second->reset(); - ws_parser_parse(message, it->second, url, verb, payload, it->second->httpHeaders); - it->second->modify(url, verb, payload); - return it->second; - } - - pthread_mutex_unlock(&mtx_conn); - - ConnectionInfo *ci = new ConnectionInfo("v2", JSON, true); - ws_parser_parse(message, ci, url, verb, payload, ci->httpHeaders); - ci->modify(url, verb, payload); - - pthread_mutex_lock(&mtx_conn); - connections[cid] = ci; - pthread_mutex_unlock(&mtx_conn); - return ci; -} - -void connection_manager_remove(int cid) -{ - pthread_mutex_lock(&mtx_conn); - delete connections[cid]; - pthread_mutex_unlock(&mtx_conn); - - pthread_mutex_lock(&mtx_cid); - cidVec[cid] = 0; - pthread_mutex_unlock(&mtx_cid); -} diff --git a/src/lib/orion_websocket/connection_manager.h b/src/lib/orion_websocket/connection_manager.h deleted file mode 100644 index 76ef08552f..0000000000 --- a/src/lib/orion_websocket/connection_manager.h +++ /dev/null @@ -1,36 +0,0 @@ -/* -* -* Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U -* -* This file is part of Orion Context Broker. -* -* Orion Context Broker is free software: you can redistribute it and/or -* modify it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* Orion Context Broker is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero -* General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with Orion Context Broker. If not, see http://www.gnu.org/licenses/. -* -* For those usages not covered by this license please contact with -* iot_support at tid dot es -* -* Author: Felipe Ortiz -*/ - - -#ifndef WS_CONNECTION_MANAGER_H -#define WS_CONNECTION_MANAGER_H - -class ConnectionInfo; - -int connection_manager_get_cid(); -ConnectionInfo *connection_manager_get(int cid, const char *msg); -void connection_manager_remove(int cid); - -#endif diff --git a/src/lib/orion_websocket/ws.cpp b/src/lib/orion_websocket/ws.cpp index 3c558a0e45..6d9028f414 100644 --- a/src/lib/orion_websocket/ws.cpp +++ b/src/lib/orion_websocket/ws.cpp @@ -25,7 +25,6 @@ #include "ws.h" #include "constants.h" -#include "connection_manager.h" #include "parser.h" #include "rest/RestService.h" @@ -54,7 +53,6 @@ struct _orion_websocket // Private struct for persistent data typedef struct { - unsigned cid; char *message; char *request; int index; @@ -73,24 +71,12 @@ static int wsCallback(lws * ws, { case LWS_CALLBACK_ESTABLISHED: { - int cid = connection_manager_get_cid(); - - if (cid == -1) - { - LM_E(("No more cid available!")); - break; - } - - dat->cid = cid; dat->request = NULL; dat->index = 0; - LM_I(("Connecction id: %d", dat->cid)); break; } case LWS_CALLBACK_CLOSED: { - connection_manager_remove(dat->cid); - dat->cid = -1; break; } @@ -102,7 +88,6 @@ static int wsCallback(lws * ws, unsigned char *buff = (unsigned char *) malloc(size); unsigned char *p = &buff[LWS_SEND_BUFFER_PRE_PADDING]; - sprintf((char*)p, "%s", dat->message); lws_write(ws, p, strlen(dat->message), LWS_WRITE_TEXT); free(buff); @@ -125,10 +110,17 @@ static int wsCallback(lws * ws, { dat->request[dat->index] = 0; - ConnectionInfo *ci = connection_manager_get(dat->cid, dat->request); + ConnectionInfo *ci = new ConnectionInfo("v2", JSON, true); + std::string url; + std::string verb; + std::string payload; + ws_parser_parse(dat->request, ci, url, verb, payload, ci->httpHeaders); + ci->modify(url, verb, payload); const char *restMsg = restService(ci, orionServices).c_str(); dat->message = strdup(ws_parser_message(restMsg, ci->httpHeaders, (int)ci->httpStatusCode)); + + delete ci; free(dat->request); dat->request = NULL; dat->index = 0; diff --git a/src/lib/rest/ConnectionInfo.cpp b/src/lib/rest/ConnectionInfo.cpp index 278b05546a..93c1f23470 100644 --- a/src/lib/rest/ConnectionInfo.cpp +++ b/src/lib/rest/ConnectionInfo.cpp @@ -210,32 +210,6 @@ ConnectionInfo::~ConnectionInfo() servicePathV.clear(); } - -/* **************************************************************************** -* -* reset - Reset some class memeber to use in a websocket connection -*/ -void ConnectionInfo::reset() -{ - version = "HTTP/1.1"; - servicePath = "/"; - - httpHeaders.gotHeaders = true; - httpHeaders.userAgent = "orionWS/0.1"; - httpHeaders.accept = "*/*"; - httpHeaders.contentLength = 0; - httpHeaders.servicePath = "/"; - httpHeaders.tenant.clear(); - - uriParam[URI_PARAM_PAGINATION_DETAILS] = "off"; - uriParam[URI_PARAM_PAGINATION_LIMIT] = "20"; - uriParam[URI_PARAM_PAGINATION_OFFSET] = "0"; - - tenant.clear(); - tenantFromHttpHeader.clear(); -} - - /* **************************************************************************** * * modify - Modify a ConnectionInfo using the given parameters diff --git a/src/lib/rest/ConnectionInfo.h b/src/lib/rest/ConnectionInfo.h index 6ae9d716ad..058e156091 100644 --- a/src/lib/rest/ConnectionInfo.h +++ b/src/lib/rest/ConnectionInfo.h @@ -57,7 +57,6 @@ class ConnectionInfo ConnectionInfo(const std::string &_url, const std::string &_method, const std::string &_version, MHD_Connection* _connection = NULL); ~ConnectionInfo(); - void reset(); void modify(const std::string &_url, const std::string &_verb, const std::string &_payload); MHD_Connection* connection; From d26e337a2c3c13b35035c9c210011af54a4ac3a3 Mon Sep 17 00:00:00 2001 From: Felipe Ortiz Date: Tue, 5 Apr 2016 11:27:16 -0300 Subject: [PATCH 05/13] Add more headers in the response --- src/lib/orion_websocket/parser.cpp | 38 ++++++++++++++++++++++-------- src/lib/orion_websocket/parser.h | 2 ++ src/lib/orion_websocket/ws.cpp | 3 ++- 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/src/lib/orion_websocket/parser.cpp b/src/lib/orion_websocket/parser.cpp index 6560e4a2a1..363426fcbc 100644 --- a/src/lib/orion_websocket/parser.cpp +++ b/src/lib/orion_websocket/parser.cpp @@ -103,10 +103,31 @@ void ws_parser_parse } } +static void addHeaders +( + const std::string& name, + const std::string& value, + rapidjson::Writer& writer +) +{ + writer.Key(name.c_str()); + + if (value.empty()) + { + writer.String(""); + } + else + { + writer.String(value.c_str()); + } +} + const char *ws_parser_message ( const std::string& msg, const HttpHeaders& head, + const std::vector headName, + const std::vector headValue, int statusCode ) { @@ -119,18 +140,15 @@ const char *ws_parser_message std::map::const_iterator it = head.headerMap.begin(); while (it != head.headerMap.end()) { - writer.Key(it->first.c_str()); - - if (it->second->empty()) - { - writer.String(""); - } - else - { - writer.String(it->second->c_str()); - } + addHeaders(it->first, *(it->second), writer); ++it; } + + for (unsigned i = 0; i < headName.size(); ++i) + { + addHeaders(headName[i], headValue[i], writer); + } + writer.EndObject(); if (statusCode < 100 || statusCode > 599) // Code is not a valid HTTP status code diff --git a/src/lib/orion_websocket/parser.h b/src/lib/orion_websocket/parser.h index 3387385c98..2727f5053f 100644 --- a/src/lib/orion_websocket/parser.h +++ b/src/lib/orion_websocket/parser.h @@ -46,6 +46,8 @@ const char *ws_parser_message ( const std::string& msg, const HttpHeaders& head, + const std::vector headName, + const std::vector headValue, int statusCode ); diff --git a/src/lib/orion_websocket/ws.cpp b/src/lib/orion_websocket/ws.cpp index 6d9028f414..b9db2ad9b4 100644 --- a/src/lib/orion_websocket/ws.cpp +++ b/src/lib/orion_websocket/ws.cpp @@ -118,7 +118,8 @@ static int wsCallback(lws * ws, ci->modify(url, verb, payload); const char *restMsg = restService(ci, orionServices).c_str(); - dat->message = strdup(ws_parser_message(restMsg, ci->httpHeaders, (int)ci->httpStatusCode)); + dat->message = strdup(ws_parser_message(restMsg, ci->httpHeaders, ci->httpHeader, ci->httpHeaderValue, (int)ci->httpStatusCode)); + delete ci; free(dat->request); From 5932dc20c62c932ee2052df6d07285ca5107ebf2 Mon Sep 17 00:00:00 2001 From: Felipe Ortiz Date: Tue, 5 Apr 2016 11:32:57 -0300 Subject: [PATCH 06/13] Add support for notifications through websockets --- src/lib/common/string.cpp | 4 +++ src/lib/ngsiNotify/Notifier.cpp | 9 ++--- src/lib/ngsiNotify/senderThread.cpp | 16 ++------- src/lib/orion_websocket/parser.cpp | 31 +++++++++++++++++ src/lib/orion_websocket/parser.h | 8 +++++ src/lib/orion_websocket/ws.cpp | 40 ++++++++++++++++++++++ src/lib/orion_websocket/wsNotify.cpp | 51 ++++++++++++++++++++++++++++ src/lib/orion_websocket/wsNotify.h | 19 +++++++++++ 8 files changed, 159 insertions(+), 19 deletions(-) create mode 100644 src/lib/orion_websocket/wsNotify.cpp create mode 100644 src/lib/orion_websocket/wsNotify.h diff --git a/src/lib/common/string.cpp b/src/lib/common/string.cpp index 15a64c62a8..49173fdcf3 100644 --- a/src/lib/common/string.cpp +++ b/src/lib/common/string.cpp @@ -242,6 +242,10 @@ bool parseUrl(const std::string& url, std::string& host, int& port, std::string& return false; } + if (url == "ws://") + { + return true; + } /* http://some.host.com/my/path * ^^ ^ ^ diff --git a/src/lib/ngsiNotify/Notifier.cpp b/src/lib/ngsiNotify/Notifier.cpp index b293da6c28..5791748fd8 100644 --- a/src/lib/ngsiNotify/Notifier.cpp +++ b/src/lib/ngsiNotify/Notifier.cpp @@ -35,10 +35,7 @@ #include "ngsiNotify/senderThread.h" #include "ngsiNotify/Notifier.h" - -// FIME P11 #1669: move this to WS library. @fortizc please take care of this -#define WS_SCHEME "ws://" -#define WS_SCHEME_LENGTH 5 +#include "orion_websocket/constants.h" /* **************************************************************************** @@ -99,10 +96,10 @@ void Notifier::sendNotifyContextRequest(NotifyContextRequest* ncr, const std::st std::string uriPath; std::string protocol; - if ((url.length() == WS_SCHEME_LENGTH) && (url.find(WS_SCHEME) == 0)) + if ((url.length() == WSConstants::Scheme.size()) && (url.find(WSConstants::Scheme) == 0)) { // In this case host, port and uriPath are not needed, as the WS library has all the connection information related with the WS - protocol = WS_SCHEME; + protocol = WSConstants::Scheme; } else { diff --git a/src/lib/ngsiNotify/senderThread.cpp b/src/lib/ngsiNotify/senderThread.cpp index 35dd5a2b55..3f143f34ca 100644 --- a/src/lib/ngsiNotify/senderThread.cpp +++ b/src/lib/ngsiNotify/senderThread.cpp @@ -29,18 +29,8 @@ #include "rest/httpRequestSend.h" #include "ngsiNotify/senderThread.h" - -// FIXME P11 #1669: remove this stub when the actual sendNotifyContextRequestWs() gets developed a the end. @fortizc please take care of this -// in the ws library -int sendNotifyContextRequestWs(const std::string& subId, const std::map& headers, const std::string& data) -{ - return 0; -} - - -// FIME P11 #1669: move this to WS library. @fortizc please take care of this -#define WS_SCHEME "ws://" -#define WS_SCHEME_LENGTH 5 +#include "orion_websocket/constants.h" +#include "orion_websocket/wsNotify.h" /* **************************************************************************** @@ -73,7 +63,7 @@ void* startSenderThread(void* p) std::string out; int r; - if (params->protocol == WS_SCHEME) + if (params->protocol == WSConstants::Scheme) { std::map headers; headers.insert(std::make_pair("Fiware-Service", params->tenant)); diff --git a/src/lib/orion_websocket/parser.cpp b/src/lib/orion_websocket/parser.cpp index 363426fcbc..02291c9716 100644 --- a/src/lib/orion_websocket/parser.cpp +++ b/src/lib/orion_websocket/parser.cpp @@ -164,3 +164,34 @@ const char *ws_parser_message return json; } + +const char *ws_parser_notify +( + const std::string& subId, + const std::map& headers, + const std::string &data +) +{ + const char* tmpl = "{\"subscriptionId\": \"%s\", \"headers\": %s, \"data\": %s}"; + + + rapidjson::StringBuffer buff; + rapidjson::Writer writer(buff); + writer.StartObject(); + std::map::const_iterator it = headers.begin(); + while (it != headers.end()) + { + writer.Key(it->first.c_str()); + writer.String(it->second.c_str()); + ++it; + } + writer.EndObject(); + + const char* strHeaders = buff.GetString(); + + size_t size = subId.size() + data.size() + strlen(strHeaders) + strlen(tmpl) - 6 + 1; + char *json = (char *) malloc(size); + sprintf(json, tmpl, subId.c_str(), strHeaders, data.c_str()); + + return json; +} diff --git a/src/lib/orion_websocket/parser.h b/src/lib/orion_websocket/parser.h index 2727f5053f..47c9a4c0e8 100644 --- a/src/lib/orion_websocket/parser.h +++ b/src/lib/orion_websocket/parser.h @@ -27,6 +27,7 @@ #define WS_PARSER_H #include +#include #include class HttpHeaders; @@ -51,4 +52,11 @@ const char *ws_parser_message int statusCode ); +const char *ws_parser_notify +( + const std::string& subId, + const std::map& headers, + const std::string& data +); + #endif diff --git a/src/lib/orion_websocket/ws.cpp b/src/lib/orion_websocket/ws.cpp index b9db2ad9b4..5af5f7f846 100644 --- a/src/lib/orion_websocket/ws.cpp +++ b/src/lib/orion_websocket/ws.cpp @@ -26,6 +26,7 @@ #include "ws.h" #include "constants.h" #include "parser.h" +#include "wsNotify.h" #include "rest/RestService.h" @@ -36,6 +37,7 @@ #include #include #include +#include #include @@ -53,11 +55,41 @@ struct _orion_websocket // Private struct for persistent data typedef struct { + std::vector notify; char *message; char *request; int index; }data; +static bool isSubscription +( + const std::vector& headName, + const std::vector& headValue, + std::string& subId +) +{ + for (unsigned i = 0; i < headName.size(); ++i) + { + if (headName[i] != "Location") + continue; + + size_t pos = headValue[i].find("subscriptions"); + if (pos == std::string::npos) + return false; + + pos = headValue[i].find_last_of('/'); + if (pos == std::string::npos) + return false; + + char buff[25]; + headValue[i].copy(buff, 24, pos + 1); + buff[24] = 0; + subId = std::string(buff); + return true; + } + return false; +} + static int wsCallback(lws * ws, enum lws_callback_reasons reason, void *user, @@ -71,12 +103,14 @@ static int wsCallback(lws * ws, { case LWS_CALLBACK_ESTABLISHED: { + dat->notify.clear(); dat->request = NULL; dat->index = 0; break; } case LWS_CALLBACK_CLOSED: { + removeSenders(dat->notify); break; } @@ -120,6 +154,12 @@ static int wsCallback(lws * ws, const char *restMsg = restService(ci, orionServices).c_str(); dat->message = strdup(ws_parser_message(restMsg, ci->httpHeaders, ci->httpHeader, ci->httpHeaderValue, (int)ci->httpStatusCode)); + std::string subId; + if (isSubscription(ci->httpHeader, ci->httpHeaderValue, subId)) + { + addSender(subId, ws); + dat->notify.push_back(subId); + } delete ci; free(dat->request); diff --git a/src/lib/orion_websocket/wsNotify.cpp b/src/lib/orion_websocket/wsNotify.cpp new file mode 100644 index 0000000000..f16da2b830 --- /dev/null +++ b/src/lib/orion_websocket/wsNotify.cpp @@ -0,0 +1,51 @@ +#include "wsNotify.h" +#include "constants.h" +#include "parser.h" + +#include +#include + + +std::mapclient; + +void addSender(const std::string &subId, lws *ws) +{ + client[subId] = ws; +} + +void removeSenders(const std::vector& subIds) +{ + for (unsigned i = 0; i < subIds.size(); ++i) + { + client.erase(subIds[i]); + } +} + +int sendNotifyContextRequestWs +( + const std::string& subId, + const std::map& headers, + const std::string& data +) +{ + const char *msg = ws_parser_notify(subId, headers, data); + + std::map::iterator it = client.find(subId); + + if (it == client.end()) + return 1; + + size_t size = strlen(msg) + + LWS_SEND_BUFFER_PRE_PADDING + + LWS_SEND_BUFFER_POST_PADDING; + + unsigned char *buff = (unsigned char *) malloc(size); + unsigned char *p = &buff[LWS_SEND_BUFFER_PRE_PADDING]; + sprintf((char *)p, "%s", msg); + int result = lws_write(it->second, p, strlen(msg), LWS_WRITE_TEXT); + + free((char *)msg); + free(buff); + + return result < 0; +} diff --git a/src/lib/orion_websocket/wsNotify.h b/src/lib/orion_websocket/wsNotify.h new file mode 100644 index 0000000000..f107ace453 --- /dev/null +++ b/src/lib/orion_websocket/wsNotify.h @@ -0,0 +1,19 @@ +#ifndef WS_NOTIFY_H +#define WS_NOTIFY_H + +#include +#include +#include + +struct lws; + +void addSender(const std::string &subId, lws *ws); +void removeSenders(const std::vector& subIds); +int sendNotifyContextRequestWs +( + const std::string& subId, + const std::map& headers, + const std::string& data +); + +#endif From ea76cbb1ccda285b22ae9c2a79f1e60fb121eab6 Mon Sep 17 00:00:00 2001 From: Felipe Ortiz Date: Tue, 5 Apr 2016 16:57:39 -0300 Subject: [PATCH 07/13] Use constants.h --- src/lib/common/string.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lib/common/string.cpp b/src/lib/common/string.cpp index 49173fdcf3..c2eab66b8b 100644 --- a/src/lib/common/string.cpp +++ b/src/lib/common/string.cpp @@ -35,6 +35,7 @@ #include "common/wsStrip.h" #include "alarmMgr/alarmMgr.h" +#include "orion_websocket/constants.h" /* **************************************************************************** @@ -242,7 +243,7 @@ bool parseUrl(const std::string& url, std::string& host, int& port, std::string& return false; } - if (url == "ws://") + if (url == WSConstants::Scheme) { return true; } From 629fd7d60679374d83b77d6ffc78fc801ba9ed02 Mon Sep 17 00:00:00 2001 From: Felipe Ortiz Date: Tue, 5 Apr 2016 22:37:53 -0300 Subject: [PATCH 08/13] Remove subscription when connection is closed Remove subscription from DB when websocket connection is closed --- src/lib/orion_websocket/ws.cpp | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/lib/orion_websocket/ws.cpp b/src/lib/orion_websocket/ws.cpp index 5af5f7f846..b1782a824f 100644 --- a/src/lib/orion_websocket/ws.cpp +++ b/src/lib/orion_websocket/ws.cpp @@ -34,6 +34,10 @@ #include "logMsg/traceLevels.h" #include "rest/RestService.h" +#include "ngsi10/UnsubscribeContextRequest.h" +#include "ngsi10/UnsubscribeContextResponse.h" +#include "mongoBackend/mongoUnsubscribeContext.h" + #include #include #include @@ -55,7 +59,10 @@ struct _orion_websocket // Private struct for persistent data typedef struct { + // SubId std::vector notify; + // Teanat for each subId + std::vector tenant; char *message; char *request; int index; @@ -104,6 +111,7 @@ static int wsCallback(lws * ws, case LWS_CALLBACK_ESTABLISHED: { dat->notify.clear(); + dat->tenant.clear(); dat->request = NULL; dat->index = 0; break; @@ -111,6 +119,16 @@ static int wsCallback(lws * ws, case LWS_CALLBACK_CLOSED: { removeSenders(dat->notify); + + // Remove all subscriptions from DB + for (unsigned i = 0; i < dat->notify.size(); ++i) + { + UnsubscribeContextRequest req; + UnsubscribeContextResponse rsp; + req.subscriptionId.set(dat->notify[i]); + mongoUnsubscribeContext(&req, &rsp, dat->tenant[i]); + } + break; } @@ -159,6 +177,7 @@ static int wsCallback(lws * ws, { addSender(subId, ws); dat->notify.push_back(subId); + dat->tenant.push_back(ci->tenant); } delete ci; From 05fb052c8d58fe1f0e656d5fe3108b5b888c8669 Mon Sep 17 00:00:00 2001 From: Felipe Ortiz Date: Wed, 6 Apr 2016 11:18:19 -0300 Subject: [PATCH 09/13] Add loop to ensure lws_write write all data --- src/lib/orion_websocket/wsNotify.cpp | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/lib/orion_websocket/wsNotify.cpp b/src/lib/orion_websocket/wsNotify.cpp index f16da2b830..dfc8e07f69 100644 --- a/src/lib/orion_websocket/wsNotify.cpp +++ b/src/lib/orion_websocket/wsNotify.cpp @@ -35,17 +35,26 @@ int sendNotifyContextRequestWs if (it == client.end()) return 1; - size_t size = strlen(msg) + + int msg_size = strlen(msg); + size_t size = msg_size + LWS_SEND_BUFFER_PRE_PADDING + LWS_SEND_BUFFER_POST_PADDING; unsigned char *buff = (unsigned char *) malloc(size); unsigned char *p = &buff[LWS_SEND_BUFFER_PRE_PADDING]; sprintf((char *)p, "%s", msg); - int result = lws_write(it->second, p, strlen(msg), LWS_WRITE_TEXT); + + int written = 0; + + while (written < msg_size) + { + written += lws_write(it->second, (p + written), strlen(msg), LWS_WRITE_TEXT); + if (written == -1) + return 1; + } free((char *)msg); free(buff); - return result < 0; + return 0; } From 661796d5f3dc2c1c04532ddaba0316dc2d2acf58 Mon Sep 17 00:00:00 2001 From: Felipe Ortiz Date: Wed, 6 Apr 2016 14:25:02 -0300 Subject: [PATCH 10/13] Add constants for subId length --- src/lib/common/limits.h | 7 +++++++ src/lib/orion_websocket/ws.cpp | 8 +++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/lib/common/limits.h b/src/lib/common/limits.h index 95a58ae868..d0d6fd9659 100644 --- a/src/lib/common/limits.h +++ b/src/lib/common/limits.h @@ -115,6 +115,13 @@ #define HTTP_HEADER_HOST_MAX_LENGTH 256 +/* **************************************************************************** +* +* Subscription id maximun length - +*/ +#define MAX_LENGTH_SUBID 24 + + /* **************************************************************************** * * Default timeout - 5000 milliseconds diff --git a/src/lib/orion_websocket/ws.cpp b/src/lib/orion_websocket/ws.cpp index b1782a824f..753e2a1db1 100644 --- a/src/lib/orion_websocket/ws.cpp +++ b/src/lib/orion_websocket/ws.cpp @@ -34,6 +34,8 @@ #include "logMsg/traceLevels.h" #include "rest/RestService.h" +#include "common/limits.h" + #include "ngsi10/UnsubscribeContextRequest.h" #include "ngsi10/UnsubscribeContextResponse.h" #include "mongoBackend/mongoUnsubscribeContext.h" @@ -88,9 +90,9 @@ static bool isSubscription if (pos == std::string::npos) return false; - char buff[25]; - headValue[i].copy(buff, 24, pos + 1); - buff[24] = 0; + char buff[MAX_LENGTH_SUBID + 1]; + headValue[i].copy(buff, MAX_LENGTH_SUBID, pos + 1); + buff[MAX_LENGTH_SUBID] = 0; subId = std::string(buff); return true; } From ec0276246dcbb7fb3bbcc7b535acba72fb7e68be Mon Sep 17 00:00:00 2001 From: Felipe Ortiz Date: Wed, 6 Apr 2016 14:37:39 -0300 Subject: [PATCH 11/13] Add comments for sendNotifyContextRequestWs --- src/lib/orion_websocket/wsNotify.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/lib/orion_websocket/wsNotify.cpp b/src/lib/orion_websocket/wsNotify.cpp index dfc8e07f69..f83d56f59a 100644 --- a/src/lib/orion_websocket/wsNotify.cpp +++ b/src/lib/orion_websocket/wsNotify.cpp @@ -21,6 +21,16 @@ void removeSenders(const std::vector& subIds) } } + +/* **************************************************************************** +* +* sendNotifyContextRequestWs - +* +* Send notification throught websocket. +* On success return 0, on error return 1 +*/ + + int sendNotifyContextRequestWs ( const std::string& subId, From c824a6ced8e63ee53dbd7ec2ae20bc5afab7b4a0 Mon Sep 17 00:00:00 2001 From: Felipe Ortiz Date: Wed, 6 Apr 2016 14:56:04 -0300 Subject: [PATCH 12/13] Change protocol name --- src/lib/orion_websocket/constants.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib/orion_websocket/constants.cpp b/src/lib/orion_websocket/constants.cpp index 44b43e0c4f..9221eda360 100644 --- a/src/lib/orion_websocket/constants.cpp +++ b/src/lib/orion_websocket/constants.cpp @@ -31,5 +31,5 @@ const int WSConstants::Pooling = 50; const size_t WSConstants::DataSize = 128; // Ok, this is can be a macro, but I prefer // to be consistent -const std::string WSConstants::ProtocolName = "orion-ws"; +const std::string WSConstants::ProtocolName = "ngsiv2-json"; const std::string WSConstants::Scheme = "ws://"; From 51652e9822cbcb7e3db5dfdaba91e86c0edbb932 Mon Sep 17 00:00:00 2001 From: Felipe Ortiz Date: Thu, 7 Apr 2016 10:18:51 -0300 Subject: [PATCH 13/13] Fix bytes count in wsNotify --- src/lib/orion_websocket/wsNotify.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/lib/orion_websocket/wsNotify.cpp b/src/lib/orion_websocket/wsNotify.cpp index f83d56f59a..6eb6f16105 100644 --- a/src/lib/orion_websocket/wsNotify.cpp +++ b/src/lib/orion_websocket/wsNotify.cpp @@ -55,12 +55,14 @@ int sendNotifyContextRequestWs sprintf((char *)p, "%s", msg); int written = 0; + int bytes = 0; while (written < msg_size) { - written += lws_write(it->second, (p + written), strlen(msg), LWS_WRITE_TEXT); - if (written == -1) + bytes = lws_write(it->second, (p + written), msg_size - bytes, LWS_WRITE_TEXT); + if (bytes == -1) return 1; + written += bytes; } free((char *)msg);