Skip to content

Commit

Permalink
Add usage limits & more metrics tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewFerr committed Nov 15, 2023
1 parent 64f0304 commit f6517ca
Show file tree
Hide file tree
Showing 10 changed files with 384 additions and 69 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
## Element fork

The Element fork includes the following changes:
- User activity tracking
- Add additional metrics to the bridge

Some changes that appear here may get upstreamed to https://github.com/mautrix/signalgo, and will be removed from
the list when they appear in both versions.

Tagged versions will appear as `v{UPSTREAM-VERSION}-mod-{VERSION}`

E.g. The third modification release to 1.0 of the upstream bridge would be `v1.0-mod-3`.

# mautrix-signalgo
![Languages](https://img.shields.io/github/languages/top/mautrix/signalgo.svg)
[![License](https://img.shields.io/github/license/mautrix/signalgo.svg)](LICENSE)
Expand Down
10 changes: 10 additions & 0 deletions config/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ type BridgeConfig struct {

usernameTemplate *template.Template `yaml:"-"`
displaynameTemplate *template.Template `yaml:"-"`

Limits struct {
MaxPuppetLimit uint `yaml:"max_puppet_limit"`
MinPuppetActivityDays uint `yaml:"min_puppet_activity_days"`
PuppetInactivityDays *uint `yaml:"puppet_inactivity_days"`
BlockOnLimitReached bool `yaml:"block_on_limit_reached"`
BlockBeginsNotification string `yaml:"block_begins_notification"`
BlockEndsNotification string `yaml:"block_ends_notification"`
BlockNotificationIntervalSeconds uint `yaml:"block_notification_interval_seconds"`
} `yaml:"limits"`
}

func (bc *BridgeConfig) GetResendBridgeInfo() bool {
Expand Down
9 changes: 9 additions & 0 deletions config/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ func DoUpgrade(helper *up.Helper) {
}

helper.Copy(up.Map, "bridge", "permissions")

helper.Copy(up.Int, "bridge", "limits", "max_puppet_limit")
helper.Copy(up.Int, "bridge", "limits", "min_puppet_activity_days")
helper.Copy(up.Int|up.Null, "bridge", "limits", "puppet_inactivity_days")
helper.Copy(up.Bool, "bridge", "limits", "block_on_limit_reached")
helper.Copy(up.Str, "bridge", "limits", "block_begins_notification")
helper.Copy(up.Str, "bridge", "limits", "block_ends_notification")
helper.Copy(up.Int, "bridge", "limits", "block_notification_interval_seconds")
}

var SpacedBlocks = [][]string{
Expand All @@ -92,5 +100,6 @@ var SpacedBlocks = [][]string{
{"bridge", "encryption"},
{"bridge", "provisioning"},
{"bridge", "permissions"},
{"bridge", "limits"},
{"logging"},
}
37 changes: 37 additions & 0 deletions database/puppet.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"database/sql"
"errors"
"fmt"
"time"

"go.mau.fi/util/dbutil"
log "maunium.net/go/maulogger/v2"

"maunium.net/go/mautrix/id"
)

const oneDayMs = 24 * 60 * 60 * 1000

type PuppetQuery struct {
db *Database
log log.Logger
Expand Down Expand Up @@ -176,6 +179,26 @@ func (p *Puppet) UpdateNumber() error {
return nil
}

func (p *Puppet) UpdateActivityTs(activityTs int64) {
if p.LastActivityTs > activityTs {
return
}
p.log.Debugfln("Updating activity time for %s to %d", p.SignalID, activityTs)
p.LastActivityTs = activityTs
_, err := p.db.Exec("UPDATE puppet SET last_activity_ts=$1 WHERE uuid=$2", p.LastActivityTs, p.SignalID)
if err != nil {
p.log.Warnfln("Failed to update last_activity_ts for %s: %v", p.SignalID, err)
}

if p.FirstActivityTs == 0 {
p.FirstActivityTs = activityTs
_, err = p.db.Exec("UPDATE puppet SET first_activity_ts=$1 WHERE uuid=$2 AND first_activity_ts is NULL", p.FirstActivityTs, p.SignalID)
if err != nil {
p.log.Warnfln("Failed to update first_activity_ts %s: %v", p.SignalID, err)
}
}
}

func (p *Puppet) Update() error {
q := `
UPDATE puppet SET
Expand Down Expand Up @@ -236,3 +259,17 @@ func (pq *PuppetQuery) GetAllWithCustomMXID() ([]*Puppet, error) {
}
return puppets, nil
}

func (pq *PuppetQuery) GetRecentlyActiveCount(minActivityDays uint, maxActivityDays *uint) (count uint, err error) {
q := "SELECT COUNT(*) FROM puppet WHERE (last_activity_ts - first_activity_ts) > $1"
var row *sql.Row
lastActivityTs := oneDayMs * minActivityDays
if maxActivityDays == nil {
row = pq.db.QueryRow(q, lastActivityTs)
} else {
q += " AND ($2 - last_activity_ts) <= $3"
row = pq.db.QueryRow(q, lastActivityTs, time.Now().UnixMilli(), oneDayMs*(*maxActivityDays))
}
err = row.Scan(&count)
return
}
16 changes: 16 additions & 0 deletions example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,22 @@ bridge:
"example.com": user
"@admin:example.com": admin

# Limit usage of the bridge
limits:
# The maximum number of bridge puppets that can be "active" before the limit is reached
max_puppet_limit: 0
# The minimum amount of days a puppet must be active for before they are considered "active".
min_puppet_activity_days: 0
# The number of days after a puppets last activity where they are considered inactive again.
puppet_inactivity_days: 30
# Should the bridge block traffic when a limit has been reached
block_on_limit_reached: true
# The message sent to bridge admins when the bridges starts/stops being blocked
block_begins_notification: 'The bridge is currently blocking messages. You may need to increase your usage limits in EMS.'
block_ends_notification: 'The bridge is no longer blocking messages.'
# The minimum interval between blocking notification being sent
block_notification_interval_seconds: 3600

# Logging config. See https://github.com/tulir/zeroconfig for details.
logging:
min_level: debug
Expand Down
140 changes: 139 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
_ "embed"
"fmt"
"os"
"time"

"sync"

Expand All @@ -13,14 +14,18 @@ import (
"github.com/vector-im/mautrix-signal/database"
"github.com/vector-im/mautrix-signal/pkg/signalmeow"
"go.mau.fi/util/configupgrade"
"maunium.net/go/maulogger/v2"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/bridge"
"maunium.net/go/mautrix/bridge/bridgeconfig"
"maunium.net/go/mautrix/bridge/commands"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/format"
"maunium.net/go/mautrix/id"
)

const activeUserMetricsIntervalSec = 60

//go:embed example-config.yaml
var ExampleConfig string

Expand All @@ -42,6 +47,11 @@ type SignalBridge struct {

provisioning *ProvisioningAPI

puppetActivity *PuppetActivity
activePuppetMetricLoopDone chan bool
activePuppetMetricRequest chan bool
lastBlockingNotification int64

usersByMXID map[id.UserID]*User
usersBySignalID map[string]*User
usersLock sync.Mutex
Expand Down Expand Up @@ -95,7 +105,7 @@ func (br *SignalBridge) Init() {
Bridge: br,
}

br.Metrics = NewMetricsHandler(br.Config.Metrics.Listen, br.Log.Sub("Metrics"), br.DB)
br.Metrics = NewMetricsHandler(br.Config.Metrics.Listen, br.Log.Sub("Metrics"), br.DB, br.puppetActivity)
br.MatrixHandler.TrackEventDuration = br.Metrics.TrackMatrixEvent
}

Expand All @@ -110,6 +120,8 @@ func (br *SignalBridge) Start() {
br.provisioning.Init()
}
go br.StartUsers()
br.updateActivePuppetMetricNow()
go br.loopActivePuppetMetric()
if br.Config.Metrics.Enabled {
go br.Metrics.Start()
}
Expand All @@ -122,6 +134,7 @@ func (br *SignalBridge) Stop() {
br.Log.Debugln("Disconnecting", user.MXID)
user.Disconnect()
}
close(br.activePuppetMetricLoopDone)
}

func (br *SignalBridge) GetIPortal(mxid id.RoomID) bridge.Portal {
Expand Down Expand Up @@ -230,6 +243,124 @@ func (br *SignalBridge) createPrivatePortalFromInvite(roomID id.RoomID, inviter
portal.UpdateBridgeInfo()
_, _ = intent.SendNotice(roomID, "Private chat portal created")
}

func (br *SignalBridge) UpdateActivePuppetMetric() {
defer func() {
if recover() != nil {
br.ZLog.Warn().Msg("Attempted to update active puppet metrics after bridge has stopped")
}
}()
br.activePuppetMetricRequest <- true
}

func (br *SignalBridge) updateActivePuppetMetricNow() {
br.Log.Debugln("Updating active puppet count")
br.updateActivePuppetMetric(br.Log)
}

func (br *SignalBridge) updateActivePuppetMetric(log maulogger.Logger) {
activeUsers, err := br.DB.Puppet.GetRecentlyActiveCount(
br.Config.Bridge.Limits.MinPuppetActivityDays,
br.Config.Bridge.Limits.PuppetInactivityDays,
)
if err != nil {
log.Warnln("Failed to scan number of active puppets")
return
}

if br.Config.Bridge.Limits.BlockOnLimitReached {
blocked := br.Config.Bridge.Limits.MaxPuppetLimit < activeUsers
if br.puppetActivity.isBlocked != blocked {
br.puppetActivity.isBlocked = blocked
br.notifyBridgeBlocked(blocked)
}
}
log.Debugfln("Current active puppet count is %d", activeUsers)
br.puppetActivity.currentUserCount = activeUsers

if br.Config.Metrics.Enabled {
br.Metrics.updatePuppetActivity()
}
}

func (br *SignalBridge) loopActivePuppetMetric() {
log := br.Log.Sub("mau.active_puppet_metric")

ticker := time.Tick(activeUserMetricsIntervalSec * time.Second)
for {
select {
case <-ticker:
log.Infoln("Executing periodic active puppet metric check")
br.updateActivePuppetMetric(log)
case <-br.activePuppetMetricRequest:
br.updateActivePuppetMetricNow()
case <-br.activePuppetMetricLoopDone:
close(br.activePuppetMetricRequest)
return
}
}
}

func (br *SignalBridge) notifyBridgeBlocked(isBlocked bool) {
var msg string
if isBlocked {
msg = br.Config.Bridge.Limits.BlockBeginsNotification
nextNotification := br.lastBlockingNotification + int64(br.Config.Bridge.Limits.BlockNotificationIntervalSeconds)
// We're only checking if the block is active, since the unblock notification will not be resent and we want it ASAP
if now := time.Now().Unix(); nextNotification > now {
return
} else {
br.lastBlockingNotification = now
}
} else {
msg = br.Config.Bridge.Limits.BlockEndsNotification
}

admins := make([]id.UserID, 0, len(br.Config.Bridge.Permissions))
for key, permissionLevel := range br.Config.Bridge.Permissions {
if permissionLevel == bridgeconfig.PermissionLevelAdmin {
// Only explicit MXIDs are notified, not wildcards or domains
if _, _, err := id.UserID(key).ParseAndValidate(); err == nil {
admins = append(admins, id.UserID(key))
}
}
}
if len(admins) == 0 {
br.ZLog.Debug().Msg("No bridge admins to notify about the bridge being blocked")
return
}

allAdmins := string(admins[0])
for _, adminMXID := range admins[1:] {
allAdmins += "," + string(adminMXID)
}
br.ZLog.Debug().Msgf("Notifying bridge admins (%s) about bridge being blocked", allAdmins)
for _, adminMXID := range admins {
admin := br.GetUserByMXID(adminMXID)
if admin == nil {
continue
}

roomID := admin.GetManagementRoomID()
if roomID == "" {
resp, err := br.Bot.CreateRoom(&mautrix.ReqCreateRoom{
Name: "Signal Bridge notice room",
IsDirect: true,
Invite: []id.UserID{adminMXID},
})
if err != nil {
br.ZLog.Warn().Err(err).Msg("Failed to create notice room")
continue
}
roomID = resp.RoomID
admin.SetManagementRoom(roomID)
}

// \u26a0 is a warning sign
br.Bot.SendNotice(roomID, "\u26a0 "+msg)
}
}

func main() {
br := &SignalBridge{
usersByMXID: make(map[id.UserID]*User),
Expand All @@ -243,6 +374,13 @@ func main() {
puppets: make(map[string]*Puppet),
puppetsByCustomMXID: make(map[id.UserID]*Puppet),
puppetsByNumber: make(map[string]*Puppet),

puppetActivity: &PuppetActivity{
currentUserCount: 0,
isBlocked: false,
},
activePuppetMetricLoopDone: make(chan bool),
activePuppetMetricRequest: make(chan bool),
}
br.Bridge = bridge.Bridge{
Name: "mautrix-signal",
Expand Down
Loading

0 comments on commit f6517ca

Please sign in to comment.