Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FS-172266, FS-199069] : Conductor Monitor Changes for queue depth metrics #28

Open
wants to merge 33 commits into
base: freshservice_staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
d9243c6
[FS-172266] : Fixing conductor prom metrics
keerthivaasan-kanagaraj May 27, 2024
40e6404
test
prabhakaranE6157 Sep 23, 2024
c3aea1b
Merge branch 'freshservice_staging' into lt_test_load_gowtham
keerthivaasan-kanagaraj Sep 25, 2024
4057c91
Merge branch 'freshservice_staging' into nw_metrics
keerthivaasan-kanagaraj Sep 25, 2024
306751b
enable monitor
keerthivaasan-kanagaraj Sep 25, 2024
de5291e
fix
keerthivaasan-kanagaraj Sep 25, 2024
625e9c5
due to scylla time out commenting this code
keerthivaasan-kanagaraj Sep 25, 2024
862374a
fix
keerthivaasan-kanagaraj Sep 25, 2024
cd8e88e
fix logs
keerthivaasan-kanagaraj Sep 25, 2024
0e7cda2
fix
keerthivaasan-kanagaraj Sep 26, 2024
2274aea
fix
keerthivaasan-kanagaraj Sep 26, 2024
0b0e271
fix
keerthivaasan-kanagaraj Sep 26, 2024
611901e
for now
keerthivaasan-kanagaraj Sep 26, 2024
dbe8f97
Merge branch 'lt_test_load_gowtham' into nw_metrics
keerthivaasan-kanagaraj Sep 26, 2024
a364060
fix
keerthivaasan-kanagaraj Sep 26, 2024
cdd6515
bucket http metrics
keerthivaasan-kanagaraj Oct 1, 2024
3d9d5a1
Merge branch 'freshservice_staging' into lt_test_load_gowtham
prabhakaranE6157 Oct 3, 2024
b9bc589
t
prabhakaranE6157 Oct 4, 2024
c5aef6a
Merge branch 'freshservice_staging' into nw_metrics
prabhakaranE6157 Oct 16, 2024
d66f80b
Merge branch 'lt_test_load_gowtham' into nw_metrics
prabhakaranE6157 Oct 16, 2024
dac0c75
Fix workflow monitoring
keerthivaasan-kanagaraj Oct 28, 2024
a486979
Merge branch 'freshservice_staging' into nw_metrics
keerthivaasan-kanagaraj Oct 28, 2024
07476d5
[FS-199069] : Conductor Monitor Changes for queue depth metrics
keerthivaasan-kanagaraj Oct 29, 2024
656ed80
fix
keerthivaasan-kanagaraj Oct 29, 2024
4de5591
fix
keerthivaasan-kanagaraj Oct 29, 2024
7167a2c
Review comments
keerthivaasan-kanagaraj Oct 30, 2024
2126390
fix
keerthivaasan-kanagaraj Nov 4, 2024
796ef7d
fix
keerthivaasan-kanagaraj Nov 4, 2024
4d9dd6d
fix
keerthivaasan-kanagaraj Nov 4, 2024
686ca84
fix
keerthivaasan-kanagaraj Nov 5, 2024
cbbc556
fix
keerthivaasan-kanagaraj Nov 7, 2024
ebdeef1
fix log
keerthivaasan-kanagaraj Nov 7, 2024
5382b1e
fix
keerthivaasan-kanagaraj Nov 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,13 @@ public void sweep(String workflowId) {
long workflowOffsetTimeout =
workflowOffsetWithJitter(properties.getWorkflowOffsetTimeout().getSeconds());
if (workflow != null) {
long startTime = Instant.now().toEpochMilli();
// long startTime = Instant.now().toEpochMilli();
unack(workflow, workflowOffsetTimeout);
long endTime = Instant.now().toEpochMilli();
Monitors.recordUnackTime(workflow.getWorkflowName(), endTime - startTime);
// long endTime = Instant.now().toEpochMilli();
// This metrics(recordUnackTime) has workflowName in our prod account we have more number of workflows.
// So we may get cardinality problems in haystack due to this.
// Disabling this temporarily.
// Monitors.recordUnackTime(workflow.getWorkflowName(), endTime - startTime);
keerthivaasan-kanagaraj marked this conversation as resolved.
Show resolved Hide resolved
} else {
LOGGER.warn(
"Workflow with {} id can not be found. Attempting to unack using the id",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,48 +77,57 @@ public WorkflowMonitor(
initialDelayString = "${conductor.workflow-monitor.stats.initial-delay:120000}",
fixedDelayString = "${conductor.workflow-monitor.stats.delay:60000}")
public void reportMetrics() {
try {
if (refreshCounter <= 0) {
if (refreshCounter <= 0) {
workflowDefs = metadataService.getWorkflowDefs();
taskDefs = new ArrayList<>(metadataService.getTaskDefs());
refreshCounter = metadataRefreshInterval;
}

getPendingWorkflowToOwnerAppMap(workflowDefs)
.forEach(
(workflowName, ownerApp) -> {
long count =
executionDAOFacade.getPendingWorkflowCount(workflowName);
Monitors.recordRunningWorkflows(count, workflowName, ownerApp);
});

taskDefs.forEach(
taskDef -> {
}

// Commented out as we don't have use case for Pending Workflows as of now and hence we don't have pending workflow implementation from

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Remove these code if they are no longer needed
  • We can always get them from the version control

// scylla persistence.
// try {
// getPendingWorkflowToOwnerAppMap(workflowDefs)
// .forEach(
// (workflowName, ownerApp) -> {
// long count =
// executionDAOFacade.getPendingWorkflowCount(workflowName);
// Monitors.recordRunningWorkflows(count, workflowName, ownerApp);
// });
// } catch (Exception e) {
// LOGGER.error("Error while publishing scheduled metrics", e);
// }

try {
taskDefs.forEach(
taskDef -> {
long size = queueDAO.getSize(taskDef.getName());
long inProgressCount =
executionDAOFacade.getInProgressTaskCount(taskDef.getName());
Monitors.recordQueueDepth(taskDef.getName(), size, taskDef.getOwnerApp());
if (taskDef.concurrencyLimit() > 0) {
Monitors.recordTaskInProgress(
taskDef.getName(), inProgressCount, taskDef.getOwnerApp());
Monitors.recordTaskInProgress(
taskDef.getName(), inProgressCount, taskDef.getOwnerApp());
}
});
});
} catch (Exception e) {
LOGGER.error("Error while publishing scheduled metrics", e);
}

asyncSystemTasks.forEach(
workflowSystemTask -> {
try {
asyncSystemTasks.forEach(
workflowSystemTask -> {
long size = queueDAO.getSize(workflowSystemTask.getTaskType());
long inProgressCount =
executionDAOFacade.getInProgressTaskCount(
workflowSystemTask.getTaskType());
Monitors.recordQueueDepth(workflowSystemTask.getTaskType(), size, "system");
Monitors.recordTaskInProgress(
workflowSystemTask.getTaskType(), inProgressCount, "system");
});

refreshCounter--;
});
} catch (Exception e) {
LOGGER.error("Error while publishing scheduled metrics", e);
LOGGER.error("Error while publishing scheduled metrics", e);
}
refreshCounter--;
}

/**
Expand Down
1 change: 1 addition & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ dependencies {
implementation 'com.netflix.conductor:conductor-kafka:3.13.3'

implementation 'io.micrometer:micrometer-registry-prometheus:1.11.0'
implementation "com.netflix.spectator:spectator-reg-micrometer:${revSpectator}"

implementation "io.opentelemetry:opentelemetry-extension-annotations:1.18.0"
implementation "io.opentelemetry:opentelemetry-api:1.35.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.netflix.conductor.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;

import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;
import com.netflix.spectator.micrometer.MicrometerRegistry;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.micrometer.prometheus.PrometheusRenameFilter;
import io.prometheus.client.CollectorRegistry;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Metrics;

// This class loads all the configurations related to prometheus.
@Configuration
public class PrometheusIntegrationConfig
implements CommandLineRunner {

private static final Logger log = LoggerFactory.getLogger(PrometheusIntegrationConfig.class);
private static PrometheusMeterRegistry prometheusRegistry;
keerthivaasan-kanagaraj marked this conversation as resolved.
Show resolved Hide resolved

public PrometheusIntegrationConfig(PrometheusMeterRegistry prometheusRegistry) {
this.prometheusRegistry = prometheusRegistry;
}

@Override
public void run(String... args) throws Exception {
setupPrometheusRegistry();
}

/**
* To Register PrometheusRegistry
*/
private static void setupPrometheusRegistry() {
keerthivaasan-kanagaraj marked this conversation as resolved.
Show resolved Hide resolved
log.info("Registered PrometheusRegistry");
final MicrometerRegistry metricsRegistry = new MicrometerRegistry(prometheusRegistry);
prometheusRegistry.config().meterFilter(new PrometheusRenameFilter());
Spectator.globalRegistry().add(metricsRegistry);
}

}