diff --git a/carbonj.service/Dockerfile b/carbonj.service/Dockerfile index 1d8c6916..c5b1d79a 100644 --- a/carbonj.service/Dockerfile +++ b/carbonj.service/Dockerfile @@ -18,8 +18,8 @@ RUN yum update -y && \ yum install -y gcc-c++ gcc make libtool automake autoconf make python3-devel && \ rpm --import http://repos.azulsystems.com/RPM-GPG-KEY-azulsystems && \ yum install -y https://cdn.azul.com/zulu/bin/zulu-repo-1.0.0-1.noarch.rpm && \ - yum install -y https://mirror.stream.centos.org/9-stream/AppStream/$(uname -m)/os/Packages/pcp-conf-6.2.1-1.el9.$(uname -m).rpm && \ - yum install -y https://mirror.stream.centos.org/9-stream/AppStream/$(uname -m)/os/Packages/pcp-libs-6.2.1-1.el9.$(uname -m).rpm && \ + yum install -y https://mirror.stream.centos.org/9-stream/AppStream/$(uname -m)/os/Packages/pcp-conf-6.2.2-6.el9.$(uname -m).rpm && \ + yum install -y https://mirror.stream.centos.org/9-stream/AppStream/$(uname -m)/os/Packages/pcp-libs-6.2.2-6.el9.$(uname -m).rpm && \ # # If sysstat version is updated, confirm iolog.sh execution and update associated version check in entrypoint.sh # diff --git a/carbonj.service/build.gradle b/carbonj.service/build.gradle index 595f7b56..7cc3efe0 100644 --- a/carbonj.service/build.gradle +++ b/carbonj.service/build.gradle @@ -201,8 +201,12 @@ dependencies { implementation group: 'io.netty', name: 'netty-all', version: "${nettyAll}" implementation group: 'net.razorvine', name: 'pickle', version: "${pickle}" implementation group: 'org.python', name: 'jython-standalone', version: "${jythonStandalone}" - implementation group: 'com.amazonaws', name: 'amazon-kinesis-client', version: "${amazonKinesisClient}" - implementation group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: "${awsJavaSdkV1}" + implementation group: 'org.apache.httpcomponents', name: 'httpclient', version: "${httpClient}" + implementation group: 'software.amazon.kinesis', name: 'amazon-kinesis-client', version: "${awsKinesisClient}" + implementation group: 'software.amazon.awssdk', name: 'kinesis', version: "${awsJavaSdkV2}" + implementation group: 'software.amazon.awssdk', name: 'sts', version: "${awsJavaSdkV2}" + implementation group: 'software.amazon.awssdk', name: 'dynamodb', version: "${awsJavaSdkV2}" + implementation group: 'software.amazon.awssdk', name: 'netty-nio-client', version: "${awsJavaSdkV2}" implementation group: 'io.dropwizard.metrics', name: 'metrics-core', version: "${metrics}" implementation group: 'io.dropwizard.metrics', name: 'metrics-jvm', version: "${metrics}" implementation group: 'io.dropwizard.metrics', name: 'metrics-graphite', version: "${metrics}" diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/AWSCredential.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/AWSCredential.java deleted file mode 100644 index ed6f76c2..00000000 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/AWSCredential.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright (c) 2018, salesforce.com, inc. - * All rights reserved. - * SPDX-License-Identifier: BSD-3-Clause - * For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause - */ -package com.demandware.carbonj.service.engine; - -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.context.annotation.Condition; -import org.springframework.context.annotation.ConditionContext; -import org.springframework.core.type.AnnotatedTypeMetadata; - -public class AWSCredential implements Condition { - - private static Logger log = LoggerFactory.getLogger(AWSCredential.class); - @Override - public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { - try - { - AWSCredentials credentials = new DefaultAWSCredentialsProviderChain().getCredentials(); - } - catch (Exception e) - { - log.error( - "Cannot load the credentials from the credential profiles file. " + - "Please make sure that your credentials file is at the correct " + - "location (~/.aws/credentials), and is in valid format."+ - "Or attach an IAM Access Role to the instance to provide temporary authentication"+ - e.getMessage(), e); - return false; - } - return true; - } -} diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/Consumers.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/Consumers.java index a840e050..173dfb5a 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/Consumers.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/Consumers.java @@ -12,6 +12,13 @@ import com.demandware.carbonj.service.ns.NamespaceCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.common.ConfigsBuilder; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.processor.SingleStreamTracker; +import software.amazon.kinesis.processor.StreamTracker; import java.io.File; import java.io.FileInputStream; @@ -25,6 +32,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; public class Consumers { @@ -36,32 +44,40 @@ public class Consumers { private final PointProcessor pointProcessor; private final KinesisConfig kinesisConfig; - private final CheckPointMgr checkPointMgr; private final ConsumerRules consumerRules; private final Map consumers; - private final String kinesisConsumerRegion; - - private final PointProcessor recoveryPointProcessor; - private final NamespaceCounter namespaceCounter; private final File indexNameSyncDir; - Consumers(MetricRegistry metricRegistry, PointProcessor pointProcessor, PointProcessor recoveryPointProcessor, File rulesFile, - KinesisConfig kinesisConfig, CheckPointMgr checkPointMgr, String kinesisConsumerRegion, - NamespaceCounter namespaceCounter, File indexNameSyncDir) { + private final String activeProfile; + + private final KinesisAsyncClient kinesisAsyncClient; + + private final DynamoDbAsyncClient dynamoDbAsyncClient; + + private final CloudWatchAsyncClient cloudWatchAsyncClient; + + private final int kinesisConsumerRetroSeconds; + + Consumers(MetricRegistry metricRegistry, PointProcessor pointProcessor, File rulesFile, KinesisConfig kinesisConfig, + NamespaceCounter namespaceCounter, File indexNameSyncDir, String activeProfile, + KinesisAsyncClient kinesisAsyncClient, DynamoDbAsyncClient dynamoDbAsyncClient, + CloudWatchAsyncClient cloudWatchAsyncClient, int kinesisConsumerRetroSeconds) { this.metricRegistry = metricRegistry; this.pointProcessor = pointProcessor; - this.recoveryPointProcessor = recoveryPointProcessor; this.kinesisConfig = kinesisConfig; - this.checkPointMgr = checkPointMgr; - this.kinesisConsumerRegion = kinesisConsumerRegion; this.namespaceCounter = namespaceCounter; this.indexNameSyncDir = indexNameSyncDir; + this.activeProfile = activeProfile; + this.kinesisAsyncClient = kinesisAsyncClient; + this.dynamoDbAsyncClient = dynamoDbAsyncClient; + this.cloudWatchAsyncClient = cloudWatchAsyncClient; + this.kinesisConsumerRetroSeconds = kinesisConsumerRetroSeconds; consumers = new ConcurrentHashMap<>(); consumerRules = new ConsumerRules(rulesFile); reload(); @@ -113,6 +129,7 @@ private void reconfigureConsumers(Set newRules, Set currentRules /* create new consumers */ // we use the host name to generate the kinesis application name as they are stable for stable set pods. String hostName = getHostName(); + Date kinesisConsumerRetroDate = new Date(System.currentTimeMillis() - kinesisConsumerRetroSeconds * 1000L); for (String consumerName : newRules) { log.info(String.format("Creating new consumer with kinesis stream name: %s", consumerName)); @@ -139,8 +156,12 @@ private void reconfigureConsumers(Set newRules, Set currentRules } Counter initRetryCounter = metricRegistry.counter(MetricRegistry.name("kinesis.consumer." + kinesisStreamName + ".initRetryCounter")); - KinesisConsumer kinesisConsumer = new KinesisConsumer(metricRegistry, pointProcessor, recoveryPointProcessor, kinesisStreamName, - kinesisApplicationName, kinesisConfig, checkPointMgr, initRetryCounter, kinesisConsumerRegion); + StreamTracker streamTracker = new SingleStreamTracker(kinesisStreamName, + InitialPositionInStreamExtended.newInitialPositionAtTimestamp(kinesisConsumerRetroDate)); + ConfigsBuilder configsBuilder = new ConfigsBuilder(streamTracker, kinesisApplicationName, kinesisAsyncClient, + dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), + new KinesisRecordProcessorFactory(metricRegistry, pointProcessor, kinesisConfig, kinesisStreamName)); + KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisStreamName, kinesisApplicationName, initRetryCounter, configsBuilder); log.info(String.format("New Consumer created with name %s", kinesisStreamName)); newConsumers.add(consumerName); consumers.put(consumerName, kinesisConsumer); @@ -171,7 +192,7 @@ private String getHostName() { } private String getKinesisApplicationName(String streamName, String hostName) { - return streamName + "-" + hostName; + return streamName + "-" + hostName + "-" + activeProfile; } private void close(Set consumerSet) { diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/DynamoDbCheckPointMgr.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/DynamoDbCheckPointMgr.java index 720123dc..1c653c60 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/DynamoDbCheckPointMgr.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/DynamoDbCheckPointMgr.java @@ -6,23 +6,25 @@ */ package com.demandware.carbonj.service.engine; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; -import com.amazonaws.services.dynamodbv2.document.Table; -import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; -import com.amazonaws.services.dynamodbv2.model.AttributeValue; -import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; -import com.amazonaws.services.dynamodbv2.model.GetItemRequest; -import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; -import com.amazonaws.services.dynamodbv2.model.KeyType; -import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; -import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse; +import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; +import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; +import software.amazon.awssdk.services.dynamodb.model.KeyType; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; import java.util.Date; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; public class DynamoDbCheckPointMgr implements CheckPointMgr { @@ -31,71 +33,74 @@ public class DynamoDbCheckPointMgr implements CheckPointMgr { private final String tableName; private final int defaultOffsetMins; - private final AmazonDynamoDB client; - private final DynamoDB dynamoDB; + private final DynamoDbAsyncClient client; + private final int checkPointDynamodbTimout; - public DynamoDbCheckPointMgr(AmazonDynamoDB client, String kinesisApplicationName, int defaultOffsetMins, - int provisionedThroughput) throws Exception { + public DynamoDbCheckPointMgr(DynamoDbAsyncClient client, String kinesisApplicationName, int defaultOffsetMins, + int provisionedThroughput, int checkPointDynamodbTimout) throws Exception { this.client = client; - this.dynamoDB = new DynamoDB(client); this.defaultOffsetMins = defaultOffsetMins; this.tableName = "checkpoints-" + kinesisApplicationName; - if (!DynamoDbUtils.isTablePresent(dynamoDB, tableName)) { + this.checkPointDynamodbTimout = checkPointDynamodbTimout; + if (!DynamoDbUtils.isTablePresent(client, tableName, checkPointDynamodbTimout)) { createTable(tableName, provisionedThroughput); } } private void createTable(String tableName, int provisionedThroughput) throws Exception { - CreateTableRequest request = new CreateTableRequest() - .withAttributeDefinitions( - new AttributeDefinition("checkPointType", ScalarAttributeType.S)) - .withKeySchema( - new KeySchemaElement("checkPointType", KeyType.HASH)) - .withProvisionedThroughput( - new ProvisionedThroughput((long)provisionedThroughput, (long)provisionedThroughput)) - .withTableName(tableName); + CreateTableRequest request = CreateTableRequest.builder() + .tableName(tableName) + .attributeDefinitions(AttributeDefinition.builder().attributeName("checkPointType").attributeType(ScalarAttributeType.S).build()) + .keySchema(KeySchemaElement.builder().attributeName("checkPointType").keyType(KeyType.HASH).build()) + .provisionedThroughput(ProvisionedThroughput.builder() + .readCapacityUnits((long)provisionedThroughput) + .writeCapacityUnits((long)provisionedThroughput) + .build()) + .build(); log.info("Issuing CreateTable request for " + tableName); - Table newlyCreatedTable = dynamoDB.createTable(request); + CompletableFuture createTableResponse = this.client.createTable(request); log.info("Waiting for " + tableName + " to be created...this may take a while..."); - newlyCreatedTable.waitForActive(); + createTableResponse.get(checkPointDynamodbTimout, TimeUnit.SECONDS); } @Override public void checkPoint(Date checkPoint) throws Exception { - Table table = dynamoDB.getTable(tableName); - HashMap expressionAttributeNames = new HashMap<>(); + Map expressionAttributeNames = new HashMap<>(); expressionAttributeNames.put("#V", "checkPointValue"); - HashMap expressionAttributeValues = new HashMap<>(); - expressionAttributeValues.put(":val1", checkPoint.getTime()); + Map expressionAttributeValues = new HashMap<>(); + expressionAttributeValues.put(":val1", AttributeValue.builder().n(String.valueOf(checkPoint.getTime())).build()); - table.updateItem( - "checkPointType", // key attribute name - "timestamp", // key attribute value - "set #V = :val1", // UpdateExpression - expressionAttributeNames, - expressionAttributeValues); + client.updateItem(UpdateItemRequest.builder() + .tableName(tableName) + .key(Map.of("checkPointType", AttributeValue.builder().s("timestamp").build())) + .updateExpression("set #V = :val1") + .expressionAttributeNames(expressionAttributeNames) + .expressionAttributeValues(expressionAttributeValues).build()); } @Override public Date lastCheckPoint() throws Exception { - HashMap keyToGet = new HashMap(); - keyToGet.put( "checkPointType", new AttributeValue( "timestamp") ); - GetItemRequest request = new GetItemRequest() - .withKey( keyToGet ) - .withTableName( tableName ); - - Map item = client.getItem( request ).getItem(); - if( item == null ) { + + GetItemRequest request = GetItemRequest.builder() + .tableName(tableName) + .key(Map.of("checkPointType", AttributeValue.builder().s("timestamp").build())) + .build(); + + GetItemResponse getItemResponse = this.client.getItem(request).get(checkPointDynamodbTimout, TimeUnit.SECONDS); + + if (!getItemResponse.hasItem()) { return getDefaultCheckPoint(); } - String value = item.get( "checkPointValue" ).getN(); + + Map item = getItemResponse.item(); + String value = item.get("checkPointValue").n(); if( value == null ) { return getDefaultCheckPoint(); } - return new Date( Long.parseLong( value ) ); + return new Date(Long.parseLong(value)); } private Date getDefaultCheckPoint() { @@ -104,4 +109,3 @@ private Date getDefaultCheckPoint() { return checkPoint; } } - diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/DynamoDbUtils.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/DynamoDbUtils.java index 482eb1c4..7ec26359 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/DynamoDbUtils.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/DynamoDbUtils.java @@ -6,35 +6,33 @@ */ package com.demandware.carbonj.service.engine; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; -import com.amazonaws.services.dynamodbv2.document.Table; -import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException; -import com.amazonaws.services.dynamodbv2.model.TableDescription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; +import software.amazon.awssdk.services.dynamodb.model.TableDescription; +import software.amazon.awssdk.services.dynamodb.model.TableStatus; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class DynamoDbUtils { private static final Logger log = LoggerFactory.getLogger(DynamoDbUtils.class); - public static boolean isTablePresent(DynamoDB dynamoDB, String tableName) { - Table table = dynamoDB.getTable(tableName); + public static boolean isTablePresent(DynamoDbAsyncClient client, String tableName, int checkPointDynamodbTimout) throws InterruptedException { + DescribeTableRequest describeTableRequest = DescribeTableRequest.builder().tableName(tableName).build(); try { - TableDescription tableDescription = table.describe(); - return "ACTIVE".equals(tableDescription.getTableStatus()); - } catch (ResourceNotFoundException e) { + DescribeTableResponse describeTableResponse = client.describeTable(describeTableRequest) + .get(checkPointDynamodbTimout, TimeUnit.SECONDS); + TableDescription tableDescription = describeTableResponse.table(); + return tableDescription.tableStatus() == TableStatus.ACTIVE; + } catch (ExecutionException | TimeoutException e) { log.warn("kinesis consumer table '" + tableName + "' not found!"); return false; } } - - public static void deleteTable(DynamoDB dynamoDB, String tableName) throws InterruptedException { - Table table = dynamoDB.getTable(tableName); - table.delete(); - - log.info("Waiting for " + tableName + " to be deleted...this may take a while..."); - - table.waitForDelete(); - } } diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/KinesisConsumer.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/KinesisConsumer.java index 8455149d..1cda30df 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/KinesisConsumer.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/KinesisConsumer.java @@ -6,163 +6,65 @@ */ package com.demandware.carbonj.service.engine; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import com.codahale.metrics.Counter; import com.codahale.metrics.MetricRegistry; -import com.demandware.carbonj.service.engine.kinesis.GzipDataPointCodec; -import com.demandware.carbonj.service.engine.kinesis.kcl.MemLeaseManager; -import com.demandware.carbonj.service.engine.recovery.*; -import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.kinesis.common.ConfigsBuilder; +import software.amazon.kinesis.coordinator.Scheduler; -import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.Date; public class KinesisConsumer extends Thread { private static final Logger log = LoggerFactory.getLogger(KinesisConsumer.class); - private final MetricRegistry metricRegistry; - - private final PointProcessor pointProcessor; - private final String kinesisStreamName; private final String kinesisApplicationName; - private final KinesisConfig kinesisConfig; - private final CheckPointMgr checkPointMgr; + private final ConfigsBuilder configsBuilder; + private Scheduler scheduler; private final Counter noOfRestarts; - private Worker worker; - - private PointProcessor recoveryPointProcessor; - private volatile boolean closed; - private String kinesisConsumerRegion; - - public KinesisConsumer(MetricRegistry metricRegistry, PointProcessor pointProcessor, PointProcessor recoveryPointProcessor, - String kinesisStreamName, String kinesisApplicationName, - KinesisConfig kinesisConfig, CheckPointMgr checkPointMgr, - Counter noOfRestarts, String kinesisConsumerRegion) { - this.metricRegistry = metricRegistry; - this.pointProcessor = Preconditions.checkNotNull(pointProcessor); - this.recoveryPointProcessor = recoveryPointProcessor; + public KinesisConsumer(String kinesisStreamName, String kinesisApplicationName, + Counter noOfRestarts, ConfigsBuilder configsBuilder) { this.kinesisStreamName = kinesisStreamName; this.kinesisApplicationName = kinesisApplicationName; - this.kinesisConfig = kinesisConfig; - this.checkPointMgr = checkPointMgr; this.noOfRestarts = noOfRestarts; - this.kinesisConsumerRegion = kinesisConsumerRegion; - log.info("Kinesis consumer started"); + this.configsBuilder = configsBuilder; + this.scheduler = new Scheduler(configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), + configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), + configsBuilder.processorConfig(), configsBuilder.retrievalConfig()); + log.info("Kinesis consumer {} with name {} started", kinesisStreamName, kinesisApplicationName); this.start(); } public void run () { while (!closed) { - try { - AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain(); - String workerId = kinesisApplicationName + "-worker"; - - if (kinesisConfig.isRecoveryEnabled()) { - initCatchupKinesisClient(); - } - - KinesisClientLibConfiguration kinesisClientLibConfiguration = - new KinesisClientLibConfiguration(kinesisApplicationName, kinesisStreamName, credentialsProvider, - workerId) - .withInitialPositionInStream(InitialPositionInStream.LATEST) - .withFailoverTimeMillis(kinesisConfig.getLeaseExpirationTimeInSecs() * 1000) - .withRegionName(kinesisConsumerRegion); - - int maxRecords = kinesisConfig.getMaxRecords(); - if (maxRecords > 0) { - kinesisClientLibConfiguration.withMaxRecords(maxRecords); - } - - log.info(" Kinesis Client Library started with application name " + kinesisApplicationName + " with stream " - + kinesisStreamName + " and worker id is " + workerId); - - IRecordProcessorFactory recordProcessorFactory = new KinesisRecordProcessorFactory(metricRegistry, pointProcessor, - kinesisConfig, kinesisStreamName); - worker = new Worker.Builder() - .recordProcessorFactory(recordProcessorFactory) - .config(kinesisClientLibConfiguration) - .leaseManager(new MemLeaseManager(kinesisConfig.getLeaseTakerDelayInMillis())) - .build(); - worker.run(); - } catch (Throwable t) { - log.error("Error in initializing kinesis consumer", t); - - shutdownQuietly(worker); - + scheduler.run(); + while (!scheduler.shutdownComplete()) { + log.info("Kinesis consumer {} with name {} shutting down", kinesisStreamName, kinesisApplicationName); try { - Thread.sleep(TimeUnit.SECONDS.toMillis(kinesisConfig.getInitRetryTimeInSecs())); - } catch (InterruptedException e) { - log.error("wait interrupted", e); + Thread.sleep(1000); + } catch (InterruptedException ignored) { } - - noOfRestarts.inc(); } + this.scheduler = new Scheduler(configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), + configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), + configsBuilder.processorConfig(), configsBuilder.retrievalConfig()); + noOfRestarts.inc(); } } - private void shutdownQuietly(Worker worker) { - try { - if (worker != null) { - worker.shutdown(); - } - } catch (Throwable throwable) { - log.error("worker shutdown failed!", throwable); - } - } - - private void initCatchupKinesisClient() throws Exception { - log.info("Initializing kinesis recovery processing.."); - GapsTable gapsTable; - if( kinesisConfig.getCheckPointProvider() == KinesisRecoveryProvider.DYNAMODB ) { - gapsTable = new DynamoDbGapsTableImpl( AmazonDynamoDBClientBuilder.standard().build(), kinesisApplicationName, kinesisConfig.getGapsTableProvisionedThroughput() ); - } else { - gapsTable = new FileSystemGapsTableImpl(kinesisConfig.getCheckPointDir()); - } - - AmazonKinesis kinesisClient = AmazonKinesisClientBuilder.defaultClient(); - long gapStartTimeInMillis = checkPointMgr.lastCheckPoint().getTime(); - long gapEndTimeInMillis = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2); - - // if the carbonj had restarted before any new checkpoint is committed, avoid overlapping gaps. - List gaps = gapsTable.getGaps(); - int noOfGaps = gaps.size(); - if (noOfGaps > 0) { - Gap lastGap = gaps.get(noOfGaps - 1); - long lastGapEndTimeInMillis = lastGap.endTime().getTime(); - if (lastGapEndTimeInMillis > gapStartTimeInMillis) { - gapStartTimeInMillis = lastGapEndTimeInMillis; - } - } - - gapsTable.add(new GapImpl(new Date(gapStartTimeInMillis), new Date(gapEndTimeInMillis))); - - RecoveryManager recoveryManager = new RecoveryManager(metricRegistry, gapsTable, kinesisStreamName, recoveryPointProcessor, kinesisClient, - kinesisConfig.getRecoveryIdleTimeMillis(), kinesisConfig.getRetryTimeInMillis(), - new GzipDataPointCodec()); - new Thread(recoveryManager).start(); - } - void closeQuietly() { closed = true; - shutdownQuietly(worker); + if (scheduler != null) { + scheduler.shutdown(); + } log.info(String.format("Kinesis stream %s consumer stopped", kinesisStreamName)); } diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/KinesisRecordProcessor.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/KinesisRecordProcessor.java index 1f5939b2..b7c070d4 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/KinesisRecordProcessor.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/KinesisRecordProcessor.java @@ -6,25 +6,32 @@ */ package com.demandware.carbonj.service.engine; -import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; -import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; -import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; -import com.amazonaws.services.kinesis.model.Record; -import com.codahale.metrics.*; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; import com.demandware.carbonj.service.engine.kinesis.DataPointCodec; import com.demandware.carbonj.service.engine.kinesis.DataPoints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.kinesis.exceptions.InvalidStateException; +import software.amazon.kinesis.exceptions.ShutdownException; +import software.amazon.kinesis.exceptions.ThrottlingException; +import software.amazon.kinesis.lifecycle.events.InitializationInput; +import software.amazon.kinesis.lifecycle.events.LeaseLostInput; +import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; +import software.amazon.kinesis.lifecycle.events.ShardEndedInput; +import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; +import software.amazon.kinesis.processor.RecordProcessorCheckpointer; +import software.amazon.kinesis.processor.ShardRecordProcessor; +import software.amazon.kinesis.retrieval.KinesisClientRecord; -import java.nio.ByteBuffer; import java.util.List; -public class KinesisRecordProcessor implements IRecordProcessor { +public class KinesisRecordProcessor implements ShardRecordProcessor { - private static Logger log = LoggerFactory.getLogger(KinesisRecordProcessor.class); + private static final Logger log = LoggerFactory.getLogger(KinesisRecordProcessor.class); private final MetricRegistry metricRegistry; @@ -78,16 +85,17 @@ public class KinesisRecordProcessor implements IRecordProcessor { leaseLostCount = metricRegistry.counter(MetricRegistry.name("kinesis", "lostLease")); } - public void initialize(String shardId) { - log.info("Initializing record processor for shard: " + shardId); - this.kinesisShardId = shardId; + @Override + public void initialize(InitializationInput initializationInput) { + this.kinesisShardId = initializationInput.shardId(); + log.info("Initializing record processor for shard: " + this.kinesisShardId); // metrics to track number of records received per shard. MetricRegistry registry = metricRegistry; recordsFetchedPerShardCounter = registerCounter(registry, - MetricRegistry.name("kinesis", kinesisStreamName, shardId, "received")); + MetricRegistry.name("kinesis", kinesisStreamName, this.kinesisShardId, "received")); noOfFetchesPerShardCounter = registerCounter(registry, - MetricRegistry.name("kinesis", kinesisStreamName, shardId, "fetch")); + MetricRegistry.name("kinesis", kinesisStreamName, this.kinesisShardId, "fetch")); } private Counter registerCounter(MetricRegistry registry, String counterName) { @@ -95,21 +103,23 @@ private Counter registerCounter(MetricRegistry registry, String counterName) { return registry.counter(counterName); } - public void processRecords(List records, IRecordProcessorCheckpointer checkpointer) { + @Override + public void processRecords(ProcessRecordsInput processRecordsInput) { + List records = processRecordsInput.records(); recordsFetchedPerShardCounter.inc(records.size()); noOfFetchesPerShardCounter.inc(); processRecordsWithRetries(records); // Checkpoint once every checkpoint interval. if (System.currentTimeMillis() > nextCheckpointTimeInMillis) { - checkpoint(checkpointer); + checkpoint(processRecordsInput.checkpointer()); nextCheckpointTimeInMillis = System.currentTimeMillis() + kinesisConfig.getCheckPointIntervalMillis(); } } - private void processRecordsWithRetries(List records) { + private void processRecordsWithRetries(List records) { long receiveTimeStamp = System.currentTimeMillis(); - for (Record record : records) { + for (KinesisClientRecord record : records) { final Timer.Context timerContext = consumerTimer.time(); boolean processedSuccessfully = false; for (int i = 0; i < NUM_RETRIES; i++) { @@ -140,9 +150,10 @@ private void processRecordsWithRetries(List records) { } } - private void processSingleRecord(Record record, long receiveTimeStamp) { - ByteBuffer data = record.getData(); - DataPoints dataPoints = codec.decode(data.array()); + private void processSingleRecord(KinesisClientRecord record, long receiveTimeStamp) { + byte[] array = new byte[record.data().remaining()]; + record.data().get(array); + DataPoints dataPoints = codec.decode(array); List dataPointList = dataPoints.getDataPoints(); long latencyTime = receiveTimeStamp - dataPoints.getTimeStamp(); @@ -155,16 +166,13 @@ private void processSingleRecord(Record record, long receiveTimeStamp) { pointProcessor.process(dataPointList); } - // @Override - public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { + @Override + public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Shutting down record processor for shard: " + kinesisShardId); - // Important to checkpoint after reaching end of shard, so we can start processing data from child shards. - if (reason == ShutdownReason.TERMINATE) { - checkpoint(checkpointer); - } + checkpoint(shutdownRequestedInput.checkpointer()); } - private void checkpoint(IRecordProcessorCheckpointer checkpointer) { + private void checkpoint(RecordProcessorCheckpointer checkpointer) { for (int i = 0; i < NUM_RETRIES; i++) { try { checkpointer.checkpoint(); @@ -192,5 +200,19 @@ private void checkpoint(IRecordProcessorCheckpointer checkpointer) { } } } -} + @Override + public void leaseLost(LeaseLostInput leaseLostInput) { + log.warn("Lease has been lost. No longer able to checkpoint."); + } + + @Override + public void shardEnded(ShardEndedInput shardEndedInput) { + try { + shardEndedInput.checkpointer().checkpoint(); + log.info("Shard completed and checkpoint written."); + } catch (InvalidStateException | ShutdownException e) { + log.error("Shard ended. Problem writing checkpoint.", e); + } + } +} diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/KinesisRecordProcessorFactory.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/KinesisRecordProcessorFactory.java index 8d9a7842..d2c563ac 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/KinesisRecordProcessorFactory.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/KinesisRecordProcessorFactory.java @@ -6,18 +6,17 @@ */ package com.demandware.carbonj.service.engine; - -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; import com.demandware.carbonj.service.engine.kinesis.DataPointCodec; import com.demandware.carbonj.service.engine.kinesis.GzipDataPointCodec; +import software.amazon.kinesis.processor.ShardRecordProcessor; +import software.amazon.kinesis.processor.ShardRecordProcessorFactory; -public class KinesisRecordProcessorFactory implements IRecordProcessorFactory { +public class KinesisRecordProcessorFactory implements ShardRecordProcessorFactory { private final MetricRegistry metricRegistry; @@ -68,7 +67,8 @@ public class KinesisRecordProcessorFactory implements IRecordProcessorFactory { codec = new GzipDataPointCodec(); } - public IRecordProcessor createProcessor() { + @Override + public ShardRecordProcessor shardRecordProcessor() { return new KinesisRecordProcessor(metricRegistry, pointProcessor, metricsReceived, messagesReceived, pointsPerTask, kinesisConfig, messagesRetry, dropped, taskCount, consumerTimer, latency, codec, streamName); } diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/cfgAws.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/cfgAws.java new file mode 100644 index 00000000..f1164e94 --- /dev/null +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/cfgAws.java @@ -0,0 +1,74 @@ +/** + * Copyright (c) 2018, salesforce.com, inc. + * All rights reserved. + * SPDX-License-Identifier: BSD-3-Clause + * For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ +package com.demandware.carbonj.service.engine; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider; +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.http.nio.netty.Http2Configuration; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder; + +import java.time.Duration; + +@Configuration +public class cfgAws { + private static final int INITIAL_WINDOW_SIZE_BYTES = 512 * 1024; // 512 KB + private static final long HEALTH_CHECK_PING_PERIOD_MILLIS = 60 * 1000; + + @Value("${aws.region:us-east-1}") + private String region; + + @Value("#{new Long('${aws.kinesis.processor.requestTimeoutMillis:30000}')}") + private long kinesisProcessorRequestTimeoutMillis; + + @Bean + @Profile("!test") + public AwsCredentialsProvider awsCredentialsProvider() { + return InstanceProfileCredentialsProvider.builder().build(); + } + + @Bean + @Profile("!test") + public KinesisAsyncClient kinesisAsyncClient(AwsCredentialsProvider awsCredentialsProvider) { + // https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/http-configuration-netty.html + KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder() + .credentialsProvider(awsCredentialsProvider).region(Region.of(region)); + return kinesisAsyncClientBuilder.httpClientBuilder( + NettyNioAsyncHttpClient.builder() + .connectionTimeout(Duration.ofMillis(kinesisProcessorRequestTimeoutMillis)) + .readTimeout(Duration.ofMillis(kinesisProcessorRequestTimeoutMillis)) + .maxConcurrency(Integer.MAX_VALUE) + .http2Configuration(Http2Configuration.builder().initialWindowSize(INITIAL_WINDOW_SIZE_BYTES) + .healthCheckPingPeriod(Duration.ofMillis(HEALTH_CHECK_PING_PERIOD_MILLIS)) + .build()) + .protocol(Protocol.HTTP2)) + .build(); + } + + @Bean + @Profile("!test") + public DynamoDbAsyncClient dynamoDbAsyncClient(AwsCredentialsProvider awsCredentialsProvider) { + return DynamoDbAsyncClient.builder().credentialsProvider(awsCredentialsProvider) + .region(Region.of(region)).build(); + } + + @Bean + @Profile("!test") + public CloudWatchAsyncClient cloudWatchAsyncClient(AwsCredentialsProvider awsCredentialsProvider) { + return CloudWatchAsyncClient.builder().credentialsProvider(awsCredentialsProvider) + .region(Region.of(region)).build(); + } +} diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/cfgCarbonJ.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/cfgCarbonJ.java index 4cbdcc0e..39d5f418 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/cfgCarbonJ.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/cfgCarbonJ.java @@ -49,6 +49,9 @@ import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; +import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import javax.annotation.PostConstruct; import java.io.File; @@ -63,7 +66,8 @@ @Configuration @Import( { cfgMetric.class, cfgTimeSeriesStorage.class, cfgHostnameOverride.class, cfgCentralThreadPools.class, - cfgStrings.class, cfgAccumulator.class, cfgNamespaces.class, cfgKinesis.class, cfgEventBus.class, cfgCheckPointMgr.class } ) + cfgStrings.class, cfgAccumulator.class, cfgNamespaces.class, cfgKinesis.class, cfgEventBus.class, + cfgCheckPointMgr.class, cfgAws.class } ) public class cfgCarbonJ { private static final Logger log = LoggerFactory.getLogger( cfgCarbonJ.class ); @@ -175,9 +179,6 @@ public class cfgCarbonJ @Value( "${metrics.store.dataDir:data}" ) private String dataDir = null; - @Value( "${kinesis.consumer.region:us-east-1}" ) - private String kinesisConsumerRegion = "us-east-1"; - @Value( "${kinesis.relay.region:us-east-1}" ) private String kinesisRelayRegion = "us-east-1"; @@ -187,11 +188,6 @@ public class cfgCarbonJ @Value( "${kinesis.relay.role:}" ) private String kinesisRelayRole; - /** - * Config server properties - */ - @Value( "${configServer.enabled:false}" ) private boolean configServerEnabled; - @Value( "${configServer.registrationSeconds:30}" ) private int configServerRegistrationSeconds; @Value( "${configServer.baseUrl:http://localhost:8081}" ) private String configServerBaseUrl; @@ -207,6 +203,12 @@ public class cfgCarbonJ @Value("${metrics.store.sync.secondary.db:false}") private boolean syncSecondaryDb; + @Value("${spring.profiles.active:prd}") + private String activeProfile; + + @Value("${kinesis.consumer.retroSeconds:600}") + private int kinesisConsumerRetroSeconds = 600; + @Bean public RestTemplate restTemplate() { return new RestTemplate(); @@ -413,18 +415,24 @@ Consumer dataPointSink( @Qualifier( "dataPointSinkRelay" ) Relay r ) @Autowired( required = false ) CheckPointMgr checkPointMgr; + @Autowired + private KinesisAsyncClient kinesisAsyncClient; + + @Autowired + private DynamoDbAsyncClient dynamoDbAsyncClient; + + @Autowired + private CloudWatchAsyncClient cloudWatchAsyncClient; + @Bean @ConditionalOnProperty(name = "rocksdb.readonly", havingValue = "false", matchIfMissing = true) - Consumers consumer( PointProcessor pointProcessor, - @Qualifier( "recoveryPointProcessor" ) PointProcessor recoveryPointProcessor, - ScheduledExecutorService s, KinesisConfig kinesisConfig, NamespaceCounter nsCounter ) - { + Consumers consumer( PointProcessor pointProcessor, ScheduledExecutorService s, KinesisConfig kinesisConfig, NamespaceCounter nsCounter) { if ( kinesisConfig.isKinesisConsumerEnabled() ) { File rulesFile = locateConfigFile( serviceDir, consumerRulesFile ); - Consumers consumer = new Consumers( metricRegistry, pointProcessor, recoveryPointProcessor, rulesFile, - kinesisConfig, checkPointMgr, kinesisConsumerRegion, - nsCounter, dataDir == null ? null : new File(dataDir, "index-name-sync")); + Consumers consumer = new Consumers( metricRegistry, pointProcessor, rulesFile, kinesisConfig, + nsCounter, dataDir == null ? null : new File(dataDir, "index-name-sync"), activeProfile, + kinesisAsyncClient, dynamoDbAsyncClient, cloudWatchAsyncClient, kinesisConsumerRetroSeconds); s.scheduleWithFixedDelay( consumer::reload, 15, 30, TimeUnit.SECONDS ); if (syncSecondaryDb) { s.scheduleWithFixedDelay( consumer::syncNamespaces, 60, 60, TimeUnit.SECONDS ); diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/destination/KinesisDestination.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/destination/KinesisDestination.java index 11ee29cd..21c7f3a5 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/destination/KinesisDestination.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/destination/KinesisDestination.java @@ -6,18 +6,6 @@ */ package com.demandware.carbonj.service.engine.destination; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicSessionCredentials; -import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; -import com.amazonaws.services.securitytoken.AWSSecurityTokenService; -import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceAsyncClientBuilder; -import com.amazonaws.services.securitytoken.model.AssumeRoleRequest; -import com.amazonaws.services.securitytoken.model.AssumeRoleResult; -import com.amazonaws.services.securitytoken.model.Credentials; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -32,6 +20,12 @@ import com.demandware.carbonj.service.engine.kinesis.GzipDataPointCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; + import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.function.Consumer; @@ -44,7 +38,7 @@ public class KinesisDestination private static final Logger log = LoggerFactory.getLogger( KinesisDestination.class ); - private final AmazonKinesis kinesisClient; + private final KinesisAsyncClient kinesisClient; private final ArrayBlockingQueue q; @@ -69,14 +63,6 @@ public class KinesisDestination private final Timer producerTimer; - private final Boolean kinesisRelayRbacEnabled; - - private final String kinesisRelayAccount; - - private final String kinesisRelayRegion; - - private final String kinesisRelayRole; - public KinesisDestination(MetricRegistry metricRegistry, String type, int queueSize, String streamName, int batchSize, int threadCount, int maxWaitTimeInSecs, String kinesisRelayRegion, Boolean kinesisRelayRbacEnabled, String kinesisRelayAccount, String kinesisRelayRole) @@ -107,22 +93,18 @@ public KinesisDestination(MetricRegistry metricRegistry, String type, int queueS this.maxWaitTimeInSecs = maxWaitTimeInSecs; - this.kinesisRelayRbacEnabled = kinesisRelayRbacEnabled; - this.kinesisRelayRegion = kinesisRelayRegion; - this.kinesisRelayAccount = kinesisRelayAccount; - this.kinesisRelayRole = kinesisRelayRole; - if ( kinesisRelayRbacEnabled ) { log.info( "Rbac enabled. Building kinesis client and credentials provider with region: " + kinesisRelayRegion + ", account: " + kinesisRelayAccount + ", role: " + kinesisRelayRole); - kinesisClient = AmazonKinesisClientBuilder.standard().withCredentials( buildCredentialsProvider(kinesisRelayRegion, kinesisRelayAccount, kinesisRelayRole ) ) - .withRegion( kinesisRelayRegion ).build(); + kinesisClient = KinesisAsyncClient.builder() + .credentialsProvider( buildCredentialsProvider(kinesisRelayRegion, kinesisRelayAccount, kinesisRelayRole ) ) + .region(Region.of(kinesisRelayRegion)).build(); } else { log.info( "Rbac not enabled. Building kinesis client."); - kinesisClient = AmazonKinesisClientBuilder.standard().withRegion(kinesisRelayRegion).build(); + kinesisClient = KinesisAsyncClient.builder().region(Region.of(kinesisRelayRegion)).build(); } this.streamName = streamName; @@ -135,18 +117,19 @@ public KinesisDestination(MetricRegistry metricRegistry, String type, int queueS this.start(); } - private static AWSCredentialsProvider buildCredentialsProvider(String kinesisRelayRegion, String kinesisRelayAccount, String kinesisRelayRole) + private static AwsCredentialsProvider buildCredentialsProvider(String kinesisRelayRegion, String kinesisRelayAccount, String kinesisRelayRole) { String roleArn = "arn:aws:iam::" + kinesisRelayAccount + ":role/" + kinesisRelayRole; String roleSessionName = "cc-umon-client-session"; - final AWSCredentialsProvider credentialsProvider; + final AwsCredentialsProvider credentialsProvider; - AWSSecurityTokenService stsClient = - AWSSecurityTokenServiceAsyncClientBuilder.standard().withRegion( kinesisRelayRegion ).build(); + StsClient stsClient = StsClient.builder().region(Region.of(kinesisRelayRegion)).build(); - credentialsProvider = new STSAssumeRoleSessionCredentialsProvider.Builder( roleArn, roleSessionName ) - .withStsClient( stsClient ).withRoleSessionDurationSeconds( 3600 ).build(); + credentialsProvider = StsAssumeRoleCredentialsProvider.builder() + .refreshRequest(builder -> builder.roleArn(roleArn).roleSessionName(roleSessionName).durationSeconds(3600)) + .stsClient(stsClient) + .build(); return credentialsProvider; } @@ -245,5 +228,4 @@ public Consumer andThen(@SuppressWarnings("NullableProblems") Consume //noinspection DataFlowIssue return null; } - } diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/destination/KinesisProducerTask.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/destination/KinesisProducerTask.java index 59c328f7..1d6517ec 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/destination/KinesisProducerTask.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/destination/KinesisProducerTask.java @@ -6,21 +6,27 @@ */ package com.demandware.carbonj.service.engine.destination; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; -import com.amazonaws.services.kinesis.model.PutRecordRequest; -import com.amazonaws.services.kinesis.model.PutRecordResult; -import com.codahale.metrics.*; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; import com.demandware.carbonj.service.engine.DataPoint; import com.demandware.carbonj.service.engine.kinesis.DataPointCodec; import com.demandware.carbonj.service.engine.kinesis.DataPoints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; + import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; public class KinesisProducerTask implements Runnable @@ -29,25 +35,25 @@ public class KinesisProducerTask private final MetricRegistry metricRegistry; - private AmazonKinesis kinesisClient; + private final KinesisAsyncClient kinesisClient; final private String streamName; - private List points; + private final List points; - private Meter metricsSent; + private final Meter metricsSent; - private Meter metricsDropped; + private final Meter metricsDropped; - private Meter messagesSent; + private final Meter messagesSent; - private Meter messageRetryCounter; + private final Meter messageRetryCounter; - private Histogram messageSize; + private final Histogram messageSize; - private Timer.Context timerContext; + private final Timer.Context timerContext; - private Map shardIdMap; + private final Map shardIdMap; private final Histogram dataPointsPerMessage; @@ -55,7 +61,7 @@ public class KinesisProducerTask private static final int NUM_RETRIES = 3; - KinesisProducerTask(MetricRegistry metricRegistry, AmazonKinesis kinesisClient, String streamName, List points, Meter metricsSent, + KinesisProducerTask(MetricRegistry metricRegistry, KinesisAsyncClient kinesisClient, String streamName, List points, Meter metricsSent, Meter metricsDropped, Meter messagesSent, Histogram messageSize, Meter messageRetryCounter, Timer.Context timerContext, Histogram dataPointsPerMessage, DataPointCodec codec) { this.metricRegistry = metricRegistry; @@ -78,15 +84,17 @@ public void run() { try { /* gzip multiple data points*/ byte[] message = codec.encode(new DataPoints(points, System.currentTimeMillis())); - PutRecordRequest putRecordRequest = new PutRecordRequest(); - putRecordRequest.setStreamName(streamName); - putRecordRequest.setData(ByteBuffer.wrap(message)); - putRecordRequest.setPartitionKey(UUID.randomUUID().toString()); + PutRecordRequest putRecordRequest = PutRecordRequest.builder() + .streamName(streamName) + .data(SdkBytes.fromByteArray(message)) + .partitionKey(UUID.randomUUID().toString()) + .build(); + boolean processedSuccessfully = false; - PutRecordResult putRecordResult = null; + PutRecordResponse putRecordResult = null; for (int i = 0; i < NUM_RETRIES && !processedSuccessfully; i++) { try { - putRecordResult = kinesisClient.putRecord(putRecordRequest); + putRecordResult = kinesisClient.putRecord(putRecordRequest).get(30, TimeUnit.SECONDS); processedSuccessfully = true; } catch (ProvisionedThroughputExceededException e) { messageRetryCounter.mark(); @@ -101,14 +109,14 @@ public void run() { metricsSent.mark(noOfDataPoints); dataPointsPerMessage.update(noOfDataPoints); messagesSent.mark(); - if (!shardIdMap.containsKey(putRecordResult.getShardId())) { - shardIdMap.put(putRecordResult.getShardId(), metricRegistry.counter(MetricRegistry.name(streamName, putRecordResult.getShardId()))); + if (!shardIdMap.containsKey(putRecordResult.shardId())) { + shardIdMap.put(putRecordResult.shardId(), metricRegistry.counter(MetricRegistry.name(streamName, putRecordResult.shardId()))); } - shardIdMap.get(putRecordResult.getShardId()).inc(); + shardIdMap.get(putRecordResult.shardId()).inc(); // log.info("Message sent.ShardId is " + putRecordResult.getShardId()); } else { metricsDropped.mark(); - log.error("Couldn't process record " + putRecordRequest + ". Skipping the record."); + log.error("Couldn't process record {}. Skipping the record.", putRecordRequest); } } catch(Throwable e) { log.error(e.getMessage(),e); diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/kinesis/cfgCheckPointMgr.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/kinesis/cfgCheckPointMgr.java index 40ae4ff0..e9027a33 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/kinesis/cfgCheckPointMgr.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/kinesis/cfgCheckPointMgr.java @@ -6,15 +6,12 @@ */ package com.demandware.carbonj.service.engine.kinesis; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.demandware.carbonj.service.accumulator.Accumulator; import com.demandware.carbonj.service.accumulator.cfgAccumulator; import com.demandware.carbonj.service.engine.CheckPointMgr; import com.demandware.carbonj.service.engine.DynamoDbCheckPointMgr; import com.demandware.carbonj.service.engine.FileCheckPointMgr; import com.demandware.carbonj.service.engine.KinesisConfig; -import com.demandware.carbonj.service.engine.cfgCarbonJ; import com.demandware.carbonj.service.engine.cfgKinesis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,6 +21,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import java.nio.file.Paths; import java.util.Date; @@ -45,6 +43,8 @@ public class cfgCheckPointMgr { @Value( "${metrics.store.checkPoint.table.provisioned.throughput:2}" ) private int checkPointTableProvisionedThroughput; + @Value("${metrics.store.checkPoint.dynamodb.timeout:30}") private int checkPointDynamodbTimout; + private static final Logger log = LoggerFactory.getLogger( cfgCheckPointMgr.class ); @Bean @@ -66,9 +66,9 @@ CheckPointMgr checkPointMgr(ScheduledExecutorService s, KinesisConfig kine if ( checkPointProvider.equalsIgnoreCase( "dynamodb" ) ) { log.info( "Creating Dynamo DB Checkpoint Mgr" ); - AmazonDynamoDB dynamoDbClient = AmazonDynamoDBClientBuilder.standard().build(); - checkPointMgr = new DynamoDbCheckPointMgr( dynamoDbClient, checkPointApplicationName, - defaultCheckPointOffset, checkPointTableProvisionedThroughput ); + DynamoDbAsyncClient dynamoDbClient = DynamoDbAsyncClient.builder().build(); + checkPointMgr = new DynamoDbCheckPointMgr(dynamoDbClient, checkPointApplicationName, + defaultCheckPointOffset, checkPointTableProvisionedThroughput, checkPointDynamodbTimout); } else { diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/kinesis/kcl/MemLeaseManager.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/kinesis/kcl/MemLeaseManager.java deleted file mode 100644 index e33f862c..00000000 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/kinesis/kcl/MemLeaseManager.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Copyright (c) 2018, salesforce.com, inc. - * All rights reserved. - * SPDX-License-Identifier: BSD-3-Clause - * For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause - */ -package com.demandware.carbonj.service.engine.kinesis.kcl; - -import com.amazonaws.services.kinesis.leases.impl.Lease; -import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - - -public class MemLeaseManager implements ILeaseManager { - - private final long delayInMillis; - - private Map shardIdToLease; - - public MemLeaseManager(long delayInMillis) { - this.delayInMillis = delayInMillis; - } - - @Override - public boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity) { - - if (shardIdToLease == null) { - shardIdToLease = new ConcurrentHashMap<>(); - return true; - } - return false; - } - - @Override - public boolean leaseTableExists() { - return shardIdToLease != null; - } - - @Override - public boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) { - return shardIdToLease != null; - } - - @Override - public List listLeases() { - return new ArrayList<>(shardIdToLease.values()); - } - - @Override - public boolean createLeaseIfNotExists(T lease) { - return shardIdToLease.putIfAbsent(lease.getLeaseKey(), lease) == null; - } - - @Override - public T getLease(String shardId) { - return shardIdToLease.get(shardId); - } - - @Override - public boolean renewLease(T lease) { - lease.setLeaseCounter(lease.getLeaseCounter() + 1L); - shardIdToLease.put(lease.getLeaseKey(), lease); - return true; - } - - @Override - public boolean takeLease(T lease, String owner) { - - // an hack to get around a KCL race condition bug - try { - Thread.sleep(delayInMillis); - } catch (InterruptedException e) { - ; //ignore - } - - lease.setLeaseCounter(lease.getLeaseCounter() + 1L); - lease.setLeaseOwner(owner); - shardIdToLease.put(lease.getLeaseKey(), lease); - return true; - } - - @Override - public boolean evictLease(T lease) { - lease.setLeaseOwner((String)null); - lease.setLeaseCounter(lease.getLeaseCounter() + 1L); - shardIdToLease.put(lease.getLeaseKey(), lease); - - return true; - } - - @Override - public void deleteLease(T lease) { - shardIdToLease.remove(lease.getLeaseKey()); - } - - @Override - public void deleteAll() { - shardIdToLease.clear(); - } - - @Override - public boolean updateLease(T lease) { - shardIdToLease.put(lease.getLeaseKey(), lease); - lease.setLeaseCounter(lease.getLeaseCounter() + 1L); - return true; - } - - @Override - public boolean isLeaseTableEmpty() { - return shardIdToLease.isEmpty(); - } -} diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/DynamoDbGapsTableImpl.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/DynamoDbGapsTableImpl.java deleted file mode 100644 index 5f9b46aa..00000000 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/DynamoDbGapsTableImpl.java +++ /dev/null @@ -1,149 +0,0 @@ -/** - * Copyright (c) 2018, salesforce.com, inc. - * All rights reserved. - * SPDX-License-Identifier: BSD-3-Clause - * For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause - */ -package com.demandware.carbonj.service.engine.recovery; - -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; -import com.amazonaws.services.dynamodbv2.document.Item; -import com.amazonaws.services.dynamodbv2.document.PrimaryKey; -import com.amazonaws.services.dynamodbv2.document.Table; -import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec; -import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; -import com.amazonaws.services.dynamodbv2.model.AttributeValue; -import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; -import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; -import com.amazonaws.services.dynamodbv2.model.KeyType; -import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; -import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; -import com.amazonaws.services.dynamodbv2.model.ScanRequest; -import com.amazonaws.services.dynamodbv2.model.ScanResult; -import com.demandware.carbonj.service.engine.DynamoDbUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class DynamoDbGapsTableImpl implements GapsTable { - - private static final Logger log = LoggerFactory.getLogger(DynamoDbGapsTableImpl.class); - - private final DynamoDB dynamoDB; - private final String tableName; - private final AmazonDynamoDB client; - - public DynamoDbGapsTableImpl(AmazonDynamoDB client, String kinesisApplicationName, int gapsTableProvThroughput ) throws Exception { - - this.dynamoDB = new DynamoDB(client); - this.tableName = "gaps-" + kinesisApplicationName; - this.client = client; - - if (!DynamoDbUtils.isTablePresent(dynamoDB, tableName)) { - createTable(tableName, gapsTableProvThroughput); - } - } - - private void createTable(String tableName, int provisionedThroughput) throws Exception { - List attributeDefinitions = new ArrayList<>(); - attributeDefinitions.add(new AttributeDefinition("starttime", ScalarAttributeType.N)); - - List ks = new ArrayList<>(); - ks.add(new KeySchemaElement("starttime", KeyType.HASH)); - - ProvisionedThroughput provisionedthroughput = - new ProvisionedThroughput((long) provisionedThroughput, (long) provisionedThroughput); - - CreateTableRequest request = - new CreateTableRequest() - .withTableName(tableName) - .withAttributeDefinitions(attributeDefinitions) - .withKeySchema(ks) - .withProvisionedThroughput(provisionedthroughput); - - log.info("Issuing CreateTable request for " + tableName); - Table newlyCreatedTable = dynamoDB.createTable(request); - - log.info("Waiting for " + tableName + " to be created...this may take a while..."); - newlyCreatedTable.waitForActive(); - } - - @Override - public void add(Gap gap) { - Table table = dynamoDB.getTable(tableName); - - table.putItem(new Item().withPrimaryKey("starttime", gap.startTime().getTime()) - .withLong("endtime", gap.endTime().getTime()) - .withLong("lastRecovered", gap.lastRecovered().getTime())); - - log.info("Added gap: " + gap); - } - - @Override - public List getGaps() { - ScanRequest scanRequest = new ScanRequest() - .withTableName(tableName); - - List gaps = new ArrayList<>(); - - ScanResult result = client.scan(scanRequest); - for (Map item : result.getItems()){ - Date starttime = new Date(Long.parseLong(item.get("starttime").getN())); - Date endtime = new Date(Long.parseLong(item.get("endtime").getN())); - Date lastRecovered = new Date(Long.parseLong(item.get("lastRecovered").getN())); - gaps.add(new GapImpl(starttime, endtime, lastRecovered)); - } - - Collections.sort(gaps); - return gaps; - } - - @Override - public void delete(Gap gap) { - Table table = dynamoDB.getTable(tableName); - - DeleteItemSpec deleteItemSpec = new DeleteItemSpec() - .withPrimaryKey(new PrimaryKey("starttime", gap.startTime().getTime())); - - table.deleteItem(deleteItemSpec); - log.info("Gap deleted: " + gap); - } - - @Override - public boolean updateGap(Gap gap) { - Table table = dynamoDB.getTable(tableName); - - Map expressionAttributeNames = new HashMap<>(); - expressionAttributeNames.put("#E", "endtime"); - expressionAttributeNames.put("#L", "lastRecovered"); - - Map expressionAttributeValues = new HashMap<>(); - expressionAttributeValues.put(":val1", gap.endTime().getTime()); - expressionAttributeValues.put(":val2", gap.lastRecovered().getTime()); - - table.updateItem( - "starttime", // key attribute name - gap.startTime().getTime(), // key attribute value - "set #E = :val1, #L = :val2", // UpdateExpression - expressionAttributeNames, - expressionAttributeValues); - - return true; - } - - @Override - public void destroy() { - try { - DynamoDbUtils.deleteTable(dynamoDB, tableName); - } catch (InterruptedException e) { - log.error("Interrupted while destroting " + tableName, e); - } - } -} diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/GapProcessor.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/GapProcessor.java index 89f447d4..ce83fb36 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/GapProcessor.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/GapProcessor.java @@ -6,7 +6,6 @@ */ package com.demandware.carbonj.service.engine.recovery; -import com.amazonaws.services.kinesis.model.Record; import com.codahale.metrics.Counter; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; @@ -17,8 +16,17 @@ import com.demandware.carbonj.service.engine.kinesis.DataPoints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import java.util.*; +import software.amazon.awssdk.services.kinesis.model.Record; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; public class GapProcessor { @@ -91,7 +99,7 @@ public void process() throws InterruptedException { // take each record from the queue and check if it is not greater than end time. // if it is, we are done. // else, process them. after that, get the next record from the shard and push it to the priority queue. - while (queue.size() > 0) { + while (!queue.isEmpty()) { DataPointsInfo dataPointsInfo = queue.poll(); DataPoints dataPoints = dataPointsInfo.dataPoints; @@ -148,7 +156,7 @@ shardInfo, new Date( recordMinTimeStamp ), filteredDataPoints.size(), } } - if (dataPoints.getDataPoints().size() > 0) { + if (!dataPoints.getDataPoints().isEmpty()) { processRecordsWithRetries(dataPoints); } @@ -188,15 +196,15 @@ private DataPointsInfo getNextRecord(Map shardToT Record record = recordAndIterator.getRecord(); String iterator = recordAndIterator.getIterator(); - shardToTrackingInfoMap.put(shardInfo, new RecordTrackingInfo(record.getSequenceNumber(), iterator)); + shardToTrackingInfoMap.put(shardInfo, new RecordTrackingInfo(record.sequenceNumber(), iterator)); - DataPoints dataPoints = codec.decode(record.getData().array()); + DataPoints dataPoints = codec.decode(record.data().asByteArray()); - DataPointsInfo dataPointsInfo = new DataPointsInfo(dataPoints, shardInfo, record.getApproximateArrivalTimestamp().getTime()); + DataPointsInfo dataPointsInfo = new DataPointsInfo(dataPoints, shardInfo, record.approximateArrivalTimestamp().toEpochMilli()); if (log.isDebugEnabled()) { - log.debug( String.format( "Recovery: fetched %s : %tc record; Record min timestamp: %tc", shardInfo, record.getApproximateArrivalTimestamp(), new Date( + log.debug( String.format( "Recovery: fetched %s : %tc record; Record min timestamp: %tc", shardInfo, Date.from(record.approximateArrivalTimestamp()), new Date( dataPointsInfo.minTs * 1000L ) ) ); } return dataPointsInfo; @@ -233,8 +241,8 @@ private void processSingleRecord(DataPoints record) { } private static class RecordTrackingInfo { - private String lastSequenceNumber; - private String shardIterator; + private final String lastSequenceNumber; + private final String shardIterator; RecordTrackingInfo(String lastSequenceNumber, String shardIterator) { this.lastSequenceNumber = lastSequenceNumber; diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/KinesisStream.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/KinesisStream.java index 7911ecb4..811b6cee 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/KinesisStream.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/KinesisStream.java @@ -11,11 +11,11 @@ public interface KinesisStream { - Set getShards(); + Set getShards() throws InterruptedException; RecordAndIterator getNextRecord(ShardInfo shardInfo, String shardIterator, String lastSequenceNumber) throws InterruptedException; - String getShardIterator(ShardInfo shardInfo, Date startTimeStamp); + String getShardIterator(ShardInfo shardInfo, Date startTimeStamp) throws InterruptedException; - String getShardIterator(ShardInfo shardInfo, String sequenceNumber); + String getShardIterator(ShardInfo shardInfo, String sequenceNumber) throws InterruptedException; } diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/KinesisStreamImpl.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/KinesisStreamImpl.java deleted file mode 100644 index 737ce763..00000000 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/KinesisStreamImpl.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Copyright (c) 2018, salesforce.com, inc. - * All rights reserved. - * SPDX-License-Identifier: BSD-3-Clause - * For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause - */ -package com.demandware.carbonj.service.engine.recovery; - -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.model.DescribeStreamRequest; -import com.amazonaws.services.kinesis.model.DescribeStreamResult; -import com.amazonaws.services.kinesis.model.ExpiredIteratorException; -import com.amazonaws.services.kinesis.model.GetRecordsRequest; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; -import com.amazonaws.services.kinesis.model.GetShardIteratorResult; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.kinesis.model.ShardIteratorType; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -public class KinesisStreamImpl implements KinesisStream { - - private static final Logger log = LoggerFactory.getLogger(KinesisStreamImpl.class); - - private static Timer timer; - private final AmazonKinesis kinesis; - private final String streamName; - private final long retryTimeInMillis; - - KinesisStreamImpl(MetricRegistry metricRegistry, AmazonKinesis kinesis, String streamName, long retryTimeInMillis) { - this.timer = metricRegistry.timer(MetricRegistry.name("KinesisStream", "getNextRecord")); - this.kinesis = kinesis; - this.streamName = streamName; - this.retryTimeInMillis = retryTimeInMillis; - } - - @Override - public Set getShards() { - DescribeStreamRequest request = new DescribeStreamRequest().withStreamName(streamName); - DescribeStreamResult result = kinesis.describeStream(request); - Set shardInfos = new HashSet<>(); - for (Shard shard: result.getStreamDescription().getShards()) { - shardInfos.add(new ShardInfoImpl(shard.getShardId())); - } - return shardInfos; - } - - @Override // todo: improve error handling... - public RecordAndIterator getNextRecord(ShardInfo shardInfo, String shardIterator, String lastSequenceNumber) - throws InterruptedException { - Timer.Context context = timer.time(); - - try { - GetRecordsRequest request = new GetRecordsRequest().withLimit(1).withShardIterator(shardIterator); - GetRecordsResult result; - - try { - result = kinesis.getRecords(request); - } catch (ExpiredIteratorException e) { - log.warn("Got expired iterator: ", e.getMessage()); - - // retry with last sequence number - log.info("Trying with a new iterator"); - shardIterator = getShardIterator(shardInfo, lastSequenceNumber); - request = new GetRecordsRequest().withLimit(1).withShardIterator(shardIterator); - result = kinesis.getRecords(request); - } - - List records = result.getRecords(); - shardIterator = result.getNextShardIterator(); - - // retry until we get the records. - while (records.size() == 0 && shardIterator != null) { - if (log.isDebugEnabled()) - { - log.debug( "Recovery: Records not available yet for " + shardInfo.getShardId() + ". Waiting.." ); - } - Thread.sleep(retryTimeInMillis); - - request = new GetRecordsRequest().withLimit(1).withShardIterator(shardIterator); - result = kinesis.getRecords(request); - records = result.getRecords(); - shardIterator = result.getNextShardIterator(); - } - - if (records.size() == 0) { // end of shard has been reached. - return RecordAndIterator.EMPTY; - } - return new RecordAndIterator(records.get(0), shardIterator); - } finally { - context.close(); - } - } - - @Override - public String getShardIterator(ShardInfo shardInfo, Date startTimeStamp) { - GetShardIteratorRequest request = new GetShardIteratorRequest().withStreamName(streamName) - .withShardId(shardInfo.getShardId()) - .withShardIteratorType(ShardIteratorType.AT_TIMESTAMP) - .withTimestamp(startTimeStamp); - GetShardIteratorResult result = kinesis.getShardIterator(request); - return result.getShardIterator(); - } - - @Override - public String getShardIterator(ShardInfo shardInfo, String sequenceNumber) { - GetShardIteratorRequest request = new GetShardIteratorRequest().withStreamName(streamName) - .withShardId(shardInfo.getShardId()) - .withShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER) - .withStartingSequenceNumber(sequenceNumber); - GetShardIteratorResult result = kinesis.getShardIterator(request); - return result.getShardIterator(); - } -} diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/RecordAndIterator.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/RecordAndIterator.java index e8547fb5..ca5479f0 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/RecordAndIterator.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/RecordAndIterator.java @@ -6,7 +6,7 @@ */ package com.demandware.carbonj.service.engine.recovery; -import com.amazonaws.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.Record; class RecordAndIterator { diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/RecoveryManager.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/RecoveryManager.java deleted file mode 100644 index 38e344c1..00000000 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/engine/recovery/RecoveryManager.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Copyright (c) 2018, salesforce.com, inc. - * All rights reserved. - * SPDX-License-Identifier: BSD-3-Clause - * For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause - */ -package com.demandware.carbonj.service.engine.recovery; - -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.codahale.metrics.MetricRegistry; -import com.demandware.carbonj.service.accumulator.Accumulator; -import com.demandware.carbonj.service.engine.PointProcessor; -import com.demandware.carbonj.service.engine.kinesis.DataPointCodec; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -public class RecoveryManager implements Runnable { - - private static final Logger log = LoggerFactory.getLogger(RecoveryManager.class); - - private final MetricRegistry metricRegistry; - private final GapsTable gapsTable; - private final String streamName; - private final PointProcessor pointProcessor; - private final AmazonKinesis kinesisClient; - private final long idleTimeInMillis; - private final long retryTimeInMillis; - private final DataPointCodec codec; - - public RecoveryManager(MetricRegistry metricRegistry, GapsTable gapsTable, String streamName, PointProcessor pointProcessor, - AmazonKinesis kinesisClient, long idleTimeInMillis, long retryTimeInMillis, - DataPointCodec codec) { - this.gapsTable = gapsTable; - this.streamName = streamName; - this.pointProcessor = pointProcessor; - this.kinesisClient = kinesisClient; - this.idleTimeInMillis = idleTimeInMillis; - this.retryTimeInMillis = retryTimeInMillis; - this.metricRegistry = metricRegistry; - this.codec = codec; - } - - @Override - public void run() { - try { - log.info("Recovery: Running Recovery Manager...."); - List gaps = Collections.synchronizedList(gapsTable.getGaps()); - log.info("Recovery: Found gaps " + gaps); - - KinesisStream kinesisStream = new KinesisStreamImpl(metricRegistry, kinesisClient, streamName, - retryTimeInMillis); - - ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); - executor.setRemoveOnCancelPolicy(true); - - while (gaps.size() > 0) { - Gap gap = gaps.get(0); - - // checkpoint recovery table periodically so that we do not have to restart recovery from the start if - // carbonj restarts in the middle of recovery. - Accumulator accumulator = pointProcessor.getAccumulator(); - RecoveryCheckPointCommand gapTableCommand = new RecoveryCheckPointCommand(gapsTable, gap, accumulator); - ScheduledFuture checkPointCmdFuture = executor.scheduleWithFixedDelay(gapTableCommand, 1, 1, - TimeUnit.MINUTES); - - // process gap. - log.info("Recovery: Recovering gap: " + gap.startTime() + " - " + gap.endTime() + " : LastRecoveryTime: " + gap.lastRecovered()); - GapProcessor gapProcessor = new GapProcessor(metricRegistry, gap, kinesisStream, pointProcessor, idleTimeInMillis, codec); - gapProcessor.process(); - - // gap has been processed. clean up resources. - checkPointCmdFuture.cancel(false); - gapsTable.delete(gap); - //todo: ideally we would like to have a separate point processor while proccessing each gap so that - // we do not have to clear state between processing each gaps. - pointProcessor.flushAggregations(true); // flush all slots. - accumulator.reset(); - - gaps = gapsTable.getGaps(); - } - - executor.shutdown(); - gapsTable.destroy(); - - log.info("Recovery: Exiting Recovery Manager...."); - } catch (InterruptedException e) { - log.error("Interrupted..", e); - } catch (Exception e) { - log.error("Unexpected error", e); // todo: add retry logic - } - } - - // We checkpoint the recovery to the last slot flushed while processing recovery data points. - private class RecoveryCheckPointCommand implements Runnable { - - private final GapsTable gapsTable; - private final Gap gap; - private final Accumulator accumulator; - - RecoveryCheckPointCommand(GapsTable gapsTable, Gap gap, Accumulator accumulator) { - this.gapsTable = gapsTable; - this.gap = gap; - this.accumulator = accumulator; - } - - @Override - public void run() { - int maxClosedSlotTs = accumulator.getMaxClosedSlotTs(); - - if (maxClosedSlotTs <= 0) { - return; - } - - long recoveredSoFarInMillis = maxClosedSlotTs * 1000L; - try { - gapsTable.updateGap(new GapImpl(gap.startTime(), gap.endTime(), new Date(recoveredSoFarInMillis))); - } catch (Exception e) { - log.error("Exception while updating gaps table", e); - } - } - } -} diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/events/KinesisEventsLogger.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/events/KinesisEventsLogger.java index 215bf374..ffcce656 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/events/KinesisEventsLogger.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/events/KinesisEventsLogger.java @@ -6,24 +6,23 @@ */ package com.demandware.carbonj.service.events; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicSessionCredentials; -import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.securitytoken.AWSSecurityTokenService; -import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceAsyncClientBuilder; -import com.amazonaws.services.securitytoken.model.AssumeRoleRequest; -import com.amazonaws.services.securitytoken.model.AssumeRoleResult; -import com.amazonaws.services.securitytoken.model.Credentials; -import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; import com.codahale.metrics.MetricRegistry; import com.demandware.carbonj.service.engine.RejectionHandler; import com.demandware.carbonj.service.queue.InputQueue; import com.demandware.carbonj.service.queue.QueueProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.http.nio.netty.Http2Configuration; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; + +import java.time.Duration; /** * Entry point to the events shipping functionality. It holds the queue where all the events received from other @@ -36,6 +35,10 @@ public class KinesisEventsLogger implements EventsLogger { private final InputQueue queue; + private static final int NETTY_HTTP_CLIENT_TIMEOUT_MILLIS = 30 * 1000; + private static final int INITIAL_WINDOW_SIZE_BYTES = 512 * 1024; // 512 KB + private static final long HEALTH_CHECK_PING_PERIOD_MILLIS = 60 * 1000; + KinesisEventsLogger(MetricRegistry metricRegistry, int queueSize, int emptyQueuePauseMillis, RejectionHandler rejectionHandler, QueueProcessor queueProcessor, int batchSize, long maxWaitTimeMillis) { @@ -51,29 +54,40 @@ public class KinesisEventsLogger implements EventsLogger { streamName, buildKinesisClient(rbacEnabled, region, account, role), noOfThreads), batchSize, maxWaitTimeMillis); } - private static AmazonKinesis buildKinesisClient(boolean rbacEnabled, String region, String account, String role) + private static KinesisAsyncClient buildKinesisClient(boolean rbacEnabled, String region, String account, String role) { - if ( rbacEnabled) { + if (rbacEnabled) { String roleArn = "arn:aws:iam::" + account + ":role/" + role; String roleSessionName = "cc-umon-client-events-session"; - final AWSCredentialsProvider credentialsProvider; - log.info( "Rbac enabled for events. Building kinesis client and credentials provider with region: " + region + ", account: " + account + ", role: " + role); - AWSSecurityTokenService stsClient = - AWSSecurityTokenServiceAsyncClientBuilder.standard().withRegion(region).build(); + StsClient stsClient = StsClient.builder() + .region(Region.of(region)) + .build(); - credentialsProvider = new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, roleSessionName) - .withStsClient(stsClient).withRoleSessionDurationSeconds(3600).build(); + AwsCredentialsProvider awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder() + .refreshRequest(builder -> builder.roleArn(roleArn).roleSessionName(roleSessionName).durationSeconds(3600)) + .stsClient(stsClient) + .build(); - return AmazonKinesisClientBuilder.standard().withCredentials( credentialsProvider ).withRegion(region).build(); - } - else - { - log.info( "Rbac not enabled for events. Building kinesis client."); - return AmazonKinesisClientBuilder.defaultClient(); + // https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/http-configuration-netty.html + KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder() + .credentialsProvider(awsCredentialsProvider).region(Region.of(region)); + return kinesisAsyncClientBuilder.httpClientBuilder( + NettyNioAsyncHttpClient.builder() + .connectionTimeout(Duration.ofMillis(NETTY_HTTP_CLIENT_TIMEOUT_MILLIS)) + .readTimeout(Duration.ofMillis(NETTY_HTTP_CLIENT_TIMEOUT_MILLIS)) + .maxConcurrency(Integer.MAX_VALUE) + .http2Configuration(Http2Configuration.builder().initialWindowSize(INITIAL_WINDOW_SIZE_BYTES) + .healthCheckPingPeriod(Duration.ofMillis(HEALTH_CHECK_PING_PERIOD_MILLIS)) + .build()) + .protocol(Protocol.HTTP2)) + .build(); + } else { + log.info( "Rbac not enabled for events. Building kinesis client."); + return KinesisAsyncClient.builder().build(); } } diff --git a/carbonj.service/src/main/java/com/demandware/carbonj/service/events/KinesisQueueProcessor.java b/carbonj.service/src/main/java/com/demandware/carbonj/service/events/KinesisQueueProcessor.java index d8ba7c86..829de435 100644 --- a/carbonj.service/src/main/java/com/demandware/carbonj/service/events/KinesisQueueProcessor.java +++ b/carbonj.service/src/main/java/com/demandware/carbonj/service/events/KinesisQueueProcessor.java @@ -6,10 +6,11 @@ */ package com.demandware.carbonj.service.events; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; -import com.amazonaws.services.kinesis.model.PutRecordRequest; -import com.codahale.metrics.*; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; import com.demandware.carbonj.service.db.util.StatsAware; import com.demandware.carbonj.service.engine.BlockingPolicy; import com.demandware.carbonj.service.engine.InputQueueThreadFactory; @@ -19,7 +20,10 @@ import com.salesforce.cc.infra.core.kinesis.PayloadCodec; import org.slf4j.LoggerFactory; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import java.nio.ByteBuffer; import java.util.Collection; @@ -34,13 +38,13 @@ */ public class KinesisQueueProcessor implements QueueProcessor, StatsAware { - private static Meter kinesisMessagesSent; + private final Meter kinesisMessagesSent; - private static Meter logEventsDropped; + private final Meter logEventsDropped; - private static Meter messageRetryCounter; + private final Meter messageRetryCounter; - public static Histogram messageSize; + private final Histogram messageSize; private static final Map HEADERS = ImmutableMap.of( "Payload-Version", "2.0"); @@ -50,15 +54,15 @@ public class KinesisQueueProcessor implements QueueProcessor, StatsAware private static final int NUM_RETRIES = 3; private final String streamName; - private final AmazonKinesis kinesisClient; + private final KinesisAsyncClient kinesisClient; private final ThreadPoolExecutor ex; private final Histogram activeThreadsHistogram; private final Gauge activeThreadCount; - private Gauge taskCount; + private final Gauge taskCount; - KinesisQueueProcessor(MetricRegistry metricRegistry, String streamName, AmazonKinesis kinesisClient, int noOfThreads) { + KinesisQueueProcessor(MetricRegistry metricRegistry, String streamName, KinesisAsyncClient kinesisClient, int noOfThreads) { this.streamName = streamName; this.kinesisClient = kinesisClient; @@ -93,7 +97,7 @@ public class KinesisQueueProcessor implements QueueProcessor, StatsAware @Override public void process(Collection events) { - ex.submit(new KinesisEventTask(streamName, kinesisClient, events)); + ex.submit(new KinesisEventTask(streamName, kinesisClient, events, messageSize, kinesisMessagesSent, messageRetryCounter, logEventsDropped)); } @Override @@ -115,13 +119,22 @@ public void refreshStats() private static final class KinesisEventTask implements Runnable { private final String streamName; - private final AmazonKinesis kinesisClient; + private final KinesisAsyncClient kinesisClient; private final Collection events; + private final Histogram messageSize; + private final Meter kinesisMessagesSent; + private final Meter messageRetryCounter; + private final Meter logEventsDropped; - KinesisEventTask(String streamName, AmazonKinesis kinesisClient, Collection events) { + KinesisEventTask(String streamName, KinesisAsyncClient kinesisClient, Collection events, + Histogram messageSize, Meter kinesisMessagesSent, Meter messageRetryCounter, Meter logEventsDropped) { this.streamName = streamName; this.kinesisClient = kinesisClient; this.events = events; + this.messageSize = messageSize; + this.kinesisMessagesSent = kinesisMessagesSent; + this.messageRetryCounter = messageRetryCounter; + this.logEventsDropped = logEventsDropped; } @Override @@ -130,10 +143,11 @@ public void run() { byte[] encodedDataBytes = GzipPayloadV2Codec.getInstance().encode(events); Message message = new Message(HEADERS, encodedDataBytes); byte[] payload = PayloadCodec.encode(message); - PutRecordRequest putRecordRequest = new PutRecordRequest(); - putRecordRequest.setStreamName(streamName); - putRecordRequest.setData(ByteBuffer.wrap(payload)); - putRecordRequest.setPartitionKey(UUID.randomUUID().toString()); + PutRecordRequest putRecordRequest = PutRecordRequest.builder() + .streamName(streamName) + .data(SdkBytes.fromByteBuffer(ByteBuffer.wrap(payload))) + .partitionKey(UUID.randomUUID().toString()) + .build(); boolean processedSuccessfully = false; for (int i = 0; i < NUM_RETRIES && !processedSuccessfully; i++) { try { diff --git a/carbonj.service/src/test/java/com/demandware/carbonj/service/engine/TestDynamoDbCheckPointMgr.java b/carbonj.service/src/test/java/com/demandware/carbonj/service/engine/TestDynamoDbCheckPointMgr.java index efffbcea..fc474693 100644 --- a/carbonj.service/src/test/java/com/demandware/carbonj/service/engine/TestDynamoDbCheckPointMgr.java +++ b/carbonj.service/src/test/java/com/demandware/carbonj/service/engine/TestDynamoDbCheckPointMgr.java @@ -6,12 +6,11 @@ */ package com.demandware.carbonj.service.engine; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.annotation.DirtiesContext; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import java.util.Date; import java.util.concurrent.TimeUnit; @@ -27,8 +26,8 @@ public class TestDynamoDbCheckPointMgr { @Test @Disabled // ignoring because it is calling AWS API and it should not be public void testBasic() throws Exception { - AmazonDynamoDB dynamoDbClient = AmazonDynamoDBClientBuilder.standard().build(); - CheckPointMgr checkPointMgr = new DynamoDbCheckPointMgr(dynamoDbClient, "test", 60, 1); + DynamoDbAsyncClient dynamoDbClient = DynamoDbAsyncClient.builder().build(); + CheckPointMgr checkPointMgr = new DynamoDbCheckPointMgr(dynamoDbClient, "test", 60, 1, 30); Date lastCheckPoint = checkPointMgr.lastCheckPoint(); assertTrue(lastCheckPoint.before(new Date(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(60)))); Date checkPoint1 = new Date(System.currentTimeMillis()); diff --git a/carbonj.service/src/test/java/com/demandware/carbonj/service/engine/kcl/TestMemLeaseManager.java b/carbonj.service/src/test/java/com/demandware/carbonj/service/engine/kcl/TestMemLeaseManager.java deleted file mode 100644 index 3a2897d0..00000000 --- a/carbonj.service/src/test/java/com/demandware/carbonj/service/engine/kcl/TestMemLeaseManager.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Copyright (c) 2018, salesforce.com, inc. - * All rights reserved. - * SPDX-License-Identifier: BSD-3-Clause - * For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause - */ -package com.demandware.carbonj.service.engine.kcl; - -import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; -import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; -import com.demandware.carbonj.service.engine.kinesis.kcl.MemLeaseManager; -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class TestMemLeaseManager { - - private static final String OWNER = "owner"; - private static final int NO_OF_SHARDS = 5; - - @Test - public void testBasicPath() throws Exception { - ILeaseManager leaseManager = new MemLeaseManager<>(1); - - // check if the lease table exists - assertFalse(leaseManager.leaseTableExists()); - - // create table if it not exists - assertTrue(leaseManager.createLeaseTableIfNotExists(10L, 10L)); - - // check if the lease table exists - assertTrue(leaseManager.waitUntilLeaseTableExists(1, 10)); - - // check if the lease table exists - assertTrue(leaseManager.leaseTableExists()); - - // check if the lease table is empty - assertTrue(leaseManager.isLeaseTableEmpty()); - - assertEquals(0, leaseManager.listLeases().size()); - - // create lease for 5 shards - for (int i = 0; i < NO_OF_SHARDS; i++) { - String shardId = "shard-" + i; - KinesisClientLease lease = newKCLLease(shardId); - assertTrue(leaseManager.createLeaseIfNotExists(lease)); - } - - // check if the lease table is not empty - assertFalse(leaseManager.isLeaseTableEmpty()); - - assertEquals(5, leaseManager.listLeases().size()); - - verify(leaseManager, null, 0L); - - for (int i = 0; i < NO_OF_SHARDS; i++) { - String shardId = "shard-" + i; - KinesisClientLease lease = leaseManager.getLease(shardId); - leaseManager.takeLease(lease, OWNER); - } - - verify(leaseManager, OWNER, 1L); - - renewLeases(leaseManager); - - verify(leaseManager, OWNER, 2L); - - updateLeases(leaseManager); - - verify(leaseManager, OWNER, 3L); - } - - private void updateLeases(ILeaseManager leaseManager) throws Exception { - for (int i = 0; i < NO_OF_SHARDS; i++) { - String shardId = "shard-" + i; - KinesisClientLease lease = leaseManager.getLease(shardId); - leaseManager.updateLease(lease); - } - } - - private void verify(ILeaseManager leaseManager, String expectedOwner, Long expectedLeaseCounter) throws Exception { - for (int i = 0; i < NO_OF_SHARDS; i++) { - String shardId = "shard-" + i; - KinesisClientLease lease = leaseManager.getLease(shardId); - assertEquals(expectedOwner, lease.getLeaseOwner()); - assertEquals(expectedLeaseCounter, lease.getLeaseCounter()); - } - assertEquals(NO_OF_SHARDS, leaseManager.listLeases().size()); - } - - private static KinesisClientLease newKCLLease(String shardId) { - KinesisClientLease newLease = new KinesisClientLease(); - newLease.setLeaseKey(shardId); - List parentShardIds = new ArrayList<>(); - newLease.setParentShardIds(parentShardIds); - newLease.setOwnerSwitchesSinceCheckpoint(0L); - - return newLease; - } - - private void renewLeases(ILeaseManager leaseManager) throws Exception { - for (int i = 0; i < NO_OF_SHARDS; i++) { - String shardId = "shard-" + i; - KinesisClientLease lease = leaseManager.getLease(shardId); - leaseManager.renewLease(lease); - } - } -} diff --git a/carbonj.service/src/test/java/com/demandware/carbonj/service/engine/recovery/TestGapProcessor.java b/carbonj.service/src/test/java/com/demandware/carbonj/service/engine/recovery/TestGapProcessor.java index 248bbf72..dc91a51d 100644 --- a/carbonj.service/src/test/java/com/demandware/carbonj/service/engine/recovery/TestGapProcessor.java +++ b/carbonj.service/src/test/java/com/demandware/carbonj/service/engine/recovery/TestGapProcessor.java @@ -6,7 +6,6 @@ */ package com.demandware.carbonj.service.engine.recovery; -import com.amazonaws.services.kinesis.model.Record; import com.codahale.metrics.MetricRegistry; import com.demandware.carbonj.service.accumulator.Accumulator; import com.demandware.carbonj.service.accumulator.DefaultSlotStrategy; @@ -17,9 +16,11 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.stubbing.OngoingStubbing; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.kinesis.model.Record; -import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -136,10 +137,11 @@ private KinesisStream getMockKinesisStream(List mockShards, Map gaps = gapsTable.getGaps(); - assertEquals(2, gaps.size()); - - assertEquals(firstGap, gaps.get(0)); - assertEquals(latestGap, gaps.get(1)); - - gapsTable.delete(firstGap); - - gaps = gapsTable.getGaps(); - assertEquals(1, gaps.size()); - assertEquals(latestGap, gaps.get(0)); - - assertEquals(latestGap.startTime(), latestGap.lastRecovered()); - Date lastRecovered = new Date(nowTime + TimeUnit.HOURS.toMillis(1)); - Gap updatedGap = new GapImpl(latestGap.startTime(), latestGap.endTime(), lastRecovered); - gapsTable.updateGap(updatedGap); - - // verify updated gap - gaps = gapsTable.getGaps(); - assertEquals(1, gaps.size()); - Gap newlyFetchedGap = gaps.get(0); - assertEquals(updatedGap.startTime(), newlyFetchedGap.startTime()); - assertEquals(updatedGap.endTime(), newlyFetchedGap.endTime()); - assertEquals(lastRecovered, newlyFetchedGap.lastRecovered()); - - gapsTable.delete(newlyFetchedGap); - - gaps = gapsTable.getGaps(); - assertEquals(0, gaps.size()); - - gapsTable.destroy(); - } -} diff --git a/carbonj.service/src/test/java/com/demandware/carbonj/service/events/TestKinesisEventsLogger.java b/carbonj.service/src/test/java/com/demandware/carbonj/service/events/TestKinesisEventsLogger.java index f5c6ece8..a473b532 100644 --- a/carbonj.service/src/test/java/com/demandware/carbonj/service/events/TestKinesisEventsLogger.java +++ b/carbonj.service/src/test/java/com/demandware/carbonj/service/events/TestKinesisEventsLogger.java @@ -6,9 +6,6 @@ */ package com.demandware.carbonj.service.events; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.model.PutRecordRequest; -import com.amazonaws.services.kinesis.model.PutRecordResult; import com.codahale.metrics.MetricRegistry; import com.demandware.carbonj.service.queue.QueueProcessor; import com.salesforce.cc.infra.core.kinesis.Message; @@ -17,11 +14,15 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatcher; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertArrayEquals; @@ -73,9 +74,10 @@ public void testSingleEvent() throws Exception { assertArrayEquals(eventBytes, eventCollection.iterator().next()); } - private AmazonKinesis mockAmazonKinesis() { - AmazonKinesis mockKinesisClient = mock(AmazonKinesis.class); - when(mockKinesisClient.putRecord(argThat(new PutRecordRequestArgMatcher(datae)))).thenReturn(new PutRecordResult().withShardId("1")); + private KinesisAsyncClient mockAmazonKinesis() { + KinesisAsyncClient mockKinesisClient = mock(KinesisAsyncClient.class); + when(mockKinesisClient.putRecord(argThat(new PutRecordRequestArgMatcher(datae)))) + .thenReturn(CompletableFuture.completedFuture(PutRecordResponse.builder().shardId("1").build())); return mockKinesisClient; } @@ -91,7 +93,7 @@ private static class PutRecordRequestArgMatcher @Override public boolean matches(PutRecordRequest argument) { - byte[] bytes = argument.getData().array(); + byte[] bytes = argument.data().asByteArray(); return dataList.add(bytes); } } diff --git a/gradle.properties b/gradle.properties index a0882aea..8cf4c909 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=1.1.64-SNAPSHOT +version=1.1.64-W-17312191 org.gradle.jvmargs=--add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED org.gradle.daemon=true springbootVersion=3.3.3 @@ -19,8 +19,8 @@ jodaTime=2.12.7 nettyAll=4.1.113.Final pickle=1.5 jythonStandalone=2.7.4 -amazonKinesisClient=1.15.2 -awsJavaSdkV1=1.12.771 +awsJavaSdkV2=2.29.20 +awsKinesisClient=3.0.1 metrics=4.2.27 msgpack=0.9.8 servletApi=6.1.0 @@ -32,3 +32,4 @@ mavenPublish=2.0.1 dependencyLicenseReport=2.9 hierynomusLicense=0.16.1 sonarqube=5.1.0.4882 +httpClient=4.5.14