Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(dslx): collect observations using runtime #1383

Merged
merged 7 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions internal/dslx/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func DNSLookupGetaddrinfo(rt Runtime) Func[*DomainToResolve, *ResolvedAddresses]
// stop the operation logger
ol.Stop(err)

// save the observations
rt.SaveObservations(maybeTraceToObservations(trace)...)

state := &ResolvedAddresses{
Addresses: addrs, // maybe empty
Domain: input.Domain,
Expand Down Expand Up @@ -145,6 +148,9 @@ func DNSLookupUDP(rt Runtime, endpoint string) Func[*DomainToResolve, *ResolvedA
// stop the operation logger
ol.Stop(err)

// save the observations
rt.SaveObservations(maybeTraceToObservations(trace)...)

state := &ResolvedAddresses{
Addresses: addrs, // maybe empty
Domain: input.Domain,
Expand Down
142 changes: 63 additions & 79 deletions internal/dslx/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/ooni/probe-cli/v3/internal/model"
)

// TODO(bassosimone): this test suite should be written in terms of netemx.

bassosimone marked this conversation as resolved.
Show resolved Hide resolved
/*
Test cases:
- New domain to resolve:
Expand Down Expand Up @@ -61,13 +63,12 @@ func TestGetaddrinfo(t *testing.T) {
}

t.Run("with nil resolver", func(t *testing.T) {
f := DNSLookupGetaddrinfo(
NewMinimalRuntime(model.DiscardLogger, time.Now()),
)
rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now())
f := DNSLookupGetaddrinfo(rt)
ctx, cancel := context.WithCancel(context.Background())
cancel() // immediately cancel the lookup
res := f.Apply(ctx, NewMaybeWithValue(domain))
if res.Observations == nil || len(res.Observations) <= 0 {
if obs := rt.Observations(); obs == nil || len(obs.Queries) <= 0 {
t.Fatal("unexpected empty observations")
}
if res.Error == nil {
Expand All @@ -77,21 +78,17 @@ func TestGetaddrinfo(t *testing.T) {

t.Run("with lookup error", func(t *testing.T) {
mockedErr := errors.New("mocked")
f := DNSLookupGetaddrinfo(
NewMinimalRuntime(model.DiscardLogger, time.Now(), MinimalRuntimeOptionMeasuringNetwork(&mocks.MeasuringNetwork{
MockNewStdlibResolver: func(logger model.DebugLogger) model.Resolver {
return &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
return nil, mockedErr
},
}
},
})),
)
rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{
MockNewStdlibResolver: func(logger model.DebugLogger) model.Resolver {
return &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
return nil, mockedErr
},
}
},
}))
f := DNSLookupGetaddrinfo(rt)
res := f.Apply(context.Background(), NewMaybeWithValue(domain))
if res.Observations == nil || len(res.Observations) <= 0 {
t.Fatal("unexpected empty observations")
}
if res.Error != mockedErr {
t.Fatalf("unexpected error type: %s", res.Error)
}
Expand All @@ -104,21 +101,17 @@ func TestGetaddrinfo(t *testing.T) {
})

t.Run("with success", func(t *testing.T) {
f := DNSLookupGetaddrinfo(
NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{
MockNewStdlibResolver: func(logger model.DebugLogger) model.Resolver {
return &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
return []string{"93.184.216.34"}, nil
},
}
},
})),
)
rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{
MockNewStdlibResolver: func(logger model.DebugLogger) model.Resolver {
return &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
return []string{"93.184.216.34"}, nil
},
}
},
}))
f := DNSLookupGetaddrinfo(rt)
res := f.Apply(context.Background(), NewMaybeWithValue(domain))
if res.Observations == nil || len(res.Observations) <= 0 {
t.Fatal("unexpected empty observations")
}
if res.Error != nil {
t.Fatalf("unexpected error: %s", res.Error)
}
Expand All @@ -144,18 +137,19 @@ Test cases:
- with success
*/
func TestLookupUDP(t *testing.T) {
t.Run("Apply dnsLookupGetaddrinfoFunc", func(t *testing.T) {
t.Run("Apply dnsLookupUDPFunc", func(t *testing.T) {
domain := &DomainToResolve{
Domain: "example.com",
Tags: []string{"antani"},
}

t.Run("with nil resolver", func(t *testing.T) {
f := DNSLookupUDP(NewMinimalRuntime(model.DiscardLogger, time.Now()), "1.1.1.1:53")
rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now())
f := DNSLookupUDP(rt, "1.1.1.1:53")
ctx, cancel := context.WithCancel(context.Background())
cancel()
res := f.Apply(ctx, NewMaybeWithValue(domain))
if res.Observations == nil || len(res.Observations) <= 0 {
if obs := rt.Observations(); obs == nil || len(obs.Queries) <= 0 {
t.Fatal("unexpected empty observations")
}
if res.Error == nil {
Expand All @@ -165,29 +159,24 @@ func TestLookupUDP(t *testing.T) {

t.Run("with lookup error", func(t *testing.T) {
mockedErr := errors.New("mocked")
f := DNSLookupUDP(
NewMinimalRuntime(model.DiscardLogger, time.Now(), MinimalRuntimeOptionMeasuringNetwork(&mocks.MeasuringNetwork{
MockNewParallelUDPResolver: func(logger model.DebugLogger, dialer model.Dialer, endpoint string) model.Resolver {
return &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
return nil, mockedErr
},
}
},
MockNewDialerWithoutResolver: func(dl model.DebugLogger, w ...model.DialerWrapper) model.Dialer {
return &mocks.Dialer{
MockDialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
panic("should not be called")
},
}
},
})),
"1.1.1.1:53",
)
rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{
MockNewParallelUDPResolver: func(logger model.DebugLogger, dialer model.Dialer, endpoint string) model.Resolver {
return &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
return nil, mockedErr
},
}
},
MockNewDialerWithoutResolver: func(dl model.DebugLogger, w ...model.DialerWrapper) model.Dialer {
return &mocks.Dialer{
MockDialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
panic("should not be called")
},
}
},
}))
f := DNSLookupUDP(rt, "1.1.1.1:53")
res := f.Apply(context.Background(), NewMaybeWithValue(domain))
if res.Observations == nil || len(res.Observations) <= 0 {
t.Fatal("unexpected empty observations")
}
if res.Error != mockedErr {
t.Fatalf("unexpected error type: %s", res.Error)
}
Expand All @@ -200,29 +189,24 @@ func TestLookupUDP(t *testing.T) {
})

t.Run("with success", func(t *testing.T) {
f := DNSLookupUDP(
NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{
MockNewParallelUDPResolver: func(logger model.DebugLogger, dialer model.Dialer, address string) model.Resolver {
return &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
return []string{"93.184.216.34"}, nil
},
}
},
MockNewDialerWithoutResolver: func(dl model.DebugLogger, w ...model.DialerWrapper) model.Dialer {
return &mocks.Dialer{
MockDialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
panic("should not be called")
},
}
},
})),
"1.1.1.1:53",
)
rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now(), RuntimeMeasurexLiteOptionMeasuringNetwork(&mocks.MeasuringNetwork{
MockNewParallelUDPResolver: func(logger model.DebugLogger, dialer model.Dialer, address string) model.Resolver {
return &mocks.Resolver{
MockLookupHost: func(ctx context.Context, domain string) ([]string, error) {
return []string{"93.184.216.34"}, nil
},
}
},
MockNewDialerWithoutResolver: func(dl model.DebugLogger, w ...model.DialerWrapper) model.Dialer {
return &mocks.Dialer{
MockDialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
panic("should not be called")
},
}
},
}))
f := DNSLookupUDP(rt, "1.1.1.1:53")
res := f.Apply(context.Background(), NewMaybeWithValue(domain))
if res.Observations == nil || len(res.Observations) <= 0 {
t.Fatal("unexpected empty observations")
}
if res.Error != nil {
t.Fatalf("unexpected error: %s", res.Error)
}
Expand Down
5 changes: 3 additions & 2 deletions internal/dslx/fxcore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/ooni/probe-cli/v3/internal/mocks"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/runtimex"
)

func getFn(err error, name string) Func[int, int] {
Expand All @@ -22,7 +21,9 @@ type fn struct {
}

func (f *fn) Apply(ctx context.Context, i *Maybe[int]) *Maybe[int] {
runtimex.Assert(i.Error == nil, "did not expect to see an error here")
if i.Error != nil {
return i
}
return &Maybe[int]{
Error: f.err,
State: i.State + 1,
Expand Down
44 changes: 17 additions & 27 deletions internal/dslx/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package dslx
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"strings"
Expand Down Expand Up @@ -296,27 +295,20 @@ func TestHTTPRequest(t *testing.T) {

// makeSureObservationsContainTags ensures the observations you can extract from
// the given HTTPResponse contain the tags we configured when testing
makeSureObservationsContainTags := func(res *Maybe[*HTTPResponse]) error {
// exclude the case where there was an error
if res.Error != nil {
return fmt.Errorf("unexpected error: %w", res.Error)
}

// obtain the observations
for _, obs := range ExtractObservations(res) {
makeSureObservationsContainTags := func(rt Runtime) error {
obs := rt.Observations()

// check the network events
for _, ev := range obs.NetworkEvents {
if diff := cmp.Diff([]string{"antani"}, ev.Tags); diff != "" {
return errors.New(diff)
}
// check the network events
for _, ev := range obs.NetworkEvents {
if diff := cmp.Diff([]string{"antani"}, ev.Tags); diff != "" {
return errors.New(diff)
}
}

// check the HTTP events
for _, ev := range obs.Requests {
if diff := cmp.Diff([]string{"antani"}, ev.Tags); diff != "" {
return errors.New(diff)
}
// check the HTTP events
for _, ev := range obs.Requests {
if diff := cmp.Diff([]string{"antani"}, ev.Tags); diff != "" {
return errors.New(diff)
}
}

Expand All @@ -331,17 +323,16 @@ func TestHTTPRequest(t *testing.T) {
Trace: trace,
Transport: goodTransport,
}
httpRequest := HTTPRequest(
NewMinimalRuntime(model.DiscardLogger, time.Now()),
)
rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now())
httpRequest := HTTPRequest(rt)
res := httpRequest.Apply(context.Background(), NewMaybeWithValue(&httpTransport))
if res.Error != nil {
t.Fatal("unexpected error")
}
if res.State.HTTPResponse == nil || res.State.HTTPResponse.Status != "expected" {
t.Fatal("unexpected request")
}
makeSureObservationsContainTags(res)
makeSureObservationsContainTags(rt)
})

t.Run("with success (http)", func(t *testing.T) {
Expand All @@ -352,17 +343,16 @@ func TestHTTPRequest(t *testing.T) {
Trace: trace,
Transport: goodTransport,
}
httpRequest := HTTPRequest(
NewMinimalRuntime(model.DiscardLogger, time.Now()),
)
rt := NewRuntimeMeasurexLite(model.DiscardLogger, time.Now())
httpRequest := HTTPRequest(rt)
res := httpRequest.Apply(context.Background(), NewMaybeWithValue(&httpTransport))
if res.Error != nil {
t.Fatal("unexpected error")
}
if res.State.HTTPResponse == nil || res.State.HTTPResponse.Status != "expected" {
t.Fatal("unexpected request")
}
makeSureObservationsContainTags(res)
makeSureObservationsContainTags(rt)
})

t.Run("with header options", func(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions internal/dslx/httpcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ func HTTPRequest(rt Runtime, options ...HTTPRequestOption) Func[*HTTPConnection,
ol.Stop(err)
}

// merge and save observations
observations = append(observations, maybeTraceToObservations(input.Trace)...)
rt.SaveObservations(observations...)

state := &HTTPResponse{
Address: input.Address,
Expand Down
14 changes: 6 additions & 8 deletions internal/dslx/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,17 @@ func TestMakeSureWeCollectSpeedSamples(t *testing.T) {
}

// measure the endpoint
result := f0.Apply(context.Background(), NewMaybeWithValue(epnt))
_ = f0.Apply(context.Background(), NewMaybeWithValue(epnt))

// get observations
observations := ExtractObservations(result)
observations := rt.Observations()

// process the network events and check for summary
var foundSummary bool
for _, entry := range observations {
for _, ev := range entry.NetworkEvents {
if ev.Operation == throttling.BytesReceivedCumulativeOperation {
t.Log(ev)
foundSummary = true
}
for _, ev := range observations.NetworkEvents {
if ev.Operation == throttling.BytesReceivedCumulativeOperation {
t.Log(ev)
foundSummary = true
}
}
if !foundSummary {
Expand Down
8 changes: 0 additions & 8 deletions internal/dslx/observations.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,6 @@ func NewObservations() *Observations {
}
}

// ExtractObservations extracts observations from a list of [Maybe].
func ExtractObservations[T any](rs ...*Maybe[T]) (out []*Observations) {
for _, r := range rs {
out = append(out, r.Observations...)
}
return
}

// maybeTraceToObservations returns the observations inside the
// trace taking into account the case where trace is nil.
func maybeTraceToObservations(trace Trace) (out []*Observations) {
Expand Down
3 changes: 3 additions & 0 deletions internal/dslx/quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func QUICHandshake(rt Runtime, options ...TLSHandshakeOption) Func[*Endpoint, *Q
// stop the operation logger
ol.Stop(err)

// save the observations
rt.SaveObservations(maybeTraceToObservations(trace)...)

state := &QUICConnection{
Address: input.Address,
QUICConn: quicConn, // possibly nil
Expand Down
Loading