From 296736d3461b467d6955a5d6194f0a0a8e584103 Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Thu, 1 Jun 2023 11:47:49 -0500 Subject: [PATCH 1/2] OpenSearch Domain capacity now configurable Signed-off-by: Chris Helma --- README.md | 4 +- cdk-lib/capture-stacks/capture-nodes-stack.ts | 18 +- cdk-lib/capture-stacks/capture-vpc-stack.ts | 10 +- .../capture-stacks/opensearch-domain-stack.ts | 13 +- cdk-lib/cloud-demo.ts | 7 +- cdk-lib/core/capacity-plan.ts | 44 ++++- cdk-lib/core/command-params.ts | 6 +- cdk-lib/core/context-wrangling.ts | 3 +- cdk-lib/core/ssm-wrangling.ts | 3 +- manage_arkime.py | 20 ++- manage_arkime/cdk_interactions/cdk_context.py | 22 ++- manage_arkime/commands/create_cluster.py | 46 +++-- manage_arkime/core/capacity_planning.py | 162 +++++++++++++----- .../commands/test_create_cluster.py | 102 ++++++++--- .../commands/test_destroy_cluster.py | 23 ++- .../core/test_capacity_planning.py | 128 +++++++++++++- 16 files changed, 477 insertions(+), 134 deletions(-) diff --git a/README.md b/README.md index 4541700..28e177e 100644 --- a/README.md +++ b/README.md @@ -106,10 +106,10 @@ You can see your created cluster and the VPCs it is currently monitoring using t ./manage_arkime.py create-cluster --name MyCluster ``` -By default, you will be given the minimum-size Capture Node fleet. If you have a specific amount of traffic you're expecting to need to be able to capture, you an specify it (in Gbps) using the `--expected-traffic` argument. The CLI will provision an EC2 Autoscaling Group that should be able to handle that amount of capture plus a little extra. +By default, you will be given the minimum-size Capture Cluster. You can provision a Cluster that will serve your expected usage using a set of optional command-line parameters, which will ensure the EC2 Capture Nodes and OpenSearch Domain are suitably provisioned (plus a little extra for safety): ``` -./manage_arkime.py create-cluster --name MyCluster --expected-traffic 10 +./manage_arkime.py create-cluster --name MyCluster --expected-traffic 1 --spi-days 30 --replicas 2 ``` ### Setting up capture for a VPC diff --git a/cdk-lib/capture-stacks/capture-nodes-stack.ts b/cdk-lib/capture-stacks/capture-nodes-stack.ts index 99179ab..0b8d504 100644 --- a/cdk-lib/capture-stacks/capture-nodes-stack.ts +++ b/cdk-lib/capture-stacks/capture-nodes-stack.ts @@ -26,8 +26,7 @@ export interface CaptureNodesStackProps extends cdk.StackProps { readonly clusterName: string; readonly osDomain: opensearch.Domain; readonly osPassword: secretsmanager.Secret; - readonly planCaptureNodes: plan.CaptureNodesPlan; - readonly planEcsResources: plan.EcsSysResourcePlan; + readonly planCluster: plan.ClusterPlan; readonly ssmParamNameCluster: string; } @@ -81,11 +80,11 @@ export class CaptureNodesStack extends cdk.Stack { // Load Balancers do not properly integrate with ECS Fargate. 
const autoScalingGroup = new autoscaling.AutoScalingGroup(this, 'ASG', { vpc: props.captureVpc, - instanceType: new ec2.InstanceType(props.planCaptureNodes.instanceType), + instanceType: new ec2.InstanceType(props.planCluster.captureNodes.instanceType), machineImage: ecs.EcsOptimizedImage.amazonLinux2(), - desiredCapacity: props.planCaptureNodes.desiredCount, - minCapacity: props.planCaptureNodes.minCount, - maxCapacity: props.planCaptureNodes.maxCount + desiredCapacity: props.planCluster.captureNodes.desiredCount, + minCapacity: props.planCluster.captureNodes.minCount, + maxCapacity: props.planCluster.captureNodes.maxCount }); const asgSecurityGroup = new ec2.SecurityGroup(this, 'ASGSecurityGroup', { @@ -162,8 +161,8 @@ export class CaptureNodesStack extends cdk.Stack { 'OPENSEARCH_ENDPOINT': props.osDomain.domainEndpoint, 'OPENSEARCH_SECRET_ARN': props.osPassword.secretArn, }, - cpu: props.planEcsResources.cpu, - memoryLimitMiB: props.planEcsResources.memory, + cpu: props.planCluster.ecsResources.cpu, + memoryLimitMiB: props.planCluster.ecsResources.memory, portMappings: [ { containerPort: 6081, hostPort: 6081, protocol: ecs.Protocol.UDP}, { containerPort: healthCheckPort, hostPort: healthCheckPort, protocol: ecs.Protocol.TCP}, @@ -242,8 +241,7 @@ export class CaptureNodesStack extends cdk.Stack { busName: clusterBus.eventBusName, clusterName: props.clusterName, vpceServiceId: gwlbEndpointService.ref, - captureNodesPlan: props.planCaptureNodes, - ecsSysResourcePlan: props.planEcsResources + capacityPlan: props.planCluster } const clusterParam = new ssm.StringParameter(this, 'ClusterParam', { allowedPattern: '.*', diff --git a/cdk-lib/capture-stacks/capture-vpc-stack.ts b/cdk-lib/capture-stacks/capture-vpc-stack.ts index 4e78422..cbf4c54 100644 --- a/cdk-lib/capture-stacks/capture-vpc-stack.ts +++ b/cdk-lib/capture-stacks/capture-vpc-stack.ts @@ -4,15 +4,21 @@ import * as ec2 from 'aws-cdk-lib/aws-ec2'; import * as logs from 'aws-cdk-lib/aws-logs'; import { Stack, StackProps } from 'aws-cdk-lib'; +import * as plan from '../core/capacity-plan'; + +export interface CaptureVpcStackProps extends StackProps { + readonly planCluster: plan.ClusterPlan; +} + export class CaptureVpcStack extends Stack { public readonly vpc: ec2.Vpc; public readonly flowLog: ec2.FlowLog; - constructor(scope: Construct, id: string, props: StackProps) { + constructor(scope: Construct, id: string, props: CaptureVpcStackProps) { super(scope, id, props); this.vpc = new ec2.Vpc(this, 'VPC', { - maxAzs: 2, + maxAzs: props.planCluster.captureVpc.numAzs, subnetConfiguration: [ { subnetType: ec2.SubnetType.PUBLIC, diff --git a/cdk-lib/capture-stacks/opensearch-domain-stack.ts b/cdk-lib/capture-stacks/opensearch-domain-stack.ts index 51b2e93..23f29c0 100644 --- a/cdk-lib/capture-stacks/opensearch-domain-stack.ts +++ b/cdk-lib/capture-stacks/opensearch-domain-stack.ts @@ -7,9 +7,12 @@ import {Domain, EngineVersion, TLSSecurityPolicy} from 'aws-cdk-lib/aws-opensear import * as secretsmanager from 'aws-cdk-lib/aws-secretsmanager'; import * as ssm from 'aws-cdk-lib/aws-ssm'; +import * as plan from '../core/capacity-plan'; + export interface OpenSearchDomainStackProps extends StackProps { readonly captureVpc: ec2.Vpc; + readonly planCluster: plan.ClusterPlan; readonly ssmParamName: string; } @@ -64,11 +67,13 @@ export class OpenSearchDomainStack extends Stack { version: EngineVersion.openSearch("2.5"), enableVersionUpgrade: true, capacity: { - masterNodes: 3, - dataNodes: 2, + masterNodes: 
props.planCluster.osDomain.masterNodes.count, + masterNodeInstanceType: props.planCluster.osDomain.masterNodes.instanceType, + dataNodes: props.planCluster.osDomain.dataNodes.count, + dataNodeInstanceType: props.planCluster.osDomain.dataNodes.instanceType }, ebs: { - volumeSize: 20, + volumeSize: props.planCluster.osDomain.dataNodes.volumeSize, }, nodeToNodeEncryption: true, encryptionAtRest: { @@ -76,7 +81,7 @@ export class OpenSearchDomainStack extends Stack { kmsKey: this.domainKey, }, zoneAwareness: { - availabilityZoneCount: 2, + availabilityZoneCount: props.planCluster.captureVpc.numAzs, }, logging: { slowSearchLogEnabled: true, diff --git a/cdk-lib/cloud-demo.ts b/cdk-lib/cloud-demo.ts index 8b3e386..27c7116 100644 --- a/cdk-lib/cloud-demo.ts +++ b/cdk-lib/cloud-demo.ts @@ -29,12 +29,14 @@ switch(params.type) { }); const captureVpcStack = new CaptureVpcStack(app, params.nameCaptureVpcStack, { - env: env + env: env, + planCluster: params.planCluster, }); const osDomainStack = new OpenSearchDomainStack(app, params.nameOSDomainStack, { env: env, captureVpc: captureVpcStack.vpc, + planCluster: params.planCluster, ssmParamName: params.nameOSDomainSsmParam, }); osDomainStack.addDependency(captureVpcStack) @@ -47,8 +49,7 @@ switch(params.type) { clusterName: params.nameCluster, osDomain: osDomainStack.domain, osPassword: osDomainStack.osPassword, - planCaptureNodes: params.planCaptureNodes, - planEcsResources: params.planEcsResources, + planCluster: params.planCluster, ssmParamNameCluster: params.nameClusterSsmParam }); captureNodesStack.addDependency(captureBucketStack) diff --git a/cdk-lib/core/capacity-plan.ts b/cdk-lib/core/capacity-plan.ts index 5a2e612..ccd509f 100644 --- a/cdk-lib/core/capacity-plan.ts +++ b/cdk-lib/core/capacity-plan.ts @@ -14,4 +14,46 @@ export interface CaptureNodesPlan { export interface EcsSysResourcePlan { cpu: number; memory: number; -} \ No newline at end of file +} + +/** + * Structure to hold the capacity plan for an OS Domain's data nodes + */ +export interface DataNodesPlan { + count: number; + instanceType: string; + volumeSize: number; +} + +/** + * Structure to hold the capacity plan for an OS Domain's master nodes + */ +export interface MasterNodesPlan { + count: number; + instanceType: string; +} + +/** + * Structure to hold the overall capacity plan for an OS Domain + */ +export interface OSDomainPlan { + dataNodes: DataNodesPlan; + masterNodes: MasterNodesPlan; +} + +/** + * Structure to hold the details of the cluster's Capture VPC + */ +export interface CaptureVpcPlan { + numAzs: number; +} + +/** + * Structure to hold the overall capacity plan for an Arkime Cluster + */ +export interface ClusterPlan { + captureNodes: CaptureNodesPlan; + captureVpc: CaptureVpcPlan; + ecsResources: EcsSysResourcePlan; + osDomain: OSDomainPlan; +} diff --git a/cdk-lib/core/command-params.ts b/cdk-lib/core/command-params.ts index 1b90488..6638227 100644 --- a/cdk-lib/core/command-params.ts +++ b/cdk-lib/core/command-params.ts @@ -25,8 +25,7 @@ export interface ClusterMgmtParamsRaw extends CommandParamsRaw { nameViewerPassSsmParam: string; nameViewerUserSsmParam: string; nameViewerNodesStack: string; - planCaptureNodes: string; - planEcsResources: string; + planCluster: string; } /** @@ -89,8 +88,7 @@ export interface ClusterMgmtParams extends CommandParams { nameViewerPassSsmParam: string; nameViewerUserSsmParam: string; nameViewerNodesStack: string; - planCaptureNodes: plan.CaptureNodesPlan; - planEcsResources: plan.EcsSysResourcePlan; + planCluster: 
plan.ClusterPlan; } /** diff --git a/cdk-lib/core/context-wrangling.ts b/cdk-lib/core/context-wrangling.ts index 9921c81..ea60068 100644 --- a/cdk-lib/core/context-wrangling.ts +++ b/cdk-lib/core/context-wrangling.ts @@ -99,8 +99,7 @@ function validateArgs(args: ValidateArgs) : (prms.ClusterMgmtParams | prms.Deplo nameViewerPassSsmParam: rawClusterMgmtParamsObj.nameViewerPassSsmParam, nameViewerUserSsmParam: rawClusterMgmtParamsObj.nameViewerUserSsmParam, nameViewerNodesStack: rawClusterMgmtParamsObj.nameViewerNodesStack, - planCaptureNodes: JSON.parse(rawClusterMgmtParamsObj.planCaptureNodes), - planEcsResources: JSON.parse(rawClusterMgmtParamsObj.planEcsResources), + planCluster: JSON.parse(rawClusterMgmtParamsObj.planCluster), } return clusterMgmtParams; case ManagementCmd.AddVpc: // Add and Remove VPC use the same parameters diff --git a/cdk-lib/core/ssm-wrangling.ts b/cdk-lib/core/ssm-wrangling.ts index cfd44c4..7ae9019 100644 --- a/cdk-lib/core/ssm-wrangling.ts +++ b/cdk-lib/core/ssm-wrangling.ts @@ -11,8 +11,7 @@ export interface ClusterSsmValue { readonly busName: string; readonly clusterName: string; readonly vpceServiceId: string; - readonly captureNodesPlan: plan.CaptureNodesPlan; - readonly ecsSysResourcePlan: plan.EcsSysResourcePlan; + readonly capacityPlan: plan.ClusterPlan; } export interface SubnetSsmValue { diff --git a/manage_arkime.py b/manage_arkime.py index 0d6c55d..e0900b7 100755 --- a/manage_arkime.py +++ b/manage_arkime.py @@ -12,7 +12,7 @@ from commands.list_clusters import cmd_list_clusters from commands.remove_vpc import cmd_remove_vpc import constants as constants -from core.capacity_planning import MAX_TRAFFIC, MINIMUM_TRAFFIC +from core.capacity_planning import MAX_TRAFFIC, DEFAULT_SPI_DAYS, DEFAULT_SPI_REPLICAS from logging_wrangler import LoggingWrangler, set_boto_log_level logger = logging.getLogger(__name__) @@ -58,15 +58,27 @@ def destroy_demo_traffic(ctx): @click.option( "--expected-traffic", help=("The amount of traffic, in gigabits-per-second, you expect your Arkime Cluster to receive." - + f"Minimum: {MINIMUM_TRAFFIC} Gbps, Maximum: {MAX_TRAFFIC} Gbps"), + + f"Maximum: {MAX_TRAFFIC} Gbps"), default=None, type=click.FLOAT, required=False) +@click.option( + "--spi-days", + help=(f"The number of days to store SPI metadata in the OpenSearch Domain. Default: {DEFAULT_SPI_DAYS}"), + default=None, + type=click.INT, + required=False) +@click.option( + "--replicas", + help=(f"The number of replicas to make of the SPI metadata in the OpenSearch Domain. 
Default: {DEFAULT_SPI_REPLICAS}"), + default=None, + type=click.INT, + required=False) @click.pass_context -def create_cluster(ctx, name, expected_traffic): +def create_cluster(ctx, name, expected_traffic, spi_days, replicas): profile = ctx.obj.get("profile") region = ctx.obj.get("region") - cmd_create_cluster(profile, region, name, expected_traffic) + cmd_create_cluster(profile, region, name, expected_traffic, spi_days, replicas) cli.add_command(create_cluster) @click.command(help="Tears down the Arkime Cluster in your account; by default, leaves your data intact") diff --git a/manage_arkime/cdk_interactions/cdk_context.py b/manage_arkime/cdk_interactions/cdk_context.py index e7333fe..4e5e567 100644 --- a/manage_arkime/cdk_interactions/cdk_context.py +++ b/manage_arkime/cdk_interactions/cdk_context.py @@ -3,10 +3,11 @@ from typing import Dict import constants as constants -from core.capacity_planning import CaptureNodesPlan, INSTANCE_TYPE_CAPTURE_NODE, EcsSysResourcePlan +from core.capacity_planning import (CaptureNodesPlan, CaptureVpcPlan, ClusterPlan, DataNodesPlan, EcsSysResourcePlan, + MasterNodesPlan, OSDomainPlan, INSTANCE_TYPE_CAPTURE_NODE, DEFAULT_NUM_AZS) -def generate_create_cluster_context(name: str, viewer_cert_arn: str, capture_plan: CaptureNodesPlan, ecs_resource_plan: EcsSysResourcePlan) -> Dict[str, str]: - create_context = _generate_cluster_context(name, viewer_cert_arn, capture_plan, ecs_resource_plan) +def generate_create_cluster_context(name: str, viewer_cert_arn: str, cluster_plan: ClusterPlan) -> Dict[str, str]: + create_context = _generate_cluster_context(name, viewer_cert_arn, cluster_plan) create_context[constants.CDK_CONTEXT_CMD_VAR] = constants.CMD_CREATE_CLUSTER return create_context @@ -15,14 +16,18 @@ def generate_destroy_cluster_context(name: str) -> Dict[str, str]: # we're tearing down the Cfn stack in which it would be used, the operation either succeeds they are irrelevant # or it fails/rolls back they are irrelevant. 
fake_arn = "N/A" - fake_capture_capacity = CaptureNodesPlan(INSTANCE_TYPE_CAPTURE_NODE, 1, 2, 1) - fake_resource_plan = EcsSysResourcePlan(1, 1) + fake_cluster_plan = ClusterPlan( + CaptureNodesPlan(INSTANCE_TYPE_CAPTURE_NODE, 1, 2, 1), + CaptureVpcPlan(1), + EcsSysResourcePlan(1, 1), + OSDomainPlan(DataNodesPlan(2, "t3.small.search", 100), MasterNodesPlan(3, "m6g.large.search")) + ) - destroy_context = _generate_cluster_context(name, fake_arn, fake_capture_capacity, fake_resource_plan) + destroy_context = _generate_cluster_context(name, fake_arn, fake_cluster_plan) destroy_context[constants.CDK_CONTEXT_CMD_VAR] = constants.CMD_DESTROY_CLUSTER return destroy_context -def _generate_cluster_context(name: str, viewer_cert_arn: str, capture_plan: CaptureNodesPlan, ecs_resources_plan: EcsSysResourcePlan) -> Dict[str, str]: +def _generate_cluster_context(name: str, viewer_cert_arn: str, cluster_plan: ClusterPlan) -> Dict[str, str]: cmd_params = { "nameCluster": name, "nameCaptureBucketStack": constants.get_capture_bucket_stack_name(name), @@ -37,8 +42,7 @@ def _generate_cluster_context(name: str, viewer_cert_arn: str, capture_plan: Cap "nameViewerPassSsmParam": constants.get_viewer_password_ssm_param_name(name), "nameViewerUserSsmParam": constants.get_viewer_user_ssm_param_name(name), "nameViewerNodesStack": constants.get_viewer_nodes_stack_name(name), - "planCaptureNodes": json.dumps(capture_plan.to_dict()), - "planEcsResources": json.dumps(ecs_resources_plan.to_dict()) + "planCluster": json.dumps(cluster_plan.to_dict()), } return { diff --git a/manage_arkime/commands/create_cluster.py b/manage_arkime/commands/create_cluster.py index d46d1cb..139ece8 100644 --- a/manage_arkime/commands/create_cluster.py +++ b/manage_arkime/commands/create_cluster.py @@ -8,16 +8,22 @@ from cdk_interactions.cdk_client import CdkClient import cdk_interactions.cdk_context as context import constants as constants -from core.capacity_planning import get_capture_node_capacity_plan, get_ecs_sys_resource_plan, CaptureNodesPlan, EcsSysResourcePlan, MINIMUM_TRAFFIC +from core.capacity_planning import (get_capture_node_capacity_plan, get_ecs_sys_resource_plan, get_os_domain_plan, ClusterPlan, + CaptureNodesPlan, CaptureVpcPlan, EcsSysResourcePlan, OSDomainPlan, MINIMUM_TRAFFIC, + DEFAULT_SPI_DAYS, DEFAULT_SPI_REPLICAS, DEFAULT_NUM_AZS) logger = logging.getLogger(__name__) -def cmd_create_cluster(profile: str, region: str, name: str, expected_traffic: float): +class MustProvideAllParams(Exception): + def __init__(self): + super().__init__("If you specify one of the optional capacity parameters, you must specify all of them.") + +def cmd_create_cluster(profile: str, region: str, name: str, expected_traffic: float, spi_days: int, replicas: int): logger.debug(f"Invoking create-cluster with profile '{profile}' and region '{region}'") aws_provider = AwsClientProvider(aws_profile=profile, aws_region=region) - capture_plan, ecs_resource_plan = _get_capacity_plans(name, expected_traffic, aws_provider) + capacity_plan = _get_capacity_plan(name, expected_traffic, spi_days, replicas, aws_provider) cert_arn = _set_up_viewer_cert(name, aws_provider) @@ -29,33 +35,41 @@ def cmd_create_cluster(profile: str, region: str, name: str, expected_traffic: f constants.get_opensearch_domain_stack_name(name), constants.get_viewer_nodes_stack_name(name) ] - create_context = context.generate_create_cluster_context(name, cert_arn, capture_plan, ecs_resource_plan) + create_context = context.generate_create_cluster_context(name, cert_arn, capacity_plan) 
cdk_client.deploy(stacks_to_deploy, aws_profile=profile, aws_region=region, context=create_context) -def _get_capacity_plans(cluster_name: str, expected_traffic: float, aws_provider: AwsClientProvider) -> Tuple[CaptureNodesPlan, EcsSysResourcePlan]: +def _get_capacity_plan(cluster_name: str, expected_traffic: float, spi_days: int, replicas: int, aws_provider: AwsClientProvider) -> ClusterPlan: - if not expected_traffic: + # None of the parameters defined + if (not expected_traffic) and (not spi_days) and (not replicas): + # Re-use the existing configuration if it exists try: plan_json = ssm_ops.get_ssm_param_json_value( constants.get_cluster_ssm_param_name(cluster_name), - "captureNodesPlan", + "capacityPlan", aws_provider ) - capture_plan = CaptureNodesPlan( - instance_type=plan_json["instanceType"], - desired_count=plan_json["desiredCount"], - max_count=plan_json["maxCount"], - min_count=plan_json["minCount"], - ) + capacity_plan = ClusterPlan.from_dict(plan_json) + + return capacity_plan + # Existing configuration doesn't exist, use defaults except ssm_ops.ParamDoesNotExist: capture_plan = get_capture_node_capacity_plan(MINIMUM_TRAFFIC) - else: + capture_vpc_plan = CaptureVpcPlan(DEFAULT_NUM_AZS) + os_domain_plan = get_os_domain_plan(MINIMUM_TRAFFIC, DEFAULT_SPI_DAYS, DEFAULT_SPI_REPLICAS, capture_vpc_plan.numAzs) + # All of the parameters defined + elif expected_traffic and spi_days and replicas: capture_plan = get_capture_node_capacity_plan(expected_traffic) + capture_vpc_plan = CaptureVpcPlan(DEFAULT_NUM_AZS) + os_domain_plan = get_os_domain_plan(expected_traffic, spi_days, replicas, capture_vpc_plan.numAzs) + # Some, but not all, of the parameters defined + else: + raise MustProvideAllParams() - ecs_resource_plan = get_ecs_sys_resource_plan(capture_plan.instance_type) + ecs_resource_plan = get_ecs_sys_resource_plan(capture_plan.instanceType) - return (capture_plan, ecs_resource_plan) + return ClusterPlan(capture_plan, capture_vpc_plan, ecs_resource_plan, os_domain_plan) def _set_up_viewer_cert(name: str, aws_provider: AwsClientProvider) -> str: diff --git a/manage_arkime/core/capacity_planning.py b/manage_arkime/core/capacity_planning.py index 3109f54..f25574a 100644 --- a/manage_arkime/core/capacity_planning.py +++ b/manage_arkime/core/capacity_planning.py @@ -2,7 +2,7 @@ from enum import Enum import math import logging -from typing import Dict, Tuple +from typing import Dict, Type, TypeVar logger = logging.getLogger(__name__) @@ -10,8 +10,12 @@ TRAFFIC_PER_M5_XL = 2 # in Gbps, guestimate, should be updated with experimental data MAX_TRAFFIC = 100 # Gbps, scaling limit of a single User Subnet VPC Endpoint MINIMUM_NODES = 1 # We'll always have at least one capture node -MINIMUM_TRAFFIC = MINIMUM_NODES * TRAFFIC_PER_M5_XL +MINIMUM_TRAFFIC = 0.01 # Gbps; arbitrarily chosen, but will yield a minimal cluster CAPACITY_BUFFER_FACTOR = 1.25 # Arbitrarily chosen +MASTER_NODE_COUNT = 3 # Recommended number in docs +DEFAULT_SPI_DAYS = 30 # How many days of SPI metadata to keep in the OS Domain +DEFAULT_SPI_REPLICAS = 1 # How many replicas of metadata to keep in the OS Domain +DEFAULT_NUM_AZS = 2 # How many AWS Availability zones to utilize class TooMuchTraffic(Exception): def __init__(self, expected_traffic: int): @@ -23,21 +27,21 @@ def __init__(self, expected_traffic: int): @dataclass class CaptureNodesPlan: - instance_type: str - desired_count: int - max_count: int - min_count: int + instanceType: str + desiredCount: int + maxCount: int + minCount: int def __equal__(self, other): - return 
(self.instance_type == other.instance_type and self.desired_count == other.desired_count - and self.max_count == other.max_count and self.min_count == other.min_count) + return (self.instanceType == other.instance_type and self.desiredCount == other.desired_count + and self.maxCount == other.max_count and self.minCount == other.min_count) def to_dict(self) -> Dict[str, str]: return { - "instanceType": self.instance_type, - "desiredCount": self.desired_count, - "maxCount": self.max_count, - "minCount": self.min_count, + "instanceType": self.instanceType, + "desiredCount": self.desiredCount, + "maxCount": self.maxCount, + "minCount": self.minCount, } def get_capture_node_capacity_plan(expected_traffic: float) -> CaptureNodesPlan: @@ -124,49 +128,57 @@ class DataNode: @dataclass class DataNodesPlan: count: int - type: str - vol_size: int # in GiB + instanceType: str + volumeSize: int # in GiB def __equal__(self, other): - return (self.count == other.count and self.type == other.type - and self.vol_size == other.vol_size) + return (self.count == other.count and self.instanceType == other.type + and self.volumeSize == other.vol_size) def to_dict(self) -> Dict[str, str]: return { "count": self.count, - "type": self.type, - "volSize": self.vol_size + "instanceType": self.instanceType, + "volumeSize": self.volumeSize } @dataclass class MasterNodesPlan: count: int - type: str + instanceType: str def __equal__(self, other): - return (self.count == other.count and self.type == other.type) + return (self.count == other.count and self.instanceType == other.type) def to_dict(self) -> Dict[str, str]: return { "count": self.count, - "type": self.type + "instanceType": self.instanceType } +T_OSDomainPlan = TypeVar('T_OSDomainPlan', bound='OSDomainPlan') + @dataclass class OSDomainPlan: - data_nodes: DataNodesPlan - master_nodes: MasterNodesPlan + dataNodes: DataNodesPlan + masterNodes: MasterNodesPlan def __equal__(self, other): - return (self.data_nodes == other.dataNodes - and self.master_nodes == other.masterNodes) + return (self.dataNodes == other.dataNodes + and self.masterNodes == other.masterNodes) def to_dict(self) -> Dict[str, str]: return { - "dataNodes": self.data_nodes.to_dict(), - "masterNodes": self.master_nodes.to_dict() + "dataNodes": self.dataNodes.to_dict(), + "masterNodes": self.masterNodes.to_dict() } + @classmethod + def from_dict(cls: Type[T_OSDomainPlan], input: Dict[str, any]) -> T_OSDomainPlan: + data_nodes = DataNodesPlan(**input["dataNodes"]) + master_nodes = MasterNodesPlan(**input["masterNodes"]) + return cls(data_nodes, master_nodes) + def _get_storage_per_replica(expected_traffic: float, spi_days: int) -> float: """ Predict the required OpenSearch domain storage for each replica, in GiB @@ -184,21 +196,25 @@ def _get_total_storage(expected_traffic: float, spi_days: int, replicas: int) -> spi_days: the number of days to retain the SPI data stored in the OpenSearch Domain replicas: the number of replicas to have of the data """ - return _get_storage_per_replica(expected_traffic, spi_days) * replicas + return _get_storage_per_replica(expected_traffic, spi_days) * (1 + replicas) -def _get_data_node_plan(total_storage: float) -> DataNodesPlan: +def _get_data_node_plan(total_storage: float, num_azs: int) -> DataNodesPlan: """ Per the OpenSearch Service limits doc [1], you can have a maximum of 10 T2/T3 data nodes or 80 of other types by default. You can raise this limit up to 200. 
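+    As a rough worked example (numbers mirrored from this patch's unit tests, and assuming the 100 GiB
+    t3.small.search volume size they imply): total_storage = 650 GiB falls in the 10-node t3.small.search
+    tier (10 * 100 GiB), giving ceil(650 / 100) = 7 data nodes, rounded up to an even count of 8 when the
+    domain spans two AZs, per the constraint described below.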
To keep things simple, we will assume if the user needs more storage than 80 of the largest instance type can provide, they'll bump the limit out of band and just keep getting more of - that largest instance type. + that largest instance type. There's also an apparent incentive to have more, smaller nodes than fewer, larger + nodes [2]. + + We ensure there are at least two data nodes of whichever type is selected for the + capacity plan. - There's also an apparent incentive to have more, smaller nodes than fewer, larger nodes [2] + An additional constraint is that you must have an even number of data nodes if you have two AZs. [1] https://docs.aws.amazon.com/opensearch-service/latest/developerguide/limits.html [2] https://github.com/arkime/aws-aio/issues/56#issuecomment-1563652060 - total_storage: full stage requirement for all data, including replicas, in GiB + total_storage: full storage requirement for all data, including replicas, in GiB """ if total_storage <= 10 * T3_SMALL_SEARCH.vol_size: @@ -209,18 +225,24 @@ def _get_data_node_plan(total_storage: float) -> DataNodesPlan: node = R6G_4XLARGE_SEARCH elif total_storage <= 80 * R6G_12XLARGE_SEARCH.vol_size: node = R6G_12XLARGE_SEARCH + else: + node = R6G_12XLARGE_SEARCH # overflow with our largest instance type + + num_of_nodes = max(math.ceil(total_storage / node.vol_size), 2) + if num_azs == 2: + num_of_nodes = math.ceil(num_of_nodes / 2) * 2 # The next largest even integer plan = DataNodesPlan( - count = math.ceil(total_storage / node.vol_size), - type = node.type, - vol_size = node.vol_size + count = num_of_nodes, + instanceType = node.type, + volumeSize = node.vol_size ) return plan -def _get_master_node_plan(storage_per_replica: float, data_node_count: int) -> MasterNodesPlan: +def _get_master_node_plan(storage_per_replica: float, data_node_count: int, data_node_type: str) -> MasterNodesPlan: """ - We follow the sizing recommendation in the docs [1]. + We follow the sizing recommendation in the docs [1]. One complicating [1] https://docs.aws.amazon.com/opensearch-service/latest/developerguide/managedomains-dedicatedmasternodes.html @@ -232,7 +254,12 @@ def _get_master_node_plan(storage_per_replica: float, data_node_count: int) -> M storage_per_shard = 40 # GiB num_shards = math.ceil(storage_per_replica / storage_per_shard) - if num_shards <= 10000 and data_node_count <= 10: + if data_node_type == T3_SMALL_SEARCH.type: + # You can't mix graviton and non-graviton instance types across the data/master node roles. Additionally, + # there are no "toy"-class graviton data node instance types. Therefore, we need this (hacky) check to + # make sure we're using a compatible type. 
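+        # As a rough worked example (values mirrored from this patch's unit tests, not authoritative):
+        # storage_per_replica = 5 GiB gives num_shards = ceil(5 / 40) = 1, so two graviton data nodes
+        # would get m6g.large.search masters from the branch below, while the same sizing with
+        # t3.small.search data nodes takes this branch and gets m5.large.search masters instead.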
+ node_type = "m5.large.search" + elif num_shards <= 10000 and data_node_count <= 10: node_type = "m6g.large.search" elif num_shards <= 30000 and data_node_count <= 30: node_type = "c6g.2xlarge.search" @@ -242,19 +269,66 @@ def _get_master_node_plan(storage_per_replica: float, data_node_count: int) -> M node_type = "r6g.4xlarge.search" return MasterNodesPlan( - count = 3, # Recommended number in docs - type = node_type + count = MASTER_NODE_COUNT, + instanceType = node_type ) -def get_os_domain_plan(expected_traffic: float, spi_days: int, replicas: int) -> OSDomainPlan: +def get_os_domain_plan(expected_traffic: float, spi_days: int, replicas: int, num_azs: int) -> OSDomainPlan: """ Get the OpenSearch Domain capacity required to satisify the expected traffic + + expected_traffic: traffic volume to the capture nodes, in Gbps + spi_days: the number of days to retain the SPI data stored in the OpenSearch Domain + replicas: the number of replicas to have of the data + num_azs: the number of AZs in the domain's VPC """ storage_per_replica = _get_storage_per_replica(expected_traffic, spi_days) total_storage = _get_total_storage(expected_traffic, spi_days, replicas) - data_node_plan = _get_data_node_plan(total_storage) - master_node_plan = _get_master_node_plan(storage_per_replica, data_node_plan.count) + data_node_plan = _get_data_node_plan(total_storage, num_azs) + master_node_plan = _get_master_node_plan(storage_per_replica, data_node_plan.count, data_node_plan.instanceType) + + return OSDomainPlan(data_node_plan, master_node_plan) + +@dataclass +class CaptureVpcPlan: + numAzs: int + + def __equal__(self, other): + return self.numAzs == other.numAzs + + def to_dict(self) -> Dict[str, str]: + return { + "numAzs": self.numAzs + } + +T_ClusterPlan = TypeVar('T_ClusterPlan', bound='ClusterPlan') - return OSDomainPlan(data_node_plan, master_node_plan) \ No newline at end of file +@dataclass +class ClusterPlan: + captureNodes: CaptureNodesPlan + captureVpc: CaptureVpcPlan + ecsResources: EcsSysResourcePlan + osDomain: OSDomainPlan + + def __equal__(self, other): + return (self.captureNodes == other.captureNodes and self.ecsResources == other.ecsResources + and self.osDomain == other.osDomain and self.captureVpc == other.vpc) + + def to_dict(self) -> Dict[str, str]: + return { + "captureNodes": self.captureNodes.to_dict(), + "captureVpc": self.captureVpc.to_dict(), + "ecsResources": self.ecsResources.to_dict(), + "osDomain": self.osDomain.to_dict() + } + + @classmethod + def from_dict(cls: Type[T_ClusterPlan], input: Dict[str, any]) -> T_ClusterPlan: + capture_nodes = CaptureNodesPlan(**input["captureNodes"]) + capture_vpc = CaptureVpcPlan(**input["captureVpc"]) + ecs_resources = EcsSysResourcePlan(**input["ecsResources"]) + os_domain = OSDomainPlan.from_dict(input["osDomain"]) + + return cls(capture_nodes, capture_vpc, ecs_resources, os_domain) \ No newline at end of file diff --git a/test_manage_arkime/commands/test_create_cluster.py b/test_manage_arkime/commands/test_create_cluster.py index 47bf2f5..6b280a9 100644 --- a/test_manage_arkime/commands/test_create_cluster.py +++ b/test_manage_arkime/commands/test_create_cluster.py @@ -1,14 +1,16 @@ import json +import pytest import shlex import unittest.mock as mock import aws_interactions.ssm_operations as ssm_ops -from commands.create_cluster import cmd_create_cluster, _set_up_viewer_cert, _get_capacity_plans +from commands.create_cluster import cmd_create_cluster, _set_up_viewer_cert, _get_capacity_plan, MustProvideAllParams import constants as 
constants -from core.capacity_planning import CaptureNodesPlan, EcsSysResourcePlan, MINIMUM_TRAFFIC +from core.capacity_planning import (CaptureNodesPlan, EcsSysResourcePlan, MINIMUM_TRAFFIC, OSDomainPlan, DataNodesPlan, MasterNodesPlan, + CaptureVpcPlan, ClusterPlan, DEFAULT_SPI_DAYS, DEFAULT_SPI_REPLICAS, DEFAULT_NUM_AZS) @mock.patch("commands.create_cluster.AwsClientProvider", mock.Mock()) -@mock.patch("commands.create_cluster._get_capacity_plans") +@mock.patch("commands.create_cluster._get_capacity_plan") @mock.patch("commands.create_cluster._set_up_viewer_cert") @mock.patch("commands.create_cluster.CdkClient") def test_WHEN_cmd_create_cluster_called_THEN_cdk_command_correct(mock_cdk_client_cls, mock_set_up, mock_get_plans): @@ -18,12 +20,16 @@ def test_WHEN_cmd_create_cluster_called_THEN_cdk_command_correct(mock_cdk_client mock_client = mock.Mock() mock_cdk_client_cls.return_value = mock_client - cap_plan = CaptureNodesPlan("m5.xlarge", 20, 25, 1) - ecs_plan = EcsSysResourcePlan(3584, 15360) - mock_get_plans.return_value = (cap_plan, ecs_plan) + cluster_plan = ClusterPlan( + CaptureNodesPlan("m5.xlarge", 20, 25, 1), + CaptureVpcPlan(DEFAULT_NUM_AZS), + EcsSysResourcePlan(3584, 15360), + OSDomainPlan(DataNodesPlan(2, "t3.small.search", 100), MasterNodesPlan(3, "m6g.large.search")) + ) + mock_get_plans.return_value = cluster_plan # Run our test - cmd_create_cluster("profile", "region", "my-cluster", None) + cmd_create_cluster("profile", "region", "my-cluster", None, None, None) # Check our results expected_calls = [ @@ -53,8 +59,7 @@ def test_WHEN_cmd_create_cluster_called_THEN_cdk_command_correct(mock_cdk_client "nameViewerPassSsmParam": constants.get_viewer_password_ssm_param_name("my-cluster"), "nameViewerUserSsmParam": constants.get_viewer_user_ssm_param_name("my-cluster"), "nameViewerNodesStack": constants.get_viewer_nodes_stack_name("my-cluster"), - "planCaptureNodes": json.dumps(cap_plan.to_dict()), - "planEcsResources": json.dumps(ecs_plan.to_dict()) + "planCluster": json.dumps(cluster_plan.to_dict()), })) } ) @@ -67,78 +72,125 @@ def test_WHEN_cmd_create_cluster_called_THEN_cdk_command_correct(mock_cdk_client assert expected_set_up_calls == mock_set_up.call_args_list @mock.patch("commands.create_cluster.ssm_ops") -def test_WHEN_get_capacity_plans_called_AND_use_existing_THEN_as_expected(mock_ssm_ops): +def test_WHEN_get_capacity_plan_called_AND_use_existing_THEN_as_expected(mock_ssm_ops): # Set up our mock - mock_ssm_ops.get_ssm_param_json_value.return_value = {"instanceType":"m5.xlarge","desiredCount":10,"maxCount":12,"minCount":1} + mock_ssm_ops.ParamDoesNotExist = ssm_ops.ParamDoesNotExist + + mock_ssm_ops.get_ssm_param_json_value.return_value = { + "captureNodes": { + "instanceType":"m5.xlarge","desiredCount":10,"maxCount":12,"minCount":1 + }, + "captureVpc": { + "numAzs": 3 + }, + "ecsResources": { + "cpu": 3584, "memory": 15360 + }, + "osDomain": { + "dataNodes": { + "count": 6, "instanceType": "t3.small.search", "volumeSize": 100 + }, + "masterNodes": { + "count": 3, "instanceType": "c6g.2xlarge.search", + } + } + } mock_provider = mock.Mock() # Run our test - actual_cap, actual_resources = _get_capacity_plans("my-cluster", None, mock_provider) + actual_value = _get_capacity_plan("my-cluster", None, None, None, mock_provider) # Check our results - assert CaptureNodesPlan("m5.xlarge", 10, 12, 1) == actual_cap - assert EcsSysResourcePlan(3584, 15360) == actual_resources + assert CaptureNodesPlan("m5.xlarge", 10, 12, 1) == actual_value.captureNodes + assert 
CaptureVpcPlan(3) == actual_value.captureVpc + assert EcsSysResourcePlan(3584, 15360) == actual_value.ecsResources + assert OSDomainPlan(DataNodesPlan(6, "t3.small.search", 100), MasterNodesPlan(3, "c6g.2xlarge.search")) == actual_value.osDomain expected_get_ssm_calls = [ - mock.call(constants.get_cluster_ssm_param_name("my-cluster"), "captureNodesPlan", mock.ANY) + mock.call(constants.get_cluster_ssm_param_name("my-cluster"), "capacityPlan", mock.ANY) ] assert expected_get_ssm_calls == mock_ssm_ops.get_ssm_param_json_value.call_args_list +@mock.patch("commands.create_cluster.get_os_domain_plan") @mock.patch("commands.create_cluster.get_capture_node_capacity_plan") @mock.patch("commands.create_cluster.ssm_ops") -def test_WHEN_get_capacity_plans_called_AND_use_default_THEN_as_expected(mock_ssm_ops, mock_get_cap): +def test_WHEN_get_capacity_plan_called_AND_use_default_THEN_as_expected(mock_ssm_ops, mock_get_cap, mock_get_os): # Set up our mock mock_ssm_ops.ParamDoesNotExist = ssm_ops.ParamDoesNotExist mock_ssm_ops.get_ssm_param_json_value.side_effect = ssm_ops.ParamDoesNotExist("") mock_get_cap.return_value = CaptureNodesPlan("m5.xlarge", 1, 2, 1) + mock_get_os.return_value = OSDomainPlan(DataNodesPlan(2, "t3.small.search", 100), MasterNodesPlan(3, "m6g.large.search")) mock_provider = mock.Mock() # Run our test - actual_cap, actual_resources = _get_capacity_plans("my-cluster", None, mock_provider) + actual_value = _get_capacity_plan("my-cluster", None, None, None, mock_provider) # Check our results - assert mock_get_cap.return_value == actual_cap - assert EcsSysResourcePlan(3584, 15360) == actual_resources + assert mock_get_cap.return_value == actual_value.captureNodes + assert CaptureVpcPlan(DEFAULT_NUM_AZS) == actual_value.captureVpc + assert mock_get_os.return_value == actual_value.osDomain + assert EcsSysResourcePlan(3584, 15360) == actual_value.ecsResources expected_get_cap_calls = [ mock.call(MINIMUM_TRAFFIC) ] assert expected_get_cap_calls == mock_get_cap.call_args_list + expected_get_os_calls = [ + mock.call(MINIMUM_TRAFFIC, DEFAULT_SPI_DAYS, DEFAULT_SPI_REPLICAS, DEFAULT_NUM_AZS) + ] + assert expected_get_os_calls == mock_get_os.call_args_list + expected_get_ssm_calls = [ - mock.call(constants.get_cluster_ssm_param_name("my-cluster"), "captureNodesPlan", mock.ANY) + mock.call(constants.get_cluster_ssm_param_name("my-cluster"), "capacityPlan", mock.ANY) ] assert expected_get_ssm_calls == mock_ssm_ops.get_ssm_param_json_value.call_args_list +@mock.patch("commands.create_cluster.get_os_domain_plan") @mock.patch("commands.create_cluster.get_capture_node_capacity_plan") @mock.patch("commands.create_cluster.ssm_ops") -def test_WHEN_get_capacity_plans_called_AND_gen_plan_THEN_as_expected(mock_ssm_ops, mock_get_cap): +def test_WHEN_get_capacity_plan_called_AND_gen_plan_THEN_as_expected(mock_ssm_ops, mock_get_cap, mock_get_os): # Set up our mock mock_ssm_ops.ParamDoesNotExist = ssm_ops.ParamDoesNotExist mock_ssm_ops.get_ssm_param_json_value.side_effect = ssm_ops.ParamDoesNotExist("") mock_get_cap.return_value = CaptureNodesPlan("m5.xlarge", 10, 12, 1) + mock_get_os.return_value = OSDomainPlan(DataNodesPlan(20, "r6g.large.search", 100), MasterNodesPlan(3, "m6g.large.search")) mock_provider = mock.Mock() # Run our test - actual_cap, actual_resources = _get_capacity_plans("my-cluster", 20, mock_provider) + actual_value = _get_capacity_plan("my-cluster", 10, 40, 2, mock_provider) # Check our results - assert mock_get_cap.return_value == actual_cap - assert EcsSysResourcePlan(3584, 15360) == 
actual_resources + assert mock_get_cap.return_value == actual_value.captureNodes + assert CaptureVpcPlan(DEFAULT_NUM_AZS) == actual_value.captureVpc + assert mock_get_os.return_value == actual_value.osDomain + assert EcsSysResourcePlan(3584, 15360) == actual_value.ecsResources expected_get_cap_calls = [ - mock.call(20) + mock.call(10) ] assert expected_get_cap_calls == mock_get_cap.call_args_list + expected_get_os_calls = [ + mock.call(10, 40, 2, DEFAULT_NUM_AZS) + ] + assert expected_get_os_calls == mock_get_os.call_args_list + expected_get_ssm_calls = [] assert expected_get_ssm_calls == mock_ssm_ops.get_ssm_param_json_value.call_args_list +def test_WHEN_get_capacity_plan_called_AND_not_all_params_THEN_as_expected(): + # Set up our mock + mock_provider = mock.Mock() + + # Run our test + with pytest.raises(MustProvideAllParams): + _get_capacity_plan("my-cluster", 10, None, None, mock_provider) @mock.patch("commands.create_cluster.upload_default_elb_cert") @mock.patch("commands.create_cluster.ssm_ops") diff --git a/test_manage_arkime/commands/test_destroy_cluster.py b/test_manage_arkime/commands/test_destroy_cluster.py index e0dd51a..20e126a 100644 --- a/test_manage_arkime/commands/test_destroy_cluster.py +++ b/test_manage_arkime/commands/test_destroy_cluster.py @@ -5,7 +5,8 @@ from aws_interactions.ssm_operations import ParamDoesNotExist from commands.destroy_cluster import cmd_destroy_cluster, _destroy_viewer_cert import constants as constants -from core.capacity_planning import CaptureNodesPlan, EcsSysResourcePlan +from core.capacity_planning import (CaptureNodesPlan, EcsSysResourcePlan, OSDomainPlan, DataNodesPlan, MasterNodesPlan, + ClusterPlan, CaptureVpcPlan) TEST_CLUSTER = "my-cluster" @@ -21,6 +22,13 @@ def test_WHEN_cmd_destroy_cluster_called_AND_dont_destroy_everything_THEN_expect mock_client = mock.Mock() mock_cdk_client_cls.return_value = mock_client + cluster_plan = ClusterPlan( + CaptureNodesPlan("m5.xlarge", 1, 2, 1), + CaptureVpcPlan(1), + EcsSysResourcePlan(1, 1), + OSDomainPlan(DataNodesPlan(2, "t3.small.search", 100), MasterNodesPlan(3, "m6g.large.search")) + ) + # Run our test cmd_destroy_cluster("profile", "region", TEST_CLUSTER, False) @@ -52,8 +60,7 @@ def test_WHEN_cmd_destroy_cluster_called_AND_dont_destroy_everything_THEN_expect "nameViewerPassSsmParam": constants.get_viewer_password_ssm_param_name(TEST_CLUSTER), "nameViewerUserSsmParam": constants.get_viewer_user_ssm_param_name(TEST_CLUSTER), "nameViewerNodesStack": constants.get_viewer_nodes_stack_name(TEST_CLUSTER), - "planCaptureNodes": json.dumps(CaptureNodesPlan("m5.xlarge", 1, 2, 1).to_dict()), - "planEcsResources": json.dumps(EcsSysResourcePlan(1, 1).to_dict()) + "planCluster": json.dumps(cluster_plan.to_dict()), })) } ) @@ -84,6 +91,13 @@ def test_WHEN_cmd_destroy_cluster_called_AND_destroy_everything_THEN_expected_cm constants.get_capture_bucket_ssm_param_name(TEST_CLUSTER), ] + cluster_plan = ClusterPlan( + CaptureNodesPlan("m5.xlarge", 1, 2, 1), + CaptureVpcPlan(1), + EcsSysResourcePlan(1, 1), + OSDomainPlan(DataNodesPlan(2, "t3.small.search", 100), MasterNodesPlan(3, "m6g.large.search")) + ) + # Run our test cmd_destroy_cluster("profile", "region", TEST_CLUSTER, True) @@ -132,8 +146,7 @@ def test_WHEN_cmd_destroy_cluster_called_AND_destroy_everything_THEN_expected_cm "nameViewerPassSsmParam": constants.get_viewer_password_ssm_param_name(TEST_CLUSTER), "nameViewerUserSsmParam": constants.get_viewer_user_ssm_param_name(TEST_CLUSTER), "nameViewerNodesStack": 
constants.get_viewer_nodes_stack_name(TEST_CLUSTER), - "planCaptureNodes": json.dumps(CaptureNodesPlan("m5.xlarge", 1, 2, 1).to_dict()), - "planEcsResources": json.dumps(EcsSysResourcePlan(1, 1).to_dict()) + "planCluster": json.dumps(cluster_plan.to_dict()), })) } ) diff --git a/test_manage_arkime/core/test_capacity_planning.py b/test_manage_arkime/core/test_capacity_planning.py index 2eb0362..2de61a9 100644 --- a/test_manage_arkime/core/test_capacity_planning.py +++ b/test_manage_arkime/core/test_capacity_planning.py @@ -47,4 +47,130 @@ def test_WHEN_get_ecs_sys_resource_plan_called_THEN_as_expected(): # TEST 2: Get an unknown instance type with pytest.raises(cap.UnknownInstanceType): - cap.get_ecs_sys_resource_plan("unknown") \ No newline at end of file + cap.get_ecs_sys_resource_plan("unknown") + +def test_WHEN_get_total_storage_called_THEN_as_expected(): + # TEST 1: No replicas + actual_value = cap._get_total_storage(10, 30, 0) + expected_value = 97200 + + assert expected_value == actual_value + + # TEST 2: Single replica + actual_value = cap._get_total_storage(10, 30, 1) + expected_value = 97200*2 + + assert expected_value == actual_value + + # TEST 3: Many replicas + actual_value = cap._get_total_storage(10, 30, 5) + expected_value = 97200*6 + + assert expected_value == actual_value + +def test_WHEN_get_data_node_plan_called_THEN_as_expected(): + # TEST 1: Toy setup + actual_value = cap._get_data_node_plan(10, 3) + expected_value = cap.DataNodesPlan(2, cap.T3_SMALL_SEARCH.type, cap.T3_SMALL_SEARCH.vol_size) + assert expected_value == actual_value + + # TEST 2: Tiny setup + actual_value = cap._get_data_node_plan(650, 3) + expected_value = cap.DataNodesPlan(7, cap.T3_SMALL_SEARCH.type, cap.T3_SMALL_SEARCH.vol_size) + assert expected_value == actual_value + + # TEST 2b: Tiny setup, 2 AZs + actual_value = cap._get_data_node_plan(650, 2) + expected_value = cap.DataNodesPlan(8, cap.T3_SMALL_SEARCH.type, cap.T3_SMALL_SEARCH.vol_size) + assert expected_value == actual_value + + # TEST 3: Small setup (1) + actual_value = cap._get_data_node_plan(1100, 3) + expected_value = cap.DataNodesPlan(2, cap.R6G_LARGE_SEARCH.type, cap.R6G_LARGE_SEARCH.vol_size) + assert expected_value == actual_value + + # TEST 4: Small setup (2) + actual_value = cap._get_data_node_plan(65000, 3) + expected_value = cap.DataNodesPlan(64, cap.R6G_LARGE_SEARCH.type, cap.R6G_LARGE_SEARCH.vol_size) + assert expected_value == actual_value + + # TEST 5: Medium setup (1) + actual_value = cap._get_data_node_plan(90000, 3) + expected_value = cap.DataNodesPlan(15, cap.R6G_4XLARGE_SEARCH.type, cap.R6G_4XLARGE_SEARCH.vol_size) + assert expected_value == actual_value + + # TEST 5b: Medium setup (1), 2 AZs + actual_value = cap._get_data_node_plan(90000, 2) + expected_value = cap.DataNodesPlan(16, cap.R6G_4XLARGE_SEARCH.type, cap.R6G_4XLARGE_SEARCH.vol_size) + assert expected_value == actual_value + + # TEST 6: Medium setup (2) + actual_value = cap._get_data_node_plan(450000, 3) + expected_value = cap.DataNodesPlan(74, cap.R6G_4XLARGE_SEARCH.type, cap.R6G_4XLARGE_SEARCH.vol_size) + assert expected_value == actual_value + + # TEST 7: Large setup (1) + actual_value = cap._get_data_node_plan(500000, 3) + expected_value = cap.DataNodesPlan(41, cap.R6G_12XLARGE_SEARCH.type, cap.R6G_12XLARGE_SEARCH.vol_size) + assert expected_value == actual_value + + # TEST 8: Enormous setup + actual_value = cap._get_data_node_plan(1200000, 3) + expected_value = cap.DataNodesPlan(98, cap.R6G_12XLARGE_SEARCH.type, cap.R6G_12XLARGE_SEARCH.vol_size) + assert 
expected_value == actual_value + +def test_WHEN_get_master_node_plan_called_THEN_as_expected(): + # TEST: Non-graviton + actual_value = cap._get_master_node_plan(5, 2, cap.T3_SMALL_SEARCH.type) + expected_value = cap.MasterNodesPlan(3, "m5.large.search") + assert expected_value == actual_value + + # TEST: Small data + actual_value = cap._get_master_node_plan(5, 2, "blah") + expected_value = cap.MasterNodesPlan(3, "m6g.large.search") + assert expected_value == actual_value + + # TEST: Small data w/ lots of data nodes + actual_value = cap._get_master_node_plan(5, 11, "blah") + expected_value = cap.MasterNodesPlan(3, "c6g.2xlarge.search") + assert expected_value == actual_value + + # TEST: Small data w/ lots and lots of data nodes + actual_value = cap._get_master_node_plan(5, 31, "blah") + expected_value = cap.MasterNodesPlan(3, "r6g.2xlarge.search") + assert expected_value == actual_value + + # TEST: Medium data + actual_value = cap._get_master_node_plan(401000, 20, "blah") + expected_value = cap.MasterNodesPlan(3, "c6g.2xlarge.search") + assert expected_value == actual_value + + # TEST: Medium data w/ lots of data nodes + actual_value = cap._get_master_node_plan(401000, 31, "blah") + expected_value = cap.MasterNodesPlan(3, "r6g.2xlarge.search") + assert expected_value == actual_value + + # TEST: Large data + actual_value = cap._get_master_node_plan(1250000, 80, "blah") + expected_value = cap.MasterNodesPlan(3, "r6g.2xlarge.search") + assert expected_value == actual_value + + # TEST: Large data w/ lots of data nodes + actual_value = cap._get_master_node_plan(1250000, 130, "blah") + expected_value = cap.MasterNodesPlan(3, "r6g.4xlarge.search") + assert expected_value == actual_value + + # TEST: Enormous data w/ lots of data nodes + actual_value = cap._get_master_node_plan(3010000, 120, "blah") + expected_value = cap.MasterNodesPlan(3, "r6g.4xlarge.search") + assert expected_value == actual_value + +def test_WHEN_get_os_domain_plan_called_THEN_as_expected(): + actual_value = cap.get_os_domain_plan(20, 30, 1, 2) + expected_value = cap.OSDomainPlan( + cap.DataNodesPlan(64, cap.R6G_4XLARGE_SEARCH.type, cap.R6G_4XLARGE_SEARCH.vol_size), + cap.MasterNodesPlan(3, "r6g.2xlarge.search") + ) + assert expected_value == actual_value + + \ No newline at end of file From 0a150453658357cf95457b5ba1e5e68b869de59c Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Thu, 1 Jun 2023 17:49:17 -0500 Subject: [PATCH 2/2] Made capacity options individually changable Signed-off-by: Chris Helma --- README.md | 2 +- cdk-lib/capture-stacks/capture-nodes-stack.ts | 5 +- cdk-lib/cloud-demo.ts | 3 +- cdk-lib/core/command-params.ts | 3 + cdk-lib/core/context-wrangling.ts | 3 +- cdk-lib/core/ssm-wrangling.ts | 2 + cdk-lib/core/user-config.ts | 8 ++ manage_arkime.py | 2 +- manage_arkime/cdk_interactions/cdk_context.py | 11 +- manage_arkime/commands/create_cluster.py | 51 ++++--- manage_arkime/core/capacity_planning.py | 15 +- manage_arkime/core/user_config.py | 23 ++++ .../commands/test_create_cluster.py | 129 ++++++++++-------- .../commands/test_destroy_cluster.py | 3 + 14 files changed, 158 insertions(+), 102 deletions(-) create mode 100644 cdk-lib/core/user-config.ts create mode 100644 manage_arkime/core/user_config.py diff --git a/README.md b/README.md index 28e177e..59c2497 100644 --- a/README.md +++ b/README.md @@ -109,7 +109,7 @@ You can see your created cluster and the VPCs it is currently monitoring using t By default, you will be given the minimum-size Capture Cluster. 
You can provision a Cluster that will serve your expected usage using a set of optional command-line parameters, which will ensure the EC2 Capture Nodes and OpenSearch Domain are suitably provisioned (plus a little extra for safety): ``` -./manage_arkime.py create-cluster --name MyCluster --expected-traffic 1 --spi-days 30 --replicas 2 +./manage_arkime.py create-cluster --name MyCluster --expected-traffic 1 --spi-days 30 --replicas 1 ``` ### Setting up capture for a VPC diff --git a/cdk-lib/capture-stacks/capture-nodes-stack.ts b/cdk-lib/capture-stacks/capture-nodes-stack.ts index 0b8d504..dc05e0a 100644 --- a/cdk-lib/capture-stacks/capture-nodes-stack.ts +++ b/cdk-lib/capture-stacks/capture-nodes-stack.ts @@ -18,6 +18,7 @@ import { Construct } from 'constructs'; import * as constants from '../core/constants'; import * as plan from '../core/capacity-plan'; import {ClusterSsmValue} from '../core/ssm-wrangling'; +import * as user from '../core/user-config'; export interface CaptureNodesStackProps extends cdk.StackProps { readonly captureBucket: s3.Bucket; @@ -28,6 +29,7 @@ export interface CaptureNodesStackProps extends cdk.StackProps { readonly osPassword: secretsmanager.Secret; readonly planCluster: plan.ClusterPlan; readonly ssmParamNameCluster: string; + readonly userConfig: user.UserConfig; } export class CaptureNodesStack extends cdk.Stack { @@ -241,7 +243,8 @@ export class CaptureNodesStack extends cdk.Stack { busName: clusterBus.eventBusName, clusterName: props.clusterName, vpceServiceId: gwlbEndpointService.ref, - capacityPlan: props.planCluster + capacityPlan: props.planCluster, + userConfig: props.userConfig } const clusterParam = new ssm.StringParameter(this, 'ClusterParam', { allowedPattern: '.*', diff --git a/cdk-lib/cloud-demo.ts b/cdk-lib/cloud-demo.ts index 27c7116..4a34fab 100644 --- a/cdk-lib/cloud-demo.ts +++ b/cdk-lib/cloud-demo.ts @@ -50,7 +50,8 @@ switch(params.type) { osDomain: osDomainStack.domain, osPassword: osDomainStack.osPassword, planCluster: params.planCluster, - ssmParamNameCluster: params.nameClusterSsmParam + ssmParamNameCluster: params.nameClusterSsmParam, + userConfig: params.userConfig }); captureNodesStack.addDependency(captureBucketStack) captureNodesStack.addDependency(captureVpcStack) diff --git a/cdk-lib/core/command-params.ts b/cdk-lib/core/command-params.ts index 6638227..c5eac83 100644 --- a/cdk-lib/core/command-params.ts +++ b/cdk-lib/core/command-params.ts @@ -1,4 +1,5 @@ import * as plan from './capacity-plan'; +import * as user from './user-config'; /** * Base type for receiving arguments from the Python side of the app. 
These directly match the interface on the Python @@ -26,6 +27,7 @@ export interface ClusterMgmtParamsRaw extends CommandParamsRaw { nameViewerUserSsmParam: string; nameViewerNodesStack: string; planCluster: string; + userConfig: string; } /** @@ -89,6 +91,7 @@ export interface ClusterMgmtParams extends CommandParams { nameViewerUserSsmParam: string; nameViewerNodesStack: string; planCluster: plan.ClusterPlan; + userConfig: user.UserConfig; } /** diff --git a/cdk-lib/core/context-wrangling.ts b/cdk-lib/core/context-wrangling.ts index ea60068..506455d 100644 --- a/cdk-lib/core/context-wrangling.ts +++ b/cdk-lib/core/context-wrangling.ts @@ -15,7 +15,7 @@ import {CDK_CONTEXT_CMD_VAR, CDK_CONTEXT_REGION_VAR, CDK_CONTEXT_PARAMS_VAR, Man export function getCommandParams(app: cdk.App) : (prms.ClusterMgmtParams | prms.DeployDemoTrafficParams | prms.DestroyDemoTrafficParams | prms.MirrorMgmtParams) { // This ENV variable is set by the CDK CLI. It reads it from your AWS Credential profile, and configures the var // before invoking CDK actions. - const awsAccount: string | undefined = process.env.CDK_DEFAULT_ACCOUNT + const awsAccount: string | undefined = process.env.CDK_DEFAULT_ACCOUNT // Like the CDK_DEFAULT_ACCOUNT, the CDK CLI sets the CDK_DEFAULT_REGION by reading the AWS Credential profile. // However, we want the user to to able to specify a different region than the default so we optionaly pass in one @@ -100,6 +100,7 @@ function validateArgs(args: ValidateArgs) : (prms.ClusterMgmtParams | prms.Deplo nameViewerUserSsmParam: rawClusterMgmtParamsObj.nameViewerUserSsmParam, nameViewerNodesStack: rawClusterMgmtParamsObj.nameViewerNodesStack, planCluster: JSON.parse(rawClusterMgmtParamsObj.planCluster), + userConfig: JSON.parse(rawClusterMgmtParamsObj.userConfig), } return clusterMgmtParams; case ManagementCmd.AddVpc: // Add and Remove VPC use the same parameters diff --git a/cdk-lib/core/ssm-wrangling.ts b/cdk-lib/core/ssm-wrangling.ts index 7ae9019..5ad0603 100644 --- a/cdk-lib/core/ssm-wrangling.ts +++ b/cdk-lib/core/ssm-wrangling.ts @@ -1,4 +1,5 @@ import * as plan from '../core/capacity-plan'; +import * as user from '../core/user-config'; /** * This file contains functions and types that define a shared interface with the Python management CLI; the two need @@ -12,6 +13,7 @@ export interface ClusterSsmValue { readonly clusterName: string; readonly vpceServiceId: string; readonly capacityPlan: plan.ClusterPlan; + readonly userConfig: user.UserConfig; } export interface SubnetSsmValue { diff --git a/cdk-lib/core/user-config.ts b/cdk-lib/core/user-config.ts new file mode 100644 index 0000000..8a8df63 --- /dev/null +++ b/cdk-lib/core/user-config.ts @@ -0,0 +1,8 @@ +/** + * Structure to hold the user's input configuration + */ +export interface UserConfig { + expectedTraffic: number; + spiDays: number; + replicas: number; +} \ No newline at end of file diff --git a/manage_arkime.py b/manage_arkime.py index e0900b7..fb56b17 100755 --- a/manage_arkime.py +++ b/manage_arkime.py @@ -57,7 +57,7 @@ def destroy_demo_traffic(ctx): @click.option("--name", help="The name you want your Arkime Cluster and its associated resources to have", required=True) @click.option( "--expected-traffic", - help=("The amount of traffic, in gigabits-per-second, you expect your Arkime Cluster to receive." + help=("The average amount of traffic, in gigabits-per-second, you expect your Arkime Cluster to receive." 
+ f"Maximum: {MAX_TRAFFIC} Gbps"), default=None, type=click.FLOAT, diff --git a/manage_arkime/cdk_interactions/cdk_context.py b/manage_arkime/cdk_interactions/cdk_context.py index 4e5e567..ebe0703 100644 --- a/manage_arkime/cdk_interactions/cdk_context.py +++ b/manage_arkime/cdk_interactions/cdk_context.py @@ -5,9 +5,10 @@ import constants as constants from core.capacity_planning import (CaptureNodesPlan, CaptureVpcPlan, ClusterPlan, DataNodesPlan, EcsSysResourcePlan, MasterNodesPlan, OSDomainPlan, INSTANCE_TYPE_CAPTURE_NODE, DEFAULT_NUM_AZS) +from core.user_config import UserConfig -def generate_create_cluster_context(name: str, viewer_cert_arn: str, cluster_plan: ClusterPlan) -> Dict[str, str]: - create_context = _generate_cluster_context(name, viewer_cert_arn, cluster_plan) +def generate_create_cluster_context(name: str, viewer_cert_arn: str, cluster_plan: ClusterPlan, user_config: UserConfig) -> Dict[str, str]: + create_context = _generate_cluster_context(name, viewer_cert_arn, cluster_plan, user_config) create_context[constants.CDK_CONTEXT_CMD_VAR] = constants.CMD_CREATE_CLUSTER return create_context @@ -22,12 +23,13 @@ def generate_destroy_cluster_context(name: str) -> Dict[str, str]: EcsSysResourcePlan(1, 1), OSDomainPlan(DataNodesPlan(2, "t3.small.search", 100), MasterNodesPlan(3, "m6g.large.search")) ) + fake_user_config = UserConfig(1, 1, 1) - destroy_context = _generate_cluster_context(name, fake_arn, fake_cluster_plan) + destroy_context = _generate_cluster_context(name, fake_arn, fake_cluster_plan, fake_user_config) destroy_context[constants.CDK_CONTEXT_CMD_VAR] = constants.CMD_DESTROY_CLUSTER return destroy_context -def _generate_cluster_context(name: str, viewer_cert_arn: str, cluster_plan: ClusterPlan) -> Dict[str, str]: +def _generate_cluster_context(name: str, viewer_cert_arn: str, cluster_plan: ClusterPlan, user_config: UserConfig) -> Dict[str, str]: cmd_params = { "nameCluster": name, "nameCaptureBucketStack": constants.get_capture_bucket_stack_name(name), @@ -43,6 +45,7 @@ def _generate_cluster_context(name: str, viewer_cert_arn: str, cluster_plan: Clu "nameViewerUserSsmParam": constants.get_viewer_user_ssm_param_name(name), "nameViewerNodesStack": constants.get_viewer_nodes_stack_name(name), "planCluster": json.dumps(cluster_plan.to_dict()), + "userConfig": json.dumps(user_config.to_dict()), } return { diff --git a/manage_arkime/commands/create_cluster.py b/manage_arkime/commands/create_cluster.py index 139ece8..da56a04 100644 --- a/manage_arkime/commands/create_cluster.py +++ b/manage_arkime/commands/create_cluster.py @@ -1,6 +1,5 @@ import json import logging -from typing import Tuple from aws_interactions.acm_interactions import upload_default_elb_cert from aws_interactions.aws_client_provider import AwsClientProvider @@ -9,21 +8,18 @@ import cdk_interactions.cdk_context as context import constants as constants from core.capacity_planning import (get_capture_node_capacity_plan, get_ecs_sys_resource_plan, get_os_domain_plan, ClusterPlan, - CaptureNodesPlan, CaptureVpcPlan, EcsSysResourcePlan, OSDomainPlan, MINIMUM_TRAFFIC, - DEFAULT_SPI_DAYS, DEFAULT_SPI_REPLICAS, DEFAULT_NUM_AZS) + CaptureVpcPlan, MINIMUM_TRAFFIC, DEFAULT_SPI_DAYS, DEFAULT_SPI_REPLICAS, DEFAULT_NUM_AZS) +from core.user_config import UserConfig logger = logging.getLogger(__name__) -class MustProvideAllParams(Exception): - def __init__(self): - super().__init__("If you specify one of the optional capacity parameters, you must specify all of them.") - def cmd_create_cluster(profile: str, region: str, 
diff --git a/manage_arkime/commands/create_cluster.py b/manage_arkime/commands/create_cluster.py
index 139ece8..da56a04 100644
--- a/manage_arkime/commands/create_cluster.py
+++ b/manage_arkime/commands/create_cluster.py
@@ -1,6 +1,5 @@
 import json
 import logging
-from typing import Tuple
 
 from aws_interactions.acm_interactions import upload_default_elb_cert
 from aws_interactions.aws_client_provider import AwsClientProvider
@@ -9,21 +8,18 @@ import cdk_interactions.cdk_context as context
 import constants as constants
 from core.capacity_planning import (get_capture_node_capacity_plan, get_ecs_sys_resource_plan, get_os_domain_plan, ClusterPlan,
-                                    CaptureNodesPlan, CaptureVpcPlan, EcsSysResourcePlan, OSDomainPlan, MINIMUM_TRAFFIC,
-                                    DEFAULT_SPI_DAYS, DEFAULT_SPI_REPLICAS, DEFAULT_NUM_AZS)
+                                    CaptureVpcPlan, MINIMUM_TRAFFIC, DEFAULT_SPI_DAYS, DEFAULT_SPI_REPLICAS, DEFAULT_NUM_AZS)
+from core.user_config import UserConfig
 
 logger = logging.getLogger(__name__)
 
-class MustProvideAllParams(Exception):
-    def __init__(self):
-        super().__init__("If you specify one of the optional capacity parameters, you must specify all of them.")
-
 def cmd_create_cluster(profile: str, region: str,
                        name: str, expected_traffic: float, spi_days: int, replicas: int):
     logger.debug(f"Invoking create-cluster with profile '{profile}' and region '{region}'")
 
     aws_provider = AwsClientProvider(aws_profile=profile, aws_region=region)
 
-    capacity_plan = _get_capacity_plan(name, expected_traffic, spi_days, replicas, aws_provider)
+    user_config = _get_user_config(name, expected_traffic, spi_days, replicas, aws_provider)
+    capacity_plan = _get_capacity_plan(user_config)
 
     cert_arn = _set_up_viewer_cert(name, aws_provider)
 
@@ -35,38 +31,41 @@ def cmd_create_cluster(profile: str, region: str, name: str, expected_traffic: f
         constants.get_opensearch_domain_stack_name(name),
         constants.get_viewer_nodes_stack_name(name)
     ]
-    create_context = context.generate_create_cluster_context(name, cert_arn, capacity_plan)
+    create_context = context.generate_create_cluster_context(name, cert_arn, capacity_plan, user_config)
     cdk_client.deploy(stacks_to_deploy, aws_profile=profile, aws_region=region, context=create_context)
 
-def _get_capacity_plan(cluster_name: str, expected_traffic: float, spi_days: int, replicas: int, aws_provider: AwsClientProvider) -> ClusterPlan:
-
-    # None of the parameters defined
-    if (not expected_traffic) and (not spi_days) and (not replicas):
+def _get_user_config(cluster_name: str, expected_traffic: float, spi_days: int, replicas: int, aws_provider: AwsClientProvider) -> UserConfig:
+    # At least one parameter isn't defined
+    if None in [expected_traffic, spi_days, replicas]:
         # Re-use the existing configuration if it exists
         try:
-            plan_json = ssm_ops.get_ssm_param_json_value(
+            stored_config_json = ssm_ops.get_ssm_param_json_value(
                 constants.get_cluster_ssm_param_name(cluster_name),
-                "capacityPlan",
+                "userConfig",
                 aws_provider
             )
-            capacity_plan = ClusterPlan.from_dict(plan_json)
+            user_config = UserConfig(**stored_config_json)
+
+            if expected_traffic is not None:
+                user_config.expectedTraffic = expected_traffic
+            if spi_days is not None:
+                user_config.spiDays = spi_days
+            if replicas is not None:
+                user_config.replicas = replicas
 
-            return capacity_plan
+            return user_config
        # No existing configuration; fall back to the defaults
         except ssm_ops.ParamDoesNotExist:
-            capture_plan = get_capture_node_capacity_plan(MINIMUM_TRAFFIC)
-            capture_vpc_plan = CaptureVpcPlan(DEFAULT_NUM_AZS)
-            os_domain_plan = get_os_domain_plan(MINIMUM_TRAFFIC, DEFAULT_SPI_DAYS, DEFAULT_SPI_REPLICAS, capture_vpc_plan.numAzs)
+            return UserConfig(MINIMUM_TRAFFIC, DEFAULT_SPI_DAYS, DEFAULT_SPI_REPLICAS)
     # All of the parameters defined
-    elif expected_traffic and spi_days and replicas:
-        capture_plan = get_capture_node_capacity_plan(expected_traffic)
-        capture_vpc_plan = CaptureVpcPlan(DEFAULT_NUM_AZS)
-        os_domain_plan = get_os_domain_plan(expected_traffic, spi_days, replicas, capture_vpc_plan.numAzs)
-    # Some, but not all, of the parameters defined
     else:
-        raise MustProvideAllParams()
+        return UserConfig(expected_traffic, spi_days, replicas)
 
+def _get_capacity_plan(user_config: UserConfig) -> ClusterPlan:
+    capture_plan = get_capture_node_capacity_plan(user_config.expectedTraffic)
+    capture_vpc_plan = CaptureVpcPlan(DEFAULT_NUM_AZS)
+    os_domain_plan = get_os_domain_plan(user_config.expectedTraffic, user_config.spiDays, user_config.replicas, capture_vpc_plan.numAzs)
     ecs_resource_plan = get_ecs_sys_resource_plan(capture_plan.instanceType)
 
     return ClusterPlan(capture_plan, capture_vpc_plan, ecs_resource_plan, os_domain_plan)
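With MustProvideAllParams gone, any subset of the three flags may now be supplied: _get_user_config pulls the stored userConfig from SSM and overwrites only the fields the user actually passed. A small sketch of those override rules (the apply_overrides helper is hypothetical, and the import assumes the manage_arkime package is on the path):

from typing import Optional

from core.user_config import UserConfig

def apply_overrides(stored: UserConfig, expected_traffic: Optional[float],
                    spi_days: Optional[int], replicas: Optional[int]) -> UserConfig:
    # None means "flag not passed on the command line"; keep the stored value.
    return UserConfig(
        stored.expectedTraffic if expected_traffic is None else expected_traffic,
        stored.spiDays if spi_days is None else spi_days,
        stored.replicas if replicas is None else replicas,
    )

# Passing only --spi-days 30 against a cluster stored as UserConfig(1.2, 40, 2):
assert apply_overrides(UserConfig(1.2, 40, 2), None, 30, None) == UserConfig(1.2, 30, 2)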
diff --git a/manage_arkime/core/capacity_planning.py b/manage_arkime/core/capacity_planning.py
index f25574a..ae3f535 100644
--- a/manage_arkime/core/capacity_planning.py
+++ b/manage_arkime/core/capacity_planning.py
@@ -1,5 +1,4 @@
 from dataclasses import dataclass
-from enum import Enum
 import math
 import logging
-from typing import Dict, Type, TypeVar
+from typing import Any, Dict, Type, TypeVar
@@ -36,7 +35,7 @@ def __equal__(self, other):
         return (self.instanceType == other.instance_type and self.desiredCount == other.desired_count
                     and self.maxCount == other.max_count and self.minCount == other.min_count)
 
-    def to_dict(self) -> Dict[str, str]:
+    def to_dict(self) -> Dict[str, Any]:
         return {
             "instanceType": self.instanceType,
             "desiredCount": self.desiredCount,
@@ -77,7 +76,7 @@ class EcsSysResourcePlan:
     def __equal__(self, other):
         return self.cpu == other.cpu and self.memory == other.memory
 
-    def to_dict(self) -> Dict[str, str]:
+    def to_dict(self) -> Dict[str, Any]:
         return {
             "cpu": self.cpu,
             "memory": self.memory
@@ -135,7 +134,7 @@ def __equal__(self, other):
         return (self.count == other.count and self.instanceType == other.type
                     and self.volumeSize == other.vol_size)
 
-    def to_dict(self) -> Dict[str, str]:
+    def to_dict(self) -> Dict[str, Any]:
         return {
             "count": self.count,
             "instanceType": self.instanceType,
@@ -150,7 +149,7 @@ class MasterNodesPlan:
     def __equal__(self, other):
         return (self.count == other.count
                     and self.instanceType == other.type)
 
-    def to_dict(self) -> Dict[str, str]:
+    def to_dict(self) -> Dict[str, Any]:
         return {
             "count": self.count,
             "instanceType": self.instanceType
@@ -167,7 +166,7 @@ def __equal__(self, other):
         return (self.dataNodes == other.dataNodes
                     and self.masterNodes == other.masterNodes)
 
-    def to_dict(self) -> Dict[str, str]:
+    def to_dict(self) -> Dict[str, Any]:
         return {
             "dataNodes": self.dataNodes.to_dict(),
             "masterNodes": self.masterNodes.to_dict()
@@ -298,7 +297,7 @@ class CaptureVpcPlan:
     def __equal__(self, other):
         return self.numAzs == other.numAzs
 
-    def to_dict(self) -> Dict[str, str]:
+    def to_dict(self) -> Dict[str, Any]:
         return {
             "numAzs": self.numAzs
         }
@@ -316,7 +315,7 @@ def __equal__(self, other):
         return (self.captureNodes == other.captureNodes and self.ecsResources == other.ecsResources
                     and self.osDomain == other.osDomain and self.captureVpc == other.vpc)
 
-    def to_dict(self) -> Dict[str, str]:
+    def to_dict(self) -> Dict[str, Any]:
         return {
             "captureNodes": self.captureNodes.to_dict(),
             "captureVpc": self.captureVpc.to_dict(),
diff --git a/manage_arkime/core/user_config.py b/manage_arkime/core/user_config.py
new file mode 100644
index 0000000..5c6210a
--- /dev/null
+++ b/manage_arkime/core/user_config.py
@@ -0,0 +1,23 @@
+from dataclasses import dataclass
+import logging
+from typing import Any, Dict
+
+logger = logging.getLogger(__name__)
+
+@dataclass
+class UserConfig:
+    expectedTraffic: float
+    spiDays: int
+    replicas: int
+
+    def __eq__(self, other):
+        return (self.expectedTraffic == other.expectedTraffic and self.spiDays == other.spiDays
+                and self.replicas == other.replicas)
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            'expectedTraffic': self.expectedTraffic,
+            'spiDays': self.spiDays,
+            'replicas': self.replicas
+        }
+
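Because the cluster's SSM parameter now stores the raw user input alongside the derived plan, the dataclass can be re-hydrated without consulting the capacity planner at all. A sketch of the stored shape (mirroring ClusterSsmValue in cdk-lib/core/ssm-wrangling.ts; unrelated fields are elided and the ids are placeholders):

from core.user_config import UserConfig

# Abridged sketch of the JSON stored at the cluster's SSM parameter.
cluster_ssm_value = {
    "clusterName": "MyCluster",
    "vpceServiceId": "vpce-svc-0123456789abcdef0",  # placeholder id
    "capacityPlan": {"captureNodes": {"instanceType": "m5.xlarge"}},  # abridged
    "userConfig": {"expectedTraffic": 1, "spiDays": 30, "replicas": 2},
}

# UserConfig(**...) is how _get_user_config re-hydrates the stored value.
user_config = UserConfig(**cluster_ssm_value["userConfig"])
assert user_config.spiDays == 30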
diff --git a/test_manage_arkime/commands/test_create_cluster.py b/test_manage_arkime/commands/test_create_cluster.py
index 6b280a9..c8b1386 100644
--- a/test_manage_arkime/commands/test_create_cluster.py
+++ b/test_manage_arkime/commands/test_create_cluster.py
@@ -4,22 +4,27 @@ import unittest.mock as mock
 
 import aws_interactions.ssm_operations as ssm_ops
-from commands.create_cluster import cmd_create_cluster, _set_up_viewer_cert, _get_capacity_plan, MustProvideAllParams
+from commands.create_cluster import cmd_create_cluster, _set_up_viewer_cert, _get_capacity_plan, _get_user_config
 import constants as constants
 from core.capacity_planning import (CaptureNodesPlan, EcsSysResourcePlan, MINIMUM_TRAFFIC, OSDomainPlan, DataNodesPlan,
                                     MasterNodesPlan, CaptureVpcPlan, ClusterPlan, DEFAULT_SPI_DAYS, DEFAULT_SPI_REPLICAS,
                                     DEFAULT_NUM_AZS)
+from core.user_config import UserConfig
 
 @mock.patch("commands.create_cluster.AwsClientProvider", mock.Mock())
+@mock.patch("commands.create_cluster._get_user_config")
 @mock.patch("commands.create_cluster._get_capacity_plan")
 @mock.patch("commands.create_cluster._set_up_viewer_cert")
 @mock.patch("commands.create_cluster.CdkClient")
-def test_WHEN_cmd_create_cluster_called_THEN_cdk_command_correct(mock_cdk_client_cls, mock_set_up, mock_get_plans):
+def test_WHEN_cmd_create_cluster_called_THEN_cdk_command_correct(mock_cdk_client_cls, mock_set_up, mock_get_plans, mock_get_config):
     # Set up our mock
     mock_set_up.return_value = "arn"
     mock_client = mock.Mock()
     mock_cdk_client_cls.return_value = mock_client
 
+    user_config = UserConfig(1, 30, 2)
+    mock_get_config.return_value = user_config
+
     cluster_plan = ClusterPlan(
         CaptureNodesPlan("m5.xlarge", 20, 25, 1),
         CaptureVpcPlan(DEFAULT_NUM_AZS),
@@ -60,6 +65,7 @@ def test_WHEN_cmd_create_cluster_called_THEN_cdk_command_correct(mock_cdk_client
             "nameViewerUserSsmParam": constants.get_viewer_user_ssm_param_name("my-cluster"),
             "nameViewerNodesStack": constants.get_viewer_nodes_stack_name("my-cluster"),
             "planCluster": json.dumps(cluster_plan.to_dict()),
+            "userConfig": json.dumps(user_config.to_dict()),
         }))
     }
 )
@@ -72,98 +78,104 @@ def test_WHEN_cmd_create_cluster_called_THEN_cdk_command_correct(mock_cdk_client
     assert expected_set_up_calls == mock_set_up.call_args_list
 
 @mock.patch("commands.create_cluster.ssm_ops")
-def test_WHEN_get_capacity_plan_called_AND_use_existing_THEN_as_expected(mock_ssm_ops):
+def test_WHEN_get_user_config_called_AND_use_existing_THEN_as_expected(mock_ssm_ops):
     # Set up our mock
     mock_ssm_ops.ParamDoesNotExist = ssm_ops.ParamDoesNotExist
     mock_ssm_ops.get_ssm_param_json_value.return_value = {
-        "captureNodes": {
-            "instanceType":"m5.xlarge","desiredCount":10,"maxCount":12,"minCount":1
-        },
-        "captureVpc": {
-            "numAzs": 3
-        },
-        "ecsResources": {
-            "cpu": 3584, "memory": 15360
-        },
-        "osDomain": {
-            "dataNodes": {
-                "count": 6, "instanceType": "t3.small.search", "volumeSize": 100
-            },
-            "masterNodes": {
-                "count": 3, "instanceType": "c6g.2xlarge.search",
-            }
-        }
+        "expectedTraffic": 1.2,
+        "spiDays": 40,
+        "replicas": 2
     }
 
     mock_provider = mock.Mock()
 
     # Run our test
-    actual_value = _get_capacity_plan("my-cluster", None, None, None, mock_provider)
+    actual_value = _get_user_config("my-cluster", None, None, None, mock_provider)
 
     # Check our results
-    assert CaptureNodesPlan("m5.xlarge", 10, 12, 1) == actual_value.captureNodes
-    assert CaptureVpcPlan(3) == actual_value.captureVpc
-    assert EcsSysResourcePlan(3584, 15360) == actual_value.ecsResources
-    assert OSDomainPlan(DataNodesPlan(6, "t3.small.search", 100), MasterNodesPlan(3, "c6g.2xlarge.search")) == actual_value.osDomain
+    assert UserConfig(1.2, 40, 2) == actual_value
 
     expected_get_ssm_calls = [
-        mock.call(constants.get_cluster_ssm_param_name("my-cluster"), "capacityPlan", mock.ANY)
+        mock.call(constants.get_cluster_ssm_param_name("my-cluster"), "userConfig", mock.ANY)
     ]
     assert expected_get_ssm_calls == mock_ssm_ops.get_ssm_param_json_value.call_args_list
-@mock.patch("commands.create_cluster.get_os_domain_plan") -@mock.patch("commands.create_cluster.get_capture_node_capacity_plan") @mock.patch("commands.create_cluster.ssm_ops") -def test_WHEN_get_capacity_plan_called_AND_use_default_THEN_as_expected(mock_ssm_ops, mock_get_cap, mock_get_os): +def test_WHEN_get_user_config_called_AND_partial_update_THEN_as_expected(mock_ssm_ops): # Set up our mock mock_ssm_ops.ParamDoesNotExist = ssm_ops.ParamDoesNotExist - mock_ssm_ops.get_ssm_param_json_value.side_effect = ssm_ops.ParamDoesNotExist("") - mock_get_cap.return_value = CaptureNodesPlan("m5.xlarge", 1, 2, 1) - mock_get_os.return_value = OSDomainPlan(DataNodesPlan(2, "t3.small.search", 100), MasterNodesPlan(3, "m6g.large.search")) + mock_ssm_ops.get_ssm_param_json_value.return_value = { + "expectedTraffic": 1.2, + "spiDays": 40, + "replicas": 2 + } mock_provider = mock.Mock() # Run our test - actual_value = _get_capacity_plan("my-cluster", None, None, None, mock_provider) + actual_value = _get_user_config("my-cluster", None, 30, None, mock_provider) # Check our results - assert mock_get_cap.return_value == actual_value.captureNodes - assert CaptureVpcPlan(DEFAULT_NUM_AZS) == actual_value.captureVpc - assert mock_get_os.return_value == actual_value.osDomain - assert EcsSysResourcePlan(3584, 15360) == actual_value.ecsResources + assert UserConfig(1.2, 30, 2) == actual_value - expected_get_cap_calls = [ - mock.call(MINIMUM_TRAFFIC) + expected_get_ssm_calls = [ + mock.call(constants.get_cluster_ssm_param_name("my-cluster"), "userConfig", mock.ANY) ] - assert expected_get_cap_calls == mock_get_cap.call_args_list + assert expected_get_ssm_calls == mock_ssm_ops.get_ssm_param_json_value.call_args_list - expected_get_os_calls = [ - mock.call(MINIMUM_TRAFFIC, DEFAULT_SPI_DAYS, DEFAULT_SPI_REPLICAS, DEFAULT_NUM_AZS) - ] - assert expected_get_os_calls == mock_get_os.call_args_list +@mock.patch("commands.create_cluster.ssm_ops") +def test_WHEN_get_user_config_called_AND_use_default_THEN_as_expected(mock_ssm_ops): + # Set up our mock + mock_ssm_ops.ParamDoesNotExist = ssm_ops.ParamDoesNotExist + mock_ssm_ops.get_ssm_param_json_value.side_effect = ssm_ops.ParamDoesNotExist("") + + mock_provider = mock.Mock() + + # Run our test + actual_value = _get_user_config("my-cluster", None, None, None, mock_provider) + + # Check our results + assert UserConfig(MINIMUM_TRAFFIC, DEFAULT_SPI_DAYS, DEFAULT_SPI_REPLICAS) == actual_value expected_get_ssm_calls = [ - mock.call(constants.get_cluster_ssm_param_name("my-cluster"), "capacityPlan", mock.ANY) + mock.call(constants.get_cluster_ssm_param_name("my-cluster"), "userConfig", mock.ANY) ] assert expected_get_ssm_calls == mock_ssm_ops.get_ssm_param_json_value.call_args_list +@mock.patch("commands.create_cluster.ssm_ops") +def test_WHEN_get_user_config_called_AND_specify_all_THEN_as_expected(mock_ssm_ops): + # Set up our mock + mock_ssm_ops.ParamDoesNotExist = ssm_ops.ParamDoesNotExist + + mock_provider = mock.Mock() + + # Run our test + actual_value = _get_user_config("my-cluster", 10, 40, 2, mock_provider) + + # Check our results + assert UserConfig(10, 40, 2) == actual_value + + expected_get_ssm_calls = [] + assert expected_get_ssm_calls == mock_ssm_ops.get_ssm_param_json_value.call_args_list + + @mock.patch("commands.create_cluster.get_os_domain_plan") @mock.patch("commands.create_cluster.get_capture_node_capacity_plan") @mock.patch("commands.create_cluster.ssm_ops") -def test_WHEN_get_capacity_plan_called_AND_gen_plan_THEN_as_expected(mock_ssm_ops, mock_get_cap, 
mock_get_os): +def test_WHEN_get_capacity_plan_called_THEN_as_expected(mock_ssm_ops, mock_get_cap, mock_get_os): # Set up our mock mock_ssm_ops.ParamDoesNotExist = ssm_ops.ParamDoesNotExist mock_ssm_ops.get_ssm_param_json_value.side_effect = ssm_ops.ParamDoesNotExist("") - mock_get_cap.return_value = CaptureNodesPlan("m5.xlarge", 10, 12, 1) - mock_get_os.return_value = OSDomainPlan(DataNodesPlan(20, "r6g.large.search", 100), MasterNodesPlan(3, "m6g.large.search")) + mock_get_cap.return_value = CaptureNodesPlan("m5.xlarge", 1, 2, 1) + mock_get_os.return_value = OSDomainPlan(DataNodesPlan(2, "t3.small.search", 100), MasterNodesPlan(3, "m6g.large.search")) mock_provider = mock.Mock() # Run our test - actual_value = _get_capacity_plan("my-cluster", 10, 40, 2, mock_provider) + actual_value = _get_capacity_plan(UserConfig(1, 40, 2)) # Check our results assert mock_get_cap.return_value == actual_value.captureNodes @@ -172,25 +184,24 @@ def test_WHEN_get_capacity_plan_called_AND_gen_plan_THEN_as_expected(mock_ssm_op assert EcsSysResourcePlan(3584, 15360) == actual_value.ecsResources expected_get_cap_calls = [ - mock.call(10) + mock.call(1) ] assert expected_get_cap_calls == mock_get_cap.call_args_list expected_get_os_calls = [ - mock.call(10, 40, 2, DEFAULT_NUM_AZS) + mock.call(1, 40, 2, DEFAULT_NUM_AZS) ] assert expected_get_os_calls == mock_get_os.call_args_list - expected_get_ssm_calls = [] - assert expected_get_ssm_calls == mock_ssm_ops.get_ssm_param_json_value.call_args_list -def test_WHEN_get_capacity_plan_called_AND_not_all_params_THEN_as_expected(): - # Set up our mock - mock_provider = mock.Mock() - # Run our test - with pytest.raises(MustProvideAllParams): - _get_capacity_plan("my-cluster", 10, None, None, mock_provider) + + + + + + + @mock.patch("commands.create_cluster.upload_default_elb_cert") @mock.patch("commands.create_cluster.ssm_ops") diff --git a/test_manage_arkime/commands/test_destroy_cluster.py b/test_manage_arkime/commands/test_destroy_cluster.py index 20e126a..95edfd5 100644 --- a/test_manage_arkime/commands/test_destroy_cluster.py +++ b/test_manage_arkime/commands/test_destroy_cluster.py @@ -7,6 +7,7 @@ import constants as constants from core.capacity_planning import (CaptureNodesPlan, EcsSysResourcePlan, OSDomainPlan, DataNodesPlan, MasterNodesPlan, ClusterPlan, CaptureVpcPlan) +from core.user_config import UserConfig TEST_CLUSTER = "my-cluster" @@ -61,6 +62,7 @@ def test_WHEN_cmd_destroy_cluster_called_AND_dont_destroy_everything_THEN_expect "nameViewerUserSsmParam": constants.get_viewer_user_ssm_param_name(TEST_CLUSTER), "nameViewerNodesStack": constants.get_viewer_nodes_stack_name(TEST_CLUSTER), "planCluster": json.dumps(cluster_plan.to_dict()), + "userConfig": json.dumps(UserConfig(1, 1, 1).to_dict()), })) } ) @@ -147,6 +149,7 @@ def test_WHEN_cmd_destroy_cluster_called_AND_destroy_everything_THEN_expected_cm "nameViewerUserSsmParam": constants.get_viewer_user_ssm_param_name(TEST_CLUSTER), "nameViewerNodesStack": constants.get_viewer_nodes_stack_name(TEST_CLUSTER), "planCluster": json.dumps(cluster_plan.to_dict()), + "userConfig": json.dumps(UserConfig(1, 1, 1).to_dict()), })) } )
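Taken together, the updated tests pin down the new two-step flow: resolve a UserConfig first, then derive the ClusterPlan from it. An end-to-end sketch using the helpers exercised above (the values are illustrative; the imports assume the manage_arkime package is on the path):

from core.capacity_planning import (CaptureVpcPlan, ClusterPlan, DEFAULT_NUM_AZS,
                                    get_capture_node_capacity_plan,
                                    get_ecs_sys_resource_plan, get_os_domain_plan)
from core.user_config import UserConfig

config = UserConfig(expectedTraffic=1, spiDays=30, replicas=2)

# Mirrors _get_capacity_plan: every sub-plan is derived from the user's config.
capture_plan = get_capture_node_capacity_plan(config.expectedTraffic)
vpc_plan = CaptureVpcPlan(DEFAULT_NUM_AZS)
os_domain_plan = get_os_domain_plan(config.expectedTraffic, config.spiDays,
                                    config.replicas, vpc_plan.numAzs)
plan = ClusterPlan(capture_plan, vpc_plan,
                   get_ecs_sys_resource_plan(capture_plan.instanceType), os_domain_plan)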