Skip to content

Commit

Permalink
Add Key deserializer for KafkaSource (#1326)
Browse files Browse the repository at this point in the history
* TODO

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Add KeyDeserializer to data plane

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Set key type on the control plane

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Add comments and copyright headers

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored Oct 19, 2021
1 parent 0a5ca0b commit ccb0489
Show file tree
Hide file tree
Showing 23 changed files with 1,107 additions and 161 deletions.
247 changes: 158 additions & 89 deletions control-plane/pkg/contract/contract.pb.go

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions control-plane/pkg/core/config/egress.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,19 @@ func AddOrUpdateEgressConfig(ct *contract.Contract, brokerIndex int, egress *con

return EgressChanged
}

// KeyTypeFromString returns the contract.KeyType associated to a given string.
func KeyTypeFromString(s string) contract.KeyType {
switch s {
case "byte-array":
return contract.KeyType_ByteArray
case "string":
return contract.KeyType_String
case "int":
return contract.KeyType_Integer
case "float":
return contract.KeyType_Double
default:
return contract.KeyType_String
}
}
35 changes: 35 additions & 0 deletions control-plane/pkg/core/config/egress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,38 @@ func TestAddOrUpdateEgressConfig(t *testing.T) {
})
}
}

func TestKeyTypeFromString(t *testing.T) {
tests := []struct {
s string
want contract.KeyType
}{
{
s: "int",
want: contract.KeyType_Integer,
},
{
s: "float",
want: contract.KeyType_Double,
},
{
s: "byte-array",
want: contract.KeyType_ByteArray,
},
{
s: "string",
want: contract.KeyType_String,
},
{
s: "unknown",
want: contract.KeyType_String,
},
}
for _, tt := range tests {
t.Run(tt.s, func(t *testing.T) {
if got := KeyTypeFromString(tt.s); got != tt.want {
t.Errorf("KeyTypeFromString() = %v, want %v", got, tt.want)
}
})
}
}
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ func (r *Reconciler) getResource(ctx context.Context, ks *sources.KafkaSource, s
EgressConfig: egressConfig,
DeliveryOrder: DefaultDeliveryOrder,
}
// Set key type hint (if any).
if keyType, ok := ks.Labels[sources.KafkaKeyTypeLabel]; ok {
egress.KeyType = coreconfig.KeyTypeFromString(keyType)
}
resource := &contract.Resource{
Uid: string(ks.GetUID()),
Topics: ks.Spec.Topics,
Expand Down
208 changes: 208 additions & 0 deletions control-plane/pkg/reconciler/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,214 @@ func TestReconcileKind(t *testing.T) {
},
},
},
{
Name: "Reconciled normal - key type string",
Objects: []runtime.Object{
NewSourceSinkObject(),
NewSource(WithKeyType("string")),
SourceDispatcherPod(configs.SystemNamespace, map[string]string{
"annotation_to_preserve": "value_to_preserve",
}),
},
Key: testKey,
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &contract.Contract{
Generation: 1,
Resources: []*contract.Resource{
{
Uid: SourceUUID,
Topics: SourceTopics,
BootstrapServers: SourceBootstrapServers,
Egresses: []*contract.Egress{
{
ConsumerGroup: SourceConsumerGroup,
Destination: ServiceURL,
Uid: SourceUUID,
EgressConfig: &DefaultEgressConfig,
DeliveryOrder: DefaultDeliveryOrder,
KeyType: contract.KeyType_String,
},
},
Auth: &contract.Resource_AbsentAuth{},
},
},
}),
SourceDispatcherPodUpdate(configs.SystemNamespace, map[string]string{
"annotation_to_preserve": "value_to_preserve",
base.VolumeGenerationAnnotationKey: "1",
}),
},
SkipNamespaceValidation: true, // WantCreates compare the broker namespace with configmap namespace, so skip it
WantCreates: []runtime.Object{
NewConfigMap(&configs, nil),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{
{
Object: NewSource(
WithKeyType("string"),
SourceConfigMapUpdatedReady(&configs),
SourceTopicsReady,
SourceDataPlaneAvailable,
),
},
},
},
{
Name: "Reconciled normal - key type int",
Objects: []runtime.Object{
NewSourceSinkObject(),
NewSource(WithKeyType("int")),
SourceDispatcherPod(configs.SystemNamespace, map[string]string{
"annotation_to_preserve": "value_to_preserve",
}),
},
Key: testKey,
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &contract.Contract{
Generation: 1,
Resources: []*contract.Resource{
{
Uid: SourceUUID,
Topics: SourceTopics,
BootstrapServers: SourceBootstrapServers,
Egresses: []*contract.Egress{
{
ConsumerGroup: SourceConsumerGroup,
Destination: ServiceURL,
Uid: SourceUUID,
EgressConfig: &DefaultEgressConfig,
DeliveryOrder: DefaultDeliveryOrder,
KeyType: contract.KeyType_Integer,
},
},
Auth: &contract.Resource_AbsentAuth{},
},
},
}),
SourceDispatcherPodUpdate(configs.SystemNamespace, map[string]string{
"annotation_to_preserve": "value_to_preserve",
base.VolumeGenerationAnnotationKey: "1",
}),
},
SkipNamespaceValidation: true, // WantCreates compare the broker namespace with configmap namespace, so skip it
WantCreates: []runtime.Object{
NewConfigMap(&configs, nil),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{
{
Object: NewSource(
WithKeyType("int"),
SourceConfigMapUpdatedReady(&configs),
SourceTopicsReady,
SourceDataPlaneAvailable,
),
},
},
},
{
Name: "Reconciled normal - key type byte-array",
Objects: []runtime.Object{
NewSourceSinkObject(),
NewSource(WithKeyType("byte-array")),
SourceDispatcherPod(configs.SystemNamespace, map[string]string{
"annotation_to_preserve": "value_to_preserve",
}),
},
Key: testKey,
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &contract.Contract{
Generation: 1,
Resources: []*contract.Resource{
{
Uid: SourceUUID,
Topics: SourceTopics,
BootstrapServers: SourceBootstrapServers,
Egresses: []*contract.Egress{
{
ConsumerGroup: SourceConsumerGroup,
Destination: ServiceURL,
Uid: SourceUUID,
EgressConfig: &DefaultEgressConfig,
DeliveryOrder: DefaultDeliveryOrder,
KeyType: contract.KeyType_ByteArray,
},
},
Auth: &contract.Resource_AbsentAuth{},
},
},
}),
SourceDispatcherPodUpdate(configs.SystemNamespace, map[string]string{
"annotation_to_preserve": "value_to_preserve",
base.VolumeGenerationAnnotationKey: "1",
}),
},
SkipNamespaceValidation: true, // WantCreates compare the broker namespace with configmap namespace, so skip it
WantCreates: []runtime.Object{
NewConfigMap(&configs, nil),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{
{
Object: NewSource(
WithKeyType("byte-array"),
SourceConfigMapUpdatedReady(&configs),
SourceTopicsReady,
SourceDataPlaneAvailable,
),
},
},
},
{
Name: "Reconciled normal - key type float",
Objects: []runtime.Object{
NewSourceSinkObject(),
NewSource(WithKeyType("float")),
SourceDispatcherPod(configs.SystemNamespace, map[string]string{
"annotation_to_preserve": "value_to_preserve",
}),
},
Key: testKey,
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &contract.Contract{
Generation: 1,
Resources: []*contract.Resource{
{
Uid: SourceUUID,
Topics: SourceTopics,
BootstrapServers: SourceBootstrapServers,
Egresses: []*contract.Egress{
{
ConsumerGroup: SourceConsumerGroup,
Destination: ServiceURL,
Uid: SourceUUID,
EgressConfig: &DefaultEgressConfig,
DeliveryOrder: DefaultDeliveryOrder,
KeyType: contract.KeyType_Double,
},
},
Auth: &contract.Resource_AbsentAuth{},
},
},
}),
SourceDispatcherPodUpdate(configs.SystemNamespace, map[string]string{
"annotation_to_preserve": "value_to_preserve",
base.VolumeGenerationAnnotationKey: "1",
}),
},
SkipNamespaceValidation: true, // WantCreates compare the broker namespace with configmap namespace, so skip it
WantCreates: []runtime.Object{
NewConfigMap(&configs, nil),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{
{
Object: NewSource(
WithKeyType("float"),
SourceConfigMapUpdatedReady(&configs),
SourceTopicsReady,
SourceDataPlaneAvailable,
),
},
},
},
}

table.Test(t, NewFactory(&configs, func(ctx context.Context, listers *Listers, configs *broker.Configs, row *TableRow) controller.Reconciler {
Expand Down
9 changes: 9 additions & 0 deletions control-plane/pkg/reconciler/testing/objects_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ func NewSource(options ...SourceOption) *sources.KafkaSource {
return s
}

func WithKeyType(keyType string) SourceOption {
return func(ks *sources.KafkaSource) {
if ks.Labels == nil {
ks.Labels = make(map[string]string, 1)
}
ks.Labels[sources.KafkaKeyTypeLabel] = keyType
}
}

func NewSourceSinkObject() *corev1.Service {
return NewService()
}
Expand Down
Loading

0 comments on commit ccb0489

Please sign in to comment.