diff --git a/src/lib/common/limits.h b/src/lib/common/limits.h index 95a58ae868..d0d6fd9659 100644 --- a/src/lib/common/limits.h +++ b/src/lib/common/limits.h @@ -115,6 +115,13 @@ #define HTTP_HEADER_HOST_MAX_LENGTH 256 +/* **************************************************************************** +* +* Subscription id maximun length - +*/ +#define MAX_LENGTH_SUBID 24 + + /* **************************************************************************** * * Default timeout - 5000 milliseconds diff --git a/src/lib/common/string.cpp b/src/lib/common/string.cpp index 15a64c62a8..c2eab66b8b 100644 --- a/src/lib/common/string.cpp +++ b/src/lib/common/string.cpp @@ -35,6 +35,7 @@ #include "common/wsStrip.h" #include "alarmMgr/alarmMgr.h" +#include "orion_websocket/constants.h" /* **************************************************************************** @@ -242,6 +243,10 @@ bool parseUrl(const std::string& url, std::string& host, int& port, std::string& return false; } + if (url == WSConstants::Scheme) + { + return true; + } /* http://some.host.com/my/path * ^^ ^ ^ diff --git a/src/lib/ngsiNotify/Notifier.cpp b/src/lib/ngsiNotify/Notifier.cpp index b293da6c28..5791748fd8 100644 --- a/src/lib/ngsiNotify/Notifier.cpp +++ b/src/lib/ngsiNotify/Notifier.cpp @@ -35,10 +35,7 @@ #include "ngsiNotify/senderThread.h" #include "ngsiNotify/Notifier.h" - -// FIME P11 #1669: move this to WS library. @fortizc please take care of this -#define WS_SCHEME "ws://" -#define WS_SCHEME_LENGTH 5 +#include "orion_websocket/constants.h" /* **************************************************************************** @@ -99,10 +96,10 @@ void Notifier::sendNotifyContextRequest(NotifyContextRequest* ncr, const std::st std::string uriPath; std::string protocol; - if ((url.length() == WS_SCHEME_LENGTH) && (url.find(WS_SCHEME) == 0)) + if ((url.length() == WSConstants::Scheme.size()) && (url.find(WSConstants::Scheme) == 0)) { // In this case host, port and uriPath are not needed, as the WS library has all the connection information related with the WS - protocol = WS_SCHEME; + protocol = WSConstants::Scheme; } else { diff --git a/src/lib/ngsiNotify/senderThread.cpp b/src/lib/ngsiNotify/senderThread.cpp index 35dd5a2b55..3f143f34ca 100644 --- a/src/lib/ngsiNotify/senderThread.cpp +++ b/src/lib/ngsiNotify/senderThread.cpp @@ -29,18 +29,8 @@ #include "rest/httpRequestSend.h" #include "ngsiNotify/senderThread.h" - -// FIXME P11 #1669: remove this stub when the actual sendNotifyContextRequestWs() gets developed a the end. @fortizc please take care of this -// in the ws library -int sendNotifyContextRequestWs(const std::string& subId, const std::map& headers, const std::string& data) -{ - return 0; -} - - -// FIME P11 #1669: move this to WS library. @fortizc please take care of this -#define WS_SCHEME "ws://" -#define WS_SCHEME_LENGTH 5 +#include "orion_websocket/constants.h" +#include "orion_websocket/wsNotify.h" /* **************************************************************************** @@ -73,7 +63,7 @@ void* startSenderThread(void* p) std::string out; int r; - if (params->protocol == WS_SCHEME) + if (params->protocol == WSConstants::Scheme) { std::map headers; headers.insert(std::make_pair("Fiware-Service", params->tenant)); diff --git a/src/lib/orion_websocket/CMakeLists.txt b/src/lib/orion_websocket/CMakeLists.txt index ead7749a13..56371482cc 100644 --- a/src/lib/orion_websocket/CMakeLists.txt +++ b/src/lib/orion_websocket/CMakeLists.txt @@ -22,14 +22,16 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.6) SET (SOURCES ws.cpp - connection_manager.cpp parser.cpp + constants.cpp + wsNotify.cpp ) SET (HEADERS ws.h - connection_manager.h parser.h + constants.h + wsNotify.h ) diff --git a/src/lib/orion_websocket/connection_manager.cpp b/src/lib/orion_websocket/connection_manager.cpp deleted file mode 100644 index 55240b2d91..0000000000 --- a/src/lib/orion_websocket/connection_manager.cpp +++ /dev/null @@ -1,100 +0,0 @@ -/* -* -* Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U -* -* This file is part of Orion Context Broker. -* -* Orion Context Broker is free software: you can redistribute it and/or -* modify it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* Orion Context Broker is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero -* General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with Orion Context Broker. If not, see http://www.gnu.org/licenses/. -* -* For those usages not covered by this license please contact with -* iot_support at tid dot es -* -* Author: Felipe Ortiz -*/ - -#include "parser.h" -#include "connection_manager.h" -#include "rest/ConnectionInfo.h" -#include "common/Format.h" -#include "rest/HttpHeaders.h" - -#include -#include -#include - -// Max possible code (int size) -#define MAX_CID 32768 - -static pthread_mutex_t mtx_cid = PTHREAD_MUTEX_INITIALIZER; -static pthread_mutex_t mtx_conn = PTHREAD_MUTEX_INITIALIZER; - -static int cidVec[MAX_CID]; -static std::map connections; - -int connection_manager_get_cid() -{ - pthread_mutex_lock(&mtx_cid); - for (int i = 0; i < MAX_CID; ++i) - { - if (cidVec[i] == 1) - continue; - cidVec[i] = 1; - pthread_mutex_unlock(&mtx_cid); - return i; - } - pthread_mutex_unlock(&mtx_cid); - return -1; -} - -ConnectionInfo *connection_manager_get(int cid, const char *message) -{ - std::string url; - std::string verb; - std::string payload; - - pthread_mutex_lock(&mtx_conn); - std::map::iterator it; - it = connections.find(cid); - if (it != connections.end()) - { - pthread_mutex_unlock(&mtx_conn); - - it->second->reset(); - ws_parser_parse(message, it->second, url, verb, payload, it->second->httpHeaders); - it->second->modify(url, verb, payload); - return it->second; - } - - pthread_mutex_unlock(&mtx_conn); - - ConnectionInfo *ci = new ConnectionInfo("v2", JSON, true); - ws_parser_parse(message, ci, url, verb, payload, ci->httpHeaders); - ci->modify(url, verb, payload); - - pthread_mutex_lock(&mtx_conn); - connections[cid] = ci; - pthread_mutex_unlock(&mtx_conn); - return ci; -} - -void connection_manager_remove(int cid) -{ - pthread_mutex_lock(&mtx_conn); - delete connections[cid]; - pthread_mutex_unlock(&mtx_conn); - - pthread_mutex_lock(&mtx_cid); - cidVec[cid] = 0; - pthread_mutex_unlock(&mtx_cid); -} diff --git a/src/lib/orion_websocket/constants.cpp b/src/lib/orion_websocket/constants.cpp new file mode 100644 index 0000000000..9221eda360 --- /dev/null +++ b/src/lib/orion_websocket/constants.cpp @@ -0,0 +1,35 @@ +/* +* +* Copyright 2016 Telefonica Investigacion y Desarrollo, S.A.U +* +* This file is part of Orion Context Broker. +* +* Orion Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* iot_support at tid dot es +* +* Author: Felipe Ortiz +*/ + + +#include "constants.h" + +const int WSConstants::Port = 9010; +const int WSConstants::Pooling = 50; +const size_t WSConstants::DataSize = 128; +// Ok, this is can be a macro, but I prefer +// to be consistent +const std::string WSConstants::ProtocolName = "ngsiv2-json"; +const std::string WSConstants::Scheme = "ws://"; diff --git a/src/lib/orion_websocket/connection_manager.h b/src/lib/orion_websocket/constants.h similarity index 73% rename from src/lib/orion_websocket/connection_manager.h rename to src/lib/orion_websocket/constants.h index 76ef08552f..1cd55fe904 100644 --- a/src/lib/orion_websocket/connection_manager.h +++ b/src/lib/orion_websocket/constants.h @@ -1,6 +1,6 @@ /* * -* Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U +* Copyright 2016 Telefonica Investigacion y Desarrollo, S.A.U * * This file is part of Orion Context Broker. * @@ -24,13 +24,20 @@ */ -#ifndef WS_CONNECTION_MANAGER_H -#define WS_CONNECTION_MANAGER_H +#ifndef WS_CONSTANTS_H +#define WS_CONSTANTS_H -class ConnectionInfo; +#include -int connection_manager_get_cid(); -ConnectionInfo *connection_manager_get(int cid, const char *msg); -void connection_manager_remove(int cid); + +class WSConstants +{ +public: + static const int Port; + static const int Pooling; + static const size_t DataSize; + static const std::string ProtocolName; + static const std::string Scheme; +}; #endif diff --git a/src/lib/orion_websocket/parser.cpp b/src/lib/orion_websocket/parser.cpp index 6560e4a2a1..02291c9716 100644 --- a/src/lib/orion_websocket/parser.cpp +++ b/src/lib/orion_websocket/parser.cpp @@ -103,10 +103,31 @@ void ws_parser_parse } } +static void addHeaders +( + const std::string& name, + const std::string& value, + rapidjson::Writer& writer +) +{ + writer.Key(name.c_str()); + + if (value.empty()) + { + writer.String(""); + } + else + { + writer.String(value.c_str()); + } +} + const char *ws_parser_message ( const std::string& msg, const HttpHeaders& head, + const std::vector headName, + const std::vector headValue, int statusCode ) { @@ -119,18 +140,15 @@ const char *ws_parser_message std::map::const_iterator it = head.headerMap.begin(); while (it != head.headerMap.end()) { - writer.Key(it->first.c_str()); - - if (it->second->empty()) - { - writer.String(""); - } - else - { - writer.String(it->second->c_str()); - } + addHeaders(it->first, *(it->second), writer); ++it; } + + for (unsigned i = 0; i < headName.size(); ++i) + { + addHeaders(headName[i], headValue[i], writer); + } + writer.EndObject(); if (statusCode < 100 || statusCode > 599) // Code is not a valid HTTP status code @@ -146,3 +164,34 @@ const char *ws_parser_message return json; } + +const char *ws_parser_notify +( + const std::string& subId, + const std::map& headers, + const std::string &data +) +{ + const char* tmpl = "{\"subscriptionId\": \"%s\", \"headers\": %s, \"data\": %s}"; + + + rapidjson::StringBuffer buff; + rapidjson::Writer writer(buff); + writer.StartObject(); + std::map::const_iterator it = headers.begin(); + while (it != headers.end()) + { + writer.Key(it->first.c_str()); + writer.String(it->second.c_str()); + ++it; + } + writer.EndObject(); + + const char* strHeaders = buff.GetString(); + + size_t size = subId.size() + data.size() + strlen(strHeaders) + strlen(tmpl) - 6 + 1; + char *json = (char *) malloc(size); + sprintf(json, tmpl, subId.c_str(), strHeaders, data.c_str()); + + return json; +} diff --git a/src/lib/orion_websocket/parser.h b/src/lib/orion_websocket/parser.h index 3387385c98..47c9a4c0e8 100644 --- a/src/lib/orion_websocket/parser.h +++ b/src/lib/orion_websocket/parser.h @@ -27,6 +27,7 @@ #define WS_PARSER_H #include +#include #include class HttpHeaders; @@ -46,7 +47,16 @@ const char *ws_parser_message ( const std::string& msg, const HttpHeaders& head, + const std::vector headName, + const std::vector headValue, int statusCode ); +const char *ws_parser_notify +( + const std::string& subId, + const std::map& headers, + const std::string& data +); + #endif diff --git a/src/lib/orion_websocket/ws.cpp b/src/lib/orion_websocket/ws.cpp index fece6bca6d..753e2a1db1 100644 --- a/src/lib/orion_websocket/ws.cpp +++ b/src/lib/orion_websocket/ws.cpp @@ -24,8 +24,9 @@ */ #include "ws.h" -#include "connection_manager.h" +#include "constants.h" #include "parser.h" +#include "wsNotify.h" #include "rest/RestService.h" @@ -33,9 +34,16 @@ #include "logMsg/traceLevels.h" #include "rest/RestService.h" +#include "common/limits.h" + +#include "ngsi10/UnsubscribeContextRequest.h" +#include "ngsi10/UnsubscribeContextResponse.h" +#include "mongoBackend/mongoUnsubscribeContext.h" + #include #include #include +#include #include @@ -53,12 +61,44 @@ struct _orion_websocket // Private struct for persistent data typedef struct { - unsigned cid; + // SubId + std::vector notify; + // Teanat for each subId + std::vector tenant; char *message; char *request; int index; }data; +static bool isSubscription +( + const std::vector& headName, + const std::vector& headValue, + std::string& subId +) +{ + for (unsigned i = 0; i < headName.size(); ++i) + { + if (headName[i] != "Location") + continue; + + size_t pos = headValue[i].find("subscriptions"); + if (pos == std::string::npos) + return false; + + pos = headValue[i].find_last_of('/'); + if (pos == std::string::npos) + return false; + + char buff[MAX_LENGTH_SUBID + 1]; + headValue[i].copy(buff, MAX_LENGTH_SUBID, pos + 1); + buff[MAX_LENGTH_SUBID] = 0; + subId = std::string(buff); + return true; + } + return false; +} + static int wsCallback(lws * ws, enum lws_callback_reasons reason, void *user, @@ -72,24 +112,25 @@ static int wsCallback(lws * ws, { case LWS_CALLBACK_ESTABLISHED: { - int cid = connection_manager_get_cid(); - - if (cid == -1) - { - LM_E(("No more cid available!")); - break; - } - - dat->cid = cid; + dat->notify.clear(); + dat->tenant.clear(); dat->request = NULL; dat->index = 0; - LM_I(("Connecction id: %d", dat->cid)); break; } case LWS_CALLBACK_CLOSED: { - connection_manager_remove(dat->cid); - dat->cid = -1; + removeSenders(dat->notify); + + // Remove all subscriptions from DB + for (unsigned i = 0; i < dat->notify.size(); ++i) + { + UnsubscribeContextRequest req; + UnsubscribeContextResponse rsp; + req.subscriptionId.set(dat->notify[i]); + mongoUnsubscribeContext(&req, &rsp, dat->tenant[i]); + } + break; } @@ -101,7 +142,6 @@ static int wsCallback(lws * ws, unsigned char *buff = (unsigned char *) malloc(size); unsigned char *p = &buff[LWS_SEND_BUFFER_PRE_PADDING]; - sprintf((char*)p, "%s", dat->message); lws_write(ws, p, strlen(dat->message), LWS_WRITE_TEXT); free(buff); @@ -124,10 +164,25 @@ static int wsCallback(lws * ws, { dat->request[dat->index] = 0; - ConnectionInfo *ci = connection_manager_get(dat->cid, dat->request); + ConnectionInfo *ci = new ConnectionInfo("v2", JSON, true); + std::string url; + std::string verb; + std::string payload; + ws_parser_parse(dat->request, ci, url, verb, payload, ci->httpHeaders); + ci->modify(url, verb, payload); const char *restMsg = restService(ci, orionServices).c_str(); - dat->message = strdup(ws_parser_message(restMsg, ci->httpHeaders, (int)ci->httpStatusCode)); + dat->message = strdup(ws_parser_message(restMsg, ci->httpHeaders, ci->httpHeader, ci->httpHeaderValue, (int)ci->httpStatusCode)); + + std::string subId; + if (isSubscription(ci->httpHeader, ci->httpHeaderValue, subId)) + { + addSender(subId, ws); + dat->notify.push_back(subId); + dat->tenant.push_back(ci->tenant); + } + + delete ci; free(dat->request); dat->request = NULL; dat->index = 0; @@ -141,19 +196,6 @@ static int wsCallback(lws * ws, return 0; } -static struct lws_protocols protocols[] = { - { - "orion-ws", - wsCallback, - sizeof(data), - 128, - 1, - NULL - }, - { - NULL, NULL, 0, 0, 0, 0 - } -}; pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; @@ -174,7 +216,7 @@ void *runWS(void *ptr) pthread_mutex_unlock(&mtx); pthread_mutex_lock(&mtx); - lws_service(ws->ctx, 50); + lws_service(ws->ctx, WSConstants::Pooling); pthread_mutex_unlock(&mtx); } return 0; @@ -182,8 +224,22 @@ void *runWS(void *ptr) orion_websocket *orion_websocket_new(RestService *serv) { + static struct lws_protocols protocols[] = { + { + WSConstants::ProtocolName.c_str(), + wsCallback, + sizeof(data), + WSConstants::DataSize, + 1, + NULL + }, + { + NULL, NULL, 0, 0, 0, 0 + } + }; + struct lws_context_creation_info info = { - 9010, + WSConstants::Port, NULL, protocols, lws_get_internal_extensions(), diff --git a/src/lib/orion_websocket/wsNotify.cpp b/src/lib/orion_websocket/wsNotify.cpp new file mode 100644 index 0000000000..6eb6f16105 --- /dev/null +++ b/src/lib/orion_websocket/wsNotify.cpp @@ -0,0 +1,72 @@ +#include "wsNotify.h" +#include "constants.h" +#include "parser.h" + +#include +#include + + +std::mapclient; + +void addSender(const std::string &subId, lws *ws) +{ + client[subId] = ws; +} + +void removeSenders(const std::vector& subIds) +{ + for (unsigned i = 0; i < subIds.size(); ++i) + { + client.erase(subIds[i]); + } +} + + +/* **************************************************************************** +* +* sendNotifyContextRequestWs - +* +* Send notification throught websocket. +* On success return 0, on error return 1 +*/ + + +int sendNotifyContextRequestWs +( + const std::string& subId, + const std::map& headers, + const std::string& data +) +{ + const char *msg = ws_parser_notify(subId, headers, data); + + std::map::iterator it = client.find(subId); + + if (it == client.end()) + return 1; + + int msg_size = strlen(msg); + size_t size = msg_size + + LWS_SEND_BUFFER_PRE_PADDING + + LWS_SEND_BUFFER_POST_PADDING; + + unsigned char *buff = (unsigned char *) malloc(size); + unsigned char *p = &buff[LWS_SEND_BUFFER_PRE_PADDING]; + sprintf((char *)p, "%s", msg); + + int written = 0; + int bytes = 0; + + while (written < msg_size) + { + bytes = lws_write(it->second, (p + written), msg_size - bytes, LWS_WRITE_TEXT); + if (bytes == -1) + return 1; + written += bytes; + } + + free((char *)msg); + free(buff); + + return 0; +} diff --git a/src/lib/orion_websocket/wsNotify.h b/src/lib/orion_websocket/wsNotify.h new file mode 100644 index 0000000000..f107ace453 --- /dev/null +++ b/src/lib/orion_websocket/wsNotify.h @@ -0,0 +1,19 @@ +#ifndef WS_NOTIFY_H +#define WS_NOTIFY_H + +#include +#include +#include + +struct lws; + +void addSender(const std::string &subId, lws *ws); +void removeSenders(const std::vector& subIds); +int sendNotifyContextRequestWs +( + const std::string& subId, + const std::map& headers, + const std::string& data +); + +#endif diff --git a/src/lib/rest/ConnectionInfo.cpp b/src/lib/rest/ConnectionInfo.cpp index 7b0cd427f5..93c1f23470 100644 --- a/src/lib/rest/ConnectionInfo.cpp +++ b/src/lib/rest/ConnectionInfo.cpp @@ -27,7 +27,7 @@ #include "common/string.h" #include "common/globals.h" -#include "rest/uriParamFormat.h" +#include "rest/uriParamNames.h" #include "rest/ConnectionInfo.h" @@ -210,32 +210,6 @@ ConnectionInfo::~ConnectionInfo() servicePathV.clear(); } - -/* **************************************************************************** -* -* reset - Reset some class memeber to use in a websocket connection -*/ -void ConnectionInfo::reset() -{ - version = "HTTP/1.1"; - servicePath = "/"; - - httpHeaders.gotHeaders = true; - httpHeaders.userAgent = "orionWS/0.1"; - httpHeaders.accept = "*/*"; - httpHeaders.contentLength = 0; - httpHeaders.servicePath = "/"; - httpHeaders.tenant.clear(); - - uriParam[URI_PARAM_PAGINATION_DETAILS] = "off"; - uriParam[URI_PARAM_PAGINATION_LIMIT] = "20"; - uriParam[URI_PARAM_PAGINATION_OFFSET] = "0"; - - tenant.clear(); - tenantFromHttpHeader.clear(); -} - - /* **************************************************************************** * * modify - Modify a ConnectionInfo using the given parameters diff --git a/src/lib/rest/ConnectionInfo.h b/src/lib/rest/ConnectionInfo.h index 6ae9d716ad..058e156091 100644 --- a/src/lib/rest/ConnectionInfo.h +++ b/src/lib/rest/ConnectionInfo.h @@ -57,7 +57,6 @@ class ConnectionInfo ConnectionInfo(const std::string &_url, const std::string &_method, const std::string &_version, MHD_Connection* _connection = NULL); ~ConnectionInfo(); - void reset(); void modify(const std::string &_url, const std::string &_verb, const std::string &_payload); MHD_Connection* connection;