From fe5bb167d566fe04356b207ac8a2740e2bfd20af Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Wed, 25 Oct 2023 18:17:03 +0200 Subject: [PATCH 1/6] x --- internal/dslx/address.go | 2 + internal/dslx/dns.go | 35 ++++++-- internal/dslx/endpoint.go | 41 +++++++++ internal/dslx/fxasync.go | 53 ++++++++++++ internal/dslx/fxcore.go | 69 ++++++++++++++++ internal/dslx/fxstream.go | 12 ++- internal/dslx/qa_test.go | 160 +++++++++++++++++++++++++++++++++++- internal/netemx/scenario.go | 6 ++ 8 files changed, 362 insertions(+), 16 deletions(-) diff --git a/internal/dslx/address.go b/internal/dslx/address.go index c0b3076cd..7d06ac714 100644 --- a/internal/dslx/address.go +++ b/internal/dslx/address.go @@ -71,6 +71,8 @@ type EndpointPort uint16 // ToEndpoints transforms this set of IP addresses to a list of endpoints. We will // combine each IP address with the network and the port to construct an endpoint and // we will also apply any additional option to each endpoint. +// +// Deprecated: use MakeEndpoint instead. func (as *AddressSet) ToEndpoints( network EndpointNetwork, port EndpointPort, options ...EndpointOption) (v []*Endpoint) { for addr := range as.M { diff --git a/internal/dslx/dns.go b/internal/dslx/dns.go index 38249a87f..d67eaa97d 100644 --- a/internal/dslx/dns.go +++ b/internal/dslx/dns.go @@ -64,6 +64,26 @@ type ResolvedAddresses struct { Domain string } +// Flatten transforms a [ResolvedAddresses] into a slice of zero or more [ResolvedAddress]. +func (ra *ResolvedAddresses) Flatten() (out []*ResolvedAddress) { + for _, ipAddr := range ra.Addresses { + out = append(out, &ResolvedAddress{ + Address: ipAddr, + Domain: ra.Domain, + }) + } + return +} + +// ResolvedAddress is a single address resolved using a DNS lookup function. +type ResolvedAddress struct { + // Address is the address that was resolved. + Address string + + // Domain is the domain from which we resolved the address. + Domain string +} + // DNSLookupGetaddrinfo returns a function that resolves a domain name to // IP addresses using libc's getaddrinfo function. func DNSLookupGetaddrinfo(rt Runtime) Func[*DomainToResolve, *ResolvedAddresses] { @@ -90,18 +110,17 @@ func DNSLookupGetaddrinfo(rt Runtime) Func[*DomainToResolve, *ResolvedAddresses] // lookup addrs, err := resolver.LookupHost(ctx, input.Domain) - // stop the operation logger - ol.Stop(err) - // save the observations rt.SaveObservations(maybeTraceToObservations(trace)...) // handle error case if err != nil { + ol.Stop(err) return nil, err } // handle success + ol.Stop(addrs) state := &ResolvedAddresses{ Addresses: addrs, Domain: input.Domain, @@ -141,18 +160,17 @@ func DNSLookupUDP(rt Runtime, endpoint string) Func[*DomainToResolve, *ResolvedA // lookup addrs, err := resolver.LookupHost(ctx, input.Domain) - // stop the operation logger - ol.Stop(err) - // save the observations rt.SaveObservations(maybeTraceToObservations(trace)...) // handle error case if err != nil { + ol.Stop(err) return nil, err } // handle success + ol.Stop(addrs) state := &ResolvedAddresses{ Addresses: addrs, Domain: input.Domain, @@ -170,8 +188,11 @@ var ErrDNSLookupParallel = errors.New("dslx: DNSLookupParallel failed") // processing observations or by creating a per-DNS-resolver pipeline. func DNSLookupParallel(fxs ...Func[*DomainToResolve, *ResolvedAddresses]) Func[*DomainToResolve, *ResolvedAddresses] { return Operation[*DomainToResolve, *ResolvedAddresses](func(ctx context.Context, domain *DomainToResolve) (*ResolvedAddresses, error) { + // TODO(bassosimone): we may want to configure this + const parallelism = Parallelism(3) + // run all the DNS resolvers in parallel - results := Parallel(ctx, Parallelism(2), domain, fxs...) + results := Parallel(ctx, parallelism, domain, fxs...) // reduce addresses addressSet := NewAddressSet() diff --git a/internal/dslx/endpoint.go b/internal/dslx/endpoint.go index ea725f5d5..156f1a375 100644 --- a/internal/dslx/endpoint.go +++ b/internal/dslx/endpoint.go @@ -4,6 +4,12 @@ package dslx // Manipulate endpoints // +import ( + "context" + "net" + "strconv" +) + type ( // EndpointNetwork is the network of the endpoint EndpointNetwork string @@ -70,3 +76,38 @@ func NewEndpoint( } return epnt } + +// MakeEndpoint returns a [Func] that creates an [*Endpoint] given a [*ResolvedAddress]. +func MakeEndpoint(network EndpointNetwork, port EndpointPort, options ...EndpointOption) Func[*ResolvedAddress, *Endpoint] { + return Operation[*ResolvedAddress, *Endpoint](func(ctx context.Context, addr *ResolvedAddress) (*Endpoint, error) { + // create the destination endpoint address + addrport := EndpointAddress(net.JoinHostPort(addr.Address, strconv.Itoa(int(port)))) + + // make sure we include the proper domain first but allow the caller + // to potentially override the endpoint + allOptions := []EndpointOption{ + EndpointOptionDomain(addr.Domain), + } + allOptions = append(allOptions, options...) + + // build and return the endpoint + endpoint := NewEndpoint(network, addrport, allOptions...) + return endpoint, nil + }) +} + +// MeasureResolvedAddresses returns a [Func] that measures the resolved addresses provided +// as the input argument using each of the provided functions. +func MeasureResolvedAddresses(fxs ...Func[*ResolvedAddress, Void]) Func[*ResolvedAddresses, Void] { + return Operation[*ResolvedAddresses, Void](func(ctx context.Context, addrs *ResolvedAddresses) (Void, error) { + // TODO(bassosimone): we may want to configure this + const parallelism = Parallelism(3) + + // run the matrix until the output is drained + for range Matrix(ctx, parallelism, addrs.Flatten(), fxs) { + // nothing + } + + return Void{}, nil + }) +} diff --git a/internal/dslx/fxasync.go b/internal/dslx/fxasync.go index cfb428ec7..75e341a7f 100644 --- a/internal/dslx/fxasync.go +++ b/internal/dslx/fxasync.go @@ -30,6 +30,8 @@ type Parallelism int // The return value is the channel generating fx(a) // for every a in inputs. This channel will also be closed // to signal EOF to the consumer. +// +// Deprecated: use Matrix instead. func Map[A, B any]( ctx context.Context, parallelism Parallelism, @@ -77,6 +79,8 @@ func Map[A, B any]( // - fn is the list of functions. // // The return value is the list [fx(a)] for every fx in fn. +// +// Deprecated: use Matrix instead. func Parallel[A, B any]( ctx context.Context, parallelism Parallelism, @@ -90,6 +94,8 @@ func Parallel[A, B any]( // ParallelAsync is like Parallel but deals with channels. We assume the // input channel will be closed to signal EOF. We will close the output // channel to signal EOF to the consumer. +// +// Deprecated: use Matrix instead. func ParallelAsync[A, B any]( ctx context.Context, parallelism Parallelism, @@ -124,6 +130,8 @@ func ParallelAsync[A, B any]( } // ApplyAsync is equivalent to calling Apply but returns a channel. +// +// Deprecated: use Matrix instead. func ApplyAsync[A, B any]( ctx context.Context, fx Func[A, B], @@ -131,3 +139,48 @@ func ApplyAsync[A, B any]( ) <-chan *Maybe[B] { return Map(ctx, Parallelism(1), fx, StreamList(input)) } + +type matrixPoint[A, B any] struct { + f Func[A, B] + in A +} + +// Matrix invokes each function on each input using N goroutines streaming the results in output. +func Matrix[A, B any](ctx context.Context, N Parallelism, inputs []A, functions []Func[A, B]) <-chan *Maybe[B] { + // make output + output := make(chan *Maybe[B]) + + // stream all the possible points + points := make(chan *matrixPoint[A, B]) + go func() { + defer close(points) + for _, input := range inputs { + for _, fx := range functions { + points <- &matrixPoint[A, B]{f: fx, in: input} + } + } + }() + + // execute N goroutines + wg := &sync.WaitGroup{} + if N < 1 { + N = 1 + } + for i := Parallelism(0); i < N; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for p := range points { + output <- p.f.Apply(ctx, NewMaybeWithValue(p.in)) + } + }() + } + + // close output channel when done + go func() { + defer close(output) + wg.Wait() + }() + + return output +} diff --git a/internal/dslx/fxcore.go b/internal/dslx/fxcore.go index 8074a4b18..fc036c4cf 100644 --- a/internal/dslx/fxcore.go +++ b/internal/dslx/fxcore.go @@ -6,6 +6,8 @@ package dslx import ( "context" + "errors" + "sync" ) // Func is a function f: (context.Context, A) -> B. @@ -13,6 +15,14 @@ type Func[A, B any] interface { Apply(ctx context.Context, a *Maybe[A]) *Maybe[B] } +// FuncAdapter adapts a func to be a [Func]. +type FuncAdapter[A, B any] func(ctx context.Context, a *Maybe[A]) *Maybe[B] + +// Apply implements Func. +func (fa FuncAdapter[A, B]) Apply(ctx context.Context, a *Maybe[A]) *Maybe[B] { + return fa(ctx, a) +} + // Operation adapts a golang function to behave like a Func. type Operation[A, B any] func(ctx context.Context, a A) (B, error) @@ -73,3 +83,62 @@ type compose2Func[A, B, C any] struct { func (h *compose2Func[A, B, C]) Apply(ctx context.Context, a *Maybe[A]) *Maybe[C] { return h.g.Apply(ctx, h.f.Apply(ctx, a)) } + +// Void is the empty data structure. +type Void struct{} + +// Discard transforms any type to [Void]. +func Discard[T any]() Func[T, Void] { + return Operation[T, Void](func(ctx context.Context, input T) (Void, error) { + return Void{}, nil + }) +} + +// ErrSkip is an error that indicates that we already processed an error emitted +// by a previous stage, so we are using this error to avoid counting the original +// error more than once when computing statistics, e.g., in [*Stats]. +var ErrSkip = errors.New("dslx: error already processed by a previous stage") + +// Stats measures the number of successes and failures. +// +// The zero value is invalid; use [NewStats]. +type Stats[T any] struct { + m map[string]int64 + mu sync.Mutex +} + +// NewStats creates a [*Stats] instance. +func NewStats[T any]() *Stats[T] { + return &Stats[T]{ + m: map[string]int64{}, + mu: sync.Mutex{}, + } +} + +// Observer returns a Func that observes the results of the previous pipeline stage. +func (s *Stats[T]) Observer() Func[T, T] { + return FuncAdapter[T, T](func(ctx context.Context, minput *Maybe[T]) *Maybe[T] { + defer s.mu.Unlock() + s.mu.Lock() + var r string + if err := minput.Error; err != nil { + r = err.Error() + } + s.m[r]++ + if r != "" { + return NewMaybeWithError[T](ErrSkip) + } + return minput + }) +} + +// Export exports the current stats without clearing the internally used map. +func (s *Stats[T]) Export() (out map[string]int64) { + out = make(map[string]int64) + defer s.mu.Unlock() + s.mu.Lock() + for r, cnt := range s.m { + out[r] = cnt + } + return +} diff --git a/internal/dslx/fxstream.go b/internal/dslx/fxstream.go index 7b15efe45..b45cdb555 100644 --- a/internal/dslx/fxstream.go +++ b/internal/dslx/fxstream.go @@ -18,13 +18,11 @@ func Collect[T any](c <-chan T) (v []T) { // StreamList creates a channel out of static values. This function will // close the channel when it has streamed all the available elements. func StreamList[T any](ts ...T) <-chan T { - c := make(chan T) - go func() { - defer close(c) // as documented - for _, t := range ts { - c <- t - } - }() + c := make(chan T, len(ts)) + defer close(c) // as documented + for _, t := range ts { + c <- t + } return c } diff --git a/internal/dslx/qa_test.go b/internal/dslx/qa_test.go index f329482d2..fba2ea156 100644 --- a/internal/dslx/qa_test.go +++ b/internal/dslx/qa_test.go @@ -28,7 +28,7 @@ func TestDNSLookupQA(t *testing.T) { // name is the test case name name string - // newRuntime is the function that creates a new runtime + // newRuntime is the function creating a new runtime newRuntime func(netx model.MeasuringNetwork) dslx.Runtime // configureDPI configures DPI @@ -45,7 +45,7 @@ func TestDNSLookupQA(t *testing.T) { } cases := []testcase{{ - name: "successful case with minimal runtime", + name: "success with minimal runtime", newRuntime: func(netx model.MeasuringNetwork) dslx.Runtime { return dslx.NewMinimalRuntime(log.Log, time.Now(), dslx.MinimalRuntimeOptionMeasuringNetwork(netx)) }, @@ -134,3 +134,159 @@ func TestDNSLookupQA(t *testing.T) { }) } } + +func TestMeasureResolvedAddressesQA(t *testing.T) { + // testcase is a test case implemented by this function + type testcase struct { + // name is the test case name + name string + + // newRuntime is the function creating a new runtime + newRuntime func(netx model.MeasuringNetwork) dslx.Runtime + + // configureDPI configures DPI + configureDPI func(dpi *netem.DPIEngine) + + // expectTCP contains the expected TCP connect stats + expectTCP map[string]int64 + + // expectTLS contains the expected TLS handshake stats + expectTLS map[string]int64 + + // expectQUIC contains the expected QUIC handshake stats + expectQUIC map[string]int64 + } + + cases := []testcase{{ + name: "success with minimal runtime", + newRuntime: func(netx model.MeasuringNetwork) dslx.Runtime { + return dslx.NewMinimalRuntime(log.Log, time.Now(), dslx.MinimalRuntimeOptionMeasuringNetwork(netx)) + }, + configureDPI: func(dpi *netem.DPIEngine) { + // nothing + }, + expectTCP: map[string]int64{"": 2}, + expectTLS: map[string]int64{"": 2}, + expectQUIC: map[string]int64{"": 2}, + }, { + name: "TCP connection refused with minimal runtime", + newRuntime: func(netx model.MeasuringNetwork) dslx.Runtime { + return dslx.NewMinimalRuntime(log.Log, time.Now(), dslx.MinimalRuntimeOptionMeasuringNetwork(netx)) + }, + configureDPI: func(dpi *netem.DPIEngine) { + dpi.AddRule(&netem.DPICloseConnectionForServerEndpoint{ + Logger: log.Log, + ServerIPAddress: "8.8.8.8", + ServerPort: 443, + }) + }, + expectTCP: map[string]int64{ + "": 1, + "connection_refused": 1, + }, + expectTLS: map[string]int64{ + "": 1, + "dslx: error already processed by a previous stage": 1, + }, + expectQUIC: map[string]int64{"": 2}, + }, { + name: "TLS handshake reset with minimal runtime", + newRuntime: func(netx model.MeasuringNetwork) dslx.Runtime { + return dslx.NewMinimalRuntime(log.Log, time.Now(), dslx.MinimalRuntimeOptionMeasuringNetwork(netx)) + }, + configureDPI: func(dpi *netem.DPIEngine) { + dpi.AddRule(&netem.DPIResetTrafficForTLSSNI{ + Logger: log.Log, + SNI: "dns.google", + }) + }, + expectTCP: map[string]int64{"": 2}, + expectTLS: map[string]int64{ + "connection_reset": 2, + }, + expectQUIC: map[string]int64{"": 2}, + }} + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + // define the scenario characteristics with multiple IP addresses per host + + // create an internet testing scenario + env := netemx.MustNewScenario(netemx.InternetScenario) + defer env.Close() + + // create a dslx.Runtime using the client stack + rt := tc.newRuntime(&netxlite.Netx{ + Underlying: &netxlite.NetemUnderlyingNetworkAdapter{UNet: env.ClientStack}, + }) + defer rt.Close() + + // configure the DPI engine + tc.configureDPI(env.DPIEngine()) + + // create stats + var ( + tcpConnectStats = dslx.NewStats[*dslx.TCPConnection]() + tlsHandshakeStats = dslx.NewStats[*dslx.TLSConnection]() + quicHandshakeStats = dslx.NewStats[*dslx.QUICConnection]() + ) + + // create endpoint measurement function + function := dslx.MeasureResolvedAddresses( + // measure 443/tcp + dslx.Compose7( + dslx.MakeEndpoint("tcp", 443), + dslx.TCPConnect(rt), + tcpConnectStats.Observer(), + dslx.TLSHandshake(rt), + tlsHandshakeStats.Observer(), + dslx.HTTPRequestOverTLS(rt), + dslx.Discard[*dslx.HTTPResponse](), + ), + + // measure 443/udp + dslx.Compose5( + dslx.MakeEndpoint("udp", 443), + dslx.QUICHandshake( + rt, + // TODO(???): understand why certificate verification always + // fails when we're using netem along with HTTP/3 + dslx.TLSHandshakeOptionInsecureSkipVerify(true), + ), + quicHandshakeStats.Observer(), + dslx.HTTPRequestOverQUIC(rt), + dslx.Discard[*dslx.HTTPResponse](), + ), + ) + + // create context + ctx := context.Background() + + // fake out the resolved addresses + resolvedAddrs := &dslx.ResolvedAddresses{ + Addresses: []string{"8.8.8.8", "8.8.4.4"}, + Domain: "dns.google", + } + + // measure the endpoints + _ = function.Apply(ctx, dslx.NewMaybeWithValue(resolvedAddrs)) + + // make sure the TCP connect results are consistent + if diff := cmp.Diff(tc.expectTCP, tcpConnectStats.Export()); diff != "" { + t.Fatal(diff) + } + + // make sure the TLS handshake results are consistent + if diff := cmp.Diff(tc.expectTLS, tlsHandshakeStats.Export()); diff != "" { + t.Fatal(diff) + } + + // make sure the QUIC handshake results are consistent + if diff := cmp.Diff(tc.expectQUIC, quicHandshakeStats.Export()); diff != "" { + t.Fatal(diff) + } + + // TODO(bassosimone): make sure the observations are OK + }) + } +} diff --git a/internal/netemx/scenario.go b/internal/netemx/scenario.go index 1d9f6c7ce..b5b5a0f7c 100644 --- a/internal/netemx/scenario.go +++ b/internal/netemx/scenario.go @@ -209,6 +209,12 @@ func MustNewScenario(config []*ScenarioDomainAddresses) *QAEnv { ServerNameMain: sad.ServerNameMain, ServerNameExtras: sad.ServerNameExtras, }, + &HTTP3ServerFactory{ + Factory: &DNSOverHTTPSHandlerFactory{}, + Ports: []int{443}, + ServerNameMain: sad.ServerNameMain, + ServerNameExtras: sad.ServerNameExtras, + }, )) } From 6f464046681f431c8b62bf493b46996cab1b6f38 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Wed, 25 Oct 2023 18:33:56 +0200 Subject: [PATCH 2/6] [ci skip] Apply suggestions from code review --- internal/dslx/endpoint.go | 4 ++-- internal/dslx/fxasync.go | 1 + internal/dslx/fxcore.go | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/dslx/endpoint.go b/internal/dslx/endpoint.go index 156f1a375..6b5a403a4 100644 --- a/internal/dslx/endpoint.go +++ b/internal/dslx/endpoint.go @@ -83,8 +83,8 @@ func MakeEndpoint(network EndpointNetwork, port EndpointPort, options ...Endpoin // create the destination endpoint address addrport := EndpointAddress(net.JoinHostPort(addr.Address, strconv.Itoa(int(port)))) - // make sure we include the proper domain first but allow the caller - // to potentially override the endpoint + // make sure we include the proper domain name first but allow the caller + // to potentially override the domain name using options allOptions := []EndpointOption{ EndpointOptionDomain(addr.Domain), } diff --git a/internal/dslx/fxasync.go b/internal/dslx/fxasync.go index 75e341a7f..94904cbca 100644 --- a/internal/dslx/fxasync.go +++ b/internal/dslx/fxasync.go @@ -140,6 +140,7 @@ func ApplyAsync[A, B any]( return Map(ctx, Parallelism(1), fx, StreamList(input)) } +// matrixPoint is a point within the matrix used by [Matrix]. type matrixPoint[A, B any] struct { f Func[A, B] in A diff --git a/internal/dslx/fxcore.go b/internal/dslx/fxcore.go index fc036c4cf..e9a5197b6 100644 --- a/internal/dslx/fxcore.go +++ b/internal/dslx/fxcore.go @@ -115,7 +115,8 @@ func NewStats[T any]() *Stats[T] { } } -// Observer returns a Func that observes the results of the previous pipeline stage. +// Observer returns a Func that observes the results of the previous pipeline stage. This function +// converts any error that it sees to [ErrSkip]. This function does not account for [ErrSkip]. func (s *Stats[T]) Observer() Func[T, T] { return FuncAdapter[T, T](func(ctx context.Context, minput *Maybe[T]) *Maybe[T] { defer s.mu.Unlock() From 1ea176e187f7819c9b71b84f8e7a7e82fdaf1d5a Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Wed, 25 Oct 2023 18:50:19 +0200 Subject: [PATCH 3/6] x --- internal/dslx/fxcore.go | 11 +++++--- internal/dslx/fxstream.go | 4 +-- internal/dslx/qa_test.go | 47 +++++++++++++++++++++++---------- internal/dslx/quic.go | 3 +-- internal/dslx/runtimeminimal.go | 5 ++++ internal/dslx/trace.go | 3 +++ internal/measurexlite/udp.go | 1 + 7 files changed, 53 insertions(+), 21 deletions(-) diff --git a/internal/dslx/fxcore.go b/internal/dslx/fxcore.go index e9a5197b6..4dfcade41 100644 --- a/internal/dslx/fxcore.go +++ b/internal/dslx/fxcore.go @@ -116,24 +116,29 @@ func NewStats[T any]() *Stats[T] { } // Observer returns a Func that observes the results of the previous pipeline stage. This function -// converts any error that it sees to [ErrSkip]. This function does not account for [ErrSkip]. +// converts any error that it sees to [ErrSkip]. This function does not account for [ErrSkip], meaning +// that you will never see [ErrSkip] in the stats returned by [Stats.Export]. func (s *Stats[T]) Observer() Func[T, T] { return FuncAdapter[T, T](func(ctx context.Context, minput *Maybe[T]) *Maybe[T] { defer s.mu.Unlock() s.mu.Lock() var r string if err := minput.Error; err != nil { + if errors.Is(err, ErrSkip) { + return NewMaybeWithError[T](ErrSkip) // as documented + } r = err.Error() } s.m[r]++ if r != "" { - return NewMaybeWithError[T](ErrSkip) + return NewMaybeWithError[T](ErrSkip) // as documented } return minput }) } -// Export exports the current stats without clearing the internally used map. +// Export exports the current stats without clearing the internally used map such that +// statistics accumulate over time and never reset for the [*Stats] lifecycle. func (s *Stats[T]) Export() (out map[string]int64) { out = make(map[string]int64) defer s.mu.Unlock() diff --git a/internal/dslx/fxstream.go b/internal/dslx/fxstream.go index b45cdb555..21d067226 100644 --- a/internal/dslx/fxstream.go +++ b/internal/dslx/fxstream.go @@ -18,8 +18,8 @@ func Collect[T any](c <-chan T) (v []T) { // StreamList creates a channel out of static values. This function will // close the channel when it has streamed all the available elements. func StreamList[T any](ts ...T) <-chan T { - c := make(chan T, len(ts)) - defer close(c) // as documented + c := make(chan T, len(ts)) // buffer so writing does not block + defer close(c) // as documented for _, t := range ts { c <- t } diff --git a/internal/dslx/qa_test.go b/internal/dslx/qa_test.go index fba2ea156..6695727a9 100644 --- a/internal/dslx/qa_test.go +++ b/internal/dslx/qa_test.go @@ -9,6 +9,7 @@ import ( "github.com/apex/log" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/gopacket/layers" "github.com/ooni/netem" "github.com/ooni/probe-cli/v3/internal/dslx" "github.com/ooni/probe-cli/v3/internal/model" @@ -179,15 +180,16 @@ func TestMeasureResolvedAddressesQA(t *testing.T) { ServerIPAddress: "8.8.8.8", ServerPort: 443, }) + dpi.AddRule(&netem.DPICloseConnectionForServerEndpoint{ + Logger: log.Log, + ServerIPAddress: "8.8.4.4", + ServerPort: 443, + }) }, expectTCP: map[string]int64{ - "": 1, - "connection_refused": 1, - }, - expectTLS: map[string]int64{ - "": 1, - "dslx: error already processed by a previous stage": 1, + "connection_refused": 2, }, + expectTLS: map[string]int64{}, expectQUIC: map[string]int64{"": 2}, }, { name: "TLS handshake reset with minimal runtime", @@ -205,12 +207,34 @@ func TestMeasureResolvedAddressesQA(t *testing.T) { "connection_reset": 2, }, expectQUIC: map[string]int64{"": 2}, + }, { + name: "QUIC handshake timeout with minimal runtime", + newRuntime: func(netx model.MeasuringNetwork) dslx.Runtime { + return dslx.NewMinimalRuntime(log.Log, time.Now(), dslx.MinimalRuntimeOptionMeasuringNetwork(netx)) + }, + configureDPI: func(dpi *netem.DPIEngine) { + dpi.AddRule(&netem.DPIDropTrafficForServerEndpoint{ + Logger: log.Log, + ServerIPAddress: "8.8.8.8", + ServerPort: 443, + ServerProtocol: layers.IPProtocolUDP, + }) + dpi.AddRule(&netem.DPIDropTrafficForServerEndpoint{ + Logger: log.Log, + ServerIPAddress: "8.8.4.4", + ServerPort: 443, + ServerProtocol: layers.IPProtocolUDP, + }) + }, + expectTCP: map[string]int64{"": 2}, + expectTLS: map[string]int64{"": 2}, + expectQUIC: map[string]int64{ + "generic_timeout_error": 2, + }, }} for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - // define the scenario characteristics with multiple IP addresses per host - // create an internet testing scenario env := netemx.MustNewScenario(netemx.InternetScenario) defer env.Close() @@ -247,12 +271,7 @@ func TestMeasureResolvedAddressesQA(t *testing.T) { // measure 443/udp dslx.Compose5( dslx.MakeEndpoint("udp", 443), - dslx.QUICHandshake( - rt, - // TODO(???): understand why certificate verification always - // fails when we're using netem along with HTTP/3 - dslx.TLSHandshakeOptionInsecureSkipVerify(true), - ), + dslx.QUICHandshake(rt), quicHandshakeStats.Observer(), dslx.HTTPRequestOverQUIC(rt), dslx.Discard[*dslx.HTTPResponse](), diff --git a/internal/dslx/quic.go b/internal/dslx/quic.go index 3ad65241c..362f05250 100644 --- a/internal/dslx/quic.go +++ b/internal/dslx/quic.go @@ -11,7 +11,6 @@ import ( "time" "github.com/ooni/probe-cli/v3/internal/logx" - "github.com/ooni/probe-cli/v3/internal/netxlite" "github.com/quic-go/quic-go" ) @@ -35,7 +34,7 @@ func QUICHandshake(rt Runtime, options ...TLSHandshakeOption) Func[*Endpoint, *Q ) // setup - udpListener := netxlite.NewUDPListener() + udpListener := trace.NewUDPListener() quicDialer := trace.NewQUICDialerWithoutResolver(udpListener, rt.Logger()) const timeout = 10 * time.Second ctx, cancel := context.WithTimeout(ctx, timeout) diff --git a/internal/dslx/runtimeminimal.go b/internal/dslx/runtimeminimal.go index d0fd4aefd..1e7327693 100644 --- a/internal/dslx/runtimeminimal.go +++ b/internal/dslx/runtimeminimal.go @@ -170,6 +170,11 @@ func (tx *minimalTrace) NewTLSHandshakerStdlib(dl model.DebugLogger) model.TLSHa return tx.netx.NewTLSHandshakerStdlib(dl) } +// NewUDPListener implements Trace +func (tx *minimalTrace) NewUDPListener() model.UDPListener { + return tx.netx.NewUDPListener() +} + // QUICHandshakes implements Trace. func (tx *minimalTrace) QUICHandshakes() (out []*model.ArchivalTLSOrQUICHandshakeResult) { return []*model.ArchivalTLSOrQUICHandshakeResult{} diff --git a/internal/dslx/trace.go b/internal/dslx/trace.go index 09094712a..1cd0cc7bf 100644 --- a/internal/dslx/trace.go +++ b/internal/dslx/trace.go @@ -51,6 +51,9 @@ type Trace interface { // NewStdlibResolver returns a possibly-trace-ware system resolver. NewStdlibResolver(logger model.DebugLogger) model.Resolver + // NewUDPListener implements model.Measuring Network. + NewUDPListener() model.UDPListener + // QUICHandshakes collects all the QUIC handshake results collected so far. QUICHandshakes() (out []*model.ArchivalTLSOrQUICHandshakeResult) diff --git a/internal/measurexlite/udp.go b/internal/measurexlite/udp.go index ee9d1aaf0..72f2cb2fc 100644 --- a/internal/measurexlite/udp.go +++ b/internal/measurexlite/udp.go @@ -2,6 +2,7 @@ package measurexlite import "github.com/ooni/probe-cli/v3/internal/model" +// NewUDPListener implements model.Measuring Network. func (tx *Trace) NewUDPListener() model.UDPListener { return tx.Netx.NewUDPListener() } From 81cd9c4728c4045b922af5325ce393f1c9a1c517 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Wed, 25 Oct 2023 19:05:47 +0200 Subject: [PATCH 4/6] x --- internal/dslx/fxasync.go | 12 +++++++++--- internal/dslx/fxasync_test.go | 12 ++++++++++++ internal/dslx/quic_test.go | 4 ++++ 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/internal/dslx/fxasync.go b/internal/dslx/fxasync.go index 94904cbca..53a7bbcfc 100644 --- a/internal/dslx/fxasync.go +++ b/internal/dslx/fxasync.go @@ -146,6 +146,14 @@ type matrixPoint[A, B any] struct { in A } +// matrixMin can be replaced with the built-in min when we switch to go1.21. +func matrixMin(a, b Parallelism) Parallelism { + if a < b { + return a + } + return b +} + // Matrix invokes each function on each input using N goroutines streaming the results in output. func Matrix[A, B any](ctx context.Context, N Parallelism, inputs []A, functions []Func[A, B]) <-chan *Maybe[B] { // make output @@ -164,9 +172,7 @@ func Matrix[A, B any](ctx context.Context, N Parallelism, inputs []A, functions // execute N goroutines wg := &sync.WaitGroup{} - if N < 1 { - N = 1 - } + N = matrixMin(1, N) for i := Parallelism(0); i < N; i++ { wg.Add(1) go func() { diff --git a/internal/dslx/fxasync_test.go b/internal/dslx/fxasync_test.go index 0e9824205..2893d6d3a 100644 --- a/internal/dslx/fxasync_test.go +++ b/internal/dslx/fxasync_test.go @@ -101,3 +101,15 @@ func TestParallel(t *testing.T) { } }) } + +func TestMatrixMin(t *testing.T) { + if v := matrixMin(1, 7); v != 1 { + t.Fatal("expected to see 1, got", v) + } + if v := matrixMin(7, 4); v != 4 { + t.Fatal("expected to see 4, got", v) + } + if v := matrixMin(11, 11); v != 11 { + t.Fatal("expected to see 11, got", v) + } +} diff --git a/internal/dslx/quic_test.go b/internal/dslx/quic_test.go index d798adc17..8272055a9 100644 --- a/internal/dslx/quic_test.go +++ b/internal/dslx/quic_test.go @@ -10,6 +10,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/ooni/probe-cli/v3/internal/mocks" "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/netxlite" "github.com/quic-go/quic-go" ) @@ -85,6 +86,9 @@ func TestQUICHandshake(t *testing.T) { MockNewQUICDialerWithoutResolver: func(listener model.UDPListener, logger model.DebugLogger, w ...model.QUICDialerWrapper) model.QUICDialer { return tt.dialer }, + MockNewUDPListener: func() model.UDPListener { + return netxlite.NewUDPListener() + }, })) quicHandshake := QUICHandshake(rt, TLSHandshakeOptionServerName(tt.sni)) endpoint := &Endpoint{ From 97559055c2ab34532b78c448bdefff96b073b738 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Wed, 25 Oct 2023 19:23:30 +0200 Subject: [PATCH 5/6] [ci skip] Apply suggestions from code review --- internal/dslx/dns.go | 2 +- internal/dslx/endpoint.go | 4 ++-- internal/dslx/fxasync.go | 4 ++-- internal/dslx/qa_test.go | 2 +- internal/dslx/trace.go | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/dslx/dns.go b/internal/dslx/dns.go index d67eaa97d..c6f417289 100644 --- a/internal/dslx/dns.go +++ b/internal/dslx/dns.go @@ -188,7 +188,7 @@ var ErrDNSLookupParallel = errors.New("dslx: DNSLookupParallel failed") // processing observations or by creating a per-DNS-resolver pipeline. func DNSLookupParallel(fxs ...Func[*DomainToResolve, *ResolvedAddresses]) Func[*DomainToResolve, *ResolvedAddresses] { return Operation[*DomainToResolve, *ResolvedAddresses](func(ctx context.Context, domain *DomainToResolve) (*ResolvedAddresses, error) { - // TODO(bassosimone): we may want to configure this + // TODO(https://github.com/ooni/probe/issues/2619): we may want to configure this const parallelism = Parallelism(3) // run all the DNS resolvers in parallel diff --git a/internal/dslx/endpoint.go b/internal/dslx/endpoint.go index 6b5a403a4..c9f5b0c4a 100644 --- a/internal/dslx/endpoint.go +++ b/internal/dslx/endpoint.go @@ -77,7 +77,7 @@ func NewEndpoint( return epnt } -// MakeEndpoint returns a [Func] that creates an [*Endpoint] given a [*ResolvedAddress]. +// MakeEndpoint returns a [Func] that creates an [*Endpoint] given [*ResolvedAddress]. func MakeEndpoint(network EndpointNetwork, port EndpointPort, options ...EndpointOption) Func[*ResolvedAddress, *Endpoint] { return Operation[*ResolvedAddress, *Endpoint](func(ctx context.Context, addr *ResolvedAddress) (*Endpoint, error) { // create the destination endpoint address @@ -100,7 +100,7 @@ func MakeEndpoint(network EndpointNetwork, port EndpointPort, options ...Endpoin // as the input argument using each of the provided functions. func MeasureResolvedAddresses(fxs ...Func[*ResolvedAddress, Void]) Func[*ResolvedAddresses, Void] { return Operation[*ResolvedAddresses, Void](func(ctx context.Context, addrs *ResolvedAddresses) (Void, error) { - // TODO(bassosimone): we may want to configure this + // TODO(https://github.com/ooni/probe/issues/2619): we may want to configure this const parallelism = Parallelism(3) // run the matrix until the output is drained diff --git a/internal/dslx/fxasync.go b/internal/dslx/fxasync.go index 53a7bbcfc..c11c553d6 100644 --- a/internal/dslx/fxasync.go +++ b/internal/dslx/fxasync.go @@ -154,7 +154,7 @@ func matrixMin(a, b Parallelism) Parallelism { return b } -// Matrix invokes each function on each input using N goroutines streaming the results in output. +// Matrix invokes each function on each input using N goroutines and streams the results to a channel. func Matrix[A, B any](ctx context.Context, N Parallelism, inputs []A, functions []Func[A, B]) <-chan *Maybe[B] { // make output output := make(chan *Maybe[B]) @@ -170,7 +170,7 @@ func Matrix[A, B any](ctx context.Context, N Parallelism, inputs []A, functions } }() - // execute N goroutines + // spawn goroutines wg := &sync.WaitGroup{} N = matrixMin(1, N) for i := Parallelism(0); i < N; i++ { diff --git a/internal/dslx/qa_test.go b/internal/dslx/qa_test.go index 6695727a9..10f0c7671 100644 --- a/internal/dslx/qa_test.go +++ b/internal/dslx/qa_test.go @@ -305,7 +305,7 @@ func TestMeasureResolvedAddressesQA(t *testing.T) { t.Fatal(diff) } - // TODO(bassosimone): make sure the observations are OK + // TODO(https://github.com/ooni/probe/issues/2620): make sure the observations are OK }) } } diff --git a/internal/dslx/trace.go b/internal/dslx/trace.go index 1cd0cc7bf..8cc9f4030 100644 --- a/internal/dslx/trace.go +++ b/internal/dslx/trace.go @@ -51,7 +51,7 @@ type Trace interface { // NewStdlibResolver returns a possibly-trace-ware system resolver. NewStdlibResolver(logger model.DebugLogger) model.Resolver - // NewUDPListener implements model.Measuring Network. + // NewUDPListener implements model.MeasuringNetwork. NewUDPListener() model.UDPListener // QUICHandshakes collects all the QUIC handshake results collected so far. From 9796f4b0c31b2fa157ba43745c054a992a25e429 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Wed, 25 Oct 2023 19:27:21 +0200 Subject: [PATCH 6/6] x --- internal/dslx/httpcore.go | 4 ++-- internal/dslx/qa_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/dslx/httpcore.go b/internal/dslx/httpcore.go index 56bd6241e..22e87363b 100644 --- a/internal/dslx/httpcore.go +++ b/internal/dslx/httpcore.go @@ -221,7 +221,7 @@ func httpRoundTrip( input *HTTPConnection, req *http.Request, ) (*http.Response, []byte, []*Observations, error) { - const maxbody = 1 << 19 // TODO(bassosimone): allow to configure this value? + const maxbody = 1 << 19 // TODO(https://github.com/ooni/probe/issues/2621): allow to configure this value started := input.Trace.TimeSince(input.Trace.ZeroTime()) // manually create a single 1-length observations structure because @@ -249,7 +249,7 @@ func httpRoundTrip( // read a snapshot of the response body reader := io.LimitReader(resp.Body, maxbody) - body, err = netxlite.ReadAllContext(ctx, reader) // TODO: enable streaming and measure speed + body, err = netxlite.ReadAllContext(ctx, reader) // TODO(https://github.com/ooni/probe/issues/2622) // collect and save download speed samples samples := sampler.ExtractSamples() diff --git a/internal/dslx/qa_test.go b/internal/dslx/qa_test.go index 10f0c7671..24fd1c8d0 100644 --- a/internal/dslx/qa_test.go +++ b/internal/dslx/qa_test.go @@ -131,7 +131,7 @@ func TestDNSLookupQA(t *testing.T) { t.Fatal(diff) } - // TODO(bassosimone): make sure the observations are OK + // TODO(https://github.com/ooni/probe/issues/2620): make sure the observations are OK }) } }