diff --git a/titus-common-client/src/main/java/com/netflix/titus/client/clustermembership/grpc/ReactorClusterMembershipClient.java b/titus-common-client/src/main/java/com/netflix/titus/client/clustermembership/grpc/ClusterMembershipClient.java similarity index 86% rename from titus-common-client/src/main/java/com/netflix/titus/client/clustermembership/grpc/ReactorClusterMembershipClient.java rename to titus-common-client/src/main/java/com/netflix/titus/client/clustermembership/grpc/ClusterMembershipClient.java index 9854f47fb2..506003faac 100644 --- a/titus-common-client/src/main/java/com/netflix/titus/client/clustermembership/grpc/ReactorClusterMembershipClient.java +++ b/titus-common-client/src/main/java/com/netflix/titus/client/clustermembership/grpc/ClusterMembershipClient.java @@ -27,12 +27,10 @@ import reactor.core.publisher.Mono; /** - * Cluster membership Spring-Reactor client interface. To be used with - * {@link com.netflix.titus.runtime.connector.common.reactor.client.ReactorToGrpcClientBuilder}. - * This API maps to the GRPC ClusterMembershipService documented at: + * Reactor wrapper around the GRPC ClusterMembershipService documented at: * https://github.com/Netflix/titus-api-definitions/blob/master/src/main/proto/netflix/titus/titus_cluster_membership_api.proto. */ -public interface ReactorClusterMembershipClient { +public interface ClusterMembershipClient { Mono getMembers(); @@ -47,4 +45,6 @@ public interface ReactorClusterMembershipClient { Mono stopBeingLeader(); Flux events(); + + void shutdown(); } diff --git a/titus-common-client/src/main/java/com/netflix/titus/client/clustermembership/resolver/SingleClusterMemberResolver.java b/titus-common-client/src/main/java/com/netflix/titus/client/clustermembership/resolver/SingleClusterMemberResolver.java index 3238e586d2..7f4c57e3b6 100644 --- a/titus-common-client/src/main/java/com/netflix/titus/client/clustermembership/resolver/SingleClusterMemberResolver.java +++ b/titus-common-client/src/main/java/com/netflix/titus/client/clustermembership/resolver/SingleClusterMemberResolver.java @@ -20,7 +20,6 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; import com.google.common.annotations.VisibleForTesting; import com.netflix.titus.api.clustermembership.model.ClusterMember; @@ -30,21 +29,18 @@ import com.netflix.titus.api.clustermembership.model.ClusterMembershipFunctions; import com.netflix.titus.api.clustermembership.model.ClusterMembershipRevision; import com.netflix.titus.api.clustermembership.model.ClusterMembershipSnapshot; -import com.netflix.titus.api.model.callmetadata.CallMetadata; -import com.netflix.titus.client.clustermembership.grpc.ReactorClusterMembershipClient; +import com.netflix.titus.client.clustermembership.grpc.ClusterMembershipClient; import com.netflix.titus.common.runtime.TitusRuntime; import com.netflix.titus.common.util.CollectionsExt; import com.netflix.titus.common.util.ExceptionExt; -import com.netflix.titus.common.util.grpc.reactor.client.ReactorToGrpcClientBuilder; +import com.netflix.titus.common.util.closeable.CloseableReference; import com.netflix.titus.common.util.rx.ReactorExt; import com.netflix.titus.common.util.rx.ReactorRetriers; import com.netflix.titus.common.util.rx.RetryHandlerBuilder; import com.netflix.titus.common.util.time.Clock; import com.netflix.titus.grpc.protogen.ClusterMember.LeadershipState; import com.netflix.titus.grpc.protogen.ClusterMembershipEvent; -import com.netflix.titus.grpc.protogen.ClusterMembershipServiceGrpc; import com.netflix.titus.runtime.common.grpc.GrpcClientErrorUtils; -import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.StatusRuntimeException; import org.slf4j.Logger; @@ -55,7 +51,6 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; import reactor.core.publisher.SignalType; -import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import static com.netflix.titus.api.clustermembership.model.ClusterMembershipFunctions.hasIpAddress; @@ -74,14 +69,13 @@ public class SingleClusterMemberResolver implements DirectClusterMemberResolver */ private static final Duration GRPC_REQUEST_TIMEOUT = Duration.ofSeconds(1); + private final CloseableReference clientRef; private final String name; private final ClusterMembershipResolverConfiguration configuration; private final ClusterMemberAddress address; private final ClusterMemberVerifier clusterMemberVerifier; private final Clock clock; - private final ManagedChannel channel; - private final ReactorClusterMembershipClient client; private final Disposable eventStreamDisposable; private volatile String rejectedMemberError; @@ -95,33 +89,20 @@ public class SingleClusterMemberResolver implements DirectClusterMemberResolver private final AtomicReference disconnectTimeRef; public SingleClusterMemberResolver(ClusterMembershipResolverConfiguration configuration, - Function channelProvider, + CloseableReference clientRef, ClusterMemberAddress address, ClusterMemberVerifier clusterMemberVerifier, - Scheduler scheduler, TitusRuntime titusRuntime) { + this.clientRef = clientRef; this.name = "member@" + ClusterMembershipFunctions.toStringUri(address); this.configuration = configuration; this.address = address; this.clusterMemberVerifier = clusterMemberVerifier; this.clock = titusRuntime.getClock(); - this.channel = channelProvider.apply(address); this.disconnectTimeRef = new AtomicReference<>(clock.wallTime()); - this.client = ReactorToGrpcClientBuilder - .newBuilderWithDefaults( - ReactorClusterMembershipClient.class, - ClusterMembershipServiceGrpc.newStub(channel), - ClusterMembershipServiceGrpc.getServiceDescriptor(), - CallMetadata.class - ) - // FIXME Once call metadata interceptor is moved into common module -// .withGrpcStubDecorator(AnonymousCallMetadataResolver.getInstance()) - .withTimeout(GRPC_REQUEST_TIMEOUT) - .withStreamingTimeout(Duration.ofMillis(configuration.getSingleMemberReconnectIntervalMs())) - .build(); - this.eventStreamDisposable = client.events() + this.eventStreamDisposable = clientRef.get().events() .materialize() .flatMap(signal -> { if (signal.getType() == SignalType.ON_COMPLETE) { @@ -189,7 +170,7 @@ private Optional checkSnapshot(ClusterMembershipEvent.Snapshot snapshot) public void shutdown() { ReactorExt.safeDispose(eventStreamDisposable); ExceptionExt.silent(eventSink::complete); - ExceptionExt.silent(channel::shutdownNow); + clientRef.close(); } @Override diff --git a/titus-common-client/src/test/java/com/netflix/titus/client/clustermembership/resolver/SimpleClusterMembershipClient.java b/titus-common-client/src/test/java/com/netflix/titus/client/clustermembership/resolver/SimpleClusterMembershipClient.java new file mode 100644 index 0000000000..eee58c90dc --- /dev/null +++ b/titus-common-client/src/test/java/com/netflix/titus/client/clustermembership/resolver/SimpleClusterMembershipClient.java @@ -0,0 +1,127 @@ +/* + * Copyright 2022 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.titus.client.clustermembership.resolver; + +import com.google.protobuf.Empty; +import com.netflix.titus.client.clustermembership.grpc.ClusterMembershipClient; +import com.netflix.titus.grpc.protogen.ClusterMembershipEvent; +import com.netflix.titus.grpc.protogen.ClusterMembershipRevision; +import com.netflix.titus.grpc.protogen.ClusterMembershipRevisions; +import com.netflix.titus.grpc.protogen.ClusterMembershipServiceGrpc; +import com.netflix.titus.grpc.protogen.ClusterMembershipServiceGrpc.ClusterMembershipServiceStub; +import com.netflix.titus.grpc.protogen.DeleteMemberLabelsRequest; +import com.netflix.titus.grpc.protogen.EnableMemberRequest; +import com.netflix.titus.grpc.protogen.MemberId; +import com.netflix.titus.grpc.protogen.UpdateMemberLabelsRequest; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; + +/** + * FIXME We need this implementation for testing as DefaultClusterMembershipClient is located in titus-common-server package. + * To resolve this issue we have to move CallMetadata classes to titus-common-client. + */ +class SimpleClusterMembershipClient implements ClusterMembershipClient { + + private final ManagedChannel channel; + private final ClusterMembershipServiceStub stub; + + public SimpleClusterMembershipClient(ManagedChannel channel) { + this.channel = channel; + this.stub = ClusterMembershipServiceGrpc.newStub(channel); + } + + @Override + public Mono getMembers() { + return Mono.create(sink -> stub.getMembers(Empty.getDefaultInstance(), connectSink(sink))); + } + + @Override + public Mono getMember(MemberId request) { + return Mono.create(sink -> stub.getMember(request, connectSink(sink))); + } + + @Override + public Mono updateMemberLabels(UpdateMemberLabelsRequest request) { + return Mono.create(sink -> stub.updateMemberLabels(request, connectSink(sink))); + } + + @Override + public Mono deleteMemberLabels(DeleteMemberLabelsRequest request) { + return Mono.create(sink -> stub.deleteMemberLabels(request, connectSink(sink))); + } + + @Override + public Mono enableMember(EnableMemberRequest request) { + return Mono.create(sink -> stub.enableMember(request, connectSink(sink))); + } + + @Override + public Mono stopBeingLeader() { + return Mono.create(sink -> stub.stopBeingLeader(Empty.getDefaultInstance(), connectSink(sink))) + .ignoreElement() + .cast(Void.class); + } + + @Override + public Flux events() { + return Flux.create(sink -> { + stub.events(Empty.getDefaultInstance(), new StreamObserver() { + @Override + public void onNext(ClusterMembershipEvent value) { + sink.next(value); + } + + @Override + public void onError(Throwable t) { + sink.error(t); + } + + @Override + public void onCompleted() { + sink.complete(); + } + }); + }); + } + + @Override + public void shutdown() { + channel.shutdownNow(); + } + + private StreamObserver connectSink(MonoSink sink) { + return new StreamObserver() { + @Override + public void onNext(T value) { + sink.success(value); + } + + @Override + public void onError(Throwable t) { + sink.error(t); + } + + @Override + public void onCompleted() { + sink.success(); + } + }; + } +} diff --git a/titus-common-client/src/test/java/com/netflix/titus/client/clustermembership/resolver/SingleClusterMemberResolverTest.java b/titus-common-client/src/test/java/com/netflix/titus/client/clustermembership/resolver/SingleClusterMemberResolverTest.java index 6771194bac..f86ac65131 100644 --- a/titus-common-client/src/test/java/com/netflix/titus/client/clustermembership/resolver/SingleClusterMemberResolverTest.java +++ b/titus-common-client/src/test/java/com/netflix/titus/client/clustermembership/resolver/SingleClusterMemberResolverTest.java @@ -22,10 +22,12 @@ import com.netflix.titus.api.clustermembership.model.ClusterMemberAddress; import com.netflix.titus.api.clustermembership.model.ClusterMembershipSnapshot; +import com.netflix.titus.client.clustermembership.grpc.ClusterMembershipClient; import com.netflix.titus.common.runtime.TitusRuntime; import com.netflix.titus.common.runtime.TitusRuntimes; import com.netflix.titus.common.util.ExceptionExt; import com.netflix.titus.common.util.archaius2.Archaius2Ext; +import com.netflix.titus.common.util.closeable.CloseableReference; import com.netflix.titus.grpc.protogen.ClusterMember; import com.netflix.titus.grpc.protogen.ClusterMembershipRevision; import io.grpc.Server; @@ -34,7 +36,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import reactor.core.scheduler.Schedulers; import static com.jayway.awaitility.Awaitility.await; import static org.assertj.core.api.Assertions.assertThat; @@ -91,12 +92,12 @@ private void setupServerAndResolver(ClusterMemberVerifier clusterMemberVerifier) .build() .start(); + SimpleClusterMembershipClient client = new SimpleClusterMembershipClient(InProcessChannelBuilder.forName(serviceName).directExecutor().build()); this.resolver = new SingleClusterMemberResolver( configuration, - address -> InProcessChannelBuilder.forName(serviceName).directExecutor().build(), + CloseableReference.referenceOf(client, ClusterMembershipClient::shutdown), ADDRESS, clusterMemberVerifier, - Schedulers.parallel(), titusRuntime ); } diff --git a/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/DefaultClusterMembershipClient.java b/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/DefaultClusterMembershipClient.java new file mode 100644 index 0000000000..c2f51903a1 --- /dev/null +++ b/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/DefaultClusterMembershipClient.java @@ -0,0 +1,87 @@ +/* + * Copyright 2022 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.titus.runtime.clustermembership; + +import com.google.protobuf.Empty; +import com.netflix.titus.client.clustermembership.grpc.ClusterMembershipClient; +import com.netflix.titus.common.util.ExceptionExt; +import com.netflix.titus.grpc.protogen.ClusterMembershipEvent; +import com.netflix.titus.grpc.protogen.ClusterMembershipRevision; +import com.netflix.titus.grpc.protogen.ClusterMembershipRevisions; +import com.netflix.titus.grpc.protogen.ClusterMembershipServiceGrpc; +import com.netflix.titus.grpc.protogen.ClusterMembershipServiceGrpc.ClusterMembershipServiceStub; +import com.netflix.titus.grpc.protogen.DeleteMemberLabelsRequest; +import com.netflix.titus.grpc.protogen.EnableMemberRequest; +import com.netflix.titus.grpc.protogen.MemberId; +import com.netflix.titus.grpc.protogen.UpdateMemberLabelsRequest; +import com.netflix.titus.runtime.endpoint.common.grpc.assistant.GrpcClientCallAssistant; +import com.netflix.titus.runtime.endpoint.common.grpc.assistant.GrpcClientCallAssistantFactory; +import io.grpc.ManagedChannel; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class DefaultClusterMembershipClient implements ClusterMembershipClient { + + private final GrpcClientCallAssistant assistant; + private final ManagedChannel managedChannel; + + public DefaultClusterMembershipClient(GrpcClientCallAssistantFactory assistantFactory, + ManagedChannel managedChannel) { + this.managedChannel = managedChannel; + this.assistant = assistantFactory.create(ClusterMembershipServiceGrpc.newStub(managedChannel)); + } + + @Override + public Mono getMembers() { + return assistant.asMono((stub, responseStream) -> stub.getMembers(Empty.getDefaultInstance(), responseStream)); + } + + @Override + public Mono getMember(MemberId request) { + return assistant.asMono((stub, responseStream) -> stub.getMember(request, responseStream)); + } + + @Override + public Mono updateMemberLabels(UpdateMemberLabelsRequest request) { + return assistant.asMono((stub, responseStream) -> stub.updateMemberLabels(request, responseStream)); + } + + @Override + public Mono deleteMemberLabels(DeleteMemberLabelsRequest request) { + return assistant.asMono((stub, responseStream) -> stub.deleteMemberLabels(request, responseStream)); + } + + @Override + public Mono enableMember(EnableMemberRequest request) { + return assistant.asMono((stub, responseStream) -> stub.enableMember(request, responseStream)); + } + + @Override + public Mono stopBeingLeader() { + return assistant.asMonoEmpty((stub, responseStream) -> stub.stopBeingLeader(Empty.getDefaultInstance(), responseStream)); + } + + @Override + public Flux events() { + return assistant.asFlux((stub, responseStream) -> stub.events(Empty.getDefaultInstance(), responseStream)); + } + + @Override + public void shutdown() { + ExceptionExt.silent(managedChannel::shutdownNow); + } +} diff --git a/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/ClusterMembershipGrpcEndpointComponent.java b/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/ClusterMembershipGrpcEndpointComponent.java index a52c89f949..55cd8085aa 100644 --- a/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/ClusterMembershipGrpcEndpointComponent.java +++ b/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/ClusterMembershipGrpcEndpointComponent.java @@ -18,6 +18,7 @@ import com.netflix.titus.api.clustermembership.service.ClusterMembershipService; import com.netflix.titus.common.runtime.TitusRuntime; +import com.netflix.titus.runtime.endpoint.common.grpc.assistant.GrpcServerCallAssistant; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -25,7 +26,9 @@ public class ClusterMembershipGrpcEndpointComponent { @Bean - public ReactorClusterMembershipGrpcService getReactorClusterMembershipGrpcService(ClusterMembershipService service, TitusRuntime titusRuntime) { - return new ReactorClusterMembershipGrpcService(service, titusRuntime); + public GrpcClusterMembershipService getGrpcClusterMembershipService(ClusterMembershipService service, + GrpcServerCallAssistant assistant, + TitusRuntime titusRuntime) { + return new GrpcClusterMembershipService(service, assistant, titusRuntime); } } diff --git a/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/ReactorClusterMembershipGrpcService.java b/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/GrpcClusterMembershipService.java similarity index 65% rename from titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/ReactorClusterMembershipGrpcService.java rename to titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/GrpcClusterMembershipService.java index 5f05f5f841..6bf56fa19e 100644 --- a/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/ReactorClusterMembershipGrpcService.java +++ b/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/GrpcClusterMembershipService.java @@ -24,6 +24,7 @@ import javax.inject.Singleton; import com.google.common.base.Preconditions; +import com.google.protobuf.Empty; import com.netflix.titus.api.clustermembership.model.ClusterMember; import com.netflix.titus.api.clustermembership.model.ClusterMemberLeadershipState; import com.netflix.titus.api.clustermembership.model.event.ClusterMembershipSnapshotEvent; @@ -35,10 +36,13 @@ import com.netflix.titus.grpc.protogen.ClusterMembershipEvent; import com.netflix.titus.grpc.protogen.ClusterMembershipRevision; import com.netflix.titus.grpc.protogen.ClusterMembershipRevisions; +import com.netflix.titus.grpc.protogen.ClusterMembershipServiceGrpc; import com.netflix.titus.grpc.protogen.DeleteMemberLabelsRequest; import com.netflix.titus.grpc.protogen.EnableMemberRequest; import com.netflix.titus.grpc.protogen.MemberId; import com.netflix.titus.grpc.protogen.UpdateMemberLabelsRequest; +import com.netflix.titus.runtime.endpoint.common.grpc.assistant.GrpcServerCallAssistant; +import io.grpc.stub.StreamObserver; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -46,61 +50,75 @@ import static com.netflix.titus.runtime.clustermembership.endpoint.grpc.MemberDataMixer.NO_LEADER_ID; @Singleton -public class ReactorClusterMembershipGrpcService { +public class GrpcClusterMembershipService extends ClusterMembershipServiceGrpc.ClusterMembershipServiceImplBase { private final String localMemberId; private final ClusterMembershipService service; - + private final GrpcServerCallAssistant assistant; private final Clock clock; @Inject - public ReactorClusterMembershipGrpcService(ClusterMembershipService service, TitusRuntime titusRuntime) { + public GrpcClusterMembershipService(ClusterMembershipService service, + GrpcServerCallAssistant assistant, + TitusRuntime titusRuntime) { this.localMemberId = service.getLocalLeadership().getCurrent().getMemberId(); this.service = service; + this.assistant = assistant; this.clock = titusRuntime.getClock(); } /** * Get all known cluster members. */ - public Mono getMembers() { - return Mono.fromCallable(() -> { - List grpcRevisions = new ArrayList<>(); - grpcRevisions.add(toGrpcClusterMembershipRevision(service.getLocalClusterMember(), isLocalLeader())); + @Override + public void getMembers(Empty request, StreamObserver responseObserver) { + assistant.callDirect(responseObserver, context -> getMembersInternal()); + } - String leaderId = getLeaderId(); - for (com.netflix.titus.api.clustermembership.model.ClusterMembershipRevision sibling : service.getClusterMemberSiblings().values()) { - grpcRevisions.add(toGrpcClusterMembershipRevision(sibling, sibling.getCurrent().getMemberId().equals(leaderId))); - } + public ClusterMembershipRevisions getMembersInternal() { + List grpcRevisions = new ArrayList<>(); + grpcRevisions.add(toGrpcClusterMembershipRevision(service.getLocalClusterMember(), isLocalLeader())); + + String leaderId = getLeaderId(); + for (com.netflix.titus.api.clustermembership.model.ClusterMembershipRevision sibling : service.getClusterMemberSiblings().values()) { + grpcRevisions.add(toGrpcClusterMembershipRevision(sibling, sibling.getCurrent().getMemberId().equals(leaderId))); + } - return ClusterMembershipRevisions.newBuilder().addAllRevisions(grpcRevisions).build(); - }); + return ClusterMembershipRevisions.newBuilder().addAllRevisions(grpcRevisions).build(); } /** * Get member with the given id. */ - public Mono getMember(MemberId request) { - return Mono.fromCallable(() -> { - String memberId = request.getId(); + @Override + public void getMember(MemberId request, StreamObserver responseObserver) { + assistant.callDirect(responseObserver, context -> getMemberInternal(request)); + } + + public ClusterMembershipRevision getMemberInternal(MemberId request) { + String memberId = request.getId(); - if (service.getLocalClusterMember().getCurrent().getMemberId().equals(memberId)) { - return toGrpcClusterMembershipRevision(service.getLocalClusterMember(), isLocalLeader()); - } + if (service.getLocalClusterMember().getCurrent().getMemberId().equals(memberId)) { + return toGrpcClusterMembershipRevision(service.getLocalClusterMember(), isLocalLeader()); + } - com.netflix.titus.api.clustermembership.model.ClusterMembershipRevision sibling = service.getClusterMemberSiblings().get(memberId); - if (sibling == null) { - throw ClusterMembershipServiceException.memberNotFound(memberId); - } - return toGrpcClusterMembershipRevision(sibling, getLeaderId().equals(memberId)); - }); + com.netflix.titus.api.clustermembership.model.ClusterMembershipRevision sibling = service.getClusterMemberSiblings().get(memberId); + if (sibling == null) { + throw ClusterMembershipServiceException.memberNotFound(memberId); + } + return toGrpcClusterMembershipRevision(sibling, getLeaderId().equals(memberId)); } /** * Adds all labels from the request object to the target member. Labels that exist are * overridden. Returns the updated object. */ - public Mono updateMemberLabels(UpdateMemberLabelsRequest request) { + @Override + public void updateMemberLabels(UpdateMemberLabelsRequest request, StreamObserver responseObserver) { + assistant.callMono(responseObserver, context -> updateMemberLabelsInternal(request)); + } + + public Mono updateMemberLabelsInternal(UpdateMemberLabelsRequest request) { if (!request.getMemberId().equals(localMemberId)) { return Mono.error(ClusterMembershipServiceException.localOnly(request.getMemberId())); } @@ -124,7 +142,12 @@ public Mono updateMemberLabels(UpdateMemberLabelsRequ * Removes all specified labels from the target object. Labels that do not exist are ignored. * Returns the updated object. */ - public Mono deleteMemberLabels(DeleteMemberLabelsRequest request) { + @Override + public void deleteMemberLabels(DeleteMemberLabelsRequest request, StreamObserver responseObserver) { + assistant.callMono(responseObserver, context -> deleteMemberLabelsInternal(request)); + } + + public Mono deleteMemberLabelsInternal(DeleteMemberLabelsRequest request) { if (!request.getMemberId().equals(localMemberId)) { return Mono.error(ClusterMembershipServiceException.localOnly(request.getMemberId())); } @@ -147,7 +170,12 @@ public Mono deleteMemberLabels(DeleteMemberLabelsRequ /** * Enable or disable a member. */ - public Mono enableMember(EnableMemberRequest request) { + @Override + public void enableMember(EnableMemberRequest request, StreamObserver responseObserver) { + assistant.callMono(responseObserver, context -> enableMemberInternal(request)); + } + + public Mono enableMemberInternal(EnableMemberRequest request) { if (!request.getMemberId().equals(localMemberId)) { return Mono.error(ClusterMembershipServiceException.localOnly(request.getMemberId())); } @@ -168,15 +196,21 @@ public Mono enableMember(EnableMemberRequest request) * Requests the member that handles this request to stop being leader. If the given member * is not a leader, the request is ignored. */ - public Mono stopBeingLeader() { + @Override + public void stopBeingLeader(Empty request, StreamObserver responseObserver) { + assistant.callMonoEmpty(responseObserver, context -> stopBeingLeaderInternal()); + } + + public Mono stopBeingLeaderInternal() { return service.stopBeingLeader(); } /** * Event stream. */ - public Flux events() { - return Flux.defer(() -> { + @Override + public void events(Empty request, StreamObserver responseObserver) { + assistant.callFlux(responseObserver, context -> Flux.defer(() -> { AtomicReference leaderRef = new AtomicReference<>(); return service.events().flatMapIterable(event -> { MemberDataMixer data = leaderRef.get(); @@ -192,7 +226,7 @@ public Flux events() { } return data.process(event); }); - }); + })); } private String getLeaderId() { diff --git a/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/rest/ClusterMembershipRestEndpointComponent.java b/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/rest/ClusterMembershipRestEndpointComponent.java index 67862fdaf7..0c8803ffb1 100644 --- a/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/rest/ClusterMembershipRestEndpointComponent.java +++ b/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/rest/ClusterMembershipRestEndpointComponent.java @@ -17,7 +17,7 @@ package com.netflix.titus.runtime.clustermembership.endpoint.rest; import com.netflix.titus.runtime.clustermembership.activation.LeaderActivationStatus; -import com.netflix.titus.runtime.clustermembership.endpoint.grpc.ReactorClusterMembershipGrpcService; +import com.netflix.titus.runtime.clustermembership.endpoint.grpc.GrpcClusterMembershipService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -31,7 +31,7 @@ public class ClusterMembershipRestEndpointComponent implements WebMvcConfigurer private LeaderActivationStatus leaderActivationStatus; @Bean - public ClusterMembershipSpringResource getClusterMembershipSpringResource(ReactorClusterMembershipGrpcService reactorClusterMembershipService) { + public ClusterMembershipSpringResource getClusterMembershipSpringResource(GrpcClusterMembershipService reactorClusterMembershipService) { return new ClusterMembershipSpringResource(reactorClusterMembershipService); } diff --git a/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/rest/ClusterMembershipSpringResource.java b/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/rest/ClusterMembershipSpringResource.java index 5885e797a9..2daaf20bdd 100644 --- a/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/rest/ClusterMembershipSpringResource.java +++ b/titus-common-server/src/main/java/com/netflix/titus/runtime/clustermembership/endpoint/rest/ClusterMembershipSpringResource.java @@ -24,7 +24,7 @@ import com.netflix.titus.grpc.protogen.EnableMemberRequest; import com.netflix.titus.grpc.protogen.MemberId; import com.netflix.titus.grpc.protogen.UpdateMemberLabelsRequest; -import com.netflix.titus.runtime.clustermembership.endpoint.grpc.ReactorClusterMembershipGrpcService; +import com.netflix.titus.runtime.clustermembership.endpoint.grpc.GrpcClusterMembershipService; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; @@ -37,20 +37,20 @@ public class ClusterMembershipSpringResource { private static final Duration TIMEOUT = Duration.ofSeconds(1); - private final ReactorClusterMembershipGrpcService clusterMembershipService; + private final GrpcClusterMembershipService clusterMembershipService; - public ClusterMembershipSpringResource(ReactorClusterMembershipGrpcService clusterMembershipService) { + public ClusterMembershipSpringResource(GrpcClusterMembershipService clusterMembershipService) { this.clusterMembershipService = clusterMembershipService; } @RequestMapping(method = RequestMethod.GET, path = "/members", produces = "application/json") public ClusterMembershipRevisions getMembers() { - return clusterMembershipService.getMembers().block(TIMEOUT); + return clusterMembershipService.getMembersInternal(); } @RequestMapping(method = RequestMethod.GET, path = "/members/{memberId}", produces = "application/json") public ClusterMembershipRevision getMember(@PathVariable("memberId") String memberId) { - return clusterMembershipService.getMember(MemberId.newBuilder().setId(memberId).build()).block(TIMEOUT); + return clusterMembershipService.getMemberInternal(MemberId.newBuilder().setId(memberId).build()); } @RequestMapping(method = RequestMethod.POST, path = "/members/{memberId}/labels", consumes = "application/json", produces = "application/json") @@ -59,7 +59,7 @@ public ClusterMembershipRevision updateMemberLabels(@PathVariable("memberId") St if (request.getMemberId().equals(memberId)) { throw new IllegalArgumentException("Member id in path and request body are different"); } - return clusterMembershipService.updateMemberLabels(request).block(TIMEOUT); + return clusterMembershipService.updateMemberLabelsInternal(request).block(TIMEOUT); } @RequestMapping(method = RequestMethod.DELETE, path = "/members/{memberId}/labels", consumes = "application/json", produces = "application/json") @@ -68,7 +68,7 @@ public ClusterMembershipRevision deleteMemberLabels(@PathVariable("memberId") St if (request.getMemberId().equals(memberId)) { throw new IllegalArgumentException("Member id in path and request body are different"); } - return clusterMembershipService.deleteMemberLabels(request).block(TIMEOUT); + return clusterMembershipService.deleteMemberLabelsInternal(request).block(TIMEOUT); } @RequestMapping(method = RequestMethod.POST, path = "/members/{memberId}/enable", consumes = "application/json", produces = "application/json") @@ -77,11 +77,11 @@ public ClusterMembershipRevision enableMember(@PathVariable("memberId") String m if (request.getMemberId().equals(memberId)) { throw new IllegalArgumentException("Member id in path and request body are different"); } - return clusterMembershipService.enableMember(request).block(TIMEOUT); + return clusterMembershipService.enableMemberInternal(request).block(TIMEOUT); } @RequestMapping(method = RequestMethod.POST, path = "/members/{memberId}/stopBeingLeader") public void stopBeingLeader() { - clusterMembershipService.stopBeingLeader().block(TIMEOUT); + clusterMembershipService.stopBeingLeaderInternal().block(TIMEOUT); } } diff --git a/titus-common-server/src/main/java/com/netflix/titus/runtime/endpoint/common/grpc/TitusGrpcServer.java b/titus-common-server/src/main/java/com/netflix/titus/runtime/endpoint/common/grpc/TitusGrpcServer.java index f765c7fe6b..2691daa962 100644 --- a/titus-common-server/src/main/java/com/netflix/titus/runtime/endpoint/common/grpc/TitusGrpcServer.java +++ b/titus-common-server/src/main/java/com/netflix/titus/runtime/endpoint/common/grpc/TitusGrpcServer.java @@ -113,7 +113,7 @@ public static class Builder { private final List interceptors = new ArrayList<>(); private final List>> serviceExceptionMappers = new ArrayList<>(); private Duration shutdownTime; - private UnaryOperator serverConfigurer; + private final List> serverConfigurers = new ArrayList<>(); private Builder() { // Add default exception mappings. @@ -143,7 +143,7 @@ public Builder withShutdownTime(Duration shutdownTime) { } public Builder withServerConfigurer(UnaryOperator serverConfigurer) { - this.serverConfigurer = serverConfigurer; + this.serverConfigurers.add(serverConfigurer); return this; } @@ -177,8 +177,8 @@ public TitusGrpcServer build() { commonInterceptors.addAll(interceptors); ServerBuilder serverBuilder = ServerBuilder.forPort(port); - if (serverConfigurer != null) { - serverBuilder = serverConfigurer.apply(serverBuilder); + for (UnaryOperator c : serverConfigurers) { + c.apply(serverBuilder); } for (ServiceBuilder serviceBuilder : serviceBuilders.values()) { serverBuilder.addService(serviceBuilder.build(commonInterceptors)); diff --git a/titus-common-server/src/test/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/ClusterMembershipGrpcServerTest.java b/titus-common-server/src/test/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/ClusterMembershipGrpcServerTest.java index 17101de57f..a650ebff25 100644 --- a/titus-common-server/src/test/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/ClusterMembershipGrpcServerTest.java +++ b/titus-common-server/src/test/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/ClusterMembershipGrpcServerTest.java @@ -20,7 +20,7 @@ import java.util.function.Consumer; import com.netflix.titus.api.clustermembership.service.ClusterMembershipService; -import com.netflix.titus.client.clustermembership.grpc.ReactorClusterMembershipClient; +import com.netflix.titus.client.clustermembership.grpc.ClusterMembershipClient; import com.netflix.titus.grpc.protogen.ClusterMember; import com.netflix.titus.grpc.protogen.ClusterMembershipEvent; import com.netflix.titus.grpc.protogen.ClusterMembershipRevision; @@ -47,7 +47,7 @@ public class ClusterMembershipGrpcServerTest { private final TitusRxSubscriber eventSubscriber = new TitusRxSubscriber<>(); - private ReactorClusterMembershipClient client; + private ClusterMembershipClient client; @Before public void setUp() { diff --git a/titus-common-server/src/test/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/ClusterMembershipServerResource.java b/titus-common-server/src/test/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/ClusterMembershipServerResource.java index 515a263b95..7c2a665f2d 100644 --- a/titus-common-server/src/test/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/ClusterMembershipServerResource.java +++ b/titus-common-server/src/test/java/com/netflix/titus/runtime/clustermembership/endpoint/grpc/ClusterMembershipServerResource.java @@ -17,21 +17,21 @@ package com.netflix.titus.runtime.clustermembership.endpoint.grpc; import java.time.Duration; -import java.util.Collections; import com.netflix.titus.api.clustermembership.service.ClusterMembershipService; -import com.netflix.titus.api.model.callmetadata.CallMetadata; -import com.netflix.titus.api.model.callmetadata.CallMetadataConstants; -import com.netflix.titus.client.clustermembership.grpc.ReactorClusterMembershipClient; +import com.netflix.titus.client.clustermembership.grpc.ClusterMembershipClient; import com.netflix.titus.common.runtime.TitusRuntime; import com.netflix.titus.common.runtime.TitusRuntimes; import com.netflix.titus.common.util.archaius2.Archaius2Ext; -import com.netflix.titus.common.util.grpc.reactor.server.DefaultGrpcToReactorServerFactory; -import com.netflix.titus.grpc.protogen.ClusterMembershipServiceGrpc; +import com.netflix.titus.runtime.clustermembership.DefaultClusterMembershipClient; import com.netflix.titus.runtime.connector.GrpcRequestConfiguration; -import com.netflix.titus.runtime.connector.common.reactor.DefaultGrpcToReactorClientFactory; import com.netflix.titus.runtime.endpoint.common.grpc.CommonGrpcEndpointConfiguration; import com.netflix.titus.runtime.endpoint.common.grpc.TitusGrpcServer; +import com.netflix.titus.runtime.endpoint.common.grpc.assistant.DefaultGrpcClientCallAssistantFactory; +import com.netflix.titus.runtime.endpoint.common.grpc.assistant.DefaultGrpcServerCallAssistant; +import com.netflix.titus.runtime.endpoint.common.grpc.assistant.GrpcCallAssistantConfiguration; +import com.netflix.titus.runtime.endpoint.metadata.AnonymousCallMetadataResolver; +import com.netflix.titus.runtime.endpoint.resolver.NoOpHostCallerIdResolver; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import org.junit.rules.ExternalResource; @@ -50,7 +50,7 @@ public class ClusterMembershipServerResource extends ExternalResource { private TitusGrpcServer server; private ManagedChannel channel; - private ReactorClusterMembershipClient client; + private ClusterMembershipClient client; public ClusterMembershipServerResource(ClusterMembershipService service) { this.service = service; @@ -60,17 +60,10 @@ public ClusterMembershipServerResource(ClusterMembershipService service) { protected void before() { when(grpcEndpointConfiguration.getPort()).thenReturn(0); - DefaultGrpcToReactorServerFactory reactorServerFactory = new DefaultGrpcToReactorServerFactory<>( - CallMetadata.class, - () -> CallMetadataConstants.UNDEFINED_CALL_METADATA - ); + DefaultGrpcServerCallAssistant grpcServerCallAssistant = new DefaultGrpcServerCallAssistant(AnonymousCallMetadataResolver.getInstance(), NoOpHostCallerIdResolver.getInstance()); server = TitusGrpcServer.newBuilder(0, titusRuntime) - .withService( - reactorServerFactory.apply( - ClusterMembershipServiceGrpc.getServiceDescriptor(), - new ReactorClusterMembershipGrpcService(service, titusRuntime) - ), - Collections.emptyList() + .withServerConfigurer(builder -> + builder.addService(new GrpcClusterMembershipService(service, grpcServerCallAssistant, titusRuntime)) ) .withExceptionMapper(ClusterMembershipGrpcExceptionMapper.getInstance()) .withShutdownTime(Duration.ZERO) @@ -81,15 +74,11 @@ protected void before() { .usePlaintext() .build(); - this.client = new DefaultGrpcToReactorClientFactory<>( - grpcRequestConfiguration, - (stub, contextOpt) -> stub, - CallMetadata.class - ).apply( - ClusterMembershipServiceGrpc.newStub(channel), - ReactorClusterMembershipClient.class, - ClusterMembershipServiceGrpc.getServiceDescriptor() + GrpcCallAssistantConfiguration configuration = Archaius2Ext.newConfiguration(GrpcCallAssistantConfiguration.class); + DefaultGrpcClientCallAssistantFactory grpcClientCallAssistantFactory = new DefaultGrpcClientCallAssistantFactory( + configuration, AnonymousCallMetadataResolver.getInstance() ); + this.client = new DefaultClusterMembershipClient(grpcClientCallAssistantFactory, channel); } @Override @@ -100,7 +89,7 @@ protected void after() { } } - public ReactorClusterMembershipClient getClient() { + public ClusterMembershipClient getClient() { return client; } diff --git a/titus-common-server/src/test/java/com/netflix/titus/runtime/clustermembership/service/ClusterMembershipConnectorStub.java b/titus-common-server/src/test/java/com/netflix/titus/runtime/clustermembership/service/ClusterMembershipConnectorStub.java index 182d3ea3b9..8e767c8c00 100644 --- a/titus-common-server/src/test/java/com/netflix/titus/runtime/clustermembership/service/ClusterMembershipConnectorStub.java +++ b/titus-common-server/src/test/java/com/netflix/titus/runtime/clustermembership/service/ClusterMembershipConnectorStub.java @@ -27,10 +27,12 @@ import com.netflix.titus.api.clustermembership.model.ClusterMemberLeadershipState; import com.netflix.titus.api.clustermembership.model.ClusterMembershipRevision; import com.netflix.titus.api.clustermembership.model.event.ClusterMembershipEvent; +import com.netflix.titus.api.clustermembership.model.event.ClusterMembershipSnapshotEvent; +import com.netflix.titus.common.util.rx.ReactorExt; import com.netflix.titus.testkit.model.clustermembership.ClusterMemberGenerator; -import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; class ClusterMembershipConnectorStub implements ClusterMembershipConnector { @@ -39,7 +41,7 @@ class ClusterMembershipConnectorStub implements ClusterMembershipConnector { private volatile ClusterMembershipRevision localMemberRevision; private volatile ClusterMembershipRevision localLeadershipRevision; - private final DirectProcessor eventProcessor = DirectProcessor.create(); + private final Sinks.Many eventProcessor = Sinks.many().multicast().directAllOrNothing(); ClusterMembershipConnectorStub() { this.localMemberRevision = ClusterMembershipRevision.newBuilder() @@ -123,10 +125,10 @@ public Mono leaveLeadershipGroup(boolean onlyNonLeader) { @Override public Flux membershipChangeEvents() { - return Flux.defer(() -> Flux. - just(ClusterMembershipEvent.snapshotEvent(Collections.emptyList(), localLeadershipRevision, Optional.empty())) - .concatWith(eventProcessor) - ); + return eventProcessor.asFlux().transformDeferred(ReactorExt.head(() -> { + ClusterMembershipSnapshotEvent snapshot = ClusterMembershipEvent.snapshotEvent(Collections.emptyList(), localLeadershipRevision, Optional.empty()); + return Collections.singletonList(snapshot); + })); } void becomeLeader() { @@ -145,7 +147,7 @@ private ClusterMembershipRevision newLocalLeadershipSta private void emitEvent(ClusterMembershipEvent event) { synchronized (eventProcessor) { - eventProcessor.onNext(event); + eventProcessor.tryEmitNext(event); } } } diff --git a/titus-supplementary-component/task-relocation/src/main/java/com/netflix/titus/supplementary/relocation/endpoint/grpc/TaskRelocationGrpcServerRunner.java b/titus-supplementary-component/task-relocation/src/main/java/com/netflix/titus/supplementary/relocation/endpoint/grpc/TaskRelocationGrpcServerRunner.java index d918cb879e..30d2493d35 100644 --- a/titus-supplementary-component/task-relocation/src/main/java/com/netflix/titus/supplementary/relocation/endpoint/grpc/TaskRelocationGrpcServerRunner.java +++ b/titus-supplementary-component/task-relocation/src/main/java/com/netflix/titus/supplementary/relocation/endpoint/grpc/TaskRelocationGrpcServerRunner.java @@ -24,12 +24,11 @@ import com.netflix.titus.common.runtime.TitusRuntime; import com.netflix.titus.common.util.grpc.reactor.GrpcToReactorServerFactory; -import com.netflix.titus.grpc.protogen.ClusterMembershipServiceGrpc; import com.netflix.titus.grpc.protogen.TaskRelocationServiceGrpc; import com.netflix.titus.runtime.clustermembership.activation.LeaderActivationStatus; -import com.netflix.titus.runtime.clustermembership.endpoint.grpc.GrpcLeaderServerInterceptor; import com.netflix.titus.runtime.clustermembership.endpoint.grpc.ClusterMembershipGrpcExceptionMapper; -import com.netflix.titus.runtime.clustermembership.endpoint.grpc.ReactorClusterMembershipGrpcService; +import com.netflix.titus.runtime.clustermembership.endpoint.grpc.GrpcClusterMembershipService; +import com.netflix.titus.runtime.clustermembership.endpoint.grpc.GrpcLeaderServerInterceptor; import com.netflix.titus.runtime.endpoint.common.grpc.GrpcEndpointConfiguration; import com.netflix.titus.runtime.endpoint.common.grpc.TitusGrpcServer; @@ -41,7 +40,7 @@ public class TaskRelocationGrpcServerRunner { @Inject public TaskRelocationGrpcServerRunner(GrpcEndpointConfiguration configuration, LeaderActivationStatus leaderActivationStatus, - ReactorClusterMembershipGrpcService reactorClusterMembershipGrpcService, + GrpcClusterMembershipService grpcClusterMembershipService, ReactorTaskRelocationGrpcService reactorTaskRelocationGrpcService, GrpcToReactorServerFactory reactorServerFactory, TitusRuntime titusRuntime) { @@ -51,13 +50,7 @@ public TaskRelocationGrpcServerRunner(GrpcEndpointConfiguration configuration, .withShutdownTime(Duration.ofMillis(configuration.getShutdownTimeoutMs())) // Cluster membership service - .withService( - reactorServerFactory.apply( - ClusterMembershipServiceGrpc.getServiceDescriptor(), - reactorClusterMembershipGrpcService - ), - Collections.emptyList() - ) + .withServerConfigurer(builder -> builder.addService(grpcClusterMembershipService)) .withExceptionMapper(ClusterMembershipGrpcExceptionMapper.getInstance()) // Relocation service diff --git a/titus-supplementary-component/task-relocation/src/test/java/com/netflix/titus/supplementary/relocation/integration/TaskRelocationSandbox.java b/titus-supplementary-component/task-relocation/src/test/java/com/netflix/titus/supplementary/relocation/integration/TaskRelocationSandbox.java index 44ec3d59f9..9c4229e1a4 100644 --- a/titus-supplementary-component/task-relocation/src/test/java/com/netflix/titus/supplementary/relocation/integration/TaskRelocationSandbox.java +++ b/titus-supplementary-component/task-relocation/src/test/java/com/netflix/titus/supplementary/relocation/integration/TaskRelocationSandbox.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import com.netflix.titus.common.util.ExceptionExt; +import com.netflix.titus.common.util.archaius2.Archaius2Ext; import com.netflix.titus.grpc.protogen.TaskRelocationQuery; import com.netflix.titus.grpc.protogen.TaskRelocationServiceGrpc; import com.netflix.titus.grpc.protogen.TaskRelocationServiceGrpc.TaskRelocationServiceBlockingStub; @@ -29,7 +30,11 @@ import com.netflix.titus.runtime.clustermembership.endpoint.grpc.ClusterMembershipGrpcEndpointComponent; import com.netflix.titus.runtime.clustermembership.service.ClusterMembershipServiceComponent; import com.netflix.titus.runtime.connector.common.reactor.GrpcToReactorServerFactoryComponent; +import com.netflix.titus.runtime.endpoint.common.grpc.assistant.GrpcCallAssistantComponent; +import com.netflix.titus.runtime.endpoint.common.grpc.assistant.GrpcCallAssistantConfiguration; import com.netflix.titus.runtime.endpoint.metadata.CallMetadataResolveComponent; +import com.netflix.titus.runtime.endpoint.resolver.HostCallerIdResolver; +import com.netflix.titus.runtime.endpoint.resolver.NoOpHostCallerIdResolver; import com.netflix.titus.runtime.health.AlwaysHealthyComponent; import com.netflix.titus.supplementary.relocation.RelocationConnectorStubs; import com.netflix.titus.supplementary.relocation.RelocationLeaderActivator; @@ -74,6 +79,10 @@ public TaskRelocationSandbox(RelocationConnectorStubs relocationConnectorStubs, container.register(ClusterMembershipGrpcEndpointComponent.class); container.register(LeaderActivationComponent.class); + container.registerBean(HostCallerIdResolver.class, NoOpHostCallerIdResolver::getInstance); + container.register(GrpcCallAssistantComponent.class); + container.registerBean(GrpcCallAssistantConfiguration.class, () -> Archaius2Ext.newConfiguration(GrpcCallAssistantConfiguration.class)); + container.register(CallMetadataResolveComponent.class); container.register(GrpcToReactorServerFactoryComponent.class); container.register(InMemoryRelocationStoreComponent.class); diff --git a/titus-testkit/src/main/java/com/netflix/titus/testkit/grpc/TestStreamObserver.java b/titus-testkit/src/main/java/com/netflix/titus/testkit/grpc/TestStreamObserver.java index a38172ce2b..2b8c9b190a 100644 --- a/titus-testkit/src/main/java/com/netflix/titus/testkit/grpc/TestStreamObserver.java +++ b/titus-testkit/src/main/java/com/netflix/titus/testkit/grpc/TestStreamObserver.java @@ -67,7 +67,6 @@ public void onError(Throwable error) { this.error = error; this.mappedError = exceptionMapper(error); eventSubject.onError(error); - System.out.println("ERROR!!!!!!!!!!!!!!!!!!!!!"); doFinish(); }