-
Notifications
You must be signed in to change notification settings - Fork 54
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: minipipeline to analyze measurements (#1393)
This diff introduces the `minipipeline` package and command. The general idea here is to analyze experiments results using an architecture that matches https://github.com/ooni/data as closely as possible. The advantage of using this architecture is that we can neatly divide the data analysis process into three phases: 1. ingestion converts an OONI measurement into observations, which are flat data structures; 2. analysis converts observations into an analysis structure, which is significantly simpler than observations; 3. scoring is the process (implemented by each experiment) to turn the analysis into top-level keys. That is: ```mermaid stateDiagram-v2 state "{ingestion}" as ingestion MEASUREMENT --> ingestion ingestion --> OBSERVATIONS state "{analysis}" as analysis OBSERVATIONS --> analysis analysis --> ANALYSIS state "{scoring}" as scoring ANALYSIS --> scoring scoring --> TOP_LEVEL_KEYS ``` This diff in particular tackles points 1 and 2. We'll implement 3 for Web Connectivity LTE in a separate diff. The diff itself commits several observations and analysis files along with the measurements from which they originate, which we all collectively use to write regression tests for the `minipipeline` package. The motivation for implementing these changes is that I need a more clearly defined model to address the differences between Web Connectivity LTE and v0.4 (see also #1392). Because there is already an architectural model for implementing these changes in ooni/data, it seems logical to borrow liberally from ooni/data. Incidentally, this change has the benefit of making it easier, in the future, to align ooni/probe-cli and ooni/data. The reference issue is ooni/probe#2634. Implementing this change seems now a precondition for implementing extra changes that would address such an issue for good, by changing LTE's scoring. Unlike the ooni/data pipeline, this pipeline is minimal (hence the name): it only aims to process measurements collected by a recent version of ooniprobe, rather than all measurements from the beginning of time. While there, disable signal integration tests because of ooni/probe#2636.
- Loading branch information
1 parent
1f8ca7e
commit bb7dd9c
Showing
116 changed files
with
44,824 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
/*.csv | ||
/*.deb | ||
/*.exe | ||
/*.json | ||
/*.jsonl | ||
/*.pprof | ||
/*.sqlite3 | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package main | ||
|
||
import ( | ||
"flag" | ||
"fmt" | ||
"os" | ||
"path/filepath" | ||
|
||
"github.com/ooni/probe-cli/v3/internal/minipipeline" | ||
"github.com/ooni/probe-cli/v3/internal/must" | ||
"github.com/ooni/probe-cli/v3/internal/runtimex" | ||
) | ||
|
||
var ( | ||
// destdir is the -destdir flag | ||
destdir = flag.String("destdir", ".", "destination directory to use") | ||
|
||
// measurement is the -measurement flag | ||
measurement = flag.String("measurement", "", "measurement file to analyze") | ||
|
||
// mustWriteFileLn allows overwriting must.WriteFile in tests | ||
mustWriteFileFn = must.WriteFile | ||
|
||
// prefix is the -prefix flag | ||
prefix = flag.String("prefix", "", "prefix to add to generated files") | ||
|
||
// osExit allows overwriting os.Exit in tests | ||
osExit = os.Exit | ||
) | ||
|
||
func main() { | ||
flag.Parse() | ||
if *measurement == "" { | ||
fmt.Fprintf(os.Stderr, "\n") | ||
fmt.Fprintf(os.Stderr, "usage: %s -measurement <file> [-prefix <prefix>]\n", filepath.Base(os.Args[0])) | ||
fmt.Fprintf(os.Stderr, "\n") | ||
fmt.Fprintf(os.Stderr, "Mini measurement processing pipeline to reprocess recent probe measurements\n") | ||
fmt.Fprintf(os.Stderr, "and align results calculation with ooni/data.\n") | ||
fmt.Fprintf(os.Stderr, "\n") | ||
fmt.Fprintf(os.Stderr, "Analyzes the <file> provided using -measurement <file> and writes the\n") | ||
fmt.Fprintf(os.Stderr, "observations.json and analysis.json files in the -destdir <destdir> directory,\n") | ||
fmt.Fprintf(os.Stderr, "which must already exist.\n") | ||
fmt.Fprintf(os.Stderr, "\n") | ||
fmt.Fprintf(os.Stderr, "Use -prefix <prefix> to add <prefix> in front of the generated files names.\n") | ||
fmt.Fprintf(os.Stderr, "\n") | ||
osExit(1) | ||
} | ||
|
||
// parse the measurement file | ||
var parsed minipipeline.WebMeasurement | ||
must.UnmarshalJSON(must.ReadFile(*measurement), &parsed) | ||
|
||
// generate and write observations | ||
observationsPath := filepath.Join(*destdir, *prefix+"observations.json") | ||
container := runtimex.Try1(minipipeline.IngestWebMeasurement(&parsed)) | ||
mustWriteFileFn(observationsPath, must.MarshalAndIndentJSON(container, "", " "), 0600) | ||
|
||
// generate and write observations analysis | ||
analysisPath := filepath.Join(*destdir, *prefix+"analysis.json") | ||
analysis := minipipeline.AnalyzeWebObservations(container) | ||
mustWriteFileFn(analysisPath, must.MarshalAndIndentJSON(analysis, "", " "), 0600) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
package main | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"io/fs" | ||
"os" | ||
"path/filepath" | ||
"testing" | ||
|
||
"github.com/google/go-cmp/cmp" | ||
"github.com/ooni/probe-cli/v3/internal/must" | ||
) | ||
|
||
func mustloadfile(filename string) (object map[string]any) { | ||
data := must.ReadFile(filename) | ||
must.UnmarshalJSON(data, &object) | ||
return | ||
} | ||
|
||
func mustloaddata(contentmap map[string][]byte, key string) (object map[string]any) { | ||
data := contentmap[key] | ||
must.UnmarshalJSON(data, &object) | ||
return | ||
} | ||
|
||
func TestMainSuccess(t *testing.T) { | ||
// make sure we set the destination directory | ||
*destdir = "xo" | ||
|
||
// make sure we're reading from the expected input | ||
*measurement = filepath.Join("testdata", "measurement.json") | ||
|
||
// make sure we store the expected output | ||
contentmap := make(map[string][]byte) | ||
mustWriteFileFn = func(filename string, content []byte, mode fs.FileMode) { | ||
contentmap[filename] = content | ||
} | ||
|
||
// make sure osExit is correct | ||
osExit = os.Exit | ||
|
||
// also check whether we can add a prefix | ||
*prefix = "y-" | ||
|
||
// run the main function | ||
main() | ||
|
||
// make sure the generated observations are good | ||
expectedObservations := mustloadfile(filepath.Join("testdata", "observations.json")) | ||
gotObservations := mustloaddata(contentmap, filepath.Join("xo", "y-observations.json")) | ||
if diff := cmp.Diff(expectedObservations, gotObservations); diff != "" { | ||
t.Fatal(diff) | ||
} | ||
|
||
// make sure the generated analysis is good | ||
expectedAnalysis := mustloadfile(filepath.Join("testdata", "analysis.json")) | ||
gotAnalysis := mustloaddata(contentmap, filepath.Join("xo", "y-analysis.json")) | ||
if diff := cmp.Diff(expectedAnalysis, gotAnalysis); diff != "" { | ||
t.Fatal(diff) | ||
} | ||
} | ||
|
||
func TestMainUsage(t *testing.T) { | ||
// make sure we clear the destination directory | ||
*destdir = "" | ||
|
||
// make sure the expected input file is empty | ||
*measurement = "" | ||
|
||
// make sure we panic if we try to write on disk | ||
mustWriteFileFn = func(filename string, content []byte, mode fs.FileMode) { | ||
panic(errors.New("mustWriteFileFn")) | ||
} | ||
|
||
// make sure osExit is correct | ||
osExit = func(code int) { | ||
panic(fmt.Errorf("osExit: %d", code)) | ||
} | ||
|
||
// make sure the prefix is also clean | ||
*prefix = "" | ||
|
||
var err error | ||
func() { | ||
// intercept panic caused by osExit or other panics | ||
defer func() { | ||
if r := recover(); r != nil { | ||
err = r.(error) | ||
} | ||
}() | ||
|
||
// run the main function with the given args | ||
main() | ||
}() | ||
|
||
// make sure we've got the expected error | ||
if err == nil || err.Error() != "osExit: 1" { | ||
t.Fatal("expected", "os.Exit: 1", "got", err) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
{ | ||
"DNSExperimentFailure": null, | ||
"DNSTransactionsWithBogons": {}, | ||
"DNSTransactionsWithUnexpectedFailures": {}, | ||
"DNSPossiblyInvalidAddrs": {}, | ||
"HTTPDiffBodyProportionFactor": 1, | ||
"HTTPDiffStatusCodeMatch": true, | ||
"HTTPDiffTitleDifferentLongWords": {}, | ||
"HTTPDiffUncommonHeadersIntersection": { | ||
"x-drupal-cache": true, | ||
"x-generator": true | ||
}, | ||
"HTTPFinalResponses": { | ||
"4": true | ||
}, | ||
"HTTPFinalResponsesWithTLS": { | ||
"4": true | ||
}, | ||
"TCPTransactionsWithUnexpectedTCPConnectFailures": {}, | ||
"TCPTransactionsWithUnexpectedTLSHandshakeFailures": {}, | ||
"TCPTransactionsWithUnexpectedHTTPFailures": {}, | ||
"TCPTransactionsWithUnexplainedUnexpectedFailures": {} | ||
} |
Oops, something went wrong.