OpenSearch Domain capacity now configurable (#58)
* OpenSearch Domain capacity now configurable

Signed-off-by: Chris Helma <[email protected]>

* Made capacity options individually changeable

Signed-off-by: Chris Helma <[email protected]>

---------

Signed-off-by: Chris Helma <[email protected]>
chelma authored Jun 2, 2023
1 parent cdc9da0 commit 77a8e5d
Showing 18 changed files with 556 additions and 157 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -106,10 +106,10 @@ You can see your created cluster and the VPCs it is currently monitoring using t
./manage_arkime.py create-cluster --name MyCluster
```

By default, you will be given the minimum-size Capture Node fleet. If you have a specific amount of traffic you expect to need to capture, you can specify it (in Gbps) using the `--expected-traffic` argument. The CLI will provision an EC2 Autoscaling Group that should be able to handle that amount of capture plus a little extra.
By default, you will be given the minimum-size Capture Cluster. To provision a Cluster sized for your expected usage, use the optional command-line parameters below; they ensure the EC2 Capture Nodes and OpenSearch Domain are suitably provisioned (plus a little extra for safety):

```
./manage_arkime.py create-cluster --name MyCluster --expected-traffic 10
./manage_arkime.py create-cluster --name MyCluster --expected-traffic 1 --spi-days 30 --replicas 1
```

### Setting up capture for a VPC
21 changes: 11 additions & 10 deletions cdk-lib/capture-stacks/capture-nodes-stack.ts
@@ -18,6 +18,7 @@ import { Construct } from 'constructs';
import * as constants from '../core/constants';
import * as plan from '../core/capacity-plan';
import {ClusterSsmValue} from '../core/ssm-wrangling';
import * as user from '../core/user-config';

export interface CaptureNodesStackProps extends cdk.StackProps {
readonly captureBucket: s3.Bucket;
@@ -26,9 +27,9 @@ export interface CaptureNodesStackProps extends cdk.StackProps {
readonly clusterName: string;
readonly osDomain: opensearch.Domain;
readonly osPassword: secretsmanager.Secret;
readonly planCaptureNodes: plan.CaptureNodesPlan;
readonly planEcsResources: plan.EcsSysResourcePlan;
readonly planCluster: plan.ClusterPlan;
readonly ssmParamNameCluster: string;
readonly userConfig: user.UserConfig;
}

export class CaptureNodesStack extends cdk.Stack {
@@ -81,11 +82,11 @@ export class CaptureNodesStack extends cdk.Stack {
// Load Balancers do not properly integrate with ECS Fargate.
const autoScalingGroup = new autoscaling.AutoScalingGroup(this, 'ASG', {
vpc: props.captureVpc,
instanceType: new ec2.InstanceType(props.planCaptureNodes.instanceType),
instanceType: new ec2.InstanceType(props.planCluster.captureNodes.instanceType),
machineImage: ecs.EcsOptimizedImage.amazonLinux2(),
desiredCapacity: props.planCaptureNodes.desiredCount,
minCapacity: props.planCaptureNodes.minCount,
maxCapacity: props.planCaptureNodes.maxCount
desiredCapacity: props.planCluster.captureNodes.desiredCount,
minCapacity: props.planCluster.captureNodes.minCount,
maxCapacity: props.planCluster.captureNodes.maxCount
});

const asgSecurityGroup = new ec2.SecurityGroup(this, 'ASGSecurityGroup', {
@@ -162,8 +163,8 @@ export class CaptureNodesStack extends cdk.Stack {
'OPENSEARCH_ENDPOINT': props.osDomain.domainEndpoint,
'OPENSEARCH_SECRET_ARN': props.osPassword.secretArn,
},
cpu: props.planEcsResources.cpu,
memoryLimitMiB: props.planEcsResources.memory,
cpu: props.planCluster.ecsResources.cpu,
memoryLimitMiB: props.planCluster.ecsResources.memory,
portMappings: [
{ containerPort: 6081, hostPort: 6081, protocol: ecs.Protocol.UDP},
{ containerPort: healthCheckPort, hostPort: healthCheckPort, protocol: ecs.Protocol.TCP},
@@ -242,8 +243,8 @@ export class CaptureNodesStack extends cdk.Stack {
busName: clusterBus.eventBusName,
clusterName: props.clusterName,
vpceServiceId: gwlbEndpointService.ref,
captureNodesPlan: props.planCaptureNodes,
ecsSysResourcePlan: props.planEcsResources
capacityPlan: props.planCluster,
userConfig: props.userConfig
}
const clusterParam = new ssm.StringParameter(this, 'ClusterParam', {
allowedPattern: '.*',
10 changes: 8 additions & 2 deletions cdk-lib/capture-stacks/capture-vpc-stack.ts
@@ -4,15 +4,21 @@ import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as logs from 'aws-cdk-lib/aws-logs';
import { Stack, StackProps } from 'aws-cdk-lib';

import * as plan from '../core/capacity-plan';

export interface CaptureVpcStackProps extends StackProps {
readonly planCluster: plan.ClusterPlan;
}

export class CaptureVpcStack extends Stack {
public readonly vpc: ec2.Vpc;
public readonly flowLog: ec2.FlowLog;

constructor(scope: Construct, id: string, props: StackProps) {
constructor(scope: Construct, id: string, props: CaptureVpcStackProps) {
super(scope, id, props);

this.vpc = new ec2.Vpc(this, 'VPC', {
maxAzs: 2,
maxAzs: props.planCluster.captureVpc.numAzs,
subnetConfiguration: [
{
subnetType: ec2.SubnetType.PUBLIC,
13 changes: 9 additions & 4 deletions cdk-lib/capture-stacks/opensearch-domain-stack.ts
@@ -7,9 +7,12 @@ import {Domain, EngineVersion, TLSSecurityPolicy} from 'aws-cdk-lib/aws-opensear
import * as secretsmanager from 'aws-cdk-lib/aws-secretsmanager';
import * as ssm from 'aws-cdk-lib/aws-ssm';

import * as plan from '../core/capacity-plan';


export interface OpenSearchDomainStackProps extends StackProps {
readonly captureVpc: ec2.Vpc;
readonly planCluster: plan.ClusterPlan;
readonly ssmParamName: string;
}

@@ -64,19 +67,21 @@ export class OpenSearchDomainStack extends Stack {
version: EngineVersion.openSearch("2.5"),
enableVersionUpgrade: true,
capacity: {
masterNodes: 3,
dataNodes: 2,
masterNodes: props.planCluster.osDomain.masterNodes.count,
masterNodeInstanceType: props.planCluster.osDomain.masterNodes.instanceType,
dataNodes: props.planCluster.osDomain.dataNodes.count,
dataNodeInstanceType: props.planCluster.osDomain.dataNodes.instanceType
},
ebs: {
volumeSize: 20,
volumeSize: props.planCluster.osDomain.dataNodes.volumeSize,
},
nodeToNodeEncryption: true,
encryptionAtRest: {
enabled: true,
kmsKey: this.domainKey,
},
zoneAwareness: {
availabilityZoneCount: 2,
availabilityZoneCount: props.planCluster.captureVpc.numAzs,
},
logging: {
slowSearchLogEnabled: true,
10 changes: 6 additions & 4 deletions cdk-lib/cloud-demo.ts
@@ -29,12 +29,14 @@ switch(params.type) {
});

const captureVpcStack = new CaptureVpcStack(app, params.nameCaptureVpcStack, {
env: env
env: env,
planCluster: params.planCluster,
});

const osDomainStack = new OpenSearchDomainStack(app, params.nameOSDomainStack, {
env: env,
captureVpc: captureVpcStack.vpc,
planCluster: params.planCluster,
ssmParamName: params.nameOSDomainSsmParam,
});
osDomainStack.addDependency(captureVpcStack)
@@ -47,9 +49,9 @@ switch(params.type) {
clusterName: params.nameCluster,
osDomain: osDomainStack.domain,
osPassword: osDomainStack.osPassword,
planCaptureNodes: params.planCaptureNodes,
planEcsResources: params.planEcsResources,
ssmParamNameCluster: params.nameClusterSsmParam
planCluster: params.planCluster,
ssmParamNameCluster: params.nameClusterSsmParam,
userConfig: params.userConfig
});
captureNodesStack.addDependency(captureBucketStack)
captureNodesStack.addDependency(captureVpcStack)
44 changes: 43 additions & 1 deletion cdk-lib/core/capacity-plan.ts
@@ -14,4 +14,46 @@ export interface CaptureNodesPlan {
export interface EcsSysResourcePlan {
cpu: number;
memory: number;
}
}

/**
* Structure to hold the capacity plan for an OS Domain's data nodes
*/
export interface DataNodesPlan {
count: number;
instanceType: string;
volumeSize: number;
}

/**
* Structure to hold the capacity plan for an OS Domain's master nodes
*/
export interface MasterNodesPlan {
count: number;
instanceType: string;
}

/**
* Structure to hold the overall capacity plan for an OS Domain
*/
export interface OSDomainPlan {
dataNodes: DataNodesPlan;
masterNodes: MasterNodesPlan;
}

/**
* Structure to hold the details of the cluster's Capture VPC
*/
export interface CaptureVpcPlan {
numAzs: number;
}

/**
* Structure to hold the overall capacity plan for an Arkime Cluster
*/
export interface ClusterPlan {
captureNodes: CaptureNodesPlan;
captureVpc: CaptureVpcPlan;
ecsResources: EcsSysResourcePlan;
osDomain: OSDomainPlan;
}
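For reference, these interfaces nest into a single `ClusterPlan` object. Below is a minimal sketch of a conforming value; the instance types, counts, and CPU/memory figures are illustrative placeholders (not recommendations), and it assumes `CaptureNodesPlan` exposes only the four fields referenced in `capture-nodes-stack.ts`:

```
import * as plan from './capacity-plan';

// Illustrative only; real plans are computed by the Python capacity planner
// and passed to the CDK app as a JSON string.
const examplePlan: plan.ClusterPlan = {
    captureNodes: { instanceType: 'm5.xlarge', desiredCount: 1, minCount: 1, maxCount: 2 },
    captureVpc: { numAzs: 2 },
    ecsResources: { cpu: 1024, memory: 2048 },
    osDomain: {
        dataNodes: { count: 2, instanceType: 't3.small.search', volumeSize: 100 },
        masterNodes: { count: 3, instanceType: 'm6g.large.search' }
    }
};
```

On the Python side the equivalent structure is serialized with `json.dumps(cluster_plan.to_dict())` (see `cdk_context.py` below) and parsed back with `JSON.parse` in `context-wrangling.ts`.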
9 changes: 5 additions & 4 deletions cdk-lib/core/command-params.ts
@@ -1,4 +1,5 @@
import * as plan from './capacity-plan';
import * as user from './user-config';

/**
* Base type for receiving arguments from the Python side of the app. These directly match the interface on the Python
@@ -25,8 +26,8 @@ export interface ClusterMgmtParamsRaw extends CommandParamsRaw {
nameViewerPassSsmParam: string;
nameViewerUserSsmParam: string;
nameViewerNodesStack: string;
planCaptureNodes: string;
planEcsResources: string;
planCluster: string;
userConfig: string;
}

/**
@@ -89,8 +90,8 @@ export interface ClusterMgmtParams extends CommandParams {
nameViewerPassSsmParam: string;
nameViewerUserSsmParam: string;
nameViewerNodesStack: string;
planCaptureNodes: plan.CaptureNodesPlan;
planEcsResources: plan.EcsSysResourcePlan;
planCluster: plan.ClusterPlan;
userConfig: user.UserConfig;
}

/**
6 changes: 3 additions & 3 deletions cdk-lib/core/context-wrangling.ts
@@ -15,7 +15,7 @@ import {CDK_CONTEXT_CMD_VAR, CDK_CONTEXT_REGION_VAR, CDK_CONTEXT_PARAMS_VAR, Man
export function getCommandParams(app: cdk.App) : (prms.ClusterMgmtParams | prms.DeployDemoTrafficParams | prms.DestroyDemoTrafficParams | prms.MirrorMgmtParams) {
// This ENV variable is set by the CDK CLI. It reads it from your AWS Credential profile, and configures the var
// before invoking CDK actions.
const awsAccount: string | undefined = process.env.CDK_DEFAULT_ACCOUNT
const awsAccount: string | undefined = process.env.CDK_DEFAULT_ACCOUNT

// Like the CDK_DEFAULT_ACCOUNT, the CDK CLI sets the CDK_DEFAULT_REGION by reading the AWS Credential profile.
// However, we want the user to be able to specify a different region than the default so we optionally pass in one
@@ -99,8 +99,8 @@ function validateArgs(args: ValidateArgs) : (prms.ClusterMgmtParams | prms.Deplo
nameViewerPassSsmParam: rawClusterMgmtParamsObj.nameViewerPassSsmParam,
nameViewerUserSsmParam: rawClusterMgmtParamsObj.nameViewerUserSsmParam,
nameViewerNodesStack: rawClusterMgmtParamsObj.nameViewerNodesStack,
planCaptureNodes: JSON.parse(rawClusterMgmtParamsObj.planCaptureNodes),
planEcsResources: JSON.parse(rawClusterMgmtParamsObj.planEcsResources),
planCluster: JSON.parse(rawClusterMgmtParamsObj.planCluster),
userConfig: JSON.parse(rawClusterMgmtParamsObj.userConfig),
}
return clusterMgmtParams;
case ManagementCmd.AddVpc: // Add and Remove VPC use the same parameters
5 changes: 3 additions & 2 deletions cdk-lib/core/ssm-wrangling.ts
@@ -1,4 +1,5 @@
import * as plan from '../core/capacity-plan';
import * as user from '../core/user-config';

/**
* This file contains functions and types that define a shared interface with the Python management CLI; the two need
@@ -11,8 +12,8 @@ export interface ClusterSsmValue {
readonly busName: string;
readonly clusterName: string;
readonly vpceServiceId: string;
readonly captureNodesPlan: plan.CaptureNodesPlan;
readonly ecsSysResourcePlan: plan.EcsSysResourcePlan;
readonly capacityPlan: plan.ClusterPlan;
readonly userConfig: user.UserConfig;
}

export interface SubnetSsmValue {
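Because `ssm-wrangling.ts` defines the contract shared with the Python CLI, consumers of the cluster's SSM parameter now get back the full capacity plan and the original user inputs, not just the capture-node and ECS plans. A small hedged sketch of how a consumer might read those fields (the helper below is illustrative, not part of this commit):

```
import * as wrangle from './ssm-wrangling';

// Illustrative helper: pull a few values back out of a parsed SSM value.
// The CLI could, for example, compare the stored userConfig against newly
// supplied --expected-traffic / --spi-days / --replicas options.
function describeCluster(value: wrangle.ClusterSsmValue): string {
    const dataNodes = value.capacityPlan.osDomain.dataNodes;
    return `${value.clusterName}: ${dataNodes.count}x ${dataNodes.instanceType}, ` +
        `${value.userConfig.spiDays} SPI day(s), ${value.userConfig.replicas} replica(s)`;
}
```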
8 changes: 8 additions & 0 deletions cdk-lib/core/user-config.ts
@@ -0,0 +1,8 @@
/**
* Structure to hold the user's input configuration
*/
export interface UserConfig {
expectedTraffic: number;
spiDays: number;
replicas: number;
}
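The three fields presumably mirror the new CLI options one-for-one. An illustrative value (the numbers are placeholders matching the README example, not defaults):

```
import * as user from './user-config';

// Roughly what `--expected-traffic 1 --spi-days 30 --replicas 1` would become
// after the Python CLI serializes the user's input and the CDK side parses it.
const exampleConfig: user.UserConfig = {
    expectedTraffic: 1, // average expected traffic, in Gbps
    spiDays: 30,        // days of SPI metadata retained in the OpenSearch Domain
    replicas: 1         // replica count for the SPI metadata
};
```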
22 changes: 17 additions & 5 deletions manage_arkime.py
@@ -12,7 +12,7 @@
from commands.list_clusters import cmd_list_clusters
from commands.remove_vpc import cmd_remove_vpc
import constants as constants
from core.capacity_planning import MAX_TRAFFIC, MINIMUM_TRAFFIC
from core.capacity_planning import MAX_TRAFFIC, DEFAULT_SPI_DAYS, DEFAULT_SPI_REPLICAS
from logging_wrangler import LoggingWrangler, set_boto_log_level

logger = logging.getLogger(__name__)
@@ -57,16 +57,28 @@ def destroy_demo_traffic(ctx):
@click.option("--name", help="The name you want your Arkime Cluster and its associated resources to have", required=True)
@click.option(
"--expected-traffic",
help=("The amount of traffic, in gigabits-per-second, you expect your Arkime Cluster to receive."
+ f"Minimum: {MINIMUM_TRAFFIC} Gbps, Maximum: {MAX_TRAFFIC} Gbps"),
help=("The average amount of traffic, in gigabits-per-second, you expect your Arkime Cluster to receive."
+ f"Maximum: {MAX_TRAFFIC} Gbps"),
default=None,
type=click.FLOAT,
required=False)
@click.option(
"--spi-days",
help=(f"The number of days to store SPI metadata in the OpenSearch Domain. Default: {DEFAULT_SPI_DAYS}"),
default=None,
type=click.INT,
required=False)
@click.option(
"--replicas",
help=(f"The number replicas to make of the SPI metadata in the OpenSearch Domain. Default: {DEFAULT_SPI_REPLICAS}"),
default=None,
type=click.INT,
required=False)
@click.pass_context
def create_cluster(ctx, name, expected_traffic):
def create_cluster(ctx, name, expected_traffic, spi_days, replicas):
profile = ctx.obj.get("profile")
region = ctx.obj.get("region")
cmd_create_cluster(profile, region, name, expected_traffic)
cmd_create_cluster(profile, region, name, expected_traffic, spi_days, replicas)
cli.add_command(create_cluster)

@click.command(help="Tears down the Arkime Cluster in your account; by default, leaves your data intact")
25 changes: 16 additions & 9 deletions manage_arkime/cdk_interactions/cdk_context.py
@@ -3,10 +3,12 @@
from typing import Dict

import constants as constants
from core.capacity_planning import CaptureNodesPlan, INSTANCE_TYPE_CAPTURE_NODE, EcsSysResourcePlan
from core.capacity_planning import (CaptureNodesPlan, CaptureVpcPlan, ClusterPlan, DataNodesPlan, EcsSysResourcePlan,
MasterNodesPlan, OSDomainPlan, INSTANCE_TYPE_CAPTURE_NODE, DEFAULT_NUM_AZS)
from core.user_config import UserConfig

def generate_create_cluster_context(name: str, viewer_cert_arn: str, capture_plan: CaptureNodesPlan, ecs_resource_plan: EcsSysResourcePlan) -> Dict[str, str]:
create_context = _generate_cluster_context(name, viewer_cert_arn, capture_plan, ecs_resource_plan)
def generate_create_cluster_context(name: str, viewer_cert_arn: str, cluster_plan: ClusterPlan, user_config: UserConfig) -> Dict[str, str]:
create_context = _generate_cluster_context(name, viewer_cert_arn, cluster_plan, user_config)
create_context[constants.CDK_CONTEXT_CMD_VAR] = constants.CMD_CREATE_CLUSTER
return create_context

@@ -15,14 +17,19 @@ def generate_destroy_cluster_context(name: str) -> Dict[str, str]:
# we're tearing down the Cfn stack in which it would be used; the operation either succeeds (and they are irrelevant)
# or it fails/rolls back (and they are irrelevant).
fake_arn = "N/A"
fake_capture_capacity = CaptureNodesPlan(INSTANCE_TYPE_CAPTURE_NODE, 1, 2, 1)
fake_resource_plan = EcsSysResourcePlan(1, 1)
fake_cluster_plan = ClusterPlan(
CaptureNodesPlan(INSTANCE_TYPE_CAPTURE_NODE, 1, 2, 1),
CaptureVpcPlan(1),
EcsSysResourcePlan(1, 1),
OSDomainPlan(DataNodesPlan(2, "t3.small.search", 100), MasterNodesPlan(3, "m6g.large.search"))
)
fake_user_config = UserConfig(1, 1, 1)

destroy_context = _generate_cluster_context(name, fake_arn, fake_capture_capacity, fake_resource_plan)
destroy_context = _generate_cluster_context(name, fake_arn, fake_cluster_plan, fake_user_config)
destroy_context[constants.CDK_CONTEXT_CMD_VAR] = constants.CMD_DESTROY_CLUSTER
return destroy_context

def _generate_cluster_context(name: str, viewer_cert_arn: str, capture_plan: CaptureNodesPlan, ecs_resources_plan: EcsSysResourcePlan) -> Dict[str, str]:
def _generate_cluster_context(name: str, viewer_cert_arn: str, cluster_plan: ClusterPlan, user_config: UserConfig) -> Dict[str, str]:
cmd_params = {
"nameCluster": name,
"nameCaptureBucketStack": constants.get_capture_bucket_stack_name(name),
@@ -37,8 +44,8 @@ def _generate_cluster_context(name: str, viewer_cert_arn: str, capture_plan: Cap
"nameViewerPassSsmParam": constants.get_viewer_password_ssm_param_name(name),
"nameViewerUserSsmParam": constants.get_viewer_user_ssm_param_name(name),
"nameViewerNodesStack": constants.get_viewer_nodes_stack_name(name),
"planCaptureNodes": json.dumps(capture_plan.to_dict()),
"planEcsResources": json.dumps(ecs_resources_plan.to_dict())
"planCluster": json.dumps(cluster_plan.to_dict()),
"userConfig": json.dumps(user_config.to_dict()),
}

return {