From 99a8c4a0fa666c80cb10fb23a32770898d950c4a Mon Sep 17 00:00:00 2001 From: Alextopher Date: Thu, 18 Jan 2024 12:26:04 -0500 Subject: [PATCH 1/4] clean up map --- static/js/map.js | 83 +++++++++++++++++++++++++----------------------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/static/js/map.js b/static/js/map.js index 431b7ad..fb7c9be 100644 --- a/static/js/map.js +++ b/static/js/map.js @@ -4,48 +4,55 @@ const MILLISECONDS_PER_SECOND = 1000 const DISPLAY_TIME = MILLISECONDS_PER_SECOND * displayTimeSeconds; var circles = []; -// Connects to the websocket endpoint +// Circle class +class Circle { + constructor(x, y, distro, time) { + this.x = x; + this.y = y; + this.distro = distro; + this.time = time; + } +} + +// Connects to the websocket function connect() { let ws_scheme = window.location.protocol === "https:" ? "wss://" : "ws://"; let socket = new WebSocket(ws_scheme + window.location.host + "/ws"); socket.binaryType = "arraybuffer"; - socket.onopen = function (e) { - console.log("Connected!", e); - }; socket.onmessage = async function (message) { const buffer = new Uint8Array(message.data); + const time = new Date().getTime(); - // 8 message at 5 bytes = 40 bytes + // Messages are sent in large groups, where every 5 bytes is a new data point for (let i = 0; i < buffer.length; i += 5) { - // First byte is the distro id + // u8 distro id const distro = buffer[i]; + // u16 lat const lat = buffer[i + 1] << 8 | buffer[i + 2]; + // u16 long const long = buffer[i + 3] << 8 | buffer[i + 4]; - // Convert into x and y coordinates and put them on scale of 0-1 + // Convert lat long into (x, y) coordinates for the map and scale them between 0-1 const x = long / 4096; const y = (4096 - lat) / 4096; - // Add new data points to the front of the list - circles.unshift([x, y, distro, new Date().getTime()]); + // Add new data points to the end of the array + circles.push(new Circle(x, y, distro, time)); // count hits distros[distro][2] += 1; - - // block this thread for a bit - await new Promise((r) => setTimeout(r, Math.random() * 500)); } }; + socket.onclose = function (e) { - console.log("Disconnected!", e); + console.log("Disconnected from server, reconnecting in 5 seconds..."); + setTimeout(connect, 5000); }; - socket.onerror = function (e) { - console.log("Error!", e); - // Try to reconnect after 5 seconds - setTimeout(connect, 5000); + socket.onerror = function (e) { + console.error(e); }; return socket; @@ -68,42 +75,39 @@ window.onload = async function () { window.onresize(); while (true) { - let checkTime = new Date().getTime(); - + // Clear the canvas ctx.globalAlpha = 1; ctx.drawImage(img, 0, 0, canvas.width, canvas.height); - for (let i = 0; i < circles.length; i++) { - let circle = circles[i]; - distros[circle[2]][3] = 1; + // Remove old data points + const time = new Date().getTime(); - // Time difference - const delta = checkTime - circles[i][3]; - - // Remove old data points - if (delta > DISPLAY_TIME) { - // We know all future indexes are older - circles = circles.slice(0, i); - break; - } + // Find the index of the first data point that is too old + let index = circles.findIndex((c) => time - c[3] > DISPLAY_TIME); + if (index != -1) { + circles.splice(0, index); + } - // The color is passed from the template - ctx.fillStyle = distros[circle[2]][1]; + // Draw each circle + for (const circle of circles) { + const color = distros[circle.distro][1]; + ctx.fillStyle = color; ctx.beginPath(); - ctx.globalAlpha = 1 - (DISPLAY_TIME < 6000 ? 0 : (delta / (DISPLAY_TIME * 100))); + ctx.globalAlpha = 1 - ((time - circle.time) / DISPLAY_TIME); ctx.arc( - circle[0] * canvas.width, - circle[1] * canvas.height, - 2.0, + circle.x * canvas.width, + circle.y * canvas.height, + 2.0, // Radius 0, - 2 * Math.PI, + 2 * Math.PI, // Full circle false ); ctx.closePath(); ctx.fill(); } - // Print the legend + // Draw the legend + // TODO: Putting this on the canvas and doesn't scale well ctx.beginPath(); let incX = 0; let incY = 0; @@ -147,7 +151,6 @@ window.onload = async function () { } } - // Run around 60 fps const framesPerSecond = 5 await new Promise((handler) => setTimeout(handler, MILLISECONDS_PER_SECOND / framesPerSecond)); } From 242e9f89ffe686e88d0e54f86d1d242130fac145 Mon Sep 17 00:00:00 2001 From: Alextopher Date: Thu, 18 Jan 2024 12:26:37 -0500 Subject: [PATCH 2/4] logging; remove Panic and fix joining --- logging/logging.go | 61 ++++++++++++++++++++-------------------------- 1 file changed, 26 insertions(+), 35 deletions(-) diff --git a/logging/logging.go b/logging/logging.go index f1c5629..b75c969 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -21,8 +21,6 @@ const ( WarnT // ErrorT is used for logging error [ERROR] messages ErrorT - // PanicT is used for logging panic [PANIC] messages - PanicT // SuccessT is used for logging success [SUCCESS] messages SuccessT ) @@ -37,8 +35,6 @@ func (mt messageType) String() string { return "\033[1m\033[33m[WARN] \033[0m| " case ErrorT: return "\033[1m\033[31m[ERROR] \033[0m| " - case PanicT: - return "\033[1m\033[34m[PANIC] \033[0m| " case SuccessT: return "\033[1m\033[32m[SUCCESS] \033[0m| " default: @@ -77,11 +73,6 @@ func ErrorLogEntry(message string) LogEntry { return NewLogEntry(ErrorT, message) } -// PanicLogEntry creates a new LogEntry with the current time and [PANIC] tag -func PanicLogEntry(message string) LogEntry { - return NewLogEntry(PanicT, message) -} - // SuccessLogEntry creates a new LogEntry with the current time and [SUCCESS] tag func SuccessLogEntry(message string) LogEntry { return NewLogEntry(SuccessT, message) @@ -102,7 +93,19 @@ func (le LogEntry) Log() { logger.Unlock() } -func logf(mt messageType, format string, v ...interface{}) { +// Joins a `v ...any` slice into a string with spaces between each element +func join(v ...any) string { + s := "" + for i := 0; i < len(v); i++ { + s += fmt.Sprint(v[i]) + if i != len(v)-1 { + s += " " + } + } + return s +} + +func logf(mt messageType, format string, v ...any) { logger.Lock() if format[len(format)-1] != '\n' { fmt.Printf("%s %s %s\n", time.Now().Format(tm), mt.String(), fmt.Sprintf(format, v...)) @@ -112,70 +115,58 @@ func logf(mt messageType, format string, v ...interface{}) { logger.Unlock() } -func logln(mt messageType, v ...interface{}) { +func logln(mt messageType, v ...any) { logger.Lock() - fmt.Printf("%s %s %s\n", time.Now().Format(tm), mt.String(), fmt.Sprint(v...)) + fmt.Printf("%s %s %s\n", time.Now().Format(tm), mt.String(), join(v...)) logger.Unlock() } // Infof formats a message and logs it with [INFO] tag, it adds a newline if the message didn't end with one -func Infof(format string, v ...interface{}) { +func Infof(format string, v ...any) { logf(InfoT, format, v...) } // Info logs a message with [INFO] tag and a newline -func Info(v ...interface{}) { +func Info(v ...any) { logln(InfoT, v...) } -// WarnF formats a message and logs it with [WARN] tag, it adds a newline if the message didn't end with one -func WarnF(format string, v ...interface{}) { +// Warnf formats a message and logs it with [WARN] tag, it adds a newline if the message didn't end with one +func Warnf(format string, v ...any) { logf(WarnT, format, v...) } // Warn logs a message with [WARN] tag and a newline -func Warn(v ...interface{}) { +func Warn(v ...any) { logln(WarnT, v...) } // Errorf formats a message and logs it with [ERROR] tag, it adds a newline if the message didn't end with one -func Errorf(format string, v ...interface{}) { +func Errorf(format string, v ...any) { logf(ErrorT, format, v...) } // Error logs a message with [ERROR] tag and a newline -func Error(v ...interface{}) { +func Error(v ...any) { logln(ErrorT, v...) } -// Panicf formats a message and logs it with [PANIC] tag, it adds a newline if the message didn't end with one -// Note: this function does not call panic() or otherwise stops the program -func Panicf(format string, v ...interface{}) { - logf(PanicT, format, v...) -} - -// Panic logs a message with [PANIC] tag and a newline -// Note: this function does not call panic() or otherwise stops the program -func Panic(v ...interface{}) { - logln(PanicT, v...) -} - // Successf formats a message and logs it with [SUCCESS] tag, it adds a newline if the message didn't end with one -func Successf(format string, v ...interface{}) { +func Successf(format string, v ...any) { logf(SuccessT, format, v...) } // Success logs a message with [SUCCESS] tag and a newline -func Success(v ...interface{}) { +func Success(v ...any) { logln(SuccessT, v...) } // Logf formats a message and logs it with provided tag, it adds a newline if the message didn't end with one -func Logf(mt messageType, format string, v ...interface{}) { +func Logf(mt messageType, format string, v ...any) { logf(mt, format, v...) } // Log logs a message with provided tag and a newline -func Log(mt messageType, v ...interface{}) { +func Log(mt messageType, v ...any) { logln(mt, v...) } From 6b4530ad3e277f505213e5df1a548f04cb5640d2 Mon Sep 17 00:00:00 2001 From: Alextopher Date: Thu, 18 Jan 2024 12:27:20 -0500 Subject: [PATCH 3/4] Fix typo in tokens_test.go --- config/tokens_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/tokens_test.go b/config/tokens_test.go index c1534ec..62410a9 100644 --- a/config/tokens_test.go +++ b/config/tokens_test.go @@ -7,7 +7,7 @@ import ( "github.com/COSI-Lab/Mirror/config" ) -// Test tokens.txt parsing +// Test tokens.toml parsing func TestTokens(t *testing.T) { example := ` [[tokens]] From eb4071a54b9c2b9b0c7449f1729a663597101fe6 Mon Sep 17 00:00:00 2001 From: Alextopher Date: Thu, 18 Jan 2024 12:48:02 -0500 Subject: [PATCH 4/4] Continue rewrite: - moved aggregator logic to it's own module - scheduler: now returns an error if _any_ task setup fails - scheduler: will create `/var/log/mirror` if it doesn't exist - live map: websocket messages are now grouped by time instead of by a fixed number - config: moved "rsyncd.conf" generation into its own file - influxdb api connection is no longer a global variable - map.js: improved the code clarity and performance a little bit --- README.md | 9 ++ aggregator.go => aggregator/aggregator.go | 13 ++- .../aggregator_nginx.go | 18 ++-- .../aggregator_rsyncd.go | 6 +- aggregators.go | 61 ++++++++++++ config/configFile.go | 41 -------- config/rsyncd.go | 44 +++++++++ influx.go | 9 +- main.go | 93 ++++++------------- map.go | 28 ++++-- scripts/raspbian-tools | 1 - sync.go | 38 ++++---- webserver.go | 28 +++--- 13 files changed, 211 insertions(+), 178 deletions(-) rename aggregator.go => aggregator/aggregator.go (66%) rename aggregator_nginx.go => aggregator/aggregator_nginx.go (96%) rename aggregator_rsyncd.go => aggregator/aggregator_rsyncd.go (97%) create mode 100644 aggregators.go create mode 100644 config/rsyncd.go delete mode 160000 scripts/raspbian-tools diff --git a/README.md b/README.md index 5a76dac..c8f57dc 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,15 @@ MAXMIND_LICENSE_KEY= INFLUX_TOKEN= ``` +## NGINX + +The NGINX aggregator requires this `access_log` configuration: + +```nginx +log_format config '"$time_local" "$remote_addr" "$request" "$status" "$body_bytes_sent" "$request_length" "$http_user_agent"'; +access_log /var/log/nginx/access.log config; +``` + ## Dependencies Quick-Fedora-Mirror requires `zsh` diff --git a/aggregator.go b/aggregator/aggregator.go similarity index 66% rename from aggregator.go rename to aggregator/aggregator.go index 2127eef..d17d6dc 100644 --- a/aggregator.go +++ b/aggregator/aggregator.go @@ -1,4 +1,4 @@ -package main +package aggregator import ( "time" @@ -18,9 +18,14 @@ type Aggregator[T any] interface { Send(writer api.WriteAPI) } -// StartAggregator starts the aggregator with the given Aggregator implementation, channel of type T, influxdb QueryAPI and WriteAPI. -// It returns the lastUpdated time and an error if any occurred during initialization. -func StartAggregator[T any](aggregator Aggregator[T], c <-chan T) (lastUpdated time.Time, err error) { +// StartAggregator is a function that starts an aggregator process to continuously +// read data from a channel, aggregate it using the provided aggregator, and +// send the aggregated data to a writer at regular intervals. +// +// # It takes an influxdb reader, writer, an aggregator over type T, and a channel of type T +// +// It returns the time of the last update (from influxdb), which can be used as a filter +func StartAggregator[T any](reader api.QueryAPI, writer api.WriteAPI, aggregator Aggregator[T], c <-chan T) (lastUpdated time.Time, err error) { lastUpdated, err = aggregator.Init(reader) if err != nil { return lastUpdated, err diff --git a/aggregator_nginx.go b/aggregator/aggregator_nginx.go similarity index 96% rename from aggregator_nginx.go rename to aggregator/aggregator_nginx.go index a2a455d..c762116 100644 --- a/aggregator_nginx.go +++ b/aggregator/aggregator_nginx.go @@ -1,4 +1,4 @@ -package main +package aggregator import ( "bufio" @@ -17,6 +17,7 @@ import ( "time" "github.com/COSI-Lab/Mirror/logging" + "github.com/COSI-Lab/geoip" "github.com/IncSW/geoip2" influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/influxdata/influxdb-client-go/v2/api" @@ -79,7 +80,6 @@ func (aggregator *NGINXProjectAggregator) Init(reader api.QueryAPI) (lastUpdated result, err = reader.Query(context.Background(), request) if err != nil { - logging.Warn("Failed to querying influxdb nginx statistics", err) time.Sleep(time.Second) continue } @@ -88,7 +88,7 @@ func (aggregator *NGINXProjectAggregator) Init(reader api.QueryAPI) (lastUpdated } if err != nil { - return lastUpdated, errors.New("error querying influxdb") + return lastUpdated, err } stats := make(ProjectStatistics) @@ -207,7 +207,7 @@ func (aggregator *NGINXProjectAggregator) Send(writer api.WriteAPI) { } } -// NGINXLogEntry is a struct that represents a parsed nginx log entry +// NGINXLogEntry represents a parsed nginx log entry type NGINXLogEntry struct { IP net.IP City *geoip2.CityResult @@ -225,7 +225,7 @@ type NGINXLogEntry struct { var reQuotes = regexp.MustCompile(`"(.*?)"`) // TailNGINXLogFile tails a log file and sends the parsed log entries to the specified channels -func TailNGINXLogFile(logFile string, lastUpdated time.Time, channels []chan<- NGINXLogEntry) { +func TailNGINXLogFile(logFile string, lastUpdated time.Time, channels []chan<- NGINXLogEntry, geoipHandler *geoip.GeoIPHandler) { start := time.Now() f, err := os.Open(logFile) @@ -261,7 +261,7 @@ func TailNGINXLogFile(logFile string, lastUpdated time.Time, channels []chan<- N // Parse each line as we receive it for line := range tail.Lines { - entry, err := parseNginxLine(line.Text) + entry, err := parseNginxLine(geoipHandler, line.Text) if err == nil { for ch := range channels { @@ -280,7 +280,7 @@ func parseNginxDate(line string) (time.Time, error) { // It's critical the log file uses the correct format found at the top of this file // If the log file is not in the correct format or if some other part of the parsing fails // this function will return an error -func parseNginxLine(line string) (entry NGINXLogEntry, err error) { +func parseNginxLine(geoipHandler *geoip.GeoIPHandler, line string) (entry NGINXLogEntry, err error) { // "$time_local" "$remote_addr" "$request" "$status" "$body_bytes_sent" "$request_length" "$http_user_agent"; quoteList := reQuotes.FindAllString(line, -1) @@ -305,8 +305,8 @@ func parseNginxLine(line string) (entry NGINXLogEntry, err error) { return entry, errors.New("failed to parse ip") } - // Optional GeoIP lookup - if geoipHandler != nil { + // GeoIP lookup + if geoipHandler == nil { city, err := geoipHandler.Lookup(entry.IP) if err != nil { entry.City = nil diff --git a/aggregator_rsyncd.go b/aggregator/aggregator_rsyncd.go similarity index 97% rename from aggregator_rsyncd.go rename to aggregator/aggregator_rsyncd.go index f9faf35..31b89ca 100644 --- a/aggregator_rsyncd.go +++ b/aggregator/aggregator_rsyncd.go @@ -1,9 +1,8 @@ -package main +package aggregator import ( "bufio" "context" - "errors" "fmt" "io" "os" @@ -45,7 +44,6 @@ func (a *RSYNCDAggregator) Init(reader api.QueryAPI) (lastUpdated time.Time, err result, err = reader.Query(context.Background(), request) if err != nil { - logging.Warn("Failed to querying influxdb rsyncd statistics", err) time.Sleep(time.Second) continue } @@ -54,7 +52,7 @@ func (a *RSYNCDAggregator) Init(reader api.QueryAPI) (lastUpdated time.Time, err } if result == nil { - return time.Time{}, errors.New("Error querying influxdb for rsyncd stat") + return time.Time{}, err } for result.Next() { diff --git a/aggregators.go b/aggregators.go new file mode 100644 index 0000000..e5e73c7 --- /dev/null +++ b/aggregators.go @@ -0,0 +1,61 @@ +package main + +import ( + "net" + "time" + + "github.com/COSI-Lab/Mirror/aggregator" + "github.com/COSI-Lab/Mirror/config" + "github.com/COSI-Lab/Mirror/logging" + "github.com/influxdata/influxdb-client-go/v2/api" +) + +func StartNGINXAggregator(reader api.QueryAPI, writer api.WriteAPI, config *config.File) (chan<- aggregator.NGINXLogEntry, time.Time, error) { + nginxAg := aggregator.NewNGINXProjectAggregator() + nginxAg.AddMeasurement("nginx", func(re aggregator.NGINXLogEntry) bool { + return true + }) + + // Add subnet aggregators + for name, subnetStrings := range config.Subnets { + subnets := make([]*net.IPNet, 0) + for _, subnetString := range subnetStrings { + _, subnet, err := net.ParseCIDR(subnetString) + if err != nil { + logging.Warnf("Failed to parse subnet %q for %q", subnetString, name) + continue + } + subnets = append(subnets, subnet) + } + + if len(subnets) == 0 { + logging.Warn("No valid subnets for", name) + continue + } + + nginxAg.AddMeasurement(name, func(re aggregator.NGINXLogEntry) bool { + for _, subnet := range subnets { + if subnet.Contains(re.IP) { + return true + } + } + return false + }) + + logging.Infof("Added subnet aggregator for %q", name) + } + + nginxMetrics := make(chan aggregator.NGINXLogEntry) + nginxLastUpdated, err := aggregator.StartAggregator[aggregator.NGINXLogEntry](reader, writer, nginxAg, nginxMetrics) + + return nginxMetrics, nginxLastUpdated, err +} + +func StartRSYNCAggregator(reader api.QueryAPI, writer api.WriteAPI) (chan<- aggregator.RSCYNDLogEntry, time.Time, error) { + rsyncAg := aggregator.NewRSYNCProjectAggregator() + + rsyncMetrics := make(chan aggregator.RSCYNDLogEntry) + rsyncLastUpdated, err := aggregator.StartAggregator[aggregator.RSCYNDLogEntry](reader, writer, rsyncAg, rsyncMetrics) + + return rsyncMetrics, rsyncLastUpdated, err +} diff --git a/config/configFile.go b/config/configFile.go index f68f7b3..2ba244d 100644 --- a/config/configFile.go +++ b/config/configFile.go @@ -8,7 +8,6 @@ import ( "os" "sort" "strings" - "text/template" "github.com/xeipuuv/gojsonschema" ) @@ -101,46 +100,6 @@ func (config *File) GetProjects() []Project { return projects } -// CreateRSCYNDConfig writes a rsyncd.conf file to the given writer based on the config -// -// Consider passing a bufio.Write to this function -func (config *File) CreateRSCYNDConfig(w io.Writer) error { - tmpl := `# This is a generated file. Do not edit manually. - uid = nobody - gid = nogroup - use chroot = yes - max connections = 0 - pid file = /var/run/rsyncd.pid - motd file = /etc/rsyncd.motd - log file = /var/log/rsyncd.log - log format = %t %o %a %m %f %b - dont compress = *.gz *.tgz *.zip *.z *.Z *.rpm *.deb *.bz2 *.tbz2 *.xz *.txz *.rar - refuse options = checksum delete - {{ range . }} - [{{ .Short }}] - comment = {{ .Name }} - path = /storage/{{ .Short }} - exclude = lost+found/ - read only = true - ignore nonreadable = yes{{ end }} - ` - - var filteredProjects []*Project - for _, project := range config.Projects { - if project.PublicRsync { - filteredProjects = append(filteredProjects, project) - } - } - - t := template.Must(template.New("rsyncd.conf").Parse(tmpl)) - err := t.Execute(w, filteredProjects) - if err != nil { - return err - } - - return nil -} - // Validate checks the config file for a few properties // // - All projects have a unique long name, case insensitive diff --git a/config/rsyncd.go b/config/rsyncd.go new file mode 100644 index 0000000..3489811 --- /dev/null +++ b/config/rsyncd.go @@ -0,0 +1,44 @@ +package config + +import ( + "io" + "text/template" +) + +// CreateRSCYNDConfig writes a rsyncd.conf file to the given writer based on the Config struct +func (config *File) CreateRSCYNDConfig(w io.Writer) error { + tmpl := `# This is a generated file. Do not edit manually. + uid = nobody + gid = nogroup + use chroot = yes + max connections = 0 + pid file = /var/run/rsyncd.pid + motd file = /etc/rsyncd.motd + log file = /var/log/rsyncd.log + log format = %t %o %a %m %f %b + dont compress = *.gz *.tgz *.zip *.z *.Z *.rpm *.deb *.bz2 *.tbz2 *.xz *.txz *.rar + refuse options = checksum delete + {{ range . }} + [{{ .Short }}] + comment = {{ .Name }} + path = /storage/{{ .Short }} + exclude = lost+found/ + read only = true + ignore nonreadable = yes{{ end }} + ` + + var filteredProjects []*Project + for _, project := range config.Projects { + if project.PublicRsync { + filteredProjects = append(filteredProjects, project) + } + } + + t := template.Must(template.New("rsyncd.conf").Parse(tmpl)) + err := t.Execute(w, filteredProjects) + if err != nil { + return err + } + + return nil +} diff --git a/influx.go b/influx.go index d941821..658668a 100644 --- a/influx.go +++ b/influx.go @@ -7,17 +7,12 @@ import ( "github.com/influxdata/influxdb-client-go/v2/api" ) -var writer api.WriteAPI -var reader api.QueryAPI - // SetupInfluxClients connects to influxdb and sets up the db clients -func SetupInfluxClients(token string) { +func SetupInfluxClients(token string) (reader api.QueryAPI, writer api.WriteAPI) { // create new client with default option for server url authenticate by token options := influxdb2.DefaultOptions() options.SetTLSConfig(&tls.Config{InsecureSkipVerify: true}) client := influxdb2.NewClientWithOptions("https://mirror.clarkson.edu:8086", token, options) - - writer = client.WriteAPI("COSI", "stats") - reader = client.QueryAPI("COSI") + return client.QueryAPI("COSI"), client.WriteAPI("COSI", "stats") } diff --git a/main.go b/main.go index 8177422..ba38e48 100644 --- a/main.go +++ b/main.go @@ -4,12 +4,12 @@ import ( "context" "errors" "fmt" - "net" "os" "os/signal" "runtime" "time" + "github.com/COSI-Lab/Mirror/aggregator" "github.com/COSI-Lab/Mirror/config" "github.com/COSI-Lab/Mirror/logging" "github.com/COSI-Lab/geoip" @@ -89,58 +89,8 @@ func loadTokens() (*config.Tokens, error) { return tokens, nil } -func startNGINX(config *config.File) (chan<- NGINXLogEntry, time.Time, error) { - nginxAg := NewNGINXProjectAggregator() - nginxAg.AddMeasurement("nginx", func(re NGINXLogEntry) bool { - return true - }) - - // Add subnet aggregators - for name, subnetStrings := range config.Subnets { - subnets := make([]*net.IPNet, 0) - for _, subnetString := range subnetStrings { - _, subnet, err := net.ParseCIDR(subnetString) - if err != nil { - logging.Warn("Failed to parse subnet", subnetString, "for", name) - continue - } - subnets = append(subnets, subnet) - } - - if len(subnets) == 0 { - logging.Warn("No valid subnets for", name) - continue - } - - nginxAg.AddMeasurement(name, func(re NGINXLogEntry) bool { - for _, subnet := range subnets { - if subnet.Contains(re.IP) { - return true - } - } - return false - }) - - logging.Info("Added subnet aggregator for", name) - } - - nginxMetrics := make(chan NGINXLogEntry) - nginxLastUpdated, err := StartAggregator[NGINXLogEntry](nginxAg, nginxMetrics) - - return nginxMetrics, nginxLastUpdated, err -} - -func startRSYNC() (chan<- RSCYNDLogEntry, time.Time, error) { - rsyncAg := NewRSYNCProjectAggregator() - - rsyncMetrics := make(chan RSCYNDLogEntry) - rsyncLastUpdated, err := StartAggregator[RSCYNDLogEntry](rsyncAg, rsyncMetrics) - - return rsyncMetrics, rsyncLastUpdated, err -} - func main() { - // Enforce we are running linux or macos + // Mirror only runs on linux or macos if runtime.GOOS != "linux" && runtime.GOOS != "darwin" { fmt.Println("This program is only meant to be run on *nix systems") os.Exit(1) @@ -180,13 +130,14 @@ func main() { os.Exit(1) } + // Initialize the tokens file for manual syncing tokens, err := loadTokens() if err != nil { logging.Error("Failed to load tokens file:", err) os.Exit(1) } - // GeoIP lookup + // GeoIP lookup for the map if maxmindLicenseKey != "" { geoipHandler, err = geoip.NewGeoIPHandler(maxmindLicenseKey) if err != nil { @@ -197,24 +148,27 @@ func main() { // Update rsyncd.conf file based on the config file rsyncdConf, err := os.OpenFile("/etc/rsyncd.conf", os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - logging.Error("Could not open rsyncd.conf: ", err.Error()) - } - err = cfg.CreateRSCYNDConfig(rsyncdConf) - if err != nil { - logging.Error("Failed to create rsyncd.conf: ", err.Error()) + logging.Error(err.Error()) + } else { + err = cfg.CreateRSCYNDConfig(rsyncdConf) + if err != nil { + logging.Error("Failed to create rsyncd.conf: ", err.Error()) + } } - nginxChannels := make([]chan<- NGINXLogEntry, 0) + // TODO: Update nginx.conf file based on the config file + + nginxChannels := make([]chan<- aggregator.NGINXLogEntry, 0) nginxLastUpdated := time.Now() - rsyncChannels := make([]chan<- RSCYNDLogEntry, 0) + rsyncChannels := make([]chan<- aggregator.RSCYNDLogEntry, 0) rsyncLastUpdated := time.Now() if influxToken != "" { // Setup reader and writer for influxdb - SetupInfluxClients(influxToken) + reader, writer := SetupInfluxClients(influxToken) // Start the nginx aggregator - nginxMetrics, lastupdated, err := startNGINX(cfg) + nginxMetrics, lastupdated, err := StartNGINXAggregator(reader, writer, cfg) if err != nil { logging.Error("Failed to start nginx aggregator:", err) nginxLastUpdated = time.Now() @@ -224,7 +178,7 @@ func main() { } // Start the rsync aggregator - rsyncMetrics, lastupdated, err := startRSYNC() + rsyncMetrics, lastupdated, err := StartRSYNCAggregator(reader, writer) if err != nil { logging.Error("Failed to start rsync aggregator:", err) rsyncLastUpdated = time.Now() @@ -235,18 +189,23 @@ func main() { } manual := make(chan string) - scheduler := NewScheduler(context.Background(), cfg) + scheduler, err := NewScheduler(context.Background(), cfg) + if err != nil { + logging.Error("Failed to create scheduler:", err) + os.Exit(1) + } + go scheduler.Start(manual) // WebServer - mapEntries := make(chan NGINXLogEntry) + mapEntries := make(chan aggregator.NGINXLogEntry) nginxChannels = append(nginxChannels, mapEntries) WebServerLoadConfig(cfg, tokens) go HandleWebServer(manual, mapEntries) - go TailNGINXLogFile("/var/log/nginx/access.log", nginxLastUpdated, nginxChannels) - go TailRSYNCLogFile("/var/log/nginx/rsyncd.log", rsyncLastUpdated, rsyncChannels) + go aggregator.TailNGINXLogFile("/var/log/nginx/access.log", nginxLastUpdated, nginxChannels, geoipHandler) + go aggregator.TailRSYNCLogFile("/var/log/nginx/rsyncd.log", rsyncLastUpdated, rsyncChannels) // Wait forever select {} diff --git a/map.go b/map.go index 3b88de2..4c17ce5 100644 --- a/map.go +++ b/map.go @@ -3,7 +3,9 @@ package main import ( "net" "net/http" + "time" + "github.com/COSI-Lab/Mirror/aggregator" "github.com/COSI-Lab/Mirror/logging" "github.com/gorilla/mux" "github.com/gorilla/websocket" @@ -61,12 +63,12 @@ func (hub *hub) run() { close(client.send) logging.Info("Unregistered client", client.conn.RemoteAddr()) case message := <-hub.broadcast: - // broadcasts the message to all clients for client := range hub.clients { select { case client.send <- message: default: - // If the client blocks we skip it + // If the client is not receiving messages, unregister it + hub.unregister <- client } } } @@ -117,20 +119,28 @@ func MapRouter(r *mux.Router, broadcast chan []byte) { go h.run() } -func entriesToMessages(entries <-chan NGINXLogEntry, messages chan<- []byte) { - // Send groups of 8 messages +func entriesToMessages(entries <-chan aggregator.NGINXLogEntry, messages chan<- []byte) { + // Send a group of messages every second ch := make(chan []byte) go func() { + ticker := time.NewTicker(time.Second) + group := make([]byte, 0) for { - group := make([]byte, 0, 40) - for i := 0; i < 8; i++ { - group = append(group, <-ch...) + select { + case msg := <-ch: + group = append(group, msg...) + case <-ticker.C: + if len(group) == 0 { + continue + } + + messages <- group + group = make([]byte, 0) } - messages <- group } }() - // Track the previous IP to avoid sending duplicate data + // Deduplicate neighboring entries with the same IP prevIP := net.IPv4(0, 0, 0, 0) for { entry := <-entries diff --git a/scripts/raspbian-tools b/scripts/raspbian-tools deleted file mode 160000 index b080753..0000000 --- a/scripts/raspbian-tools +++ /dev/null @@ -1 +0,0 @@ -Subproject commit b080753fd7306fe0a428221af4632d002a474d87 diff --git a/sync.go b/sync.go index 86bc021..038edfe 100644 --- a/sync.go +++ b/sync.go @@ -64,12 +64,19 @@ type SchedulerTask struct { } // NewScheduler creates a new scheduler from a config.File -func NewScheduler(ctx context.Context, config *config.File) Scheduler { - failed := false +func NewScheduler(ctx context.Context, config *config.File) (Scheduler, error) { month := time.Now().UTC().Month() builer := scheduler.NewCalendarBuilder[*SchedulerTask]() + // Create the log directory if it doesn't exist + if _, err := os.Stat("/var/log/mirror"); os.IsNotExist(err) { + err := os.Mkdir("/var/log/mirror", 0755) + if err != nil { + return Scheduler{}, fmt.Errorf("failed to create log directory: %s", err) + } + } + for short, project := range config.Projects { var task Task var syncsPerDay uint @@ -102,13 +109,11 @@ func NewScheduler(ctx context.Context, config *config.File) Scheduler { stdout, err := os.OpenFile(fmt.Sprintf("/var/log/mirror/%s-%s.log", short, month), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - logging.Error("Failed to open stdout file for project ", short, ": ", err) - failed = true + return Scheduler{}, fmt.Errorf("failed to open stdout file for %q: %s", short, err) } stderr, err := os.OpenFile(fmt.Sprintf("/var/log/mirror/%s-%s.err", short, month), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - logging.Error("Failed to open stderr file for project ", short, ": ", err) - failed = true + return Scheduler{}, fmt.Errorf("failed to open stderr file for %q: %s", short, err) } builer.AddTask(&SchedulerTask{ @@ -123,30 +128,23 @@ func NewScheduler(ctx context.Context, config *config.File) Scheduler { }, syncsPerDay) } - if failed { - logging.Error("One or more errors occurred while setting up the scheduler") - os.Exit(1) - } - return Scheduler{ ctx: ctx, calendar: builer.Build(), - } + }, nil } // Start begins the scheduler and blocks until the context is canceled -// -// manual is a channel that can be used to manually trigger a project sync func (sc *Scheduler) Start(manual <-chan string) { timer := time.NewTimer(0) - month := time.NewTimer(waitMonth()) + month := time.NewTimer(timeToNextMonth()) for { select { case <-sc.ctx.Done(): return case <-month.C: - month.Reset(waitMonth()) + month.Reset(timeToNextMonth()) month := time.Now().Local().Month() sc.calendar.ForEach( func(task **SchedulerTask) { @@ -157,13 +155,13 @@ func (sc *Scheduler) Start(manual <-chan string) { // Create new files for the next month stdout, err := os.OpenFile(fmt.Sprintf("/var/log/mirror/%s-%s.log", t.short, month), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - logging.Error("Failed to open stdout file for project ", t.short, ": ", err) + logging.Error("Failed to open stdout file for %q: %s", t.short, err) } else { t.stdout.Reset(stdout) } stderr, err := os.OpenFile(fmt.Sprintf("/var/log/mirror/%s-%s.err", t.short, month), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - logging.Error("Failed to open stderr file for project ", t.short, ": ", err) + logging.Error("Failed to open stderr file for %q: %s", t.short, err) } else { t.stderr.Reset(stderr) } @@ -209,8 +207,8 @@ func (t *SchedulerTask) runTask(ctx context.Context) { }() } -// waitMonth returns a timer that will fire at the beginning of the next month -func waitMonth() time.Duration { +// timeToNextMonth returns the duration until the next month +func timeToNextMonth() time.Duration { now := time.Now().UTC() return time.Until(time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.Local)) } diff --git a/webserver.go b/webserver.go index d4dde5a..f2c4748 100644 --- a/webserver.go +++ b/webserver.go @@ -6,6 +6,7 @@ import ( "net/http" "sync" + "github.com/COSI-Lab/Mirror/aggregator" "github.com/COSI-Lab/Mirror/config" "github.com/COSI-Lab/Mirror/logging" "github.com/gorilla/mux" @@ -64,8 +65,9 @@ func handleProjects(w http.ResponseWriter, r *http.Request) { } } -// handleManualSyncs is a endpoint that allows a privileged user to manually cause a project to sync -// Access token is included in the query string. The http method is not considered. +// handleManualSyncs is an endpoint that allows privileged users to manually trigger a project to sync +// Access token must be included in the query string. +// HTTP method is ignored // // /sync/{project}?token={token} func handleManualSyncs(manual chan<- string) http.HandlerFunc { @@ -77,7 +79,7 @@ func handleManualSyncs(manual chan<- string) http.HandlerFunc { // Get the project name vars := mux.Vars(r) - projectName := vars["project"] + project := vars["project"] // Get the access token token := r.URL.Query().Get("token") @@ -86,25 +88,19 @@ func handleManualSyncs(manual chan<- string) http.HandlerFunc { return } - // Check if the token has permission for projectName + // Check if this token exists t := tokens.GetToken(token) - if t == nil { - http.Error(w, "Invalid access token", http.StatusForbidden) - return - } - - // Check if the token has permission for projectName - if !t.HasProject(projectName) { + if t == nil || !t.HasProject(project) { http.Error(w, "Invalid access token", http.StatusForbidden) return } // Return a success message - fmt.Fprintf(w, "Sync requested for project: %s", projectName) + fmt.Fprintf(w, "Sync successfully requested for project: %s", project) // Sync the project - logging.Info("Manual sync requested for project: _", projectName, "_") - manual <- projectName + logging.Info("Manual sync requested for project %q", project) + manual <- project } } @@ -126,7 +122,7 @@ func WebServerLoadConfig(cfg *config.File, t *config.Tokens) { // HandleWebServer starts the webserver and listens for incoming connections // manual is a channel that project short names are sent down to manually trigger a projects rsync // entries is a channel that contains log entries that are disabled by the mirror map -func HandleWebServer(manual chan<- string, entries <-chan NGINXLogEntry) { +func HandleWebServer(manual chan<- string, entries <-chan aggregator.NGINXLogEntry) { r := mux.NewRouter() // Setup the map @@ -149,7 +145,7 @@ func HandleWebServer(manual chan<- string, entries <-chan NGINXLogEntry) { http.FileServer(http.Dir("static")).ServeHTTP(w, r) })) - // Serve on 8080 + // Serve on 8012 l := &http.Server{ Addr: ":8012", Handler: r,