Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BFD-3738: Track RDA sequence number latency #2535

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,10 @@ public final class AppConfiguration extends BaseAppConfiguration {
Set.of(
"FissClaimRdaSink.change.latency.millis",
"McsClaimRdaSink.change.latency.millis",
"FissClaimRdaSink.lastSeq",
"McsClaimRdaSink.lastSeq",
"FissClaimRdaSink.maxSeq",
"McsClaimRdaSink.maxSeq",
CcwRifLoadJob.Metrics.DATASET_PROCESSING_ACTIVE_TIMER_NAME,
CcwRifLoadJob.Metrics.DATASET_PROCESSING_TOTAL_TIMER_NAME,
CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_ACTIVE_TIMER_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import gov.cms.bfd.model.rda.MessageError;
import gov.cms.model.dsl.codegen.library.DataTransformer;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -191,6 +192,13 @@ Optional<TClaim> transformMessage(String apiVersion, TMessage message)
*/
int writeClaims(Collection<TClaim> objects) throws ProcessingException;

/**
* Updates the available range of sequence numbers.
*
* @param sequenceNumberRange Current range of sequence numbers.
*/
void updateSequenceNumberRange(ClaimSequenceNumberRange sequenceNumberRange);

/**
* Return count of records processed since the most recent call to a write method or this method.
* Calls to this method collect the current value and resets the counter. The sum of this method's
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package gov.cms.bfd.pipeline.rda.grpc.server;

import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import java.util.NoSuchElementException;

/**
Expand Down Expand Up @@ -45,6 +46,11 @@ public T next() throws Exception {
throw new NoSuchElementException();
}

@Override
public ClaimSequenceNumberRange getSequenceNumberRange() {
return ClaimSequenceNumberRange.newBuilder().setLower(0).setUpper(0).build();
}

@Override
public void close() throws Exception {}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package gov.cms.bfd.pipeline.rda.grpc.server;

import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -62,6 +63,11 @@ public T next() throws Exception {
return source.next();
}

@Override
public ClaimSequenceNumberRange getSequenceNumberRange() {
return ClaimSequenceNumberRange.newBuilder().setLower(0).setUpper(remainingBeforeThrow).build();
}

@Override
public void close() throws Exception {
source.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.io.CharSource;
import com.google.protobuf.util.JsonFormat;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import gov.cms.mpsm.rda.v1.FissClaimChange;
import gov.cms.mpsm.rda.v1.McsClaimChange;
import java.io.BufferedReader;
Expand Down Expand Up @@ -167,6 +168,12 @@ public T next() throws Exception {
return answer;
}

@Override
public ClaimSequenceNumberRange getSequenceNumberRange() {
// Can't easily support this since it would require deserializing the entire contents
return ClaimSequenceNumberRange.newBuilder().setLower(0).setUpper(0).build();
}

@Override
public void close() throws Exception {
reader.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package gov.cms.bfd.pipeline.rda.grpc.server;

import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;

/**
* Interface for objects that produce message objects from some source (e.g. a file, an array, a
* database, etc). Mirrors the Iterator protocol but allows for unwrapped exceptions to be passed
Expand Down Expand Up @@ -36,4 +38,11 @@ public interface MessageSource<T> extends AutoCloseable {
* @throws Exception if there is an issue getting the next claim
*/
MessageSource<T> skipTo(long startingSequenceNumber) throws Exception;

/**
* Returns the current range of sequence numbers.
*
* @return sequence number range
*/
ClaimSequenceNumberRange getSequenceNumberRange();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.protobuf.Timestamp;
import gov.cms.mpsm.rda.v1.ChangeType;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import gov.cms.mpsm.rda.v1.FissClaimChange;
import gov.cms.mpsm.rda.v1.RecordSource;
import java.time.Clock;
Expand Down Expand Up @@ -89,6 +90,11 @@ public FissClaimChange next() {
return change;
}

@Override
public ClaimSequenceNumberRange getSequenceNumberRange() {
return ClaimSequenceNumberRange.newBuilder().setLower(0).setUpper(maxToSend).build();
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.protobuf.Timestamp;
import gov.cms.mpsm.rda.v1.ChangeType;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import gov.cms.mpsm.rda.v1.McsClaimChange;
import gov.cms.mpsm.rda.v1.RecordSource;
import java.time.Clock;
Expand Down Expand Up @@ -87,6 +88,11 @@ public McsClaimChange next() {
return change;
}

@Override
public ClaimSequenceNumberRange getSequenceNumberRange() {
return ClaimSequenceNumberRange.newBuilder().setLower(0).setUpper(maxToSend).build();
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,42 @@ public void getVersion(Empty request, StreamObserver<ApiVersion> responseObserve
}
}

@Override
public void getFissClaimsSequenceNumberRange(
com.google.protobuf.Empty request,
StreamObserver<gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange> responseObserver) {
LOGGER.info("start getFissClaimsSequenceNumberRange");
try (MessageSource<FissClaimChange> source = messageSourceFactory.createFissMessageSource(0)) {
responseObserver.onNext(source.getSequenceNumberRange());
responseObserver.onCompleted();
LOGGER.info("end getFissClaimsSequenceNumberRange");
} catch (Exception ex) {
responseObserver.onError(Status.fromThrowable(ex).asException());
LOGGER.error(
"end getFissClaimsSequenceNumberRange call - call failed with exception: message={}",
ex.getMessage(),
ex);
}
}

@Override
public void getMcsClaimsSequenceNumberRange(
com.google.protobuf.Empty request,
StreamObserver<gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange> responseObserver) {
LOGGER.info("start getMcsClaimsSequenceNumberRange");
try (MessageSource<McsClaimChange> source = messageSourceFactory.createMcsMessageSource(0)) {
responseObserver.onNext(source.getSequenceNumberRange());
responseObserver.onCompleted();
LOGGER.info("end getMcsClaimsSequenceNumberRange");
} catch (Exception ex) {
responseObserver.onError(Status.fromThrowable(ex).asException());
LOGGER.error(
"end getMcsClaimsSequenceNumberRange call - call failed with exception: message={}",
ex.getMessage(),
ex);
}
}

@Override
public void getFissClaims(
ClaimRequest request, StreamObserver<FissClaimChange> responseObserver) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.annotations.VisibleForTesting;
import gov.cms.bfd.pipeline.rda.grpc.RdaChange;
import gov.cms.bfd.pipeline.sharedutils.s3.S3DirectoryDao;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
Expand Down Expand Up @@ -186,6 +187,11 @@ public synchronized T next() throws Exception {
return current.next();
}

@Override
public ClaimSequenceNumberRange getSequenceNumberRange() {
return current.getSequenceNumberRange();
}

@Override
public synchronized void close() throws Exception {
current.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import gov.cms.bfd.pipeline.sharedutils.MultiCloser;
import gov.cms.bfd.pipeline.sharedutils.SequenceNumberTracker;
import gov.cms.model.dsl.codegen.library.DataTransformer;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import jakarta.annotation.Nonnull;
import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -272,6 +273,11 @@ public Optional<Long> readMaxExistingSequenceNumber() throws ProcessingException
return sink.readMaxExistingSequenceNumber();
}

@Override
public void updateSequenceNumberRange(ClaimSequenceNumberRange sequenceNumberRange) {
sink.updateSequenceNumberRange(sequenceNumberRange);
}

/**
* This method is not implemented since that would bypass the queue used to schedule writes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import gov.cms.bfd.pipeline.sharedutils.PipelineApplicationState;
import gov.cms.bfd.pipeline.sharedutils.TransactionManager;
import gov.cms.model.dsl.codegen.library.DataTransformer;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -238,6 +239,11 @@ public int writeClaims(Collection<RdaChange<TClaim>> claims) throws ProcessingEx
return claims.size();
}

@Override
public void updateSequenceNumberRange(ClaimSequenceNumberRange sequenceNumberRange) {
metrics.setMaxSequenceNumber(sequenceNumberRange.getUpper());
}

/**
* Always returns zero since all claims are written synchronously by writeMessages.
*
Expand Down Expand Up @@ -525,7 +531,7 @@ static class Metrics {
/** Tracks the number of updates per database transaction. */
private final DistributionSummary dbBatchSize;

/** Latest sequnce number from writing a batch. * */
/** Latest sequence number from writing a batch. * */
private final AtomicLong latestSequenceNumber;

/** The value returned by the latestSequenceNumber gauge. * */
Expand All @@ -534,6 +540,12 @@ static class Metrics {
/** The number of insert statements executed. */
private final DistributionSummary insertCount;

/** Maximum available sequence number. */
private final AtomicLong maxSequenceNumber;

/** The value returned by the maxSequenceNumber gauge. */
private final AtomicLong maxSequenceNumberValue;

/**
* Initializes all the metrics.
*
Expand All @@ -560,6 +572,9 @@ private Metrics(Class<?> klass, MeterRegistry appMetrics) {
latestSequenceNumber = GAUGES.getGaugeForName(appMetrics, latestSequenceNumberGaugeName);
latestSequenceNumberValue = GAUGES.getValueForName(latestSequenceNumberGaugeName);
insertCount = appMetrics.summary(MetricRegistry.name(base, "insertCount"));
String maxSequenceNumberGaugeName = MetricRegistry.name(base, "maxSeq");
maxSequenceNumber = GAUGES.getGaugeForName(appMetrics, maxSequenceNumberGaugeName);
maxSequenceNumberValue = GAUGES.getValueForName(maxSequenceNumberGaugeName);
}

/**
Expand All @@ -571,5 +586,14 @@ private Metrics(Class<?> klass, MeterRegistry appMetrics) {
void setLatestSequenceNumber(long value) {
latestSequenceNumberValue.set(value);
}

/**
* Sets the {@link #maxSequenceNumber}.
*
* @param value value to set
*/
void setMaxSequenceNumber(long value) {
maxSequenceNumberValue.set(value);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package gov.cms.bfd.pipeline.rda.grpc.source;

import com.google.common.base.Preconditions;
import com.google.protobuf.Empty;
import gov.cms.mpsm.rda.v1.ClaimRequest;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import gov.cms.mpsm.rda.v1.FissClaimChange;
import gov.cms.mpsm.rda.v1.RDAServiceGrpc;
import io.grpc.CallOptions;
Expand Down Expand Up @@ -45,4 +47,13 @@ public GrpcResponseStream<FissClaimChange> callService(
ClientCalls.blockingServerStreamingCall(call, request);
return new GrpcResponseStream<>(call, apiResults);
}

@Override
public ClaimSequenceNumberRange callSequenceNumberRangeService(
ManagedChannel channel, CallOptions callOptions) {
final MethodDescriptor<Empty, ClaimSequenceNumberRange> method =
RDAServiceGrpc.getGetFissClaimsSequenceNumberRangeMethod();
final ClientCall<Empty, ClaimSequenceNumberRange> call = channel.newCall(method, callOptions);
return ClientCalls.blockingUnaryCall(call, Empty.getDefaultInstance());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.google.protobuf.Empty;
import gov.cms.bfd.pipeline.rda.grpc.RdaServerJob;
import gov.cms.mpsm.rda.v1.ApiVersion;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import gov.cms.mpsm.rda.v1.RDAServiceGrpc;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
Expand Down Expand Up @@ -62,6 +63,16 @@ public abstract GrpcResponseStream<TResponse> callService(
ManagedChannel channel, CallOptions callOptions, long startingSequenceNumber)
throws Exception;

/**
* Calls the service to get the sequence number range.
*
* @param channel an already open channel to the service being called
* @param callOptions the CallOptions object to use for the API call
* @return sequence number range
*/
public abstract ClaimSequenceNumberRange callSequenceNumberRangeService(
ManagedChannel channel, CallOptions callOptions);

/**
* Make a call to the server's {@code getVersion()} service and return the version component. Will
* retry several times if the call fails. Retries allow the job to handle with a race condition
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package gov.cms.bfd.pipeline.rda.grpc.source;

import com.google.common.base.Preconditions;
import com.google.protobuf.Empty;
import gov.cms.mpsm.rda.v1.ClaimRequest;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import gov.cms.mpsm.rda.v1.McsClaimChange;
import gov.cms.mpsm.rda.v1.RDAServiceGrpc;
import io.grpc.CallOptions;
Expand Down Expand Up @@ -44,4 +46,13 @@ public GrpcResponseStream<McsClaimChange> callService(
ClientCalls.blockingServerStreamingCall(call, request);
return new GrpcResponseStream<>(call, apiResults);
}

@Override
public ClaimSequenceNumberRange callSequenceNumberRangeService(
ManagedChannel channel, CallOptions callOptions) {
final MethodDescriptor<Empty, ClaimSequenceNumberRange> method =
RDAServiceGrpc.getGetMcsClaimsSequenceNumberRangeMethod();
final ClientCall<Empty, ClaimSequenceNumberRange> call = channel.newCall(method, callOptions);
return ClientCalls.blockingUnaryCall(call, Empty.getDefaultInstance());
}
}
Loading