Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Big Refactor #57

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ MAXMIND_LICENSE_KEY=
INFLUX_TOKEN=
```

## NGINX

The NGINX aggregator requires this `access_log` configuration:

```nginx
log_format config '"$time_local" "$remote_addr" "$request" "$status" "$body_bytes_sent" "$request_length" "$http_user_agent"';
access_log /var/log/nginx/access.log config;
```

## Dependencies

Quick-Fedora-Mirror requires `zsh`
Expand Down
13 changes: 9 additions & 4 deletions aggregator.go → aggregator/aggregator.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package aggregator

import (
"time"
Expand All @@ -18,9 +18,14 @@ type Aggregator[T any] interface {
Send(writer api.WriteAPI)
}

// StartAggregator starts the aggregator with the given Aggregator implementation, channel of type T, influxdb QueryAPI and WriteAPI.
// It returns the lastUpdated time and an error if any occurred during initialization.
func StartAggregator[T any](aggregator Aggregator[T], c <-chan T) (lastUpdated time.Time, err error) {
// StartAggregator is a function that starts an aggregator process to continuously
// read data from a channel, aggregate it using the provided aggregator, and
// send the aggregated data to a writer at regular intervals.
//
// # It takes an influxdb reader, writer, an aggregator over type T, and a channel of type T
//
// It returns the time of the last update (from influxdb), which can be used as a filter
func StartAggregator[T any](reader api.QueryAPI, writer api.WriteAPI, aggregator Aggregator[T], c <-chan T) (lastUpdated time.Time, err error) {
lastUpdated, err = aggregator.Init(reader)
if err != nil {
return lastUpdated, err
Expand Down
18 changes: 9 additions & 9 deletions aggregator_nginx.go → aggregator/aggregator_nginx.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package aggregator

import (
"bufio"
Expand All @@ -17,6 +17,7 @@ import (
"time"

"github.com/COSI-Lab/Mirror/logging"
"github.com/COSI-Lab/geoip"
"github.com/IncSW/geoip2"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
Expand Down Expand Up @@ -79,7 +80,6 @@ func (aggregator *NGINXProjectAggregator) Init(reader api.QueryAPI) (lastUpdated
result, err = reader.Query(context.Background(), request)

if err != nil {
logging.Warn("Failed to querying influxdb nginx statistics", err)
time.Sleep(time.Second)
continue
}
Expand All @@ -88,7 +88,7 @@ func (aggregator *NGINXProjectAggregator) Init(reader api.QueryAPI) (lastUpdated
}

if err != nil {
return lastUpdated, errors.New("error querying influxdb")
return lastUpdated, err
}

stats := make(ProjectStatistics)
Expand Down Expand Up @@ -207,7 +207,7 @@ func (aggregator *NGINXProjectAggregator) Send(writer api.WriteAPI) {
}
}

// NGINXLogEntry is a struct that represents a parsed nginx log entry
// NGINXLogEntry represents a parsed nginx log entry
type NGINXLogEntry struct {
IP net.IP
City *geoip2.CityResult
Expand All @@ -225,7 +225,7 @@ type NGINXLogEntry struct {
var reQuotes = regexp.MustCompile(`"(.*?)"`)

// TailNGINXLogFile tails a log file and sends the parsed log entries to the specified channels
func TailNGINXLogFile(logFile string, lastUpdated time.Time, channels []chan<- NGINXLogEntry) {
func TailNGINXLogFile(logFile string, lastUpdated time.Time, channels []chan<- NGINXLogEntry, geoipHandler *geoip.GeoIPHandler) {
start := time.Now()

f, err := os.Open(logFile)
Expand Down Expand Up @@ -261,7 +261,7 @@ func TailNGINXLogFile(logFile string, lastUpdated time.Time, channels []chan<- N

// Parse each line as we receive it
for line := range tail.Lines {
entry, err := parseNginxLine(line.Text)
entry, err := parseNginxLine(geoipHandler, line.Text)

if err == nil {
for ch := range channels {
Expand All @@ -280,7 +280,7 @@ func parseNginxDate(line string) (time.Time, error) {
// It's critical the log file uses the correct format found at the top of this file
// If the log file is not in the correct format or if some other part of the parsing fails
// this function will return an error
func parseNginxLine(line string) (entry NGINXLogEntry, err error) {
func parseNginxLine(geoipHandler *geoip.GeoIPHandler, line string) (entry NGINXLogEntry, err error) {
// "$time_local" "$remote_addr" "$request" "$status" "$body_bytes_sent" "$request_length" "$http_user_agent";
quoteList := reQuotes.FindAllString(line, -1)

Expand All @@ -305,8 +305,8 @@ func parseNginxLine(line string) (entry NGINXLogEntry, err error) {
return entry, errors.New("failed to parse ip")
}

// Optional GeoIP lookup
if geoipHandler != nil {
// GeoIP lookup
if geoipHandler == nil {
city, err := geoipHandler.Lookup(entry.IP)
if err != nil {
entry.City = nil
Expand Down
6 changes: 2 additions & 4 deletions aggregator_rsyncd.go → aggregator/aggregator_rsyncd.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package main
package aggregator

import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -45,7 +44,6 @@ func (a *RSYNCDAggregator) Init(reader api.QueryAPI) (lastUpdated time.Time, err
result, err = reader.Query(context.Background(), request)

if err != nil {
logging.Warn("Failed to querying influxdb rsyncd statistics", err)
time.Sleep(time.Second)
continue
}
Expand All @@ -54,7 +52,7 @@ func (a *RSYNCDAggregator) Init(reader api.QueryAPI) (lastUpdated time.Time, err
}

if result == nil {
return time.Time{}, errors.New("Error querying influxdb for rsyncd stat")
return time.Time{}, err
}

for result.Next() {
Expand Down
61 changes: 61 additions & 0 deletions aggregators.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package main

import (
"net"
"time"

"github.com/COSI-Lab/Mirror/aggregator"
"github.com/COSI-Lab/Mirror/config"
"github.com/COSI-Lab/Mirror/logging"
"github.com/influxdata/influxdb-client-go/v2/api"
)

// StartNGINXAggregator configures an NGINX project aggregator with a catch-all
// "nginx" measurement plus one measurement per named subnet group from the
// config, then starts it.
//
// It returns the channel to feed parsed log entries into, the time of the last
// update recorded in influxdb, and any error from aggregator initialization.
func StartNGINXAggregator(reader api.QueryAPI, writer api.WriteAPI, config *config.File) (chan<- aggregator.NGINXLogEntry, time.Time, error) {
	nginxAg := aggregator.NewNGINXProjectAggregator()
	// Catch-all measurement that counts every log entry.
	nginxAg.AddMeasurement("nginx", func(re aggregator.NGINXLogEntry) bool {
		return true
	})

	// Add one measurement per named subnet group. An entry matches a group
	// when its source IP falls inside any of the group's subnets.
	for name, subnetStrings := range config.Subnets {
		subnets := make([]*net.IPNet, 0, len(subnetStrings))
		for _, subnetString := range subnetStrings {
			_, subnet, err := net.ParseCIDR(subnetString)
			if err != nil {
				logging.Warnf("Failed to parse subnet %q for %q", subnetString, name)
				continue
			}
			subnets = append(subnets, subnet)
		}

		if len(subnets) == 0 {
			logging.Warn("No valid subnets for", name)
			continue
		}

		// subnets is declared fresh each iteration, so each closure
		// captures its own slice rather than sharing one variable.
		nginxAg.AddMeasurement(name, func(re aggregator.NGINXLogEntry) bool {
			for _, subnet := range subnets {
				if subnet.Contains(re.IP) {
					return true
				}
			}
			return false
		})

		logging.Infof("Added subnet aggregator for %q", name)
	}

	nginxMetrics := make(chan aggregator.NGINXLogEntry)
	// The type parameter is inferred from the channel's element type.
	nginxLastUpdated, err := aggregator.StartAggregator(reader, writer, nginxAg, nginxMetrics)

	return nginxMetrics, nginxLastUpdated, err
}

// StartRSYNCAggregator creates and starts a project aggregator for rsyncd log
// entries.
//
// It returns the channel to feed parsed log entries into, the time of the last
// update recorded in influxdb, and any error from aggregator initialization.
func StartRSYNCAggregator(reader api.QueryAPI, writer api.WriteAPI) (chan<- aggregator.RSCYNDLogEntry, time.Time, error) {
	rsyncAg := aggregator.NewRSYNCProjectAggregator()

	rsyncMetrics := make(chan aggregator.RSCYNDLogEntry)
	// The type parameter is inferred from the channel's element type.
	rsyncLastUpdated, err := aggregator.StartAggregator(reader, writer, rsyncAg, rsyncMetrics)

	return rsyncMetrics, rsyncLastUpdated, err
}
41 changes: 0 additions & 41 deletions config/configFile.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"sort"
"strings"
"text/template"

"github.com/xeipuuv/gojsonschema"
)
Expand Down Expand Up @@ -101,46 +100,6 @@ func (config *File) GetProjects() []Project {
return projects
}

// CreateRSCYNDConfig writes a rsyncd.conf file to the given writer based on the config
//
// Consider passing a bufio.Write to this function
func (config *File) CreateRSCYNDConfig(w io.Writer) error {
tmpl := `# This is a generated file. Do not edit manually.
uid = nobody
gid = nogroup
use chroot = yes
max connections = 0
pid file = /var/run/rsyncd.pid
motd file = /etc/rsyncd.motd
log file = /var/log/rsyncd.log
log format = %t %o %a %m %f %b
dont compress = *.gz *.tgz *.zip *.z *.Z *.rpm *.deb *.bz2 *.tbz2 *.xz *.txz *.rar
refuse options = checksum delete
{{ range . }}
[{{ .Short }}]
comment = {{ .Name }}
path = /storage/{{ .Short }}
exclude = lost+found/
read only = true
ignore nonreadable = yes{{ end }}
`

var filteredProjects []*Project
for _, project := range config.Projects {
if project.PublicRsync {
filteredProjects = append(filteredProjects, project)
}
}

t := template.Must(template.New("rsyncd.conf").Parse(tmpl))
err := t.Execute(w, filteredProjects)
if err != nil {
return err
}

return nil
}

// Validate checks the config file for a few properties
//
// - All projects have a unique long name, case insensitive
Expand Down
44 changes: 44 additions & 0 deletions config/rsyncd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package config

import (
"io"
"text/template"
)

// CreateRSCYNDConfig writes a rsyncd.conf file to the given writer based on
// the Config struct. Only projects with PublicRsync set are included.
//
// Consider passing a bufio.Writer to this function.
//
// NOTE(review): "RSCYND" looks like a typo for "RSYNCD", but the name is kept
// as-is for backward compatibility with existing callers.
func (config *File) CreateRSCYNDConfig(w io.Writer) error {
	// The %t %o %a %m %f %b sequence is rsyncd's own log-format escape
	// syntax, not Go formatting verbs.
	tmpl := `# This is a generated file. Do not edit manually.
uid = nobody
gid = nogroup
use chroot = yes
max connections = 0
pid file = /var/run/rsyncd.pid
motd file = /etc/rsyncd.motd
log file = /var/log/rsyncd.log
log format = %t %o %a %m %f %b
dont compress = *.gz *.tgz *.zip *.z *.Z *.rpm *.deb *.bz2 *.tbz2 *.xz *.txz *.rar
refuse options = checksum delete
{{ range . }}
[{{ .Short }}]
comment = {{ .Name }}
path = /storage/{{ .Short }}
exclude = lost+found/
read only = true
ignore nonreadable = yes{{ end }}
`

	// Render only the projects that opted into public rsync access.
	var filteredProjects []*Project
	for _, project := range config.Projects {
		if project.PublicRsync {
			filteredProjects = append(filteredProjects, project)
		}
	}

	// Must is safe here: the template is a compile-time constant, so a
	// parse failure is a programmer error.
	t := template.Must(template.New("rsyncd.conf").Parse(tmpl))
	return t.Execute(w, filteredProjects)
}
2 changes: 1 addition & 1 deletion config/tokens_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/COSI-Lab/Mirror/config"
)

// Test tokens.txt parsing
// Test tokens.toml parsing
func TestTokens(t *testing.T) {
example := `
[[tokens]]
Expand Down
9 changes: 2 additions & 7 deletions influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,12 @@ import (
"github.com/influxdata/influxdb-client-go/v2/api"
)

var writer api.WriteAPI
var reader api.QueryAPI

// SetupInfluxClients connects to influxdb and sets up the db clients
func SetupInfluxClients(token string) {
func SetupInfluxClients(token string) (reader api.QueryAPI, writer api.WriteAPI) {
// create new client with default option for server url authenticate by token
options := influxdb2.DefaultOptions()
options.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})

client := influxdb2.NewClientWithOptions("https://mirror.clarkson.edu:8086", token, options)

writer = client.WriteAPI("COSI", "stats")
reader = client.QueryAPI("COSI")
return client.QueryAPI("COSI"), client.WriteAPI("COSI", "stats")
}
Loading