From 844b49de03969da21245951db3b3dcfb8011196d Mon Sep 17 00:00:00 2001 From: Dima Krasner Date: Wed, 8 Nov 2023 18:17:19 +0200 Subject: [PATCH] drop the limitation of 8*4K buffer --- guppy.c | 170 +++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 112 insertions(+), 58 deletions(-) diff --git a/guppy.c b/guppy.c index 7c544d0..6337272 100644 --- a/guppy.c +++ b/guppy.c @@ -22,20 +22,28 @@ #include +/*============================================================================*/ +typedef struct GuppyChunk { + long seq; + ssize_t length, skip; + char buffer[USHRT_MAX + 1]; + LIST_ENTRY(GuppyChunk) next; +} GuppyChunk; +typedef LIST_HEAD(GuppyChunks, GuppyChunk) GuppyChunks; + typedef struct GuppySocket { int fd; /* must be first so socket_*() work */ + GuppyChunks chunks; long first, last; - struct { - long seq; - ssize_t length, skip; - char buffer[4096]; - } chunks[8]; } GuppySocket; /*============================================================================*/ static void guppy_close(void *c) { - close(((GuppySocket *)c)->fd); + GuppySocket *s = (GuppySocket *)c; + GuppyChunk *chunk, *tmp; + close(s->fd); + LIST_FOREACH_SAFE(chunk, &s->chunks, next, tmp) free(chunk); free(c); } @@ -57,9 +65,10 @@ static int guppy_ack(int fd, long seq) { static int do_guppy_download(URL *url, GuppySocket *s, char **mime, int ask) { static char buffer[1024], prompt[1024]; struct pollfd pfd = {.fd = s->fd, .events = POLLIN}; + GuppyChunk *chunk = NULL, *last, *tmp; char *crlf, *end, *input; - ssize_t j = -1; - int len, timeout, i, n; + long eof = 0; + int len, timeout, i, n, rc, dup; if ((len = strlen(url->url)) > (int)sizeof(buffer) - 2) return 4; @@ -73,6 +82,7 @@ static int do_guppy_download(URL *url, GuppySocket *s, char **mime, int ask) { request: /* send or re-transmit the request */ if (send(pfd.fd, buffer, len + 2, 0) <= 0) { + free(chunk); if (errno == EAGAIN || errno == EWOULDBLOCK) error(0, "cannot send request to `%s`:`%s`: cancelled", url->host, url->port); else error(0, "cannot send request to `%s`:`%s`: %s", url->host, url->port, strerror(errno)); return 4; @@ -80,51 +90,82 @@ static int do_guppy_download(URL *url, GuppySocket *s, char **mime, int ask) { pfd.revents = 0; if ((n = poll(&pfd, 1, 1000)) == 0) continue; - if (n < 0) return 4; + if (n < 0) { free(chunk); return 4; } while (1) { - j = (j == sizeof(s->chunks) / sizeof(s->chunks[0]) - 1) ? 0 : j + 1; + if (chunk == NULL && (chunk = malloc(sizeof(GuppyChunk))) == NULL) { free(chunk); return 4; } - if ((s->chunks[j].length = recv(pfd.fd, s->chunks[j].buffer, sizeof(s->chunks[j].buffer) - 1, MSG_DONTWAIT)) < 0) { + if ((chunk->length = recv(pfd.fd, chunk->buffer, sizeof(chunk->buffer) - 1, MSG_DONTWAIT)) < 0) { + /* if we received all incoming packets, resend the request */ if (errno == EAGAIN || errno == EWOULDBLOCK) goto request; error(0, "cannot send request to `%s`:`%s`: %s", url->host, url->port, strerror(errno)); + free(chunk); return 4; } - if (s->chunks[j].length < 5 || (crlf = memchr(s->chunks[j].buffer, '\r', s->chunks[j].length - 1)) == NULL || *(crlf + 1) != '\n') continue; + if (chunk->length < 5 || (crlf = memchr(chunk->buffer, '\r', chunk->length - 1)) == NULL || *(crlf + 1) != '\n') continue; *crlf = '\0'; - if (s->chunks[j].buffer[1] == ' ') { - if (s->chunks[j].buffer[0] == '1') { - if (!ask) return 4; - if (color) snprintf(prompt, sizeof(prompt), "\33[35m%.*s>\33[0m ", get_terminal_width() - 2, &s->chunks[j].buffer[2]); - else snprintf(prompt, sizeof(prompt), "%.*s> ", get_terminal_width() - 2, &s->chunks[j].buffer[2]); + if (chunk->buffer[1] == ' ') { + rc = chunk->buffer[0] - '0'; + if (chunk->buffer[0] == '1') { + if (!ask) { free(chunk); return 4; } + if (color) snprintf(prompt, sizeof(prompt), "\33[35m%.*s>\33[0m ", get_terminal_width() - 2, &chunk->buffer[2]); + else snprintf(prompt, sizeof(prompt), "%.*s> ", get_terminal_width() - 2, &chunk->buffer[2]); + free(chunk); if ((input = bestline(prompt)) == NULL) return 4; if (interactive) bestlineHistoryAdd(input); if (!set_input(url, input)) { free(input); return 4; } free(input); if (interactive) bestlineHistoryAdd(url->url); - } else if (s->chunks[j].buffer[0] == '3') { - if (!redirect(url, &s->chunks[j].buffer[2], s->chunks[j].length - 4, ask)) return 4; - } else if (s->chunks[j].buffer[0] == '4') error(0, "cannot download `%s`: %s", url->url, &s->chunks[j].buffer[2]); - return s->chunks[j].buffer[0] - '0'; + } else if (chunk->buffer[0] == '3') { + if (!redirect(url, &chunk->buffer[2], chunk->length - 4, ask)) { free(chunk); return 4; } + free(chunk); + } else if (chunk->buffer[0] == '4') { + error(0, "cannot download `%s`: %s", url->url, &chunk->buffer[2]); + free(chunk); + } + return rc; } - s->chunks[j].seq = strtol(s->chunks[j].buffer, &end, 10); - if (s->chunks[j].seq < 6 || s->chunks[j].seq > INT_MAX || end == NULL || (*end != ' ' && (*end != '\r' || *(end + 1) != '\n'))) { s->chunks[j].seq = -1; continue; } + chunk->seq = strtol(chunk->buffer, &end, 10); + if (chunk->seq < 6 || chunk->seq > INT_MAX || end == NULL || (*end != ' ' && (*end != '\r' || *(end + 1) != '\n'))) continue; *crlf = '\r'; - s->chunks[j].skip = crlf - s->chunks[j].buffer + 2; + chunk->skip = crlf - chunk->buffer + 2; - guppy_ack(s->fd, s->chunks[j].seq); + /* ack the chunk */ + if (!guppy_ack(s->fd, chunk->seq)) { free(chunk); return 4; } + + /* check if we already have this chunk */ + dup = 0; + LIST_FOREACH_SAFE(last, &s->chunks, next, tmp) { + dup = dup || (last->seq == chunk->seq); + } + if (dup) continue; - if (*end != ' ') continue; + if (!eof && chunk->skip == chunk->length) eof = chunk->seq; - s->first = s->chunks[j].seq; + /* add the chunk to the queue */ + if (last == NULL || *end == ' ') LIST_INSERT_HEAD(&s->chunks, chunk, next); + else LIST_INSERT_AFTER(last, chunk, next); + + /* if this is not the first chunk, receive another one */ + if (*end != ' ') { chunk = NULL; continue; } + + /* otherwise, free chunks we won't need and stop */ + s->first = chunk->seq; *mime = end + 1; - return s->chunks[j].seq; + rc = chunk->seq; + + LIST_FOREACH_SAFE(chunk, &s->chunks, next, tmp) { + if (chunk->seq < s->first || (eof && chunk->seq > eof)) { LIST_REMOVE(chunk, next); free(chunk); } + } + + return rc; } } + free(chunk); error(0, "cannot send request to `%s`:`%s`: cancelled", url->host, url->port); return 4; } @@ -132,7 +173,6 @@ static int do_guppy_download(URL *url, GuppySocket *s, char **mime, int ask) { static void *guppy_download(const Selector *sel, URL *url, char **mime, Parser *parser, int ask) { GuppySocket *s = NULL; - size_t i; int status, redirs = 0; (void)sel; @@ -140,7 +180,7 @@ static void *guppy_download(const Selector *sel, URL *url, char **mime, Parser * if ((s = malloc(sizeof(GuppySocket))) == NULL) return NULL; if ((s->fd = socket_connect(url, SOCK_DGRAM)) == -1) { free(s); return NULL; } s->last = -1; - for (i = 0; i < sizeof(s->chunks) / sizeof(s->chunks[0]); ++i) s->chunks[i].seq = -1; + LIST_INIT(&s->chunks); do { status = do_guppy_download(url, s, mime, ask); @@ -152,66 +192,80 @@ static void *guppy_download(const Selector *sel, URL *url, char **mime, Parser * else if (status > 6 && strncmp(*mime, "text/plain", 10) == 0) *parser = parse_plaintext_line; else if (redirs == 5) error(0, "too many redirects from `%s`", url->url); - if (status <= 6) { close(s->fd); free(s); return NULL; } + if (status <= 6) { guppy_close(s); return NULL; } return s; } static int guppy_read(void *c, void *buffer, int length) { GuppySocket *s = (GuppySocket*)c; + GuppyChunk *chunk = NULL, *last, *tmp; struct pollfd pfd = {.fd = s->fd, .events = POLLIN}; char *end; - size_t j; - int timeout, i, n, ret; + int timeout, i, n, ret, dup; - /* check if we have the packet already */ - for (j = 0; j < sizeof(s->chunks) / sizeof(s->chunks[0]); ++j) { - if ((s->last == -1 && s->chunks[j].seq == s->first) || s->chunks[j].seq == s->last + 1) goto have; + LIST_FOREACH(chunk, &s->chunks, next) { + /* if we already have the next chunk, remove it from the queue */ + if ((s->last == -1 && chunk->seq == s->first) || chunk->seq == s->last + 1) { + LIST_REMOVE(chunk, next); + goto have; + } } if ((timeout = get_var_integer("TIMEOUT", 15)) < 1) timeout = 15; for (i = 0; i < timeout; ++i) { while (1) { - /* find a free slot or just use the first if all slots are used and we don't have the next chunk */ - for (j = 0; j < sizeof(s->chunks) / sizeof(s->chunks[0]) && s->chunks[j].seq > s->last; ++j); - if (j == sizeof(s->chunks) / sizeof(s->chunks[0])) j = 0; + if (chunk == NULL && (chunk = malloc(sizeof(GuppyChunk))) == NULL) return -1; - if ((s->chunks[j].length = recv(s->fd, s->chunks[j].buffer, sizeof(s->chunks[j].buffer) - 1, MSG_DONTWAIT)) < 0) { + /* otherwise, receive a chunk */ + if ((chunk->length = recv(s->fd, chunk->buffer, sizeof(chunk->buffer) - 1, MSG_DONTWAIT)) < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) break; + free(chunk); return -1; } - if (s->chunks[j].length == 0) { errno = ECONNRESET; return 0; } + if (chunk->length == 0) { free(chunk); errno = ECONNRESET; return 0; } /* extract the sequence number */ - s->chunks[j].buffer[s->chunks[j].length] = '\0'; - if ((s->chunks[j].seq = strtol(s->chunks[j].buffer, &end, 10)) < s->first || s->chunks[j].seq > INT_MAX || end == NULL || *end != '\r' || *(end + 1) != '\n') { s->chunks[j].seq = -1; continue; } - s->chunks[j].skip = end - s->chunks[j].buffer + 2; - - /* ack the packet */ - if (!guppy_ack(s->fd, s->chunks[j].seq)) return -1; - - if (s->last != -1 && s->chunks[j].seq <= s->last) s->chunks[j].seq = -1; - else if (s->chunks[j].seq == s->last + 1) goto have; + chunk->buffer[chunk->length] = '\0'; + if ((chunk->seq = strtol(chunk->buffer, &end, 10)) < s->first || chunk->seq > INT_MAX || end == NULL || *end != '\r' || *(end + 1) != '\n') continue; + chunk->skip = end - chunk->buffer + 2; + + /* ack the chunk */ + if (!guppy_ack(s->fd, chunk->seq)) { free(chunk); return -1; } + + /* receive another chunk if we already have this one */ + if (s->last != -1 && chunk->seq <= s->last) continue; + /* stop if this is the next chunk */ + if (chunk->seq == s->last + 1) goto have; + + /* otherwise, append the chunk to the queue if needed and receive another one */ + dup = 0; + LIST_FOREACH_SAFE(last, &s->chunks, next, tmp) { + dup = dup || (last->seq == chunk->seq); + } + if (last == NULL) LIST_INSERT_HEAD(&s->chunks, chunk, next); + else if (dup) continue; + else LIST_INSERT_AFTER(last, chunk, next); + chunk = NULL; } /* wait for the next chunk and resend ack for the previous on timeout */ - pfd.revents = 0; - if ((n = poll(&pfd, 1, 1000)) < 0) return -1; - else if (n == 0 && s->last != -1 && !guppy_ack(s->fd, s->last)) return -1; - else if (n == 0) continue; + if ((n = poll(&pfd, 1, 200)) < 0 || (n == 0 && s->last != -1 && !guppy_ack(s->fd, s->last))) { free(chunk); return -1; } } + free(chunk); errno = ETIMEDOUT; return -1; have: /* signal EOF if this is the EOF packet */ - if (s->chunks[j].skip == s->chunks[j].length) return 0; + if (chunk->skip == chunk->length) { free(chunk); return 0; } - s->last = s->chunks[j].seq; - ret = (length > s->chunks[j].length - s->chunks[j].skip ? s->chunks[j].length - s->chunks[j].skip : length); - memmove(buffer, s->chunks[j].buffer + s->chunks[j].skip, ret); + s->last = chunk->seq; + ret = (length > chunk->length - chunk->skip ? chunk->length - chunk->skip : length); + memmove(buffer, chunk->buffer + chunk->skip, ret); + free(chunk); return ret; }