From d53b7fd87abdf3bde2f5c0e78c1c321a2ff8a3ce Mon Sep 17 00:00:00 2001 From: marcushines <80116818+marcushines@users.noreply.github.com> Date: Wed, 28 Feb 2024 15:01:25 -0600 Subject: [PATCH] Revert stream change to method. (#163) Add new service AcctzStream which implements the unidirectional streaming. This will eventually be versioned to main Acctz implementation in a future major release. --- acctz/acctz.pb.go | 28 +++++--- acctz/acctz.proto | 37 ++++++++++- acctz/acctz_grpc.pb.go | 148 ++++++++++++++++++++++++++++++++++++----- 3 files changed, 186 insertions(+), 27 deletions(-) diff --git a/acctz/acctz.pb.go b/acctz/acctz.pb.go index 1eed648..3830e07 100755 --- a/acctz/acctz.pb.go +++ b/acctz/acctz.pb.go @@ -1191,16 +1191,22 @@ var file_github_com_openconfig_gnsi_acctz_acctz_proto_rawDesc = []byte{ 0x74, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, - 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x32, 0x59, 0x0a, 0x05, 0x41, - 0x63, 0x63, 0x74, 0x7a, 0x12, 0x50, 0x0a, 0x0f, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x53, 0x75, + 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x32, 0x5b, 0x0a, 0x05, 0x41, + 0x63, 0x63, 0x74, 0x7a, 0x12, 0x52, 0x0a, 0x0f, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x1c, 0x2e, 0x67, 0x6e, 0x73, 0x69, 0x2e, 0x61, 0x63, 0x63, 0x74, 0x7a, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x67, 0x6e, 0x73, 0x69, 0x2e, 0x61, 0x63, 0x63, 0x74, 0x7a, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2f, - 0x67, 0x6e, 0x73, 0x69, 0x2f, 0x61, 0x63, 0x63, 0x74, 0x7a, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x32, 0x5f, 0x0a, 0x0b, 0x41, 0x63, 0x63, 0x74, + 0x7a, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x50, 0x0a, 0x0f, 0x52, 0x65, 0x63, 0x6f, 0x72, + 0x64, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x1c, 0x2e, 0x67, 0x6e, 0x73, + 0x69, 0x2e, 0x61, 0x63, 0x63, 0x74, 0x7a, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, + 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x67, 0x6e, 0x73, 0x69, 0x2e, + 0x61, 0x63, 0x63, 0x74, 0x7a, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x2f, 0x67, 0x6e, 0x73, 0x69, 0x2f, 0x61, 0x63, 0x63, 0x74, 0x7a, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1253,9 +1259,11 @@ var file_github_com_openconfig_gnsi_acctz_acctz_proto_depIdxs = []int32{ 11, // 14: gnsi.acctz.v1.RecordResponse.grpc_service:type_name -> gnsi.acctz.v1.GrpcService 15, // 15: gnsi.acctz.v1.RecordRequest.timestamp:type_name -> google.protobuf.Timestamp 13, // 16: gnsi.acctz.v1.Acctz.RecordSubscribe:input_type -> gnsi.acctz.v1.RecordRequest - 12, // 17: gnsi.acctz.v1.Acctz.RecordSubscribe:output_type -> gnsi.acctz.v1.RecordResponse - 17, // [17:18] is the sub-list for method output_type - 16, // [16:17] is the sub-list for method input_type + 13, // 17: gnsi.acctz.v1.AcctzStream.RecordSubscribe:input_type -> gnsi.acctz.v1.RecordRequest + 12, // 18: gnsi.acctz.v1.Acctz.RecordSubscribe:output_type -> gnsi.acctz.v1.RecordResponse + 12, // 19: gnsi.acctz.v1.AcctzStream.RecordSubscribe:output_type -> gnsi.acctz.v1.RecordResponse + 18, // [18:20] is the sub-list for method output_type + 16, // [16:18] is the sub-list for method input_type 16, // [16:16] is the sub-list for extension type_name 16, // [16:16] is the sub-list for extension extendee 0, // [0:16] is the sub-list for field type_name @@ -1376,7 +1384,7 @@ func file_github_com_openconfig_gnsi_acctz_acctz_proto_init() { NumEnums: 6, NumMessages: 8, NumExtensions: 0, - NumServices: 1, + NumServices: 2, }, GoTypes: file_github_com_openconfig_gnsi_acctz_acctz_proto_goTypes, DependencyIndexes: file_github_com_openconfig_gnsi_acctz_acctz_proto_depIdxs, diff --git a/acctz/acctz.proto b/acctz/acctz.proto index b10dfb8..ab3fcaf 100644 --- a/acctz/acctz.proto +++ b/acctz/acctz.proto @@ -39,15 +39,48 @@ option go_package = "github.com/openconfig/gnsi/acctz"; // The gRPC-level Accounting service exported by targets. -// The Accounting service describes the interfaces between a System +// [DEPRECATED] The Accounting service describes the interfaces between a System // and a remote Collector that collect data about what changes were attempted // or completed on that System through standard interfaces to the System. // // Accounting messages may be collected by connecting to the Accounting // service (Acctz) on a System from a Collector. - +// This service will be replaced with AcctzStream for unidirectional streaming. service Acctz { + // RecordSubscribe returns a stream of Record()s sent from the system to + // a Collector, when the Collector connects to the Acctz service. + // + // A RecordRequest contains a timestamp indicating the last message + // received. The Collector's expectation is that all messages after the + // timestamp will be streamed. If this results in no records to be sent, + // because either the history is empty or no new accounting events occured + // after the requested timestamp, the server will be silent until new + // records are created. + // + // Any history for replay of records or size of the history is subject to + // implementation support and may or may not be configurable. The history + // should be global to the device or virtual tenant, such that new and + // re-connecting clients can access the history. + // + // At connection initiation the Collector may send either the current + // time or a zero time. A zero time indicates that all messages stored on the + // System should be sent to the Collector. A current time indicates that only + // new messages should be sent. + // + // The stream continues ad infinitum, until the gNSI session is severed. + // The gNSI TCP connection could timeout due to expiration of the TCP + // keepalive mechanism, or the server could disconnect the client if the + // output buffer to the client remains full for a long timeout period, + // implying a stuck client (see also getsockopt(SIOCOUTQ) and + // https://datatracker.ietf.org/doc/draft-ietf-idr-bgp-sendholdtimer/). + // + rpc RecordSubscribe(stream RecordRequest) returns (stream RecordResponse); +} + + +service AcctzStream { + // RecordSubscribe returns a stream of Record()s sent from the system to // a Collector, when the Collector connects to the Acctz service. // diff --git a/acctz/acctz_grpc.pb.go b/acctz/acctz_grpc.pb.go index dc0e936..704109e 100755 --- a/acctz/acctz_grpc.pb.go +++ b/acctz/acctz_grpc.pb.go @@ -22,7 +22,7 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type AcctzClient interface { - RecordSubscribe(ctx context.Context, in *RecordRequest, opts ...grpc.CallOption) (Acctz_RecordSubscribeClient, error) + RecordSubscribe(ctx context.Context, opts ...grpc.CallOption) (Acctz_RecordSubscribeClient, error) } type acctzClient struct { @@ -33,22 +33,17 @@ func NewAcctzClient(cc grpc.ClientConnInterface) AcctzClient { return &acctzClient{cc} } -func (c *acctzClient) RecordSubscribe(ctx context.Context, in *RecordRequest, opts ...grpc.CallOption) (Acctz_RecordSubscribeClient, error) { +func (c *acctzClient) RecordSubscribe(ctx context.Context, opts ...grpc.CallOption) (Acctz_RecordSubscribeClient, error) { stream, err := c.cc.NewStream(ctx, &Acctz_ServiceDesc.Streams[0], "/gnsi.acctz.v1.Acctz/RecordSubscribe", opts...) if err != nil { return nil, err } x := &acctzRecordSubscribeClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } return x, nil } type Acctz_RecordSubscribeClient interface { + Send(*RecordRequest) error Recv() (*RecordResponse, error) grpc.ClientStream } @@ -57,6 +52,10 @@ type acctzRecordSubscribeClient struct { grpc.ClientStream } +func (x *acctzRecordSubscribeClient) Send(m *RecordRequest) error { + return x.ClientStream.SendMsg(m) +} + func (x *acctzRecordSubscribeClient) Recv() (*RecordResponse, error) { m := new(RecordResponse) if err := x.ClientStream.RecvMsg(m); err != nil { @@ -69,7 +68,7 @@ func (x *acctzRecordSubscribeClient) Recv() (*RecordResponse, error) { // All implementations must embed UnimplementedAcctzServer // for forward compatibility type AcctzServer interface { - RecordSubscribe(*RecordRequest, Acctz_RecordSubscribeServer) error + RecordSubscribe(Acctz_RecordSubscribeServer) error mustEmbedUnimplementedAcctzServer() } @@ -77,7 +76,7 @@ type AcctzServer interface { type UnimplementedAcctzServer struct { } -func (UnimplementedAcctzServer) RecordSubscribe(*RecordRequest, Acctz_RecordSubscribeServer) error { +func (UnimplementedAcctzServer) RecordSubscribe(Acctz_RecordSubscribeServer) error { return status.Errorf(codes.Unimplemented, "method RecordSubscribe not implemented") } func (UnimplementedAcctzServer) mustEmbedUnimplementedAcctzServer() {} @@ -94,15 +93,12 @@ func RegisterAcctzServer(s grpc.ServiceRegistrar, srv AcctzServer) { } func _Acctz_RecordSubscribe_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(RecordRequest) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(AcctzServer).RecordSubscribe(m, &acctzRecordSubscribeServer{stream}) + return srv.(AcctzServer).RecordSubscribe(&acctzRecordSubscribeServer{stream}) } type Acctz_RecordSubscribeServer interface { Send(*RecordResponse) error + Recv() (*RecordRequest, error) grpc.ServerStream } @@ -114,6 +110,14 @@ func (x *acctzRecordSubscribeServer) Send(m *RecordResponse) error { return x.ServerStream.SendMsg(m) } +func (x *acctzRecordSubscribeServer) Recv() (*RecordRequest, error) { + m := new(RecordRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // Acctz_ServiceDesc is the grpc.ServiceDesc for Acctz service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -126,6 +130,120 @@ var Acctz_ServiceDesc = grpc.ServiceDesc{ StreamName: "RecordSubscribe", Handler: _Acctz_RecordSubscribe_Handler, ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "github.com/openconfig/gnsi/acctz/acctz.proto", +} + +// AcctzStreamClient is the client API for AcctzStream service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type AcctzStreamClient interface { + RecordSubscribe(ctx context.Context, in *RecordRequest, opts ...grpc.CallOption) (AcctzStream_RecordSubscribeClient, error) +} + +type acctzStreamClient struct { + cc grpc.ClientConnInterface +} + +func NewAcctzStreamClient(cc grpc.ClientConnInterface) AcctzStreamClient { + return &acctzStreamClient{cc} +} + +func (c *acctzStreamClient) RecordSubscribe(ctx context.Context, in *RecordRequest, opts ...grpc.CallOption) (AcctzStream_RecordSubscribeClient, error) { + stream, err := c.cc.NewStream(ctx, &AcctzStream_ServiceDesc.Streams[0], "/gnsi.acctz.v1.AcctzStream/RecordSubscribe", opts...) + if err != nil { + return nil, err + } + x := &acctzStreamRecordSubscribeClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type AcctzStream_RecordSubscribeClient interface { + Recv() (*RecordResponse, error) + grpc.ClientStream +} + +type acctzStreamRecordSubscribeClient struct { + grpc.ClientStream +} + +func (x *acctzStreamRecordSubscribeClient) Recv() (*RecordResponse, error) { + m := new(RecordResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// AcctzStreamServer is the server API for AcctzStream service. +// All implementations must embed UnimplementedAcctzStreamServer +// for forward compatibility +type AcctzStreamServer interface { + RecordSubscribe(*RecordRequest, AcctzStream_RecordSubscribeServer) error + mustEmbedUnimplementedAcctzStreamServer() +} + +// UnimplementedAcctzStreamServer must be embedded to have forward compatible implementations. +type UnimplementedAcctzStreamServer struct { +} + +func (UnimplementedAcctzStreamServer) RecordSubscribe(*RecordRequest, AcctzStream_RecordSubscribeServer) error { + return status.Errorf(codes.Unimplemented, "method RecordSubscribe not implemented") +} +func (UnimplementedAcctzStreamServer) mustEmbedUnimplementedAcctzStreamServer() {} + +// UnsafeAcctzStreamServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to AcctzStreamServer will +// result in compilation errors. +type UnsafeAcctzStreamServer interface { + mustEmbedUnimplementedAcctzStreamServer() +} + +func RegisterAcctzStreamServer(s grpc.ServiceRegistrar, srv AcctzStreamServer) { + s.RegisterService(&AcctzStream_ServiceDesc, srv) +} + +func _AcctzStream_RecordSubscribe_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(RecordRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(AcctzStreamServer).RecordSubscribe(m, &acctzStreamRecordSubscribeServer{stream}) +} + +type AcctzStream_RecordSubscribeServer interface { + Send(*RecordResponse) error + grpc.ServerStream +} + +type acctzStreamRecordSubscribeServer struct { + grpc.ServerStream +} + +func (x *acctzStreamRecordSubscribeServer) Send(m *RecordResponse) error { + return x.ServerStream.SendMsg(m) +} + +// AcctzStream_ServiceDesc is the grpc.ServiceDesc for AcctzStream service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var AcctzStream_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "gnsi.acctz.v1.AcctzStream", + HandlerType: (*AcctzStreamServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "RecordSubscribe", + Handler: _AcctzStream_RecordSubscribe_Handler, + ServerStreams: true, }, }, Metadata: "github.com/openconfig/gnsi/acctz/acctz.proto",