diff --git a/Makefile b/Makefile index 395b98a7..04933bd2 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ CFLAGS += -std=c99 -Wall -O2 -D_REENTRANT -LIBS := -lm -lssl -lcrypto -lpthread +LIBS := -lm -lssl -lcrypto -lpthread -luring TARGET := $(shell uname -s | tr '[A-Z]' '[a-z]' 2>/dev/null || echo unknown) diff --git a/src/wrk.c b/src/wrk.c index 51f46f72..5e3c1323 100644 --- a/src/wrk.c +++ b/src/wrk.c @@ -3,6 +3,19 @@ #include "wrk.h" #include "script.h" #include "main.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define QUEUE_DEPTH 4096 +#define BACKLOG 4096 static struct config { uint64_t connections; @@ -10,6 +23,7 @@ static struct config { uint64_t threads; uint64_t timeout; uint64_t pipeline; + bool use_io; bool delay; bool dynamic; bool latency; @@ -41,6 +55,12 @@ static void handler(int sig) { stop = 1; } +void *thread_io_uring_batch(void *data); +void prep_connect(struct io_uring_sqe *sqe, struct connection *conn); +void prep_send(struct io_uring_sqe *sqe, struct connection *conn); +void prep_read(struct io_uring_sqe *sqe, struct connection *conn); +void initialize_connection(struct connection *conn, struct io_uring *ring); + static void usage() { printf("Usage: wrk \n" " Options: \n" @@ -53,6 +73,7 @@ static void usage() { " --latency Print latency statistics \n" " --timeout Socket/request timeout \n" " -v, --version Print version details \n" + " -i, --io_uring Enable io_uring \n" " \n" " Numeric arguments may include a SI unit (1k, 1M, 1G)\n" " Time arguments may include a time unit (2s, 2m, 2h)\n"); @@ -105,6 +126,7 @@ int main(int argc, char **argv) { thread *t = &threads[i]; t->loop = aeCreateEventLoop(10 + cfg.connections * 3); t->connections = cfg.connections / cfg.threads; + t->port = atoi(port); t->L = script_create(cfg.script, url, headers); script_init(L, t, argc - optind, &argv[optind]); @@ -118,9 +140,13 @@ int main(int argc, char **argv) { parser_settings.on_header_value = header_value; parser_settings.on_body = response_body; } - } + } - if (!t->loop || pthread_create(&t->thread, NULL, &thread_main, t)) { + if(cfg.use_io) { + printf("Using io_uring!\n"); + pthread_create(&t->thread, NULL, &thread_io_uring_batch, t); + + } else if (!t->loop || pthread_create(&t->thread, NULL, &thread_main, t)) { char *msg = strerror(errno); fprintf(stderr, "unable to create thread %"PRIu64": %s\n", i, msg); exit(2); @@ -199,6 +225,177 @@ int main(int argc, char **argv) { return 0; } +void *thread_io_uring_batch(void *arg) { + + thread *thread = arg; + struct io_uring ring; + if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0) { + perror("Failed to initialize io_uring"); + return NULL; +} + + char *request = NULL; + size_t length = 0; + + if (!cfg.dynamic) { + printf("Inside cfg.dynamic\n"); + script_request(thread->L, &request, &length); + } + + thread->cs = zcalloc(thread->connections * sizeof(connection)); + connection *c = thread->cs; + + for (uint64_t i = 0; i < thread->connections; i++, c++) { + c->thread = thread; + c->ssl = cfg.ctx ? SSL_new(cfg.ctx) : NULL; + c->request = request; + c->length = length; + c->delayed = cfg.delay; + int sockfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); + if (sockfd == -1) { + perror("Failed to create socket"); + continue; + } + c->fd = sockfd; + c->state = CONNECT; + c->addr.sin_family = AF_INET; + c->addr.sin_port = htons(c->thread->port); + inet_pton(AF_INET, cfg.host, &(c->addr.sin_addr)); + c->parser.data = c; + struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); + prep_connect(sqe, c); + + } + + clock_t start_time = clock(); + + thread->start = time_us(); + uint64_t last_record_time = time_us(); + while(1) { + io_uring_submit(&ring); + struct io_uring_cqe *cqes[BACKLOG]; + int cqe_count = io_uring_peek_batch_cqe(&ring, cqes, sizeof(cqes) / sizeof(cqes[0])); + for(int i = 0; i < cqe_count; i++) { + struct io_uring_cqe *cqe = cqes[i]; + struct connection *conn = io_uring_cqe_get_data(cqe); + switch(conn->state) { + case CONNECT: + io_uring_cqe_seen(&ring, cqe); + http_parser_init(&conn->parser, HTTP_RESPONSE); + conn->state = SEND; + conn->written = 0; + struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); + prep_send(sqe, conn); + break; + case SEND: + io_uring_cqe_seen(&ring, cqe); + sqe = io_uring_get_sqe(&ring); + prep_read(sqe, conn); + break; + case READ: + io_uring_cqe_seen(&ring, cqe); + io_uring_cqe_get_data(cqe); + size_t x = http_parser_execute(&conn->parser, &parser_settings, conn->buf, cqe->res); + if (cqe->res < 0 || x < 0) + { + fprintf(stderr, "Connection failed with error: %s\n", strerror(-cqe->res)); + thread->errors.connect++; + } + else if (cqe->res == 0) + { + close(conn->fd); + initialize_connection(conn, &ring); + io_uring_submit(&ring); + int y = io_uring_wait_cqe(&ring, &cqe); + if (y < 0) + { + printf("Error waiting for CQE after re-establishing connection\n"); + } + } + else if (cqe->res == RECVBUF) + { + + sqe = io_uring_get_sqe(&ring); + prep_read(sqe, conn); + } + else + { + conn->state = SEND; + struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); + prep_send(sqe, conn); + } + conn->thread->bytes += cqe->res; + break; + } + } + + uint64_t current_time = time_us(); + + if (current_time - last_record_time >= RECORD_INTERVAL_MS * 1000) { + if (thread->requests > 0) + { + uint64_t elapsed_ms = (current_time - thread->start) / 1000; + uint64_t requests = (thread->requests / (double) elapsed_ms) * 1000; + stats_record(statistics.requests, requests); + thread->start = time_us(); + thread->requests = 0; + } + last_record_time = current_time; + } + + + clock_t current_time_2 = clock(); + double time_passed_in_seconds = ((double)(current_time_2 - start_time)) / CLOCKS_PER_SEC; + + + if (time_passed_in_seconds >= cfg.duration) + { + break; + } + } + + printf("Connection created!\n"); + thread->start = time_us(); + io_uring_queue_exit(&ring); + + return NULL; +} + + +void prep_connect(struct io_uring_sqe *sqe, struct connection *conn) { + io_uring_prep_connect(sqe, conn->fd, (struct sockaddr *)&conn->addr, sizeof(conn->addr)); + io_uring_sqe_set_data(sqe, conn); +} + +void prep_send(struct io_uring_sqe *sqe, struct connection *conn) { + if(!conn->written) { + conn->start = time_us(); + conn->pending = cfg.pipeline; + } + conn->pending = cfg.pipeline; + char msg[256]; + sprintf(msg, "GET / HTTP/1.1\r\nHost: %s\r\n\r\n", cfg.host); + io_uring_prep_send(sqe, conn->fd, msg, strlen(msg), MSG_DONTWAIT); + io_uring_sqe_set_data(sqe, conn); +} + +void prep_read(struct io_uring_sqe *sqe, struct connection *conn) { + conn->state = READ; + memset(conn->buf, 0, sizeof(conn->buf)); + io_uring_prep_read(sqe, conn->fd, conn->buf, RECVBUF, 0); + io_uring_sqe_set_data(sqe, conn); +} + +void initialize_connection(struct connection *conn, struct io_uring *ring) { + conn->fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); + conn->state = CONNECT; + conn->addr.sin_family = AF_INET; + conn->addr.sin_port = htons(conn->thread->port); + inet_pton(AF_INET, cfg.host, &(conn->addr.sin_addr)); + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + prep_connect(sqe, conn); +} + void *thread_main(void *arg) { thread *thread = arg; @@ -326,9 +523,17 @@ static int response_complete(http_parser *parser) { thread *thread = c->thread; uint64_t now = time_us(); int status = parser->status_code; + if(cfg.use_io) { + thread->complete++; + thread->requests++; - thread->complete++; - thread->requests++; + if (--c->pending == 0) { + if (!stats_record(statistics.latency, now - c->start)) { + thread->errors.timeout++; + } + } + return 0; + } if (status > 399) { thread->errors.status++; @@ -348,6 +553,9 @@ static int response_complete(http_parser *parser) { aeCreateFileEvent(thread->loop, c->fd, AE_WRITABLE, socket_writeable, c); } + thread->complete++; + thread->requests++; + if (!http_should_keep_alive(parser)) { reconnect_socket(thread, c); goto done; @@ -392,6 +600,8 @@ static void socket_writeable(aeEventLoop *loop, int fd, void *data, int mask) { return; } + + if (!c->written) { if (cfg.dynamic) { script_request(thread->L, &c->request, &c->length); @@ -476,6 +686,7 @@ static struct option longopts[] = { { "timeout", required_argument, NULL, 'T' }, { "help", no_argument, NULL, 'h' }, { "version", no_argument, NULL, 'v' }, + { "io_uring", no_argument, NULL, 'i' }, { NULL, 0, NULL, 0 } }; @@ -489,7 +700,7 @@ static int parse_args(struct config *cfg, char **url, struct http_parser_url *pa cfg->duration = 10; cfg->timeout = SOCKET_TIMEOUT_MS; - while ((c = getopt_long(argc, argv, "t:c:d:s:H:T:Lrv?", longopts, NULL)) != -1) { + while ((c = getopt_long(argc, argv, "t:c:d:s:H:T:Lrvi?", longopts, NULL)) != -1) { switch (c) { case 't': if (scan_metric(optarg, &cfg->threads)) return -1; @@ -517,6 +728,9 @@ static int parse_args(struct config *cfg, char **url, struct http_parser_url *pa printf("wrk %s [%s] ", VERSION, aeGetApiName()); printf("Copyright (C) 2012 Will Glozer\n"); break; + case 'i': + cfg->use_io = true; + break; case 'h': case '?': case ':': diff --git a/src/wrk.h b/src/wrk.h index 2d0ac84e..ccaefba3 100644 --- a/src/wrk.h +++ b/src/wrk.h @@ -36,6 +36,8 @@ typedef struct { lua_State *L; errors errors; struct connection *cs; + int port; + char host; } thread; typedef struct { @@ -48,7 +50,7 @@ typedef struct connection { thread *thread; http_parser parser; enum { - FIELD, VALUE + FIELD, VALUE, CONNECT, READ, SEND } state; int fd; SSL *ssl; @@ -61,6 +63,7 @@ typedef struct connection { buffer headers; buffer body; char buf[RECVBUF]; + struct sockaddr_in addr; } connection; #endif /* WRK_H */