Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CYB-193][UI] Landing page for pipelines. #81

Open
wants to merge 18 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions flink-cyber/cyber-jobs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.cloudera.cyber</groupId>
<artifactId>cyber-worker-service</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
Expand Down
7 changes: 7 additions & 0 deletions flink-cyber/cyber-jobs/src/main/assemblies/cloudera.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@
<fileMode>0644</fileMode>
</file>

<file>
<source>../cyber-services/cyber-worker-service/target/cyber-worker-service-${project.version}.jar</source>
<outputDirectory>jobs/</outputDirectory>
<destName>cyber-worker-service-${cybersec.full.version}.jar</destName>
<fileMode>0644</fileMode>
</file>

<file>
<source>../flink-commands/scoring-commands/target/scoring-commands-${project.version}.jar</source>
<outputDirectory>tools/</outputDirectory>
Expand Down
5 changes: 5 additions & 0 deletions flink-cyber/cyber-services/cyber-service-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RequestBody {
private String clusterServiceId;
private String jobIdHex;
private String pipelineDir;
private String pipelineName;
private String branch;
private String profileName;
private List<String> jobs;
private byte[] payload;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package com.cloudera.service.common.request;

public enum RequestType {
GET_ALL_CLUSTERS_SERVICE_REQUEST, GET_CLUSTER_SERVICE_REQUEST, START_JOB_REQUEST, RESTART_JOB_REQUEST, STOP_JOB_REQUEST, GET_JOB_CONFIG_REQUEST, UPDATE_JOB_CONFIG_REQUEST
GET_ALL_CLUSTERS_SERVICE_REQUEST, GET_CLUSTER_SERVICE_REQUEST, START_JOB_REQUEST, RESTART_JOB_REQUEST, STOP_JOB_REQUEST, GET_JOB_CONFIG_REQUEST, CREATE_EMPTY_PIPELINE, START_ARCHIVE_PIPELINE, UPDATE_JOB_CONFIG_REQUEST
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class Job {

private JobType jobType;

private String user;

@AllArgsConstructor
@Getter
public enum JobType {
Expand All @@ -46,7 +48,6 @@ public enum JobType {

public String[] getScript(Job job) {
switch (this) {
case GENERATOR:
case PROFILE:
case PARSER:
return new String[]{scriptName, job.getJobBranch(), job.getJobPipeline(), job.getJobName()};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.cloudera.service.common.response;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Pipeline {
String id;
String name;
String clusterName;
String date;
String userName;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package com.cloudera.service.common.response;

public enum ResponseType {
GET_ALL_CLUSTERS_SERVICE_RESPONSE, GET_CLUSTER_SERVICE_RESPONSE, START_JOB_RESPONSE, RESTART_JOB_RESPONSE, STOP_JOB_RESPONSE, GET_JOB_CONFIG_RESPONSE, UPDATE_JOB_CONFIG_RESPONSE, ERROR_RESPONSE
GET_ALL_CLUSTERS_SERVICE_RESPONSE, GET_CLUSTER_SERVICE_RESPONSE, START_JOB_RESPONSE, RESTART_JOB_RESPONSE, STOP_JOB_RESPONSE, GET_JOB_CONFIG_RESPONSE, UPDATE_JOB_CONFIG_RESPONSE, CREATE_EMPTY_PIPELINE_RESPONSE, START_ARCHIVE_PIPELINE_RESPONSE, ERROR_RESPONSE
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.cloudera.cyber.restcli.configuration;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "cluster")
@Getter
@Setter
public class AppWorkerConfig {
private String name;
private String id;
private String status;
private String version;
private String pipelineDir;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.cloudera.cyber.restcli.controller;

import com.cloudera.cyber.restcli.configuration.AppWorkerConfig;
import com.cloudera.cyber.restcli.service.JobService;
import com.cloudera.cyber.restcli.service.FilePipelineService;
import com.cloudera.service.common.Utils;
import com.cloudera.service.common.request.RequestBody;
import com.cloudera.service.common.request.RequestType;
Expand Down Expand Up @@ -29,18 +31,11 @@
@RequiredArgsConstructor
@Slf4j
public class KafkaListenerController {

@Value("${cluster.name}")
private String clusterName;
@Value("${cluster.id}")
private String clusterId;
@Value("${cluster.status}")
private String clusterStatus;
@Value("${cluster.version}")
private String clusterVersion;

private final JobService jobService;
private final FilePipelineService pipelineService;
private final AppWorkerConfig config;

//TODO: Rewrite to Spring events. Probably split the events into separate types, such as cluster event, job event, pipeline event, etc.
@KafkaListener(topics = "#{kafkaProperties.getRequestTopic()}", containerFactory = "kafkaListenerContainerFactory")
@SendTo({"#{kafkaProperties.getReplyTopic()}"})
public Message<ResponseBody> handleMessage(RequestBody requestBody, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
Expand Down Expand Up @@ -85,6 +80,26 @@ public Message<ResponseBody> handleMessage(RequestBody requestBody, @Header(Kafk
} catch (IOException e) {
return handleErrorResponse(e, replyTo, correlationId);
}
case CREATE_EMPTY_PIPELINE:
try {
pipelineService.createEmptyPipeline(requestBody.getPipelineName(), requestBody.getBranch());
final ResponseBody responseBody = ResponseBody.builder().build();
return buildResponseMessage(responseBody, ResponseType.CREATE_EMPTY_PIPELINE_RESPONSE, replyTo, correlationId);
} catch (Exception e) {
return handleErrorResponse(e, replyTo, correlationId);
}
case START_ARCHIVE_PIPELINE:
try {
pipelineService.extractPipeline(requestBody.getPayload(), requestBody.getPipelineName(), requestBody.getBranch());
pipelineService.startPipelineJob(requestBody.getPipelineName(), requestBody.getBranch(), requestBody.getProfileName(), requestBody.getJobs());
final ResponseBody responseBody = ResponseBody.builder().build();
return buildResponseMessage(responseBody, ResponseType.START_ARCHIVE_PIPELINE_RESPONSE, replyTo, correlationId);
} catch (Exception e) {
log.error("Exception while processing the Start All request {}", e.getMessage());
return handleErrorResponse(e, replyTo, correlationId);

}

}
return null;
}
Expand All @@ -95,10 +110,10 @@ private Message<ResponseBody> getResponseBodyMessage(byte[] replyTo, byte[] corr
ResponseBody responseBody = ResponseBody.builder()
.jobs(jobs)
.clusterMeta(ClusterMeta.builder()
.name(clusterName)
.clusterId(clusterId)
.clusterStatus(clusterStatus)
.version(clusterVersion)
.name(config.getName())
.clusterId(config.getId())
.clusterStatus(config.getStatus())
.version(config.getVersion())
.build())
.build();
return buildResponseMessage(responseBody, responseType, replyTo, correlationId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.cloudera.cyber.restcli.service;

import com.cloudera.cyber.restcli.configuration.AppWorkerConfig;
import com.cloudera.service.common.Utils;
import com.cloudera.service.common.response.Job;
import com.cloudera.service.common.utils.ArchiveUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

@Slf4j
@Service
@RequiredArgsConstructor
public class FilePipelineService {
private final AppWorkerConfig config;


public void createEmptyPipeline(String pipelineName, String branchName) {
String fullPath = this.config.getPipelineDir().endsWith("/") ? this.config.getPipelineDir() + pipelineName + "/" + branchName
: this.config.getPipelineDir() + "/" + pipelineName + "/" + branchName;
File directory = new File(fullPath);
if (directory.mkdirs()) {
log.info("Create full path {}", fullPath);
}
try {
ProcessBuilder processBuilder = new ProcessBuilder("cs-create-pipeline", pipelineName);
processBuilder.directory(directory);
Process process = processBuilder.start();
process.waitFor();
} catch (IOException ioe) {
log.error("Caught get IOException {} ", ioe.getMessage());
} catch (InterruptedException e) {
log.error("Caught Interrupt Exception with message {} ", e.getMessage());
}
}

public void extractPipeline(byte[] payload, String pipelineName, String branch) throws IOException {
String fullPipelinePath = pipelineName.endsWith("/") ? this.config.getPipelineDir() + pipelineName + "/" + branch : this.config.getPipelineDir() + "/" + pipelineName + "/" + branch;
ArchiveUtil.decompressFromTarGzInMemory(payload, fullPipelinePath, true);
}

public void startPipelineJob(String pipelineName, String branch, String profileName, List<String> jobsNames) throws IOException {
String fullPipelinePath = pipelineName.endsWith("/") ?this.config.getPipelineDir() + pipelineName + "/" + branch
: this.config.getPipelineDir() + "/" + pipelineName + "/" + branch;

List<Job> jobs = jobsNames.stream().map(jobName -> Job.builder()
.jobPipeline(pipelineName)
.jobType(Utils.getEnumFromString(jobName, Job.JobType.class, Job.JobType::getName))
.jobBranch(branch)
.jobName(StringUtils.defaultString(profileName, "main"))
.build()).collect(Collectors.toList());
for (Job job : jobs) {
job.getJobType().getScript(job);
ProcessBuilder processBuilder = new ProcessBuilder(job.getJobType().getScript(job));
processBuilder.directory(new File(fullPipelinePath));
Process process = processBuilder.start();
Thread clt = new Thread(() -> {
try {
log.info(IOUtils.toString(process.getInputStream(), StandardCharsets.UTF_8));
log.error(IOUtils.toString(process.getErrorStream(), StandardCharsets.UTF_8));
} catch (IOException e) {
log.error("Error happens on stream reading from bash {}", e.getMessage());
}
});
clt.setDaemon(true);
clt.start();
}
}

}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package com.cloudera.cyber.restcli.service;

import com.cloudera.cyber.restcli.configuration.AppWorkerConfig;
import com.cloudera.service.common.Utils;
import com.cloudera.service.common.response.Job;
import com.cloudera.service.common.utils.ArchiveUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.BufferedReader;
Expand All @@ -27,12 +28,13 @@

@Slf4j
@Service
@RequiredArgsConstructor
public class JobService {
@Value("${cluster.pipeline.dir}")
private String pipelineDir;
public static final String LOG_CLI_JOB_INFO = "Successfully read jobs from cli with exit code {}. job count '{}' jobs data '[{}]'";
private final Pattern pattern = Pattern.compile("^(?<date>[\\d.:\\s]+)\\s:\\s(?<jobId>[a-fA-F0-9]+)\\s:\\s(?<jobFullName>[\\w.-]+)\\s\\((?<jobStatus>\\w+)\\)$");

private final AppWorkerConfig config;


public List<Job> getJobs() throws IOException {
List<Job> jobs = new ArrayList<>();
Expand All @@ -56,8 +58,8 @@ public Job restartJob(String id) throws IOException {
log.info("Script command = '{}'", Arrays.toString(job.getJobType().getScript(job)));
try {
ProcessBuilder processBuilder = new ProcessBuilder(job.getJobType().getScript(job));
if (pipelineDir != null) {
processBuilder.directory(new File(pipelineDir));
if (config.getPipelineDir() != null) {
processBuilder.directory(new File(config.getPipelineDir()));
}
Process process = processBuilder.start();
log.debug("Command input stream '{}' \n Command error stream '{}'", IOUtils.toString(process.getInputStream(), StandardCharsets.UTF_8), IOUtils.toString(process.getErrorStream(), StandardCharsets.UTF_8));
Expand Down Expand Up @@ -155,12 +157,12 @@ private void setJobParameters(Job job, String fullJobName) {
String[] jobParameters = fullJobName.split("\\.");
job.setJobBranch(jobParameters[0]);
job.setJobPipeline(jobParameters[1]);
if (job.getJobType() == Job.JobType.PROFILE || job.getJobType() == Job.JobType.GENERATOR || job.getJobType() == Job.JobType.PARSER) {
if (job.getJobType() == Job.JobType.PROFILE || job.getJobType() == Job.JobType.PARSER) {
job.setJobName(jobParameters[jobParameters.length - 1]);
}
}

public void updateConfig(byte[] payload) throws IOException {
ArchiveUtil.decompressFromTarGzInMemory(payload, pipelineDir, true);
ArchiveUtil.decompressFromTarGzInMemory(payload, config.getPipelineDir(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ cluster.name=testName
cluster.id=1
cluster.status=online
cluster.version=1.0.0
cluster.pipeline.dir=/tmp/test
cluster.pipelineDir=/tmp/test
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=testGroup
spring.kafka.consumer.auto-offset-reset=earliest
Expand Down
11 changes: 11 additions & 0 deletions flink-cyber/flink-commands/json-commands/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@
<version>${log4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
Expand Down
1 change: 1 addition & 0 deletions flink-cyber/flink-commands/scoring-commands/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
Expand Down
5 changes: 5 additions & 0 deletions flink-cyber/flink-profiler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@
<artifactId>flink-avro</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
"simple-import-sort/imports": "off",
"array-bracket-spacing": "off",
"no-underscore-dangle": "off",
"@typescript-eslint/member-ordering": [
"warn",
{
"default": ["static-field", "instance-field", "static-method", "instance-method"]
}],
"@typescript-eslint/no-unused-vars": [
"warn",
{
Expand Down
Loading
Loading