diff --git a/control-plane/pkg/contract/contract.pb.go b/control-plane/pkg/contract/contract.pb.go index 0b3a750950..a0cd0405e8 100644 --- a/control-plane/pkg/contract/contract.pb.go +++ b/control-plane/pkg/contract/contract.pb.go @@ -116,6 +116,58 @@ func (DeliveryOrder) EnumDescriptor() ([]byte, []int) { return file_contract_proto_rawDescGZIP(), []int{1} } +type KeyType int32 + +const ( + KeyType_String KeyType = 0 + KeyType_Integer KeyType = 1 + KeyType_Double KeyType = 2 + KeyType_ByteArray KeyType = 3 +) + +// Enum value maps for KeyType. +var ( + KeyType_name = map[int32]string{ + 0: "String", + 1: "Integer", + 2: "Double", + 3: "ByteArray", + } + KeyType_value = map[string]int32{ + "String": 0, + "Integer": 1, + "Double": 2, + "ByteArray": 3, + } +) + +func (x KeyType) Enum() *KeyType { + p := new(KeyType) + *p = x + return p +} + +func (x KeyType) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (KeyType) Descriptor() protoreflect.EnumDescriptor { + return file_contract_proto_enumTypes[2].Descriptor() +} + +func (KeyType) Type() protoreflect.EnumType { + return &file_contract_proto_enumTypes[2] +} + +func (x KeyType) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use KeyType.Descriptor instead. +func (KeyType) EnumDescriptor() ([]byte, []int) { + return file_contract_proto_rawDescGZIP(), []int{2} +} + // CloudEvent content mode type ContentMode int32 @@ -147,11 +199,11 @@ func (x ContentMode) String() string { } func (ContentMode) Descriptor() protoreflect.EnumDescriptor { - return file_contract_proto_enumTypes[2].Descriptor() + return file_contract_proto_enumTypes[3].Descriptor() } func (ContentMode) Type() protoreflect.EnumType { - return &file_contract_proto_enumTypes[2] + return &file_contract_proto_enumTypes[3] } func (x ContentMode) Number() protoreflect.EnumNumber { @@ -160,7 +212,7 @@ func (x ContentMode) Number() protoreflect.EnumNumber { // Deprecated: Use ContentMode.Descriptor instead. func (ContentMode) EnumDescriptor() ([]byte, []int) { - return file_contract_proto_rawDescGZIP(), []int{2} + return file_contract_proto_rawDescGZIP(), []int{3} } // We don't use the google.protobuf.Empty type because @@ -370,6 +422,8 @@ type Egress struct { // Delivery guarantee to use // Empty defaults to unordered DeliveryOrder DeliveryOrder `protobuf:"varint,8,opt,name=deliveryOrder,proto3,enum=DeliveryOrder" json:"deliveryOrder,omitempty"` + // Kafka record key type. + KeyType KeyType `protobuf:"varint,10,opt,name=keyType,proto3,enum=KeyType" json:"keyType,omitempty"` } func (x *Egress) Reset() { @@ -474,6 +528,13 @@ func (x *Egress) GetDeliveryOrder() DeliveryOrder { return DeliveryOrder_UNORDERED } +func (x *Egress) GetKeyType() KeyType { + if x != nil { + return x.KeyType + } + return KeyType_String +} + type isEgress_ReplyStrategy interface { isEgress_ReplyStrategy() } @@ -921,7 +982,7 @@ var file_contract_proto_rawDesc = []byte{ 0x6b, 0x6f, 0x66, 0x66, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x62, 0x61, 0x63, 0x6b, 0x6f, 0x66, 0x66, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, - 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x87, 0x03, 0x0a, 0x06, 0x45, 0x67, 0x72, 0x65, + 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0xab, 0x03, 0x0a, 0x06, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x24, 0x0a, 0x0d, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x74, @@ -945,62 +1006,68 @@ var file_contract_proto_rawDesc = []byte{ 0x6c, 0x69, 0x76, 0x65, 0x72, 0x79, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0e, 0x2e, 0x44, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x79, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x52, 0x0d, 0x64, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x79, 0x4f, 0x72, 0x64, 0x65, 0x72, - 0x42, 0x0f, 0x0a, 0x0d, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x53, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, - 0x79, 0x22, 0x74, 0x0a, 0x07, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x2e, 0x0a, 0x0b, - 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x4d, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0e, 0x32, 0x0c, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x4d, 0x6f, 0x64, 0x65, 0x52, - 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x14, 0x0a, 0x04, - 0x70, 0x61, 0x74, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, 0x70, 0x61, - 0x74, 0x68, 0x12, 0x14, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, - 0x48, 0x00, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x42, 0x0d, 0x0a, 0x0b, 0x69, 0x6e, 0x67, 0x72, - 0x65, 0x73, 0x73, 0x54, 0x79, 0x70, 0x65, 0x22, 0x6b, 0x0a, 0x09, 0x52, 0x65, 0x66, 0x65, 0x72, - 0x65, 0x6e, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x75, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x04, 0x75, 0x75, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, - 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, - 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, - 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, - 0x73, 0x69, 0x6f, 0x6e, 0x22, 0xbc, 0x02, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, - 0x65, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, - 0x75, 0x69, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, - 0x03, 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x2a, 0x0a, 0x10, 0x62, - 0x6f, 0x6f, 0x74, 0x73, 0x74, 0x72, 0x61, 0x70, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x62, 0x6f, 0x6f, 0x74, 0x73, 0x74, 0x72, 0x61, 0x70, - 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x12, 0x22, 0x0a, 0x07, 0x69, 0x6e, 0x67, 0x72, 0x65, - 0x73, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x49, 0x6e, 0x67, 0x72, 0x65, - 0x73, 0x73, 0x52, 0x07, 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x31, 0x0a, 0x0c, 0x65, - 0x67, 0x72, 0x65, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x0d, 0x2e, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x52, 0x0c, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x23, - 0x0a, 0x08, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x07, 0x2e, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x08, 0x65, 0x67, 0x72, 0x65, 0x73, - 0x73, 0x65, 0x73, 0x12, 0x28, 0x0a, 0x0a, 0x61, 0x62, 0x73, 0x65, 0x6e, 0x74, 0x41, 0x75, 0x74, - 0x68, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x06, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x48, - 0x00, 0x52, 0x0a, 0x61, 0x62, 0x73, 0x65, 0x6e, 0x74, 0x41, 0x75, 0x74, 0x68, 0x12, 0x2c, 0x0a, - 0x0a, 0x61, 0x75, 0x74, 0x68, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x0a, 0x2e, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x48, 0x00, 0x52, - 0x0a, 0x61, 0x75, 0x74, 0x68, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x42, 0x06, 0x0a, 0x04, 0x41, - 0x75, 0x74, 0x68, 0x22, 0x53, 0x0a, 0x08, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x12, - 0x1e, 0x0a, 0x0a, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x0a, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, - 0x27, 0x0a, 0x09, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, - 0x28, 0x0b, 0x32, 0x09, 0x2e, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x09, 0x72, - 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x2a, 0x2c, 0x0a, 0x0d, 0x42, 0x61, 0x63, 0x6b, - 0x6f, 0x66, 0x66, 0x50, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x12, 0x0f, 0x0a, 0x0b, 0x45, 0x78, 0x70, - 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x69, - 0x6e, 0x65, 0x61, 0x72, 0x10, 0x01, 0x2a, 0x2b, 0x0a, 0x0d, 0x44, 0x65, 0x6c, 0x69, 0x76, 0x65, - 0x72, 0x79, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x0d, 0x0a, 0x09, 0x55, 0x4e, 0x4f, 0x52, 0x44, - 0x45, 0x52, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x4f, 0x52, 0x44, 0x45, 0x52, 0x45, - 0x44, 0x10, 0x01, 0x2a, 0x29, 0x0a, 0x0b, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x4d, 0x6f, - 0x64, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x10, 0x00, 0x12, 0x0e, - 0x0a, 0x0a, 0x53, 0x54, 0x52, 0x55, 0x43, 0x54, 0x55, 0x52, 0x45, 0x44, 0x10, 0x01, 0x42, 0x5b, - 0x0a, 0x2a, 0x64, 0x65, 0x76, 0x2e, 0x6b, 0x6e, 0x61, 0x74, 0x69, 0x76, 0x65, 0x2e, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x62, 0x72, 0x6f, - 0x6b, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x42, 0x11, 0x44, 0x61, - 0x74, 0x61, 0x50, 0x6c, 0x61, 0x6e, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x5a, - 0x1a, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x2d, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x2f, 0x70, - 0x6b, 0x67, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x12, 0x22, 0x0a, 0x07, 0x6b, 0x65, 0x79, 0x54, 0x79, 0x70, 0x65, 0x18, 0x0a, 0x20, 0x01, 0x28, + 0x0e, 0x32, 0x08, 0x2e, 0x4b, 0x65, 0x79, 0x54, 0x79, 0x70, 0x65, 0x52, 0x07, 0x6b, 0x65, 0x79, + 0x54, 0x79, 0x70, 0x65, 0x42, 0x0f, 0x0a, 0x0d, 0x72, 0x65, 0x70, 0x6c, 0x79, 0x53, 0x74, 0x72, + 0x61, 0x74, 0x65, 0x67, 0x79, 0x22, 0x74, 0x0a, 0x07, 0x49, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, + 0x12, 0x2e, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x4d, 0x6f, 0x64, 0x65, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0c, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x4d, + 0x6f, 0x64, 0x65, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x4d, 0x6f, 0x64, 0x65, + 0x12, 0x14, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, + 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x14, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x42, 0x0d, 0x0a, 0x0b, + 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x54, 0x79, 0x70, 0x65, 0x22, 0x6b, 0x0a, 0x09, 0x52, + 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x75, 0x75, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x75, 0x75, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, + 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, + 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x18, + 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0xbc, 0x02, 0x0a, 0x08, 0x52, 0x65, 0x73, + 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, + 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, + 0x2a, 0x0a, 0x10, 0x62, 0x6f, 0x6f, 0x74, 0x73, 0x74, 0x72, 0x61, 0x70, 0x53, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x62, 0x6f, 0x6f, 0x74, 0x73, + 0x74, 0x72, 0x61, 0x70, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x12, 0x22, 0x0a, 0x07, 0x69, + 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x49, + 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x07, 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, + 0x31, 0x0a, 0x0c, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0c, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x12, 0x23, 0x0a, 0x08, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x18, 0x06, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x07, 0x2e, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x52, 0x08, 0x65, + 0x67, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x12, 0x28, 0x0a, 0x0a, 0x61, 0x62, 0x73, 0x65, 0x6e, + 0x74, 0x41, 0x75, 0x74, 0x68, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x06, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x48, 0x00, 0x52, 0x0a, 0x61, 0x62, 0x73, 0x65, 0x6e, 0x74, 0x41, 0x75, 0x74, + 0x68, 0x12, 0x2c, 0x0a, 0x0a, 0x61, 0x75, 0x74, 0x68, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x18, + 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, + 0x65, 0x48, 0x00, 0x52, 0x0a, 0x61, 0x75, 0x74, 0x68, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x42, + 0x06, 0x0a, 0x04, 0x41, 0x75, 0x74, 0x68, 0x22, 0x53, 0x0a, 0x08, 0x43, 0x6f, 0x6e, 0x74, 0x72, + 0x61, 0x63, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x12, 0x27, 0x0a, 0x09, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, + 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x09, 0x2e, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x52, 0x09, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x2a, 0x2c, 0x0a, 0x0d, + 0x42, 0x61, 0x63, 0x6b, 0x6f, 0x66, 0x66, 0x50, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x12, 0x0f, 0x0a, + 0x0b, 0x45, 0x78, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x10, 0x00, 0x12, 0x0a, + 0x0a, 0x06, 0x4c, 0x69, 0x6e, 0x65, 0x61, 0x72, 0x10, 0x01, 0x2a, 0x2b, 0x0a, 0x0d, 0x44, 0x65, + 0x6c, 0x69, 0x76, 0x65, 0x72, 0x79, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x0d, 0x0a, 0x09, 0x55, + 0x4e, 0x4f, 0x52, 0x44, 0x45, 0x52, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x4f, 0x52, + 0x44, 0x45, 0x52, 0x45, 0x44, 0x10, 0x01, 0x2a, 0x3d, 0x0a, 0x07, 0x4b, 0x65, 0x79, 0x54, 0x79, + 0x70, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x10, 0x00, 0x12, 0x0b, + 0x0a, 0x07, 0x49, 0x6e, 0x74, 0x65, 0x67, 0x65, 0x72, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x44, + 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x10, 0x02, 0x12, 0x0d, 0x0a, 0x09, 0x42, 0x79, 0x74, 0x65, 0x41, + 0x72, 0x72, 0x61, 0x79, 0x10, 0x03, 0x2a, 0x29, 0x0a, 0x0b, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, + 0x74, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x10, + 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x53, 0x54, 0x52, 0x55, 0x43, 0x54, 0x55, 0x52, 0x45, 0x44, 0x10, + 0x01, 0x42, 0x5b, 0x0a, 0x2a, 0x64, 0x65, 0x76, 0x2e, 0x6b, 0x6e, 0x61, 0x74, 0x69, 0x76, 0x65, + 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, + 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x42, + 0x11, 0x44, 0x61, 0x74, 0x61, 0x50, 0x6c, 0x61, 0x6e, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x61, + 0x63, 0x74, 0x5a, 0x1a, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x2d, 0x70, 0x6c, 0x61, 0x6e, + 0x65, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1015,42 +1082,44 @@ func file_contract_proto_rawDescGZIP() []byte { return file_contract_proto_rawDescData } -var file_contract_proto_enumTypes = make([]protoimpl.EnumInfo, 3) +var file_contract_proto_enumTypes = make([]protoimpl.EnumInfo, 4) var file_contract_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_contract_proto_goTypes = []interface{}{ (BackoffPolicy)(0), // 0: BackoffPolicy (DeliveryOrder)(0), // 1: DeliveryOrder - (ContentMode)(0), // 2: ContentMode - (*Empty)(nil), // 3: Empty - (*Filter)(nil), // 4: Filter - (*EgressConfig)(nil), // 5: EgressConfig - (*Egress)(nil), // 6: Egress - (*Ingress)(nil), // 7: Ingress - (*Reference)(nil), // 8: Reference - (*Resource)(nil), // 9: Resource - (*Contract)(nil), // 10: Contract - nil, // 11: Filter.AttributesEntry + (KeyType)(0), // 2: KeyType + (ContentMode)(0), // 3: ContentMode + (*Empty)(nil), // 4: Empty + (*Filter)(nil), // 5: Filter + (*EgressConfig)(nil), // 6: EgressConfig + (*Egress)(nil), // 7: Egress + (*Ingress)(nil), // 8: Ingress + (*Reference)(nil), // 9: Reference + (*Resource)(nil), // 10: Resource + (*Contract)(nil), // 11: Contract + nil, // 12: Filter.AttributesEntry } var file_contract_proto_depIdxs = []int32{ - 11, // 0: Filter.attributes:type_name -> Filter.AttributesEntry + 12, // 0: Filter.attributes:type_name -> Filter.AttributesEntry 0, // 1: EgressConfig.backoffPolicy:type_name -> BackoffPolicy - 3, // 2: Egress.replyToOriginalTopic:type_name -> Empty - 3, // 3: Egress.discardReply:type_name -> Empty - 4, // 4: Egress.filter:type_name -> Filter - 5, // 5: Egress.egressConfig:type_name -> EgressConfig + 4, // 2: Egress.replyToOriginalTopic:type_name -> Empty + 4, // 3: Egress.discardReply:type_name -> Empty + 5, // 4: Egress.filter:type_name -> Filter + 6, // 5: Egress.egressConfig:type_name -> EgressConfig 1, // 6: Egress.deliveryOrder:type_name -> DeliveryOrder - 2, // 7: Ingress.contentMode:type_name -> ContentMode - 7, // 8: Resource.ingress:type_name -> Ingress - 5, // 9: Resource.egressConfig:type_name -> EgressConfig - 6, // 10: Resource.egresses:type_name -> Egress - 3, // 11: Resource.absentAuth:type_name -> Empty - 8, // 12: Resource.authSecret:type_name -> Reference - 9, // 13: Contract.resources:type_name -> Resource - 14, // [14:14] is the sub-list for method output_type - 14, // [14:14] is the sub-list for method input_type - 14, // [14:14] is the sub-list for extension type_name - 14, // [14:14] is the sub-list for extension extendee - 0, // [0:14] is the sub-list for field type_name + 2, // 7: Egress.keyType:type_name -> KeyType + 3, // 8: Ingress.contentMode:type_name -> ContentMode + 8, // 9: Resource.ingress:type_name -> Ingress + 6, // 10: Resource.egressConfig:type_name -> EgressConfig + 7, // 11: Resource.egresses:type_name -> Egress + 4, // 12: Resource.absentAuth:type_name -> Empty + 9, // 13: Resource.authSecret:type_name -> Reference + 10, // 14: Contract.resources:type_name -> Resource + 15, // [15:15] is the sub-list for method output_type + 15, // [15:15] is the sub-list for method input_type + 15, // [15:15] is the sub-list for extension type_name + 15, // [15:15] is the sub-list for extension extendee + 0, // [0:15] is the sub-list for field type_name } func init() { file_contract_proto_init() } @@ -1174,7 +1243,7 @@ func file_contract_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_contract_proto_rawDesc, - NumEnums: 3, + NumEnums: 4, NumMessages: 9, NumExtensions: 0, NumServices: 0, diff --git a/control-plane/pkg/core/config/egress.go b/control-plane/pkg/core/config/egress.go index 938e78b226..dccc09841c 100644 --- a/control-plane/pkg/core/config/egress.go +++ b/control-plane/pkg/core/config/egress.go @@ -64,3 +64,19 @@ func AddOrUpdateEgressConfig(ct *contract.Contract, brokerIndex int, egress *con return EgressChanged } + +// KeyTypeFromString returns the contract.KeyType associated to a given string. +func KeyTypeFromString(s string) contract.KeyType { + switch s { + case "byte-array": + return contract.KeyType_ByteArray + case "string": + return contract.KeyType_String + case "int": + return contract.KeyType_Integer + case "float": + return contract.KeyType_Double + default: + return contract.KeyType_String + } +} diff --git a/control-plane/pkg/core/config/egress_test.go b/control-plane/pkg/core/config/egress_test.go index 192b84398a..23540fafbb 100644 --- a/control-plane/pkg/core/config/egress_test.go +++ b/control-plane/pkg/core/config/egress_test.go @@ -218,3 +218,38 @@ func TestAddOrUpdateEgressConfig(t *testing.T) { }) } } + +func TestKeyTypeFromString(t *testing.T) { + tests := []struct { + s string + want contract.KeyType + }{ + { + s: "int", + want: contract.KeyType_Integer, + }, + { + s: "float", + want: contract.KeyType_Double, + }, + { + s: "byte-array", + want: contract.KeyType_ByteArray, + }, + { + s: "string", + want: contract.KeyType_String, + }, + { + s: "unknown", + want: contract.KeyType_String, + }, + } + for _, tt := range tests { + t.Run(tt.s, func(t *testing.T) { + if got := KeyTypeFromString(tt.s); got != tt.want { + t.Errorf("KeyTypeFromString() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/control-plane/pkg/reconciler/source/source.go b/control-plane/pkg/reconciler/source/source.go index a150b89c20..80fe4a7baa 100644 --- a/control-plane/pkg/reconciler/source/source.go +++ b/control-plane/pkg/reconciler/source/source.go @@ -185,6 +185,10 @@ func (r *Reconciler) getResource(ctx context.Context, ks *sources.KafkaSource, s EgressConfig: egressConfig, DeliveryOrder: DefaultDeliveryOrder, } + // Set key type hint (if any). + if keyType, ok := ks.Labels[sources.KafkaKeyTypeLabel]; ok { + egress.KeyType = coreconfig.KeyTypeFromString(keyType) + } resource := &contract.Resource{ Uid: string(ks.GetUID()), Topics: ks.Spec.Topics, diff --git a/control-plane/pkg/reconciler/source/source_test.go b/control-plane/pkg/reconciler/source/source_test.go index 9dab6673c1..a6299d14e3 100644 --- a/control-plane/pkg/reconciler/source/source_test.go +++ b/control-plane/pkg/reconciler/source/source_test.go @@ -104,6 +104,214 @@ func TestReconcileKind(t *testing.T) { }, }, }, + { + Name: "Reconciled normal - key type string", + Objects: []runtime.Object{ + NewSourceSinkObject(), + NewSource(WithKeyType("string")), + SourceDispatcherPod(configs.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + }), + }, + Key: testKey, + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(&configs, &contract.Contract{ + Generation: 1, + Resources: []*contract.Resource{ + { + Uid: SourceUUID, + Topics: SourceTopics, + BootstrapServers: SourceBootstrapServers, + Egresses: []*contract.Egress{ + { + ConsumerGroup: SourceConsumerGroup, + Destination: ServiceURL, + Uid: SourceUUID, + EgressConfig: &DefaultEgressConfig, + DeliveryOrder: DefaultDeliveryOrder, + KeyType: contract.KeyType_String, + }, + }, + Auth: &contract.Resource_AbsentAuth{}, + }, + }, + }), + SourceDispatcherPodUpdate(configs.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + base.VolumeGenerationAnnotationKey: "1", + }), + }, + SkipNamespaceValidation: true, // WantCreates compare the broker namespace with configmap namespace, so skip it + WantCreates: []runtime.Object{ + NewConfigMap(&configs, nil), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewSource( + WithKeyType("string"), + SourceConfigMapUpdatedReady(&configs), + SourceTopicsReady, + SourceDataPlaneAvailable, + ), + }, + }, + }, + { + Name: "Reconciled normal - key type int", + Objects: []runtime.Object{ + NewSourceSinkObject(), + NewSource(WithKeyType("int")), + SourceDispatcherPod(configs.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + }), + }, + Key: testKey, + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(&configs, &contract.Contract{ + Generation: 1, + Resources: []*contract.Resource{ + { + Uid: SourceUUID, + Topics: SourceTopics, + BootstrapServers: SourceBootstrapServers, + Egresses: []*contract.Egress{ + { + ConsumerGroup: SourceConsumerGroup, + Destination: ServiceURL, + Uid: SourceUUID, + EgressConfig: &DefaultEgressConfig, + DeliveryOrder: DefaultDeliveryOrder, + KeyType: contract.KeyType_Integer, + }, + }, + Auth: &contract.Resource_AbsentAuth{}, + }, + }, + }), + SourceDispatcherPodUpdate(configs.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + base.VolumeGenerationAnnotationKey: "1", + }), + }, + SkipNamespaceValidation: true, // WantCreates compare the broker namespace with configmap namespace, so skip it + WantCreates: []runtime.Object{ + NewConfigMap(&configs, nil), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewSource( + WithKeyType("int"), + SourceConfigMapUpdatedReady(&configs), + SourceTopicsReady, + SourceDataPlaneAvailable, + ), + }, + }, + }, + { + Name: "Reconciled normal - key type byte-array", + Objects: []runtime.Object{ + NewSourceSinkObject(), + NewSource(WithKeyType("byte-array")), + SourceDispatcherPod(configs.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + }), + }, + Key: testKey, + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(&configs, &contract.Contract{ + Generation: 1, + Resources: []*contract.Resource{ + { + Uid: SourceUUID, + Topics: SourceTopics, + BootstrapServers: SourceBootstrapServers, + Egresses: []*contract.Egress{ + { + ConsumerGroup: SourceConsumerGroup, + Destination: ServiceURL, + Uid: SourceUUID, + EgressConfig: &DefaultEgressConfig, + DeliveryOrder: DefaultDeliveryOrder, + KeyType: contract.KeyType_ByteArray, + }, + }, + Auth: &contract.Resource_AbsentAuth{}, + }, + }, + }), + SourceDispatcherPodUpdate(configs.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + base.VolumeGenerationAnnotationKey: "1", + }), + }, + SkipNamespaceValidation: true, // WantCreates compare the broker namespace with configmap namespace, so skip it + WantCreates: []runtime.Object{ + NewConfigMap(&configs, nil), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewSource( + WithKeyType("byte-array"), + SourceConfigMapUpdatedReady(&configs), + SourceTopicsReady, + SourceDataPlaneAvailable, + ), + }, + }, + }, + { + Name: "Reconciled normal - key type float", + Objects: []runtime.Object{ + NewSourceSinkObject(), + NewSource(WithKeyType("float")), + SourceDispatcherPod(configs.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + }), + }, + Key: testKey, + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(&configs, &contract.Contract{ + Generation: 1, + Resources: []*contract.Resource{ + { + Uid: SourceUUID, + Topics: SourceTopics, + BootstrapServers: SourceBootstrapServers, + Egresses: []*contract.Egress{ + { + ConsumerGroup: SourceConsumerGroup, + Destination: ServiceURL, + Uid: SourceUUID, + EgressConfig: &DefaultEgressConfig, + DeliveryOrder: DefaultDeliveryOrder, + KeyType: contract.KeyType_Double, + }, + }, + Auth: &contract.Resource_AbsentAuth{}, + }, + }, + }), + SourceDispatcherPodUpdate(configs.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + base.VolumeGenerationAnnotationKey: "1", + }), + }, + SkipNamespaceValidation: true, // WantCreates compare the broker namespace with configmap namespace, so skip it + WantCreates: []runtime.Object{ + NewConfigMap(&configs, nil), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewSource( + WithKeyType("float"), + SourceConfigMapUpdatedReady(&configs), + SourceTopicsReady, + SourceDataPlaneAvailable, + ), + }, + }, + }, } table.Test(t, NewFactory(&configs, func(ctx context.Context, listers *Listers, configs *broker.Configs, row *TableRow) controller.Reconciler { diff --git a/control-plane/pkg/reconciler/testing/objects_source.go b/control-plane/pkg/reconciler/testing/objects_source.go index f882a24518..ccd132251e 100644 --- a/control-plane/pkg/reconciler/testing/objects_source.go +++ b/control-plane/pkg/reconciler/testing/objects_source.go @@ -70,6 +70,15 @@ func NewSource(options ...SourceOption) *sources.KafkaSource { return s } +func WithKeyType(keyType string) SourceOption { + return func(ks *sources.KafkaSource) { + if ks.Labels == nil { + ks.Labels = make(map[string]string, 1) + } + ks.Labels[sources.KafkaKeyTypeLabel] = keyType + } +} + func NewSourceSinkObject() *corev1.Service { return NewService() } diff --git a/data-plane/contract/src/main/java/dev/knative/eventing/kafka/broker/contract/DataPlaneContract.java b/data-plane/contract/src/main/java/dev/knative/eventing/kafka/broker/contract/DataPlaneContract.java index 7a915d5258..554fe3ea8d 100644 --- a/data-plane/contract/src/main/java/dev/knative/eventing/kafka/broker/contract/DataPlaneContract.java +++ b/data-plane/contract/src/main/java/dev/knative/eventing/kafka/broker/contract/DataPlaneContract.java @@ -254,6 +254,132 @@ private DeliveryOrder(int value) { // @@protoc_insertion_point(enum_scope:DeliveryOrder) } + /** + * Protobuf enum {@code KeyType} + */ + public enum KeyType + implements com.google.protobuf.ProtocolMessageEnum { + /** + * String = 0; + */ + String(0), + /** + * Integer = 1; + */ + Integer(1), + /** + * Double = 2; + */ + Double(2), + /** + * ByteArray = 3; + */ + ByteArray(3), + UNRECOGNIZED(-1), + ; + + /** + * String = 0; + */ + public static final int String_VALUE = 0; + /** + * Integer = 1; + */ + public static final int Integer_VALUE = 1; + /** + * Double = 2; + */ + public static final int Double_VALUE = 2; + /** + * ByteArray = 3; + */ + public static final int ByteArray_VALUE = 3; + + + public final int getNumber() { + if (this == UNRECOGNIZED) { + throw new java.lang.IllegalArgumentException( + "Can't get the number of an unknown enum value."); + } + return value; + } + + /** + * @param value The numeric wire value of the corresponding enum entry. + * @return The enum associated with the given numeric wire value. + * @deprecated Use {@link #forNumber(int)} instead. + */ + @java.lang.Deprecated + public static KeyType valueOf(int value) { + return forNumber(value); + } + + /** + * @param value The numeric wire value of the corresponding enum entry. + * @return The enum associated with the given numeric wire value. + */ + public static KeyType forNumber(int value) { + switch (value) { + case 0: return String; + case 1: return Integer; + case 2: return Double; + case 3: return ByteArray; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap + internalGetValueMap() { + return internalValueMap; + } + private static final com.google.protobuf.Internal.EnumLiteMap< + KeyType> internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap() { + public KeyType findValueByNumber(int number) { + return KeyType.forNumber(number); + } + }; + + public final com.google.protobuf.Descriptors.EnumValueDescriptor + getValueDescriptor() { + if (this == UNRECOGNIZED) { + throw new java.lang.IllegalStateException( + "Can't get the descriptor of an unrecognized enum value."); + } + return getDescriptor().getValues().get(ordinal()); + } + public final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptorForType() { + return getDescriptor(); + } + public static final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptor() { + return dev.knative.eventing.kafka.broker.contract.DataPlaneContract.getDescriptor().getEnumTypes().get(2); + } + + private static final KeyType[] VALUES = values(); + + public static KeyType valueOf( + com.google.protobuf.Descriptors.EnumValueDescriptor desc) { + if (desc.getType() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "EnumValueDescriptor is not for this type."); + } + if (desc.getIndex() == -1) { + return UNRECOGNIZED; + } + return VALUES[desc.getIndex()]; + } + + private final int value; + + private KeyType(int value) { + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:KeyType) + } + /** *
    * CloudEvent content mode
@@ -340,7 +466,7 @@ public ContentMode findValueByNumber(int number) {
     }
     public static final com.google.protobuf.Descriptors.EnumDescriptor
         getDescriptor() {
-      return dev.knative.eventing.kafka.broker.contract.DataPlaneContract.getDescriptor().getEnumTypes().get(2);
+      return dev.knative.eventing.kafka.broker.contract.DataPlaneContract.getDescriptor().getEnumTypes().get(3);
     }
 
     private static final ContentMode[] VALUES = values();
@@ -2910,6 +3036,25 @@ public interface EgressOrBuilder extends
      */
     dev.knative.eventing.kafka.broker.contract.DataPlaneContract.DeliveryOrder getDeliveryOrder();
 
+    /**
+     * 
+     * Kafka record key type.
+     * 
+ * + * .KeyType keyType = 10; + * @return The enum numeric value on the wire for keyType. + */ + int getKeyTypeValue(); + /** + *
+     * Kafka record key type.
+     * 
+ * + * .KeyType keyType = 10; + * @return The keyType. + */ + dev.knative.eventing.kafka.broker.contract.DataPlaneContract.KeyType getKeyType(); + public dev.knative.eventing.kafka.broker.contract.DataPlaneContract.Egress.ReplyStrategyCase getReplyStrategyCase(); } /** @@ -2929,6 +3074,7 @@ private Egress() { destination_ = ""; uid_ = ""; deliveryOrder_ = 0; + keyType_ = 0; } @java.lang.Override @@ -3045,6 +3191,12 @@ private Egress( replyStrategyCase_ = 9; break; } + case 80: { + int rawValue = input.readEnum(); + + keyType_ = rawValue; + break; + } default: { if (!parseUnknownField( input, unknownFields, extensionRegistry, tag)) { @@ -3506,6 +3658,33 @@ public dev.knative.eventing.kafka.broker.contract.DataPlaneContract.EgressConfig return result == null ? dev.knative.eventing.kafka.broker.contract.DataPlaneContract.DeliveryOrder.UNRECOGNIZED : result; } + public static final int KEYTYPE_FIELD_NUMBER = 10; + private int keyType_; + /** + *
+     * Kafka record key type.
+     * 
+ * + * .KeyType keyType = 10; + * @return The enum numeric value on the wire for keyType. + */ + @java.lang.Override public int getKeyTypeValue() { + return keyType_; + } + /** + *
+     * Kafka record key type.
+     * 
+ * + * .KeyType keyType = 10; + * @return The keyType. + */ + @java.lang.Override public dev.knative.eventing.kafka.broker.contract.DataPlaneContract.KeyType getKeyType() { + @SuppressWarnings("deprecation") + dev.knative.eventing.kafka.broker.contract.DataPlaneContract.KeyType result = dev.knative.eventing.kafka.broker.contract.DataPlaneContract.KeyType.valueOf(keyType_); + return result == null ? dev.knative.eventing.kafka.broker.contract.DataPlaneContract.KeyType.UNRECOGNIZED : result; + } + private byte memoizedIsInitialized = -1; @java.lang.Override public final boolean isInitialized() { @@ -3547,6 +3726,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) if (replyStrategyCase_ == 9) { output.writeMessage(9, (dev.knative.eventing.kafka.broker.contract.DataPlaneContract.Empty) replyStrategy_); } + if (keyType_ != dev.knative.eventing.kafka.broker.contract.DataPlaneContract.KeyType.String.getNumber()) { + output.writeEnum(10, keyType_); + } unknownFields.writeTo(output); } @@ -3588,6 +3770,10 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeMessageSize(9, (dev.knative.eventing.kafka.broker.contract.DataPlaneContract.Empty) replyStrategy_); } + if (keyType_ != dev.knative.eventing.kafka.broker.contract.DataPlaneContract.KeyType.String.getNumber()) { + size += com.google.protobuf.CodedOutputStream + .computeEnumSize(10, keyType_); + } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -3620,6 +3806,7 @@ public boolean equals(final java.lang.Object obj) { .equals(other.getEgressConfig())) return false; } if (deliveryOrder_ != other.deliveryOrder_) return false; + if (keyType_ != other.keyType_) return false; if (!getReplyStrategyCase().equals(other.getReplyStrategyCase())) return false; switch (replyStrategyCase_) { case 3: @@ -3664,6 +3851,8 @@ public int hashCode() { } hash = (37 * hash) + DELIVERYORDER_FIELD_NUMBER; hash = (53 * hash) + deliveryOrder_; + hash = (37 * hash) + KEYTYPE_FIELD_NUMBER; + hash = (53 * hash) + keyType_; switch (replyStrategyCase_) { case 3: hash = (37 * hash) + REPLYURL_FIELD_NUMBER; @@ -3833,6 +4022,8 @@ public Builder clear() { } deliveryOrder_ = 0; + keyType_ = 0; + replyStrategyCase_ = 0; replyStrategy_ = null; return this; @@ -3892,6 +4083,7 @@ public dev.knative.eventing.kafka.broker.contract.DataPlaneContract.Egress build result.egressConfig_ = egressConfigBuilder_.build(); } result.deliveryOrder_ = deliveryOrder_; + result.keyType_ = keyType_; result.replyStrategyCase_ = replyStrategyCase_; onBuilt(); return result; @@ -3962,6 +4154,9 @@ public Builder mergeFrom(dev.knative.eventing.kafka.broker.contract.DataPlaneCon if (other.deliveryOrder_ != 0) { setDeliveryOrderValue(other.getDeliveryOrderValue()); } + if (other.keyType_ != 0) { + setKeyTypeValue(other.getKeyTypeValue()); + } switch (other.getReplyStrategyCase()) { case REPLYURL: { replyStrategyCase_ = 3; @@ -5154,6 +5349,80 @@ public Builder clearDeliveryOrder() { onChanged(); return this; } + + private int keyType_ = 0; + /** + *
+       * Kafka record key type.
+       * 
+ * + * .KeyType keyType = 10; + * @return The enum numeric value on the wire for keyType. + */ + @java.lang.Override public int getKeyTypeValue() { + return keyType_; + } + /** + *
+       * Kafka record key type.
+       * 
+ * + * .KeyType keyType = 10; + * @param value The enum numeric value on the wire for keyType to set. + * @return This builder for chaining. + */ + public Builder setKeyTypeValue(int value) { + + keyType_ = value; + onChanged(); + return this; + } + /** + *
+       * Kafka record key type.
+       * 
+ * + * .KeyType keyType = 10; + * @return The keyType. + */ + @java.lang.Override + public dev.knative.eventing.kafka.broker.contract.DataPlaneContract.KeyType getKeyType() { + @SuppressWarnings("deprecation") + dev.knative.eventing.kafka.broker.contract.DataPlaneContract.KeyType result = dev.knative.eventing.kafka.broker.contract.DataPlaneContract.KeyType.valueOf(keyType_); + return result == null ? dev.knative.eventing.kafka.broker.contract.DataPlaneContract.KeyType.UNRECOGNIZED : result; + } + /** + *
+       * Kafka record key type.
+       * 
+ * + * .KeyType keyType = 10; + * @param value The keyType to set. + * @return This builder for chaining. + */ + public Builder setKeyType(dev.knative.eventing.kafka.broker.contract.DataPlaneContract.KeyType value) { + if (value == null) { + throw new NullPointerException(); + } + + keyType_ = value.getNumber(); + onChanged(); + return this; + } + /** + *
+       * Kafka record key type.
+       * 
+ * + * .KeyType keyType = 10; + * @return This builder for chaining. + */ + public Builder clearKeyType() { + + keyType_ = 0; + onChanged(); + return this; + } @java.lang.Override public final Builder setUnknownFields( final com.google.protobuf.UnknownFieldSet unknownFields) { @@ -11657,31 +11926,34 @@ public dev.knative.eventing.kafka.broker.contract.DataPlaneContract.Contract get "e\030\002 \001(\t:\0028\001\"\177\n\014EgressConfig\022\022\n\ndeadLette" + "r\030\001 \001(\t\022\r\n\005retry\030\002 \001(\r\022%\n\rbackoffPolicy\030" + "\003 \001(\0162\016.BackoffPolicy\022\024\n\014backoffDelay\030\004 " + - "\001(\004\022\017\n\007timeout\030\005 \001(\004\"\223\002\n\006Egress\022\025\n\rconsu" + + "\001(\004\022\017\n\007timeout\030\005 \001(\004\"\256\002\n\006Egress\022\025\n\rconsu" + "merGroup\030\001 \001(\t\022\023\n\013destination\030\002 \001(\t\022\022\n\010r" + "eplyUrl\030\003 \001(\tH\000\022&\n\024replyToOriginalTopic\030" + "\004 \001(\0132\006.EmptyH\000\022\036\n\014discardReply\030\t \001(\0132\006." + "EmptyH\000\022\027\n\006filter\030\005 \001(\0132\007.Filter\022\013\n\003uid\030" + "\006 \001(\t\022#\n\014egressConfig\030\007 \001(\0132\r.EgressConf" + "ig\022%\n\rdeliveryOrder\030\010 \001(\0162\016.DeliveryOrde" + - "rB\017\n\rreplyStrategy\"[\n\007Ingress\022!\n\013content" + - "Mode\030\001 \001(\0162\014.ContentMode\022\016\n\004path\030\002 \001(\tH\000" + - "\022\016\n\004host\030\003 \001(\tH\000B\r\n\013ingressType\"K\n\tRefer" + - "ence\022\014\n\004uuid\030\001 \001(\t\022\021\n\tnamespace\030\002 \001(\t\022\014\n" + - "\004name\030\003 \001(\t\022\017\n\007version\030\004 \001(\t\"\344\001\n\010Resourc" + - "e\022\013\n\003uid\030\001 \001(\t\022\016\n\006topics\030\002 \003(\t\022\030\n\020bootst" + - "rapServers\030\003 \001(\t\022\031\n\007ingress\030\004 \001(\0132\010.Ingr" + - "ess\022#\n\014egressConfig\030\005 \001(\0132\r.EgressConfig" + - "\022\031\n\010egresses\030\006 \003(\0132\007.Egress\022\034\n\nabsentAut" + - "h\030\007 \001(\0132\006.EmptyH\000\022 \n\nauthSecret\030\010 \001(\0132\n." + - "ReferenceH\000B\006\n\004Auth\"<\n\010Contract\022\022\n\ngener" + - "ation\030\001 \001(\004\022\034\n\tresources\030\002 \003(\0132\t.Resourc" + - "e*,\n\rBackoffPolicy\022\017\n\013Exponential\020\000\022\n\n\006L" + - "inear\020\001*+\n\rDeliveryOrder\022\r\n\tUNORDERED\020\000\022" + - "\013\n\007ORDERED\020\001*)\n\013ContentMode\022\n\n\006BINARY\020\000\022" + - "\016\n\nSTRUCTURED\020\001B[\n*dev.knative.eventing." + - "kafka.broker.contractB\021DataPlaneContract" + - "Z\032control-plane/pkg/contractb\006proto3" + "r\022\031\n\007keyType\030\n \001(\0162\010.KeyTypeB\017\n\rreplyStr" + + "ategy\"[\n\007Ingress\022!\n\013contentMode\030\001 \001(\0162\014." + + "ContentMode\022\016\n\004path\030\002 \001(\tH\000\022\016\n\004host\030\003 \001(" + + "\tH\000B\r\n\013ingressType\"K\n\tReference\022\014\n\004uuid\030" + + "\001 \001(\t\022\021\n\tnamespace\030\002 \001(\t\022\014\n\004name\030\003 \001(\t\022\017" + + "\n\007version\030\004 \001(\t\"\344\001\n\010Resource\022\013\n\003uid\030\001 \001(" + + "\t\022\016\n\006topics\030\002 \003(\t\022\030\n\020bootstrapServers\030\003 " + + "\001(\t\022\031\n\007ingress\030\004 \001(\0132\010.Ingress\022#\n\014egress" + + "Config\030\005 \001(\0132\r.EgressConfig\022\031\n\010egresses\030" + + "\006 \003(\0132\007.Egress\022\034\n\nabsentAuth\030\007 \001(\0132\006.Emp" + + "tyH\000\022 \n\nauthSecret\030\010 \001(\0132\n.ReferenceH\000B\006" + + "\n\004Auth\"<\n\010Contract\022\022\n\ngeneration\030\001 \001(\004\022\034" + + "\n\tresources\030\002 \003(\0132\t.Resource*,\n\rBackoffP" + + "olicy\022\017\n\013Exponential\020\000\022\n\n\006Linear\020\001*+\n\rDe" + + "liveryOrder\022\r\n\tUNORDERED\020\000\022\013\n\007ORDERED\020\001*" + + "=\n\007KeyType\022\n\n\006String\020\000\022\013\n\007Integer\020\001\022\n\n\006D" + + "ouble\020\002\022\r\n\tByteArray\020\003*)\n\013ContentMode\022\n\n" + + "\006BINARY\020\000\022\016\n\nSTRUCTURED\020\001B[\n*dev.knative" + + ".eventing.kafka.broker.contractB\021DataPla" + + "neContractZ\032control-plane/pkg/contractb\006" + + "proto3" }; descriptor = com.google.protobuf.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, @@ -11716,7 +11988,7 @@ public dev.knative.eventing.kafka.broker.contract.DataPlaneContract.Contract get internal_static_Egress_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_Egress_descriptor, - new java.lang.String[] { "ConsumerGroup", "Destination", "ReplyUrl", "ReplyToOriginalTopic", "DiscardReply", "Filter", "Uid", "EgressConfig", "DeliveryOrder", "ReplyStrategy", }); + new java.lang.String[] { "ConsumerGroup", "Destination", "ReplyUrl", "ReplyToOriginalTopic", "DiscardReply", "Filter", "Uid", "EgressConfig", "DeliveryOrder", "KeyType", "ReplyStrategy", }); internal_static_Ingress_descriptor = getDescriptor().getMessageTypes().get(4); internal_static_Ingress_fieldAccessorTable = new diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/reconciler/impl/ResourcesReconcilerImpl.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/reconciler/impl/ResourcesReconcilerImpl.java index bc83014429..86c20d33bb 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/reconciler/impl/ResourcesReconcilerImpl.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/reconciler/impl/ResourcesReconcilerImpl.java @@ -22,6 +22,7 @@ import dev.knative.eventing.kafka.broker.core.utils.CollectionsUtils; import io.vertx.core.CompositeFuture; import io.vertx.core.Future; + import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Collection; @@ -31,6 +32,7 @@ import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -243,7 +245,8 @@ private boolean egressEquals(DataPlaneContract.Egress e1, DataPlaneContract.Egre && Objects.equals(e1.getReplyUrl(), e2.getReplyUrl()) && Objects.equals(e1.getReplyToOriginalTopic(), e2.getReplyToOriginalTopic()) && Objects.equals(e1.getEgressConfig(), e2.getEgressConfig()) - && Objects.equals(e1.getFilter(), e2.getFilter()); + && Objects.equals(e1.getFilter(), e2.getFilter()) + && Objects.equals(e1.getKeyType(), e2.getKeyType()); } private static void logFailure(final String msg, final DataPlaneContract.Egress egress, final Throwable cause) { diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcher.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcher.java index c80eed25d4..b264922e3c 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcher.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcher.java @@ -32,6 +32,6 @@ public interface RecordDispatcher extends AsyncCloseable { * @param record record to handle. * @return the completion future. */ - Future dispatch(KafkaConsumerRecord record); + Future dispatch(KafkaConsumerRecord record); } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java index 981121db35..0896d038fc 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java @@ -46,8 +46,8 @@ public class RecordDispatcherImpl implements RecordDispatcher { private static final Logger logger = LoggerFactory.getLogger(RecordDispatcherImpl.class); private final Filter filter; - private final Function, Future> subscriberSender; - private final Function, Future> dlsSender; + private final Function, Future> subscriberSender; + private final Function, Future> dlsSender; private final RecordDispatcherListener recordDispatcherListener; private final AsyncCloseable closeable; private final ConsumerTracer consumerTracer; @@ -89,7 +89,7 @@ public RecordDispatcherImpl( * @param record record to handle. */ @Override - public Future dispatch(KafkaConsumerRecord record) { + public Future dispatch(KafkaConsumerRecord record) { Promise promise = Promise.promise(); /* @@ -119,7 +119,7 @@ public Future dispatch(KafkaConsumerRecord record) { return promise.future(); } - private void onRecordReceived(final KafkaConsumerRecord record, Promise finalProm) { + private void onRecordReceived(final KafkaConsumerRecord record, Promise finalProm) { logDebug("Handling record", record); // Trace record received event @@ -151,35 +151,35 @@ private void onRecordReceived(final KafkaConsumerRecord reco .onFailure(finalProm::fail); // This should really never happen } - private void onFilterMatching(final KafkaConsumerRecord record, final Promise finalProm) { + private void onFilterMatching(final KafkaConsumerRecord record, final Promise finalProm) { logDebug("Record match filtering", record); subscriberSender.apply(record) .onSuccess(res -> onSubscriberSuccess(record, finalProm)) .onFailure(ex -> onSubscriberFailure(record, finalProm)); } - private void onFilterNotMatching(final KafkaConsumerRecord record, + private void onFilterNotMatching(final KafkaConsumerRecord record, final Promise finalProm) { logDebug("Record doesn't match filtering", record); recordDispatcherListener.recordDiscarded(record) .onComplete(finalProm); } - private void onSubscriberSuccess(final KafkaConsumerRecord record, + private void onSubscriberSuccess(final KafkaConsumerRecord record, final Promise finalProm) { logDebug("Successfully sent event to subscriber", record); recordDispatcherListener.successfullySentToSubscriber(record) .onComplete(finalProm); } - private void onSubscriberFailure(final KafkaConsumerRecord record, + private void onSubscriberFailure(final KafkaConsumerRecord record, final Promise finalProm) { dlsSender.apply(record) .onSuccess(v -> onDeadLetterSinkSuccess(record, finalProm)) .onFailure(ex -> onDeadLetterSinkFailure(record, ex, finalProm)); } - private void onDeadLetterSinkSuccess(final KafkaConsumerRecord record, + private void onDeadLetterSinkSuccess(final KafkaConsumerRecord record, final Promise finalProm) { logDebug("Successfully sent event to the dead letter sink", record); recordDispatcherListener.successfullySentToDeadLetterSink(record) @@ -187,13 +187,13 @@ private void onDeadLetterSinkSuccess(final KafkaConsumerRecord record, final Throwable exception, + private void onDeadLetterSinkFailure(final KafkaConsumerRecord record, final Throwable exception, final Promise finalProm) { recordDispatcherListener.failedToSendToDeadLetterSink(record, exception) .onComplete(finalProm); } - private static Function, Future> composeSenderAndSinkHandler( + private static Function, Future> composeSenderAndSinkHandler( CloudEventSender sender, ResponseHandler sinkHandler, String senderType) { return rec -> sender.send(rec.value()) .onFailure(ex -> logError("Failed to send event to " + senderType, rec, ex)) @@ -205,7 +205,7 @@ private static Function, Future> c private static void logError( final String msg, - final KafkaConsumerRecord record, + final KafkaConsumerRecord record, final Throwable cause) { if (logger.isDebugEnabled()) { @@ -229,13 +229,14 @@ private static void logError( private static void logDebug( final String msg, - final KafkaConsumerRecord record) { + final KafkaConsumerRecord record) { - logger.debug(msg + " {} {} {} {} {}", + logger.debug(msg + " {} {} {} {} {} {}", keyValue("topic", record.topic()), keyValue("partition", record.partition()), keyValue("headers", record.headers()), keyValue("offset", record.offset()), + keyValue("key", record.key()), keyValue("event", record.value()) ); } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/BaseConsumerVerticle.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/BaseConsumerVerticle.java index d8f7754ffb..12fac0eb29 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/BaseConsumerVerticle.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/BaseConsumerVerticle.java @@ -39,7 +39,7 @@ public interface Initializer extends BiFunction topics; - KafkaConsumer consumer; + KafkaConsumer consumer; RecordDispatcher recordDispatcher; private AsyncCloseable closeable; @@ -74,7 +74,7 @@ public void stop(Promise stopPromise) { ).close(stopPromise); } - public void setConsumer(KafkaConsumer consumer) { + public void setConsumer(KafkaConsumer consumer) { this.consumer = consumer; } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/InvalidCloudEventInterceptor.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/InvalidCloudEventInterceptor.java index 12d1a1a396..88bc903b1d 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/InvalidCloudEventInterceptor.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/InvalidCloudEventInterceptor.java @@ -18,7 +18,6 @@ import io.cloudevents.CloudEvent; import io.cloudevents.core.v1.CloudEventBuilder; -import io.cloudevents.kafka.PartitionKeyExtensionInterceptor; import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -39,6 +38,7 @@ import java.util.stream.Stream; import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; +import static io.cloudevents.kafka.PartitionKeyExtensionInterceptor.PARTITION_KEY_EXTENSION; /** * The {@link InvalidCloudEventInterceptor} is a {@link ConsumerInterceptor}. @@ -51,7 +51,7 @@ *

* For this reason this interceptor is capable of creating a CloudEvent from {@link ConsumerRecord} metadata and data. */ -public class InvalidCloudEventInterceptor implements ConsumerInterceptor { +public class InvalidCloudEventInterceptor implements ConsumerInterceptor { private static final Logger logger = LoggerFactory.getLogger(InvalidCloudEventInterceptor.class); @@ -61,6 +61,8 @@ public class InvalidCloudEventInterceptor implements ConsumerInterceptor configs) { } @Override - public ConsumerRecords onConsume(final ConsumerRecords records) { + public ConsumerRecords onConsume(final ConsumerRecords records) { if (!this.isEnabled || records == null || records.isEmpty()) { return records; } - final Map>> validRecords = new HashMap<>(records.count()); + final Map>> validRecords = new HashMap<>(records.count()); for (final var record : records) { final var tp = new TopicPartition(record.topic(), record.partition()); if (!validRecords.containsKey(tp)) { @@ -128,7 +130,7 @@ private static String getOrDefault(final Map configs, final String ke return null; } - private ConsumerRecord validRecord(final ConsumerRecord record) { + private ConsumerRecord validRecord(final ConsumerRecord record) { if (!(record.value() instanceof InvalidCloudEvent)) { return record; // Valid CloudEvent } @@ -141,8 +143,9 @@ private ConsumerRecord validRecord(final ConsumerRecord validRecord(final ConsumerRecord= 'a' && c <= 'z') || (c >= '0' && c <= '9'); } - private static String subject(final ConsumerRecord record) { + private static String subject(final ConsumerRecord record) { return "partition:" + record.partition() + "#" + record.offset(); } - private URI source(final ConsumerRecord record) { + private URI source(final ConsumerRecord record) { return URI.create( "/apis/v1/namespaces/" + this.sourceNamespace + @@ -214,11 +235,11 @@ private static String type() { return TYPE; } - private static OffsetDateTime time(final ConsumerRecord record) { + private static OffsetDateTime time(final ConsumerRecord record) { return OffsetDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneId.of("UTC")); } - private static String id(final ConsumerRecord record) { + private static String id(final ConsumerRecord record) { return "partition:" + record.partition() + "/offset:" + record.offset(); } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/KeyDeserializer.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/KeyDeserializer.java new file mode 100644 index 0000000000..f8502e5d4e --- /dev/null +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/KeyDeserializer.java @@ -0,0 +1,107 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer; + +import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.DoubleDeserializer; +import org.apache.kafka.common.serialization.FloatDeserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.util.Map; + +/** + * Deserializer for the key of Kafka records. + *

+ * It uses the configuration value associated with {@link KeyDeserializer.KEY_TYPE} to deserialize the key of a consumer record. + *

+ * Internally, it uses Kafka client provided deserializers for deserialization. + *

+ * - When {@code KEY_TYPE = String} it uses {@link StringDeserializer}. + *

+ * - When {@code KEY_TYPE = Integer} it uses {@link IntegerDeserializer} and falls back to {@link StringDeserializer} + * if the {@code byte[]} is not 4 bytes long. + *

+ * - When {@code KEY_TYPE = Double} it uses {@link FloatDeserializer} if the key is 4 bytes long, + * {@link DoubleDeserializer} if the key is 8 bytes long and falls back to {@link StringDeserializer} in any other case. + *

+ * - When {@code KEY_TYPE = ByteArray} it just returns the key. + *

+ * - When {@code KEY_TYPE = UNRECOGNIZED or unspecified} it uses {@link StringDeserializer}. + */ +public class KeyDeserializer implements Deserializer { + + public static final String KEY_TYPE = "cloudevent.key.deserializer.type"; + + private DataPlaneContract.KeyType keyType; + + private FloatDeserializer floatDeserializer; + private DoubleDeserializer doubleDeserializer; + private StringDeserializer stringDeserializer; + private IntegerDeserializer integerDeserializer; + + @Override + public void configure(Map configs, boolean isKey) { + if (isKey && configs.containsKey(KEY_TYPE)) { + final var keyType = configs.get(KEY_TYPE); + if (keyType != null) { + this.keyType = (DataPlaneContract.KeyType) keyType; + } + } + + floatDeserializer = new FloatDeserializer(); + floatDeserializer.configure(configs, isKey); + doubleDeserializer = new DoubleDeserializer(); + doubleDeserializer.configure(configs, isKey); + stringDeserializer = new StringDeserializer(); + stringDeserializer.configure(configs, isKey); + integerDeserializer = new IntegerDeserializer(); + integerDeserializer.configure(configs, isKey); + } + + @Override + public Object deserialize(final String topic, final byte[] data) { + if (keyType == null) { + return stringDeserializer.deserialize(topic, data); + } + return switch (keyType) { + case Double -> deserializeFloatingPoint(topic, data); + case Integer -> deserializeInteger(topic, data); + case ByteArray -> data; + default -> stringDeserializer.deserialize(topic, data); + }; + } + + private Object deserializeInteger(String topic, byte[] data) { + if (data.length == 4) { + return integerDeserializer.deserialize(topic, data); + } + // Fall back to string deserializer. + return stringDeserializer.deserialize(topic, data); + } + + private Object deserializeFloatingPoint(String topic, byte[] data) { + if (data.length == 4) { + return floatDeserializer.deserialize(topic, data); + } + if (data.length == 8) { + return doubleDeserializer.deserialize(topic, data); + } + // Fall back to string deserializer. + return stringDeserializer.deserialize(topic, data); + } +} diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java index 6779e864ed..ca78dab17a 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java @@ -110,17 +110,17 @@ public void stop(Promise stopPromise) { super.stop(stopPromise); } - void recordsHandler(KafkaConsumerRecords records) { + void recordsHandler(KafkaConsumerRecords records) { // Put records in queues // I assume the records are ordered per topic-partition for (int i = 0; i < records.size(); i++) { this.pendingRecords++; - KafkaConsumerRecord record = records.recordAt(i); + KafkaConsumerRecord record = records.recordAt(i); this.enqueueRecord(new TopicPartition(record.topic(), record.partition()), record); } } - void enqueueRecord(TopicPartition topicPartition, KafkaConsumerRecord record) { + void enqueueRecord(TopicPartition topicPartition, KafkaConsumerRecord record) { this.recordDispatcherExecutors.computeIfAbsent(topicPartition, (tp) -> new OrderedAsyncExecutor()) .offer(() -> this.recordDispatcher.dispatch(record).onComplete(v -> this.pendingRecords--)); } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java index 5aa50b0234..dca520518e 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java @@ -33,6 +33,7 @@ import dev.knative.eventing.kafka.broker.dispatcher.impl.RecordDispatcherImpl; import dev.knative.eventing.kafka.broker.dispatcher.impl.WebClientCloudEventSender; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.BaseConsumerVerticle; +import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KeyDeserializer; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.OrderedConsumerVerticle; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.OrderedOffsetManager; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.UnorderedConsumerVerticle; @@ -131,6 +132,7 @@ public AbstractVerticle get(final DataPlaneContract.Resource resource, final Dat final var consumerConfigs = new HashMap<>(this.consumerConfigs); consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, resource.getBootstrapServers()); consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, egress.getConsumerGroup()); + consumerConfigs.put(KeyDeserializer.KEY_TYPE, egress.getKeyType()); final var producerConfigs = new HashMap<>(this.producerConfigs); producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, resource.getBootstrapServers()); @@ -145,7 +147,7 @@ public AbstractVerticle get(final DataPlaneContract.Resource resource, final Dat KafkaClientsAuth.attachCredentials(consumerConfigs, credentials); KafkaClientsAuth.attachCredentials(producerConfigs, credentials); - KafkaConsumer consumer = createConsumer(vertx, consumerConfigs); + final KafkaConsumer consumer = createConsumer(vertx, consumerConfigs); AutoCloseable metricsCloser = Metrics.register(consumer.unwrap()); final var egressConfig = @@ -216,7 +218,7 @@ protected KafkaProducer createProducer(final Vertx vertx, return KafkaProducer.create(vertx, producerProperties); } - protected KafkaConsumer createConsumer(final Vertx vertx, + protected KafkaConsumer createConsumer(final Vertx vertx, final Map consumerConfigs) { return KafkaConsumer.create( vertx, diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java index 9888485f51..ee028f2158 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java @@ -26,6 +26,7 @@ import dev.knative.eventing.kafka.broker.core.utils.Shutdown; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEventInterceptor; +import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KeyDeserializer; import io.cloudevents.kafka.CloudEventSerializer; import io.cloudevents.kafka.PartitionKeyExtensionInterceptor; import io.opentelemetry.sdk.OpenTelemetrySdk; @@ -43,6 +44,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,9 +66,11 @@ public static void main(final String[] args) throws IOException { // Read consumer and producer kafka config Properties producerConfig = Configurations.readPropertiesSync(env.getProducerConfigFilePath()); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class.getName()); producerConfig.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, PartitionKeyExtensionInterceptor.class.getName()); Properties consumerConfig = Configurations.readPropertiesSync(env.getConsumerConfigFilePath()); + consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KeyDeserializer.class.getName()); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class.getName()); consumerConfig.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, InvalidCloudEventInterceptor.class.getName()); diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java index 870d8d55bc..213ca00386 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java @@ -231,7 +231,7 @@ public void shouldCloseSinkResponseHandlerSubscriberSenderAndDeadLetterSinkSende })); } - private static KafkaConsumerRecord record() { + private static KafkaConsumerRecord record() { return new KafkaConsumerRecordImpl<>(new ConsumerRecord<>("", 0, 0L, "", CoreObjects.event())); } diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractConsumerVerticleTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractConsumerVerticleTest.java index d729900d14..4f2051c6cd 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractConsumerVerticleTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractConsumerVerticleTest.java @@ -52,9 +52,8 @@ public abstract class AbstractConsumerVerticleTest { @Test - @SuppressWarnings("unchecked") public void subscribedToTopic(final Vertx vertx, final VertxTestContext context) { - final var consumer = new MockConsumer(OffsetResetStrategy.LATEST); + final var consumer = new MockConsumer(OffsetResetStrategy.LATEST); final var recordDispatcher = new RecordDispatcherImpl( value -> false, CloudEventSender.noop("subscriber send called"), @@ -88,9 +87,8 @@ public void subscribedToTopic(final Vertx vertx, final VertxTestContext context) } @Test - @SuppressWarnings("unchecked") public void stop(final Vertx vertx, final VertxTestContext context) { - final var consumer = new MockConsumer(OffsetResetStrategy.LATEST); + final var consumer = new MockConsumer(OffsetResetStrategy.LATEST); final var recordDispatcher = new RecordDispatcherImpl( value -> false, CloudEventSender.noop("subscriber send called"), @@ -132,7 +130,7 @@ public void stop(final Vertx vertx, final VertxTestContext context) { @SuppressWarnings("unchecked") public void shouldCloseEverything(final Vertx vertx, final VertxTestContext context) { final var topics = new String[] {"a"}; - final KafkaConsumer consumer = mock(KafkaConsumer.class); + final KafkaConsumer consumer = mock(KafkaConsumer.class); when(consumer.close()).thenReturn(Future.succeededFuture()); when(consumer.subscribe((Set) any(), any())).then(answer -> { @@ -149,7 +147,7 @@ public void shouldCloseEverything(final Vertx vertx, final VertxTestContext cont return promise.future(); }); - final var mockConsumer = new MockConsumer(OffsetResetStrategy.LATEST); + final var mockConsumer = new MockConsumer(OffsetResetStrategy.LATEST); when(consumer.unwrap()).thenReturn(mockConsumer); mockConsumer.schedulePollTask(() -> { diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/InvalidCloudEventInterceptorTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/InvalidCloudEventInterceptorTest.java index fa6cf65e1b..e3b81697ca 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/InvalidCloudEventInterceptorTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/InvalidCloudEventInterceptorTest.java @@ -178,7 +178,7 @@ public void shouldTransformEventsToValidCloudEvents() { for (final var r : input) { var tp = new TopicPartition(r.topic(), r.partition()); var inputRecords = input.records(tp); - var expected = new ArrayList>(); + var expected = new ArrayList>(); for (var i : inputRecords) { var value = CloudEventBuilder.v1() .withId(String.format("partition:%d/offset:%d", i.partition(), i.offset())) @@ -193,11 +193,12 @@ public void shouldTransformEventsToValidCloudEvents() { )) .withSubject(String.format("partition:%d#%d", i.partition(), i.offset())) .withType("dev.knative.kafka.event") - .withExtension("partitionkey", i.key()) .withData(BytesCloudEventData.wrap(new byte[]{1})); i.headers().forEach(h -> value.withExtension(h.key(), h.value())); + setKeys(value, i); + expected.add(new ConsumerRecord<>( i.topic(), i.partition(), @@ -221,7 +222,23 @@ public void shouldTransformEventsToValidCloudEvents() { } } - private void assertConsumerRecordEquals(ConsumerRecord actual, ConsumerRecord expected) { + private void setKeys(io.cloudevents.core.v1.CloudEventBuilder value, ConsumerRecord i) { + if (i.key() instanceof Number) { + value.withExtension("partitionkey", (Number) i.key()); + value.withExtension("key", (Number) i.key()); + } else if (i.key() instanceof String) { + value.withExtension("partitionkey", i.key().toString()); + value.withExtension("key", i.key().toString()); + } else if (i.key() instanceof byte[]) { + value.withExtension("partitionkey", (byte[]) i.key()); + value.withExtension("key", (byte[]) i.key()); + } else { + throw new IllegalArgumentException("unknown type for key: " + i.key()); + } + } + + private void assertConsumerRecordEquals(final ConsumerRecord actual, + final ConsumerRecord expected) { assertThat(actual.topic()).isEqualTo(expected.topic()); assertThat(actual.partition()).isEqualTo(expected.partition()); assertThat(actual.offset()).isEqualTo(expected.offset()); @@ -257,19 +274,21 @@ public void shouldNotThrowExceptionDuringLifecycle() { assertDoesNotThrow(interceptor::close); } - private static ConsumerRecords mockValidRecords() { + private static ConsumerRecords mockValidRecords() { return new ConsumerRecords<>(Map.of( new TopicPartition("t1", 0), List.of(new ConsumerRecord<>("t1", 0, 0, "a", event)) )); } - private static ConsumerRecords mockInvalidRecords() { + private static ConsumerRecords mockInvalidRecords() { return new ConsumerRecords<>(Map.of( new TopicPartition("t1", 0), List.of( new ConsumerRecord<>("t1", 0, 0, "a", new InvalidCloudEvent(new byte[]{1})), - new ConsumerRecord<>("t1", 0, 1, "a", new InvalidCloudEvent(new byte[]{1})) + new ConsumerRecord<>("t1", 0, 1, "a", new InvalidCloudEvent(new byte[]{1})), + new ConsumerRecord<>("t1", 0, 1, new byte[]{1, 2, 3, 4}, new InvalidCloudEvent(new byte[]{1})), + new ConsumerRecord<>("t1", 0, 1, 4, new InvalidCloudEvent(new byte[]{1})) ), new TopicPartition("t1", 1), List.of(new ConsumerRecord<>( diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/KeyDeserializerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/KeyDeserializerTest.java new file mode 100644 index 0000000000..817e60ad95 --- /dev/null +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/KeyDeserializerTest.java @@ -0,0 +1,168 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer; + +import com.google.common.base.Charsets; +import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; +import org.apache.kafka.common.serialization.DoubleDeserializer; +import org.apache.kafka.common.serialization.FloatDeserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; + +import static dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KeyDeserializer.KEY_TYPE; +import static org.assertj.core.api.Assertions.assertThat; + +public class KeyDeserializerTest { + + @Test + public void shouldDeserializeAsUTF8StringWhenNoKeyTypeSpecified() { + final var deserializer = new KeyDeserializer(); + + final var configs = new HashMap(); + deserializer.configure(configs, true); + + final var data = new byte[]{1, 2, 3}; + final var got = deserializer.deserialize("t1", data); + + assertThat(got).isInstanceOf(String.class); + assertThat(got.toString()).isEqualTo(new String(data, Charsets.UTF_8)); + } + + @Test + public void shouldDeserializeAsUTF8StringWhenUnrecognizedKeyTypeSpecified() { + final var deserializer = new KeyDeserializer(); + + final var configs = new HashMap(); + configs.put(KEY_TYPE, DataPlaneContract.KeyType.UNRECOGNIZED); + deserializer.configure(configs, true); + + final var data = new byte[]{1, 2, 3}; + final var got = deserializer.deserialize("t1", data); + + assertThat(got).isInstanceOf(String.class); + assertThat(got).isEqualTo(new String(data, Charsets.UTF_8)); + } + + @Test + public void shouldDeserializeAsUTF8StringWhenStringKeyTypeSpecified() { + final var deserializer = new KeyDeserializer(); + + final var configs = new HashMap(); + configs.put(KEY_TYPE, DataPlaneContract.KeyType.String); + deserializer.configure(configs, true); + + final var data = new byte[]{1, 2, 3}; + final var got = deserializer.deserialize("t1", data); + + assertThat(got).isInstanceOf(String.class); + assertThat(got).isEqualTo(new String(data, Charsets.UTF_8)); + } + + @Test + public void shouldDeserializeDouble() { + final var deserializer = new KeyDeserializer(); + + final var configs = new HashMap(); + configs.put(KEY_TYPE, DataPlaneContract.KeyType.Double); + deserializer.configure(configs, true); + + final var data = new byte[]{1, 2, 3, 4, 5, 6, 7, 8}; + final var got = deserializer.deserialize("t1", data); + + assertThat(got).isInstanceOf(Double.class); + assertThat(got).isEqualTo(new DoubleDeserializer().deserialize("t1", data)); + } + + @Test + public void shouldDeserializeFloat() { + final var deserializer = new KeyDeserializer(); + + final var configs = new HashMap(); + configs.put(KEY_TYPE, DataPlaneContract.KeyType.Double); + deserializer.configure(configs, true); + + final var data = new byte[]{1, 2, 3, 4}; + final var got = deserializer.deserialize("t1", data); + + assertThat(got).isInstanceOf(Float.class); + assertThat(got).isEqualTo(new FloatDeserializer().deserialize("t1", data)); + } + + @Test + public void shouldDeserializeAsUTF8StringWhenWrongSizeForDouble() { + final var deserializer = new KeyDeserializer(); + + final var configs = new HashMap(); + configs.put(KEY_TYPE, DataPlaneContract.KeyType.Double); + deserializer.configure(configs, true); + + final var data = new byte[]{1, 2, 3, 4, 5, 6, 7}; + final var got = deserializer.deserialize("t1", data); + + assertThat(got).isInstanceOf(String.class); + assertThat(got).isEqualTo(new StringDeserializer().deserialize("t1", data)); + } + + + @Test + public void shouldDeserializeInteger() { + final var deserializer = new KeyDeserializer(); + + final var configs = new HashMap(); + configs.put(KEY_TYPE, DataPlaneContract.KeyType.Integer); + deserializer.configure(configs, true); + + final var data = new byte[]{1, 2, 3, 4}; + final var got = deserializer.deserialize("t1", data); + + assertThat(got).isInstanceOf(Integer.class); + assertThat(got).isEqualTo(new IntegerDeserializer().deserialize("t1", data)); + } + + @Test + public void shouldDeserializeAsUTF8StringWhenWrongSizeForInteger() { + final var deserializer = new KeyDeserializer(); + + final var configs = new HashMap(); + configs.put(KEY_TYPE, DataPlaneContract.KeyType.Integer); + deserializer.configure(configs, true); + + final var data = new byte[]{1, 2, 3}; + final var got = deserializer.deserialize("t1", data); + + assertThat(got).isInstanceOf(String.class); + assertThat(got).isEqualTo(new StringDeserializer().deserialize("t1", data)); + } + + + @Test + public void shouldDeserializeByteArray() { + final var deserializer = new KeyDeserializer(); + + final var configs = new HashMap(); + configs.put(KEY_TYPE, DataPlaneContract.KeyType.ByteArray); + deserializer.configure(configs, true); + + final var data = new byte[]{1, 2, 3, 4}; + final var got = deserializer.deserialize("t1", data); + + assertThat(got).isInstanceOf(byte[].class); + assertThat(got).isEqualTo(data); + } +} diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticleTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticleTest.java index 4f625ef81a..3a83b17ef9 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticleTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticleTest.java @@ -76,7 +76,7 @@ public void consumeOneByOne(final long delay, final int tasks, final int partiti final Vertx vertx) throws InterruptedException { final var topic = "topic1"; final Random random = new Random(); - final var consumer = new MockConsumer(OffsetResetStrategy.LATEST); + final var consumer = new MockConsumer(OffsetResetStrategy.LATEST); // Mock the record dispatcher to count down the latch and save the received records order CountDownLatch latch = new CountDownLatch(tasks); @@ -165,7 +165,7 @@ BaseConsumerVerticle createConsumerVerticle( return new OrderedConsumerVerticle(initializer, topics); } - protected static ConsumerRecord record(String topic, int partition, long offset) { + protected static ConsumerRecord record(String topic, int partition, long offset) { return new ConsumerRecord<>( topic, partition, diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/ConsumerVerticleFactoryImplMock.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/ConsumerVerticleFactoryImplMock.java index f93b025965..dcf0828ff7 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/ConsumerVerticleFactoryImplMock.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/ConsumerVerticleFactoryImplMock.java @@ -71,9 +71,9 @@ protected KafkaProducer createProducer(Vertx vertx, } @Override - protected KafkaConsumer createConsumer(Vertx vertx, + protected KafkaConsumer createConsumer(Vertx vertx, Map consumerConfigs) { - final var consumer = new MockConsumer(OffsetResetStrategy.LATEST); + final var consumer = new MockConsumer(OffsetResetStrategy.LATEST); consumer.schedulePollTask(() -> { consumer.unsubscribe(); diff --git a/proto/contract.proto b/proto/contract.proto index c161184694..96faf20230 100644 --- a/proto/contract.proto +++ b/proto/contract.proto @@ -57,6 +57,13 @@ enum DeliveryOrder { ORDERED = 1; } +enum KeyType { + String = 0; + Integer = 1; + Double = 2; + ByteArray = 3; +} + message Egress { // consumer group name string consumerGroup = 1; @@ -88,6 +95,9 @@ message Egress { // Delivery guarantee to use // Empty defaults to unordered DeliveryOrder deliveryOrder = 8; + + // Kafka record key type. + KeyType keyType = 10; } // CloudEvent content mode