Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable accessing Message in TableView #1174

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pulsar/table_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,18 @@ type TableView interface {
// ContainsKey returns true if this TableView contains a mapping for the specified key.
ContainsKey(key string) bool

// Get returns the value to which the specified key is mapped, or nil if this map contains no mapping for the key.
// Get returns the value to which the specified key is mapped, or nil if this map contains no mapping for the key or the Message cannot be encoded to the SchemaValueType
Get(key string) interface{}

// Entries returns a map view of the mappings contained in this TableView.
// Message returns the Message to which the specified key is mapped, or nil if this map contains no mapping for the key.
Message(key string) Message

// Entries returns a map view of the mappings contained in this TableView, with values encoded into SchemaValueType.
Entries() map[string]interface{}

// Messages returns a map view of the Message mappings contained in this TableView.
Messages() map[string]Message

// Keys returns a slice of the keys contained in this TableView.
Keys() []string

Expand Down
57 changes: 48 additions & 9 deletions pulsar/table_view_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type TableViewImpl struct {
options TableViewOptions

dataMu sync.Mutex
data map[string]interface{}
data map[string]Message

readersMu sync.Mutex
cancelRaders map[string]cancelReader
Expand Down Expand Up @@ -75,7 +75,7 @@ func newTableView(client *client, options TableViewOptions) (TableView, error) {
tv := TableViewImpl{
client: client,
options: options,
data: make(map[string]interface{}),
data: make(map[string]Message),
cancelRaders: make(map[string]cancelReader),
logger: logger,
closedCh: make(chan struct{}),
Expand Down Expand Up @@ -178,6 +178,23 @@ func (tv *TableViewImpl) ContainsKey(key string) bool {
}

func (tv *TableViewImpl) Get(key string) interface{} {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()
msg, ok := tv.data[key]
if !ok {
return nil
}

v, err := tv.schemaValueFromMessage(msg)
if err != nil {
tv.logger.Errorf("getting value for message, %w; msg is %v", err, msg)
return nil
}

return v
}

func (tv *TableViewImpl) Message(key string) Message {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()
return tv.data[key]
Expand All @@ -186,10 +203,23 @@ func (tv *TableViewImpl) Get(key string) interface{} {
func (tv *TableViewImpl) Entries() map[string]interface{} {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()

data := make(map[string]interface{}, len(tv.data))
for k, v := range tv.data {
for k, msg := range tv.data {
v, err := tv.schemaValueFromMessage(msg)
if err != nil {
tv.logger.Errorf("getting value for message, %w; msg is %v", len(tv.listeners), err, msg)
continue
}
data[k] = v
}

return data
}

func (tv *TableViewImpl) Messages() map[string]Message {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is returning the same underlying map, allowing for race-conditions and modifications from the application.

Note: the existing code is also wrong because it was returning tv.data instead of data

return tv.data
}

Expand Down Expand Up @@ -245,23 +275,32 @@ func (tv *TableViewImpl) handleMessage(msg Message) {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()

payload := reflect.New(tv.options.SchemaValueType)
if len(msg.Payload()) == 0 {
delete(tv.data, msg.Key())
} else {
if err := msg.GetSchemaValue(payload.Interface()); err != nil {
tv.logger.Errorf("msg.GetSchemaValue() failed with %v; msg is %v", err, msg)
}
tv.data[msg.Key()] = reflect.Indirect(payload).Interface()
tv.data[msg.Key()] = msg
}

v, err := tv.schemaValueFromMessage(msg)
if err != nil {
tv.logger.Errorf("will not action %d listeners, getting value for message, %w; msg is %v", len(tv.listeners), err, msg)
return
}
for _, listener := range tv.listeners {
if err := listener(msg.Key(), reflect.Indirect(payload).Interface()); err != nil {
if err := listener(msg.Key(), v); err != nil {
tv.logger.Errorf("table view listener failed for %v: %w", msg, err)
}
}
}

func (tv *TableViewImpl) schemaValueFromMessage(msg Message) (interface{}, error) {
payload := reflect.New(tv.options.SchemaValueType)
if err := msg.GetSchemaValue(payload.Interface()); err != nil {
return nil, fmt.Errorf("msg.GetSchemaValue() failed: %w", err)
}
return reflect.Indirect(payload).Interface(), nil
}

func (tv *TableViewImpl) watchReaderForNewMessages(ctx context.Context, reader Reader) {
for {
msg, err := reader.Next(ctx)
Expand Down
66 changes: 66 additions & 0 deletions pulsar/table_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,72 @@ func TestTableView(t *testing.T) {
}
}

func TestTableView_Message(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.NoError(t, err)
defer client.Close()

topic := newTopicName()
schema := NewStringSchema(nil)

// Create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Schema: schema,
})
assert.NoError(t, err)
defer producer.Close()

numMsg := 10
valuePrefix := "hello table view: "
publicationTimeForKey := map[string]time.Time{}
keys := make([]string, 0, numMsg)

for i := 0; i < numMsg; i++ {
key := fmt.Sprintf("%d", i)
keys = append(keys, key)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err = producer.Send(ctx, &ProducerMessage{
Key: key,
Value: fmt.Sprintf(valuePrefix + key),
})
assert.NoError(t, err)

publicationTimeForKey[key] = time.Now()
}

// Create table view
v := ""
tv, err := client.CreateTableView(TableViewOptions{
Topic: topic,
Schema: schema,
SchemaValueType: reflect.TypeOf(&v),
})
assert.NoError(t, err)
defer tv.Close()

// Wait until table view receives all messages
for tv.Size() < numMsg {
time.Sleep(time.Second * 500)
t.Logf("TableView number of elements: %d", tv.Size())
}

for _, k := range keys {
msg := tv.Message(k)

// Check that the payload can be accessed as bytes
assert.Equal(t, []byte(fmt.Sprintf("%s%s", valuePrefix, k)), msg.Payload())

// Check publication times can be accessed and are close to the recorded times above
assert.WithinDuration(t, publicationTimeForKey[k], msg.PublishTime(), time.Millisecond*10)
}
}

func TestTableViewSchemas(t *testing.T) {
var tests = []struct {
name string
Expand Down