Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trace refactored #116

Merged
merged 27 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0bc81e9
govpp trace refactored
VladoLavor Mar 16, 2023
9559fc5
fix test
VladoLavor Mar 17, 2023
2c55f25
Merge branch 'master' into trace-fix
VladoLavor Mar 17, 2023
d6e7ed9
simplify
VladoLavor Mar 17, 2023
09e0c5c
Merge branch 'master' into trace-fix
VladoLavor May 4, 2023
3d31403
Merge branch 'master' into trace-fix
VladoLavor May 18, 2023
8915fe7
Merge branch 'master' into trace-fix
VladoLavor Jun 1, 2023
96900c4
Merge branch 'master' into trace-fix
VladoLavor Jun 23, 2023
b67ba44
some fixes
VladoLavor Jun 23, 2023
f2ee3d5
init records list out of the lock
VladoLavor Jun 23, 2023
7a2b4da
Merge branch 'master' into trace-fix
VladoLavor Jul 27, 2023
d1b91ca
Added succeed/fail flag to traced message
VladoLavor Jul 27, 2023
1a339a6
Merge branch 'master' into trace-fix
VladoLavor Aug 24, 2023
6f809a5
Merge branch 'master' into trace-fix
VladoLavor Sep 4, 2023
a0d199b
Merge branch 'master' into trace-fix
VladoLavor Sep 21, 2023
ce3a5fd
Merge branch 'master' into trace-fix
VladoLavor Oct 19, 2023
1fd714b
Merge branch 'master' into trace-fix
VladoLavor Nov 13, 2023
6d0521a
Merge branch 'master' into trace-fix
VladoLavor Dec 14, 2023
1870192
Merge branch 'master' into trace-fix
VladoLavor Dec 19, 2023
8859ead
Merge branch 'master' into trace-fix
VladoLavor Jan 15, 2024
44b1d74
Merge branch 'master' into trace-fix
VladoLavor Jan 23, 2024
6366087
Merge branch 'master' into trace-fix
sknat Feb 8, 2024
229f522
decrease the size of the trace prealocation
VladoLavor Feb 16, 2024
b5e13b4
make it safe to init trace during api calls
VladoLavor Feb 16, 2024
e33b77a
Merge branch 'master' into trace-fix
VladoLavor Feb 16, 2024
783ca18
Do not use trace lock if trace is disabled
VladoLavor Feb 22, 2024
d42dde7
fix data race on trace nil check
VladoLavor Feb 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions api/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,30 @@ import (
"time"
)

// Trace gives access to the API trace tool, capturing outcoming and incoming messages
// to and from GoVPP.
// Trace is the GoVPP utility tool capturing processed API messages. The trace is not operational
// by default.
// Enable trace for a given connection by calling `NewTrace(connection, size)`
type Trace interface {
// Enable allows to enable or disable API trace for a connection.
Enable(enable bool)

// GetRecords retrieves all messages collected (from all channels if they are used)
// since the point the trace was enabled or cleared.
// GetRecords returns all API messages from all channels captured since the trace
// was initialized or cleared up to the point of the call.
GetRecords() []*Record

// GetRecordsForChannel retrieves messages collected by the given channel since
// the point the trace was enabled or cleared.
// GetRecordsForChannel returns all API messages recorded by the given channel.
GetRecordsForChannel(chId uint16) []*Record

// Clear erases messages captured so far.
Clear()

// Close the tracer and release associated resources
Close()
}

// Record contains essential information about traced message, its timestamp and whether
// Record contains essential information about the traced message, its timestamp and whether
// the message was received or sent
type Record struct {
Message Message
Timestamp time.Time
IsReceived bool
ChannelID uint16
Succeeded bool
}
1 change: 0 additions & 1 deletion core/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ func (req *requestCtx) ReceiveReply(msg api.Message) error {
} else if lastReplyReceived {
return errors.New("multipart reply recieved while a single reply expected")
}

return nil
}

Expand Down
30 changes: 3 additions & 27 deletions core/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ type Connection struct {
msgControlPing api.Message
msgControlPingReply api.Message

apiTrace *trace // API tracer (disabled by default)
traceLock sync.Mutex
trace *Trace // API tracer (disabled by default)
}

type backgroundLoopStatus int
Expand Down Expand Up @@ -168,11 +169,7 @@ func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration,
subscriptions: make(map[uint16][]*subscriptionCtx),
msgControlPing: msgControlPing,
msgControlPingReply: msgControlPingReply,
apiTrace: &trace{
list: make([]*api.Record, 0),
mux: &sync.Mutex{},
},
channelIdPool: newIDPool(0x7fff),
channelIdPool: newIDPool(0x7fff),
}
c.channelPool = genericpool.New[*Channel](func() *Channel {
if isDebugOn(debugOptChannels) {
Expand Down Expand Up @@ -583,24 +580,3 @@ func (c *Connection) sendConnEvent(event ConnectionEvent) {
log.Warn("Connection state channel is full, discarding value.")
}
}

// Trace gives access to the API trace interface
func (c *Connection) Trace() api.Trace {
return c.apiTrace
}

// trace records api message
func (c *Connection) trace(msg api.Message, chId uint16, t time.Time, isReceived bool) {
if atomic.LoadInt32(&c.apiTrace.isEnabled) == 0 {
return
}
entry := &api.Record{
Message: msg,
Timestamp: t,
IsReceived: isReceived,
ChannelID: chId,
}
c.apiTrace.mux.Lock()
c.apiTrace.list = append(c.apiTrace.list, entry)
c.apiTrace.mux.Unlock()
}
57 changes: 47 additions & 10 deletions core/request_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,21 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
}

// send the request to VPP
t := time.Now()
err = c.vppClient.SendMsg(context, data)
if err != nil {
if err = func() (err error) {
timestamp, enabled := c.trace.registerNew()
err = c.vppClient.SendMsg(context, data)
if enabled {
c.traceLock.Lock()
defer c.traceLock.Unlock()
c.trace.send(&api.Record{
Message: req.msg,
Timestamp: timestamp,
ChannelID: ch.id,
Succeeded: err == nil,
})
}
return
}(); err != nil {
log.WithFields(logger.Fields{
"channel": ch.id,
"msg_id": msgID,
Expand All @@ -126,7 +138,6 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
}).Warnf("Unable to send message")
return err
}
c.trace(req.msg, ch.id, t, false)

if req.multi {
// send a control ping to determine end of the multipart response
Expand All @@ -143,16 +154,28 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
"data_len": len(pingData),
}).Debugf(" -> SEND MSG: %T", c.msgControlPing)
}

t = time.Now()
if err := c.vppClient.SendMsg(context, pingData); err != nil {
// send the control ping request to VPP
if err = func() (err error) {
timestamp, enabled := c.trace.registerNew()
err = c.vppClient.SendMsg(context, pingData)
if enabled {
c.traceLock.Lock()
defer c.traceLock.Unlock()
c.trace.send(&api.Record{
Message: c.msgControlPing,
Timestamp: timestamp,
ChannelID: ch.id,
Succeeded: err == nil,
})
}
return
}(); err != nil {
log.WithFields(logger.Fields{
"context": context,
"seq_num": req.seqNum,
"error": err,
}).Warnf("unable to send control ping")
}
c.trace(c.msgControlPing, ch.id, t, false)
}

return nil
Expand Down Expand Up @@ -188,11 +211,25 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) {

// decode and trace the message
msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
if err = c.codec.DecodeMsg(data, msg); err != nil {
if err = func() (err error) {
timestamp, enabled := c.trace.registerNew()
err = c.codec.DecodeMsg(data, msg)
if enabled {
c.traceLock.Lock()
defer c.traceLock.Unlock()
c.trace.send(&api.Record{
Message: msg,
Timestamp: timestamp,
IsReceived: true,
ChannelID: chanID,
Succeeded: err == nil,
})
}
return
}(); err != nil {
log.WithField("msg", msg).Warnf("Unable to decode message: %v", err)
return
}
c.trace(msg, chanID, time.Now(), true)

if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
log.WithFields(logger.Fields{
Expand Down
109 changes: 81 additions & 28 deletions core/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,51 +18,104 @@ import (
"go.fd.io/govpp/api"
"sort"
"sync"
"sync/atomic"
"time"
)

// trace is the API tracer object synchronizing and keeping recoded messages.
type trace struct {
list []*api.Record
mux *sync.Mutex
// default buffer size
const bufferSize = 100

isEnabled int32
// Trace is the default trace API implementation.
type Trace struct {
*sync.RWMutex
wg sync.WaitGroup

records []*api.Record
buffer chan *api.Record
index int

closeFunc func()
}

func (c *trace) Enable(enable bool) {
if enable && atomic.CompareAndSwapInt32(&c.isEnabled, 0, 1) {
log.Debugf("API trace enabled")
} else if atomic.CompareAndSwapInt32(&c.isEnabled, 1, 0) {
log.Debugf("API trace disabled")
// NewTrace initializes the trace object, always bound to a GoVPP connection.
// The size limits the number of records stored.
// Initializing a new trace for the same connection replaces the old one and
// discards all values already collected.
func NewTrace(c *Connection, size int) (t *Trace) {
t = &Trace{
RWMutex: &sync.RWMutex{},
records: make([]*api.Record, size),
buffer: make(chan *api.Record, bufferSize),
}
c.traceLock.Lock()
c.trace = t
c.traceLock.Unlock()
t.closeFunc = func() {
c.trace = nil // no more records
close(t.buffer)
}
go func() {
for {
record, ok := <-t.buffer
if !ok {
return
}
if t.index < len(t.records) {
t.Lock()
t.records[t.index] = record
t.index++
t.Unlock()
}
t.wg.Done()
}
}()
return
}

func (c *trace) GetRecords() (list []*api.Record) {
c.mux.Lock()
list = append(list, c.list...)
c.mux.Unlock()
func (t *Trace) GetRecords() (list []*api.Record) {
// it is supposed to wait until all API messages sent to the
// buffer are processed before returning the list
t.wg.Wait()
list = make([]*api.Record, t.index)
VladoLavor marked this conversation as resolved.
Show resolved Hide resolved
t.RLock()
copy(list, t.records[:t.index])
t.RUnlock()
sort.Slice(list, func(i, j int) bool {
return list[i].Timestamp.Before(list[j].Timestamp)
})
return list
}

func (c *trace) GetRecordsForChannel(chId uint16) (list []*api.Record) {
c.mux.Lock()
for _, entry := range c.list {
if entry.ChannelID == chId {
list = append(list, entry)
func (t *Trace) GetRecordsForChannel(chId uint16) (list []*api.Record) {
records := t.GetRecords()
for _, record := range records {
if record.ChannelID == chId {
list = append(list, record)
}
}
c.mux.Unlock()
sort.Slice(list, func(i, j int) bool {
return list[i].Timestamp.Before(list[j].Timestamp)
})
return list
}

func (c *trace) Clear() {
c.mux.Lock()
c.list = make([]*api.Record, 0)
c.mux.Unlock()
func (t *Trace) Clear() {
t.Lock()
t.records = make([]*api.Record, len(t.records))
t.index = 0
t.Unlock()
}

func (t *Trace) Close() {
t.closeFunc()
}

func (t *Trace) registerNew() (now time.Time, enabled bool) {
if t != nil {
t.wg.Add(1)
enabled = true
}
return time.Now(), enabled
}

func (t *Trace) send(record *api.Record) {
if t != nil {
t.buffer <- record
}
}
Loading
Loading