Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of io_uring in wrk with batching #525

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CFLAGS += -std=c99 -Wall -O2 -D_REENTRANT
LIBS := -lm -lssl -lcrypto -lpthread
LIBS := -lm -lssl -lcrypto -lpthread -luring

TARGET := $(shell uname -s | tr '[A-Z]' '[a-z]' 2>/dev/null || echo unknown)

Expand Down
224 changes: 219 additions & 5 deletions src/wrk.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,27 @@
#include "wrk.h"
#include "script.h"
#include "main.h"
#include <liburing.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/uio.h>
#include <stdio.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <sys/time.h>
#include <pthread.h>

#define QUEUE_DEPTH 4096
#define BACKLOG 4096

static struct config {
uint64_t connections;
uint64_t duration;
uint64_t threads;
uint64_t timeout;
uint64_t pipeline;
bool use_io;
bool delay;
bool dynamic;
bool latency;
Expand Down Expand Up @@ -41,6 +55,12 @@ static void handler(int sig) {
stop = 1;
}

void *thread_io_uring_batch(void *data);
void prep_connect(struct io_uring_sqe *sqe, struct connection *conn);
void prep_send(struct io_uring_sqe *sqe, struct connection *conn);
void prep_read(struct io_uring_sqe *sqe, struct connection *conn);
void initialize_connection(struct connection *conn, struct io_uring *ring);

static void usage() {
printf("Usage: wrk <options> <url> \n"
" Options: \n"
Expand All @@ -53,6 +73,7 @@ static void usage() {
" --latency Print latency statistics \n"
" --timeout <T> Socket/request timeout \n"
" -v, --version Print version details \n"
" -i, --io_uring Enable io_uring \n"
" \n"
" Numeric arguments may include a SI unit (1k, 1M, 1G)\n"
" Time arguments may include a time unit (2s, 2m, 2h)\n");
Expand Down Expand Up @@ -105,6 +126,7 @@ int main(int argc, char **argv) {
thread *t = &threads[i];
t->loop = aeCreateEventLoop(10 + cfg.connections * 3);
t->connections = cfg.connections / cfg.threads;
t->port = atoi(port);

t->L = script_create(cfg.script, url, headers);
script_init(L, t, argc - optind, &argv[optind]);
Expand All @@ -118,9 +140,13 @@ int main(int argc, char **argv) {
parser_settings.on_header_value = header_value;
parser_settings.on_body = response_body;
}
}
}

if (!t->loop || pthread_create(&t->thread, NULL, &thread_main, t)) {
if(cfg.use_io) {
printf("Using io_uring!\n");
pthread_create(&t->thread, NULL, &thread_io_uring_batch, t);

} else if (!t->loop || pthread_create(&t->thread, NULL, &thread_main, t)) {
char *msg = strerror(errno);
fprintf(stderr, "unable to create thread %"PRIu64": %s\n", i, msg);
exit(2);
Expand Down Expand Up @@ -199,6 +225,177 @@ int main(int argc, char **argv) {
return 0;
}

void *thread_io_uring_batch(void *arg) {

thread *thread = arg;
struct io_uring ring;
if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0) {
perror("Failed to initialize io_uring");
return NULL;
}

char *request = NULL;
size_t length = 0;

if (!cfg.dynamic) {
printf("Inside cfg.dynamic\n");
script_request(thread->L, &request, &length);
}

thread->cs = zcalloc(thread->connections * sizeof(connection));
connection *c = thread->cs;

for (uint64_t i = 0; i < thread->connections; i++, c++) {
c->thread = thread;
c->ssl = cfg.ctx ? SSL_new(cfg.ctx) : NULL;
c->request = request;
c->length = length;
c->delayed = cfg.delay;
int sockfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (sockfd == -1) {
perror("Failed to create socket");
continue;
}
c->fd = sockfd;
c->state = CONNECT;
c->addr.sin_family = AF_INET;
c->addr.sin_port = htons(c->thread->port);
inet_pton(AF_INET, cfg.host, &(c->addr.sin_addr));
c->parser.data = c;
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
prep_connect(sqe, c);

}

clock_t start_time = clock();

thread->start = time_us();
uint64_t last_record_time = time_us();
while(1) {
io_uring_submit(&ring);
struct io_uring_cqe *cqes[BACKLOG];
int cqe_count = io_uring_peek_batch_cqe(&ring, cqes, sizeof(cqes) / sizeof(cqes[0]));
for(int i = 0; i < cqe_count; i++) {
struct io_uring_cqe *cqe = cqes[i];
struct connection *conn = io_uring_cqe_get_data(cqe);
switch(conn->state) {
case CONNECT:
io_uring_cqe_seen(&ring, cqe);
http_parser_init(&conn->parser, HTTP_RESPONSE);
conn->state = SEND;
conn->written = 0;
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
prep_send(sqe, conn);
break;
case SEND:
io_uring_cqe_seen(&ring, cqe);
sqe = io_uring_get_sqe(&ring);
prep_read(sqe, conn);
break;
case READ:
io_uring_cqe_seen(&ring, cqe);
io_uring_cqe_get_data(cqe);
size_t x = http_parser_execute(&conn->parser, &parser_settings, conn->buf, cqe->res);
if (cqe->res < 0 || x < 0)
{
fprintf(stderr, "Connection failed with error: %s\n", strerror(-cqe->res));
thread->errors.connect++;
}
else if (cqe->res == 0)
{
close(conn->fd);
initialize_connection(conn, &ring);
io_uring_submit(&ring);
int y = io_uring_wait_cqe(&ring, &cqe);
if (y < 0)
{
printf("Error waiting for CQE after re-establishing connection\n");
}
}
else if (cqe->res == RECVBUF)
{

sqe = io_uring_get_sqe(&ring);
prep_read(sqe, conn);
}
else
{
conn->state = SEND;
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
prep_send(sqe, conn);
}
conn->thread->bytes += cqe->res;
break;
}
}

uint64_t current_time = time_us();

if (current_time - last_record_time >= RECORD_INTERVAL_MS * 1000) {
if (thread->requests > 0)
{
uint64_t elapsed_ms = (current_time - thread->start) / 1000;
uint64_t requests = (thread->requests / (double) elapsed_ms) * 1000;
stats_record(statistics.requests, requests);
thread->start = time_us();
thread->requests = 0;
}
last_record_time = current_time;
}


clock_t current_time_2 = clock();
double time_passed_in_seconds = ((double)(current_time_2 - start_time)) / CLOCKS_PER_SEC;


if (time_passed_in_seconds >= cfg.duration)
{
break;
}
}

printf("Connection created!\n");
thread->start = time_us();
io_uring_queue_exit(&ring);

return NULL;
}


void prep_connect(struct io_uring_sqe *sqe, struct connection *conn) {
io_uring_prep_connect(sqe, conn->fd, (struct sockaddr *)&conn->addr, sizeof(conn->addr));
io_uring_sqe_set_data(sqe, conn);
}

void prep_send(struct io_uring_sqe *sqe, struct connection *conn) {
if(!conn->written) {
conn->start = time_us();
conn->pending = cfg.pipeline;
}
conn->pending = cfg.pipeline;
char msg[256];
sprintf(msg, "GET / HTTP/1.1\r\nHost: %s\r\n\r\n", cfg.host);
io_uring_prep_send(sqe, conn->fd, msg, strlen(msg), MSG_DONTWAIT);
io_uring_sqe_set_data(sqe, conn);
}

void prep_read(struct io_uring_sqe *sqe, struct connection *conn) {
conn->state = READ;
memset(conn->buf, 0, sizeof(conn->buf));
io_uring_prep_read(sqe, conn->fd, conn->buf, RECVBUF, 0);
io_uring_sqe_set_data(sqe, conn);
}

void initialize_connection(struct connection *conn, struct io_uring *ring) {
conn->fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
conn->state = CONNECT;
conn->addr.sin_family = AF_INET;
conn->addr.sin_port = htons(conn->thread->port);
inet_pton(AF_INET, cfg.host, &(conn->addr.sin_addr));
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
prep_connect(sqe, conn);
}

void *thread_main(void *arg) {
thread *thread = arg;

Expand Down Expand Up @@ -326,9 +523,17 @@ static int response_complete(http_parser *parser) {
thread *thread = c->thread;
uint64_t now = time_us();
int status = parser->status_code;
if(cfg.use_io) {
thread->complete++;
thread->requests++;

thread->complete++;
thread->requests++;
if (--c->pending == 0) {
if (!stats_record(statistics.latency, now - c->start)) {
thread->errors.timeout++;
}
}
return 0;
}

if (status > 399) {
thread->errors.status++;
Expand All @@ -348,6 +553,9 @@ static int response_complete(http_parser *parser) {
aeCreateFileEvent(thread->loop, c->fd, AE_WRITABLE, socket_writeable, c);
}

thread->complete++;
thread->requests++;

if (!http_should_keep_alive(parser)) {
reconnect_socket(thread, c);
goto done;
Expand Down Expand Up @@ -392,6 +600,8 @@ static void socket_writeable(aeEventLoop *loop, int fd, void *data, int mask) {
return;
}



if (!c->written) {
if (cfg.dynamic) {
script_request(thread->L, &c->request, &c->length);
Expand Down Expand Up @@ -476,6 +686,7 @@ static struct option longopts[] = {
{ "timeout", required_argument, NULL, 'T' },
{ "help", no_argument, NULL, 'h' },
{ "version", no_argument, NULL, 'v' },
{ "io_uring", no_argument, NULL, 'i' },
{ NULL, 0, NULL, 0 }
};

Expand All @@ -489,7 +700,7 @@ static int parse_args(struct config *cfg, char **url, struct http_parser_url *pa
cfg->duration = 10;
cfg->timeout = SOCKET_TIMEOUT_MS;

while ((c = getopt_long(argc, argv, "t:c:d:s:H:T:Lrv?", longopts, NULL)) != -1) {
while ((c = getopt_long(argc, argv, "t:c:d:s:H:T:Lrvi?", longopts, NULL)) != -1) {
switch (c) {
case 't':
if (scan_metric(optarg, &cfg->threads)) return -1;
Expand Down Expand Up @@ -517,6 +728,9 @@ static int parse_args(struct config *cfg, char **url, struct http_parser_url *pa
printf("wrk %s [%s] ", VERSION, aeGetApiName());
printf("Copyright (C) 2012 Will Glozer\n");
break;
case 'i':
cfg->use_io = true;
break;
case 'h':
case '?':
case ':':
Expand Down
5 changes: 4 additions & 1 deletion src/wrk.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ typedef struct {
lua_State *L;
errors errors;
struct connection *cs;
int port;
char host;
} thread;

typedef struct {
Expand All @@ -48,7 +50,7 @@ typedef struct connection {
thread *thread;
http_parser parser;
enum {
FIELD, VALUE
FIELD, VALUE, CONNECT, READ, SEND
} state;
int fd;
SSL *ssl;
Expand All @@ -61,6 +63,7 @@ typedef struct connection {
buffer headers;
buffer body;
char buf[RECVBUF];
struct sockaddr_in addr;
} connection;

#endif /* WRK_H */