Skip to content

Commit

Permalink
feat: add tcp proxy (#28)
Browse files Browse the repository at this point in the history
* feat: add tcp proxy

* fix ci

* add listen log

* add docs

* typo
  • Loading branch information
jiacai2050 authored Aug 31, 2024
1 parent b8c95be commit 94cc8f5
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
zig-version: [master, 0.13.0]
zig-version: [0.13.0]
steps:
- uses: actions/checkout@v4
with:
Expand Down
6 changes: 6 additions & 0 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ fn buildBinaries(
"night-shift",
"dark-mode",
"repeat",
"tcp-proxy",
}) |name| {
try buildBinary(b, .{ .bin = name }, optimize, target, is_ci, all_tests);
}
Expand Down Expand Up @@ -168,6 +169,11 @@ fn makeCompileStep(
) ?*Build.Step.Compile {
const name = comptime source.name();
const path = comptime source.path();
// 0.13.0\x64\lib\std\net.zig:756:5: error: std.net.if_nametoindex unimplemented for this OS
if (std.mem.eql(u8, name, "tcp-proxy") and target.result.os.tag == .windows) {
return null;
}

if (std.mem.eql(u8, name, "night-shift") or std.mem.eql(u8, name, "dark-mode") or std.mem.eql(u8, name, "pidof")) {
// if (target.getOsTag() != .macos) {
if (is_ci) {
Expand Down
33 changes: 33 additions & 0 deletions docs/content/programs/tcp-proxy.org
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#+TITLE: tcp-proxy
#+DATE: 2024-09-01T00:02:43+0800
#+LASTMOD: 2024-09-01T00:14:17+0800
#+TYPE: docs
#+DESCRIPTION: Forward TCP requests hitting a specified port on the localhost to a different port on another host

Both IPv4 and IPv6 are supported.

#+begin_src bash :results verbatim :exports result :dir ../../..
./zig-out/bin/tcp-proxy -h
#+end_src

#+RESULTS:
#+begin_example
USAGE:
./zig-out/bin/tcp-proxy [OPTIONS]

OPTIONS:
-b, --bind_address STRING Local bind address(required)
-p, --local_port INTEGER Local bind port(required)
-H, --remote_host STRING Remote host(required)
-P, --remote_port INTEGER Remote port(required)
--buf_size INTEGER Buffer size for tcp read/write(default: 1024)
--thread_pool_size INTEGER (default: 24)
-h, --help
-v, --version
--verbose
#+end_example

#+begin_src bash
tcp-proxy -b 0.0.0.0 -p 8082 -H 192.168.0.2 -P 8082
#+end_src
This will forward tcp requests from =localhost:8082= to =192.168.0.2:8082=
3 changes: 2 additions & 1 deletion docs/hugo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ showLightDarkModeMenu = true
[markup.goldmark.renderer]
unsafe = true
[markup.highlight]
style = 'tango'
style = "pygments"
linenos = true
172 changes: 172 additions & 0 deletions src/bin/tcp-proxy.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
const std = @import("std");
const simargs = @import("simargs");
const util = @import("util.zig");
const process = std.process;
const fs = std.fs;
const net = std.net;
const mem = std.mem;

var verbose: bool = false;

fn debugPrint(
comptime format: []const u8,
args: anytype,
) void {
if (verbose) {
std.debug.print(format, args);
}
}

pub fn main() !void {
var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
defer arena.deinit();
const allocator = arena.allocator();

const opt = try simargs.parse(allocator, struct {
bind_host: []const u8,
local_port: u16,
remote_host: []const u8,
remote_port: u16,
buf_size: usize = 1024,
thread_pool_size: u32 = 24,
help: bool = false,
version: bool = false,
verbose: bool = false,

pub const __shorts__ = .{
.bind_host = .b,
.local_port = .p,
.remote_host = .H,
.remote_port = .P,
.help = .h,
.version = .v,
};

pub const __messages__ = .{
.bind_host = "Local bind host",
.local_port = "Local bind port",
.remote_host = "Remote host",
.remote_port = "Remote port",
.buf_size = "Buffer size for tcp read/write",
};
}, null, util.get_build_info());

verbose = opt.args.verbose;

const bind_addr = try net.Address.resolveIp(opt.args.bind_host, opt.args.local_port);
const remote_addr = try net.Address.resolveIp(opt.args.remote_host, opt.args.remote_port);
var server = try bind_addr.listen(.{
.kernel_backlog = 128,
.reuse_address = true,
});
std.log.info("Tcp proxy listen on {any}", .{bind_addr});

var pool = try allocator.create(std.Thread.Pool);
defer pool.deinit();

try pool.init(.{
.allocator = allocator,
.n_jobs = opt.args.thread_pool_size,
});
while (true) {
const client = try server.accept();
debugPrint("Got new connection, addr:{any}\n", .{client.address});

const proxy = Proxy.init(allocator, client, remote_addr) catch |e| {
std.log.err("Init proxy failed, remote:{any}, err:{any}", .{ remote_addr, e });
client.stream.close();
continue;
};
proxy.nonblockingCommunicate(pool, opt.args.buf_size) catch |e| {
proxy.deinit();
std.log.err("Communicate, remote:{any}, err:{any}", .{ remote_addr, e });
};
}
}

const Proxy = struct {
conn: net.Server.Connection,
remote_conn: net.Stream,
remote_addr: net.Address,
allocator: mem.Allocator,

pub fn init(allocator: mem.Allocator, conn: net.Server.Connection, remote: net.Address) !Proxy {
const remote_conn = try net.tcpConnectToAddress(remote);
return .{
.allocator = allocator,
.conn = conn,
.remote_conn = remote_conn,
.remote_addr = remote,
};
}

fn copyStreamNoError(
allocator: mem.Allocator,
src: net.Stream,
dst: net.Stream,
buf_size: usize,
) void {
Proxy.copyStream(allocator, src, dst, buf_size) catch |e|
switch (e) {
error.NotOpenForReading => {},
else => {
std.log.err("copy stream error: {any}\n", .{e});
},
};
}

fn copyStream(
allocator: mem.Allocator,
src: net.Stream,
dst: net.Stream,
buf_size: usize,
) !void {
var buf = try allocator.alloc(u8, buf_size);
defer allocator.free(buf);

var read = try src.read(buf);
while (read > 0) : (read = try src.read(buf)) {
_ = try dst.writeAll(buf[0..read]);
}
}

pub fn nonblockingCommunicate(
self: Proxy,
pool: *std.Thread.Pool,
buf_size: usize,
) !void {
try pool.spawn(struct {
fn run(
proxy: Proxy,
pool_inner: *std.Thread.Pool,
buf_size_inner: usize,
) void {
var wg = std.Thread.WaitGroup{};
defer {
wg.wait();
proxy.deinit();
}

// When conn.stream is closed, we close this proxy.
pool_inner.spawnWg(&wg, Proxy.copyStreamNoError, .{
proxy.allocator,
proxy.conn.stream,
proxy.remote_conn,
buf_size_inner,
});
pool_inner.spawn(Proxy.copyStreamNoError, .{
proxy.allocator,
proxy.remote_conn,
proxy.conn.stream,
buf_size_inner,
}) catch unreachable;
}
}.run, .{ self, pool, buf_size });
}

fn deinit(self: Proxy) void {
debugPrint("Close proxy, src:{any}\n", .{self.conn.address});
self.conn.stream.close();
self.remote_conn.close();
}
};

0 comments on commit 94cc8f5

Please sign in to comment.