Skip to content

Commit

Permalink
WIP: add QH command to access the query handler via livestatus
Browse files Browse the repository at this point in the history
this PR adds access to the query handler via livestatus. Since livestatus is
commonly used it would be nice to subscribe to events via livestatus too.
So we simply pass through query handler commands and therefor allow things like:

  echo "QH help" | nc -U tmp/run/live

or

  echo "QH @nerd subscribe servicechecks" | nc -U tmp/run/live

TODO:
  - It is not yet interactive, so you send a command once and will get the results.
  - implement proper shutdown

Signed-off-by: Sven Nierlein <[email protected]>
  • Loading branch information
sni committed Nov 5, 2019
1 parent 10f8797 commit dab7a7e
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 6 deletions.
33 changes: 32 additions & 1 deletion src/Store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ void Store::registerDowntime(nebstruct_downtime_data *d)
_table_downtimes.addDowntime(d);
}

bool Store::answerRequest(InputBuffer *input, OutputBuffer *output)
bool Store::answerRequest(InputBuffer *input, OutputBuffer *output, int fd)
{
output->reset();
int r = input->readRequest();
Expand All @@ -135,6 +135,10 @@ bool Store::answerRequest(InputBuffer *input, OutputBuffer *output)
answerCommandRequest(lstrip((char *)line + 8), output);
output->setDoKeepalive(true);
}
else if (!strncmp(line, "QH ", 3)) {
output->setDoKeepalive(true);
answerQueryHandlerRequest(lstrip((char *)line + 3), output, fd);
}
else if (!strncmp(line, "LOGROTATE", 9)) {
logger(LG_INFO, "Forcing logfile rotation");
rotate_log_file(time(0));
Expand Down Expand Up @@ -171,6 +175,33 @@ void Store::answerCommandRequest(const char *command, OutputBuffer *output)
return;
}

void Store::answerQueryHandlerRequest(const char *command, OutputBuffer *output, int fd)
{
int ret, sd;
char buf[4096];
sd = nsock_unix(qh_socket_path, NSOCK_TCP | NSOCK_CONNECT);
if (sd < 0) {
logger(LG_INFO, "Failed to connect to query socket '%s': %s: %s", qh_socket_path, nsock_strerror(sd), strerror(errno));
return;
}
ret = nsock_printf_nul(sd, "%s", command);
if (ret < 0) {
logger(LG_INFO, "failed to submit command by query handler");
}
output->reset();
while(TRUE) {
ret = read(sd, buf, 4095);
if(ret <= 0)
break;
buf[ret] = 0;
dprintf(fd, "%s", buf);
}
close(sd);
return;
}




void Store::answerGetRequest(InputBuffer *input, OutputBuffer *output, const char *tablename)
{
Expand Down
3 changes: 2 additions & 1 deletion src/Store.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@ class Store
void registerHostgroup(hostgroup *);
void registerComment(nebstruct_comment_data *);
void registerDowntime(nebstruct_downtime_data *);
bool answerRequest(InputBuffer *, OutputBuffer *);
bool answerRequest(InputBuffer *, OutputBuffer *, int);

private:
Table *findTable(string name);
void answerGetRequest(InputBuffer *, OutputBuffer *, const char *);
void answerCommandRequest(const char *, OutputBuffer *);
void answerQueryHandlerRequest(const char *, OutputBuffer *, int);
};

#endif // Store_h
Expand Down
2 changes: 1 addition & 1 deletion src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ void *client_thread(void *data)
while (keepalive && !g_should_terminate) {
if (g_debug_level >= 2 && requestnr > 1)
logger(LG_INFO, "Handling request %d on same connection", requestnr);
keepalive = store_answer_request(input_buffer, output_buffer);
keepalive = store_answer_request(input_buffer, output_buffer, cc);
flush_output_buffer(output_buffer, cc);
g_counters[COUNTER_REQUESTS]++;
requestnr ++;
Expand Down
4 changes: 2 additions & 2 deletions src/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ void store_register_downtime(nebstruct_downtime_data *d)
g_store->registerDowntime(d);
}

int store_answer_request(void *ib, void *ob)
int store_answer_request(void *ib, void *ob, int fd)
{
return g_store->answerRequest((InputBuffer *)ib, (OutputBuffer *)ob);
return g_store->answerRequest((InputBuffer *)ib, (OutputBuffer *)ob, fd);
}

void *create_outputbuffer(int *termination_flag)
Expand Down
2 changes: 1 addition & 1 deletion src/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ extern "C"
void store_deinit();
void store_register_comment(nebstruct_comment_data *);
void store_register_downtime(nebstruct_downtime_data *);
int store_answer_request(void *input_buffer, void *output_buffer);
int store_answer_request(void *input_buffer, void *output_buffer, int fd);
void *create_outputbuffer(int *termination_flag);
void flush_output_buffer(void *ob, int fd);
void delete_outputbuffer(void *);
Expand Down

0 comments on commit dab7a7e

Please sign in to comment.