From 86bea501b0b807c8d16140dcbe1ea962fd1d8916 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Wed, 13 Dec 2023 16:42:38 +0100 Subject: [PATCH 1/4] fix: make sure we can parse integers from JSON (#1435) This commit ensures we can parse integer values provided using OONI Run v2 descriptors into actual integers. Values are originally parsed as float64, so we need to add a specific conversion case for that scenario. Diff extracted from: https://github.com/ooni/probe-cli/pull/1423 Closes https://github.com/ooni/probe/issues/2645 --- internal/cmd/miniooni/oonirun.go | 2 ++ internal/registry/factory.go | 19 ++++++++++++++ internal/registry/factory_test.go | 43 +++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+) diff --git a/internal/cmd/miniooni/oonirun.go b/internal/cmd/miniooni/oonirun.go index 420ddcc0b..ce41e58fc 100644 --- a/internal/cmd/miniooni/oonirun.go +++ b/internal/cmd/miniooni/oonirun.go @@ -53,6 +53,8 @@ func ooniRunMain(ctx context.Context, logger.Warnf("oonirun: parsing OONI Run v2 descriptor failed: %s", err.Error()) continue } + logger.Infof("oonirun: running '%s'", descr.Name) + logger.Infof("oonirun: link authored by '%s'", descr.Author) if err := oonirun.V2MeasureDescriptor(ctx, cfg, &descr); err != nil { logger.Warnf("oonirun: running link failed: %s", err.Error()) continue diff --git a/internal/registry/factory.go b/internal/registry/factory.go index 2a704740e..937c22e97 100644 --- a/internal/registry/factory.go +++ b/internal/registry/factory.go @@ -7,6 +7,7 @@ package registry import ( "errors" "fmt" + "math" "os" "reflect" "strconv" @@ -107,6 +108,12 @@ func (b *Factory) setOptionBool(field reflect.Value, value any) error { } } +// With JSON we're limited by the 52 bits in the mantissa +const ( + jsonMaxInteger = 1<<53 - 1 + jsonMinInteger = -1<<53 + 1 +) + // setOptionInt sets an int option func (b *Factory) setOptionInt(field reflect.Value, value any) error { switch v := value.(type) { @@ -132,6 +139,18 @@ func (b *Factory) setOptionInt(field reflect.Value, value any) error { } field.SetInt(number) return nil + case float64: + if math.IsNaN(v) || math.IsInf(v, 0) { + return fmt.Errorf("%w from: %v", ErrCannotSetIntegerOption, value) + } + if math.Trunc(v) != v { + return fmt.Errorf("%w from: %v", ErrCannotSetIntegerOption, value) + } + if v > jsonMaxInteger || v < jsonMinInteger { + return fmt.Errorf("%w from: %v", ErrCannotSetIntegerOption, value) + } + field.SetInt(int64(v)) + return nil default: return fmt.Errorf("%w from a value of type %T", ErrCannotSetIntegerOption, value) } diff --git a/internal/registry/factory_test.go b/internal/registry/factory_test.go index 49f11338a..b407f074e 100644 --- a/internal/registry/factory_test.go +++ b/internal/registry/factory_test.go @@ -3,6 +3,7 @@ package registry import ( "errors" "fmt" + "math" "os" "testing" @@ -251,6 +252,48 @@ func TestExperimentBuilderSetOptionAny(t *testing.T) { FieldValue: make(chan any), ExpectErr: ErrCannotSetIntegerOption, ExpectConfig: &fakeExperimentConfig{}, + }, { + TestCaseName: "[int] for NaN", + InitialConfig: &fakeExperimentConfig{}, + FieldName: "Value", + FieldValue: math.NaN(), + ExpectErr: ErrCannotSetIntegerOption, + ExpectConfig: &fakeExperimentConfig{}, + }, { + TestCaseName: "[int] for +Inf", + InitialConfig: &fakeExperimentConfig{}, + FieldName: "Value", + FieldValue: math.Inf(1), + ExpectErr: ErrCannotSetIntegerOption, + ExpectConfig: &fakeExperimentConfig{}, + }, { + TestCaseName: "[int] for -Inf", + InitialConfig: &fakeExperimentConfig{}, + FieldName: "Value", + FieldValue: math.Inf(-1), + ExpectErr: ErrCannotSetIntegerOption, + ExpectConfig: &fakeExperimentConfig{}, + }, { + TestCaseName: "[int] for too large value", + InitialConfig: &fakeExperimentConfig{}, + FieldName: "Value", + FieldValue: float64(jsonMaxInteger + 1), + ExpectErr: ErrCannotSetIntegerOption, + ExpectConfig: &fakeExperimentConfig{}, + }, { + TestCaseName: "[int] for too small value", + InitialConfig: &fakeExperimentConfig{}, + FieldName: "Value", + FieldValue: float64(jsonMinInteger - 1), + ExpectErr: ErrCannotSetIntegerOption, + ExpectConfig: &fakeExperimentConfig{}, + }, { + TestCaseName: "[int] for float64 with nonzero fractional value", + InitialConfig: &fakeExperimentConfig{}, + FieldName: "Value", + FieldValue: 1.11, + ExpectErr: ErrCannotSetIntegerOption, + ExpectConfig: &fakeExperimentConfig{}, }, { TestCaseName: "[string] for serialized bool value while setting a string value", InitialConfig: &fakeExperimentConfig{}, From fdce52c9dcfa8fa111ae33b2cda95888e7a40dc2 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Wed, 13 Dec 2023 17:16:06 +0100 Subject: [PATCH 2/4] cleanup(miniooni): rename kvstore2 to engine (#1436) Noticed when preparing the SplinterCon demo. We should have the same name for the engine state directory for miniooni and ooniprobe. Otherwise, we need to tell people about this difference. Extracted from: https://github.com/ooni/probe-cli/pull/1423 Closes: https://github.com/ooni/probe/issues/2646 --- internal/cmd/miniooni/session.go | 10 +- internal/legacy/kvstore2dir/kvstore2dir.go | 34 ++++++ .../legacy/kvstore2dir/kvstore2dir_test.go | 102 ++++++++++++++++++ 3 files changed, 143 insertions(+), 3 deletions(-) create mode 100644 internal/legacy/kvstore2dir/kvstore2dir.go create mode 100644 internal/legacy/kvstore2dir/kvstore2dir_test.go diff --git a/internal/cmd/miniooni/session.go b/internal/cmd/miniooni/session.go index a56262c02..e02fff21e 100644 --- a/internal/cmd/miniooni/session.go +++ b/internal/cmd/miniooni/session.go @@ -9,6 +9,7 @@ import ( "github.com/apex/log" "github.com/ooni/probe-cli/v3/internal/engine" "github.com/ooni/probe-cli/v3/internal/kvstore" + "github.com/ooni/probe-cli/v3/internal/legacy/kvstore2dir" "github.com/ooni/probe-cli/v3/internal/model" "github.com/ooni/probe-cli/v3/internal/runtimex" "github.com/ooni/probe-cli/v3/internal/version" @@ -27,9 +28,12 @@ func newSessionOrPanic(ctx context.Context, currentOptions *Options, proxyURL = mustParseURL(currentOptions.Proxy) } - kvstore2dir := filepath.Join(miniooniDir, "kvstore2") - kvstore, err := kvstore.NewFS(kvstore2dir) - runtimex.PanicOnError(err, "cannot create kvstore2 directory") + // We renamed kvstore2 to engine in the 3.20 development cycle + _ = kvstore2dir.Move(miniooniDir) + + enginedir := filepath.Join(miniooniDir, "engine") + kvstore, err := kvstore.NewFS(enginedir) + runtimex.PanicOnError(err, "cannot create engine directory") tunnelDir := filepath.Join(miniooniDir, "tunnel") err = os.MkdirAll(tunnelDir, 0700) diff --git a/internal/legacy/kvstore2dir/kvstore2dir.go b/internal/legacy/kvstore2dir/kvstore2dir.go new file mode 100644 index 000000000..ef671a939 --- /dev/null +++ b/internal/legacy/kvstore2dir/kvstore2dir.go @@ -0,0 +1,34 @@ +// Package kvstore2dir migrates $OONI_HOME/kvstore2 to $OONI_HOME/engine. This ensures +// that miniooni and ooniprobe use the same directory name for the engine state. +package kvstore2dir + +import ( + "os" + "path/filepath" +) + +type statBuf interface { + IsDir() bool +} + +func simplifiedStat(path string) (statBuf, error) { + return os.Stat(path) +} + +var ( + osStat = simplifiedStat + osRename = os.Rename +) + +// Move moves $OONI_HOME/kvstore2 to $OONI_HOME/engine, if possible. +func Move(dir string) error { + kvstore2dir := filepath.Join(dir, "kvstore2") + if stat, err := osStat(kvstore2dir); err != nil || !stat.IsDir() { + return nil + } + enginedir := filepath.Join(dir, "engine") + if _, err := osStat(enginedir); err == nil { + return nil + } + return osRename(kvstore2dir, enginedir) +} diff --git a/internal/legacy/kvstore2dir/kvstore2dir_test.go b/internal/legacy/kvstore2dir/kvstore2dir_test.go new file mode 100644 index 000000000..03f6b1276 --- /dev/null +++ b/internal/legacy/kvstore2dir/kvstore2dir_test.go @@ -0,0 +1,102 @@ +package kvstore2dir + +import ( + "errors" + "io" + "os" + "path/filepath" + "testing" +) + +type booleanStatBuf bool + +var _ statBuf = booleanStatBuf(true) + +// IsDir implements statBuf. +func (v booleanStatBuf) IsDir() bool { + return bool(v) +} + +func TestMove(t *testing.T) { + // testcase is a test case implemented by this function + type testcase struct { + name string + osStat func(name string) (statBuf, error) + osRename func(oldpath string, newpath string) error + expect error + } + + cases := []testcase{{ + name: "when we cannot stat kvstore2", + osStat: func(name string) (statBuf, error) { + return nil, io.EOF + }, + osRename: func(oldpath string, newpath string) error { + panic("should not be called") + }, + expect: nil, + }, { + name: "when kvstore2 is not a directory", + osStat: func(name string) (statBuf, error) { + return booleanStatBuf(false), nil + }, + osRename: func(oldpath string, newpath string) error { + panic("should not be called") + }, + expect: nil, + }, { + name: "when we can find kvstore2 as a dir and engine", + osStat: func(name string) (statBuf, error) { + if name == filepath.Join("xo", "kvstore2") { + return booleanStatBuf(true), nil + } + return booleanStatBuf(true), nil + }, + osRename: func(oldpath string, newpath string) error { + panic("should not be called") + }, + expect: nil, + }, { + name: "when we can find kvstore2 as a dir without engine", + osStat: func(name string) (statBuf, error) { + if name == filepath.Join("xo", "kvstore2") { + return booleanStatBuf(true), nil + } + return nil, io.EOF + }, + osRename: func(oldpath string, newpath string) error { + return nil + }, + expect: nil, + }} + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + // override and restore functions + osStat = tc.osStat + osRename = tc.osRename + defer func() { + osStat = simplifiedStat + osRename = os.Rename + }() + + // invoke Move + err := Move("xo") + + // check the result + if !errors.Is(err, tc.expect) { + t.Fatal("expected", tc.expect, "got", err) + } + }) + } +} + +func TestSimplifiedStat(t *testing.T) { + buf, err := simplifiedStat("kvstore2dir.go") + if err != nil { + t.Fatal(err) + } + if buf.IsDir() { + t.Fatal("expected not dir") + } +} From 7a2c9fd3929033740fa31121e9138ca5b8f28a6a Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Wed, 13 Dec 2023 18:07:12 +0100 Subject: [PATCH 3/4] feat(miniooni): minimal JavaScript-ing capabilities (#1437) This diff implements part of my SplinterCon JavaScript demo (https://github.com/ooni/probe-cli/pull/1423). We only include minimal functionality here. The bare minimum to be useful when doing research. Closes https://github.com/ooni/probe/issues/2647. Because this is experimental code and there's no commitment to productionize it, I have decided to create a specific subpackage `internal/x` that should host our in-tree experiments. (I will also mode `dslx` in there.) --- go.mod | 4 + go.sum | 25 +++ internal/cmd/miniooni/javascript.go | 40 ++++ internal/cmd/miniooni/main.go | 1 + internal/x/doc.go | 2 + internal/x/dslengine/measurexlite.go | 36 +++ internal/x/dslengine/minimal.go | 184 ++++++++++++++++ internal/x/dslengine/options.go | 67 ++++++ internal/x/dsljavascript/consolemodule.go | 44 ++++ internal/x/dsljavascript/doc.go | 2 + internal/x/dsljavascript/golangmodule.go | 21 ++ internal/x/dsljavascript/oonimodule.go | 53 +++++ internal/x/dsljavascript/vm.go | 213 ++++++++++++++++++ internal/x/dsljson/dedupaddrs.go | 43 ++++ internal/x/dsljson/dnslookupudp.go | 40 ++++ internal/x/dsljson/doc.go | 2 + internal/x/dsljson/drop.go | 71 ++++++ internal/x/dsljson/getaddrinfo.go | 38 ++++ internal/x/dsljson/http.go | 97 ++++++++ internal/x/dsljson/loader.go | 98 +++++++++ internal/x/dsljson/makeendpoints.go | 44 ++++ internal/x/dsljson/quic.go | 65 ++++++ internal/x/dsljson/register.go | 37 ++++ internal/x/dsljson/rootnode.go | 6 + internal/x/dsljson/run.go | 20 ++ internal/x/dsljson/stagenode.go | 9 + internal/x/dsljson/taken.go | 83 +++++++ internal/x/dsljson/tcp.go | 44 ++++ internal/x/dsljson/teeaddrs.go | 43 ++++ internal/x/dsljson/tls.go | 63 ++++++ internal/x/dslvm/closer.go | 8 + internal/x/dslvm/dedupaddrs.go | 77 +++++++ internal/x/dslvm/dnslookupudp.go | 83 +++++++ internal/x/dslvm/doc.go | 2 + internal/x/dslvm/done.go | 4 + internal/x/dslvm/drop.go | 36 +++ internal/x/dslvm/getaddrinfo.go | 74 +++++++ internal/x/dslvm/http.go | 257 ++++++++++++++++++++++ internal/x/dslvm/makeendpoints.go | 30 +++ internal/x/dslvm/observations.go | 54 +++++ internal/x/dslvm/quic.go | 172 +++++++++++++++ internal/x/dslvm/runtime.go | 49 +++++ internal/x/dslvm/semaphore.go | 62 ++++++ internal/x/dslvm/stage.go | 8 + internal/x/dslvm/start.go | 10 + internal/x/dslvm/taken.go | 38 ++++ internal/x/dslvm/tcp.go | 148 +++++++++++++ internal/x/dslvm/teeaddrs.go | 56 +++++ internal/x/dslvm/tls.go | 171 ++++++++++++++ internal/x/dslvm/trace.go | 74 +++++++ internal/x/dslvm/wait.go | 18 ++ 51 files changed, 2926 insertions(+) create mode 100644 internal/cmd/miniooni/javascript.go create mode 100644 internal/x/doc.go create mode 100644 internal/x/dslengine/measurexlite.go create mode 100644 internal/x/dslengine/minimal.go create mode 100644 internal/x/dslengine/options.go create mode 100644 internal/x/dsljavascript/consolemodule.go create mode 100644 internal/x/dsljavascript/doc.go create mode 100644 internal/x/dsljavascript/golangmodule.go create mode 100644 internal/x/dsljavascript/oonimodule.go create mode 100644 internal/x/dsljavascript/vm.go create mode 100644 internal/x/dsljson/dedupaddrs.go create mode 100644 internal/x/dsljson/dnslookupudp.go create mode 100644 internal/x/dsljson/doc.go create mode 100644 internal/x/dsljson/drop.go create mode 100644 internal/x/dsljson/getaddrinfo.go create mode 100644 internal/x/dsljson/http.go create mode 100644 internal/x/dsljson/loader.go create mode 100644 internal/x/dsljson/makeendpoints.go create mode 100644 internal/x/dsljson/quic.go create mode 100644 internal/x/dsljson/register.go create mode 100644 internal/x/dsljson/rootnode.go create mode 100644 internal/x/dsljson/run.go create mode 100644 internal/x/dsljson/stagenode.go create mode 100644 internal/x/dsljson/taken.go create mode 100644 internal/x/dsljson/tcp.go create mode 100644 internal/x/dsljson/teeaddrs.go create mode 100644 internal/x/dsljson/tls.go create mode 100644 internal/x/dslvm/closer.go create mode 100644 internal/x/dslvm/dedupaddrs.go create mode 100644 internal/x/dslvm/dnslookupudp.go create mode 100644 internal/x/dslvm/doc.go create mode 100644 internal/x/dslvm/done.go create mode 100644 internal/x/dslvm/drop.go create mode 100644 internal/x/dslvm/getaddrinfo.go create mode 100644 internal/x/dslvm/http.go create mode 100644 internal/x/dslvm/makeendpoints.go create mode 100644 internal/x/dslvm/observations.go create mode 100644 internal/x/dslvm/quic.go create mode 100644 internal/x/dslvm/runtime.go create mode 100644 internal/x/dslvm/semaphore.go create mode 100644 internal/x/dslvm/stage.go create mode 100644 internal/x/dslvm/start.go create mode 100644 internal/x/dslvm/taken.go create mode 100644 internal/x/dslvm/tcp.go create mode 100644 internal/x/dslvm/teeaddrs.go create mode 100644 internal/x/dslvm/tls.go create mode 100644 internal/x/dslvm/trace.go create mode 100644 internal/x/dslvm/wait.go diff --git a/go.mod b/go.mod index 46387fdf3..d2c324d45 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,8 @@ require ( github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 github.com/cloudflare/circl v1.3.6 github.com/cretz/bine v0.2.0 + github.com/dop251/goja v0.0.0-20231027120936-b396bb4c349d + github.com/dop251/goja_nodejs v0.0.0-20231122114759-e84d9a924c5c github.com/fatih/color v1.16.0 github.com/google/go-cmp v0.6.0 github.com/google/gopacket v1.1.19 @@ -54,8 +56,10 @@ require ( github.com/Psiphon-Labs/tls-tris v0.0.0-20230824155421-58bf6d336a9a // indirect github.com/andybalholm/brotli v1.0.6 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dlclark/regexp2 v1.10.0 // indirect github.com/flynn/noise v1.0.1 // indirect github.com/gaukas/godicttls v0.0.4 // indirect + github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect github.com/google/btree v1.1.2 // indirect github.com/google/pprof v0.0.0-20231212022811-ec68065c825e // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect diff --git a/go.sum b/go.sum index c76756ad8..f640cf111 100644 --- a/go.sum +++ b/go.sum @@ -65,6 +65,9 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE= github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ= +github.com/chzyer/logex v1.2.0/go.mod h1:9+9sk7u7pGNWYMkh0hdiL++6OeibzJccyQU4p4MedaY= +github.com/chzyer/readline v1.5.0/go.mod h1:x22KAscuvRqlLoK9CsoYsmxoXZMMFVyOl86cAH8qUic= +github.com/chzyer/test v0.0.0-20210722231415-061457976a23/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudflare/circl v1.3.6 h1:/xbKIqSHbZXHwkhbrhrt2YOHIwYJlXH94E3tI/gDlUg= github.com/cloudflare/circl v1.3.6/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA= @@ -80,6 +83,7 @@ github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7 github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.17 h1:QeVUsEDNrLBW4tMgZHvxy18sKtr6VI492kBhUfhDJNI= github.com/creack/pty v1.1.17/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/cretz/bine v0.2.0 h1:8GiDRGlTgz+o8H9DSnsl+5MeBK4HsExxgl6WgzOCuZo= @@ -99,7 +103,18 @@ github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkz github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= +github.com/dlclark/regexp2 v1.7.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= +github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0= +github.com/dlclark/regexp2 v1.10.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= +github.com/dop251/goja v0.0.0-20211022113120-dc8c55024d06/go.mod h1:R9ET47fwRVRPZnOGvHxxhuZcbrMCuiqOz3Rlrh4KSnk= +github.com/dop251/goja v0.0.0-20231027120936-b396bb4c349d h1:wi6jN5LVt/ljaBG4ue79Ekzb12QfJ52L9Q98tl8SWhw= +github.com/dop251/goja v0.0.0-20231027120936-b396bb4c349d/go.mod h1:QMWlm50DNe14hD7t24KEqZuUdC9sOTy8W6XbCU1mlw4= +github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y= +github.com/dop251/goja_nodejs v0.0.0-20211022123610-8dd9abb0616d/go.mod h1:DngW8aVqWbuLRMHItjPUyqdj+HWPvnQe8V8y1nDpIbM= +github.com/dop251/goja_nodejs v0.0.0-20231122114759-e84d9a924c5c h1:hLoodLRD4KLWIH8eyAQCLcH8EqIrjac7fCkp/fHnvuQ= +github.com/dop251/goja_nodejs v0.0.0-20231122114759-e84d9a924c5c/go.mod h1:bhGPmCgCCTSRfiMYWjpS46IDo9EUZXlsuUaPXSWGbv0= github.com/dsnet/compress v0.0.1 h1:PlZu0n3Tuv04TzpfPbrnI0HW/YwodEXDS+oPKahKF0Q= github.com/dsnet/compress v0.0.1/go.mod h1:Aw8dCMJ7RioblQeTqt88akK31OvO8Dhf5JflhBbQEHo= github.com/dsnet/golib v0.0.0-20171103203638-1ea166775780/go.mod h1:Lj+Z9rebOhdfkVLjJ8T6VcRQv3SXugXy999NBtR9aFY= @@ -129,6 +144,8 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU= +github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -175,6 +192,7 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= +github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg= github.com/google/pprof v0.0.0-20231212022811-ec68065c825e h1:bwOy7hAFd0C91URzMIEBfr6BAz29yk7Qj0cy6S7DJlU= github.com/google/pprof v0.0.0-20231212022811-ec68065c825e/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= @@ -197,6 +215,7 @@ github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSo github.com/hinshun/vt10x v0.0.0-20220119200601-820417d04eec h1:qv2VnGeEQHchGaZ/u7lxST/RaJw+cv273q79D81Xbog= github.com/hinshun/vt10x v0.0.0-20220119200601-820417d04eec/go.mod h1:Q48J4R4DvxnHolD5P8pOtXigYlRuPLGl6moFx3ulM68= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= @@ -269,11 +288,13 @@ github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFB github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= @@ -439,6 +460,7 @@ github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/fastuuid v1.1.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= @@ -668,6 +690,7 @@ golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -707,6 +730,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= @@ -777,6 +801,7 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/cmd/miniooni/javascript.go b/internal/cmd/miniooni/javascript.go new file mode 100644 index 000000000..e0cfec051 --- /dev/null +++ b/internal/cmd/miniooni/javascript.go @@ -0,0 +1,40 @@ +package main + +import ( + "path/filepath" + + "github.com/apex/log" + "github.com/ooni/probe-cli/v3/internal/runtimex" + "github.com/ooni/probe-cli/v3/internal/x/dsljavascript" + "github.com/spf13/cobra" +) + +// registerJavaScript registers the javascript subcommand +func registerJavaScript(rootCmd *cobra.Command, globalOptions *Options) { + subCmd := &cobra.Command{ + Use: "javascript", + Short: "Very experimental command to run JavaScript snippets", + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + runtimex.Assert(len(args) == 1, "expected exactly one argument") + javaScriptMain(args[0]) + }, + } + rootCmd.AddCommand(subCmd) +} + +func javaScriptMain(scriptPath string) { + // TODO(bassosimone): for an initial prototype, using a local directory is + // good, but, if we make this more production ready, we probably need to define + // a specific location under the $OONI_HOME. + config := &dsljavascript.VMConfig{ + Logger: log.Log, + ScriptBaseDir: filepath.Join(".", "./javascript"), + } + + log.Warnf("The javascript subcommand is highly experimental and may be removed") + log.Warnf("or heavily modified without prior notice. For more information, for now") + log.Warnf("see https://github.com/bassosimone/2023-12-09-ooni-javascript.") + + runtimex.Try0(dsljavascript.RunScript(config, scriptPath)) +} diff --git a/internal/cmd/miniooni/main.go b/internal/cmd/miniooni/main.go index f78004c1b..dc3e5b734 100644 --- a/internal/cmd/miniooni/main.go +++ b/internal/cmd/miniooni/main.go @@ -174,6 +174,7 @@ func main() { registerAllExperiments(rootCmd, &globalOptions) registerOONIRun(rootCmd, &globalOptions) + registerJavaScript(rootCmd, &globalOptions) if err := rootCmd.Execute(); err != nil { os.Exit(1) diff --git a/internal/x/doc.go b/internal/x/doc.go new file mode 100644 index 000000000..746d2c25c --- /dev/null +++ b/internal/x/doc.go @@ -0,0 +1,2 @@ +// Package x contains highly experimental packages. +package x diff --git a/internal/x/dslengine/measurexlite.go b/internal/x/dslengine/measurexlite.go new file mode 100644 index 000000000..468461b00 --- /dev/null +++ b/internal/x/dslengine/measurexlite.go @@ -0,0 +1,36 @@ +package dslengine + +import ( + "time" + + "github.com/ooni/probe-cli/v3/internal/measurexlite" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/x/dslvm" +) + +// NewRuntimeMeasurexLite creates a [Runtime] using [measurexlite] to collect [*Observations]. +func NewRuntimeMeasurexLite(logger model.Logger, zeroTime time.Time, options ...Option) *RuntimeMeasurexLite { + values := newOptionValues(options...) + + rt := &RuntimeMeasurexLite{ + MinimalRuntime: NewMinimalRuntime(logger, zeroTime, options...), + netx: values.netx, + } + + return rt +} + +// RuntimeMeasurexLite uses [measurexlite] to collect [*Observations.] +type RuntimeMeasurexLite struct { + *MinimalRuntime + netx model.MeasuringNetwork +} + +// NewTrace implements Runtime. +func (p *RuntimeMeasurexLite) NewTrace(index int64, zeroTime time.Time, tags ...string) dslvm.Trace { + trace := measurexlite.NewTrace(index, zeroTime, tags...) + trace.Netx = p.netx + return trace +} + +var _ dslvm.Runtime = &RuntimeMeasurexLite{} diff --git a/internal/x/dslengine/minimal.go b/internal/x/dslengine/minimal.go new file mode 100644 index 000000000..c658ceb90 --- /dev/null +++ b/internal/x/dslengine/minimal.go @@ -0,0 +1,184 @@ +package dslengine + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/x/dslvm" +) + +// NewMinimalRuntime creates a minimal [Runtime] implementation. +// +// This [Runtime] implementation does not collect any [*Observations]. +func NewMinimalRuntime(logger model.Logger, zeroTime time.Time, options ...Option) *MinimalRuntime { + values := newOptionValues(options...) + + rt := &MinimalRuntime{ + activeConn: dslvm.NewSemaphore("activeConn", values.activeConns), + activeDNS: dslvm.NewSemaphore("activeDNS", values.activeDNS), + idg: &atomic.Int64{}, + logger: logger, + mu: sync.Mutex{}, + netx: values.netx, + ob: dslvm.NewObservations(), + zeroT: zeroTime, + } + + return rt +} + +var _ dslvm.Runtime = &MinimalRuntime{} + +// MinimalRuntime is a minimal [Runtime] implementation. +type MinimalRuntime struct { + activeConn *dslvm.Semaphore + activeDNS *dslvm.Semaphore + idg *atomic.Int64 + logger model.Logger + mu sync.Mutex + netx model.MeasuringNetwork + ob *dslvm.Observations + zeroT time.Time +} + +// ActiveConnections implements [Runtime]. +func (p *MinimalRuntime) ActiveConnections() *dslvm.Semaphore { + return p.activeConn +} + +// ActiveDNSLookups implements [Runtime]. +func (p *MinimalRuntime) ActiveDNSLookups() *dslvm.Semaphore { + return p.activeDNS +} + +// Observations implements Runtime. +func (p *MinimalRuntime) Observations() *dslvm.Observations { + defer p.mu.Unlock() + p.mu.Lock() + o := p.ob + p.ob = dslvm.NewObservations() + return o +} + +// SaveObservations implements Runtime. +func (p *MinimalRuntime) SaveObservations(obs ...*dslvm.Observations) { + defer p.mu.Unlock() + p.mu.Lock() + for _, o := range obs { + p.ob.NetworkEvents = append(p.ob.NetworkEvents, o.NetworkEvents...) + p.ob.QUICHandshakes = append(p.ob.QUICHandshakes, o.QUICHandshakes...) + p.ob.Queries = append(p.ob.Queries, o.Queries...) + p.ob.Requests = append(p.ob.Requests, o.Requests...) + p.ob.TCPConnect = append(p.ob.TCPConnect, o.TCPConnect...) + p.ob.TLSHandshakes = append(p.ob.TLSHandshakes, o.TLSHandshakes...) + } +} + +// IDGenerator implements Runtime. +func (p *MinimalRuntime) IDGenerator() *atomic.Int64 { + return p.idg +} + +// Logger implements Runtime. +func (p *MinimalRuntime) Logger() model.Logger { + return p.logger +} + +// ZeroTime implements Runtime. +func (p *MinimalRuntime) ZeroTime() time.Time { + return p.zeroT +} + +// NewTrace implements Runtime. +func (p *MinimalRuntime) NewTrace(index int64, zeroTime time.Time, tags ...string) dslvm.Trace { + return &minimalTrace{idx: index, netx: p.netx, tags: tags, zt: zeroTime} +} + +type minimalTrace struct { + idx int64 + netx model.MeasuringNetwork + tags []string + zt time.Time +} + +// CloneBytesReceivedMap implements Trace. +func (tx *minimalTrace) CloneBytesReceivedMap() (out map[string]int64) { + return make(map[string]int64) +} + +// DNSLookupsFromRoundTrip implements Trace. +func (tx *minimalTrace) DNSLookupsFromRoundTrip() (out []*model.ArchivalDNSLookupResult) { + return []*model.ArchivalDNSLookupResult{} +} + +// Index implements Trace. +func (tx *minimalTrace) Index() int64 { + return tx.idx +} + +// NetworkEvents implements Trace. +func (tx *minimalTrace) NetworkEvents() (out []*model.ArchivalNetworkEvent) { + return []*model.ArchivalNetworkEvent{} +} + +// NewDialerWithoutResolver implements Trace. +func (tx *minimalTrace) NewDialerWithoutResolver(dl model.DebugLogger, wrappers ...model.DialerWrapper) model.Dialer { + return tx.netx.NewDialerWithoutResolver(dl, wrappers...) +} + +// NewParallelUDPResolver implements Trace. +func (tx *minimalTrace) NewParallelUDPResolver(logger model.DebugLogger, dialer model.Dialer, address string) model.Resolver { + return tx.netx.NewParallelUDPResolver(logger, dialer, address) +} + +// NewQUICDialerWithoutResolver implements Trace. +func (tx *minimalTrace) NewQUICDialerWithoutResolver(listener model.UDPListener, dl model.DebugLogger, wrappers ...model.QUICDialerWrapper) model.QUICDialer { + return tx.netx.NewQUICDialerWithoutResolver(listener, dl, wrappers...) +} + +// NewStdlibResolver implements Trace. +func (tx *minimalTrace) NewStdlibResolver(logger model.DebugLogger) model.Resolver { + return tx.netx.NewStdlibResolver(logger) +} + +// NewTLSHandshakerStdlib implements Trace. +func (tx *minimalTrace) NewTLSHandshakerStdlib(dl model.DebugLogger) model.TLSHandshaker { + return tx.netx.NewTLSHandshakerStdlib(dl) +} + +// NewUDPListener implements Trace +func (tx *minimalTrace) NewUDPListener() model.UDPListener { + return tx.netx.NewUDPListener() +} + +// QUICHandshakes implements Trace. +func (tx *minimalTrace) QUICHandshakes() (out []*model.ArchivalTLSOrQUICHandshakeResult) { + return []*model.ArchivalTLSOrQUICHandshakeResult{} +} + +// TCPConnects implements Trace. +func (tx *minimalTrace) TCPConnects() (out []*model.ArchivalTCPConnectResult) { + return []*model.ArchivalTCPConnectResult{} +} + +// TLSHandshakes implements Trace. +func (tx *minimalTrace) TLSHandshakes() (out []*model.ArchivalTLSOrQUICHandshakeResult) { + return []*model.ArchivalTLSOrQUICHandshakeResult{} +} + +// Tags implements Trace. +func (tx *minimalTrace) Tags() []string { + return tx.tags +} + +// TimeSince implements Trace. +func (tx *minimalTrace) TimeSince(t0 time.Time) time.Duration { + return time.Since(t0) +} + +// ZeroTime implements Trace. +func (tx *minimalTrace) ZeroTime() time.Time { + return tx.zt +} diff --git a/internal/x/dslengine/options.go b/internal/x/dslengine/options.go new file mode 100644 index 000000000..4d468046f --- /dev/null +++ b/internal/x/dslengine/options.go @@ -0,0 +1,67 @@ +package dslengine + +import ( + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/netxlite" +) + +type optionsValues struct { + // activeConns controls the maximum number of active conns + activeConns int + + // activeDNS controls the maximum number of active DNS lookups + activeDNS int + + // netx is the underlying measuring network + netx model.MeasuringNetwork +} + +func newOptionValues(options ...Option) *optionsValues { + values := &optionsValues{ + activeConns: 1, + activeDNS: 1, + netx: &netxlite.Netx{Underlying: nil}, // implies using the host's network + } + for _, option := range options { + option(values) + } + return values +} + +// Option is an option for configuring a runtime. +type Option func(opts *optionsValues) + +// OptionMeasuringNetwork configures the [model.MeasuringNetwork] to use. +func OptionMeasuringNetwork(netx model.MeasuringNetwork) Option { + return func(opts *optionsValues) { + opts.netx = netx + } +} + +// OptionMaxActiveConns configures the maximum number of endpoint +// measurements that we may run in parallel. If the provided value +// is <= 1, we set a maximum of 1 measurements in parallel. +func OptionMaxActiveConns(count int) Option { + return func(opts *optionsValues) { + switch { + case count > 1: + opts.activeConns = count + default: + opts.activeConns = 1 + } + } +} + +// OptionMaxActiveDNSLookups configures the maximum number of DNS lookup +// measurements that we may run in parallel. If the provided value +// is <= 1, we set a maximum of 1 measurements in parallel. +func OptionMaxActiveDNSLookups(count int) Option { + return func(opts *optionsValues) { + switch { + case count > 1: + opts.activeDNS = count + default: + opts.activeDNS = 1 + } + } +} diff --git a/internal/x/dsljavascript/consolemodule.go b/internal/x/dsljavascript/consolemodule.go new file mode 100644 index 000000000..4f5794f69 --- /dev/null +++ b/internal/x/dsljavascript/consolemodule.go @@ -0,0 +1,44 @@ +package dsljavascript + +// +// Adapted from github.com/dop251/goja_nodejs +// +// SPDX-License-Identifier: MIT +// + +import ( + "github.com/dop251/goja" + "github.com/ooni/probe-cli/v3/internal/runtimex" +) + +// newModuleConsole creates the console module in JavaScript +func (vm *VM) newModuleConsole(gojaVM *goja.Runtime, mod *goja.Object) { + runtimex.Assert(vm.vm == gojaVM, "dsljavascript: unexpected gojaVM pointer value") + exports := mod.Get("exports").(*goja.Object) + exports.Set("log", vm.consoleLog) + exports.Set("error", vm.consoleError) + exports.Set("warn", vm.consoleWarn) +} + +// consoleLog implements console.log +func (vm *VM) consoleLog(call goja.FunctionCall) goja.Value { + return vm.consoleDo(call, vm.logger.Info) +} + +// consoleError implements console.Error +func (vm *VM) consoleError(call goja.FunctionCall) goja.Value { + return vm.consoleDo(call, vm.logger.Warn) +} + +// consoleWarn implements console.Warn +func (vm *VM) consoleWarn(call goja.FunctionCall) goja.Value { + return vm.consoleDo(call, vm.logger.Warn) +} + +func (vm *VM) consoleDo(call goja.FunctionCall, emit func(msg string)) goja.Value { + format, ok := goja.AssertFunction(vm.util.Get("format")) + runtimex.Assert(ok, "dsljavascript: util.format is not a function") + ret := runtimex.Try1(format(vm.util, call.Arguments...)) + emit(ret.String()) + return nil +} diff --git a/internal/x/dsljavascript/doc.go b/internal/x/dsljavascript/doc.go new file mode 100644 index 000000000..d89b0785d --- /dev/null +++ b/internal/x/dsljavascript/doc.go @@ -0,0 +1,2 @@ +// Package dsljavascript allows running experiments written in JavaScript. +package dsljavascript diff --git a/internal/x/dsljavascript/golangmodule.go b/internal/x/dsljavascript/golangmodule.go new file mode 100644 index 000000000..219a9e2d3 --- /dev/null +++ b/internal/x/dsljavascript/golangmodule.go @@ -0,0 +1,21 @@ +package dsljavascript + +import ( + "time" + + "github.com/dop251/goja" + "github.com/ooni/probe-cli/v3/internal/runtimex" +) + +// newModuleGolang creates the _golang module in JavaScript +func (vm *VM) newModuleGolang(gojaVM *goja.Runtime, mod *goja.Object) { + runtimex.Assert(vm.vm == gojaVM, "dsljavascript: unexpected gojaVM pointer value") + exports := mod.Get("exports").(*goja.Object) + exports.Set("timeNow", vm.golangTimeNow) +} + +// golangTimeNow returns the current time using golang [time.Now] +func (vm *VM) golangTimeNow(call goja.FunctionCall) goja.Value { + runtimex.Assert(len(call.Arguments) == 0, "dsljavascript: _golang.timeNow expects zero arguments") + return vm.vm.ToValue(time.Now()) +} diff --git a/internal/x/dsljavascript/oonimodule.go b/internal/x/dsljavascript/oonimodule.go new file mode 100644 index 000000000..d6640ea4f --- /dev/null +++ b/internal/x/dsljavascript/oonimodule.go @@ -0,0 +1,53 @@ +package dsljavascript + +import ( + "context" + "encoding/json" + "time" + + "github.com/dop251/goja" + "github.com/ooni/probe-cli/v3/internal/runtimex" + "github.com/ooni/probe-cli/v3/internal/x/dslengine" + "github.com/ooni/probe-cli/v3/internal/x/dsljson" +) + +// newModuleOONI creates the _ooni module in JavaScript +func (vm *VM) newModuleOONI(gojaVM *goja.Runtime, mod *goja.Object) { + runtimex.Assert(vm.vm == gojaVM, "dsljavascript: unexpected gojaVM pointer value") + exports := mod.Get("exports").(*goja.Object) + exports.Set("runDSL", vm.ooniRunDSL) +} + +func (vm *VM) ooniRunDSL(jsAST *goja.Object, zeroTime time.Time) (string, error) { + // serialize the incoming JS object + rawAST, err := jsAST.MarshalJSON() + if err != nil { + return "", err + } + + // parse the raw AST into the loadable AST format + var root dsljson.RootNode + if err := json.Unmarshal(rawAST, &root); err != nil { + return "", err + } + + // create a background context for now but ideally we should allow to interrupt + ctx := context.Background() + + // create a runtime for executing the DSL + // TODO(bassosimone): maybe we should configure the parallelism? + rtx := dslengine.NewRuntimeMeasurexLite( + vm.logger, zeroTime, + dslengine.OptionMaxActiveDNSLookups(4), + dslengine.OptionMaxActiveConns(16), + ) + + // interpret the JSON representation of the DSL + if err := dsljson.Run(ctx, rtx, &root); err != nil { + return "", err + } + + // serialize the observations to JSON and return + resultRaw := runtimex.Try1(json.Marshal(rtx.Observations())) + return string(resultRaw), nil +} diff --git a/internal/x/dsljavascript/vm.go b/internal/x/dsljavascript/vm.go new file mode 100644 index 000000000..193f4eb9f --- /dev/null +++ b/internal/x/dsljavascript/vm.go @@ -0,0 +1,213 @@ +package dsljavascript + +import ( + "errors" + "fmt" + "os" + "path/filepath" + + "github.com/dop251/goja" + "github.com/dop251/goja_nodejs/require" + "github.com/dop251/goja_nodejs/util" + "github.com/ooni/probe-cli/v3/internal/model" +) + +// VMConfig contains configuration for creating a VM. +type VMConfig struct { + // Logger is the MANDATORY logger to use. + Logger model.Logger + + // ScriptBaseDir is the MANDATORY script base dir to use. + ScriptBaseDir string +} + +// errVMConfig indicates that some setting in the [*VMConfig] is invalid. +var errVMConfig = errors.New("dsljavascript: invalid VMConfig") + +// check returns an explanatory error if the [*VMConfig] is invalid. +func (cfg *VMConfig) check() error { + if cfg.Logger == nil { + return fmt.Errorf("%w: the Logger field is nil", errVMConfig) + } + + if cfg.ScriptBaseDir == "" { + return fmt.Errorf("%w: the ScriptBaseDir field is empty", errVMConfig) + } + + return nil +} + +// VM wraps the [*github.com/dop251/goja.Runtime]. The zero value of this +// struct is invalid; please, use [NewVM] to construct. +type VM struct { + // logger is the logger to use. + logger model.Logger + + // registry is the JavaScript package registry to use. + registry *require.Registry + + // scriptBaseDir is the base directory containing scripts. + scriptBaseDir string + + // util is a reference to goja's util model. + util *goja.Object + + // vm is a reference to goja's runtime. + vm *goja.Runtime +} + +// RunScript runs the given script using a transient VM. +func RunScript(config *VMConfig, scriptPath string) error { + // create a VM + vm, err := NewVM(config, scriptPath) + if err != nil { + return err + } + + // run the script + return vm.RunScript(scriptPath) +} + +// NewVM creates a new VM instance. +func NewVM(config *VMConfig, scriptPath string) (*VM, error) { + // make sure the provided config is correct + if err := config.check(); err != nil { + return nil, err + } + + // convert the script base dir to be an absolute path + scriptBaseDir, err := filepath.Abs(config.ScriptBaseDir) + if err != nil { + return nil, err + } + + // create package registry ("By default, a registry's global folders list is empty") + registry := require.NewRegistry(require.WithGlobalFolders(scriptBaseDir)) + + // create the goja virtual machine + gojaVM := goja.New() + + // enable 'require' for the virtual machine + registry.Enable(gojaVM) + + // create the virtual machine wrapper + vm := &VM{ + logger: config.Logger, + registry: registry, + scriptBaseDir: scriptBaseDir, + util: require.Require(gojaVM, util.ModuleName).(*goja.Object), + vm: gojaVM, + } + + // register the console module in JavaScript + registry.RegisterNativeModule("console", vm.newModuleConsole) + + // make sure the 'console' object exists in the VM before running scripts + gojaVM.Set("console", require.Require(gojaVM, "console")) + + // register the _golang module in JavaScript + registry.RegisterNativeModule("_golang", vm.newModuleGolang) + + // register the _ooni module in JavaScript + registry.RegisterNativeModule("_ooni", vm.newModuleOONI) + + return vm, nil +} + +// LoadExperiment loads the given experiment file and returns a new VM primed +// to execute the experiment several times for several inputs. +func LoadExperiment(config *VMConfig, exPath string) (*VM, error) { + // create a new VM instance + vm, err := NewVM(config, exPath) + if err != nil { + return nil, err + } + + // make sure there's an empty dictionary containing exports + vm.vm.Set("exports", vm.vm.NewObject()) + + // run the script + if err := vm.RunScript(exPath); err != nil { + return nil, err + } + + return vm, nil +} + +func (vm *VM) RunScript(exPath string) error { + // read the file content + content, err := os.ReadFile(exPath) + if err != nil { + return err + } + + // interpret the script defining the experiment + if _, err = vm.vm.RunScript(exPath, string(content)); err != nil { + return err + } + + return nil +} + +func (vm *VM) findExportedSymbol(name string) (goja.Value, error) { + // obtain the toplevel exports object + value := vm.vm.Get("exports") + if value == nil { + return nil, errors.New("cannot find symbol: exports") + } + + // convert to object + exports := value.ToObject(vm.vm) + if exports == nil { + return nil, errors.New("cannot convert exports to object") + } + + // obtain the symbol inside exports + symbol := exports.Get(name) + if symbol == nil { + return nil, fmt.Errorf("cannot find symbol: exports.%s", name) + } + + return symbol, nil +} + +// ExperimentName returns the experiment name. Invoking this method +// before invoking LoadScript always produces an error. +func (vm *VM) ExperimentName() (string, error) { + var experimentName func() (string, error) + value, err := vm.findExportedSymbol("experimentName") + if err != nil { + return "", err + } + if err := vm.vm.ExportTo(value, &experimentName); err != nil { + return "", err + } + return experimentName() +} + +// ExperimentVersion returns the experiment version. Invoking this method +// before invoking LoadScript always produces an error. +func (vm *VM) ExperimentVersion() (string, error) { + var experimentVersion func() (string, error) + value, err := vm.findExportedSymbol("experimentVersion") + if err != nil { + return "", err + } + if err := vm.vm.ExportTo(value, &experimentVersion); err != nil { + return "", err + } + return experimentVersion() +} + +// Run performs a measurement and returns the test keys. +func (vm *VM) Run(input string) (string, error) { + var run func(string) (string, error) + value, err := vm.findExportedSymbol("run") + if err != nil { + return "", err + } + if err := vm.vm.ExportTo(value, &run); err != nil { + return "", err + } + return run(input) +} diff --git a/internal/x/dsljson/dedupaddrs.go b/internal/x/dsljson/dedupaddrs.go new file mode 100644 index 000000000..81b1e6aa0 --- /dev/null +++ b/internal/x/dsljson/dedupaddrs.go @@ -0,0 +1,43 @@ +package dsljson + +import ( + "encoding/json" + + "github.com/ooni/probe-cli/v3/internal/x/dslvm" +) + +type dedupAddrsValue struct { + Inputs []string `json:"inputs"` + Output string `json:"output"` +} + +func (lx *loader) onDedupAddrs(raw json.RawMessage) error { + // parse the raw value + var value dedupAddrsValue + if err := json.Unmarshal(raw, &value); err != nil { + return err + } + + // create the required output registers + output, err := registerMakeOutput[string](lx, value.Output) + if err != nil { + return err + } + + // instantiate the stage + sx := &dslvm.DedupAddrsStage{ + Inputs: []<-chan string{}, + Output: output, + } + for _, name := range value.Inputs { + input, err := registerPopInput[string](lx, name) + if err != nil { + return err + } + sx.Inputs = append(sx.Inputs, input) + } + + // remember the stage for later + lx.stages = append(lx.stages, sx) + return nil +} diff --git a/internal/x/dsljson/dnslookupudp.go b/internal/x/dsljson/dnslookupudp.go new file mode 100644 index 000000000..a39a86d92 --- /dev/null +++ b/internal/x/dsljson/dnslookupudp.go @@ -0,0 +1,40 @@ +package dsljson + +import ( + "encoding/json" + + "github.com/ooni/probe-cli/v3/internal/x/dslvm" +) + +type dnsLookupUDPValue struct { + Domain string `json:"domain"` + Output string `json:"output"` + Resolver string `json:"resolver"` + Tags []string `json:"tags"` +} + +func (lx *loader) onDNSLookupUDP(raw json.RawMessage) error { + // parse the raw value + var value dnsLookupUDPValue + if err := json.Unmarshal(raw, &value); err != nil { + return err + } + + // create the required output registers + output, err := registerMakeOutput[string](lx, value.Output) + if err != nil { + return err + } + + // instantiate the stage + sx := &dslvm.DNSLookupUDPStage{ + Domain: value.Domain, + Output: output, + Resolver: value.Resolver, + Tags: value.Tags, + } + + // remember the stage for later + lx.stages = append(lx.stages, sx) + return nil +} diff --git a/internal/x/dsljson/doc.go b/internal/x/dsljson/doc.go new file mode 100644 index 000000000..3986ef463 --- /dev/null +++ b/internal/x/dsljson/doc.go @@ -0,0 +1,2 @@ +// Package dsljson allows expressing the measurement DSL using JSON. +package dsljson diff --git a/internal/x/dsljson/drop.go b/internal/x/dsljson/drop.go new file mode 100644 index 000000000..7cb300db0 --- /dev/null +++ b/internal/x/dsljson/drop.go @@ -0,0 +1,71 @@ +package dsljson + +import ( + "encoding/json" + "errors" + + "github.com/ooni/probe-cli/v3/internal/x/dslvm" +) + +type dropValue struct { + Input string `json:"input"` + Output string `json:"output"` +} + +func (lx *loader) onDrop(raw json.RawMessage) error { + // parse the raw value + var value dropValue + if err := json.Unmarshal(raw, &value); err != nil { + return err + } + + // create the required output registers + output, err := registerMakeOutput[dslvm.Done](lx, value.Output) + if err != nil { + return err + } + + // make sure we register output as something to wait for + lx.toWait = append(lx.toWait, output) + + // fetch the required input register as a generic any value + xinput, err := registerPopInputRaw(lx, value.Input) + if err != nil { + return err + } + + // figure out the correct xinput type + var sx dslvm.Stage + switch input := xinput.(type) { + case chan *dslvm.TCPConnection: + sx = &dslvm.DropStage[*dslvm.TCPConnection]{ + Input: input, + Output: output, + } + + case chan *dslvm.TLSConnection: + sx = &dslvm.DropStage[*dslvm.TLSConnection]{ + Input: input, + Output: output, + } + + case chan *dslvm.QUICConnection: + sx = &dslvm.DropStage[*dslvm.QUICConnection]{ + Input: input, + Output: output, + } + + case chan string: + sx = &dslvm.DropStage[string]{ + Input: input, + Output: output, + } + + default: + return errors.New("drop: cannot instantiate output stage") + } + + // remember the stage for later + lx.stages = append(lx.stages, sx) + return nil +} diff --git a/internal/x/dsljson/getaddrinfo.go b/internal/x/dsljson/getaddrinfo.go new file mode 100644 index 000000000..c5cd9e73f --- /dev/null +++ b/internal/x/dsljson/getaddrinfo.go @@ -0,0 +1,38 @@ +package dsljson + +import ( + "encoding/json" + + "github.com/ooni/probe-cli/v3/internal/x/dslvm" +) + +type getaddrinfoValue struct { + Domain string `json:"domain"` + Output string `json:"output"` + Tags []string `json:"tags"` +} + +func (lx *loader) onGetaddrinfo(raw json.RawMessage) error { + // parse the raw value + var value getaddrinfoValue + if err := json.Unmarshal(raw, &value); err != nil { + return err + } + + // create the required output registers + output, err := registerMakeOutput[string](lx, value.Output) + if err != nil { + return err + } + + // instantiate the stage + sx := &dslvm.GetaddrinfoStage{ + Domain: value.Domain, + Output: output, + Tags: value.Tags, + } + + // remember the stage for later + lx.stages = append(lx.stages, sx) + return nil +} diff --git a/internal/x/dsljson/http.go b/internal/x/dsljson/http.go new file mode 100644 index 000000000..98f8f8763 --- /dev/null +++ b/internal/x/dsljson/http.go @@ -0,0 +1,97 @@ +package dsljson + +import ( + "encoding/json" + "errors" + + "github.com/ooni/probe-cli/v3/internal/x/dslvm" +) + +type httpRoundTripValue struct { + Accept string `json:"accept"` + AcceptLanguage string `json:"accept_language"` + Host string `json:"host"` + Input string `json:"input"` + MaxBodySnapshotSize int64 `json:"max_body_snapshot_size"` + Method string `json:"method"` + Output string `json:"output"` + Referer string `json:"referer"` + URLPath string `json:"url_path"` + UserAgent string `json:"user_agent"` +} + +func (lx *loader) onHTTPRoundTrip(raw json.RawMessage) error { + // parse the raw value + var value httpRoundTripValue + if err := json.Unmarshal(raw, &value); err != nil { + return err + } + + // create the required output registers + output, err := registerMakeOutput[dslvm.Done](lx, value.Output) + if err != nil { + return err + } + + // make sure we register output as something to wait for + lx.toWait = append(lx.toWait, output) + + // fetch the required input register as a generic any value + xinput, err := registerPopInputRaw(lx, value.Input) + if err != nil { + return err + } + + // figure out the correct xinput type + var sx dslvm.Stage + switch input := xinput.(type) { + case chan *dslvm.TCPConnection: + sx = &dslvm.HTTPRoundTripStage[*dslvm.TCPConnection]{ + Accept: value.Accept, + AcceptLanguage: value.AcceptLanguage, + Host: value.Host, + Input: input, + MaxBodySnapshotSize: value.MaxBodySnapshotSize, + Method: value.Method, + Output: output, + Referer: value.Referer, + URLPath: value.URLPath, + UserAgent: value.UserAgent, + } + + case chan *dslvm.TLSConnection: + sx = &dslvm.HTTPRoundTripStage[*dslvm.TLSConnection]{ + Accept: value.Accept, + AcceptLanguage: value.AcceptLanguage, + Host: value.Host, + Input: input, + MaxBodySnapshotSize: value.MaxBodySnapshotSize, + Method: value.Method, + Output: output, + Referer: value.Referer, + URLPath: value.URLPath, + UserAgent: value.UserAgent, + } + + case chan *dslvm.QUICConnection: + sx = &dslvm.HTTPRoundTripStage[*dslvm.QUICConnection]{ + Accept: value.Accept, + AcceptLanguage: value.AcceptLanguage, + Host: value.Host, + Input: input, + MaxBodySnapshotSize: value.MaxBodySnapshotSize, + Method: value.Method, + Output: output, + Referer: value.Referer, + URLPath: value.URLPath, + UserAgent: value.UserAgent, + } + + default: + return errors.New("http_round_trip: cannot instantiate output stage") + } + + // remember the stage for later + lx.stages = append(lx.stages, sx) + return nil +} diff --git a/internal/x/dsljson/loader.go b/internal/x/dsljson/loader.go new file mode 100644 index 000000000..9b9299565 --- /dev/null +++ b/internal/x/dsljson/loader.go @@ -0,0 +1,98 @@ +package dsljson + +import ( + "encoding/json" + "fmt" + + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/must" + "github.com/ooni/probe-cli/v3/internal/x/dslvm" +) + +type loader struct { + // gone contains the names of registers we have already used. + gone map[string]bool + + // loaders contains the loaders. + loaders map[string]func(json.RawMessage) error + + // registers maps variable names to values. + registers map[string]any + + // stages contains the stages of DSL ASM stages. + stages []dslvm.Stage + + // toWait contains the channels to wait for. + toWait []<-chan dslvm.Done +} + +func newLoader() *loader { + lx := &loader{ + gone: map[string]bool{}, + loaders: make(map[string]func(json.RawMessage) error), + registers: map[string]any{}, + stages: []dslvm.Stage{}, + } + + lx.loaders["drop"] = lx.onDrop + lx.loaders["dns_lookup_udp"] = lx.onDNSLookupUDP + lx.loaders["dedup_addrs"] = lx.onDedupAddrs + lx.loaders["getaddrinfo"] = lx.onGetaddrinfo + lx.loaders["http_round_trip"] = lx.onHTTPRoundTrip + lx.loaders["make_endpoints"] = lx.onMakeEndpoints + lx.loaders["quic_handshake"] = lx.onQUICHandshake + lx.loaders["take_n"] = lx.onTakeN + lx.loaders["tcp_connect"] = lx.onTCPConnect + lx.loaders["tls_handshake"] = lx.onTLSHandshake + lx.loaders["tee_addrs"] = lx.onTeeAddrs + + return lx +} + +func (lx *loader) load(logger model.Logger, root *RootNode) error { + + // load all the stages that belong to the root node + if err := lx.loadStages(logger, root.Stages...); err != nil { + return err + } + + // insert missing drops inside the code + var names []string + for name, register := range lx.registers { + if _, good := register.(chan dslvm.Done); good { + continue + } + logger.Warnf("register %s with type %T is not dropped: adding automatic drop", name, register) + names = append(names, name) + } + return lx.addAutomaticDrop(logger, names...) +} + +func (lx *loader) loadStages(logger model.Logger, stages ...StageNode) error { + for _, entry := range stages { + loader, good := lx.loaders[entry.Name] + if !good { + return fmt.Errorf("unknown instruction: %s", entry.Name) + } + if err := loader(entry.Value); err != nil { + return err + } + } + return nil +} + +func (lx *loader) addAutomaticDrop(logger model.Logger, names ...string) error { + for _, name := range names { + err := lx.loadStages(logger, StageNode{ + Name: "drop", + Value: must.MarshalJSON(dropValue{ + Input: name, + Output: name + "__autodrop", + }), + }) + if err != nil { + return err + } + } + return nil +} diff --git a/internal/x/dsljson/makeendpoints.go b/internal/x/dsljson/makeendpoints.go new file mode 100644 index 000000000..34840fc17 --- /dev/null +++ b/internal/x/dsljson/makeendpoints.go @@ -0,0 +1,44 @@ +package dsljson + +import ( + "encoding/json" + + "github.com/ooni/probe-cli/v3/internal/x/dslvm" +) + +type makeEndpointsValue struct { + Input string `json:"input"` + Output string `json:"output"` + Port string `json:"port"` +} + +func (lx *loader) onMakeEndpoints(raw json.RawMessage) error { + // parse the raw value + var value makeEndpointsValue + if err := json.Unmarshal(raw, &value); err != nil { + return err + } + + // create the required output registers + output, err := registerMakeOutput[string](lx, value.Output) + if err != nil { + return err + } + + // fetch the required input register + input, err := registerPopInput[string](lx, value.Input) + if err != nil { + return err + } + + // instantiate the stage + sx := &dslvm.MakeEndpointsStage{ + Input: input, + Output: output, + Port: value.Port, + } + + // remember the stage for later + lx.stages = append(lx.stages, sx) + return nil +} diff --git a/internal/x/dsljson/quic.go b/internal/x/dsljson/quic.go new file mode 100644 index 000000000..7b36eeb29 --- /dev/null +++ b/internal/x/dsljson/quic.go @@ -0,0 +1,65 @@ +package dsljson + +import ( + "crypto/x509" + "encoding/json" + "errors" + + "github.com/ooni/probe-cli/v3/internal/x/dslvm" +) + +type quicHandshakeValue struct { + Input string `json:"input"` + InsecureSkipVerify bool `json:"insecure_skip_verify"` + NextProtos []string `json:"next_protos"` + Output string `json:"output"` + RootCAs []string `json:"root_cas"` + ServerName string `json:"server_name"` + Tags []string `json:"tags"` +} + +func (lx *loader) onQUICHandshake(raw json.RawMessage) error { + // parse the raw value + var value quicHandshakeValue + if err := json.Unmarshal(raw, &value); err != nil { + return err + } + + // create the required output registers + output, err := registerMakeOutput[*dslvm.QUICConnection](lx, value.Output) + if err != nil { + return err + } + + // fetch the required input register + input, err := registerPopInput[string](lx, value.Input) + if err != nil { + return err + } + + // create the X509 cert pool + var pool *x509.CertPool + for _, cert := range value.RootCAs { + if pool == nil { + pool = x509.NewCertPool() + } + if !pool.AppendCertsFromPEM([]byte(cert)) { + return errors.New("cannot add PEM-encoded cert to X509 cert pool") + } + } + + // instantiate the stage + sx := &dslvm.QUICHandshakeStage{ + Input: input, + InsecureSkipVerify: value.InsecureSkipVerify, + NextProtos: value.NextProtos, + Output: output, + RootCAs: pool, + ServerName: value.ServerName, + Tags: value.Tags, + } + + // remember the stage for later + lx.stages = append(lx.stages, sx) + return nil +} diff --git a/internal/x/dsljson/register.go b/internal/x/dsljson/register.go new file mode 100644 index 000000000..7e58e3a18 --- /dev/null +++ b/internal/x/dsljson/register.go @@ -0,0 +1,37 @@ +package dsljson + +import "fmt" + +func registerMakeOutput[T any](lx *loader, name string) (chan T, error) { + if _, found := lx.registers[name]; found || lx.gone[name] { + return nil, fmt.Errorf("register already exists: %s", name) + } + c := make(chan T) + lx.registers[name] = c + return c, nil +} + +func registerPopInputRaw(lx *loader, name string) (any, error) { + rawch, found := lx.registers[name] + if !found { + if lx.gone[name] { + return nil, fmt.Errorf("register has already been used: %s", name) + } + return nil, fmt.Errorf("register does not exist: %s", name) + } + lx.gone[name] = true + delete(lx.registers, name) + return rawch, nil +} + +func registerPopInput[T any](lx *loader, name string) (chan T, error) { + rawch, err := registerPopInputRaw(lx, name) + if err != nil { + return nil, err + } + ch, okay := rawch.(chan T) + if !okay { + return nil, fmt.Errorf("invalid type for register: %s", name) + } + return ch, nil +} diff --git a/internal/x/dsljson/rootnode.go b/internal/x/dsljson/rootnode.go new file mode 100644 index 000000000..57e0eadf4 --- /dev/null +++ b/internal/x/dsljson/rootnode.go @@ -0,0 +1,6 @@ +package dsljson + +// RootNode is the root node of the DSL. +type RootNode struct { + Stages []StageNode `json:"stages"` +} diff --git a/internal/x/dsljson/run.go b/internal/x/dsljson/run.go new file mode 100644 index 000000000..1189863c3 --- /dev/null +++ b/internal/x/dsljson/run.go @@ -0,0 +1,20 @@ +package dsljson + +import ( + "context" + + "github.com/ooni/probe-cli/v3/internal/x/dslvm" +) + +// Run runs the DSL represented by the given [*RootNode]. +func Run(ctx context.Context, rtx dslvm.Runtime, root *RootNode) error { + lx := newLoader() + if err := lx.load(rtx.Logger(), root); err != nil { + return err + } + for _, stage := range lx.stages { + go stage.Run(ctx, rtx) + } + dslvm.Wait(lx.toWait...) + return nil +} diff --git a/internal/x/dsljson/stagenode.go b/internal/x/dsljson/stagenode.go new file mode 100644 index 000000000..7d5aafcb8 --- /dev/null +++ b/internal/x/dsljson/stagenode.go @@ -0,0 +1,9 @@ +package dsljson + +import "encoding/json" + +// StageNode describes a stage node in the DSL. +type StageNode struct { + Name string `json:"name"` + Value json.RawMessage `json:"value"` +} diff --git a/internal/x/dsljson/taken.go b/internal/x/dsljson/taken.go new file mode 100644 index 000000000..0dfd21a21 --- /dev/null +++ b/internal/x/dsljson/taken.go @@ -0,0 +1,83 @@ +package dsljson + +import ( + "encoding/json" + "errors" + + "github.com/ooni/probe-cli/v3/internal/x/dslvm" +) + +type takeNValue struct { + Input string `json:"input"` + N int64 `json:"n"` + Output string `json:"output"` +} + +func (lx *loader) onTakeN(raw json.RawMessage) error { + // parse the raw value + var value takeNValue + if err := json.Unmarshal(raw, &value); err != nil { + return err + } + + // fetch the required input register as a generic any value + xinput, err := registerPopInputRaw(lx, value.Input) + if err != nil { + return err + } + + // figure out the correct xinput type + var sx dslvm.Stage + switch input := xinput.(type) { + case chan *dslvm.TCPConnection: + output, err := registerMakeOutput[*dslvm.TCPConnection](lx, value.Output) + if err != nil { + return err + } + sx = &dslvm.TakeNStage[*dslvm.TCPConnection]{ + Input: input, + N: value.N, + Output: output, + } + + case chan *dslvm.TLSConnection: + output, err := registerMakeOutput[*dslvm.TLSConnection](lx, value.Output) + if err != nil { + return err + } + sx = &dslvm.TakeNStage[*dslvm.TLSConnection]{ + Input: input, + N: value.N, + Output: output, + } + + case chan *dslvm.QUICConnection: + output, err := registerMakeOutput[*dslvm.QUICConnection](lx, value.Output) + if err != nil { + return err + } + sx = &dslvm.TakeNStage[*dslvm.QUICConnection]{ + Input: input, + N: value.N, + Output: output, + } + + case chan string: + output, err := registerMakeOutput[string](lx, value.Output) + if err != nil { + return err + } + sx = &dslvm.TakeNStage[string]{ + Input: input, + N: value.N, + Output: output, + } + + default: + return errors.New("take_n: cannot instantiate output stage") + } + + // remember the stage for later + lx.stages = append(lx.stages, sx) + return nil +} diff --git a/internal/x/dsljson/tcp.go b/internal/x/dsljson/tcp.go new file mode 100644 index 000000000..f37ccb68e --- /dev/null +++ b/internal/x/dsljson/tcp.go @@ -0,0 +1,44 @@ +package dsljson + +import ( + "encoding/json" + + "github.com/ooni/probe-cli/v3/internal/x/dslvm" +) + +type tcpConnectValue struct { + Input string `json:"input"` + Output string `json:"output"` + Tags []string `json:"tags"` +} + +func (lx *loader) onTCPConnect(raw json.RawMessage) error { + // parse the raw value + var value tcpConnectValue + if err := json.Unmarshal(raw, &value); err != nil { + return err + } + + // create the required output registers + output, err := registerMakeOutput[*dslvm.TCPConnection](lx, value.Output) + if err != nil { + return err + } + + // fetch the required input register + input, err := registerPopInput[string](lx, value.Input) + if err != nil { + return err + } + + // instantiate the stage + sx := &dslvm.TCPConnectStage{ + Input: input, + Output: output, + Tags: value.Tags, + } + + // remember the stage for later + lx.stages = append(lx.stages, sx) + return nil +} diff --git a/internal/x/dsljson/teeaddrs.go b/internal/x/dsljson/teeaddrs.go new file mode 100644 index 000000000..e63b88908 --- /dev/null +++ b/internal/x/dsljson/teeaddrs.go @@ -0,0 +1,43 @@ +package dsljson + +import ( + "encoding/json" + + "github.com/ooni/probe-cli/v3/internal/x/dslvm" +) + +type teeAddrsValue struct { + Input string `json:"input"` + Outputs []string `json:"outputs"` +} + +func (lx *loader) onTeeAddrs(raw json.RawMessage) error { + // parse the raw value + var value teeAddrsValue + if err := json.Unmarshal(raw, &value); err != nil { + return err + } + + // create the required input registers + input, err := registerPopInput[string](lx, value.Input) + if err != nil { + return err + } + + // instantiate the stage + sx := &dslvm.TeeAddrsStage{ + Input: input, + Outputs: []chan<- string{}, + } + for _, name := range value.Outputs { + input, err := registerMakeOutput[string](lx, name) + if err != nil { + return err + } + sx.Outputs = append(sx.Outputs, input) + } + + // remember the stage for later + lx.stages = append(lx.stages, sx) + return nil +} diff --git a/internal/x/dsljson/tls.go b/internal/x/dsljson/tls.go new file mode 100644 index 000000000..7a975e935 --- /dev/null +++ b/internal/x/dsljson/tls.go @@ -0,0 +1,63 @@ +package dsljson + +import ( + "crypto/x509" + "encoding/json" + "errors" + + "github.com/ooni/probe-cli/v3/internal/x/dslvm" +) + +type tlsHandshakeValue struct { + Input string `json:"input"` + InsecureSkipVerify bool `json:"insecure_skip_verify"` + NextProtos []string `json:"next_protos"` + Output string `json:"output"` + RootCAs []string `json:"root_cas"` + ServerName string `json:"server_name"` +} + +func (lx *loader) onTLSHandshake(raw json.RawMessage) error { + // parse the raw value + var value tlsHandshakeValue + if err := json.Unmarshal(raw, &value); err != nil { + return err + } + + // create the required output registers + output, err := registerMakeOutput[*dslvm.TLSConnection](lx, value.Output) + if err != nil { + return err + } + + // fetch the required input register + input, err := registerPopInput[*dslvm.TCPConnection](lx, value.Input) + if err != nil { + return err + } + + // create the X509 cert pool + var pool *x509.CertPool + for _, cert := range value.RootCAs { + if pool == nil { + pool = x509.NewCertPool() + } + if !pool.AppendCertsFromPEM([]byte(cert)) { + return errors.New("cannot add PEM-encoded cert to X509 cert pool") + } + } + + // instantiate the stage + sx := &dslvm.TLSHandshakeStage{ + Input: input, + InsecureSkipVerify: value.InsecureSkipVerify, + NextProtos: value.NextProtos, + Output: output, + RootCAs: pool, + ServerName: value.ServerName, + } + + // remember the stage for later + lx.stages = append(lx.stages, sx) + return nil +} diff --git a/internal/x/dslvm/closer.go b/internal/x/dslvm/closer.go new file mode 100644 index 000000000..6f587210f --- /dev/null +++ b/internal/x/dslvm/closer.go @@ -0,0 +1,8 @@ +package dslvm + +import "github.com/ooni/probe-cli/v3/internal/model" + +// Closer is something that [Drop] should explicitly close. +type Closer interface { + Close(logger model.Logger) error +} diff --git a/internal/x/dslvm/dedupaddrs.go b/internal/x/dslvm/dedupaddrs.go new file mode 100644 index 000000000..7dd0dfad5 --- /dev/null +++ b/internal/x/dslvm/dedupaddrs.go @@ -0,0 +1,77 @@ +package dslvm + +import ( + "context" + "sync" +) + +// DedupAddrsStage is a [Stage] that deduplicates IP addresses. +type DedupAddrsStage struct { + // Inputs contains the MANDATORY channels from which to read IP addresses. We + // assume that these channels will be closed when done. + Inputs []<-chan string + + // Output is the MANDATORY channel where we emit the deduplicated IP addresss. We + // close this channel when all the Inputs have been closed. + Output chan<- string +} + +var _ Stage = &DedupAddrsStage{} + +// Run reads possibly duplicate IP addresses from Inputs and emits deduplicated +// IP addresses on Outputs. We close Outputs when done. +func (sx *DedupAddrsStage) Run(ctx context.Context, rtx Runtime) { + // create a locked map + var ( + dups = make(map[string]bool) + mu = &sync.Mutex{} + ) + + // stream the input channels to the workers + inputs := make(chan (<-chan string)) + go func() { + defer close(inputs) + for _, input := range sx.Inputs { + inputs <- input + } + }() + + // make sure we cap the number of workers we spawn + const maxworkers = 6 + workers := len(sx.Inputs) + if workers > maxworkers { + workers = maxworkers + } + waitGroup := &sync.WaitGroup{} + for idx := 0; idx < workers; idx++ { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + + // get channel to drain + for input := range inputs { + + // deduplicate + for address := range input { + + mu.Lock() + already := dups[address] + dups[address] = true + mu.Unlock() + + if already { + continue + } + + sx.Output <- address + } + } + }() + } + + // make sure we close outputs + defer close(sx.Output) + + // wait for all inputs to be drained + waitGroup.Wait() +} diff --git a/internal/x/dslvm/dnslookupudp.go b/internal/x/dslvm/dnslookupudp.go new file mode 100644 index 000000000..02067dfcd --- /dev/null +++ b/internal/x/dslvm/dnslookupudp.go @@ -0,0 +1,83 @@ +package dslvm + +import ( + "context" + "time" + + "github.com/ooni/probe-cli/v3/internal/logx" +) + +// DNSLookupUDPStage is a [Stage] that resolves domain names using an UDP resolver. +type DNSLookupUDPStage struct { + // Domain is the MANDATORY domain to resolve using this DNS resolver. + Domain string + + // Output is the MANDATORY channel emitting IP addresses. We will close this + // channel when we have finished streaming the resolved addresses. + Output chan<- string + + // Resolver is the MANDATORY resolver endpoint (e.g., [::1]:53). + Resolver string + + // Tags contains OPTIONAL tags for the DNS observations. + Tags []string +} + +var _ Stage = &DNSLookupUDPStage{} + +// Run resolves a Domain using the given Do53 Endpoint and streams the +// results on Output, which is closed when we're done. +// +// This function honours the semaphore returned by the [Runtime] ActiveDNSLookups +// method and waits until it's given the permission to start a lookup. +func (sx *DNSLookupUDPStage) Run(ctx context.Context, rtx Runtime) { + // wait for permission to lookup and signal when done + rtx.ActiveDNSLookups().Wait() + defer rtx.ActiveDNSLookups().Signal() + + // make sure we close output when done + defer close(sx.Output) + + // create trace + trace := rtx.NewTrace(rtx.IDGenerator().Add(1), rtx.ZeroTime(), sx.Tags...) + + // start operation logger + ol := logx.NewOperationLogger( + rtx.Logger(), + "[#%d] DNSLookup[%s/udp] %s", + trace.Index(), + sx.Resolver, + sx.Domain, + ) + + // setup + const timeout = 4 * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + // create the resolver + resolver := trace.NewParallelUDPResolver( + rtx.Logger(), + trace.NewDialerWithoutResolver(rtx.Logger()), + sx.Resolver, + ) + + // lookup + addrs, err := resolver.LookupHost(ctx, sx.Domain) + + // stop the operation logger + ol.Stop(err) + + // save the observations + rtx.SaveObservations(maybeTraceToObservations(trace)...) + + // handle error case + if err != nil { + return + } + + // handle success + for _, addr := range addrs { + sx.Output <- addr + } +} diff --git a/internal/x/dslvm/doc.go b/internal/x/dslvm/doc.go new file mode 100644 index 000000000..a0c2cf422 --- /dev/null +++ b/internal/x/dslvm/doc.go @@ -0,0 +1,2 @@ +// Package dslvm contains low-level code for implementing the measurements DSL. +package dslvm diff --git a/internal/x/dslvm/done.go b/internal/x/dslvm/done.go new file mode 100644 index 000000000..9c97c1c00 --- /dev/null +++ b/internal/x/dslvm/done.go @@ -0,0 +1,4 @@ +package dslvm + +// Done indicates that a DSL pipeline terminated. +type Done struct{} diff --git a/internal/x/dslvm/drop.go b/internal/x/dslvm/drop.go new file mode 100644 index 000000000..f20add27a --- /dev/null +++ b/internal/x/dslvm/drop.go @@ -0,0 +1,36 @@ +package dslvm + +import "context" + +// DropStage is a [Stage] that drops reference to whatever it is passed in input. If the +// input is a [Closer], this stage will also make sure it is closed. +type DropStage[T any] struct { + // Input contains the MANDATORY channel from which to read instances to drop. We + // assume that this channel will be closed when done. + Input <-chan T + + // Output contains the MANDATORY channel closed when Input has been closed. + Output chan Done +} + +var _ Stage = &DropStage[*TCPConnection]{} + +// Run drops all the input passed to the Input channel and closes Output when done. +func (sx *DropStage[T]) Run(ctx context.Context, rtx Runtime) { + // make sure we close Output when done + defer close(sx.Output) + + for input := range sx.Input { + drop[T](rtx, input) + } +} + +func drop[T any](rtx Runtime, value any) { + if closer, good := any(value).(Closer); good { + // close the connection and log about it + _ = closer.Close(rtx.Logger()) + + // make sure we signal the semaphore + rtx.ActiveConnections().Signal() + } +} diff --git a/internal/x/dslvm/getaddrinfo.go b/internal/x/dslvm/getaddrinfo.go new file mode 100644 index 000000000..8831acfb8 --- /dev/null +++ b/internal/x/dslvm/getaddrinfo.go @@ -0,0 +1,74 @@ +package dslvm + +import ( + "context" + "time" + + "github.com/ooni/probe-cli/v3/internal/logx" +) + +// GetaddrinfoStage is a [Stage] that resolves domain names using getaddrinfo. +type GetaddrinfoStage struct { + // Domain is the MANDATORY domain to resolve using this DNS resolver. + Domain string + + // Output is the MANDATORY channel emitting IP addresses. We will close this + // channel when we have finished streaming the resolved addresses. + Output chan<- string + + // Tags contains OPTIONAL tags for the DNS observations. + Tags []string +} + +var _ Stage = &GetaddrinfoStage{} + +// Run resolves a Domain using the getaddrinfo resolver. +// +// This function honours the semaphore returned by the [Runtime] ActiveDNSLookups +// method and waits until it's given the permission to start a lookup. +func (sx *GetaddrinfoStage) Run(ctx context.Context, rtx Runtime) { + // wait for permission to lookup and signal when done + rtx.ActiveDNSLookups().Wait() + defer rtx.ActiveDNSLookups().Signal() + + // make sure we close output when done + defer close(sx.Output) + + // create trace + trace := rtx.NewTrace(rtx.IDGenerator().Add(1), rtx.ZeroTime(), sx.Tags...) + + // start operation logger + ol := logx.NewOperationLogger( + rtx.Logger(), + "[#%d] DNSLookup[getaddrinfo] %s", + trace.Index(), + sx.Domain, + ) + + // setup + const timeout = 4 * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + // create the resolver + resolver := trace.NewStdlibResolver(rtx.Logger()) + + // lookup + addrs, err := resolver.LookupHost(ctx, sx.Domain) + + // stop the operation logger + ol.Stop(err) + + // save the observations + rtx.SaveObservations(maybeTraceToObservations(trace)...) + + // handle error case + if err != nil { + return + } + + // handle success + for _, addr := range addrs { + sx.Output <- addr + } +} diff --git a/internal/x/dslvm/http.go b/internal/x/dslvm/http.go new file mode 100644 index 000000000..9a63af912 --- /dev/null +++ b/internal/x/dslvm/http.go @@ -0,0 +1,257 @@ +package dslvm + +import ( + "context" + "io" + "net/http" + "net/url" + "sync" + "time" + + "github.com/ooni/probe-cli/v3/internal/logx" + "github.com/ooni/probe-cli/v3/internal/measurexlite" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/netxlite" + "github.com/ooni/probe-cli/v3/internal/throttling" +) + +// HTTPConnection is the connection type expected by [*HTTPRoundTripStage]. +type HTTPConnection interface { + // AsSingleUseTransport converts the connection to a single-use HTTP transport + AsSingleUseTransport(logger model.Logger) model.HTTPTransport + + // Closer embeds the Closer interface + Closer + + // Network returns the network + Network() string + + // RemoteAddress returns the remote address + RemoteAddress() string + + // Scheme returns the HTTP scheme for this connection + Scheme() string + + // TLSNegotiatedProtocol is the protocol negotiated by TLS + TLSNegotiatedProtocol() string + + // Trace returns the Trace to use + Trace() Trace +} + +// HTTPRoundTripStage performs HTTP round trips with connections of type T. +type HTTPRoundTripStage[T HTTPConnection] struct { + // Accept contains the OPTIONAL accept header. + Accept string + + // AcceptLanguage contains the OPTIONAL accept-language header. + AcceptLanguage string + + // Host contains the MANDATORY host header. + Host string + + // Input contains the MANDATORY channel from which to connections. We + // assume that this channel will be closed when done. + Input <-chan T + + // MaxBodySnapshotSize is the OPTIONAL maximum body snapshot size. + MaxBodySnapshotSize int64 + + // Method contains the MANDATORY method. + Method string + + // Output is the MANDATORY channel emitting [Void]. We will close this + // channel when the Input channel has been closed. + Output chan<- Done + + // Referer contains the OPTIONAL referer header. + Referer string + + // URLPath contains the MANDATORY URL path. + URLPath string + + // UserAgent contains the OPTIONAL user-agent header. + UserAgent string +} + +// Run is like [*TCPConnect.Run] except that it reads connections in Input and +// emits [Void] in Output. Each HTTP round trip runs in its own background +// goroutine. The parallelism is controlled by the [Runtime] ActiveConnections +// [Semaphore]. Note that this code TAKES OWNERSHIP of the connection it +// reads and closes it at the end of the round trip. While closing the conn, +// we signal [Runtime] ActiveConnections to unblock another measurement. +func (sx *HTTPRoundTripStage[T]) Run(ctx context.Context, rtx Runtime) { + // make sure we close the output channel + defer close(sx.Output) + + // track the number of running goroutines + waitGroup := &sync.WaitGroup{} + + for conn := range sx.Input { + // process connection in a background goroutine, which is fine + // because the previous step has acquired the semaphore. + waitGroup.Add(1) + go func(conn HTTPConnection) { + defer waitGroup.Done() + defer conn.Close(rtx.Logger()) // as documented, close when done + defer rtx.ActiveConnections().Signal() // unblock the next goroutine + sx.roundTrip(ctx, rtx, conn) + }(conn) + } + + // wait for pending work to finish + waitGroup.Wait() +} + +func (sx *HTTPRoundTripStage[T]) roundTrip(ctx context.Context, rtx Runtime, conn HTTPConnection) { + // setup + const timeout = 10 * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + // create HTTP request + req, err := sx.newHTTPRequest(ctx, conn, rtx.Logger()) + if err != nil { + return + } + + // start the operation logger + ol := logx.NewOperationLogger( + rtx.Logger(), + "[#%d] HTTPRequest %s with %s/%s host=%s", + conn.Trace().Index(), + req.URL.String(), + conn.RemoteAddress(), + conn.Network(), + req.Host, + ) + + // perform HTTP round trip and collect observations + observations, err := sx.doRoundTrip(ctx, conn, rtx.Logger(), req) + + // stop the operation logger + ol.Stop(err) + + // merge and save observations + observations = append(observations, maybeTraceToObservations(conn.Trace())...) + rtx.SaveObservations(observations...) +} + +func (sx *HTTPRoundTripStage[T]) newHTTPRequest( + ctx context.Context, conn HTTPConnection, logger model.Logger) (*http.Request, error) { + // create the default HTTP request + URL := &url.URL{ + Scheme: conn.Scheme(), + Opaque: "", + User: nil, + Host: sx.Host, + Path: sx.URLPath, + RawPath: "", + ForceQuery: false, + RawQuery: "", + Fragment: "", + RawFragment: "", + } + req, err := http.NewRequestWithContext(ctx, sx.Method, URL.String(), nil) + if err != nil { + return nil, err + } + + // Go would use URL.Host as "Host" header anyways in case we leave req.Host empty. + // We already set it here so that we can use req.Host for logging. + req.Host = URL.Host + + // conditionally apply headers + if sx.Accept != "" { + req.Header.Set("Accept", sx.Accept) + } + if sx.AcceptLanguage != "" { + req.Header.Set("Accept-Language", sx.AcceptLanguage) + } + if sx.Referer != "" { + req.Header.Set("Referer", sx.Referer) + } + if sx.UserAgent != "" { + req.Header.Set("User-Agent", sx.UserAgent) + } + + // req.Header["Host"] is ignored by Go but we want to have it in the measurement + // to reflect what we think has been sent as HTTP headers. + req.Header.Set("Host", req.Host) + return req, nil +} + +func (sx *HTTPRoundTripStage[T]) doRoundTrip(ctx context.Context, + conn HTTPConnection, logger model.Logger, req *http.Request) ([]*Observations, error) { + maxbody := sx.MaxBodySnapshotSize + if maxbody < 0 { + maxbody = 0 + } + + started := conn.Trace().TimeSince(conn.Trace().ZeroTime()) + + // manually create a single 1-length observations structure because + // the trace cannot automatically capture HTTP events + observations := []*Observations{ + NewObservations(), + } + + observations[0].NetworkEvents = append(observations[0].NetworkEvents, + measurexlite.NewAnnotationArchivalNetworkEvent( + conn.Trace().Index(), + started, + "http_transaction_start", + conn.Trace().Tags()..., + )) + + txp := conn.AsSingleUseTransport(logger) + + resp, err := txp.RoundTrip(req) + var body []byte + if err == nil { + defer resp.Body.Close() + + // TODO(bassosimone): we should probably start sampling when + // we create the connection rather than here + + // create sampler for measuring throttling + sampler := throttling.NewSampler(conn.Trace()) + defer sampler.Close() + + // read a snapshot of the response body + reader := io.LimitReader(resp.Body, maxbody) + body, err = netxlite.ReadAllContext(ctx, reader) // TODO(https://github.com/ooni/probe/issues/2622) + + // collect and save download speed samples + samples := sampler.ExtractSamples() + observations[0].NetworkEvents = append(observations[0].NetworkEvents, samples...) + } + finished := conn.Trace().TimeSince(conn.Trace().ZeroTime()) + + observations[0].NetworkEvents = append(observations[0].NetworkEvents, + measurexlite.NewAnnotationArchivalNetworkEvent( + conn.Trace().Index(), + finished, + "http_transaction_done", + conn.Trace().Tags()..., + )) + + observations[0].Requests = append(observations[0].Requests, + measurexlite.NewArchivalHTTPRequestResult( + conn.Trace().Index(), + started, + conn.Network(), + conn.RemoteAddress(), + conn.TLSNegotiatedProtocol(), + txp.Network(), + req, + resp, + maxbody, + body, + err, + finished, + conn.Trace().Tags()..., + )) + + return observations, err +} diff --git a/internal/x/dslvm/makeendpoints.go b/internal/x/dslvm/makeendpoints.go new file mode 100644 index 000000000..e41bfca43 --- /dev/null +++ b/internal/x/dslvm/makeendpoints.go @@ -0,0 +1,30 @@ +package dslvm + +import ( + "context" + "net" +) + +// MakeEndpointsStage is a [Stage] that transforms IP addresses to TCP/UDP endpoints. +type MakeEndpointsStage struct { + // Input contains the MANDATORY channel from which to read IP addresses. We + // assume that this channel will be closed when done. + Input <-chan string + + // Output is the MANDATORY channel emitting endpoints. We will close this + // channel when the Input channel has been closed. + Output chan<- string + + // Port is the MANDATORY port. + Port string +} + +var _ Stage = &MakeEndpointsStage{} + +// Run transforms IP addresses to endpoints. +func (sx *MakeEndpointsStage) Run(ctx context.Context, rtx Runtime) { + defer close(sx.Output) + for addr := range sx.Input { + sx.Output <- net.JoinHostPort(addr, sx.Port) + } +} diff --git a/internal/x/dslvm/observations.go b/internal/x/dslvm/observations.go new file mode 100644 index 000000000..da82edc5a --- /dev/null +++ b/internal/x/dslvm/observations.go @@ -0,0 +1,54 @@ +package dslvm + +import "github.com/ooni/probe-cli/v3/internal/model" + +// Observations is the skeleton shared by most OONI measurements where +// we group observations by type using standard test keys. +type Observations struct { + // NetworkEvents contains I/O events. + NetworkEvents []*model.ArchivalNetworkEvent `json:"network_events"` + + // Queries contains the DNS queries results. + Queries []*model.ArchivalDNSLookupResult `json:"queries"` + + // Requests contains HTTP request results. + Requests []*model.ArchivalHTTPRequestResult `json:"requests"` + + // TCPConnect contains the TCP connect results. + TCPConnect []*model.ArchivalTCPConnectResult `json:"tcp_connect"` + + // TLSHandshakes contains the TLS handshakes results. + TLSHandshakes []*model.ArchivalTLSOrQUICHandshakeResult `json:"tls_handshakes"` + + // QUICHandshakes contains the QUIC handshakes results. + QUICHandshakes []*model.ArchivalTLSOrQUICHandshakeResult `json:"quic_handshakes"` +} + +// NewObservations initializes all measurements to empty arrays and returns the +// Observations skeleton. +func NewObservations() *Observations { + return &Observations{ + NetworkEvents: []*model.ArchivalNetworkEvent{}, + Queries: []*model.ArchivalDNSLookupResult{}, + Requests: []*model.ArchivalHTTPRequestResult{}, + TCPConnect: []*model.ArchivalTCPConnectResult{}, + TLSHandshakes: []*model.ArchivalTLSOrQUICHandshakeResult{}, + QUICHandshakes: []*model.ArchivalTLSOrQUICHandshakeResult{}, + } +} + +// maybeTraceToObservations returns the observations inside the +// trace taking into account the case where trace is nil. +func maybeTraceToObservations(trace Trace) (out []*Observations) { + if trace != nil { + out = append(out, &Observations{ + NetworkEvents: trace.NetworkEvents(), + Queries: trace.DNSLookupsFromRoundTrip(), + Requests: []*model.ArchivalHTTPRequestResult{}, // no extractor inside trace! + TCPConnect: trace.TCPConnects(), + TLSHandshakes: trace.TLSHandshakes(), + QUICHandshakes: trace.QUICHandshakes(), + }) + } + return +} diff --git a/internal/x/dslvm/quic.go b/internal/x/dslvm/quic.go new file mode 100644 index 000000000..20ac5030f --- /dev/null +++ b/internal/x/dslvm/quic.go @@ -0,0 +1,172 @@ +package dslvm + +import ( + "context" + "crypto/tls" + "crypto/x509" + "sync" + "time" + + "github.com/ooni/probe-cli/v3/internal/logx" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/netxlite" + "github.com/quic-go/quic-go" +) + +// QUICHandshakeStage is a [Stage] that creates [*QUICConnection]. +type QUICHandshakeStage struct { + // Input contains the MANDATORY channel from which to read endpoints. We + // assume that this channel will be closed when done. + Input <-chan string + + // InsecureSkipVerify OPTIONALLY skips QUIC verification. + InsecureSkipVerify bool + + // NextProtos OPTIONALLY configures the ALPN. + NextProtos []string + + // Output is the MANDATORY channel emitting [*QUICConnection]. We will close this + // channel when the Input channel has been closed. + Output chan<- *QUICConnection + + // RootCAs OPTIONALLY configures alternative root CAs. + RootCAs *x509.CertPool + + // ServerName is the MANDATORY server name. + ServerName string + + // Tags contains OPTIONAL tags to add to the endpoint observations. + Tags []string +} + +// QUICConnection is a QUIC connection. +type QUICConnection struct { + Conn quic.EarlyConnection + tlsConfig *tls.Config + tx Trace +} + +// AsSingleUseTransport implements HTTPConnection. +func (c *QUICConnection) AsSingleUseTransport(logger model.Logger) model.HTTPTransport { + return netxlite.NewHTTP3Transport(logger, netxlite.NewSingleUseQUICDialer(c.Conn), c.tlsConfig.Clone()) +} + +// Close implements HTTPConnection. +func (c *QUICConnection) Close(logger model.Logger) error { + ol := logx.NewOperationLogger(logger, "[#%d] QUICClose %s", c.tx.Index(), c.RemoteAddress()) + err := c.Conn.CloseWithError(0, "") + ol.Stop(err) + return err +} + +// Network implements HTTPConnection. +func (c *QUICConnection) Network() string { + return "udp" +} + +// RemoteAddress implements HTTPConnection. +func (c *QUICConnection) RemoteAddress() (addr string) { + if v := c.Conn.RemoteAddr(); v != nil { + addr = v.String() + } + return +} + +// Scheme implements HTTPConnection. +func (c *QUICConnection) Scheme() string { + return "https" +} + +// TLSNegotiatedProtocol implements HTTPConnection. +func (c *QUICConnection) TLSNegotiatedProtocol() string { + return c.Conn.ConnectionState().TLS.NegotiatedProtocol +} + +// Trace implements HTTPConnection. +func (c *QUICConnection) Trace() Trace { + return c.tx +} + +var _ HTTPConnection = &QUICConnection{} + +// Run is like [*TCPConnect.Run] except that it reads [endpoints] in Input and +// emits [*QUICConnection] in Output. Each QUIC handshake runs in its own background +// goroutine. The parallelism is controlled by the [Runtime] ActiveConnections [Semaphore] and +// you MUST arrange for the [*QUICConnection] to eventually enter into a [*CloseStage] +// such that the code can release the above mentioned [Semaphore] and close the conn. Note +// that this code TAKES OWNERSHIP of the [*TCPConnection] it reads. We will close these +// conns automatically on failure. On success, they will be closed when the [*QUICConnection] +// wrapping them eventually enters into a [*CloseStage]. +func (sx *QUICHandshakeStage) Run(ctx context.Context, rtx Runtime) { + // make sure we close the output channel + defer close(sx.Output) + + // track the number of running goroutines + waitGroup := &sync.WaitGroup{} + + for endpoint := range sx.Input { + // wait for authorization to process a connection + rtx.ActiveConnections().Wait() + + // process connection in a background goroutine + waitGroup.Add(1) + go func(endpoint string) { + defer waitGroup.Done() + sx.handshake(ctx, rtx, endpoint) + }(endpoint) + } + + // wait for pending work to finish + waitGroup.Wait() +} + +func (sx *QUICHandshakeStage) handshake(ctx context.Context, rtx Runtime, endpoint string) { + // create trace + trace := rtx.NewTrace(rtx.IDGenerator().Add(1), rtx.ZeroTime(), sx.Tags...) + + // create a suitable QUIC configuration + config := sx.newTLSConfig() + + // start the operation logger + ol := logx.NewOperationLogger( + rtx.Logger(), + "[#%d] QUICHandshake with %s SNI=%s ALPN=%v", + trace.Index(), + endpoint, + config.ServerName, + config.NextProtos, + ) + + // setup + udpListener := trace.NewUDPListener() + quicDialer := trace.NewQUICDialerWithoutResolver(udpListener, rtx.Logger()) + const timeout = 10 * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + // handshake + quicConn, err := quicDialer.DialContext(ctx, endpoint, config, &quic.Config{}) + + // stop the operation logger + ol.Stop(err) + + // save the observations + rtx.SaveObservations(maybeTraceToObservations(trace)...) + + // handle error case + if err != nil { + return + } + + // handle success + sx.Output <- &QUICConnection{Conn: quicConn, tx: trace, tlsConfig: config} +} + +func (sx *QUICHandshakeStage) newTLSConfig() *tls.Config { + return &tls.Config{ + NextProtos: sx.NextProtos, + InsecureSkipVerify: sx.InsecureSkipVerify, + RootCAs: sx.RootCAs, + ServerName: sx.ServerName, + } +} diff --git a/internal/x/dslvm/runtime.go b/internal/x/dslvm/runtime.go new file mode 100644 index 000000000..75a8686e3 --- /dev/null +++ b/internal/x/dslvm/runtime.go @@ -0,0 +1,49 @@ +package dslvm + +import ( + "sync/atomic" + "time" + + "github.com/ooni/probe-cli/v3/internal/model" +) + +// Runtime is the runtime in which we execute the DSL. +type Runtime interface { + // ActiveConnections returns the [Semaphore] controlling the + // maximum number of active connections that we can have. + ActiveConnections() *Semaphore + + // ActiveDNSLookups returns the [Semaphore] controlling the + // maximum number of active DNS lookups that we can have. + ActiveDNSLookups() *Semaphore + + // IDGenerator returns an atomic counter used to generate + // separate unique IDs for each trace. + IDGenerator() *atomic.Int64 + + // Logger returns the base logger to use. + Logger() model.Logger + + // NewTrace creates a [Trace] instance. Note that each [Runtime] + // creates its own [Trace] type. A [Trace] is not guaranteed to collect + // [*Observations]. For example, [NewMinimalRuntime] creates a [Runtime] + // that does not collect any [*Observations]. + NewTrace(index int64, zeroTime time.Time, tags ...string) Trace + + // Observations returns the [*Observations] saved so far and clears our + // internal copy such that the next call to this method only returns + // the [*Observations] saved since the previous call. + // + // You can safely call this method from multiple goroutine contexts. + Observations() *Observations + + // SaveObservations saves [*Observations] inside the [Runtime]. You can + // get the saved [*Observations] by calling Observations. + // + // You can safely call this method from multiple goroutine contexts. + SaveObservations(obs ...*Observations) + + // ZeroTime returns the runtime's "zero" time, which is used as the + // starting point to generate observation's delta times. + ZeroTime() time.Time +} diff --git a/internal/x/dslvm/semaphore.go b/internal/x/dslvm/semaphore.go new file mode 100644 index 000000000..da491c872 --- /dev/null +++ b/internal/x/dslvm/semaphore.go @@ -0,0 +1,62 @@ +package dslvm + +import ( + "log" + "os" + "time" + + "github.com/ooni/probe-cli/v3/internal/runtimex" +) + +var semaphoreDebug = os.Getenv("OONI_DEBUG_SEMAPHORE") == "1" + +// Semaphore implements a semaphore. +// +// See https://en.wikipedia.org/wiki/Semaphore_(programming). +type Semaphore struct { + name string + ch chan bool +} + +// NewSemaphore creates a new [*Semaphore] with the given count of available resources. This +// function PANICS if the given count of available resources is zero or negative. +func NewSemaphore(name string, count int) *Semaphore { + runtimex.Assert(count >= 1, "expected count to be >= 1") + sema := &Semaphore{ + ch: make(chan bool, count), + name: name, + } + for idx := 0; idx < count; idx++ { + sema.ch <- true + } + + if semaphoreDebug { + log.Printf("semaphore %s[%p]: NEW[%d]", sema.name, sema, count) + } + + return sema +} + +// Signal signals that a resource is now available. +func (sema *Semaphore) Signal() { + if semaphoreDebug { + log.Printf("semaphore %s[%p]: SIGNAL", sema.name, sema) + } + + sema.ch <- true +} + +// Wait waits for a resource to be available. +func (sema *Semaphore) Wait() { + var t0 time.Time + if semaphoreDebug { + log.Printf("semaphore %s[%p]: WAIT", sema.name, sema) + t0 = time.Now() + } + + <-sema.ch + + if semaphoreDebug { + log.Printf("semaphore %s[%p]: READY (%v)", sema.name, sema, time.Since(t0)) + } +} diff --git a/internal/x/dslvm/stage.go b/internal/x/dslvm/stage.go new file mode 100644 index 000000000..3895b9741 --- /dev/null +++ b/internal/x/dslvm/stage.go @@ -0,0 +1,8 @@ +package dslvm + +import "context" + +// Stage is a stage in the DSL graph. +type Stage interface { + Run(ctx context.Context, rtx Runtime) +} diff --git a/internal/x/dslvm/start.go b/internal/x/dslvm/start.go new file mode 100644 index 000000000..d317714ee --- /dev/null +++ b/internal/x/dslvm/start.go @@ -0,0 +1,10 @@ +package dslvm + +import "context" + +// Start starts all the given [Stage] instances. +func Start(ctx context.Context, rtx Runtime, stages ...Stage) { + for _, stage := range stages { + go stage.Run(ctx, rtx) + } +} diff --git a/internal/x/dslvm/taken.go b/internal/x/dslvm/taken.go new file mode 100644 index 000000000..950b37d96 --- /dev/null +++ b/internal/x/dslvm/taken.go @@ -0,0 +1,38 @@ +package dslvm + +import "context" + +// TakeNStage is a [Stage] that allows N elements with type T to pass and drops subsequent elements. +type TakeNStage[T any] struct { + // Input contains the MANDATORY channel from which to read T. We + // assume that this channel will be closed when done. + Input <-chan T + + // N is the maximum number of entries to allow to pass. Any value + // lower than zero is equivalent to setting this field to zero. + N int64 + + // Output is the MANDATORY channel emitting [T]. We will close this + // channel when the Input channel has been closed. + Output chan<- T +} + +// Run runs the stage until completion. +func (sx *TakeNStage[T]) Run(ctx context.Context, rtx Runtime) { + // make sure we close the output channel + defer close(sx.Output) + + var count int64 + for element := range sx.Input { + + // if we've already observed N elements, just drop the N+1-th + if count >= sx.N { + drop[T](rtx, element) + continue + } + + // otherwise increment counter and forward + count++ + sx.Output <- element + } +} diff --git a/internal/x/dslvm/tcp.go b/internal/x/dslvm/tcp.go new file mode 100644 index 000000000..6545f4367 --- /dev/null +++ b/internal/x/dslvm/tcp.go @@ -0,0 +1,148 @@ +package dslvm + +import ( + "context" + "net" + "sync" + "time" + + "github.com/ooni/probe-cli/v3/internal/logx" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/netxlite" +) + +// TCPConnectStage is a [Stage] that creates [*TCPConnection]. +type TCPConnectStage struct { + // Input contains the MANDATORY channel from which to read endpoints. We + // assume that this channel will be closed when done. + Input <-chan string + + // Output is the MANDATORY channel emitting [*TCPConnection]. We will close this + // channel when the Input channel has been closed. + Output chan<- *TCPConnection + + // Tags contains OPTIONAL tags to add to the endpoint observations. + Tags []string +} + +// TCPConnection is a TCP connection. +type TCPConnection struct { + Conn net.Conn + tx Trace +} + +var _ HTTPConnection = &TCPConnection{} + +// AsSingleUseTransport implements HTTPConnection. +func (c *TCPConnection) AsSingleUseTransport(logger model.Logger) model.HTTPTransport { + return netxlite.NewHTTPTransport(logger, netxlite.NewSingleUseDialer(c.Conn), netxlite.NewNullTLSDialer()) +} + +// Close implements HTTPConnection. +func (c *TCPConnection) Close(logger model.Logger) error { + ol := logx.NewOperationLogger(logger, "[#%d] TCPClose %s", c.tx.Index(), c.RemoteAddress()) + err := c.Conn.Close() + ol.Stop(err) + return err +} + +// Network implements HTTPConnection. +func (c *TCPConnection) Network() string { + return "tcp" +} + +// RemoteAddress implements HTTPConnection. +func (c *TCPConnection) RemoteAddress() (addr string) { + if v := c.Conn.RemoteAddr(); v != nil { + addr = v.String() + } + return +} + +// Scheme implements HTTPConnection. +func (c *TCPConnection) Scheme() string { + return "http" +} + +// TLSNegotiatedProtocol implements HTTPConnection. +func (c *TCPConnection) TLSNegotiatedProtocol() string { + return "" +} + +// Trace implements HTTPConnection. +func (c *TCPConnection) Trace() Trace { + return c.tx +} + +var _ Stage = &TCPConnectStage{} + +// Run reads endpoints from Input and streams on the Output channel the [*TCPConnection] +// that it could successfully establish. Note that this function honors the [Semaphore] returned +// by the [Runtime] ActiveConnections that controls how many connections we can measure in +// parallel. When given the permission to run, this function spawns a background goroutine that +// attempts to establish a connection. The [*TCPConnection] returned by this stage MUST +// eventually feed into a [*CloseStage], so that the code can notify the above mentioned +// [Semaphore] and so that we close the open connection. This function will close the Output +// channel when Inputs have been closed and there are no pending connection attempts. In +// case of failure, the code will automatically notify the [Semaphore]. +func (sx *TCPConnectStage) Run(ctx context.Context, rtx Runtime) { + // make sure we close the output channel + defer close(sx.Output) + + // track the number of running goroutines + waitGroup := &sync.WaitGroup{} + + for endpoint := range sx.Input { + // wait for authorization to process a connection + rtx.ActiveConnections().Wait() + + // process connection in a background goroutine + waitGroup.Add(1) + go func(endpoint string) { + defer waitGroup.Done() + sx.connect(ctx, rtx, endpoint) + }(endpoint) + } + + // wait for pending work to finish + waitGroup.Wait() +} + +func (sx *TCPConnectStage) connect(ctx context.Context, rtx Runtime, endpoint string) { + // create trace + trace := rtx.NewTrace(rtx.IDGenerator().Add(1), rtx.ZeroTime(), sx.Tags...) + + // start operation logger + ol := logx.NewOperationLogger( + rtx.Logger(), + "[#%d] TCPConnect %s", + trace.Index(), + endpoint, + ) + + // setup + const timeout = 15 * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + // obtain the dialer to use + dialer := trace.NewDialerWithoutResolver(rtx.Logger()) + + // connect + conn, err := dialer.DialContext(ctx, "tcp", endpoint) + + // stop the operation logger + ol.Stop(err) + + // save the observations + rtx.SaveObservations(maybeTraceToObservations(trace)...) + + // handle error case + if err != nil { + rtx.ActiveConnections().Signal() // make sure we release the semaphore + return + } + + // handle success + sx.Output <- &TCPConnection{Conn: conn, tx: trace} +} diff --git a/internal/x/dslvm/teeaddrs.go b/internal/x/dslvm/teeaddrs.go new file mode 100644 index 000000000..a34a8aec4 --- /dev/null +++ b/internal/x/dslvm/teeaddrs.go @@ -0,0 +1,56 @@ +package dslvm + +import ( + "context" + "sync" +) + +// TeeAddrsStage is a [Stage] that duplicates the addresses read in Input into +// each of the channels belonging to [Outputs]. +type TeeAddrsStage struct { + // Input is the MANDATORY channel from which we read addresses. We assume + // this channel is closed when done. + Input <-chan string + + // Outputs is the MANDATORY list of channels where to duplicate the addresses read + // from the Input channel. We close all Outputs when done. + Outputs []chan<- string +} + +var _ Stage = &TeeAddrsStage{} + +// Run duplicates addresses read in Input into all the given Outputs. +func (sx *TeeAddrsStage) Run(ctx context.Context, rtx Runtime) { + // make sure we limit the maximum number of goroutines we will create here + sema := NewSemaphore("teeAddrs", 12) + + waitGroup := &sync.WaitGroup{} + for addr := range sx.Input { + for _, output := range sx.Outputs { + // make sure we can create a new goroutine + sema.Wait() + + // register that there is a running goroutine + waitGroup.Add(1) + + go func(addr string, output chan<- string) { + // make sure we track that this goroutine is done + defer waitGroup.Done() + + // make sure a new goroutine can start + defer sema.Signal() + + // duplicate the address in output + output <- addr + }(addr, output) + } + } + + // make sure all goroutines finished running + waitGroup.Wait() + + // close all the output channels + for _, output := range sx.Outputs { + close(output) + } +} diff --git a/internal/x/dslvm/tls.go b/internal/x/dslvm/tls.go new file mode 100644 index 000000000..631aebaea --- /dev/null +++ b/internal/x/dslvm/tls.go @@ -0,0 +1,171 @@ +package dslvm + +import ( + "context" + "crypto/tls" + "crypto/x509" + "sync" + "time" + + "github.com/ooni/probe-cli/v3/internal/logx" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/netxlite" +) + +// TLSHandshakeStage is a [Stage] that creates [*TLSConnection]. +type TLSHandshakeStage struct { + // Input contains the MANDATORY channel from which to read [*TCPConnection]. We + // assume that this channel will be closed when done. + Input <-chan *TCPConnection + + // InsecureSkipVerify OPTIONALLY skips TLS verification. + InsecureSkipVerify bool + + // NextProtos OPTIONALLY configures the ALPN. + NextProtos []string + + // Output is the MANDATORY channel emitting [*TLSConnection]. We will close this + // channel when the Input channel has been closed. + Output chan<- *TLSConnection + + // RootCAs OPTIONALLY configures alternative root CAs. + RootCAs *x509.CertPool + + // ServerName is the MANDATORY server name. + ServerName string +} + +// TLSConnection is a TLS connection. +type TLSConnection struct { + Conn model.TLSConn + tx Trace +} + +var _ HTTPConnection = &TLSConnection{} + +// AsSingleUseTransport implements HTTPConnection. +func (c *TLSConnection) AsSingleUseTransport(logger model.Logger) model.HTTPTransport { + return netxlite.NewHTTPTransport(logger, netxlite.NewNullDialer(), netxlite.NewSingleUseTLSDialer(c.Conn)) +} + +// Close implements HTTPConnection. +func (c *TLSConnection) Close(logger model.Logger) error { + ol := logx.NewOperationLogger(logger, "[#%d] TLSClose %s", c.tx.Index(), c.RemoteAddress()) + err := c.Conn.Close() + ol.Stop(err) + return err +} + +// Network implements HTTPConnection. +func (c *TLSConnection) Network() string { + return "tcp" +} + +// RemoteAddress implements HTTPConnection. +func (c *TLSConnection) RemoteAddress() (addr string) { + if v := c.Conn.RemoteAddr(); v != nil { + addr = v.String() + } + return +} + +// Scheme implements HTTPConnection. +func (c *TLSConnection) Scheme() string { + return "https" +} + +// TLSNegotiatedProtocol implements HTTPConnection. +func (c *TLSConnection) TLSNegotiatedProtocol() string { + return c.Conn.ConnectionState().NegotiatedProtocol +} + +// Trace implements HTTPConnection. +func (c *TLSConnection) Trace() Trace { + return c.tx +} + +// Run is like [*TCPConnect.Run] except that it reads [*TCPConnection] in Input and +// emits [*TLSConnection] in Output. Each TLS handshake runs in its own background +// goroutine. The parallelism is controlled by the [Runtime] ActiveConnections [Semaphore] +// and you MUST arrange for the [*TLSConnection] to eventually enter into a [*CloseStage] +// such that the code can release the above mentioned [Semaphore] and close the conn. Note +// that this code TAKES OWNERSHIP of the [*TCPConnection] it reads. We will close these +// conns automatically on failure. On success, they will be closed when the [*TLSConnection] +// wrapping them eventually enters into a [*CloseStage]. +func (sx *TLSHandshakeStage) Run(ctx context.Context, rtx Runtime) { + // make sure we close the output channel + defer close(sx.Output) + + // track the number of running goroutines + waitGroup := &sync.WaitGroup{} + + for tcpConn := range sx.Input { + // process connection in a background goroutine, which is fine + // because the previous step has acquired the semaphore. + waitGroup.Add(1) + go func(tcpConn *TCPConnection) { + defer waitGroup.Done() + sx.handshake(ctx, rtx, tcpConn) + }(tcpConn) + } + + // wait for pending work to finish + waitGroup.Wait() +} + +func (sx *TLSHandshakeStage) handshake(ctx context.Context, rtx Runtime, tcpConn *TCPConnection) { + // keep using the same trace + trace := tcpConn.Trace() + + // create a suitable TLS configuration + config := sx.newTLSConfig() + + // start the operation logger + ol := logx.NewOperationLogger( + rtx.Logger(), + "[#%d] TLSHandshake with %s SNI=%s ALPN=%v", + trace.Index(), + tcpConn.RemoteAddress(), + config.ServerName, + config.NextProtos, + ) + + // obtain the handshaker for use + handshaker := trace.NewTLSHandshakerStdlib(rtx.Logger()) + + // setup + const timeout = 10 * time.Second + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + // handshake + tlsConn, err := handshaker.Handshake(ctx, tcpConn.Conn, config) + + // stop the operation logger + ol.Stop(err) + + // save the observations + rtx.SaveObservations(maybeTraceToObservations(trace)...) + + // handle error case + if err != nil { + rtx.ActiveConnections().Signal() // make sure we release the semaphore + tcpConn.Conn.Close() // make sure we close the conn + return + } + + // handle success + sx.Output <- &TLSConnection{ + Conn: tlsConn, + tx: trace, + } +} + +func (sx *TLSHandshakeStage) newTLSConfig() *tls.Config { + return &tls.Config{ + NextProtos: sx.NextProtos, + InsecureSkipVerify: sx.InsecureSkipVerify, + RootCAs: sx.RootCAs, + ServerName: sx.ServerName, + } +} diff --git a/internal/x/dslvm/trace.go b/internal/x/dslvm/trace.go new file mode 100644 index 000000000..8523f2b10 --- /dev/null +++ b/internal/x/dslvm/trace.go @@ -0,0 +1,74 @@ +package dslvm + +import ( + "time" + + "github.com/ooni/probe-cli/v3/internal/model" +) + +// Trace collects [*Observations] using tracing. Specific implementations +// of this interface may be engineered to collect no [*Observations] for +// efficiency (i.e., when you don't care about collecting [*Observations] +// but you still want to use this package). +type Trace interface { + // CloneBytesReceivedMap returns a clone of the internal bytes received map. The key of the + // map is a string following the "EPNT_ADDRESS PROTO" pattern where the "EPNT_ADDRESS" contains + // the endpoint address and "PROTO" is "tcp" or "udp". + CloneBytesReceivedMap() (out map[string]int64) + + // DNSLookupsFromRoundTrip returns all the DNS lookup results collected so far. + DNSLookupsFromRoundTrip() (out []*model.ArchivalDNSLookupResult) + + // Index returns the unique index used by this trace. + Index() int64 + + // NewDialerWithoutResolver is equivalent to netxlite.NewDialerWithoutResolver + // except that it returns a model.Dialer that uses this trace. + // + // Caveat: the dialer wrappers are there to implement the + // model.MeasuringNetwork interface, but they're not used by this function. + NewDialerWithoutResolver(dl model.DebugLogger, wrappers ...model.DialerWrapper) model.Dialer + + // NewParallelUDPResolver returns a possibly-trace-ware parallel UDP resolver + NewParallelUDPResolver(logger model.DebugLogger, dialer model.Dialer, address string) model.Resolver + + // NewQUICDialerWithoutResolver is equivalent to + // netxlite.NewQUICDialerWithoutResolver except that it returns a + // model.QUICDialer that uses this trace. + // + // Caveat: the dialer wrappers are there to implement the + // model.MeasuringNetwork interface, but they're not used by this function. + NewQUICDialerWithoutResolver(listener model.UDPListener, + dl model.DebugLogger, wrappers ...model.QUICDialerWrapper) model.QUICDialer + + // NewTLSHandshakerStdlib is equivalent to netxlite.NewTLSHandshakerStdlib + // except that it returns a model.TLSHandshaker that uses this trace. + NewTLSHandshakerStdlib(dl model.DebugLogger) model.TLSHandshaker + + // NetworkEvents returns all the network events collected so far. + NetworkEvents() (out []*model.ArchivalNetworkEvent) + + // NewStdlibResolver returns a possibly-trace-ware system resolver. + NewStdlibResolver(logger model.DebugLogger) model.Resolver + + // NewUDPListener implements model.MeasuringNetwork. + NewUDPListener() model.UDPListener + + // QUICHandshakes collects all the QUIC handshake results collected so far. + QUICHandshakes() (out []*model.ArchivalTLSOrQUICHandshakeResult) + + // TCPConnects collects all the TCP connect results collected so far. + TCPConnects() (out []*model.ArchivalTCPConnectResult) + + // TLSHandshakes collects all the TLS handshake results collected so far. + TLSHandshakes() (out []*model.ArchivalTLSOrQUICHandshakeResult) + + // Tags returns the trace tags. + Tags() []string + + // TimeSince is equivalent to Trace.TimeNow().Sub(t0). + TimeSince(t0 time.Time) time.Duration + + // ZeroTime returns the "zero" time of this trace. + ZeroTime() time.Time +} diff --git a/internal/x/dslvm/wait.go b/internal/x/dslvm/wait.go new file mode 100644 index 000000000..0afd67c68 --- /dev/null +++ b/internal/x/dslvm/wait.go @@ -0,0 +1,18 @@ +package dslvm + +import "sync" + +// Wait waits until all the given channels are done. +func Wait(channels ...<-chan Done) { + waitGroup := &sync.WaitGroup{} + for _, channel := range channels { + waitGroup.Add(1) + go func(channel <-chan Done) { + defer waitGroup.Done() + for range channel { + // drain! + } + }(channel) + } + waitGroup.Wait() +} From b3ddedf99fc82cc62886d053275621bed8fe7224 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Wed, 13 Dec 2023 18:37:35 +0100 Subject: [PATCH 4/4] cleanup: move dslx inside internal/x (#1438) This is a follow-up from merging https://github.com/ooni/probe-cli/pull/1437 to also mark `dslx` as experimental. Related issue: https://github.com/ooni/probe/issues/2647 --- internal/tutorial/dslx/chapter02/README.md | 2 +- internal/tutorial/dslx/chapter02/main.go | 2 +- internal/{ => x}/dslx/address.go | 0 internal/{ => x}/dslx/address_test.go | 0 internal/{ => x}/dslx/dns.go | 0 internal/{ => x}/dslx/dns_test.go | 0 internal/{ => x}/dslx/doc.go | 0 internal/{ => x}/dslx/endpoint.go | 0 internal/{ => x}/dslx/endpoint_test.go | 0 internal/{ => x}/dslx/fxasync.go | 0 internal/{ => x}/dslx/fxasync_test.go | 0 internal/{ => x}/dslx/fxcore.go | 0 internal/{ => x}/dslx/fxcore_test.go | 0 internal/{ => x}/dslx/fxgen.go | 0 internal/{ => x}/dslx/fxstream.go | 0 internal/{ => x}/dslx/fxstream_test.go | 0 internal/{ => x}/dslx/http_test.go | 0 internal/{ => x}/dslx/httpcore.go | 0 internal/{ => x}/dslx/httpquic.go | 0 internal/{ => x}/dslx/httptcp.go | 0 internal/{ => x}/dslx/httptls.go | 0 internal/{ => x}/dslx/integration_test.go | 0 internal/{ => x}/dslx/observations.go | 0 internal/{ => x}/dslx/qa_test.go | 2 +- internal/{ => x}/dslx/quic.go | 0 internal/{ => x}/dslx/quic_test.go | 0 internal/{ => x}/dslx/runtimecore.go | 0 internal/{ => x}/dslx/runtimemeasurex.go | 0 internal/{ => x}/dslx/runtimemeasurex_test.go | 0 internal/{ => x}/dslx/runtimeminimal.go | 0 internal/{ => x}/dslx/runtimeminimal_test.go | 0 internal/{ => x}/dslx/tcp.go | 0 internal/{ => x}/dslx/tcp_test.go | 0 internal/{ => x}/dslx/tls.go | 0 internal/{ => x}/dslx/tls_test.go | 0 internal/{ => x}/dslx/trace.go | 0 36 files changed, 3 insertions(+), 3 deletions(-) rename internal/{ => x}/dslx/address.go (100%) rename internal/{ => x}/dslx/address_test.go (100%) rename internal/{ => x}/dslx/dns.go (100%) rename internal/{ => x}/dslx/dns_test.go (100%) rename internal/{ => x}/dslx/doc.go (100%) rename internal/{ => x}/dslx/endpoint.go (100%) rename internal/{ => x}/dslx/endpoint_test.go (100%) rename internal/{ => x}/dslx/fxasync.go (100%) rename internal/{ => x}/dslx/fxasync_test.go (100%) rename internal/{ => x}/dslx/fxcore.go (100%) rename internal/{ => x}/dslx/fxcore_test.go (100%) rename internal/{ => x}/dslx/fxgen.go (100%) rename internal/{ => x}/dslx/fxstream.go (100%) rename internal/{ => x}/dslx/fxstream_test.go (100%) rename internal/{ => x}/dslx/http_test.go (100%) rename internal/{ => x}/dslx/httpcore.go (100%) rename internal/{ => x}/dslx/httpquic.go (100%) rename internal/{ => x}/dslx/httptcp.go (100%) rename internal/{ => x}/dslx/httptls.go (100%) rename internal/{ => x}/dslx/integration_test.go (100%) rename internal/{ => x}/dslx/observations.go (100%) rename internal/{ => x}/dslx/qa_test.go (99%) rename internal/{ => x}/dslx/quic.go (100%) rename internal/{ => x}/dslx/quic_test.go (100%) rename internal/{ => x}/dslx/runtimecore.go (100%) rename internal/{ => x}/dslx/runtimemeasurex.go (100%) rename internal/{ => x}/dslx/runtimemeasurex_test.go (100%) rename internal/{ => x}/dslx/runtimeminimal.go (100%) rename internal/{ => x}/dslx/runtimeminimal_test.go (100%) rename internal/{ => x}/dslx/tcp.go (100%) rename internal/{ => x}/dslx/tcp_test.go (100%) rename internal/{ => x}/dslx/tls.go (100%) rename internal/{ => x}/dslx/tls_test.go (100%) rename internal/{ => x}/dslx/trace.go (100%) diff --git a/internal/tutorial/dslx/chapter02/README.md b/internal/tutorial/dslx/chapter02/README.md index adac8b742..8a0c1fa97 100644 --- a/internal/tutorial/dslx/chapter02/README.md +++ b/internal/tutorial/dslx/chapter02/README.md @@ -44,9 +44,9 @@ import ( "errors" "net" - "github.com/ooni/probe-cli/v3/internal/dslx" "github.com/ooni/probe-cli/v3/internal/model" "github.com/ooni/probe-cli/v3/internal/runtimex" + "github.com/ooni/probe-cli/v3/internal/x/dslx" ) ``` diff --git a/internal/tutorial/dslx/chapter02/main.go b/internal/tutorial/dslx/chapter02/main.go index 95b398c94..17e9502aa 100644 --- a/internal/tutorial/dslx/chapter02/main.go +++ b/internal/tutorial/dslx/chapter02/main.go @@ -45,9 +45,9 @@ import ( "errors" "net" - "github.com/ooni/probe-cli/v3/internal/dslx" "github.com/ooni/probe-cli/v3/internal/model" "github.com/ooni/probe-cli/v3/internal/runtimex" + "github.com/ooni/probe-cli/v3/internal/x/dslx" ) // ``` diff --git a/internal/dslx/address.go b/internal/x/dslx/address.go similarity index 100% rename from internal/dslx/address.go rename to internal/x/dslx/address.go diff --git a/internal/dslx/address_test.go b/internal/x/dslx/address_test.go similarity index 100% rename from internal/dslx/address_test.go rename to internal/x/dslx/address_test.go diff --git a/internal/dslx/dns.go b/internal/x/dslx/dns.go similarity index 100% rename from internal/dslx/dns.go rename to internal/x/dslx/dns.go diff --git a/internal/dslx/dns_test.go b/internal/x/dslx/dns_test.go similarity index 100% rename from internal/dslx/dns_test.go rename to internal/x/dslx/dns_test.go diff --git a/internal/dslx/doc.go b/internal/x/dslx/doc.go similarity index 100% rename from internal/dslx/doc.go rename to internal/x/dslx/doc.go diff --git a/internal/dslx/endpoint.go b/internal/x/dslx/endpoint.go similarity index 100% rename from internal/dslx/endpoint.go rename to internal/x/dslx/endpoint.go diff --git a/internal/dslx/endpoint_test.go b/internal/x/dslx/endpoint_test.go similarity index 100% rename from internal/dslx/endpoint_test.go rename to internal/x/dslx/endpoint_test.go diff --git a/internal/dslx/fxasync.go b/internal/x/dslx/fxasync.go similarity index 100% rename from internal/dslx/fxasync.go rename to internal/x/dslx/fxasync.go diff --git a/internal/dslx/fxasync_test.go b/internal/x/dslx/fxasync_test.go similarity index 100% rename from internal/dslx/fxasync_test.go rename to internal/x/dslx/fxasync_test.go diff --git a/internal/dslx/fxcore.go b/internal/x/dslx/fxcore.go similarity index 100% rename from internal/dslx/fxcore.go rename to internal/x/dslx/fxcore.go diff --git a/internal/dslx/fxcore_test.go b/internal/x/dslx/fxcore_test.go similarity index 100% rename from internal/dslx/fxcore_test.go rename to internal/x/dslx/fxcore_test.go diff --git a/internal/dslx/fxgen.go b/internal/x/dslx/fxgen.go similarity index 100% rename from internal/dslx/fxgen.go rename to internal/x/dslx/fxgen.go diff --git a/internal/dslx/fxstream.go b/internal/x/dslx/fxstream.go similarity index 100% rename from internal/dslx/fxstream.go rename to internal/x/dslx/fxstream.go diff --git a/internal/dslx/fxstream_test.go b/internal/x/dslx/fxstream_test.go similarity index 100% rename from internal/dslx/fxstream_test.go rename to internal/x/dslx/fxstream_test.go diff --git a/internal/dslx/http_test.go b/internal/x/dslx/http_test.go similarity index 100% rename from internal/dslx/http_test.go rename to internal/x/dslx/http_test.go diff --git a/internal/dslx/httpcore.go b/internal/x/dslx/httpcore.go similarity index 100% rename from internal/dslx/httpcore.go rename to internal/x/dslx/httpcore.go diff --git a/internal/dslx/httpquic.go b/internal/x/dslx/httpquic.go similarity index 100% rename from internal/dslx/httpquic.go rename to internal/x/dslx/httpquic.go diff --git a/internal/dslx/httptcp.go b/internal/x/dslx/httptcp.go similarity index 100% rename from internal/dslx/httptcp.go rename to internal/x/dslx/httptcp.go diff --git a/internal/dslx/httptls.go b/internal/x/dslx/httptls.go similarity index 100% rename from internal/dslx/httptls.go rename to internal/x/dslx/httptls.go diff --git a/internal/dslx/integration_test.go b/internal/x/dslx/integration_test.go similarity index 100% rename from internal/dslx/integration_test.go rename to internal/x/dslx/integration_test.go diff --git a/internal/dslx/observations.go b/internal/x/dslx/observations.go similarity index 100% rename from internal/dslx/observations.go rename to internal/x/dslx/observations.go diff --git a/internal/dslx/qa_test.go b/internal/x/dslx/qa_test.go similarity index 99% rename from internal/dslx/qa_test.go rename to internal/x/dslx/qa_test.go index 24fd1c8d0..56398e616 100644 --- a/internal/dslx/qa_test.go +++ b/internal/x/dslx/qa_test.go @@ -11,10 +11,10 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/gopacket/layers" "github.com/ooni/netem" - "github.com/ooni/probe-cli/v3/internal/dslx" "github.com/ooni/probe-cli/v3/internal/model" "github.com/ooni/probe-cli/v3/internal/netemx" "github.com/ooni/probe-cli/v3/internal/netxlite" + "github.com/ooni/probe-cli/v3/internal/x/dslx" ) // qaStringLessFunc is an utility function to force cmp.Diff to sort string diff --git a/internal/dslx/quic.go b/internal/x/dslx/quic.go similarity index 100% rename from internal/dslx/quic.go rename to internal/x/dslx/quic.go diff --git a/internal/dslx/quic_test.go b/internal/x/dslx/quic_test.go similarity index 100% rename from internal/dslx/quic_test.go rename to internal/x/dslx/quic_test.go diff --git a/internal/dslx/runtimecore.go b/internal/x/dslx/runtimecore.go similarity index 100% rename from internal/dslx/runtimecore.go rename to internal/x/dslx/runtimecore.go diff --git a/internal/dslx/runtimemeasurex.go b/internal/x/dslx/runtimemeasurex.go similarity index 100% rename from internal/dslx/runtimemeasurex.go rename to internal/x/dslx/runtimemeasurex.go diff --git a/internal/dslx/runtimemeasurex_test.go b/internal/x/dslx/runtimemeasurex_test.go similarity index 100% rename from internal/dslx/runtimemeasurex_test.go rename to internal/x/dslx/runtimemeasurex_test.go diff --git a/internal/dslx/runtimeminimal.go b/internal/x/dslx/runtimeminimal.go similarity index 100% rename from internal/dslx/runtimeminimal.go rename to internal/x/dslx/runtimeminimal.go diff --git a/internal/dslx/runtimeminimal_test.go b/internal/x/dslx/runtimeminimal_test.go similarity index 100% rename from internal/dslx/runtimeminimal_test.go rename to internal/x/dslx/runtimeminimal_test.go diff --git a/internal/dslx/tcp.go b/internal/x/dslx/tcp.go similarity index 100% rename from internal/dslx/tcp.go rename to internal/x/dslx/tcp.go diff --git a/internal/dslx/tcp_test.go b/internal/x/dslx/tcp_test.go similarity index 100% rename from internal/dslx/tcp_test.go rename to internal/x/dslx/tcp_test.go diff --git a/internal/dslx/tls.go b/internal/x/dslx/tls.go similarity index 100% rename from internal/dslx/tls.go rename to internal/x/dslx/tls.go diff --git a/internal/dslx/tls_test.go b/internal/x/dslx/tls_test.go similarity index 100% rename from internal/dslx/tls_test.go rename to internal/x/dslx/tls_test.go diff --git a/internal/dslx/trace.go b/internal/x/dslx/trace.go similarity index 100% rename from internal/dslx/trace.go rename to internal/x/dslx/trace.go