Add a source service to stream onemac data to kafka #1292

Merged (6 commits) on Jul 31, 2023
6 changes: 6 additions & 0 deletions deploy.sh
Collaborator: Unit tests? Cypress automated tests?

Collaborator (Author): No automated tests were written as part of this changeset.
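For illustration only, a minimal Jest-style sketch of what a unit test for the record filter added later in this diff (services/source/handlers/source.js) could look like. The export of isValid and the fixture values are hypothetical; this changeset keeps isValid module-private and adds no test files.

// hypothetical test file: services/source/handlers/source.test.js
// assumes isValid were exported from source.js, which this changeset does not do
const { isValid } = require("./source");

describe("isValid", () => {
  it("flags SEATool records as junk", () => {
    expect(isValid({ pk: "example-pk", sk: "SEATool-record" })).toBe(false);
  });
  it("accepts other records", () => {
    expect(isValid({ pk: "example-pk", sk: "Package" })).toBe(true);
  });
});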

@@ -36,6 +36,12 @@ services=(
'admin'
)

# Only deploy source service for higher envs
if [[ "$stage" == "develop" || "$stage" == "master" || "$stage" == "production" ]]; then
services+=('source')
fi


set -e
for i in "${services[@]}"; do
deploy $i
7 changes: 7 additions & 0 deletions services/source/.eslintrc.yml
@@ -0,0 +1,7 @@
rules:
no-console: off
# TODO: remove this after fixing the names
"@typescript-eslint/camelcase": off
"@typescript-eslint/no-var-requires": off
# TODO turn this rule back on once we figure out types for this module
"@typescript-eslint/explicit-module-boundary-types": off
8 changes: 8 additions & 0 deletions services/source/.gitignore
@@ -0,0 +1,8 @@
# package directories
node_modules
jspm_packages

# Serverless directories
.serverless

*-dump.json
26 changes: 26 additions & 0 deletions services/source/handlers/cleanupKafka.js
Collaborator: Do we want to add an actual timeout value for the deleteTopics or are we good with the default API timeout?

Collaborator (Author): I think it's set to 300s for the Lambda and 295s for the kafkajs call. Are you seeing differently? @Dark-Knight-1313

Collaborator: Just a note: we are taking the second table stream, and DynamoDB only allows two stream subscriptions per table, so if anything else ever needs to consume the table stream we will have to restructure using a fan-out pattern.

Collaborator (Author): That's a great point; we'll keep it in mind. If you're comfortable moving ahead, I am.
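For reference, a rough sketch of the fan-out shape mentioned above: one Lambda becomes the sole direct subscriber to the table stream and republishes each record to an SNS topic, and any future consumers subscribe to that topic instead of to the stream. The topic ARN, environment variable, and wiring below are hypothetical and not part of this changeset.

// hypothetical fan-out relay: the single direct consumer of the table stream
const AWS = require("aws-sdk");

const sns = new AWS.SNS();

exports.handler = async function (event) {
  for (const record of event.Records) {
    // republish the raw stream record so any number of consumers can subscribe via SNS
    await sns
      .publish({
        TopicArn: process.env.fanoutTopicArn, // hypothetical environment variable
        Message: JSON.stringify(record),
      })
      .promise();
  }
};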

@@ -0,0 +1,26 @@
import { send, SUCCESS, FAILED } from "cfn-response-async";
import * as topics from "./../libs/topics-lib.js";

exports.handler = async function (event, context) {
console.log("Request:", JSON.stringify(event, undefined, 2));
const responseData = {};
let responseStatus = SUCCESS;
try {
const BrokerString = event.ResourceProperties.BrokerString;
const TopicPatternsToDelete =
event.ResourceProperties.TopicPatternsToDelete;
if (event.RequestType === "Create" || event.RequestType == "Update") {
console.log("This resource does nothing on Create and Update events.");
} else if (event.RequestType === "Delete") {
console.log(
`Attempting a delete for each of the following patterns: ${TopicPatternsToDelete}`
);
await topics.deleteTopics(BrokerString, TopicPatternsToDelete);
}
} catch (error) {
console.error(error);
responseStatus = FAILED;
} finally {
await send(event, context, responseStatus, responseData, "static");
}
};
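For context, a sketch of the ResourceProperties this custom resource handler reads. The broker string and pattern values below are made up; the real ones come from the CloudFormation custom resource that invokes the handler, which is outside this excerpt.

// illustrative values only; the actual properties are supplied by the custom resource definition
const exampleResourceProperties = {
  BrokerString: "b-1.example.amazonaws.com:9094,b-2.example.amazonaws.com:9094",
  TopicPatternsToDelete: ["myBranch--example--source--.*"],
};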
42 changes: 42 additions & 0 deletions services/source/handlers/createTopics.js
@@ -0,0 +1,42 @@
import { send, SUCCESS, FAILED } from "cfn-response-async";
import * as topics from "./../libs/topics-lib.js";

exports.handler = async function (event, context) {
console.log("Request:", JSON.stringify(event, undefined, 2));
const responseData = {};
let responseStatus = SUCCESS;
try {
const TopicsToCreate = event.ResourceProperties.TopicsToCreate;
const BrokerString = event.ResourceProperties.BrokerString;
const topicConfig = TopicsToCreate.map(function (element) {
const topic = element.name;
const replicationFactor = element.replicationFactor || 3;
const numPartitions = element.numPartitions || 1;
if (!topic) {
throw "Invalid configuration for TopicsToCreate. All entries must have a 'name' key with a string value.";
}
if (replicationFactor < 3) {
throw "Invalid configuration for TopicsToCreate. If specified, replicationFactor must be greater than or equal to 3.";
}
if (numPartitions < 1) {
throw "Invalid configuration for TopicsToCreate. If specified, numPartitions must be greater than or equal to 1.";
}
return {
topic,
numPartitions,
replicationFactor,
};
});
console.log(JSON.stringify(topicConfig, null, 2));
if (event.RequestType === "Create" || event.RequestType == "Update") {
await topics.createTopics(BrokerString, topicConfig);
} else if (event.RequestType === "Delete") {
console.log("This resource does nothing on Delete events.");
}
} catch (error) {
console.error(error);
responseStatus = FAILED;
} finally {
await send(event, context, responseStatus, responseData, "static");
}
};
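For context, the shape the handler above expects for the TopicsToCreate resource property: each entry must have a name, and replicationFactor (>= 3) and numPartitions (>= 1) are optional. The names and counts below are illustrative only; the actual values come from the service's deployment configuration, which is not part of this excerpt.

// illustrative only; topic names and counts are hypothetical
const exampleTopicsToCreate = [
  { name: "example-topic" }, // defaults apply: 1 partition, replication factor 3
  { name: "example-topic-two", numPartitions: 3, replicationFactor: 3 },
];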
77 changes: 77 additions & 0 deletions services/source/handlers/source.js
@@ -0,0 +1,77 @@
import AWS from "aws-sdk";
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
clientId: `${process.env.topic}-producer`,
brokers: process.env.brokerString.split(","),
retry: {
initialRetryTime: 300,
retries: 8,
},
ssl: {
rejectUnauthorized: false,
},
});
const producer = kafka.producer();
const signalTraps = ["SIGTERM", "SIGINT", "SIGUSR2", "beforeExit"];
signalTraps.map((type) => {
process.removeListener(type, producer.disconnect);
});
signalTraps.map((type) => {
process.once(type, producer.disconnect);
});
let connected = false;

function unmarshall(r) {
return AWS.DynamoDB.Converter.unmarshall(r, {
convertEmptyValues: true,
wrapNumbers: true,
});
}

function isValid(r) {
if (r.sk && r.sk.includes("SEATool")) {
console.log("Junk record detected.");
return false;
} else {
console.log("Valid record detected.");
return true;
}
}

exports.handler = async function (event) {
const messages = [];
for (const record of event.Records) {
if (record.eventName != "REMOVE") {
const r = unmarshall(record.dynamodb.NewImage);
if (isValid(r)) {
messages.push({
key: r.pk,
value: JSON.stringify(r),
partition: 0,
headers: { source: "onemac" },
});
}
} else {
const r = unmarshall(record.dynamodb.OldImage);
if (isValid(r)) {
messages.push({
key: r.pk,
value: null,
partition: 0,
headers: { source: "onemac" },
});
}
}
}
if (messages.length > 0) {
console.log(`Sending ${messages.length} messages to Kafka`);
if (!connected) {
await producer.connect();
connected = true;
}
await producer.send({
topic: process.env.topic,
messages,
});
}
};
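A sketch of how this handler could be exercised locally with a hand-built stream event. The record values are made up, and process.env.topic and process.env.brokerString must be set before the module loads, because the Kafka producer is created at import time and needs a reachable broker to actually send.

// hypothetical local invocation; record values are made up
// requires process.env.topic and process.env.brokerString to be set before require()
const { handler } = require("./source");

const sampleEvent = {
  Records: [
    {
      eventName: "INSERT",
      dynamodb: {
        NewImage: {
          pk: { S: "example-pk" },
          sk: { S: "Package" },
        },
      },
    },
  ],
};

handler(sampleEvent).catch(console.error);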
133 changes: 133 additions & 0 deletions services/source/libs/topics-lib.js
@@ -0,0 +1,133 @@
const _ = require("lodash");
import { Kafka, ResourceTypes } from "kafkajs";

export async function createTopics(brokerString, topicsConfig) {
const topics = topicsConfig;
const brokers = brokerString.split(",");

const kafka = new Kafka({
clientId: "admin",
brokers: brokers,
ssl: true,
});
const admin = kafka.admin();

const create = async () => {
await admin.connect();

//fetch topics from MSK and filter out internal management topics (names starting with "_")
const existingTopicList = _.filter(await admin.listTopics(), function (n) {
return !n.startsWith("_");
});

console.log("Existing topics:", JSON.stringify(existingTopicList, null, 2));

//fetch the metadata for the topics in MSK
const topicsMetadata = _.get(
await admin.fetchTopicMetadata({ topics: existingTopicList }),
"topics",
{}
);
console.log("Topics Metadata:", JSON.stringify(topicsMetadata, null, 2));

//diff the existing topics array with the topic configuration collection
const topicsToCreate = _.differenceWith(
topics,
existingTopicList,
(topicConfig, topic) => _.get(topicConfig, "topic") == topic
);

//find intersection of topics metadata collection with topic configuration collection
//where partition count of topic in Kafka is less than what is specified in the topic configuration collection
//...can't remove partitions, only add them
const topicsToUpdate = _.intersectionWith(
topics,
topicsMetadata,
(topicConfig, topicMetadata) =>
_.get(topicConfig, "topic") == _.get(topicMetadata, "name") &&
_.get(topicConfig, "numPartitions") >
_.get(topicMetadata, "partitions", []).length
);

//create a collection to update topic partitioning
const paritionConfig = _.map(topicsToUpdate, function (topic) {
return {
topic: _.get(topic, "topic"),
count: _.get(topic, "numPartitions"),
};
});

//create a collection to allow querying of topic configuration
const configOptions = _.map(topicsMetadata, function (topic) {
return {
name: _.get(topic, "name"),
type: _.get(ResourceTypes, "TOPIC"),
};
});

//query topic configuration
const configs =
configOptions.length != 0
? await admin.describeConfigs({ resources: configOptions })
: [];

console.log("Topics to Create:", JSON.stringify(topicsToCreate, null, 2));
console.log("Topics to Update:", JSON.stringify(topicsToUpdate, null, 2));
console.log(
"Partitions to Update:",
JSON.stringify(paritionConfig, null, 2)
);
console.log(
"Topic configuration options:",
JSON.stringify(configs, null, 2)
);

//create topics that don't exist in MSK
await admin.createTopics({ topics: topicsToCreate });

//if any topics have fewer partitions in MSK than in the configuration, add those partitions
paritionConfig.length > 0 &&
(await admin.createPartitions({ topicPartitions: paritionConfig }));

await admin.disconnect();
};

await create();
}

export async function deleteTopics(brokerString, topicList) {
// Check that each topic in the list is something we can delete
for (const topic of topicList) {
if (!topic.match(/.*--.*--.*--.*/g)) {
throw "ERROR: The deleteTopics function only operates against topics that match /.*--.*--.*--.*/g";
}
}

const brokers = brokerString.split(",");

const kafka = new Kafka({
clientId: "admin",
brokers: brokers,
ssl: true,
requestTimeout: 295000, // 5s short of the lambda function's timeout
});
const admin = kafka.admin();

await admin.connect();

const currentTopics = await admin.listTopics();

const topicsToDelete = _.filter(currentTopics, function (currentTopic) {
return topicList.some((pattern) => {
return !!currentTopic.match(pattern);
});
});

console.log(`Deleting topics: ${topicsToDelete}`);
await admin.deleteTopics({
topics: topicsToDelete,
timeout: 295000,
});

await admin.disconnect();
}
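A short usage sketch of the two exports above, with a made-up broker string and topic names chosen to satisfy the four-segment guard on deleteTopics.

// illustrative only; broker string and topic names are hypothetical
import * as topics from "./topics-lib.js";

async function example() {
  const brokerString = "b-1.example.amazonaws.com:9094,b-2.example.amazonaws.com:9094";

  // create (or add partitions to) topics described by a configuration collection
  await topics.createTopics(brokerString, [
    { topic: "myBranch--example--source--topic", numPartitions: 1, replicationFactor: 3 },
  ]);

  // delete topics whose names match the guarded pattern
  await topics.deleteTopics(brokerString, ["myBranch--example--source--.*"]);
}

example().catch(console.error);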
29 changes: 29 additions & 0 deletions services/source/migrate.sh
@@ -0,0 +1,29 @@
# exit on error
set -eo pipefail

# tables
TABLE_FROM=$1
TABLE_TO=$2

# read
aws dynamodb scan \
--table-name "$TABLE_FROM" \
--output json \
| jq "[ .Items[] | { PutRequest: { Item: . } } ]" \
> "${TABLE_FROM}-dump.json"

table_size="$(cat "${TABLE_FROM}-dump.json" | jq '. | length')"
echo "table size: ${table_size}"

# write in batches of 25
for i in $(seq 0 25 $table_size); do
j=$(( i + 25 ))
cat "${TABLE_FROM}-dump.json" | jq -c '{ "'$TABLE_TO'": .['$i':'$j'] }' > "${TABLE_TO}-batch-payload.json"
echo "Loading records $i through $j (up to $table_size) into ${TABLE_TO}"
aws dynamodb batch-write-item --request-items file://"${TABLE_TO}-batch-payload.json"
rm "${TABLE_TO}-batch-payload.json"
done


# clean up the scan dump created above
rm "${TABLE_FROM}-dump.json"