From 7dde93fdc93bb86ee86f38ca25a512ac86502dfb Mon Sep 17 00:00:00 2001
From: Michael7371 <40476797+Michael7371@users.noreply.github.com>
Date: Mon, 10 Jun 2024 12:47:14 -0600
Subject: [PATCH 01/19] test artifact publishing
---
.github/workflows/artifact-publish.yml | 33 ++++++++++++++++++++++++++
pom.xml | 9 +++++++
2 files changed, 42 insertions(+)
create mode 100644 .github/workflows/artifact-publish.yml
diff --git a/.github/workflows/artifact-publish.yml b/.github/workflows/artifact-publish.yml
new file mode 100644
index 0000000..004c6f8
--- /dev/null
+++ b/.github/workflows/artifact-publish.yml
@@ -0,0 +1,33 @@
+name: Publish Java Package
+
+on:
+  # commented out for testing
+  # release:
+  #   types: [created]
+  pull_request:
+    types:
+      - opened
+      - synchronize
+      - reopened
+
+jobs:
+  build:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v4
+
+      - name: Set up JDK 21
+        uses: actions/setup-java@v4
+        with:
+          java-version: '21'
+          distribution: 'adopt'
+
+      - name: Build with Maven
+        run: mvn -B package --file pom.xml
+
+      - name: Publish to GitHub Packages
+        run: mvn --batch-mode -Dgithub_organization=${{ github.repository_owner }} deploy
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index d6e0ad1..b43be26 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,6 +10,8 @@
         <maven.compiler.source>21</maven.compiler.source>
         <maven.compiler.target>21</maven.compiler.target>
+
+        <github_organization>usdot-jpo-ode</github_organization>
@@ -101,4 +103,11 @@
+    <distributionManagement>
+        <repository>
+            <id>github</id>
+            <name>GitHub Packages</name>
+            <url>https://github.com/${github_organization}/jpo-s3-deposit</url>
+        </repository>
+    </distributionManagement>
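The Publish step in this workflow leans on Maven resolving credentials for a server id named `github`. As a point of reference, `actions/setup-java` writes a `settings.xml` on the runner with exactly that server id wired to the workflow token; a sketch of what it generates (these are setup-java defaults, not part of this patch):

```xml
<!-- Sketch of the settings.xml auto-generated by actions/setup-java. -->
<servers>
  <server>
    <id>github</id>
    <username>${env.GITHUB_ACTOR}</username>
    <password>${env.GITHUB_TOKEN}</password>
  </server>
</servers>
```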
From 35b7f18019eeccb582fedf3163e7a4fb029b8f93 Mon Sep 17 00:00:00 2001
From: Michael7371 <40476797+Michael7371@users.noreply.github.com>
Date: Mon, 10 Jun 2024 12:49:43 -0600
Subject: [PATCH 02/19] fix artifact endpoint
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index b43be26..c29687b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,7 +107,7 @@
             <id>github</id>
             <name>GitHub Packages</name>
-            <url>https://github.com/${github_organization}/jpo-s3-deposit</url>
+            <url>https://maven.pkg.github.com/${github_organization}/jpo-s3-deposit</url>
From 5460c519735e90d9d9917501cb9d780a838d0e6b Mon Sep 17 00:00:00 2001
From: Michael7371 <40476797+Michael7371@users.noreply.github.com>
Date: Mon, 10 Jun 2024 12:56:06 -0600
Subject: [PATCH 03/19] update trigger to only run on release
---
.github/workflows/artifact-publish.yml | 10 ++--------
1 file changed, 2 insertions(+), 8 deletions(-)
diff --git a/.github/workflows/artifact-publish.yml b/.github/workflows/artifact-publish.yml
index 004c6f8..cb6ae1e 100644
--- a/.github/workflows/artifact-publish.yml
+++ b/.github/workflows/artifact-publish.yml
@@ -1,14 +1,8 @@
 name: Publish Java Package
 
 on:
-  # commented out for testing
-  # release:
-  #   types: [created]
-  pull_request:
-    types:
-      - opened
-      - synchronize
-      - reopened
+  release:
+    types: [created]
 
 jobs:
   build:
From 3462850072bd733dbdc1ed9cf72b168da273c046 Mon Sep 17 00:00:00 2001
From: Michael7371 <40476797+Michael7371@users.noreply.github.com>
Date: Wed, 12 Jun 2024 13:54:18 -0600
Subject: [PATCH 04/19] testing -DremoveSnapshot task for artifact publishing
github action
---
.github/workflows/artifact-publish.yml | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git a/.github/workflows/artifact-publish.yml b/.github/workflows/artifact-publish.yml
index cb6ae1e..d1b90be 100644
--- a/.github/workflows/artifact-publish.yml
+++ b/.github/workflows/artifact-publish.yml
@@ -1,8 +1,13 @@
 name: Publish Java Package
 
 on:
-  release:
-    types: [created]
+  # release:
+  #   types: [created]
+  pull_request:
+    types:
+      - opened
+      - synchronize
+      - reopened
 
 jobs:
   build:
@@ -17,6 +22,9 @@ jobs:
       with:
         java-version: '21'
         distribution: 'adopt'
+
+      - name: Remove snapshot from version
+        run: mvn versions:set -DremoveSnapshot
 
       - name: Build with Maven
         run: mvn -B package --file pom.xml
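For context, `versions:set -DremoveSnapshot` from the versions-maven-plugin rewrites the project version in `pom.xml`, dropping the `-SNAPSHOT` suffix so the build deploys as a release version. A minimal sketch of the effect (the version number here is hypothetical):

```xml
<!-- Before the step: <version>0.0.1-SNAPSHOT</version> -->
<!-- After mvn versions:set -DremoveSnapshot: -->
<version>0.0.1</version>
```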
From ebcc32dbe7f949d6b264c44a76317346599f7df5 Mon Sep 17 00:00:00 2001
From: Michael7371 <40476797+Michael7371@users.noreply.github.com>
Date: Wed, 12 Jun 2024 13:56:58 -0600
Subject: [PATCH 05/19] updating artifact-publish github action to be triggered
on release creation after testing
---
.github/workflows/artifact-publish.yml | 9 ++-------
1 file changed, 2 insertions(+), 7 deletions(-)
diff --git a/.github/workflows/artifact-publish.yml b/.github/workflows/artifact-publish.yml
index d1b90be..cb9e80d 100644
--- a/.github/workflows/artifact-publish.yml
+++ b/.github/workflows/artifact-publish.yml
@@ -1,13 +1,8 @@
 name: Publish Java Package
 
 on:
-  # release:
-  #   types: [created]
-  pull_request:
-    types:
-      - opened
-      - synchronize
-      - reopened
+  release:
+    types: [created]
 
 jobs:
   build:
From 465545c77ab562ca1313802d6b94c34953c12a79 Mon Sep 17 00:00:00 2001
From: Michael7371 <40476797+Michael7371@users.noreply.github.com>
Date: Wed, 12 Jun 2024 15:36:47 -0600
Subject: [PATCH 06/19] testing snapshot publishing
---
.../workflows/artifact-publish-dev-caller.yml | 16 ++++++++++++++++
.../workflows/artifact-publish-tag-caller.yml | 14 ++++++++++++++
.github/workflows/artifact-publish.yml | 14 +++++++++++---
3 files changed, 41 insertions(+), 3 deletions(-)
create mode 100644 .github/workflows/artifact-publish-dev-caller.yml
create mode 100644 .github/workflows/artifact-publish-tag-caller.yml
diff --git a/.github/workflows/artifact-publish-dev-caller.yml b/.github/workflows/artifact-publish-dev-caller.yml
new file mode 100644
index 0000000..89bd3a3
--- /dev/null
+++ b/.github/workflows/artifact-publish-dev-caller.yml
@@ -0,0 +1,16 @@
+name: Publish Java Package Snapshot
+
+on:
+  push:
+    branches:
+      - develop
+      - dev
+      - cicd-artifact-publishing
+
+jobs:
+  publish-java-package:
+    uses: ./.github/workflows/artifact-publish.yml
+    with:
+      drop-snapshot: false
+    secrets:
+      GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
\ No newline at end of file
diff --git a/.github/workflows/artifact-publish-tag-caller.yml b/.github/workflows/artifact-publish-tag-caller.yml
new file mode 100644
index 0000000..df31341
--- /dev/null
+++ b/.github/workflows/artifact-publish-tag-caller.yml
@@ -0,0 +1,14 @@
+name: Publish Java Package Tag
+
+on:
+  push:
+    tags:
+      - "*"
+
+jobs:
+  publish-java-package:
+    uses: ./.github/workflows/artifact-publish.yml
+    with:
+      drop-snapshot: true
+    secrets:
+      GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
\ No newline at end of file
diff --git a/.github/workflows/artifact-publish.yml b/.github/workflows/artifact-publish.yml
index cb9e80d..0b0160d 100644
--- a/.github/workflows/artifact-publish.yml
+++ b/.github/workflows/artifact-publish.yml
@@ -1,8 +1,15 @@
 name: Publish Java Package
 
 on:
-  release:
-    types: [created]
+  workflow_call:
+    inputs:
+      drop-snapshot:
+        description: 'Drop snapshot from version'
+        required: true
+        type: boolean
+    secrets:
+      GITHUB_TOKEN:
+        required: true
 
 jobs:
   build:
@@ -17,8 +24,9 @@ jobs:
       with:
         java-version: '21'
         distribution: 'adopt'
-
+
       - name: Remove snapshot from version
+        if: ${{ github.event.inputs.drop-snapshot == 'true' }}
         run: mvn versions:set -DremoveSnapshot
 
       - name: Build with Maven
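One caveat with the conditional above: for a workflow invoked through `workflow_call`, inputs are surfaced on the `inputs` context, while `github.event.inputs` is populated for `workflow_dispatch` events. A hedged sketch of the same step written against that context:

```yaml
# Sketch: a workflow_call boolean input is read from the inputs context.
- name: Remove snapshot from version
  if: ${{ inputs.drop-snapshot }}
  run: mvn versions:set -DremoveSnapshot
```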
From 18ab77cb5a04f91538922b96d9f47edb442bbfd9 Mon Sep 17 00:00:00 2001
From: Michael7371 <40476797+Michael7371@users.noreply.github.com>
Date: Wed, 12 Jun 2024 15:42:29 -0600
Subject: [PATCH 07/19] Removing GitHub token secret reference in workflow
---
.github/workflows/artifact-publish-dev-caller.yml | 4 +---
.github/workflows/artifact-publish-tag-caller.yml | 4 +---
.github/workflows/artifact-publish.yml | 3 ---
3 files changed, 2 insertions(+), 9 deletions(-)
diff --git a/.github/workflows/artifact-publish-dev-caller.yml b/.github/workflows/artifact-publish-dev-caller.yml
index 89bd3a3..858d6c7 100644
--- a/.github/workflows/artifact-publish-dev-caller.yml
+++ b/.github/workflows/artifact-publish-dev-caller.yml
@@ -11,6 +11,4 @@ jobs:
   publish-java-package:
     uses: ./.github/workflows/artifact-publish.yml
     with:
-      drop-snapshot: false
-    secrets:
-      GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
\ No newline at end of file
+      drop-snapshot: false
\ No newline at end of file
diff --git a/.github/workflows/artifact-publish-tag-caller.yml b/.github/workflows/artifact-publish-tag-caller.yml
index df31341..bf4ed53 100644
--- a/.github/workflows/artifact-publish-tag-caller.yml
+++ b/.github/workflows/artifact-publish-tag-caller.yml
@@ -9,6 +9,4 @@ jobs:
   publish-java-package:
     uses: ./.github/workflows/artifact-publish.yml
     with:
-      drop-snapshot: true
-    secrets:
-      GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
\ No newline at end of file
+      drop-snapshot: true
\ No newline at end of file
diff --git a/.github/workflows/artifact-publish.yml b/.github/workflows/artifact-publish.yml
index 0b0160d..ee6e895 100644
--- a/.github/workflows/artifact-publish.yml
+++ b/.github/workflows/artifact-publish.yml
@@ -7,9 +7,6 @@ on:
         description: 'Drop snapshot from version'
         required: true
         type: boolean
-    secrets:
-      GITHUB_TOKEN:
-        required: true
 
 jobs:
   build:
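This removal is consistent with GitHub Actions' rules: `GITHUB_` is a reserved prefix for secret names, so a reusable workflow cannot declare `GITHUB_TOKEN` under `secrets:`; a called workflow in the same repository can read the implicit token directly where a step needs it, roughly:

```yaml
# Sketch: the implicit token remains available to the called workflow's steps.
- name: Publish to GitHub Packages
  run: mvn --batch-mode deploy
  env:
    GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
```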
From 50c31e32bae60948350102e568a25643363f79d1 Mon Sep 17 00:00:00 2001
From: Michael7371 <40476797+Michael7371@users.noreply.github.com>
Date: Wed, 12 Jun 2024 16:04:13 -0600
Subject: [PATCH 08/19] Removing dev trigger as tags are immutable in GitHub's
Maven Registry.
---
.github/workflows/artifact-publish-dev-caller.yml | 14 --------------
.github/workflows/artifact-publish-tag-caller.yml | 12 ------------
.github/workflows/artifact-publish.yml | 10 +++-------
3 files changed, 3 insertions(+), 33 deletions(-)
delete mode 100644 .github/workflows/artifact-publish-dev-caller.yml
delete mode 100644 .github/workflows/artifact-publish-tag-caller.yml
diff --git a/.github/workflows/artifact-publish-dev-caller.yml b/.github/workflows/artifact-publish-dev-caller.yml
deleted file mode 100644
index 858d6c7..0000000
--- a/.github/workflows/artifact-publish-dev-caller.yml
+++ /dev/null
@@ -1,14 +0,0 @@
-name: Publish Java Package Snapshot
-
-on:
-  push:
-    branches:
-      - develop
-      - dev
-      - cicd-artifact-publishing
-
-jobs:
-  publish-java-package:
-    uses: ./.github/workflows/artifact-publish.yml
-    with:
-      drop-snapshot: false
\ No newline at end of file
diff --git a/.github/workflows/artifact-publish-tag-caller.yml b/.github/workflows/artifact-publish-tag-caller.yml
deleted file mode 100644
index bf4ed53..0000000
--- a/.github/workflows/artifact-publish-tag-caller.yml
+++ /dev/null
@@ -1,12 +0,0 @@
-name: Publish Java Package Tag
-
-on:
-  push:
-    tags:
-      - "*"
-
-jobs:
-  publish-java-package:
-    uses: ./.github/workflows/artifact-publish.yml
-    with:
-      drop-snapshot: true
\ No newline at end of file
diff --git a/.github/workflows/artifact-publish.yml b/.github/workflows/artifact-publish.yml
index ee6e895..d77c5bb 100644
--- a/.github/workflows/artifact-publish.yml
+++ b/.github/workflows/artifact-publish.yml
@@ -1,12 +1,9 @@
 name: Publish Java Package
 
 on:
-  workflow_call:
-    inputs:
-      drop-snapshot:
-        description: 'Drop snapshot from version'
-        required: true
-        type: boolean
+  push:
+    tags:
+      - "*"
 
 jobs:
   build:
@@ -23,7 +20,6 @@ jobs:
         distribution: 'adopt'
 
       - name: Remove snapshot from version
-        if: ${{ github.event.inputs.drop-snapshot == 'true' }}
         run: mvn versions:set -DremoveSnapshot
 
       - name: Build with Maven
From cfcef3cb0eae87b5ec0105b6cfe42224de9262d8 Mon Sep 17 00:00:00 2001
From: Michael7371 <40476797+Michael7371@users.noreply.github.com>
Date: Thu, 13 Jun 2024 09:30:24 -0600
Subject: [PATCH 09/19] updating workflow to only trigger on release tags
---
.github/workflows/artifact-publish.yml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/.github/workflows/artifact-publish.yml b/.github/workflows/artifact-publish.yml
index d77c5bb..c09e1fc 100644
--- a/.github/workflows/artifact-publish.yml
+++ b/.github/workflows/artifact-publish.yml
@@ -3,7 +3,7 @@ name: Publish Java Package
 on:
   push:
     tags:
-      - "*"
+      - 'jpo-s3-deposit-*'
jobs:
build:
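With this filter in place, publishing is driven purely by tag names; a release would presumably be cut with something like (tag name hypothetical):

```
git tag jpo-s3-deposit-1.0.0
git push origin jpo-s3-deposit-1.0.0
```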
From f0da6daed4709ad9c3aef68ea817f792c320a104 Mon Sep 17 00:00:00 2001
From: dmccoystephenson
Date: Wed, 19 Jun 2024 15:05:31 -0600
Subject: [PATCH 10/19] Changed default value for enable.auto.commit property
to 'true'
---
sample.env | 2 +-
.../java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/sample.env b/sample.env
index a7f202e..884afc6 100644
--- a/sample.env
+++ b/sample.env
@@ -15,7 +15,7 @@ HEADER_X_API_KEY=
KAFKA_TYPE=
CONFLUENT_KEY=
CONFLUENT_SECRET=
-# Defaults to false
+# Defaults to true
KAFKA_ENABLE_AUTO_COMMIT=
# Defaults to 1000
KAFKA_AUTO_COMMIT_INTERVAL_MS=
diff --git a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
index d85a2d4..1f62e90 100644
--- a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
+++ b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
@@ -159,7 +159,7 @@ public void run(String[] args) throws Exception {
}
KAFKA_AUTO_COMMIT_INTERVAL_MS = getEnvironmentVariable("KAFKA_AUTO_COMMIT_INTERVAL_MS", "1000");
- KAFKA_ENABLE_AUTO_COMMIT = getEnvironmentVariable("KAFKA_ENABLE_AUTO_COMMIT", "false");
+ KAFKA_ENABLE_AUTO_COMMIT = getEnvironmentVariable("KAFKA_ENABLE_AUTO_COMMIT", "true");
KAFKA_SESSION_TIMEOUT_MS = getEnvironmentVariable("KAFKA_SESSION_TIMEOUT_MS", "30000");
props.put("group.id", group);
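For context, the depositor maps these environment variables onto the Kafka consumer configuration, so with the new default an unset `KAFKA_ENABLE_AUTO_COMMIT` now yields offsets committed automatically at the poll interval. A minimal Java sketch of the resulting consumer properties (standard Kafka keys; the values are the defaults shown above):

```java
Properties props = new Properties();
props.put("enable.auto.commit", "true");       // default flipped from "false" by this patch
props.put("auto.commit.interval.ms", "1000");  // KAFKA_AUTO_COMMIT_INTERVAL_MS default
props.put("session.timeout.ms", "30000");      // KAFKA_SESSION_TIMEOUT_MS default
```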
From da54ae142f33e4a352b4471cee6de8d5c327e874 Mon Sep 17 00:00:00 2001
From: Marc Wodahl
Date: Tue, 2 Jul 2024 07:57:47 -0600
Subject: [PATCH 11/19] Add unit tests
---
pom.xml | 52 ++++++
.../jpo/ode/aws/depositor/AwsDepositor.java | 161 +++++++++++-------
.../depositor/AddConfluentPropertiesTest.java | 19 +++
.../depositor/BuildFirehoseClientTest.java | 18 ++
.../ConvertStringToByteBufferTest.java | 20 +++
.../ode/aws/depositor/CreateS3ClientTest.java | 25 +++
.../aws/depositor/CreateSampleFileTest.java | 23 +++
.../aws/depositor/DepositToFirehoseTest.java | 75 ++++++++
.../ode/aws/depositor/DepositToGCSTest.java | 35 ++++
.../ode/aws/depositor/DepositToS3Test.java | 41 +++++
.../aws/depositor/GenerateAWSProfileTest.java | 88 ++++++++++
.../depositor/GetEnvironmentVariableTest.java | 34 ++++
.../its/jpo/ode/aws/depositor/RunTest.java | 103 +++++++++++
13 files changed, 628 insertions(+), 66 deletions(-)
create mode 100644 src/test/java/us/dot/its/jpo/ode/aws/depositor/AddConfluentPropertiesTest.java
create mode 100644 src/test/java/us/dot/its/jpo/ode/aws/depositor/BuildFirehoseClientTest.java
create mode 100644 src/test/java/us/dot/its/jpo/ode/aws/depositor/ConvertStringToByteBufferTest.java
create mode 100644 src/test/java/us/dot/its/jpo/ode/aws/depositor/CreateS3ClientTest.java
create mode 100644 src/test/java/us/dot/its/jpo/ode/aws/depositor/CreateSampleFileTest.java
create mode 100644 src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToFirehoseTest.java
create mode 100644 src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToGCSTest.java
create mode 100644 src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToS3Test.java
create mode 100644 src/test/java/us/dot/its/jpo/ode/aws/depositor/GenerateAWSProfileTest.java
create mode 100644 src/test/java/us/dot/its/jpo/ode/aws/depositor/GetEnvironmentVariableTest.java
create mode 100644 src/test/java/us/dot/its/jpo/ode/aws/depositor/RunTest.java
diff --git a/pom.xml b/pom.xml
index d6e0ad1..4e6a7f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,6 +10,8 @@
         <maven.compiler.source>21</maven.compiler.source>
         <maven.compiler.target>21</maven.compiler.target>
+        <jmockit.version>1.49</jmockit.version>
+        2.0.2
@@ -18,6 +20,26 @@
             <version>4.13.1</version>
             <scope>test</scope>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <version>5.9.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jmockit</groupId>
+            <artifactId>jmockit</artifactId>
+            <version>${jmockit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>3.3.3</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.kafka</groupId>
@@ -99,6 +121,36 @@
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>3.2.5</version>
+                <configuration>
+                    <argLine>-javaagent:${user.home}/.m2/repository/org/jmockit/jmockit/${jmockit.version}/jmockit-${jmockit.version}.jar -Xshare:off</argLine>
+                    <systemPropertyVariables>
+                        <loader.path>${loader.path}</loader.path>
+                        <project.build.directory>${project.build.directory}</project.build.directory>
+                    </systemPropertyVariables>
+                    <environmentVariables>
+                        <TEST_VARIABLE>testValue</TEST_VARIABLE>
+                        <TEST_VARIABLE_EMPTY></TEST_VARIABLE_EMPTY>
+                        <AWS_ACCESS_KEY_ID>testAccessKey</AWS_ACCESS_KEY_ID>
+                        <AWS_SECRET_ACCESS_KEY>testSecretKey</AWS_SECRET_ACCESS_KEY>
+                        <AWS_SESSION_TOKEN>testSessionToken</AWS_SESSION_TOKEN>
+                        <AWS_EXPIRATION>2020-01-01 00:00:00</AWS_EXPIRATION>
+                        <API_ENDPOINT>testApiEndpoint</API_ENDPOINT>
+                        <CONFLUENT_KEY>testConfluentKey</CONFLUENT_KEY>
+                        <CONFLUENT_SECRET>testConfluentSecret</CONFLUENT_SECRET>
+                    </environmentVariables>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>3.2.5</version>
+            </plugin>
diff --git a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
index d85a2d4..218a1bc 100644
--- a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
+++ b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
@@ -26,10 +26,24 @@
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
@@ -53,19 +67,6 @@
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class AwsDepositor {
private final Logger logger = LoggerFactory.getLogger(AwsDepositor.class);
private final long CONSUMER_POLL_TIMEOUT_MS = 60000;
@@ -79,6 +80,8 @@ public class AwsDepositor {
private String keyName;
private boolean waitOpt;
+ private boolean runDepositor = true;
+
private String K_AWS_ACCESS_KEY_ID;
private String K_AWS_SECRET_ACCESS_KEY;
private String K_AWS_SESSION_TOKEN;
@@ -98,45 +101,12 @@ public class AwsDepositor {
public static void main(String[] args) throws Exception {
AwsDepositor awsDepositor = new AwsDepositor();
- awsDepositor.run(args);
+ awsDepositor.run();
}
- public void run(String[] args) throws Exception {
- endpoint = getEnvironmentVariable("BOOTSTRAP_SERVER", "");
- topic = getEnvironmentVariable("DEPOSIT_TOPIC", "");
- group = getEnvironmentVariable("DEPOSIT_GROUP", "");
- destination = getEnvironmentVariable("DESTINATION", "firehose");
- if (System.getenv("WAIT") != null && System.getenv("WAIT") != "")
- { waitOpt = true; }
- else
- { waitOpt = false; }
-
- // S3 properties
- bucketName = getEnvironmentVariable("DEPOSIT_BUCKET_NAME", "");
- awsRegion = getEnvironmentVariable("REGION", "us-east-1");
- keyName = getEnvironmentVariable("DEPOSIT_KEY_NAME", "");
-
- K_AWS_ACCESS_KEY_ID = getEnvironmentVariable("AWS_ACCESS_KEY_ID", "AccessKeyId");
- K_AWS_SECRET_ACCESS_KEY = getEnvironmentVariable("AWS_SECRET_ACCESS_KEY", "SecretAccessKey");
- K_AWS_SESSION_TOKEN = getEnvironmentVariable("AWS_SESSION_TOKEN", "SessionToken");
- K_AWS_EXPIRATION = getEnvironmentVariable("AWS_EXPIRATION", "Expiration");
- API_ENDPOINT = getEnvironmentVariable("API_ENDPOINT", "");
- HEADER_Accept = getEnvironmentVariable("HEADER_ACCEPT", "application/json");
- HEADER_X_API_KEY = getEnvironmentVariable("HEADER_X_API_KEY", "");
-
- logger.debug("Bucket name: {}", bucketName);
- logger.debug("AWS Region: {}", awsRegion);
- logger.debug("Key name: {}", keyName);
- logger.debug("Kafka topic: {}", topic);
- logger.debug("Destination: {}", destination);
- logger.debug("Wait: {}", waitOpt);
- logger.debug("AWS_ACCESS_KEY_ID: {}", K_AWS_ACCESS_KEY_ID);
- logger.debug("AWS_SECRET_ACCESS_KEY: {}", K_AWS_SECRET_ACCESS_KEY);
- logger.debug("AWS_SESSION_TOKEN: {}", K_AWS_SESSION_TOKEN);
- logger.debug("AWS_EXPIRATION: {}", K_AWS_EXPIRATION);
- logger.debug("API_ENDPOINT: {}", API_ENDPOINT);
- logger.debug("HEADER_Accept: {}", HEADER_Accept);
- logger.debug("HEADER_X_API_KEY: {}", HEADER_X_API_KEY);
+ public void run() throws Exception {
+ // Pull in environment variables
+ depositorSetup();
if (API_ENDPOINT.length() > 0) {
JSONObject profile = generateAWSProfile();
@@ -187,8 +157,8 @@ public void run(String[] args) throws Exception {
}
- while (true) {
- KafkaConsumer<String, String> stringConsumer = new KafkaConsumer<String, String>(props);
+ while (getRunDepositor()) {
+ KafkaConsumer<String, String> stringConsumer = getKafkaConsumer(props);
logger.debug("Subscribing to topic " + topic);
stringConsumer.subscribe(Arrays.asList(topic));
@@ -196,7 +166,7 @@ public void run(String[] args) throws Exception {
try {
boolean gotMessages = false;
- while (true) {
+ while (getRunDepositor()) {
ConsumerRecords<String, String> records = stringConsumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT_MS));
if (records != null && !records.isEmpty()) {
for (ConsumerRecord<String, String> record : records) {
@@ -234,7 +204,7 @@ public void run(String[] args) throws Exception {
}
}
- private static void addConfluentProperties(Properties props) {
+ static void addConfluentProperties(Properties props) {
props.put("ssl.endpoint.identification.algorithm", "https");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
@@ -250,7 +220,7 @@ private static void addConfluentProperties(Properties props) {
}
}
- private void depositToFirehose(AmazonKinesisFirehoseAsync firehose, ConsumerRecord<String, String> record)
+ void depositToFirehose(AmazonKinesisFirehoseAsync firehose, ConsumerRecord<String, String> record)
throws InterruptedException, ExecutionException, IOException {
try {
// IMPORTANT!!!
@@ -261,9 +231,8 @@ private void depositToFirehose(AmazonKinesisFirehoseAsync firehose, ConsumerReco
ByteBuffer data = convertStringToByteBuffer(msg, Charset.defaultCharset());
// Check the expiration time for the profile credentials
- LocalDateTime current_datetime = LocalDateTime.now();
- LocalDateTime expiration_datetime = LocalDateTime.parse(AWS_EXPIRATION,
- DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+ LocalDateTime current_datetime = getLocalDateTime();
+ LocalDateTime expiration_datetime = getExpirationDateTime();
System.out.println();
if (expiration_datetime.isBefore(current_datetime) && API_ENDPOINT.length() > 0) {
// If credential is expired, generate aws credentials
@@ -307,7 +276,7 @@ private void depositToFirehose(AmazonKinesisFirehoseAsync firehose, ConsumerReco
}
}
- private void depositToS3(AmazonS3 s3, ConsumerRecord<String, String> record) throws IOException {
+ void depositToS3(AmazonS3 s3, ConsumerRecord<String, String> record) throws IOException {
try {
long time = System.currentTimeMillis();
String timeStamp = Long.toString(time);
@@ -346,7 +315,7 @@ private void depositToS3(AmazonS3 s3, ConsumerRecord record) thr
}
}
- private void depositToGCS(Storage gcsStorage, String depositBucket, ConsumerRecord<String, String> record) {
+ void depositToGCS(Storage gcsStorage, String depositBucket, ConsumerRecord<String, String> record) {
String recordValue = record.value();
Bucket bucket = gcsStorage.get(depositBucket);
byte[] bytes = recordValue.getBytes(Charset.defaultCharset());
@@ -362,7 +331,7 @@ private void depositToGCS(Storage gcsStorage, String depositBucket, ConsumerReco
}
}
- private AmazonKinesisFirehoseAsync buildFirehoseClient(String awsRegion) {
+ AmazonKinesisFirehoseAsync buildFirehoseClient(String awsRegion) {
// Default is to deposit to Kinesis/Firehose, override via .env
// variables if S3 deposit desired
logger.debug("=============================");
@@ -372,7 +341,7 @@ private AmazonKinesisFirehoseAsync buildFirehoseClient(String awsRegion) {
return AmazonKinesisFirehoseAsyncClientBuilder.standard().withRegion(awsRegion).build();
}
- private AmazonS3 createS3Client(String awsRegion) {
+ AmazonS3 createS3Client(String awsRegion) {
logger.debug("============== ========");
logger.debug("Connecting to Amazon S3");
logger.debug("=======================");
@@ -397,7 +366,7 @@ public ByteBuffer convertStringToByteBuffer(String msg, Charset charset) {
return ByteBuffer.wrap(msg.getBytes(charset));
}
- private File createSampleFile(String json) throws IOException {
+ File createSampleFile(String json) throws IOException {
File file = File.createTempFile("aws-java-sdk-", ".json");
file.deleteOnExit();
@@ -408,8 +377,8 @@ private File createSampleFile(String json) throws IOException {
return file;
}
- private JSONObject generateAWSProfile() throws IOException {
- CloseableHttpClient client = HttpClients.createDefault();
+ JSONObject generateAWSProfile() throws IOException {
+ CloseableHttpClient client = getHttpClient();
HttpPost httpPost = new HttpPost(API_ENDPOINT);
JSONObject jsonResult = new JSONObject();
String json = "{}";
@@ -435,7 +404,9 @@ private JSONObject generateAWSProfile() throws IOException {
return jsonResult;
}
- private static String getEnvironmentVariable(String variableName, String defaultValue) {
+ static String getEnvironmentVariable(String variableName, String defaultValue) {
+ // get all environment variables
+ Map<String, String> env = System.getenv();
String value = System.getenv(variableName);
if (value == null || value.equals("")) {
System.out.println("Something went wrong retrieving the environment variable " + variableName);
@@ -445,4 +416,62 @@ private static String getEnvironmentVariable(String variableName, String default
return value;
}
+ CloseableHttpClient getHttpClient() {
+ return HttpClients.createDefault();
+ }
+
+ LocalDateTime getLocalDateTime() {
+ return LocalDateTime.now();
+ }
+
+ LocalDateTime getExpirationDateTime() {
+ return LocalDateTime.parse(K_AWS_EXPIRATION,
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+ }
+
+ void depositorSetup() {
+ endpoint = getEnvironmentVariable("BOOTSTRAP_SERVER", "");
+ topic = getEnvironmentVariable("DEPOSIT_TOPIC", "");
+ group = getEnvironmentVariable("DEPOSIT_GROUP", "");
+ destination = getEnvironmentVariable("DESTINATION", "firehose");
+ if (System.getenv("WAIT") != null && System.getenv("WAIT") != "")
+ { waitOpt = true; }
+ else
+ { waitOpt = false; }
+
+ // S3 properties
+ bucketName = getEnvironmentVariable("DEPOSIT_BUCKET_NAME", "");
+ awsRegion = getEnvironmentVariable("REGION", "us-east-1");
+ keyName = getEnvironmentVariable("DEPOSIT_KEY_NAME", "");
+
+ K_AWS_ACCESS_KEY_ID = getEnvironmentVariable("AWS_ACCESS_KEY_ID", "AccessKeyId");
+ K_AWS_SECRET_ACCESS_KEY = getEnvironmentVariable("AWS_SECRET_ACCESS_KEY", "SecretAccessKey");
+ K_AWS_SESSION_TOKEN = getEnvironmentVariable("AWS_SESSION_TOKEN", "SessionToken");
+ K_AWS_EXPIRATION = getEnvironmentVariable("AWS_EXPIRATION", "Expiration");
+ API_ENDPOINT = getEnvironmentVariable("API_ENDPOINT", "");
+ HEADER_Accept = getEnvironmentVariable("HEADER_ACCEPT", "application/json");
+ HEADER_X_API_KEY = getEnvironmentVariable("HEADER_X_API_KEY", "");
+
+ logger.debug("Bucket name: {}", bucketName);
+ logger.debug("AWS Region: {}", awsRegion);
+ logger.debug("Key name: {}", keyName);
+ logger.debug("Kafka topic: {}", topic);
+ logger.debug("Destination: {}", destination);
+ logger.debug("Wait: {}", waitOpt);
+ logger.debug("AWS_ACCESS_KEY_ID: {}", K_AWS_ACCESS_KEY_ID);
+ logger.debug("AWS_SECRET_ACCESS_KEY: {}", K_AWS_SECRET_ACCESS_KEY);
+ logger.debug("AWS_SESSION_TOKEN: {}", K_AWS_SESSION_TOKEN);
+ logger.debug("AWS_EXPIRATION: {}", K_AWS_EXPIRATION);
+ logger.debug("API_ENDPOINT: {}", API_ENDPOINT);
+ logger.debug("HEADER_Accept: {}", HEADER_Accept);
+ logger.debug("HEADER_X_API_KEY: {}", HEADER_X_API_KEY);
+ }
+
+ boolean getRunDepositor() {
+ return runDepositor;
+ }
+
+ KafkaConsumer<String, String> getKafkaConsumer(Properties props) {
+ return new KafkaConsumer<>(props);
+ }
}
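The shape of this refactor is worth spelling out: the `while (true)` loops now consult `getRunDepositor()`, and collaborators (HTTP client, clock, expiration parser, Kafka consumer) are fetched through package-private factory methods. That gives the tests below seams to override with a Mockito spy; a sketch under those assumptions (the stubbed values are hypothetical):

```java
AwsDepositor depositor = spy(new AwsDepositor());
// Swap in a fake collaborator without touching production wiring.
doReturn(mock(CloseableHttpClient.class)).when(depositor).getHttpClient();
// Let the poll loop iterate once, then let run() return.
doReturn(true, false).when(depositor).getRunDepositor();
```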
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/AddConfluentPropertiesTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/AddConfluentPropertiesTest.java
new file mode 100644
index 0000000..c3862b1
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/AddConfluentPropertiesTest.java
@@ -0,0 +1,19 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import org.junit.jupiter.api.Test;
+
+public class AddConfluentPropertiesTest {
+ @Test
+ public void testAddConfluentProperties() {
+ Properties props = new Properties();
+ AwsDepositor.addConfluentProperties(props);
+
+ assertEquals("https", props.getProperty("ssl.endpoint.identification.algorithm"));
+ assertEquals("SASL_SSL", props.getProperty("security.protocol"));
+ assertEquals("PLAIN", props.getProperty("sasl.mechanism"));
+ assertEquals("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"testConfluentKey\" password=\"testConfluentSecret\";" , props.getProperty("sasl.jaas.config"));
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/BuildFirehoseClientTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/BuildFirehoseClientTest.java
new file mode 100644
index 0000000..1683c59
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/BuildFirehoseClientTest.java
@@ -0,0 +1,18 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import org.junit.jupiter.api.Test;
+
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseAsync;
+
+public class BuildFirehoseClientTest {
+ @Test
+ public void testBuildFirehoseClient() {
+ AwsDepositor awsDepositor = new AwsDepositor();
+ String awsRegion = "us-east-1";
+
+ AmazonKinesisFirehoseAsync firehose = awsDepositor.buildFirehoseClient(awsRegion);
+
+ assertNotNull(firehose);
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/ConvertStringToByteBufferTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/ConvertStringToByteBufferTest.java
new file mode 100644
index 0000000..f3ea403
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/ConvertStringToByteBufferTest.java
@@ -0,0 +1,20 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import org.junit.jupiter.api.Test;
+
+public class ConvertStringToByteBufferTest {
+ @Test
+ public void testConvertStringToByteBuffer() {
+ AwsDepositor awsDepositor = new AwsDepositor();
+ String input = "Test";
+ ByteBuffer expected = ByteBuffer.wrap(input.getBytes(StandardCharsets.UTF_8));
+
+ ByteBuffer result = awsDepositor.convertStringToByteBuffer(input, StandardCharsets.UTF_8);
+
+ assertEquals(expected, result);
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/CreateS3ClientTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/CreateS3ClientTest.java
new file mode 100644
index 0000000..79bb555
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/CreateS3ClientTest.java
@@ -0,0 +1,25 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import org.junit.jupiter.api.Test;
+
+import com.amazonaws.services.s3.AmazonS3;
+
+public class CreateS3ClientTest {
+
+ @Test
+ public void testCreateS3Client() {
+ AwsDepositor awsDepositor = new AwsDepositor();
+ AmazonS3 s3Client = awsDepositor.createS3Client("us-east-1");
+ assertNotNull(s3Client);
+ }
+
+ @Test
+ public void testCreateS3Client_InvalidCredentials() {
+ AwsDepositor awsDepositor = new AwsDepositor();
+ assertThrows(IllegalArgumentException.class, () -> {
+ awsDepositor.createS3Client("invalid-region");
+ });
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/CreateSampleFileTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/CreateSampleFileTest.java
new file mode 100644
index 0000000..6570fd4
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/CreateSampleFileTest.java
@@ -0,0 +1,23 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.junit.jupiter.api.Test;
+
+public class CreateSampleFileTest {
+ @Test
+ public void testCreateSampleFile() throws IOException {
+ AwsDepositor awsDepositor = new AwsDepositor();
+ String json = "{\"key\": \"value\"}";
+ File file = awsDepositor.createSampleFile(json);
+ assertNotNull(file);
+ assertTrue(file.exists());
+ assertTrue(file.isFile());
+ assertEquals(".json", file.getName().substring(file.getName().lastIndexOf(".")));
+ file.delete();
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToFirehoseTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToFirehoseTest.java
new file mode 100644
index 0000000..5fd06e8
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToFirehoseTest.java
@@ -0,0 +1,75 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.time.LocalDateTime;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.json.JSONObject;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseAsync;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;
+
+public class DepositToFirehoseTest {
+
+ @Test
+ public void testDepositToFirehose() throws InterruptedException, ExecutionException, IOException {
+
+ // Create a mock AmazonKinesisFirehoseAsync instance
+ AmazonKinesisFirehoseAsync firehose = mock(AmazonKinesisFirehoseAsync.class);
+
+ // Create a mock ConsumerRecord
+ ConsumerRecord mockRecord = mock(ConsumerRecord.class);
+ when(mockRecord.value()).thenReturn("Test Record");
+
+ AwsDepositor depositor = spy(new AwsDepositor());
+ doReturn(LocalDateTime.of(2024, 6, 26, 12, 0, 0)).when(depositor).getLocalDateTime();
+ doReturn(LocalDateTime.of(2024, 6, 26, 10, 0, 0)).when(depositor).getExpirationDateTime();
+
+ JSONObject generateAwsReturnVal = new JSONObject();
+ generateAwsReturnVal.put("testAccessKey", "test-access-key-id");
+ generateAwsReturnVal.put("testSecretKey", "test-secret-key");
+ generateAwsReturnVal.put("testSessionToken", "test-token");
+ generateAwsReturnVal.put("2020-01-01 00:00:00", "test-expiration");
+
+ doReturn(generateAwsReturnVal).when(depositor).generateAWSProfile();
+
+ // pull in necessary environment variables
+ depositor.depositorSetup();
+
+ // Call the depositToFirehose method
+ depositor.depositToFirehose(firehose, mockRecord);
+
+ // Verify that the putRecordAsync method was called on the mock AmazonKinesisFirehoseAsync instance
+ ArgumentCaptor<PutRecordRequest> putRecordRequestCaptor = ArgumentCaptor.forClass(PutRecordRequest.class);
+ verify(firehose).putRecordAsync(putRecordRequestCaptor.capture());
+
+ // Assert PutRecordRequest value is as expected
+ PutRecordRequest putRecordRequestResult = putRecordRequestCaptor.getValue();
+ assertEquals("Test Record\n", convertByteBufferToString(putRecordRequestResult.getRecord().getData()));
+ }
+
+ @Test
+ public void testGetExpirationDateTime() {
+ AwsDepositor depositor = new AwsDepositor();
+ depositor.depositorSetup();
+ LocalDateTime result = depositor.getExpirationDateTime();
+ assertEquals(LocalDateTime.of(2020, 01, 01, 0, 0, 0), result);
+ }
+
+ private String convertByteBufferToString(ByteBuffer buffer) {
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.get(bytes);
+ return new String(bytes, Charset.defaultCharset());
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToGCSTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToGCSTest.java
new file mode 100644
index 0000000..b0cbdef
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToGCSTest.java
@@ -0,0 +1,35 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Bucket;
+import com.google.cloud.storage.Storage;
+
+public class DepositToGCSTest {
+ @Test
+ public void testDepositToGCS() {
+ Storage gcsStorage = mock(Storage.class);
+ Bucket bucket = mock(Bucket.class);
+ Blob blob = mock(Blob.class);
+
+ ConsumerRecord record = mock(ConsumerRecord.class);
+ when(record.value()).thenReturn("test");
+
+ when(gcsStorage.get(anyString())).thenReturn(bucket);
+ when(bucket.create(anyString(), any(byte[].class))).thenReturn(blob);
+
+ AwsDepositor awsDepositor = new AwsDepositor();
+
+ awsDepositor.depositToGCS(gcsStorage, "depositBucket", record);
+
+ verify(gcsStorage).get("depositBucket");
+ verify(bucket).create(anyString(), any(byte[].class));
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToS3Test.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToS3Test.java
new file mode 100644
index 0000000..6da5f0a
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/DepositToS3Test.java
@@ -0,0 +1,41 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import java.io.IOException;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import static org.junit.Assert.assertNotNull;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+
+
+public class DepositToS3Test {
+ @Test
+ public void testDepositToS3() throws IOException {
+ // Mock necessary classes
+ AmazonS3 s3 = mock(AmazonS3.class);
+ ConsumerRecord mockRecord = mock(ConsumerRecord.class);
+
+ PutObjectResult result = new PutObjectResult();
+ when(mockRecord.value()).thenReturn("Test Record");
+ when(s3.putObject(any())).thenReturn(result);
+
+ AwsDepositor awsDepositor = new AwsDepositor();
+ awsDepositor.depositToS3(s3, mockRecord);
+
+ // Verify that the putObject method was called on the mock AmazonS3 instance
+ ArgumentCaptor<PutObjectRequest> putObjectRequestCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
+ verify(s3).putObject(putObjectRequestCaptor.capture());
+
+ // Assert that the putObjectRequest was created correctly
+ PutObjectRequest putObjectRequestResult = putObjectRequestCaptor.getValue();
+ assertNotNull(putObjectRequestResult);
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/GenerateAWSProfileTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/GenerateAWSProfileTest.java
new file mode 100644
index 0000000..b81aec5
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/GenerateAWSProfileTest.java
@@ -0,0 +1,88 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import java.io.IOException;
+
+import org.apache.http.HttpVersion;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.message.BasicStatusLine;
+import org.json.JSONObject;
+import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import org.junit.jupiter.api.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import mockit.Verifications;
+
+public class GenerateAWSProfileTest {
+ @Test
+ void testGenerateAWSProfileSuccess() throws Exception {
+
+ AwsDepositor depositor = spy(new AwsDepositor());
+
+ // Mock the CloseableHttpResponse
+ CloseableHttpResponse mockResponse = mock(CloseableHttpResponse.class);
+ when(mockResponse.getStatusLine()).thenReturn(new BasicStatusLine(HttpVersion.HTTP_1_1, 200, "OK"));
+ when(mockResponse.getEntity()).thenReturn(new StringEntity("{\"key\":\"value\"}"));
+
+ // Mock the CloseableHttpClient
+ CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
+ when(mockClient.execute(any())).thenReturn(mockResponse);
+
+ doReturn(mockClient).when(depositor).getHttpClient();
+
+ depositor.depositorSetup();
+ JSONObject result = depositor.generateAWSProfile();
+
+ assertNotNull(result);
+ assertEquals("value", result.getString("key"));
+
+ // Verify interactions
+ new Verifications() {{
+ mockClient.execute((HttpPost) any);
+ times = 1;
+
+ mockResponse.close();
+ times = 1;
+
+ mockClient.close();
+ times = 1;
+ }};
+ }
+
+ @Test
+ void testGenerateAWSProfileException() throws IOException {
+ AwsDepositor depositor = spy(new AwsDepositor());
+
+ // Mock the CloseableHttpResponse
+ CloseableHttpResponse mockResponse = mock(CloseableHttpResponse.class);
+ when(mockResponse.getStatusLine()).thenReturn(new BasicStatusLine(HttpVersion.HTTP_1_1, 200, "OK"));
+ when(mockResponse.getEntity()).thenReturn(null);
+
+ // Mock the CloseableHttpClient
+ CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
+ when(mockClient.execute(any())).thenReturn(mockResponse);
+
+ doReturn(mockClient).when(depositor).getHttpClient();
+ Exception exception = assertThrows(Exception.class, depositor::generateAWSProfile);
+
+ // Verify the exception
+ assertNotNull(exception);
+
+ // Verify interactions
+ new Verifications() {{
+ mockClient.execute((HttpPost) any);
+ times = 1;
+
+ mockClient.close();
+ times = 1;
+ }};
+ }
+}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/GetEnvironmentVariableTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/GetEnvironmentVariableTest.java
new file mode 100644
index 0000000..57c8f38
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/GetEnvironmentVariableTest.java
@@ -0,0 +1,34 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import org.junit.jupiter.api.Test;
+
+public class GetEnvironmentVariableTest {
+ private final String TEST_VARIABLE = "TEST_VARIABLE";
+ private final String TEST_VARIABLE_NO_ENV = "TEST_VARIABLE_NO_ENV";
+ private final String TEST_VARIABLE_EMPTY = "TEST_VARIABLE_EMPTY";
+ private final String DEFAULT_VALUE = "default";
+
+ @Test
+ void testGetEnvironmentVariableExists() throws Exception {
+ String expectedValue = "testValue";
+
+ // Test
+ String result = AwsDepositor.getEnvironmentVariable(TEST_VARIABLE, "");
+ assertEquals(expectedValue, result);
+ }
+
+ @Test
+ void testGetEnvironmentVariableNotSet() {
+ // Test when the environment variable is not set
+ String result = AwsDepositor.getEnvironmentVariable(TEST_VARIABLE_NO_ENV, DEFAULT_VALUE);
+ assertEquals(DEFAULT_VALUE, result);
+ }
+
+ @Test
+ void testGetEnvironmentVariableEmpty() {
+ // Test
+ String result = AwsDepositor.getEnvironmentVariable(TEST_VARIABLE_EMPTY, DEFAULT_VALUE);
+ assertEquals(DEFAULT_VALUE, result);
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/RunTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/RunTest.java
new file mode 100644
index 0000000..d669487
--- /dev/null
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/RunTest.java
@@ -0,0 +1,103 @@
+package us.dot.its.jpo.ode.aws.depositor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.json.JSONObject;
+import org.junit.jupiter.api.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import mockit.Verifications;
+
+public class RunTest {
+ @Test
+ public void testRunNoRecords() throws Exception {
+ AwsDepositor depositor = spy(new AwsDepositor());
+
+ KafkaConsumer mockConsumer = mock(KafkaConsumer.class);
+ when(mockConsumer.poll(any())).thenReturn(null);
+
+ doReturn(mockConsumer).when(depositor).getKafkaConsumer(any());
+ doReturn(true, true, false).when(depositor).getRunDepositor();
+
+ JSONObject generateAwsReturnVal = new JSONObject();
+ generateAwsReturnVal.put("testAccessKey", "test-access-key-id");
+ generateAwsReturnVal.put("testSecretKey", "test-secret-key");
+ generateAwsReturnVal.put("testSessionToken", "test-token");
+ generateAwsReturnVal.put("2020-01-01 00:00:00", "test-expiration");
+
+ doReturn(generateAwsReturnVal).when(depositor).generateAWSProfile();
+
+ new Verifications() {
+ {
+ depositor.getKafkaConsumer(any());
+ times = 1;
+ depositor.getRunDepositor();
+ times = 3;
+ }
+ };
+
+ depositor.run();
+ }
+
+ @Test
+ public void testRunRecords() throws Exception {
+ AwsDepositor depositor = spy(new AwsDepositor());
+
+ KafkaConsumer mockConsumer = mock(KafkaConsumer.class);
+ when(mockConsumer.poll(any())).thenReturn(null);
+
+ doReturn(mockConsumer).when(depositor).getKafkaConsumer(any());
+ doReturn(true, true, false).when(depositor).getRunDepositor();
+
+ JSONObject generateAwsReturnVal = new JSONObject();
+ generateAwsReturnVal.put("testAccessKey", "test-access-key-id");
+ generateAwsReturnVal.put("testSecretKey", "test-secret-key");
+ generateAwsReturnVal.put("testSessionToken", "test-token");
+ generateAwsReturnVal.put("2020-01-01 00:00:00", "test-expiration");
+
+ doReturn(generateAwsReturnVal).when(depositor).generateAWSProfile();
+
+ doNothing().when(depositor).depositToFirehose(any(), any());
+
+ // Create a list of ConsumerRecord
+ List<ConsumerRecord<String, String>> records = new ArrayList<>();
+ records.add(new ConsumerRecord<>("topic", 0, 0, "test", "test-value"));
+
+ // Create a TopicPartition object for your topic and partition
+ TopicPartition topicPartition = new TopicPartition("topic", 0);
+
+ // Create a map of TopicPartition to List of ConsumerRecord
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
+ recordsMap.put(topicPartition, records);
+
+ // Initialize ConsumerRecords with the map
+ ConsumerRecords<String, String> mockRecords = new ConsumerRecords<>(recordsMap);
+
+ when(mockConsumer.poll(any())).thenReturn(mockRecords);
+
+ new Verifications() {
+ {
+ depositor.getKafkaConsumer(any());
+ times = 1;
+ depositor.getRunDepositor();
+ times = 3;
+ depositor.depositToFirehose(any(), any());
+ times = 1;
+ }
+ };
+
+ depositor.run();
+ }
+}
From d2bf634c62e0c2c1d9f6f3e1d47ed980d4e9fac4 Mon Sep 17 00:00:00 2001
From: Marc Wodahl
Date: Wed, 3 Jul 2024 08:59:22 -0600
Subject: [PATCH 12/19] comment update
---
src/test/java/us/dot/its/jpo/ode/aws/depositor/RunTest.java | 6 +-----
1 file changed, 1 insertion(+), 5 deletions(-)
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/RunTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/RunTest.java
index d669487..a687ed3 100644
--- a/src/test/java/us/dot/its/jpo/ode/aws/depositor/RunTest.java
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/RunTest.java
@@ -71,18 +71,14 @@ public void testRunRecords() throws Exception {
doNothing().when(depositor).depositToFirehose(any(), any());
- // Create a list of ConsumerRecord
List<ConsumerRecord<String, String>> records = new ArrayList<>();
records.add(new ConsumerRecord<>("topic", 0, 0, "test", "test-value"));
-
- // Create a TopicPartition object for your topic and partition
+
TopicPartition topicPartition = new TopicPartition("topic", 0);
- // Create a map of TopicPartition to List of ConsumerRecord
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
recordsMap.put(topicPartition, records);
- // Initialize ConsumerRecords with the map
ConsumerRecords<String, String> mockRecords = new ConsumerRecords<>(recordsMap);
when(mockConsumer.poll(any())).thenReturn(mockRecords);
From 13e8d741bc474795deb37065ec7469b92b971287 Mon Sep 17 00:00:00 2001
From: Marc Wodahl
Date: Tue, 16 Jul 2024 13:32:33 -0600
Subject: [PATCH 13/19] Update mockito verifications to work with Java Test
Runner
---
.vscode/settings.json | 15 ++++++++
pom.xml | 36 +++++++++----------
.../aws/depositor/GenerateAWSProfileTest.java | 27 +++-----------
.../depositor/GetEnvironmentVariableTest.java | 17 ++++-----
.../its/jpo/ode/aws/depositor/RunTest.java | 31 +++++-----------
5 files changed, 53 insertions(+), 73 deletions(-)
create mode 100644 .vscode/settings.json
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..0b4f2bb
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,15 @@
+{
+ "java.test.config": {
+ "env": {
+ "TEST_VARIABLE": "testValue",
+ "TEST_VARIABLE_EMPTY": "",
+ "AWS_ACCESS_KEY_ID": "testAccessKey",
+ "AWS_SECRET_ACCESS_KEY": "testSecretKey",
+ "AWS_EXPIRATION": "2020-01-01 00:00:00",
+ "AWS_SESSION_TOKEN": "testSessionToken",
+ "API_ENDPOINT": "testApiEndpoint",
+ "CONFLUENT_KEY": "testConfluentKey",
+ "CONFLUENT_SECRET": "testConfluentSecret",
+ }
+ },
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 4a73a41..d419435 100644
--- a/pom.xml
+++ b/pom.xml
@@ -8,18 +8,16 @@
     <packaging>jar</packaging>
     <name>JPO AWS Depositor</name>
-        <maven.compiler.source>21</maven.compiler.source>
-        <maven.compiler.target>21</maven.compiler.target>
+        <java.version>21</java.version>
         <jmockit.version>1.49</jmockit.version>
-        2.0.2
         <github_organization>usdot-jpo-ode</github_organization>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.13.1</version>
+            <groupId>org.jmockit</groupId>
+            <artifactId>jmockit</artifactId>
+            <version>${jmockit.version}</version>
             <scope>test</scope>
@@ -27,19 +25,19 @@
             <artifactId>junit-jupiter-api</artifactId>
             <version>5.9.3</version>
             <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.jmockit</groupId>
-            <artifactId>jmockit</artifactId>
-            <version>${jmockit.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-            <version>3.3.3</version>
-            <scope>test</scope>
-        </dependency>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>3.3.3</version>
+            <scope>test</scope>
+        </dependency>
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/GenerateAWSProfileTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/GenerateAWSProfileTest.java
index b81aec5..21225a3 100644
--- a/src/test/java/us/dot/its/jpo/ode/aws/depositor/GenerateAWSProfileTest.java
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/GenerateAWSProfileTest.java
@@ -17,10 +17,10 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import mockit.Verifications;
-
public class GenerateAWSProfileTest {
@Test
void testGenerateAWSProfileSuccess() throws Exception {
@@ -44,17 +44,9 @@ void testGenerateAWSProfileSuccess() throws Exception {
assertNotNull(result);
assertEquals("value", result.getString("key"));
- // Verify interactions
- new Verifications() {{
- mockClient.execute((HttpPost) any);
- times = 1;
-
- mockResponse.close();
- times = 1;
-
- mockClient.close();
- times = 1;
- }};
+ verify(mockClient, times(1)).execute((HttpPost) any());
+ verify(mockResponse, times(1)).close();
+ verify(mockClient, times(1)).close();
}
@Test
@@ -75,14 +67,5 @@ void testGenerateAWSProfileException() throws IOException {
// Verify the exception
assertNotNull(exception);
-
- // Verify interactions
- new Verifications() {{
- mockClient.execute((HttpPost) any);
- times = 1;
-
- mockClient.close();
- times = 1;
- }};
}
}
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/GetEnvironmentVariableTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/GetEnvironmentVariableTest.java
index 57c8f38..ddc1c83 100644
--- a/src/test/java/us/dot/its/jpo/ode/aws/depositor/GetEnvironmentVariableTest.java
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/GetEnvironmentVariableTest.java
@@ -13,22 +13,19 @@ public class GetEnvironmentVariableTest {
void testGetEnvironmentVariableExists() throws Exception {
String expectedValue = "testValue";
- // Test
+ // Test when the environment variable is set
String result = AwsDepositor.getEnvironmentVariable(TEST_VARIABLE, "");
assertEquals(expectedValue, result);
}
@Test
- void testGetEnvironmentVariableNotSet() {
+ void testGetEnvironmentVariableNotSetOrEmpty() {
// Test when the environment variable is not set
- String result = AwsDepositor.getEnvironmentVariable(TEST_VARIABLE_NO_ENV, DEFAULT_VALUE);
- assertEquals(DEFAULT_VALUE, result);
- }
+ String notSetResult = AwsDepositor.getEnvironmentVariable(TEST_VARIABLE_NO_ENV, DEFAULT_VALUE);
+ assertEquals(DEFAULT_VALUE, notSetResult);
- @Test
- void testGetEnvironmentVariableEmpty() {
- // Test
- String result = AwsDepositor.getEnvironmentVariable(TEST_VARIABLE_EMPTY, DEFAULT_VALUE);
- assertEquals(DEFAULT_VALUE, result);
+ // Test when the environment variable is empty
+ String emptyResult = AwsDepositor.getEnvironmentVariable(TEST_VARIABLE_EMPTY, DEFAULT_VALUE);
+ assertEquals(DEFAULT_VALUE, emptyResult);
}
}
\ No newline at end of file
diff --git a/src/test/java/us/dot/its/jpo/ode/aws/depositor/RunTest.java b/src/test/java/us/dot/its/jpo/ode/aws/depositor/RunTest.java
index a687ed3..6bbec89 100644
--- a/src/test/java/us/dot/its/jpo/ode/aws/depositor/RunTest.java
+++ b/src/test/java/us/dot/its/jpo/ode/aws/depositor/RunTest.java
@@ -16,10 +16,10 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import mockit.Verifications;
-
public class RunTest {
@Test
public void testRunNoRecords() throws Exception {
@@ -39,16 +39,10 @@ public void testRunNoRecords() throws Exception {
doReturn(generateAwsReturnVal).when(depositor).generateAWSProfile();
- new Verifications() {
- {
- depositor.getKafkaConsumer(any());
- times = 1;
- depositor.getRunDepositor();
- times = 3;
- }
- };
-
depositor.run();
+
+ verify(depositor, times(1)).getKafkaConsumer(any());
+ verify(depositor, times(4)).getRunDepositor();
}
@Test
@@ -83,17 +77,10 @@ public void testRunRecords() throws Exception {
when(mockConsumer.poll(any())).thenReturn(mockRecords);
- new Verifications() {
- {
- depositor.getKafkaConsumer(any());
- times = 1;
- depositor.getRunDepositor();
- times = 3;
- depositor.depositToFirehose(any(), any());
- times = 1;
- }
- };
-
depositor.run();
+
+ verify(depositor, times(1)).getKafkaConsumer(any());
+ verify(depositor, times(4)).getRunDepositor();
+ verify(depositor, times(1)).depositToFirehose(any(), any());
}
}
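The reordering is the substantive fix here: the old JMockit `Verifications` blocks were declared before `depositor.run()` had executed, whereas Mockito's `verify` calls are plain assertions over interactions already recorded, so they belong after the action:

```java
// Act first, then assert on the recorded interactions.
depositor.run();
verify(depositor, times(1)).getKafkaConsumer(any());
verify(depositor, times(4)).getRunDepositor();
```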
From 1c24623345127a03020ddaefd5ea7724deffa13b Mon Sep 17 00:00:00 2001
From: Marc Wodahl
Date: Tue, 16 Jul 2024 13:35:58 -0600
Subject: [PATCH 14/19] Add argLine, jacoco.version to pom.xml
---
pom.xml | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/pom.xml b/pom.xml
index d419435..1b8415a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,6 +12,10 @@
         <jmockit.version>1.49</jmockit.version>
         <github_organization>usdot-jpo-ode</github_organization>
+        <argLine>
+            -javaagent:${user.home}/.m2/repository/org/jmockit/jmockit/${jmockit.version}/jmockit-${jmockit.version}.jar
+        </argLine>
+        <jacoco.version>0.8.11</jacoco.version>
From 44979a56341b0e6a0dacd81ec49287bd7d309b0f Mon Sep 17 00:00:00 2001
From: Marc Wodahl
Date: Tue, 16 Jul 2024 13:41:42 -0600
Subject: [PATCH 15/19] revert java.version to maven.compiler.source and
maven.compiler.target
---
pom.xml | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 1b8415a..6b52fa1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -8,7 +8,8 @@
     <packaging>jar</packaging>
     <name>JPO AWS Depositor</name>
-        <java.version>21</java.version>
+        <maven.compiler.source>21</maven.compiler.source>
+        <maven.compiler.target>21</maven.compiler.target>
         <jmockit.version>1.49</jmockit.version>
         <github_organization>usdot-jpo-ode</github_organization>
From df4901a5d2bee18ad2102f940258cbdb1deb82db Mon Sep 17 00:00:00 2001
From: dmccoystephenson
Date: Tue, 30 Jul 2024 13:18:04 -0600
Subject: [PATCH 16/19] Revised README
---
README.md | 151 +++++++++++++++++++-----------------------------------
1 file changed, 52 insertions(+), 99 deletions(-)
diff --git a/README.md b/README.md
index d0b9c07..83a0c19 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,29 @@
-# Message Deposit Service
-
+# jpo-s3-deposit (Message Deposit Service)
 This project is intended to serve as a consumer application to subscribe to a Kafka topic of streaming JSON, package the results as a JSON file, and deposit the resulting file into a predetermined Firehose/Kinesis, S3 Bucket, or Google Cloud Storage Bucket (GCS). It runs alongside the ODE and, when deployed using Docker Compose, runs in a Docker container.
-## Quick Run
-The use of AWS credentials is being read from the machine's environmental variables. You may also set them in your bash profile. Note that when using Docker Compose from the main `jpo-ode` repository, these variables are set in the `.env` present in that repo.
+## Table of Contents
+- [Release Notes](#release-notes)
+- [Usage](#usage)
+- [Installation](#installation)
+- [Configuration](#configuration)
+- [Debugging](#debugging)
+- [Testing](#testing)
+
+## Release Notes
+The current version and release history of the jpo-s3-deposit project: [Jpo-s3-deposit Release Notes]()
+
+## Usage
+### Run with Docker
+1. Create a copy of `sample.env` and rename it to `.env`.
+2. Update the variable `DOCKER_HOST_IP` to the local IP address of the system running docker and set an admin user password with the `MONGO_DB_PASS` variable.
+ 1. If connecting to a separately deployed mongo cluster make sure to specify the `MONGO_IP` and `MONGO_PORT`.
+3. Navigate back to the root directory and run the following command: `docker compose -f docker-compose-mongo.yml up -d`
+4. Use either a local kafka install or [kcat](https://github.com/edenhill/kcat) to produce a sample message to one of the sink topics. Optionally, you can run the [ODE](https://github.com/usdot-jpo-ode/jpo-ode) separately and process messages directly from its output.
+5. Using [MongoDB Compass](https://www.mongodb.com/products/compass) or another DB visualizer, connect to the MongoDB database using this connection string: `mongodb://[admin_user]:[admin_password]@localhost:27017/`
+6. Now we are done! If everything is working properly, you should see an ODE database with a collection for each kafka sink topic that contains messages.
+
+### Run manually
+AWS credentials are read from the machine's environment variables. You may also set them in your bash profile. Note that when using Docker Compose from the main `jpo-ode` repository, these variables are set in the `.env` file present in that repo.
If depositing to GCS, credentials are read from a JSON service account key file. A sample service account file can be found at ./resources/google/sample_gcp_service_account.json.
Please note that if depositing to GCS the service account will need the storage.buckets.get and storage.objects.create permissions.
@@ -26,119 +46,53 @@ mvn clean compile assembly:single install
To run the jar, be sure to include the topic at the end and group id at the end. If this is not a distributed system, the group can be any string.
-```
-java -jar target/jpo-aws-depositor-0.0.1-SNAPSHOT-jar-with-dependencies.jar
-
-usage: Consumer Example
- -s,--bootstrap-server Endpoint ('ip:port')
- -d,--destination Destination (Optional, defaults to Kinesis/Firehose, put "s3" to override)
- -g,--group Consumer Group
- -k,--key_name Key Name
- -b,--bucket-name Bucket Name
- -t,--topic Topic Name
- -type,--type string|byte message type
- -i, --k-aws-key AWS key name (Optional, defaults to AccessKeyId)
- -a, --k-aws-secret-key AWS secret access key name (Optional, defaults to SecretAccessKey)
- -n, --k-aws-session-token AWS session token name (Optional, defaults to SessionToken)
- -e, --k-aws-expiration AWS expiration name (Optional, defaults Expiration)
- -u, --token-endpoint API token endpoint
- -h, --header-accept Header Accept (Optional, defaults to application/json)
- -x, --header-x-api-key Header X API key
-```
-Example Usage As Of: 3/2/18
-
-```
-java -jar target/jpo-aws-depositor-0.0.1-SNAPSHOT-jar-with-dependencies.jar --bootstrap-server 192.168.1.1:9092 -g group1 -t topic.OdeTimJson -b test-bucket-name -k "bsm/ingest/bsm-"
-```
-
-It should return the following confirmation
-
-```
-DEBUG - Bucket name: test-usdot-its-cvpilot-wydot-bsm
-DEBUG - Key name: bsm/ingest/wydot-bsm-
-DEBUG - Kafka topic: topic.OdeBsmJson
-DEBUG - Type: string
-DEBUG - Destination: null
-
-Subscribed to topic OdeTimJson
-```
Triggering an upload into the ODE, the output should be seen decoded into JSON in the console.
![CLI-output](images/cli-output.png)
-## Additional Resources
-
-With the Kafka installed locally on a machine, here are a few additional commands that may be helpful while debugging Kafka topics.
-
-[Kafka Install Instructions](https://www.cloudera.com/documentation/kafka/latest/topics/kafka_installing.html#concept_ngx_4l4_4r)
+## Installation
+### Run Script
+The run.sh script can be utilized to run the project. This script will export the necessary environment variables, compile the project, and run the jar file.
-The IP used is the location of the Kafka endpoints.
-
-#### Create, alter, list, and describe topics.
+It should be noted that this script must be run from the project root folder, or it will not work.
-```
-kafka-topics --zookeeper 192.168.1.151:2181 --list
-sink1
-t1
-t2
-```
+### Launch Configurations
+A launch.json file with some launch configurations has been included to allow developers to debug the project in VSCode.
-#### Read data from a Kafka topic and write it to standard output.
+The values between braces < > are stand-ins and need to be replaced by the developer.
-```
-kafka-console-consumer --zookeeper 192.168.1.151:2181 --topic topic.J2735Bsm
-```
+To run the project through the launch configuration and start debugging, the developer can navigate to the Run panel (View->Run or Ctrl+Shift+D), select the configuration at the top, and click the green arrow or press F5 to begin.
-#### Push data from standard output and write it into a Kafka topic.
+### Docker Compose Files
+The docker-compose.yml file can be used to spin up the depositor as a container, along with instances of kafka and zookeeper.
-```
-kafka-console-producer --broker-list 192.168.1.151:9092 --topic topic.J2735Bsm
-```
+The docker-compose-confluent-cloud.yml file can be used to spin up the depositor as a container by itself. This depends on an instance of kafka hosted by Confluent Cloud.
-# Confluent Cloud Integration
+## Configuration
+### Confluent Cloud Integration
Rather than using a local kafka instance, this project can utilize an instance of kafka hosted by Confluent Cloud via SASL.
-## Environment variables
-### Purpose & Usage
+#### Environment variables
+##### Purpose & Usage
- The DOCKER_HOST_IP environment variable is used to communicate with the bootstrap server that the instance of Kafka is running on.
- The KAFKA_TYPE environment variable specifies what type of kafka connection will be attempted and is used to check if Confluent should be utilized.
- The CONFLUENT_KEY and CONFLUENT_SECRET environment variables are used to authenticate with the bootstrap server.
-### Values
+##### Values
- DOCKER_HOST_IP must be set to the bootstrap server address (excluding the port)
- KAFKA_TYPE must be set to "CONFLUENT"
- CONFLUENT_KEY must be set to the API key being utilized for CC
- CONFLUENT_SECRET must be set to the API secret being utilized for CC
-## CC Docker Compose File
+#### CC Docker Compose File
There is a provided docker-compose file (docker-compose-confluent-cloud.yml) that passes the above environment variables into the container that gets created. Further, this file doesn't spin up a local kafka instance since it is not required.
-## Release Notes
-The current version and release history of the Jpo-s3-deposit: [Jpo-s3-deposit Release Notes]()
-## Note
+##### Note
This has only been tested with Confluent Cloud but technically all SASL authenticated Kafka brokers can be reached using this method.
-# Run Script
-The run.sh script can be utilized to run the PPM manually.
-
-It should be noted that this script must be run from the project root folder, or it will not work.
-
-# Docker Compose Files
-The docker-compose.yml file can be used to spin up the PPM as a container, along with instances of kafka and zookeeper.
-
-The docker-compose-confluent-cloud.yml file can be used to spin up the PPM as a container by itself. This depends on an instance of kafka hosted by Confluent Cloud.
-
-# Launch Configurations
-A launch.json file with some launch configurations have been included to allow developers to debug the project in VSCode.
-
-The values between braces < > are stand-in and need to be replaced by the developer.
-
-To run the project through the launch configuration and start debugging, the developer can navigate to the Run panel (View->Run or Ctrl+Shift+D), select the configuration at the top, and click the green arrow or press F5 to begin.
-
-# MongoDB Deposit Service
+### MongoDB Deposit Service
The mongo-connector service connects to specified Kafka topics (as defined in the mongo-connector/connect_start.sh script) and deposits these messages to separate collections in the MongoDB Database. The codebase that provides this functionality comes from Confluent using their community licensed [cp-kafka-connect image](https://hub.docker.com/r/confluentinc/cp-kafka-connect). Documentation for this image can be found [here](https://docs.confluent.io/platform/current/connect/index.html#what-is-kafka-connect).
-## Configuration
Provided in the mongo-connector directory is a sample configuration shell script ([connect_start.sh](./mongo-connector/connect_start.sh)) that can be used to create kafka connectors to MongoDB. The connectors in kafka connect are defined in the format that follows:
``` shell
declare -A config_name=([name]="topic_name" [collection]="mongo_collection_name"
@@ -152,18 +106,17 @@ createSink config_name
```
This needs to be put after the createSink function definition.
-## Quick Run
-1. Create a copy of `sample.env` and rename it to `.env`.
-2. Update the variable `DOCKER_HOST_IP` to the local IP address of the system running docker and set an admin user password with the `MONGO_DB_PASS` variable.
- 1. If connecting to a separately deployed mongo cluster make sure to specify the `MONGO_IP` and `MONGO_PORT`.
-3. Navigate back to the root directory and run the following command: `docker compose -f docker-compose-mongo.yml up -d`
-4. Using either a local kafka install or [kcat](https://github.com/edenhill/kcat) to produce a sample message to one of the sink topics. Optionally, you can separately run the [ODE](https://github.com/usdot-jpo-ode/jpo-ode) and process messages directly from it's output.
-5. Using [MongoDB Compass](https://www.mongodb.com/products/compass) or another DB visualizer connect to the MongoDB database using this connection string: `mongodb://[admin_user]:[admin_password]@localhost:27017/`
-6. Now we are done! If everything is working properly you should see an ODE database with a collection for each kafka sink topic that contains messages.
-
## Debugging
If the Kafka connect image crashes with the following error:
``` bash
bash: /scripts/connect_wait.sh: /bin/bash^M: bad interpreter: No such file or directory
```
-Please verify that the line endings in the ([connect_start.sh](./mongo-connector/connect_start.sh)) and ([connect_wait.sh](./mongo-connector/connect_wait.sh)) are set to LF instead of CRLF.
\ No newline at end of file
+Please verify that the line endings in [connect_start.sh](./mongo-connector/connect_start.sh) and [connect_wait.sh](./mongo-connector/connect_wait.sh) are set to LF instead of CRLF.
+
+## Testing
+### Unit Tests
+To run the unit tests, reopen the project in the provided dev container and run the following command:
+``` bash
+mvn test
+```
+This will run the unit tests and provide a report of the results.
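As a companion to the Confluent Cloud section above, here is a hedged sketch of how the documented environment variables (`DOCKER_HOST_IP`, `KAFKA_TYPE`, `CONFLUENT_KEY`, `CONFLUENT_SECRET`) commonly map onto Kafka client SASL settings. The property keys are standard `kafka-clients` configuration; the port suffix and the wiring itself are assumptions, not the depositor's exact code.

```java
import java.util.Properties;

public class ConfluentConfigSketch {

    // Sketch only: standard SASL_SSL client settings driven by the env vars
    // documented in the README. Port 9092 is an assumption; the README notes
    // DOCKER_HOST_IP excludes the port.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", System.getenv("DOCKER_HOST_IP") + ":9092");
        if ("CONFLUENT".equals(System.getenv("KAFKA_TYPE"))) {
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + System.getenv("CONFLUENT_KEY") + "\""
                + " password=\"" + System.getenv("CONFLUENT_SECRET") + "\";");
        }
        return props;
    }
}
```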
From afbd97a796d9dc2efe1f968b0047c019bd0422f5 Mon Sep 17 00:00:00 2001
From: dmccoystephenson
Date: Fri, 6 Sep 2024 08:54:40 -0600
Subject: [PATCH 17/19] Changed version to 1.6.0-SNAPSHOT
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 6b52fa1..5e8e8e7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
   <groupId>usdot.jpo.ode</groupId>
   <artifactId>jpo-aws-depositor</artifactId>
-  <version>1.5.0-SNAPSHOT</version>
+  <version>1.6.0-SNAPSHOT</version>
   <packaging>jar</packaging>
   <name>JPO AWS Depositor</name>
From 33535410f63c914a23ac52ebe701eee196f46173 Mon Sep 17 00:00:00 2001
From: dmccoystephenson
Date: Fri, 6 Sep 2024 08:57:41 -0600
Subject: [PATCH 18/19] Added release notes for version 1.6.0
---
docs/Release_notes.md | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/docs/Release_notes.md b/docs/Release_notes.md
index b65d2bc..0de6fa7 100644
--- a/docs/Release_notes.md
+++ b/docs/Release_notes.md
@@ -1,6 +1,18 @@
Jpo-s3-deposit Release Notes
----------------------------
+Version 1.6.0, released September 2024
+----------------------------------------
+### **Summary**
+The changes for the jpo-s3-deposit 1.6.0 release include a GitHub action to publish a Java artifact to the GitHub repository whenever a release is created, a change to the default value of the enable.auto.commit property to 'true', unit tests, and revised documentation for accuracy & clarity.
+
+Enhancements in this release:
+- CDOT PR 23: Added GitHub action to publish a Java artifact to the GitHub repository whenever a release is created
+- CDOT PR 25: Changed default value for enable.auto.commit property to 'true'
+- CDOT PR 26: Added unit tests
+- CDOT PR 27: Revised documentation for accuracy & clarity
+
+
Version 1.5.0, released June 2024
----------------------------------------
### **Summary**
From 7abc88f2ec52f59dc68f189c54c9c855bed941ca Mon Sep 17 00:00:00 2001
From: dmccoystephenson
Date: Thu, 19 Sep 2024 15:51:58 -0600
Subject: [PATCH 19/19] Removed unnecessary declaration from
`getEnvironmentVariable()` method in `AwsDepositor` class
---
.../java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java | 2 --
1 file changed, 2 deletions(-)
diff --git a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
index 0e98b02..722f338 100644
--- a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
+++ b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
@@ -405,8 +405,6 @@ JSONObject generateAWSProfile() throws IOException {
}
static String getEnvironmentVariable(String variableName, String defaultValue) {
- // get all environment variables
-        Map<String, String> env = System.getenv();
String value = System.getenv(variableName);
if (value == null || value.equals("")) {
System.out.println("Something went wrong retrieving the environment variable " + variableName);
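With the unused map gone, the helper reduces to a single lookup with a default. A sketch of the resulting shape — the null/empty check and log line come from the diff context above; the fallback return is inferred from the surrounding tests and should be checked against the actual source:

```java
// Shape of getEnvironmentVariable after this patch; the return of
// defaultValue is inferred, not visible in the hunk above.
static String getEnvironmentVariable(String variableName, String defaultValue) {
    String value = System.getenv(variableName);
    if (value == null || value.equals("")) {
        System.out.println("Something went wrong retrieving the environment variable "
                + variableName);
        return defaultValue;
    }
    return value;
}
```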