Skip to content

Commit

Permalink
Learn from all executed cells (#341)
Browse files Browse the repository at this point in the history
* See TN013 for more detail
  • Loading branch information
jlewi authored Nov 22, 2024
1 parent d63d18b commit 2d4e8e3
Show file tree
Hide file tree
Showing 8 changed files with 363 additions and 179 deletions.
10 changes: 5 additions & 5 deletions app/pkg/analyze/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Analyzer struct {

watcher *fsnotify.Watcher

learnNotifier PostBlockEvent
learnNotifier PostSessionEvent

handleLogFileIsDone sync.WaitGroup
handleBlocksIsDone sync.WaitGroup
Expand Down Expand Up @@ -152,13 +152,13 @@ type blockItem struct {
id string
}

// PostBlockEvent interface for functions to post block events.
type PostBlockEvent func(id string) error
// PostSessionEvent interface for functions to post session events.
type PostSessionEvent func(id string) error

// Run runs the analyzer; continually processing logs.
// learnNotifier is an optional function that will be called when a block is updated.
// This should be non blocking.
func (a *Analyzer) Run(ctx context.Context, logDirs []string, learnNotifier PostBlockEvent) error {
func (a *Analyzer) Run(ctx context.Context, logDirs []string, learnNotifier PostSessionEvent) error {
a.learnNotifier = learnNotifier
// Find all the current files
jsonFiles, err := findLogFilesInDirs(ctx, logDirs)
Expand Down Expand Up @@ -276,7 +276,7 @@ func (a *Analyzer) processLogFile(ctx context.Context, path string) error {
}

// Add the entry to a session if it should be.
a.sessBuilder.processLogEntry(entry)
a.sessBuilder.processLogEntry(entry, a.learnNotifier)

if matchers.IsLogEvent(entry.Function()) {
a.processLogEvent(ctx, entry)
Expand Down
13 changes: 10 additions & 3 deletions app/pkg/analyze/session_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ func NewSessionBuilder(sessions *SessionsManager) (*sessionBuilder, error) {

// n.b. processLogEntry doesn't return a error because we expect errors to be ignored and for processing to continue.
// I'm not sure that's a good idea but we'll see.
func (p *sessionBuilder) processLogEntry(entry *api.LogEntry) {
func (p *sessionBuilder) processLogEntry(entry *api.LogEntry, notifier PostSessionEvent) {
// We need to use HasPrefix because the logging statement is nested inside an anonymous function so there
// will be a suffix like "func1"
if matchers.IsLogEvent(entry.Function()) {
// TODO(Jeremy): There is also Analyzer.processLogEvent
p.processLogEvent(entry)
p.processLogEvent(entry, notifier)
}

if matchers.IsLLMUsage(entry.Function()) {
Expand All @@ -50,7 +50,7 @@ func (p *sessionBuilder) processLogEntry(entry *api.LogEntry) {
}
}

func (p *sessionBuilder) processLogEvent(entry *api.LogEntry) {
func (p *sessionBuilder) processLogEvent(entry *api.LogEntry, notifier PostSessionEvent) {
log := zapr.NewLogger(zap.L())
event := &v1alpha1.LogEvent{}

Expand All @@ -71,6 +71,13 @@ func (p *sessionBuilder) processLogEvent(entry *api.LogEntry) {

if err := p.sessions.Update(context.Background(), event.GetContextId(), updateFunc); err != nil {
log.Error(err, "Failed to update session", "event", event)
return
}

if event.Type == v1alpha1.LogEventType_SESSION_END {
if err := notifier(event.GetContextId()); err != nil {
log.Error(err, "Failed to send session process event")
}
}
}

Expand Down
12 changes: 9 additions & 3 deletions app/pkg/analyze/session_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package analyze
import (
"context"
"database/sql"
"fmt"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -70,6 +71,12 @@ func setup() (testTuple, error) {
}, nil
}

// Process the log entry
func testNotifier(contextId string) error {
fmt.Printf("Received session end event for context: %v", contextId)
return nil
}

func Test_ProcessLogEvent(t *testing.T) {
tuple, err := setup()
if err != nil {
Expand Down Expand Up @@ -101,8 +108,7 @@ func Test_ProcessLogEvent(t *testing.T) {
"event": logs.ZapProto("event", event).Interface,
}

// Process the log entry
tuple.p.processLogEvent(entry)
tuple.p.processLogEvent(entry, testNotifier)

s, err := tuple.sessions.Get(context.Background(), event.GetContextId())
if err != nil {
Expand Down Expand Up @@ -154,7 +160,7 @@ func Test_ProcessStreamGenerate(t *testing.T) {
}

// Process the log entry
tuple.p.processLogEntry(entry)
tuple.p.processLogEntry(entry, testNotifier)

s, err := tuple.sessions.Get(context.Background(), contextId)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion app/pkg/application/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (a *App) SetupLearner() (*learn.Learner, error) {
if err != nil {
return nil, err
}
return learn.NewLearner(*a.Config, client, a.LockingBlocksDB)
return learn.NewLearner(*a.Config, client, a.sessionsManager)
}

func (a *App) createComponents() error {
Expand Down
Loading

0 comments on commit 2d4e8e3

Please sign in to comment.