-
Notifications
You must be signed in to change notification settings - Fork 32
/
Copy pathclient.c
144 lines (121 loc) · 4.26 KB
/
client.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/* Copyright 2013 Bliksem Labs. See the LICENSE file at the top-level directory of this distribution and at https://github.com/bliksemlabs/rrrr/. */
/* client.c : test client */
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <syslog.h>
#include <time.h>
#include <unistd.h>
#include <zmq.h>
#include <czmq.h>
#include "rrrr.h"
#include "router.h"
#include "config.h"
// Run-time options shared between main (which sets them from the command line)
// and the worker threads (which only read them).

// When true, each worker prints every reply it receives to stdout.
// main turns this on only when exactly one request is being sent.
static bool verbose = false;
// When true, workers fill each request via router_request_randomize;
// when false, they use from_s / to_s below. Selected by the "rand" / "id" argument.
static bool randomize = false;
// Values copied into req.from and req.to in non-random ("id") mode.
static uint32_t from_s = 0;
static uint32_t to_s = 1;
/*
 * Worker thread body, started through zthread_fork.
 *
 * Connects a REQ socket to CLIENT_ENDPOINT, then sends routing requests one at
 * a time, waiting for the reply to each before sending the next. Requests are
 * either randomized or built from the file-scope from_s / to_s values,
 * depending on the file-scope `randomize` flag. Replies are echoed to stdout
 * when `verbose` is set.
 *
 * args : points to a uint32_t holding the number of requests to send
 * ctx  : czmq context used to create the socket
 * pipe : receives the string "done" exactly once, on every exit path, so the
 *        parent waiting on it never blocks forever
 */
static void client_task (void *args, zctx_t *ctx, void *pipe) {
    uint32_t n_requests = *((uint32_t *) args);
    // sample the mode once so loading and use of tdata always agree
    const bool random_mode = randomize;
    syslog (LOG_INFO, "test client thread will send %u requests", (unsigned) n_requests);

    // connect client thread to load balancer
    void *sock = zsocket_new (ctx, ZMQ_REQ); // auto-deleted with thread
    // checked explicitly (not with assert) so the test survives NDEBUG builds
    if (sock == NULL || zsocket_connect (sock, CLIENT_ENDPOINT) != 0) {
        syslog (LOG_ERR, "test client thread could not connect to the load balancer");
        zstr_send (pipe, "done");
        return;
    }

    // load transit data from disk; it is only consulted to build random requests
    // NOTE(review): tdata is never released here -- if the tdata API offers a
    // matching close/unload call it should be invoked before returning.
    tdata_t tdata;
    if (random_mode)
        tdata_load (RRRR_INPUT_FILE, &tdata);

    uint32_t request_count = 0;
    // testing the count up front means a thread given zero requests sends none
    while (request_count < n_requests) {
        router_request_t req;
        router_request_initialize (&req);
        if (random_mode) {
            router_request_randomize (&req, &tdata);
        } else {
            req.from = from_s;
            req.to = to_s;
        }
        // the request struct is sent as raw bytes in a single message
        if (zmq_send (sock, &req, sizeof (req), 0) < 0) {
            syslog (LOG_ERR, "test client thread failed to send a request");
            break;
        }
        char *reply = zstr_recv (sock);
        if (reply == NULL)
            break; // no reply could be read (e.g. interrupted)
        if (verbose)
            printf ("%s", reply);
        free (reply);
        request_count++;
    }
    syslog (LOG_INFO, "test client thread terminating");
    zstr_send (pipe, "done");
}
/*
 * Print the command-line synopsis to stderr and terminate the process with
 * exit status 1. This function does not return.
 * The mode keyword must be followed by exactly two numeric arguments, so none
 * of them is shown as optional.
 */
void usage(void) {
    fprintf (stderr, "usage: 'client rand nreqs nthreads' or 'client id from_stop_id to_stop_id'\n");
    exit (1);
}
/*
 * Parse `text` as a base-10 unsigned integer that fits in 32 bits.
 * Returns true and stores the value in *out on success; returns false (leaving
 * *out untouched) for NULL, empty, signed, non-numeric, trailing-garbage or
 * out-of-range input.
 */
static bool client_parse_u32 (const char *text, uint32_t *out) {
    // strtoul skips leading whitespace and accepts a sign (negating the value),
    // so insist that the very first character is a digit
    if (text == NULL || text[0] < '0' || text[0] > '9')
        return false;
    errno = 0;
    char *end = NULL;
    unsigned long value = strtoul (text, &end, 10);
    if (errno != 0 || *end != '\0' || value > UINT32_MAX)
        return false;
    *out = (uint32_t) value;
    return true;
}

/*
 * Test client entry point.
 *
 *   client rand <nreqs> <nthreads>    send <nreqs> randomized requests using up
 *                                     to <nthreads> worker threads
 *   client id <from_stop> <to_stop>   send one request between the given stops
 *
 * Splits the requests across worker threads running client_task, waits for all
 * of them to report completion, and logs the total run time and throughput.
 * Invalid arguments end the process through usage(). Returns 0 on a normal
 * run and 1 if the messaging context cannot be created.
 */
int main (int argc, char **argv) {
    // largest thread count accepted from the command line
    const uint32_t max_concurrency = 32;

    // initialize logging
    setlogmask (LOG_UPTO (LOG_DEBUG));
    openlog (PROGRAM_NAME, LOG_CONS | LOG_PID | LOG_PERROR, LOG_USER);
    syslog (LOG_INFO, "test client starting");

    // ensure different random requests on different runs
    srand ((unsigned) time (NULL));

    // read and range-check parameters
    uint32_t n_requests = 1;
    uint32_t concurrency = 1;
    if (argc != 4)
        usage ();
    if (strcmp (argv[1], "rand") == 0) {
        randomize = true;
        if (!client_parse_u32 (argv[2], &n_requests) ||
            !client_parse_u32 (argv[3], &concurrency))
            usage ();
        // zero requests would leave zero threads and a division by zero below
        if (n_requests == 0) {
            syslog (LOG_ERR, "test client number of requests must be at least 1");
            usage ();
        }
        // out-of-range thread counts fall back to the configured default
        if (concurrency < 1 || concurrency > max_concurrency)
            concurrency = RRRR_TEST_CONCURRENCY;
    } else if (strcmp (argv[1], "id") == 0) {
        randomize = false;
        if (!client_parse_u32 (argv[2], &from_s) ||
            !client_parse_u32 (argv[3], &to_s))
            usage ();
        n_requests = 1;
        concurrency = 1;
    } else {
        usage ();
    }
    // never start more threads than there are requests, and never fewer than one
    if (concurrency > n_requests)
        concurrency = n_requests;
    if (concurrency < 1)
        concurrency = 1;
    syslog (LOG_INFO, "test client number of requests: %u", (unsigned) n_requests);
    syslog (LOG_INFO, "test client concurrency: %u", (unsigned) concurrency);
    verbose = (n_requests == 1);

    // divide up work between threads: the first `extra` threads take one more request
    uint32_t n_reqs[concurrency];
    const uint32_t share = n_requests / concurrency;
    const uint32_t extra = n_requests % concurrency;
    for (uint32_t i = 0; i < concurrency; i++)
        n_reqs[i] = share + (i < extra ? 1 : 0);

    // create threads
    void *pipes[concurrency];
    zctx_t *ctx = zctx_new ();
    if (ctx == NULL) {
        syslog (LOG_ERR, "test client could not create a messaging context");
        closelog ();
        return 1;
    }

    // track runtime
    struct timeval t0, t1;
    gettimeofday (&t0, NULL);
    uint32_t n_started = 0;
    while (n_started < concurrency) {
        // each thread gets a pointer to its own slot, which outlives the thread
        pipes[n_started] = zthread_fork (ctx, client_task, n_reqs + n_started);
        if (pipes[n_started] == NULL) {
            syslog (LOG_ERR, "test client could not start thread %u", (unsigned) n_started);
            break;
        }
        n_started++;
    }

    // wait for all started threads to complete
    for (uint32_t n = 0; n < n_started; n++) {
        char *s = zstr_recv (pipes[n]);
        if (s == NULL)
            break; // interrupted
        free (s);
    }

    // output summary information
    gettimeofday (&t1, NULL);
    // subtract fields separately so nothing is multiplied up in integer arithmetic
    double dt = (double) (t1.tv_sec - t0.tv_sec)
              + (double) (t1.tv_usec - t0.tv_usec) / 1000000.0;
    double rate = (dt > 0.0) ? n_requests / dt : 0.0;
    syslog (LOG_INFO, "%u threads, %u total requests, %f sec total time (%f req/sec)",
            (unsigned) n_started, (unsigned) n_requests, dt, rate);

    // teardown
    zctx_destroy (&ctx);
    closelog ();
    return 0;
}