Skip to content

Commit

Permalink
Use BPF ringbuf to send collected data to user-space (#10)
Browse files Browse the repository at this point in the history
* use ring buffer for data sharing

* require kernel version >= 5.8

* improve error handling
  • Loading branch information
keisku authored May 19, 2024
1 parent 41a39e1 commit af685b3
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 163 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# Prerequisites

- amd64 (x86_64)
- Linux Kernel 5.8+ since `gmon` uses [BPF ring buffer](https://nakryiko.com/posts/bpf-ringbuf/)

# Usage

Expand Down
24 changes: 7 additions & 17 deletions ebpf/bpf_x86_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 23 additions & 16 deletions ebpf/c/gmon.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,18 @@ int runtime_newproc1(struct pt_regs *ctx) {
bpf_printk("%s:%d | failed to read stackid\n", __FILE__, __LINE__);
return 0;
}
struct newproc1_event_key key = {
.goroutine_id = goid,
.ktime = bpf_ktime_get_ns(),
};
struct newproc1_event event = {
.stack_id = stack_id,
};
bpf_map_update_elem(&newproc1_events, &key, &event, BPF_ANY);

struct event *ev;
ev = bpf_ringbuf_reserve(&events, sizeof(*ev), 0);
if (!ev) {
bpf_printk("%s:%d | failed to reserve ringbuf\n", __FILE__, __LINE__);
return 0;
}
ev->goroutine_id = goid;
ev->stack_id = stack_id;
ev->exit = false;
bpf_ringbuf_submit(ev, 0);

return 0;
}

Expand All @@ -65,14 +69,17 @@ int runtime_goexit1(struct pt_regs *ctx) {
return 0;
}

struct goexit1_event_key key = {
.goroutine_id = go_id,
.ktime = bpf_ktime_get_ns(),
};
struct goexit1_event event = {
.stack_id = stack_id,
};
bpf_map_update_elem(&goexit1_events, &key, &event, BPF_ANY);
struct event *ev;
ev = bpf_ringbuf_reserve(&events, sizeof(*ev), 0);
if (!ev) {
bpf_printk("%s:%d | failed to reserve ringbuf\n", __FILE__, __LINE__);
return 0;
}
ev->goroutine_id = go_id;
ev->stack_id = stack_id;
ev->exit = true;
bpf_ringbuf_submit(ev, 0);

return 0;
}

Expand Down
26 changes: 7 additions & 19 deletions ebpf/c/maps.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,24 @@
__type(value, _value_type); \
} _name SEC(".maps");

#define BPF_HASH(_name, _key_type, _value_type, _max_entries) \
BPF_MAP(_name, BPF_MAP_TYPE_HASH, _key_type, _value_type, _max_entries)

// stack traces: the value is 1 big byte array of the stack addresses
typedef __u64 stack_trace_t[MAX_STACK_DEPTH];
#define BPF_STACK_TRACE(_name, _max_entries) \
BPF_MAP(_name, BPF_MAP_TYPE_STACK_TRACE, u32, stack_trace_t, _max_entries)

BPF_STACK_TRACE(stack_addresses, MAX_STACK_ADDRESSES); // store stack traces

struct newproc1_event_key {
int64_t goroutine_id;
uint64_t ktime; // To make this struct unique
};
// Ring buffer map used by the probes in gmon.c: a probe reserves a
// struct event in it with bpf_ringbuf_reserve and publishes the record with
// bpf_ringbuf_submit.
// NOTE(review): max_entries of 1 << 24 is presumably the buffer size in
// bytes (16 MiB) — confirm against the BPF ring buffer documentation.
struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 1 << 24);
} events SEC(".maps");

struct newproc1_event {
int stack_id;
};

BPF_HASH(newproc1_events, struct newproc1_event_key, struct newproc1_event, 10240);

struct goexit1_event_key {
struct event {
int64_t goroutine_id;
uint64_t ktime; // To make this struct unique
};

struct goexit1_event {
int stack_id;
bool exit;
};

BPF_HASH(goexit1_events, struct goexit1_event_key, struct goexit1_event, 10240);
struct event *unused __attribute__((unused));

#endif /* __MAPS_H__ */
158 changes: 48 additions & 110 deletions ebpf/event_handler.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,84 @@
package ebpf

import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"log/slog"
"runtime/trace"
"strconv"
"time"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/ringbuf"
"github.com/go-delve/delve/pkg/proc"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/keisku/gmon/bininfo"
)

// eventHandler bundles the dependencies used by the event handling methods
// in this file.
type eventHandler struct {
// goroutineQueue is the send-only channel on which goroutine values are
// published.
goroutineQueue chan<- goroutine
// objs holds the loaded eBPF objects; its StackAddresses map is used to
// look up and delete stack traces by stack id.
objs *bpfObjects
// NOTE(review): biTranslator is not referenced in the visible code;
// presumably it maps stack addresses to functions — confirm in lookupStack.
biTranslator bininfo.Translator
// reader is the ring buffer reader that readRecord reads records from.
reader *ringbuf.Reader
}

func (h *eventHandler) run(ctx context.Context) {
ticker := time.NewTicker(200 * time.Millisecond)
var event bpfEvent
stackIdCache, _ := lru.New[int32, struct{}](16)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
ctx, task := trace.NewTask(ctx, "event_handler.handle")
trace.WithRegion(ctx, "event_handler.handle_newproc1", func() {
if err := h.handleNewproc1(); err != nil {
slog.Warn("Failed to handle newproc1", slog.Any("error", err))
}
})
trace.WithRegion(ctx, "event_handler.handle_goexit1", func() {
if err := h.handleGoexit1(); err != nil {
slog.Warn("Failed to handle goexit1", slog.Any("error", err))
}
})
task.End()
if err := h.readRecord(ctx, &event); err != nil {
if errors.Is(err, ringbuf.ErrClosed) {
slog.Debug("ring buffer is closed")
return
}
slog.Warn("Failed to read bpf ring buffer", slog.Any("error", err))
continue
}
stack, err := h.lookupStack(ctx, event.StackId)
if err != nil {
slog.Warn(err.Error())
continue
}
h.sendGoroutine(goroutine{
Id: event.GoroutineId,
ObservedAt: time.Now(),
Stack: stack,
Exit: event.Exit,
})
contains, _ := stackIdCache.ContainsOrAdd(event.StackId, struct{}{})
if !contains {
slog.Debug("delete stack_addresses", slog.Int("stack_id", int(event.StackId)))
if err := h.objs.StackAddresses.Delete(event.StackId); err != nil {
slog.Debug("Failed to delete stack_addresses", slog.Any("error", err))
}
}
}
}

// readRecord reads the next record from h.reader and decodes its raw sample
// into event as little-endian binary data.
//
// An error from the reader is returned as-is. A decoding failure is wrapped
// with the "decode ring buffer record" prefix. The whole call is recorded as
// a runtime/trace task named "event_handler.read_ring_buffer".
func (h *eventHandler) readRecord(ctx context.Context, event *bpfEvent) error {
	_, readTask := trace.NewTask(ctx, "event_handler.read_ring_buffer")
	defer readTask.End()

	rec, readErr := h.reader.Read()
	if readErr != nil {
		return readErr
	}

	sample := bytes.NewBuffer(rec.RawSample)
	decodeErr := binary.Read(sample, binary.LittleEndian, event)
	if decodeErr == nil {
		return nil
	}
	return fmt.Errorf("decode ring buffer record: %w", decodeErr)
}

// lookupStack is a copy of the function in tracee.
// https://github.com/aquasecurity/tracee/blob/f61866b4e2277d2a7dddc6cd77a67cd5a5da3b14/pkg/ebpf/events_pipeline.go#L642-L681
const maxStackDepth = 20

var stackFrameSize = (strconv.IntSize / 8)

func (h *eventHandler) lookupStack(stackId int32) ([]*proc.Function, error) {
func (h *eventHandler) lookupStack(ctx context.Context, stackId int32) ([]*proc.Function, error) {
_, task := trace.NewTask(ctx, "event_handler.lookup_stack")
defer task.End()
stackBytes, err := h.objs.StackAddresses.LookupBytes(stackId)
if err != nil {
return nil, fmt.Errorf("failed to lookup stack addresses: %w", err)
Expand Down Expand Up @@ -83,38 +111,6 @@ func (h *eventHandler) lookupStack(stackId int32) ([]*proc.Function, error) {
return stack[0:stackCounter], nil
}

// handle walks eventMap with processMap and then removes everything that
// processMap reported as consumed.
//
// processMap receives the map iterator and a set it must fill with the
// stack ids to delete afterwards. It returns the slice of eBPF map keys to
// delete (keysToDelete) and the number of keys in that slice (keyLength);
// the count decides whether a BatchDelete on eventMap is needed at all.
//
// Only an iteration error is returned. Deletion failures are logged and
// otherwise ignored.
func (h *eventHandler) handle(
	stackAddrs, eventMap *ebpf.Map,
	processMap func(iter *ebpf.MapIterator, stackIdSet map[int32]struct{}) (keysToDelete any, keyLength int),
) error {
	pendingStackIds := map[int32]struct{}{}
	it := eventMap.Iterate()
	keys, count := processMap(it, pendingStackIds)
	if iterErr := it.Err(); iterErr != nil {
		return fmt.Errorf("failed to iterate eBPF map: %w", iterErr)
	}

	if count > 0 {
		deleted, delErr := eventMap.BatchDelete(keys, nil)
		if delErr != nil {
			slog.Warn("Failed to delete eBPF map", slog.Any("err", delErr))
		} else {
			slog.Debug("Deleted eBPF map", slog.Int("deleted", deleted), slog.Int("expected", count))
		}
	}

	// Stack addresses are removed one by one: BatchDelete is not a supported
	// operation on that map and fails with "batch delete: not supported".
	for id := range pendingStackIds {
		delErr := stackAddrs.Delete(id)
		if delErr == nil {
			slog.Debug("Deleted stack address map", slog.Int("stack_id", int(id)))
			continue
		}
		slog.Debug("Failed to delete stack_addresses", slog.Any("error", delErr))
	}
	return nil
}

func (h *eventHandler) sendGoroutine(g goroutine) {
maxRetries := 3
retryInterval := 10 * time.Millisecond
Expand Down Expand Up @@ -145,61 +141,3 @@ func (h *eventHandler) sendGoroutine(g goroutine) {
}
}
}

// handleNewproc1 processes the entries of the Newproc1Events map through
// handle. For each entry whose stack can be resolved it sends a goroutine
// with Exit set to false, and records the entry's key and stack id so that
// handle deletes them afterwards. Entries whose stack lookup fails are
// logged and left in the map.
func (h *eventHandler) handleNewproc1() error {
	var (
		eventKey  bpfNewproc1EventKey
		eventVal  bpfNewproc1Event
		processed []bpfNewproc1EventKey
	)

	drain := func(mapIter *ebpf.MapIterator, stackIdSet map[int32]struct{}) (any, int) {
		for mapIter.Next(&eventKey, &eventVal) {
			frames, lookupErr := h.lookupStack(eventVal.StackId)
			if lookupErr != nil {
				slog.Warn(lookupErr.Error())
				continue
			}
			stackIdSet[eventVal.StackId] = struct{}{}
			processed = append(processed, eventKey)
			created := goroutine{
				Id:         eventKey.GoroutineId,
				ObservedAt: time.Now(),
				Stack:      frames,
				Exit:       false,
			}
			h.sendGoroutine(created)
		}
		return processed, len(processed)
	}

	return h.handle(h.objs.StackAddresses, h.objs.Newproc1Events, drain)
}

// handleGoexit1 processes the entries of the Goexit1Events map through
// handle. For each entry whose stack can be resolved it sends a goroutine
// marked as exited, and records the entry's key and stack id so that handle
// deletes them afterwards. Entries whose stack lookup fails are logged and
// left in the map.
func (h *eventHandler) handleGoexit1() error {
	var key bpfGoexit1EventKey
	var value bpfGoexit1Event
	var keysToDelete []bpfGoexit1EventKey

	return h.handle(
		h.objs.StackAddresses,
		h.objs.Goexit1Events,
		func(mapIter *ebpf.MapIterator, stackIdSet map[int32]struct{}) (any, int) {
			for mapIter.Next(&key, &value) {
				stack, err := h.lookupStack(value.StackId)
				if err != nil {
					slog.Warn(err.Error())
					continue
				}
				stackIdSet[value.StackId] = struct{}{}
				keysToDelete = append(keysToDelete, key)
				h.sendGoroutine(goroutine{
					Id:         key.GoroutineId,
					ObservedAt: time.Now(),
					Stack:      stack,
					// These events come from the runtime.goexit1 probe, so
					// the goroutine has exited. Reporting false here would
					// make exits indistinguishable from creations.
					Exit: true,
				})
			}
			return keysToDelete, len(keysToDelete)
		},
	)
}
Loading

0 comments on commit af685b3

Please sign in to comment.