[FS-172266, FS-199069] : Conductor Monitor Changes for queue depth metrics #28

Open

wants to merge 33 commits into base: freshservice_staging

Commits (33)
d9243c6
[FS-172266] : Fixing conductor prom metrics
keerthivaasan-kanagaraj May 27, 2024
40e6404
test
prabhakaranE6157 Sep 23, 2024
c3aea1b
Merge branch 'freshservice_staging' into lt_test_load_gowtham
keerthivaasan-kanagaraj Sep 25, 2024
4057c91
Merge branch 'freshservice_staging' into nw_metrics
keerthivaasan-kanagaraj Sep 25, 2024
306751b
enable monitor
keerthivaasan-kanagaraj Sep 25, 2024
de5291e
fix
keerthivaasan-kanagaraj Sep 25, 2024
625e9c5
due to scylla time out commenting this code
keerthivaasan-kanagaraj Sep 25, 2024
862374a
fix
keerthivaasan-kanagaraj Sep 25, 2024
cd8e88e
fix logs
keerthivaasan-kanagaraj Sep 25, 2024
0e7cda2
fix
keerthivaasan-kanagaraj Sep 26, 2024
2274aea
fix
keerthivaasan-kanagaraj Sep 26, 2024
0b0e271
fix
keerthivaasan-kanagaraj Sep 26, 2024
611901e
for now
keerthivaasan-kanagaraj Sep 26, 2024
dbe8f97
Merge branch 'lt_test_load_gowtham' into nw_metrics
keerthivaasan-kanagaraj Sep 26, 2024
a364060
fix
keerthivaasan-kanagaraj Sep 26, 2024
cdd6515
bucket http metrics
keerthivaasan-kanagaraj Oct 1, 2024
3d9d5a1
Merge branch 'freshservice_staging' into lt_test_load_gowtham
prabhakaranE6157 Oct 3, 2024
b9bc589
t
prabhakaranE6157 Oct 4, 2024
c5aef6a
Merge branch 'freshservice_staging' into nw_metrics
prabhakaranE6157 Oct 16, 2024
d66f80b
Merge branch 'lt_test_load_gowtham' into nw_metrics
prabhakaranE6157 Oct 16, 2024
dac0c75
Fix workflow monitoring
keerthivaasan-kanagaraj Oct 28, 2024
a486979
Merge branch 'freshservice_staging' into nw_metrics
keerthivaasan-kanagaraj Oct 28, 2024
07476d5
[FS-199069] : Conductor Monitor Changes for queue depth metrics
keerthivaasan-kanagaraj Oct 29, 2024
656ed80
fix
keerthivaasan-kanagaraj Oct 29, 2024
4de5591
fix
keerthivaasan-kanagaraj Oct 29, 2024
7167a2c
Review comments
keerthivaasan-kanagaraj Oct 30, 2024
2126390
fix
keerthivaasan-kanagaraj Nov 4, 2024
796ef7d
fix
keerthivaasan-kanagaraj Nov 4, 2024
4d9dd6d
fix
keerthivaasan-kanagaraj Nov 4, 2024
686ca84
fix
keerthivaasan-kanagaraj Nov 5, 2024
cbbc556
fix
keerthivaasan-kanagaraj Nov 7, 2024
ebdeef1
fix log
keerthivaasan-kanagaraj Nov 7, 2024
5382b1e
fix
keerthivaasan-kanagaraj Nov 7, 2024
@@ -100,10 +100,13 @@ public void sweep(String workflowId) {
long workflowOffsetTimeout =
workflowOffsetWithJitter(properties.getWorkflowOffsetTimeout().getSeconds());
if (workflow != null) {
long startTime = Instant.now().toEpochMilli();
// long startTime = Instant.now().toEpochMilli();
unack(workflow, workflowOffsetTimeout);
long endTime = Instant.now().toEpochMilli();
Monitors.recordUnackTime(workflow.getWorkflowName(), endTime - startTime);
// long endTime = Instant.now().toEpochMilli();
// This metric (recordUnackTime) is tagged with workflowName, and our prod account has a large number of workflows,
// so it may cause cardinality problems in Haystack.
// Disabling this temporarily.
// Monitors.recordUnackTime(workflow.getWorkflowName(), endTime - startTime);
} else {
LOGGER.warn(
"Workflow with {} id can not be found. Attempting to unack using the id",
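
A note on the change above: the inline comment disables Monitors.recordUnackTime because the metric is tagged with the workflow name, and with many workflow definitions in the prod account that tag produces high-cardinality series in Haystack. A minimal sketch of one possible alternative, assuming it is acceptable to lose the per-workflow breakdown, would keep the timer under a single constant label (fragment of sweep(); "ALL" is an illustrative constant, not an identifier from this codebase):

    // Sketch only, not part of this PR: keep the unack timing but collapse the
    // workflow-name tag into one constant value so the series count stays bounded.
    long startTime = Instant.now().toEpochMilli();
    unack(workflow, workflowOffsetTimeout);
    long endTime = Instant.now().toEpochMilli();
    Monitors.recordUnackTime("ALL", endTime - startTime);
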
@@ -74,51 +74,61 @@ public WorkflowMonitor(
}

@Scheduled(
initialDelayString = "${conductor.workflow-monitor.stats.initial-delay:120000}",
fixedDelayString = "${conductor.workflow-monitor.stats.delay:60000}")
initialDelayString = "${conductor.workflow-monitor.stats.initial-delay}",
fixedDelayString = "${conductor.workflow-monitor.stats.delay}")
public void reportMetrics() {
try {
if (refreshCounter <= 0) {
workflowDefs = metadataService.getWorkflowDefs();
if (refreshCounter <= 0) {
// workflowDefs = metadataService.getWorkflowDefs();
taskDefs = new ArrayList<>(metadataService.getTaskDefs());
refreshCounter = metadataRefreshInterval;
}

getPendingWorkflowToOwnerAppMap(workflowDefs)
.forEach(
(workflowName, ownerApp) -> {
long count =
executionDAOFacade.getPendingWorkflowCount(workflowName);
Monitors.recordRunningWorkflows(count, workflowName, ownerApp);
});

taskDefs.forEach(
taskDef -> {
}

// Commented out as we don't have a use case for pending workflows as of now, and hence we don't have a pending-workflow implementation in the
// Scylla persistence layer.

Review comment (inline):

  • Remove this code if it is no longer needed
  • We can always get it back from version control

// try {
// getPendingWorkflowToOwnerAppMap(workflowDefs)
// .forEach(
// (workflowName, ownerApp) -> {
// long count =
// executionDAOFacade.getPendingWorkflowCount(workflowName);
// Monitors.recordRunningWorkflows(count, workflowName, ownerApp);
// });
// } catch (Exception e) {
// LOGGER.error("Error while publishing scheduled metrics", e);
// }

try {
taskDefs.forEach(
taskDef -> {
long size = queueDAO.getSize(taskDef.getName());
long inProgressCount =
executionDAOFacade.getInProgressTaskCount(taskDef.getName());
// long inProgressCount =
// executionDAOFacade.getInProgressTaskCount(taskDef.getName());
Monitors.recordQueueDepth(taskDef.getName(), size, taskDef.getOwnerApp());
if (taskDef.concurrencyLimit() > 0) {
Monitors.recordTaskInProgress(
taskDef.getName(), inProgressCount, taskDef.getOwnerApp());
}
});

asyncSystemTasks.forEach(
workflowSystemTask -> {
// if (taskDef.concurrencyLimit() > 0) {
// Monitors.recordTaskInProgress(
// taskDef.getName(), inProgressCount, taskDef.getOwnerApp());
// }
});
} catch (Exception e) {
LOGGER.error("Error while publishing scheduled metrics", e);
}

try {
asyncSystemTasks.forEach(
workflowSystemTask -> {
long size = queueDAO.getSize(workflowSystemTask.getTaskType());
long inProgressCount =
executionDAOFacade.getInProgressTaskCount(
workflowSystemTask.getTaskType());
// long inProgressCount =
// executionDAOFacade.getInProgressTaskCount(
// workflowSystemTask.getTaskType());
Monitors.recordQueueDepth(workflowSystemTask.getTaskType(), size, "system");
Monitors.recordTaskInProgress(
workflowSystemTask.getTaskType(), inProgressCount, "system");
});

refreshCounter--;
// Monitors.recordTaskInProgress(
// workflowSystemTask.getTaskType(), inProgressCount, "system");
});
} catch (Exception e) {
LOGGER.error("Error while publishing scheduled metrics", e);
LOGGER.error("Error while publishing scheduled metrics", e);
}
LOGGER.info("Workflow Scheduled Monitor Completed");
// refreshCounter--;
}

/**
29 changes: 25 additions & 4 deletions docker/server/bin/fsstartup.sh
@@ -16,21 +16,42 @@

echo "Starting Conductor server"

function print_log()
{
echo -e "$(date +'[%F %T %Z]') $*"
}

# Start the server
cd /app/libs
echo "Property file: $CONFIG_PROP"
echo $CONFIG_PROP
print_log "Property file: $CONFIG_PROP"
print_log $CONFIG_PROP
export config_file=

if [ -z "$CONFIG_PROP" ];
then
echo "Using an in-memory instance of conductor";
print_log "Using an in-memory instance of conductor";
export config_file=/app/config/config-local.properties
else
echo "Using '$CONFIG_PROP'";
print_log "Using '$CONFIG_PROP'";
export config_file=/app/config/$CONFIG_PROP
fi

[[ -z ${HOSTNAME} ]] && { print_log "Error: HOSTNAME environment variable not set"; exit 1; }

if [[ "$HOSTNAME" == *"conductor-monitor"* ]]; then
print_log "Generating Envs for conductor-monitor"
export WORKFLOW_MONITOR_STATS_INITIAL_DELAY="30000"
export WORKFLOW_MONITOR_STATS_FIXED_DELAY="10000"
else
print_log "Generating Envs for common layers"
# Disabling scheduled monitor for common layers by keeping a large delay (30 days)
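# 30 days = 30 * 24 * 60 * 60 * 1000 ms = 2592000000 ms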
export WORKFLOW_MONITOR_STATS_INITIAL_DELAY="2592000000"
export WORKFLOW_MONITOR_STATS_FIXED_DELAY="2592000000"
fi

echo "WORKFLOW_MONITOR_STATS_INITIAL_DELAY: $WORKFLOW_MONITOR_STATS_INITIAL_DELAY"
echo "WORKFLOW_MONITOR_STATS_FIXED_DELAY: $WORKFLOW_MONITOR_STATS_FIXED_DELAY"

echo "Using java options config: $JAVA_OPTS"

OTEL_TRACES_SAMPLER=parentbased_always_off OTEL_RESOURCE_ATTRIBUTES=service.name=${OTEL_SERVICE_NAME},host.name=${POD_NAME},host.ip=${POD_IP} OTEL_EXPORTER_OTLP_ENDPOINT=http://${HOST_IP}:5680 OTEL_METRICS_EXPORTER=none java ${JAVA_OPTS} -jar -DCONDUCTOR_CONFIG_FILE=$config_file conductor-server-*-boot.jar 2>&1 | tee -a /app/logs/server.log
19 changes: 17 additions & 2 deletions docker/server/config/config-conductor-server-lt.properties
@@ -25,7 +25,13 @@ conductor.scylla.keyspace=${SCYLLA_KEYSPACE}
conductor.scylla.replicationFactorValue=${SCYLLA_REPLICATIONFACTOR}
conductor.scylla.replicationStrategy=${SCYLLA_REPLICATION_STRATEGY}

conductor.workflow-status-listener.type=archive
# Workflow Execution configuration for cpu and memory limits
conductor.workflow-reconciler.enabled=true
conductor.workflow-monitor.enabled=true
conductor.workflow-monitor.stats.initial-delay=${WORKFLOW_MONITOR_STATS_INITIAL_DELAY}
conductor.workflow-monitor.stats.delay=${WORKFLOW_MONITOR_STATS_FIXED_DELAY}
conductor.workflow-repair-service.enabled=true

conductor.redis.queueShardingStrategy=${REDIS_QUEUE_SHARDING_STRATEGY}
conductor.app.executorServiceMaxThreadCount=${EXTOR_MAX_THREAD_COUNT}
conductor.app.systemTaskWorkerThreadCount=${SYSTEM_TASK_WORKER_THREAD_COUNT}
@@ -54,4 +60,13 @@ conductor.redis-lock.namespace=${REDIS_LOCK_NS_PREFIX}
loadSample=false

# Pushing Prom Metrics
conductor.metrics-prometheus.enabled = true
conductor.metrics-prometheus.enabled = true

#Actuator Endpoints
management.endpoints.web.exposure.include=info,metrics,health,prometheus,httptrace
management.metrics.distribution.sla.http.server.requests=100ms,150ms,250ms,500ms,1s
management.metrics.distribution.percentiles-histogram.http.server.requests=true
management.metrics.enable.jvm=true
management.endpoint.metrics.enabled=true
management.endpoint.prometheus.enabled=true
management.prometheus.metrics.export.enabled=true
5 changes: 4 additions & 1 deletion docker/server/config/config-conductor-server.properties
@@ -25,7 +25,9 @@ conductor.indexing.enabled=false

# Workflow Execution configuration for cpu and memory limits
conductor.workflow-reconciler.enabled=true
conductor.workflow-monitor.enabled=false
conductor.workflow-monitor.enabled=true
conductor.workflow-monitor.stats.initial-delay=${WORKFLOW_MONITOR_STATS_INITIAL_DELAY}
conductor.workflow-monitor.stats.delay=${WORKFLOW_MONITOR_STATS_FIXED_DELAY}
conductor.workflow-repair-service.enabled=true
conductor.workflow-scylla-execution-lock.enabled=true
conductor.redis-lock.serverAddress=redis://${REDIS_HOST}:${REDIS_PORT}
@@ -38,6 +40,7 @@ conductor.metrics-prometheus.enabled = true
#Actuator Endpoints
management.endpoints.web.exposure.include=info,metrics,health,prometheus,httptrace
management.metrics.distribution.sla.http.server.requests=100ms,150ms,250ms,500ms,1s
management.metrics.distribution.percentiles-histogram.http.server.requests=true
management.metrics.enable.jvm=true
management.endpoint.metrics.enabled=true
management.endpoint.prometheus.enabled=true
@@ -248,20 +248,20 @@ <T> T readValue(String json, Class<T> clazz) {
}

void recordCassandraDaoRequests(String action) {
recordCassandraDaoRequests(action, "n/a", "n/a");
// recordCassandraDaoRequests(action, "n/a", "n/a");
}

void recordCassandraDaoRequests(String action, String taskType, String workflowType) {
Monitors.recordDaoRequests(DAO_NAME, action, taskType, workflowType);
// Monitors.recordDaoRequests(DAO_NAME, action, taskType, workflowType);
}

void recordCassandraDaoEventRequests(String action, String event) {
Monitors.recordDaoEventRequests(DAO_NAME, action, event);
// Monitors.recordDaoEventRequests(DAO_NAME, action, event);
}

void recordCassandraDaoPayloadSize(
String action, int size, String taskType, String workflowType) {
Monitors.recordDaoPayloadSize(DAO_NAME, action, taskType, workflowType, size);
// Monitors.recordDaoPayloadSize(DAO_NAME, action, taskType, workflowType, size);
}

static class WorkflowMetadata {
1 change: 1 addition & 0 deletions server/build.gradle
@@ -45,6 +45,7 @@ dependencies {
implementation 'com.netflix.conductor:conductor-kafka:3.13.3'

implementation 'io.micrometer:micrometer-registry-prometheus:1.11.0'
implementation "com.netflix.spectator:spectator-reg-micrometer:${revSpectator}"

implementation "io.opentelemetry:opentelemetry-extension-annotations:1.18.0"
implementation "io.opentelemetry:opentelemetry-api:1.35.0"
@@ -0,0 +1,38 @@
package com.netflix.conductor.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;

import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;
import com.netflix.spectator.micrometer.MicrometerRegistry;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.micrometer.prometheus.PrometheusRenameFilter;
import io.prometheus.client.CollectorRegistry;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Metrics;

// This class loads all the configuration related to Prometheus.
@Configuration
public class PrometheusIntegrationConfig
implements CommandLineRunner {

private static final Logger log = LoggerFactory.getLogger(PrometheusIntegrationConfig.class);
private PrometheusMeterRegistry prometheusRegistry;

public PrometheusIntegrationConfig(PrometheusMeterRegistry prometheusRegistry) {
this.prometheusRegistry = prometheusRegistry;
}

@Override
public void run(String... args) throws Exception {
log.info("Registered PrometheusRegistry");
final MicrometerRegistry metricsRegistry = new MicrometerRegistry(prometheusRegistry);
prometheusRegistry.config().meterFilter(new PrometheusRenameFilter());
Spectator.globalRegistry().add(metricsRegistry);
}
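
For context, a minimal hypothetical sketch (not part of this PR) of what this bridge enables: anything recorded against the Spectator global registry, which is what Conductor's Monitors helper writes to, becomes visible in the Micrometer Prometheus registry and therefore in the /actuator/prometheus scrape. The class and meter names below are illustrative only.

    import com.netflix.spectator.api.Spectator;
    import com.netflix.spectator.micrometer.MicrometerRegistry;

    import io.micrometer.prometheus.PrometheusConfig;
    import io.micrometer.prometheus.PrometheusMeterRegistry;

    // Hypothetical smoke test: verifies Spectator metrics flow into the
    // Micrometer Prometheus registry once the bridge is installed.
    public class PrometheusBridgeSmokeTest {

        public static void main(String[] args) {
            PrometheusMeterRegistry prometheusRegistry =
                    new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);

            // Same wiring as PrometheusIntegrationConfig#run.
            Spectator.globalRegistry().add(new MicrometerRegistry(prometheusRegistry));

            // Record something through the Spectator global registry; Conductor's
            // Monitors helper ultimately writes to this same registry.
            Spectator.globalRegistry().counter("sample_queue_depth_probe").increment();

            // The counter should now appear in the Prometheus text exposition output.
            System.out.println(prometheusRegistry.scrape());
        }
    }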
}