Skip to content

Commit

Permalink
fix: refactor TCP multiplexing handling with improved header processi…
Browse files Browse the repository at this point in the history
…ng and stream management

Signed-off-by: Dengfeng Liu <[email protected]>
  • Loading branch information
liudf0716 committed Nov 14, 2024
1 parent 461ff10 commit 3f21cc8
Showing 1 changed file with 153 additions and 104 deletions.
257 changes: 153 additions & 104 deletions control.c
Original file line number Diff line number Diff line change
Expand Up @@ -525,118 +525,167 @@ handle_frps_msg(uint8_t *buf, int len, void *ctx)

static struct tmux_stream abandon_stream;

// ctx: if recv_cb was called by common control, ctx == NULL
// else ctx == client struct
static void
recv_cb(struct bufferevent *bev, void *ctx)
static int handle_tcp_mux_header(struct bufferevent *bev,
struct tcp_mux_header *tmux_hdr,
int *len,
uint32_t *stream_len)
{
uint8_t *data = (uint8_t *)tmux_hdr;
size_t nr = bufferevent_read(bev, data, sizeof(*tmux_hdr));
if (nr != sizeof(*tmux_hdr)) {
debug(LOG_ERR, "Failed to read TCP mux header");
return -1;
}

*len -= nr;

if (!validate_tcp_mux_protocol(tmux_hdr)) {
debug(LOG_ERR, "Invalid TCP mux protocol");
return -1;
}

if (tmux_hdr->type == DATA) {
*stream_len = ntohl(tmux_hdr->length);
}

return 0;
}

static struct tmux_stream* get_stream_for_data(const struct tcp_mux_header *tmux_hdr,
uint32_t stream_len)
{
struct tmux_stream *cur = NULL;
if (tmux_hdr->type == DATA) {
uint32_t stream_id = ntohl(tmux_hdr->stream_id);
cur = get_stream_by_id(stream_id);
if (!cur && stream_len > 0) {
debug(LOG_INFO, "Using abandon stream for id %d, len %d",
stream_id, stream_len);
cur = &abandon_stream;
}
}
return cur;
}

static int process_stream_data(struct bufferevent *bev,
struct tmux_stream *cur,
int *len,
uint32_t *stream_len)
{
size_t nr;
if (*len >= *stream_len) {
nr = tmux_stream_read(bev, cur, *stream_len);
if (nr != *stream_len) {
return -1;
}
*len -= *stream_len;
*stream_len = 0;
} else {
nr = tmux_stream_read(bev, cur, *len);
if (nr != *len) {
return -1;
}
*stream_len -= *len;
*len = 0;
set_cur_stream(cur);
}
return 0;
}

static void handle_tcp_mux_message(const struct tcp_mux_header *tmux_hdr,
struct tmux_stream *cur)
{
if (cur == &abandon_stream) {
debug(LOG_INFO, "Abandoning stream data");
memset(cur, 0, sizeof(abandon_stream));
set_cur_stream(NULL);
return;
}

switch(tmux_hdr->type) {
case DATA:
case WINDOW_UPDATE:
handle_tcp_mux_stream(tmux_hdr, handle_frps_msg);
break;
case PING:
handle_tcp_mux_ping(tmux_hdr);
break;
case GO_AWAY:
handle_tcp_mux_go_away(tmux_hdr);
break;
default:
debug(LOG_ERR, "Invalid TCP mux message type");
exit(-1);
}
}

static void handle_tcp_mux(struct bufferevent *bev, int len)
{
static struct tcp_mux_header tmux_hdr;
static uint32_t stream_len = 0;

while (len > 0) {
struct tmux_stream *cur = get_cur_stream();

if (!cur) {
if (len < sizeof(tmux_hdr)) {
debug(LOG_INFO, "Incomplete header: len=%d", len);
break;
}

memset(&tmux_hdr, 0, sizeof(tmux_hdr));
if (handle_tcp_mux_header(bev, &tmux_hdr, &len, &stream_len) < 0) {
break;
}

cur = get_stream_for_data(&tmux_hdr, stream_len);
if (!cur || len == 0) {
set_cur_stream(cur);
break;
}
}

if (process_stream_data(bev, cur, &len, &stream_len) < 0) {
break;
}

if (len > 0) {
handle_tcp_mux_message(&tmux_hdr, cur);
set_cur_stream(NULL);
}
}
}

static void handle_direct_message(struct bufferevent *bev, void *ctx)
{
struct evbuffer *input = bufferevent_get_input(bev);
int len = evbuffer_get_length(input);
if (len <= 0) {
return;

uint8_t *buf = calloc(len, 1);
if (!buf) {
debug(LOG_ERR, "Failed to allocate buffer");
return;
}

struct common_conf *c_conf = get_common_config();
if (c_conf->tcp_mux) {
static struct tcp_mux_header tmux_hdr;
static uint32_t stream_len = 0;
while (len > 0) {
struct tmux_stream *cur = get_cur_stream();
size_t nr = 0;
if (!cur) {
memset(&tmux_hdr, 0, sizeof(tmux_hdr));
uint8_t *data = (uint8_t *)&tmux_hdr;
if (len < sizeof(tmux_hdr)) {
debug(LOG_INFO, "len [%d] < sizeof tmux_hdr", len);
break;
}
nr = bufferevent_read(bev, data, sizeof(tmux_hdr));
assert(nr == sizeof(tmux_hdr));
assert(validate_tcp_mux_protocol(&tmux_hdr) > 0);
len -= nr;
if (tmux_hdr.type == DATA) {
uint32_t stream_id = ntohl(tmux_hdr.stream_id);
stream_len = ntohl(tmux_hdr.length);
cur = get_stream_by_id(stream_id);
if (!cur) {
debug(LOG_INFO, "cur is NULL stream_id is %d, stream_len is %d len is %d",
stream_id, stream_len, len);
if (stream_len > 0)
cur = &abandon_stream;
else
continue;
}

if (len == 0) {
set_cur_stream(cur);
break;
}
if (len >= stream_len) {
nr = tmux_stream_read(bev, cur, stream_len);
assert(nr == stream_len);
len -= stream_len;
} else {
nr = tmux_stream_read(bev, cur, len);
stream_len -= len;
assert(nr == len);
set_cur_stream(cur);
len -= nr;
break;
}
}
} else {
assert(tmux_hdr.type == DATA);
if (len >= stream_len ) {
nr = tmux_stream_read(bev, cur, stream_len);
assert(nr == stream_len);
len -= stream_len;
} else {
nr = tmux_stream_read(bev, cur, len);
stream_len -= len;
assert(nr == len);
len -= nr;
break;
}
}

if (cur == &abandon_stream) {
debug(LOG_INFO, "abandon stream data ...");
memset(cur , 0, sizeof(abandon_stream));
set_cur_stream(NULL);
continue;
}

switch(tmux_hdr.type) {
case DATA:
case WINDOW_UPDATE:
{
handle_tcp_mux_stream(&tmux_hdr, handle_frps_msg);
break;
}
case PING:
handle_tcp_mux_ping(&tmux_hdr);
break;
case GO_AWAY:
handle_tcp_mux_go_away(&tmux_hdr);
break;
default:
debug(LOG_ERR, "impossible here!!!!");
exit(-1);
}

set_cur_stream(NULL);
}
} else {
uint8_t *buf = calloc(len, 1);
assert(buf);
evbuffer_remove(input, buf, len);
evbuffer_remove(input, buf, len);
handle_frps_msg(buf, len, ctx);
free(buf);
}

handle_frps_msg(buf, len, ctx);
SAFE_FREE(buf);
static void recv_cb(struct bufferevent *bev, void *ctx)
{
struct evbuffer *input = bufferevent_get_input(bev);
int len = evbuffer_get_length(input);
if (len <= 0) {
return;
}


return;
struct common_conf *c_conf = get_common_config();
if (c_conf->tcp_mux) {
handle_tcp_mux(bev, len);
} else {
handle_direct_message(bev, ctx);
}
}

static void handle_connection_failure(struct common_conf *c_conf, int *retry_times) {
Expand Down

0 comments on commit 3f21cc8

Please sign in to comment.