Skip to content

Commit

Permalink
[CHANGED] More verbose endpoint INFO (#663)
Browse files Browse the repository at this point in the history
* More verbose endpoint INFO

* better clone/free for metadata

* (valgrind) free test JSON array
  • Loading branch information
levb authored Jun 13, 2023
1 parent a268db3 commit eaa4925
Show file tree
Hide file tree
Showing 7 changed files with 385 additions and 134 deletions.
216 changes: 147 additions & 69 deletions src/micro.c
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ _clone_service_config(microServiceConfig **out, microServiceConfig *cfg)
MICRO_CALL(err, micro_strdup((char **)&new_cfg->Name, cfg->Name));
MICRO_CALL(err, micro_strdup((char **)&new_cfg->Version, cfg->Version));
MICRO_CALL(err, micro_strdup((char **)&new_cfg->Description, cfg->Description));
MICRO_CALL(err, micro_clone_metadata(&new_cfg->Metadata, &new_cfg->MetadataLen, cfg->Metadata, cfg->MetadataLen));
MICRO_CALL(err, micro_clone_endpoint_config(&new_cfg->Endpoint, cfg->Endpoint));
if (err != NULL)
{
Expand All @@ -479,6 +480,7 @@ _free_cloned_service_config(microServiceConfig *cfg)
NATS_FREE((char *)cfg->Name);
NATS_FREE((char *)cfg->Version);
NATS_FREE((char *)cfg->Description);
micro_free_cloned_metadata(cfg->Metadata, cfg->MetadataLen);
micro_free_cloned_endpoint_config(cfg->Endpoint);
NATS_FREE(cfg);
}
Expand Down Expand Up @@ -507,7 +509,7 @@ static microError *
_services_for_connection(microService ***to_call, int *num_microservices, natsConnection *nc)
{
natsMutex *mu = natsLib_getServiceCallbackMutex();
natsHash *h = natsLib_getAllServicesToCallback();
natsHash *h = natsLib_getAllServicesToCallback();
microService *m = NULL;
microService **p = NULL;
natsHashIter iter;
Expand Down Expand Up @@ -721,6 +723,7 @@ microService_GetConnection(microService *m)
microError *
microService_GetInfo(microServiceInfo **new_info, microService *m)
{
microError *err = NULL;
microServiceInfo *info = NULL;
microEndpoint *ep = NULL;
int len;
Expand All @@ -732,41 +735,55 @@ microService_GetInfo(microServiceInfo **new_info, microService *m)
if (info == NULL)
return micro_ErrorOutOfMemory;

micro_strdup((char **)&info->Name, m->cfg->Name);
micro_strdup((char **)&info->Version, m->cfg->Version);
micro_strdup((char **)&info->Description, m->cfg->Description);
micro_strdup((char **)&info->Id, m->id);
info->Type = MICRO_INFO_RESPONSE_TYPE;

_lock_service(m);
MICRO_CALL(err, micro_strdup((char **)&info->Name, m->cfg->Name));
MICRO_CALL(err, micro_strdup((char **)&info->Version, m->cfg->Version));
MICRO_CALL(err, micro_strdup((char **)&info->Description, m->cfg->Description));
MICRO_CALL(err, micro_strdup((char **)&info->Id, m->id));
MICRO_CALL(err, micro_clone_metadata(&info->Metadata, &info->MetadataLen, m->cfg->Metadata, m->cfg->MetadataLen));

len = 0;
for (ep = m->first_ep; ep != NULL; ep = ep->next)
if (err == NULL)
{
if ((!ep->is_monitoring_endpoint) && (ep->subject != NULL))
len++;
}
info->Type = MICRO_INFO_RESPONSE_TYPE;

// Overallocate subjects, will filter out internal ones.
info->Subjects = NATS_CALLOC(len, sizeof(char *));
if (info->Subjects == NULL)
{
_lock_service(m);

len = 0;
for (ep = m->first_ep; ep != NULL; ep = ep->next)
{
if ((!ep->is_monitoring_endpoint) && (ep->subject != NULL))
len++;
}

// Overallocate subjects, will filter out internal ones.
info->Endpoints = NATS_CALLOC(len, sizeof(microEndpointInfo));
if (info->Endpoints == NULL)
{
err = micro_ErrorOutOfMemory;
}

len = 0;
for (ep = m->first_ep; (err == NULL) && (ep != NULL); ep = ep->next)
{
if ((!ep->is_monitoring_endpoint) && (ep->subject != NULL))
{
MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].Name, ep->name));
MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].Subject, ep->subject));
MICRO_CALL(err, micro_clone_metadata(&(info->Endpoints[len].Metadata), &info->Endpoints[len].MetadataLen, ep->config->Metadata, ep->config->MetadataLen));
if (err == NULL)
{
len++;
info->EndpointsLen = len;
}
}
}
_unlock_service(m);
NATS_FREE(info);
return micro_ErrorOutOfMemory;
}

len = 0;
for (ep = m->first_ep; ep != NULL; ep = ep->next)
if (err != NULL)
{
if ((!ep->is_monitoring_endpoint) && (ep->subject != NULL))
{
micro_strdup((char **)&info->Subjects[len], ep->subject);
len++;
}
microServiceInfo_Destroy(info);
return err;
}
info->SubjectsLen = len;
_unlock_service(m);

*new_info = info;
return NULL;
Expand All @@ -780,19 +797,25 @@ void microServiceInfo_Destroy(microServiceInfo *info)
return;

// casts to quiet the compiler.
for (i = 0; i < info->SubjectsLen; i++)
NATS_FREE((char *)info->Subjects[i]);
NATS_FREE((char *)info->Subjects);
for (i = 0; i < info->EndpointsLen; i++)
{
NATS_FREE((char *)info->Endpoints[i].Name);
NATS_FREE((char *)info->Endpoints[i].Subject);
micro_free_cloned_metadata(info->Endpoints[i].Metadata, info->Endpoints[i].MetadataLen);
}
NATS_FREE((char *)info->Endpoints);
NATS_FREE((char *)info->Name);
NATS_FREE((char *)info->Version);
NATS_FREE((char *)info->Description);
NATS_FREE((char *)info->Id);
micro_free_cloned_metadata(info->Metadata, info->MetadataLen);
NATS_FREE(info);
}

microError *
microService_GetStats(microServiceStats **new_stats, microService *m)
{
microError *err = NULL;
microServiceStats *stats = NULL;
microEndpoint *ep = NULL;
int len;
Expand All @@ -805,52 +828,62 @@ microService_GetStats(microServiceStats **new_stats, microService *m)
if (stats == NULL)
return micro_ErrorOutOfMemory;

micro_strdup((char **)&stats->Name, m->cfg->Name);
micro_strdup((char **)&stats->Version, m->cfg->Version);
micro_strdup((char **)&stats->Id, m->id);
stats->Started = m->started;
stats->Type = MICRO_STATS_RESPONSE_TYPE;
MICRO_CALL(err, micro_strdup((char **)&stats->Name, m->cfg->Name));
MICRO_CALL(err, micro_strdup((char **)&stats->Version, m->cfg->Version));
MICRO_CALL(err, micro_strdup((char **)&stats->Id, m->id));

_lock_service(m);

len = 0;
for (ep = m->first_ep; ep != NULL; ep = ep->next)
if (err == NULL)
{
if ((ep != NULL) && (!ep->is_monitoring_endpoint))
len++;
}
stats->Started = m->started;
stats->Type = MICRO_STATS_RESPONSE_TYPE;

// Allocate the actual structs, not pointers.
stats->Endpoints = NATS_CALLOC(len, sizeof(microEndpointStats));
if (stats->Endpoints == NULL)
{
_unlock_service(m);
NATS_FREE(stats);
return micro_ErrorOutOfMemory;
}
_lock_service(m);

len = 0;
for (ep = m->first_ep; ep != NULL; ep = ep->next)
{
if ((ep != NULL) && (!ep->is_monitoring_endpoint) && (ep->endpoint_mu != NULL))
len = 0;
for (ep = m->first_ep; ep != NULL; ep = ep->next)
{
micro_lock_endpoint(ep);
// copy the entire struct, including the last error buffer.
stats->Endpoints[len] = ep->stats;

micro_strdup((char **)&stats->Endpoints[len].Name, ep->name);
micro_strdup((char **)&stats->Endpoints[len].Subject, ep->subject);
avg = (long double)ep->stats.ProcessingTimeSeconds * 1000000000.0 + (long double)ep->stats.ProcessingTimeNanoseconds;
avg = avg / (long double)ep->stats.NumRequests;
stats->Endpoints[len].AverageProcessingTimeNanoseconds = (int64_t)avg;
len++;
micro_unlock_endpoint(ep);
if ((ep != NULL) && (!ep->is_monitoring_endpoint))
len++;
}
}

_unlock_service(m);
stats->EndpointsLen = len;
// Allocate the actual structs, not pointers.
stats->Endpoints = NATS_CALLOC(len, sizeof(microEndpointStats));
if (stats->Endpoints == NULL)
{
err = micro_ErrorOutOfMemory;
}

len = 0;
for (ep = m->first_ep; ((err == NULL) && (ep != NULL)); ep = ep->next)
{
if ((ep != NULL) && (!ep->is_monitoring_endpoint) && (ep->endpoint_mu != NULL))
{
micro_lock_endpoint(ep);
// copy the entire struct, including the last error buffer.
stats->Endpoints[len] = ep->stats;

MICRO_CALL(err, micro_strdup((char **)&stats->Endpoints[len].Name, ep->name));
MICRO_CALL(err, micro_strdup((char **)&stats->Endpoints[len].Subject, ep->subject));
if (err == NULL)
{
avg = (long double)ep->stats.ProcessingTimeSeconds * 1000000000.0 + (long double)ep->stats.ProcessingTimeNanoseconds;
avg = avg / (long double)ep->stats.NumRequests;
stats->Endpoints[len].AverageProcessingTimeNanoseconds = (int64_t)avg;
len++;
stats->EndpointsLen = len;
}
micro_unlock_endpoint(ep);
}
}

_unlock_service(m);
}

if (err != NULL)
{
microServiceStats_Destroy(stats);
return err;
}
*new_stats = stats;
return NULL;
}
Expand All @@ -873,3 +906,48 @@ void microServiceStats_Destroy(microServiceStats *stats)
NATS_FREE((char *)stats->Id);
NATS_FREE(stats);
}

void micro_free_cloned_metadata(const char **metadata, int len)
{
int i;

if (metadata == NULL)
return;

for (i = 0; i < len*2; i++)
{
NATS_FREE((char *)metadata[i]);
}
NATS_FREE((char **)metadata);
}

microError *micro_clone_metadata(const char ***new_metadata, int *new_len, const char **metadata, int len)
{
char **dup = NULL;
int i;

if (new_metadata == NULL)
return micro_ErrorInvalidArg;
*new_metadata = NULL;

if (len == 0)
return NULL;

dup = NATS_CALLOC(len * 2, sizeof(char *));
if (dup == NULL)
return micro_ErrorOutOfMemory;

for (i = 0; i < len*2; i++)
{
micro_strdup(&dup[i], metadata[i]);
if (dup[i] == NULL)
{
micro_free_cloned_metadata((const char **)dup, i);
return micro_ErrorOutOfMemory;
}
}

*new_metadata = (const char **)dup;
*new_len = len;
return NULL;
}
2 changes: 2 additions & 0 deletions src/micro_endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ micro_clone_endpoint_config(microEndpointConfig **out, microEndpointConfig *cfg)

MICRO_CALL(err, micro_strdup((char **)&new_cfg->Name, cfg->Name));
MICRO_CALL(err, micro_strdup((char **)&new_cfg->Subject, cfg->Subject));
MICRO_CALL(err, micro_clone_metadata(&new_cfg->Metadata, &new_cfg->MetadataLen, cfg->Metadata, cfg->MetadataLen));

if (err != NULL)
{
Expand All @@ -343,6 +344,7 @@ void micro_free_cloned_endpoint_config(microEndpointConfig *cfg)
// to be freed.
NATS_FREE((char *)cfg->Name);
NATS_FREE((char *)cfg->Subject);
micro_free_cloned_metadata(cfg->Metadata, cfg->MetadataLen);

NATS_FREE(cfg);
}
Expand Down
53 changes: 42 additions & 11 deletions src/micro_monitoring.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,32 +250,63 @@ marshal_ping(natsBuffer **new_buf, microService *m)
return NULL;
}

natsStatus
_marshal_metadata(natsBuffer *buf, const char **metadata, int len)
{
natsStatus s = NATS_OK;
int i;

if (len > 0)
{
IFOK(s, natsBuf_Append(buf, "\"metadata\":{", -1));
for (i = 0; ((s == NATS_OK) && (i < len)); i++)
{
IFOK(s, natsBuf_AppendByte(buf, '"'));
IFOK(s, natsBuf_Append(buf, metadata[i * 2], -1));
IFOK(s, natsBuf_Append(buf, "\":\"", 3));
IFOK(s, natsBuf_Append(buf, metadata[i * 2 + 1], -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
if (i != len - 1)
IFOK(s, natsBuf_AppendByte(buf, ','));
}
IFOK(s, natsBuf_Append(buf, "},", 2));
}
return NATS_OK;
}

static microError *
marshal_info(natsBuffer **new_buf, microServiceInfo *info)
{
natsBuffer *buf = NULL;
natsStatus s;
int i;

s = natsBuf_Create(&buf, 4096);
IFOK(s, natsBuf_AppendByte(buf, '{'));

IFOK_attr("description", info->Description, ",");
IFOK_attr("id", info->Id, ",");
IFOK_attr("name", info->Name, ",");
IFOK_attr("type", info->Type, ",");
if ((s == NATS_OK) && (info->SubjectsLen > 0))

// "endpoints":{...}
if ((s == NATS_OK) && (info->EndpointsLen > 0))
{
int i;
IFOK(s, natsBuf_Append(buf, "\"subjects\":[", -1));
for (i = 0; i < info->SubjectsLen; i++)
IFOK(s, natsBuf_Append(buf, "\"endpoints\":[", -1));
for (i = 0; ((s == NATS_OK) && (i < info->EndpointsLen)); i++)
{
IFOK(s, natsBuf_AppendByte(buf, '"'));
IFOK(s, natsBuf_Append(buf, info->Subjects[i], -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
if (i < (info->SubjectsLen - 1))
IFOK(s, natsBuf_AppendByte(buf, '{'));
IFOK_attr("name", info->Endpoints[i].Name, ",");
IFOK(s, _marshal_metadata(buf, info->Endpoints[i].Metadata, info->Endpoints[i].MetadataLen));
IFOK_attr("subject", info->Endpoints[i].Subject, "");
IFOK(s, natsBuf_AppendByte(buf, '}')); // end endpoint
if (i != info->EndpointsLen - 1)
IFOK(s, natsBuf_AppendByte(buf, ','));
}
IFOK(s, natsBuf_Append(buf, "],", 2));
}

IFOK_attr("id", info->Id, ",");
IFOK(s, _marshal_metadata(buf, info->Metadata, info->MetadataLen));
IFOK_attr("name", info->Name, ",");
IFOK_attr("type", info->Type, ",");
IFOK_attr("version", info->Version, "");
IFOK(s, natsBuf_AppendByte(buf, '}'));

Expand Down
Loading

0 comments on commit eaa4925

Please sign in to comment.