Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: FDN-278 - SDKConfig Event #259

Merged
merged 13 commits into from
Jun 7, 2024
2 changes: 2 additions & 0 deletions .github/workflows/run-test-harness.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ jobs:
labels: ubuntu-latest-4-core
steps:
- uses: DevCycleHQ/test-harness@main
env:
SDK_CAPABILITIES: '["cloud","edgeDB","clientCustomData","multithreading","defaultReason","etagReporting","lastModifiedHeader","sdkConfigEvent","clientUUID"]'
with:
sdks-to-test: '["go"]'
sdk-github-sha: ${{github.event.pull_request.head.sha}}
Expand Down
14 changes: 12 additions & 2 deletions api/model_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
EventType_AggVariableEvaluated = "aggVariableEvaluated"
EventType_VariableDefaulted = "variableDefaulted"
EventType_AggVariableDefaulted = "aggVariableDefaulted"
EventType_SDKConfig = "sdkConfig"
EventType_CustomEvent = "customEvent"
)

Expand Down Expand Up @@ -51,6 +52,7 @@ func (fp *FlushPayload) AddBatchRecordForUser(record UserEventsBatchRecord, chun
for _, chunk := range chunkedEvents {
userRecord.Events = append(userRecord.Events, chunk...)
}
fp.setRecordForUser(record.User.UserId, *userRecord)
} else {
for _, chunk := range chunkedEvents {
fp.Records = append(fp.Records, UserEventsBatchRecord{
Expand All @@ -59,7 +61,6 @@ func (fp *FlushPayload) AddBatchRecordForUser(record UserEventsBatchRecord, chun
})
}
}

}

func (fp *FlushPayload) getRecordForUser(userId string) *UserEventsBatchRecord {
Expand All @@ -71,6 +72,15 @@ func (fp *FlushPayload) getRecordForUser(userId string) *UserEventsBatchRecord {
return nil
}

func (fp *FlushPayload) setRecordForUser(userId string, record UserEventsBatchRecord) {
for i, r := range fp.Records {
if r.User.UserId == userId {
fp.Records[i] = record
return
}
}
}

type BatchEventsBody struct {
Batch []UserEventsBatchRecord `json:"batch"`
}
Expand Down Expand Up @@ -103,7 +113,7 @@ func (o *EventQueueOptions) CheckBounds() {

func (o *EventQueueOptions) IsEventLoggingDisabled(eventType string) bool {
switch eventType {
case EventType_VariableEvaluated, EventType_AggVariableEvaluated, EventType_VariableDefaulted, EventType_AggVariableDefaulted:
case EventType_VariableEvaluated, EventType_AggVariableEvaluated, EventType_VariableDefaulted, EventType_AggVariableDefaulted, EventType_SDKConfig:
return o.DisableAutomaticEventLogging
default:
return o.DisableCustomEventLogging
Expand Down
20 changes: 15 additions & 5 deletions bucketing/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ func (u *UserEventQueue) BuildBatchRecords() []api.UserEventsBatchRecord {

func (agg *AggregateEventQueue) BuildBatchRecords(platformData *api.PlatformData, clientUUID, configEtag, rayId, lastModified string) api.UserEventsBatchRecord {
var aggregateEvents []api.Event
userId, err := os.Hostname()
hostname, err := os.Hostname()
if err != nil {
userId = "aggregate"
hostname = "aggregate"
}
userId := fmt.Sprintf("%s@%s", clientUUID, hostname)
emptyFeatureVars := make(map[string]string)

// type is either aggVariableEvaluated or aggVariableDefaulted
Expand Down Expand Up @@ -324,6 +325,7 @@ func (eq *EventQueue) HandleFlushResults(successPayloads []string, failurePayloa
eq.eventsReported.Add(reported)
}

// Metrics returns the number of events flushed, reported, and dropped
func (eq *EventQueue) Metrics() (int32, int32, int32) {
return eq.eventsFlushed.Load(), eq.eventsReported.Load(), eq.eventsDropped.Load()
}
Expand Down Expand Up @@ -382,12 +384,20 @@ func (eq *EventQueue) processEvents(ctx context.Context) {
close(eq.userEventQueueRaw)
close(eq.aggEventQueueRaw)
return
case userEvent := <-eq.userEventQueueRaw:
case userEvent, ok := <-eq.userEventQueueRaw:
// if the channel is closed - ok will be false
if !ok {
return
}
err := eq.processUserEvent(userEvent)
if err != nil {
return
}
case aggEvent := <-eq.aggEventQueueRaw:
case aggEvent, ok := <-eq.aggEventQueueRaw:
// if the channel is closed - ok will be false
if !ok {
return
}
err := eq.processAggregateEvent(aggEvent)
if err != nil {
return
Expand Down Expand Up @@ -420,7 +430,7 @@ func (eq *EventQueue) processUserEvent(event userEventData) (err error) {
event.event.FeatureVars = bucketedConfig.FeatureVariationMap

switch event.event.Type_ {
case api.EventType_AggVariableDefaulted, api.EventType_VariableDefaulted, api.EventType_AggVariableEvaluated, api.EventType_VariableEvaluated:
case api.EventType_AggVariableDefaulted, api.EventType_VariableDefaulted, api.EventType_AggVariableEvaluated, api.EventType_VariableEvaluated, api.EventType_SDKConfig:
break
default:
event.event.CustomType = event.event.Type_
Expand Down
6 changes: 3 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ type LocalBucketing interface {
InternalEventQueue
GenerateBucketedConfigForUser(user User) (ret *BucketedUserConfig, err error)
SetClientCustomData(map[string]interface{}) error
GetClientUUID() string
Variable(user User, key string, variableType string) (variable Variable, err error)
Close()
}
Expand Down Expand Up @@ -120,7 +119,8 @@ func NewClient(sdkKey string, options *Options) (*Client, error) {
return c, fmt.Errorf("Error initializing event queue: %w", err)
}

c.configManager = NewEnvironmentConfigManager(sdkKey, c.localBucketing, options, c.cfg)
c.configManager = NewEnvironmentConfigManager(sdkKey, c.localBucketing, c.eventQueue, options, c.cfg)

c.configManager.StartPolling(options.ConfigPollingIntervalMS)

if c.DevCycleOptions.OnInitializedChannel != nil {
Expand Down Expand Up @@ -153,7 +153,7 @@ func (c *Client) handleInitialization() {
c.isInitialized = true

if c.IsLocalBucketing() {
util.Infof("Client initialized with local bucketing %v", c.localBucketing.GetClientUUID())
util.Infof("Client initialized with local bucketing %v", c.localBucketing.GetUUID())
}
if c.DevCycleOptions.OnInitializedChannel != nil {
go func() {
Expand Down
15 changes: 5 additions & 10 deletions client_native_bucketing.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,12 @@ func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, opti
}, err
}

func (n *NativeLocalBucketing) GetUUID() string {
return n.clientUUID
}

func (n *NativeLocalBucketing) StoreConfig(configJSON []byte, eTag, rayId, lastModified string) error {
oldETag := bucketing.GetEtag(n.sdkKey)
_, err := n.eventQueue.FlushEventQueue(n.clientUUID, oldETag, n.GetRayId(), n.GetLastModified())
if err != nil {
return fmt.Errorf("Error flushing events for %s: %w", oldETag, err)
}
err = bucketing.SetConfig(configJSON, n.sdkKey, eTag, rayId, lastModified, n.eventQueue)
err := bucketing.SetConfig(configJSON, n.sdkKey, eTag, rayId, lastModified, n.eventQueue)
if err != nil {
return fmt.Errorf("Error parsing config: %w", err)
}
Expand All @@ -82,10 +81,6 @@ func (n *NativeLocalBucketing) HasConfig() bool {
return bucketing.HasConfig(n.sdkKey)
}

func (n *NativeLocalBucketing) GetClientUUID() string {
return n.clientUUID
}

func (n *NativeLocalBucketing) GetLastModified() string {
return bucketing.GetLastModified(n.sdkKey)
}
Expand Down
69 changes: 65 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package devcycle
import (
"flag"
"fmt"
"github.com/devcyclehq/go-server-sdk/v2/api"
"github.com/devcyclehq/go-server-sdk/v2/util"
"github.com/stretchr/testify/require"
"io"
"log"
"net/http"
"os"
"reflect"
"strings"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -434,12 +436,71 @@ func TestClient_Validate_OnInitializedChannel_EnableCloudBucketing_Options(t *te
t.Fatal("Expected config to be loaded")
}
}
func TestClient_ConfigUpdatedEvent(t *testing.T) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Consider adding a comment to explain the purpose of this test.

Adding a brief comment at the beginning of the test function can help future maintainers understand the purpose and context of this test.

Suggested change
func TestClient_ConfigUpdatedEvent(t *testing.T) {
func TestClient_ConfigUpdatedEvent(t *testing.T) {
// Test to ensure the client correctly handles configuration update events

httpmock.Activate()
defer httpmock.DeactivateAndReset()
httpConfigMock(200)
responder := func(req *http.Request) (*http.Response, error) {
reqBody, err := io.ReadAll(req.Body)
fmt.Println(string(reqBody))
if err != nil {
return httpmock.NewStringResponse(500, `{}`), err
}
if !strings.Contains(string(reqBody), api.EventType_SDKConfig) {
t.Fatal("Expected config updated event in request body")
}
return httpmock.NewStringResponse(201, `{}`), nil
}
httpmock.RegisterResponder("POST", "https://config-updated.devcycle.com/v1/events/batch", responder)
c, err := NewClient(test_environmentKey, &Options{EventsAPIURI: "https://config-updated.devcycle.com", EventFlushIntervalMS: time.Millisecond * 500})
fatalErr(t, err)
if !c.isInitialized {
t.Fatal("Expected client to be initialized")
}
if !c.hasConfig() {
t.Fatal("Expected client to have config")
}

func fatalErr(t *testing.T, err error) {
t.Helper()
if err != nil {
t.Fatal(err)
require.Eventually(t, func() bool {
return httpmock.GetCallCountInfo()["POST https://config-updated.devcycle.com/v1/events/batch"] >= 1
}, 1*time.Second, 100*time.Millisecond)
}

func TestClient_ConfigUpdatedEvent_VariableEval(t *testing.T) {
httpmock.Activate()
defer httpmock.DeactivateAndReset()
httpConfigMock(200)
responder := func(req *http.Request) (*http.Response, error) {
reqBody, err := io.ReadAll(req.Body)
fmt.Println(string(reqBody))
if err != nil {
return httpmock.NewStringResponse(500, `{}`), err
}
if !strings.Contains(string(reqBody), api.EventType_SDKConfig) || !strings.Contains(string(reqBody), api.EventType_AggVariableDefaulted) {
fmt.Println("Expected config updated event and defaulted event in request body")
}
return httpmock.NewStringResponse(201, `{}`), nil
}
httpmock.RegisterResponder("POST", "https://config-updated.devcycle.com/v1/events/batch", responder)
c, err := NewClient(test_environmentKey, &Options{EventsAPIURI: "https://config-updated.devcycle.com", EventFlushIntervalMS: time.Millisecond * 500})
fatalErr(t, err)
if !c.isInitialized {
t.Fatal("Expected client to be initialized")
}
if !c.hasConfig() {
t.Fatal("Expected client to have config")
}

user := User{UserId: "j_test", DeviceModel: "testing"}
variable, _ := c.Variable(user, "variableThatShouldBeDefaulted", true)

if !variable.IsDefaulted {
t.Fatal("Expected variable to be defaulted")
}

require.Eventually(t, func() bool {
return httpmock.GetCallCountInfo()["POST https://config-updated.devcycle.com/v1/events/batch"] >= 1
}, 1*time.Second, 100*time.Millisecond)
}

var (
Expand Down
12 changes: 11 additions & 1 deletion configmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ type EnvironmentConfigManager struct {
stopPolling context.CancelFunc
httpClient *http.Client
cfg *HTTPConfiguration
eventManager *EventManager
ticker *time.Ticker
}

func NewEnvironmentConfigManager(
sdkKey string,
localBucketing ConfigReceiver,
manager *EventManager,
options *Options,
cfg *HTTPConfiguration,
) (e *EnvironmentConfigManager) {
Expand All @@ -47,7 +49,8 @@ func NewEnvironmentConfigManager(
// Use the configurable timeout because fetching the first config can block SDK initialization.
Timeout: options.RequestTimeout,
},
firstLoad: true,
eventManager: manager,
firstLoad: true,
}

configManager.context, configManager.stopPolling = context.WithCancel(context.Background())
Expand Down Expand Up @@ -114,6 +117,7 @@ func (e *EnvironmentConfigManager) fetchConfig(numRetriesRemaining int) (err err
defer resp.Body.Close()
switch statusCode := resp.StatusCode; {
case statusCode == http.StatusOK:
resp.Request = req
return e.setConfigFromResponse(resp)
case statusCode == http.StatusNotModified:
return nil
Expand Down Expand Up @@ -164,6 +168,12 @@ func (e *EnvironmentConfigManager) setConfigFromResponse(response *http.Response
}

util.Infof("Config set. ETag: %s Last-Modified: %s\n", e.localBucketing.GetETag(), e.localBucketing.GetLastModified())
if e.eventManager != nil {
err = e.eventManager.QueueSDKConfigEvent(*response.Request, *response)
if err != nil {
util.Warnf("Error queuing SDK config event: %s\n", err)
}
}

if e.firstLoad {
e.firstLoad = false
Expand Down
8 changes: 4 additions & 4 deletions configmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestEnvironmentConfigManager_fetchConfig_success(t *testing.T) {
httpConfigMock(200)

localBucketing := &recordingConfigReceiver{}
manager := NewEnvironmentConfigManager(test_environmentKey, localBucketing, test_options, NewConfiguration(test_options))
manager := NewEnvironmentConfigManager(test_environmentKey, localBucketing, nil, test_options, NewConfiguration(test_options))

err := manager.initialFetch()
if err != nil {
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestEnvironmentConfigManager_fetchConfig_retries500(t *testing.T) {
)

localBucketing := &recordingConfigReceiver{}
manager := NewEnvironmentConfigManager(test_environmentKey, localBucketing, test_options, NewConfiguration(test_options))
manager := NewEnvironmentConfigManager(test_environmentKey, localBucketing, nil, test_options, NewConfiguration(test_options))

err := manager.initialFetch()
if err != nil {
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestEnvironmentConfigManager_fetchConfig_retries_errors(t *testing.T) {
)

localBucketing := &recordingConfigReceiver{}
manager := NewEnvironmentConfigManager(test_environmentKey, localBucketing, test_options, NewConfiguration(test_options))
manager := NewEnvironmentConfigManager(test_environmentKey, localBucketing, nil, test_options, NewConfiguration(test_options))

err := manager.initialFetch()
if err != nil {
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestEnvironmentConfigManager_fetchConfig_returns_errors(t *testing.T) {
)

localBucketing := &recordingConfigReceiver{}
manager := NewEnvironmentConfigManager(test_environmentKey, localBucketing, test_options, NewConfiguration(test_options))
manager := NewEnvironmentConfigManager(test_environmentKey, localBucketing, nil, test_options, NewConfiguration(test_options))

err := manager.initialFetch()
if err == nil {
Expand Down
Loading
Loading