diff --git a/go.mod b/go.mod index e1c9596..d8bf758 100644 --- a/go.mod +++ b/go.mod @@ -10,9 +10,10 @@ retract ( replace github.com/tetratelabs/wazero => github.com/refraction-networking/wazero v1.7.3-w require ( + github.com/gaukas/io v0.0.2 github.com/gaukas/wazerofs v0.1.0 github.com/tetratelabs/wazero v1.7.3 - google.golang.org/protobuf v1.33.0 + google.golang.org/protobuf v1.34.2 ) require github.com/blang/vfs v1.0.0 // indirect diff --git a/go.sum b/go.sum index f18256d..00088fe 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/blang/vfs v1.0.0 h1:AUZUgulCDzbaNjTRWEP45X7m/J10brAptZpSRKRZBZc= github.com/blang/vfs v1.0.0/go.mod h1:jjuNUc/IKcRNNWC9NUCvz4fR9PZLPIKxEygtPs/4tSI= +github.com/gaukas/io v0.0.2 h1:VA+fx5fMM0/OKLmWXoHBlEFCEUoANYbjYJ2965Q59XE= +github.com/gaukas/io v0.0.2/go.mod h1:+AGfTQVkFhTA/FmN4OCk7f0hip/XYEP++WRPqbm2W40= github.com/gaukas/wazerofs v0.1.0 h1:wIkW1bAxSnpaaVkQ5LOb1tm1BXdVap3eKjJpVWIqt2E= github.com/gaukas/wazerofs v0.1.0/go.mod h1:+JECB9Fwt0taPqSgHckG9lmT3tcoVK+9VJozTsq9UlI= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= @@ -8,5 +10,5 @@ github.com/refraction-networking/wazero v1.7.3-w h1:Br3UuVPrKAD3pUSIlpT1+iBIYMbs github.com/refraction-networking/wazero v1.7.3-w/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXAwJfQ3X6/7Y= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= diff --git a/internal/socket/README.md b/internal/socket/README.md deleted file mode 100644 index eb910f2..0000000 --- a/internal/socket/README.md +++ /dev/null @@ -1,5 +0,0 @@ -# `socket` - -This package provides some helper function to abuse network sockets and do weird things, including but not limited to: -- Spawning connection pairs -- Wrap a readable/writable interface into a `net.Conn` \ No newline at end of file diff --git a/internal/socket/tcpconn.go b/internal/socket/tcpconn.go deleted file mode 100644 index 185a860..0000000 --- a/internal/socket/tcpconn.go +++ /dev/null @@ -1,179 +0,0 @@ -package socket - -import ( - "context" - "fmt" - "io" - "net" - "os" - "sync" - - "github.com/refraction-networking/water/internal/log" -) - -// TCPConnPair returns a pair of connected net.TCPConn. -func TCPConnPair(address ...string) (c1, c2 *net.TCPConn, err error) { - var addr string = "localhost:0" // use a localhost TCP connection by default - if len(address) > 0 && address[0] != "" { - addr = address[0] - } - - tcpAddr, err := net.ResolveTCPAddr("tcp", addr) - if err != nil { - return nil, nil, fmt.Errorf("net.ResolveTCPAddr returned error: %w", err) - } - - l, err := net.ListenTCP("tcp", tcpAddr) // skipcq: GSC-G102 - if err != nil { - return nil, nil, fmt.Errorf("net.Listen returned error: %w", err) - } - - var wg *sync.WaitGroup = new(sync.WaitGroup) - var goroutineErr error - wg.Add(1) - go func() { - defer wg.Done() - c2, goroutineErr = l.AcceptTCP() - }() - - c1, err = net.DialTCP("tcp", nil, l.Addr().(*net.TCPAddr)) - if err != nil { - return nil, nil, fmt.Errorf("net.Dial returned error: %w", err) - } - wg.Wait() - - if goroutineErr != nil { - return nil, nil, fmt.Errorf("l.Accept returned error: %w", goroutineErr) - } - - if c1 == nil || c2 == nil { - return nil, nil, fmt.Errorf("c1 or c2 is nil") - } - - return c1, c2, l.Close() -} - -// TCPConnWrap wraps an io.Reader/io.Writer/io.Closer -// interface into a TCPConn. -// -// This function spins up goroutine(s) to copy data between the -// ReadWrite(Close)r and the TCPConn. Anything written to the -// TCPConn by caller will be written to the wrapped object if -// the object implements io.Writer, and if the object implements -// io.Reader, anything read by goroutine from the wrapped object -// will be readable from the TCPConn by caller. -// -// Once this function is invoked, the caller should not perform I/O -// operations on the wrapped connection anymore. -// -// The returned context.Context can be used to check if the connection -// is still alive. If the connection is closed, the context will be -// canceled. -func TCPConnWrap(wrapped any) (wrapperConn *net.TCPConn, ctxCancel context.Context, err error) { - // get a pair of connected TCPConn - tcpConn, reverseTCPConn, err := TCPConnPair() - if err != nil && (tcpConn == nil || reverseTCPConn == nil) { // ignore error caused by closing TCP Listener - return nil, nil, err - } - - var cancel context.CancelFunc - ctxCancel, cancel = context.WithCancel(context.Background()) - - reader, readerOk := wrapped.(io.Reader) - writer, writerOk := wrapped.(io.Writer) - var wg *sync.WaitGroup = new(sync.WaitGroup) - if !readerOk && !writerOk { - cancel() - return nil, nil, fmt.Errorf("wrapped does not implement io.Reader nor io.Writer") - } else if readerOk && !writerOk { - // only reader is implemented - log.Debugf("wrapped does not implement io.Writer, skipping copy from wrapped to wrapper") - - wg.Add(1) - go func(wg *sync.WaitGroup) { - defer wg.Done() - _, _ = io.Copy(reverseTCPConn, reader) // unsafe: error is ignored - _ = reverseTCPConn.Close() // unsafe: error is ignored - _ = tcpConn.Close() // unsafe: error is ignored - }(wg) - } else if !readerOk && writerOk { - // only writer is implemented - log.Debugf("wrapped does not implement io.Reader, skipping copy from wrapper to wrapped") - - wg.Add(1) - go func(wg *sync.WaitGroup) { - defer wg.Done() - _, _ = io.Copy(writer, reverseTCPConn) // unsafe: error is ignored - // when the src is closed, we will close the dst (if implements io.Closer) - if closer, ok := wrapped.(io.Closer); ok { - _ = closer.Close() // unsafe: error is ignored - } - }(wg) - } else { - // both reader and writer are implemented - wg.Add(2) - - // copy from wrapped to wrapper - go func(wg *sync.WaitGroup) { - defer wg.Done() - _, _ = io.Copy(reverseTCPConn, reader) // unsafe: error is ignored - _ = reverseTCPConn.Close() // unsafe: error is ignored - _ = tcpConn.Close() // unsafe: error is ignored - }(wg) - - // copy from wrapper to wrapped - go func(wg *sync.WaitGroup) { - defer wg.Done() - _, _ = io.Copy(writer, reverseTCPConn) // unsafe: error is ignored - // when the src is closed, we will close the dst (if implements io.Closer) - if closer, ok := wrapped.(io.Closer); ok { - _ = closer.Close() // unsafe: error is ignored - } - }(wg) - } - - // spawn a goroutine to wait for all copying to finish - go func(wg *sync.WaitGroup) { - wg.Wait() - cancel() - - // close again to make sure we don't forget to close anything - // if io.Reader or io.Writer is not implemented. - - // close the reverseTCPConn - _ = reverseTCPConn.Close() // unsafe: error is ignored - - // close the tcpConn - _ = tcpConn.Close() // unsafe: error is ignored - - // close the wrapped - if closer, ok := wrapped.(io.Closer); ok { - _ = closer.Close() // unsafe: error is ignored - } - }(wg) - - return tcpConn, ctxCancel, nil -} - -// TCPConnFileWrap wraps an object into a *os.File from an -// underlying net.TCPConn. The object must implement io.Reader -// and/or io.Writer. -// -// If the object implements io.Reader, upon completing copying -// the object to the returned *os.File, the callback functions -// will be called. -// -// It is caller's responsibility to close the returned *os.File. -func TCPConnFileWrap(wrapped any) (wrapperFile *os.File, ctxCancel context.Context, err error) { - tcpWrapperConn, ctxCancel, err := TCPConnWrap(wrapped) - if err != nil { - return nil, nil, err - } - - tcpWrapperFile, err := tcpWrapperConn.File() - if err != nil { - return nil, nil, fmt.Errorf("(*net.TCPConn).File returned error: %w", err) - } - - return tcpWrapperFile, ctxCancel, nil -} diff --git a/internal/socket/tcpconn_test.go b/internal/socket/tcpconn_test.go deleted file mode 100644 index 8f6ef93..0000000 --- a/internal/socket/tcpconn_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package socket_test - -import ( - "runtime" - "testing" - "time" - - "github.com/refraction-networking/water/internal/socket" -) - -func TestTCPConnPair(t *testing.T) { - c1, c2, err := socket.TCPConnPair() - if err != nil { - if c1 == nil || c2 == nil { - t.Fatal(err) - } else { // likely due to Close() call errored - t.Logf("socket.TCPConnPair returned non-fatal error: %v", err) - } - } - - runtime.GC() - time.Sleep(100 * time.Microsecond) - - // test c1 -> c2 - err = testIO(c1, c2, 1000, 1024, 0) - if err != nil { - t.Fatal(err) - } - - runtime.GC() - time.Sleep(100 * time.Microsecond) - - // test c2 -> c1 - err = testIO(c2, c1, 1000, 1024, 0) - if err != nil { - t.Fatal(err) - } -} diff --git a/internal/socket/unixconn.go b/internal/socket/unixconn.go deleted file mode 100644 index 0c188c2..0000000 --- a/internal/socket/unixconn.go +++ /dev/null @@ -1,195 +0,0 @@ -package socket - -import ( - "context" - "crypto/rand" - "fmt" - "io" - "net" - "os" - "sync" - - "github.com/refraction-networking/water/internal/log" -) - -// UnixConnPair returns a pair of connected net.UnixConn. -func UnixConnPair(path ...string) (*net.UnixConn, *net.UnixConn, error) { - var c1, c2 net.Conn - - unixPath := "" - if len(path) == 0 || path[0] == "" { - // randomize a socket name - randBytes := make([]byte, 16) - if _, err := rand.Read(randBytes); err != nil { - return nil, nil, fmt.Errorf("crypto/rand.Read returned error: %w", err) - } - unixPath = os.TempDir() + string(os.PathSeparator) + fmt.Sprintf("%x", randBytes) - } else { - unixPath = path[0] - } - - // create a one-time use UnixListener - ul, err := net.Listen("unix", unixPath) - if err != nil { - return nil, nil, fmt.Errorf("net.Listen returned error: %w", err) - } - - var wg *sync.WaitGroup = new(sync.WaitGroup) - var goroutineErr error - wg.Add(1) - go func() { - defer wg.Done() - c2, goroutineErr = ul.Accept() - }() - - // dial the one-time use UnixListener - c1, err = net.Dial("unix", ul.Addr().String()) - if err != nil { - return nil, nil, fmt.Errorf("net.Dial returned error: %w", err) - } - wg.Wait() - - if goroutineErr != nil { - return nil, nil, fmt.Errorf("ul.Accept returned error: %w", goroutineErr) - } - - if c1 == nil || c2 == nil { - return nil, nil, fmt.Errorf("c1 or c2 is nil") - } - - // type assertion - if uc1, ok := c1.(*net.UnixConn); ok { - if uc2, ok := c2.(*net.UnixConn); ok { - return uc1, uc2, ul.Close() - } else { - return nil, nil, fmt.Errorf("c2 is not *net.UnixConn") - } - } else { - return nil, nil, fmt.Errorf("c1 is not *net.UnixConn") - } -} - -// UnixConnWrap wraps an io.Reader/io.Writer/io.Closer -// interface into a UnixConn. -// -// This function spins up goroutine(s) to copy data between the -// ReadWrite(Close)r and the UnixConn. Anything written to the -// UnixConn by caller will be written to the wrapped object if -// the object implements io.Writer, and if the object implements -// io.Reader, anything read by goroutine from the wrapped object -// will be readable from the UnixConn by caller. -// -// Once this function is invoked, the caller should not perform I/O -// operations on the wrapped connection anymore. -// -// The returned context.Context can be used to check if the connection -// is still alive. If the connection is closed, the context will be -// canceled. -func UnixConnWrap(wrapped any) (wrapperConn *net.UnixConn, ctxCancel context.Context, err error) { - // get a pair of connected UnixConn - unixConn, reverseUnixConn, err := UnixConnPair() - if err != nil && (unixConn == nil || reverseUnixConn == nil) { - return nil, nil, err - } - - var cancel context.CancelFunc - ctxCancel, cancel = context.WithCancel(context.Background()) - - reader, readerOk := wrapped.(io.Reader) - writer, writerOk := wrapped.(io.Writer) - var wg *sync.WaitGroup = new(sync.WaitGroup) - if !readerOk && !writerOk { - cancel() - return nil, nil, fmt.Errorf("wrapped does not implement io.Reader nor io.Writer") - } else if readerOk && !writerOk { - // only reader is implemented - log.Debugf("wrapped does not implement io.Writer, skipping copy from wrapped to wrapper") - - wg.Add(1) - go func(wg *sync.WaitGroup) { - defer wg.Done() - _, _ = io.Copy(reverseUnixConn, reader) // unsafe: error is ignored - _ = reverseUnixConn.Close() // unsafe: error is ignored - _ = unixConn.Close() // unsafe: error is ignored - }(wg) - } else if !readerOk && writerOk { - // only writer is implemented - log.Debugf("wrapped does not implement io.Reader, skipping copy from wrapper to wrapped") - - wg.Add(1) - go func(wg *sync.WaitGroup) { - defer wg.Done() - _, _ = io.Copy(writer, reverseUnixConn) // unsafe: error is ignored - // when the src is closed, we will close the dst (if implements io.Closer) - if closer, ok := wrapped.(io.Closer); ok { - _ = closer.Close() // unsafe: error is ignored - } - }(wg) - } else { - // both reader and writer are implemented - wg.Add(2) - - // copy from wrapped to wrapper - go func(wg *sync.WaitGroup) { - defer wg.Done() - _, _ = io.Copy(reverseUnixConn, reader) // unsafe: error is ignored - _ = reverseUnixConn.Close() // unsafe: error is ignored - _ = unixConn.Close() // unsafe: error is ignored - }(wg) - - // copy from wrapper to wrapped - go func(wg *sync.WaitGroup) { - defer wg.Done() - _, _ = io.Copy(writer, reverseUnixConn) // unsafe: error is ignored - // when the src is closed, we will close the dst (if implements io.Closer) - if closer, ok := wrapped.(io.Closer); ok { - _ = closer.Close() // unsafe: error is ignored - } - }(wg) - } - - // spawn a goroutine to wait for all copying to finish - go func(wg *sync.WaitGroup) { - wg.Wait() - cancel() - - // close again to make sure we don't forget to close anything - // if io.Reader or io.Writer is not implemented. - - // close the reverseTCPConn - _ = reverseUnixConn.Close() // unsafe: error is ignored - - // close the tcpConn - _ = unixConn.Close() // unsafe: error is ignored - - // close the wrapped - if closer, ok := wrapped.(io.Closer); ok { - _ = closer.Close() // unsafe: error is ignored - } - }(wg) - - return unixConn, ctxCancel, nil -} - -// UnixConnFileWrap wraps an object into a *os.File from an -// underlying net.UnixConn. The object must implement io.Reader -// and/or io.Writer. -// -// If the object implements io.Reader, upon completing copying -// the object to the returned *os.File, the callback functions -// will be called. -// -// It is caller's responsibility to close the returned *os.File. -func UnixConnFileWrap(wrapped any) (wrapperFile *os.File, ctxCancel context.Context, err error) { - unixWrapperConn, ctxCancel, err := UnixConnWrap(wrapped) - if err != nil { - return nil, nil, err - } - - unixWrapperFile, err := unixWrapperConn.File() - if err != nil { - return nil, nil, fmt.Errorf("(*net.UnixConn).File returned error: %w", err) - } - - return unixWrapperFile, ctxCancel, nil -} diff --git a/internal/socket/unixconn_test.go b/internal/socket/unixconn_test.go deleted file mode 100644 index f91570c..0000000 --- a/internal/socket/unixconn_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package socket_test - -import ( - "crypto/rand" - "fmt" - "net" - "runtime" - "testing" - "time" - - "github.com/refraction-networking/water/internal/socket" -) - -func TestUnixConnPair(t *testing.T) { - c1, c2, err := socket.UnixConnPair() - if err != nil { - if c1 == nil || c2 == nil { - t.Fatal(err) - } else { // likely due to Close() call errored - t.Logf("socket.UnixConnPair returned non-fatal error: %v", err) - } - } - - runtime.GC() - time.Sleep(100 * time.Microsecond) - - // test c1 -> c2 - err = testIO(c1, c2, 1000, 1024, 0) - if err != nil { - t.Fatal(err) - } - - runtime.GC() - time.Sleep(100 * time.Microsecond) - - // test c2 -> c1 - err = testIO(c2, c1, 1000, 1024, 0) - if err != nil { - t.Fatal(err) - } -} - -func testIO(wrConn, rdConn net.Conn, N int, sz int, sleep time.Duration) error { - var sendMsg []byte = make([]byte, sz) - _, err := rand.Read(sendMsg) - if err != nil { - return fmt.Errorf("rand.Read error: %w", err) - } - - for i := 0; i < N; i++ { - _, err = wrConn.Write(sendMsg) - if err != nil { - return fmt.Errorf("Write error: %w, cntr: %d, N: %d", err, i, N) - } - - // receive data - buf := make([]byte, 1024) - _, err = rdConn.Read(buf) - if err != nil { - return fmt.Errorf("Read error: %w, cntr: %d, N: %d", err, i, N) - } - - time.Sleep(sleep) - } - - return nil -} diff --git a/transport/v0/conn.go b/transport/v0/conn.go index 167d705..4731fc8 100644 --- a/transport/v0/conn.go +++ b/transport/v0/conn.go @@ -11,7 +11,8 @@ import ( "github.com/refraction-networking/water" "github.com/refraction-networking/water/internal/log" - "github.com/refraction-networking/water/internal/socket" + + "github.com/gaukas/io/pipe" ) // Conn is the first experimental version of Conn implementation. @@ -55,13 +56,12 @@ func dial(core water.Core, network, address string) (c water.Conn, err error) { return nil, err } - reverseCallerConn, callerConn, err := socket.TCPConnPair() - // wasmCallerConn, conn.uoConn, err = socket.TCPConnPair() + reverseCallerConn, callerConn, err := pipe.TCPPipe(nil) if err != nil { if reverseCallerConn == nil || callerConn == nil { - return nil, fmt.Errorf("water: socket.TCPConnPair returned error: %w", err) + return nil, fmt.Errorf("water: pipe.TCPPipe returned error: %w", err) } else { // likely due to Close() call errored - log.LErrorf(core.Logger(), "water: socket.TCPConnPair returned error: %v", err) + log.LErrorf(core.Logger(), "water: pipe.TCPPipe returned error: %v", err) } } conn.callerConn = callerConn @@ -99,15 +99,15 @@ func accept(core water.Core) (c water.Conn, err error) { return nil, err } - reverseCallerConn, callerConn, err := socket.TCPConnPair() + reverseCallerConn, callerConn, err := pipe.TCPPipe(nil) if err != nil { if reverseCallerConn == nil || callerConn == nil { - return nil, fmt.Errorf("water: socket.TCPConnPair returned error: %w", err) + return nil, fmt.Errorf("water: pipe.TCPPipe returned error: %w", err) } else { // likely due to Close() call errored - log.LErrorf(core.Logger(), "water: socket.TCPConnPair returned error: %v", err) + log.LErrorf(core.Logger(), "water: pipe.TCPPipe returned error: %v", err) } } else if reverseCallerConn == nil || callerConn == nil { - return nil, errors.New("water: socket.TCPConnPair returned nil") + return nil, errors.New("water: pipe.TCPPipe returned nil") } conn.callerConn = callerConn diff --git a/transport/v0/transport_module.go b/transport/v0/transport_module.go index a9546a7..2f92921 100644 --- a/transport/v0/transport_module.go +++ b/transport/v0/transport_module.go @@ -8,9 +8,9 @@ import ( "sync" "syscall" + "github.com/gaukas/io/pipe" "github.com/refraction-networking/water" "github.com/refraction-networking/water/internal/log" - "github.com/refraction-networking/water/internal/socket" "github.com/refraction-networking/water/internal/wasip1" "github.com/tetratelabs/wazero/api" ) @@ -628,7 +628,7 @@ func (tm *TransportModule) Worker() error { } // create cancel pipe - cancelConnR, cancelConnW, err := socket.TCPConnPair() + cancelConnR, cancelConnW, err := pipe.TCPPipe(nil) if err != nil { return fmt.Errorf("water: creating cancel pipe failed: %w", err) } diff --git a/transport/v1/conn.go b/transport/v1/conn.go index 3cbe210..7864754 100644 --- a/transport/v1/conn.go +++ b/transport/v1/conn.go @@ -9,9 +9,9 @@ import ( "sync/atomic" "time" + "github.com/gaukas/io/pipe" "github.com/refraction-networking/water" "github.com/refraction-networking/water/internal/log" - "github.com/refraction-networking/water/internal/socket" ) // Conn is the first experimental version of Conn implementation. @@ -57,8 +57,7 @@ func dialFixed(core water.Core) (c water.Conn, err error) { return nil, err } - reverseCallerConn, callerConn, err := socket.TCPConnPair() - // wasmCallerConn, conn.uoConn, err = socket.TCPConnPair() + reverseCallerConn, callerConn, err := pipe.TCPPipe(nil) if err != nil { if reverseCallerConn == nil || callerConn == nil { return nil, fmt.Errorf("water: socket.TCPConnPair returned error: %w", err) @@ -111,13 +110,12 @@ func dial(core water.Core, network, address string) (c water.Conn, err error) { return nil, err } - reverseCallerConn, callerConn, err := socket.TCPConnPair() - // wasmCallerConn, conn.uoConn, err = socket.TCPConnPair() + reverseCallerConn, callerConn, err := pipe.TCPPipe(nil) if err != nil { if reverseCallerConn == nil || callerConn == nil { - return nil, fmt.Errorf("water: socket.TCPConnPair returned error: %w", err) + return nil, fmt.Errorf("water: pipe.TCPPipe returned error: %w", err) } else { // likely due to Close() call errored - log.LErrorf(core.Logger(), "water: socket.TCPConnPair returned error: %v", err) + log.LErrorf(core.Logger(), "water: pipe.TCPPipe returned error: %v", err) } } conn.callerConn = callerConn @@ -155,7 +153,7 @@ func accept(core water.Core) (c water.Conn, err error) { return nil, err } - reverseCallerConn, callerConn, err := socket.TCPConnPair() + reverseCallerConn, callerConn, err := pipe.TCPPipe(nil) if err != nil { if reverseCallerConn == nil || callerConn == nil { return nil, fmt.Errorf("water: socket.TCPConnPair returned error: %w", err) diff --git a/transport/v1/transport_module.go b/transport/v1/transport_module.go index 738390d..bfe52a4 100644 --- a/transport/v1/transport_module.go +++ b/transport/v1/transport_module.go @@ -10,9 +10,9 @@ import ( "syscall" "time" + "github.com/gaukas/io/pipe" "github.com/refraction-networking/water" "github.com/refraction-networking/water/internal/log" - "github.com/refraction-networking/water/internal/socket" "github.com/refraction-networking/water/internal/wasip1" "github.com/tetratelabs/wazero/api" ) @@ -773,7 +773,7 @@ func (tm *TransportModule) StartWorker() error { } // create control pipe connection pair - ctrlConnR, ctrlConnW, err := socket.TCPConnPair() + ctrlConnR, ctrlConnW, err := pipe.TCPPipe(nil) if err != nil { return fmt.Errorf("water: creating cancel pipe failed: %w", err) }