Skip to content
This repository has been archived by the owner on May 14, 2022. It is now read-only.

Commit

Permalink
Use Map data structure to store tasks in JobSnapshot (#1130)
Browse files Browse the repository at this point in the history
so we can more reliably eliminate task duplicates with different versions
  • Loading branch information
tbak authored Oct 8, 2021
1 parent 809bea4 commit e119187
Show file tree
Hide file tree
Showing 14 changed files with 171 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.netflix.titus.runtime.connector.jobmanager;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
Expand Down Expand Up @@ -63,7 +64,7 @@ public List<Task> getTasks() {

@Override
public List<Task> getTasks(String jobId) {
return replicator.getCurrent().getTasks(jobId);
return new ArrayList<>(replicator.getCurrent().getTasks(jobId).values());
}

@Override
Expand All @@ -76,7 +77,7 @@ public List<Job<?>> findJobs(Predicate<Pair<Job<?>, List<Task>>> queryPredicate,
JobSnapshot snapshot = replicator.getCurrent();

return snapshot.getJobMap().values().stream()
.filter(job -> queryPredicate.test(Pair.of(job, snapshot.getTasks(job.getId()))))
.filter(job -> queryPredicate.test(Pair.of(job, new ArrayList<>(snapshot.getTasks(job.getId()).values()))))
.skip(offset)
.limit(limit)
.collect(Collectors.toList());
Expand All @@ -87,7 +88,7 @@ public List<Pair<Job<?>, Task>> findTasks(Predicate<Pair<Job<?>, Task>> queryPre
JobSnapshot snapshot = replicator.getCurrent();

return snapshot.getJobMap().values().stream()
.flatMap(job -> snapshot.getTasks(job.getId()).stream()
.flatMap(job -> snapshot.getTasks(job.getId()).values().stream()
.filter(task -> queryPredicate.test(Pair.of(job, task)))
.map(task -> Pair.<Job<?>, Task>of(job, task))
)
Expand All @@ -113,7 +114,7 @@ public Observable<JobManagerEvent<?>> observeJobs(Predicate<Pair<Job<?>, List<Ta
if (event.getRight() instanceof JobUpdateEvent) {
JobUpdateEvent jobUpdateEvent = (JobUpdateEvent) event.getRight();
Job<?> job = jobUpdateEvent.getCurrent();
List<Task> tasks = replicator.getCurrent().getTasks(job.getId());
List<Task> tasks = new ArrayList<>(replicator.getCurrent().getTasks(job.getId()).values());
return jobsPredicate.test(Pair.of(job, tasks));
}
if (event.getRight() instanceof TaskUpdateEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ public String getSnapshotId() {
@Deprecated
public abstract List<Task> getTasks();

public abstract List<Task> getTasks(String jobId);
public abstract Map<String, Task> getTasks(String jobId);

/**
* This value is expensive to compute on each update. {@link PCollectionJobSnapshot} computes it lazily to avoid
* the overhead. Consider using other methods.
*/
@Deprecated
public abstract List<Pair<Job<?>, List<Task>>> getJobsAndTasks();
public abstract List<Pair<Job<?>, Map<String, Task>>> getJobsAndTasks();

public abstract Optional<Pair<Job<?>, Task>> findTaskById(String taskId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.netflix.titus.runtime.connector.jobmanager;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
Expand Down Expand Up @@ -53,7 +52,7 @@ public static JobSnapshotFactory newDefault(boolean autoFixInconsistencies, Cons
private static class LegacyJobSnapshotFactory implements JobSnapshotFactory {

@Override
public JobSnapshot newSnapshot(Map<String, Job<?>> jobsById, Map<String, List<Task>> tasksByJobId) {
public JobSnapshot newSnapshot(Map<String, Job<?>> jobsById, Map<String, Map<String, Task>> tasksByJobId) {
return LegacyJobSnapshot.newInstance(UUID.randomUUID().toString(), jobsById, tasksByJobId);
}
}
Expand All @@ -69,7 +68,7 @@ private PCollectionJobSnapshotFactory(boolean autoFixInconsistencies, Consumer<S
}

@Override
public JobSnapshot newSnapshot(Map<String, Job<?>> jobsById, Map<String, List<Task>> tasksByJobId) {
public JobSnapshot newSnapshot(Map<String, Job<?>> jobsById, Map<String, Map<String, Task>> tasksByJobId) {
return PCollectionJobSnapshot.newInstance(
UUID.randomUUID().toString(),
jobsById,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

package com.netflix.titus.runtime.connector.jobmanager;

import java.util.List;
import java.util.Map;

import com.netflix.titus.api.jobmanager.model.job.Job;
import com.netflix.titus.api.jobmanager.model.job.Task;

public interface JobSnapshotFactory {
JobSnapshot newSnapshot(Map<String, Job<?>> jobsById, Map<String, List<Task>> tasksByJobId);
JobSnapshot newSnapshot(Map<String, Job<?>> jobsById, Map<String, Map<String, Task>> tasksByJobId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public class LegacyJobSnapshot extends JobSnapshot {
private static final LegacyJobSnapshot EMPTY = new Builder("empty", Collections.emptyMap(), Collections.emptyMap()).build();

private final Map<String, Job<?>> jobsById;
private final Map<String, List<Task>> tasksByJobId;
private final Map<String, Map<String, Task>> tasksByJobId;
private final List<Job<?>> allJobs;
private final List<Task> allTasks;
private final List<Pair<Job<?>, List<Task>>> allJobsAndTasks;
private final List<Pair<Job<?>, Map<String, Task>>> allJobsAndTasks;
private final Map<String, Task> taskById;

private final String signature;
Expand All @@ -57,20 +57,20 @@ public static LegacyJobSnapshot empty() {
return EMPTY;
}

public static LegacyJobSnapshot newInstance(String snapshotId, Map<String, Job<?>> jobsById, Map<String, List<Task>> tasksByJobId) {
public static LegacyJobSnapshot newInstance(String snapshotId, Map<String, Job<?>> jobsById, Map<String, Map<String, Task>> tasksByJobId) {
return new Builder(snapshotId, jobsById, tasksByJobId).build();
}

public static Builder newBuilder(String snapshotId, Map<String, Job<?>> jobsById, Map<String, List<Task>> tasksByJobId) {
public static Builder newBuilder(String snapshotId, Map<String, Job<?>> jobsById, Map<String, Map<String, Task>> tasksByJobId) {
return new Builder(snapshotId, jobsById, tasksByJobId);
}

public static Builder newBuilder(String snapshotId) {
return new Builder(snapshotId);
}

private LegacyJobSnapshot(String snapshotId, Map<String, Job<?>> jobsById, Map<String, List<Task>> tasksByJobId,
List<Job<?>> allJobs, List<Task> allTasks, List<Pair<Job<?>, List<Task>>> allJobsAndTasks,
private LegacyJobSnapshot(String snapshotId, Map<String, Job<?>> jobsById, Map<String, Map<String, Task>> tasksByJobId,
List<Job<?>> allJobs, List<Task> allTasks, List<Pair<Job<?>, Map<String, Task>>> allJobsAndTasks,
Map<String, Task> taskById) {
super(snapshotId);
this.jobsById = jobsById;
Expand Down Expand Up @@ -104,11 +104,13 @@ public List<Task> getTasks() {
return allTasks;
}

public List<Task> getTasks(String jobId) {
return tasksByJobId.getOrDefault(jobId, Collections.emptyList());
@Override
public Map<String, Task> getTasks(String jobId) {
return tasksByJobId.getOrDefault(jobId, Collections.emptyMap());
}

public List<Pair<Job<?>, List<Task>>> getJobsAndTasks() {
@Override
public List<Pair<Job<?>, Map<String, Task>>> getJobsAndTasks() {
return allJobsAndTasks;
}

Expand Down Expand Up @@ -166,7 +168,7 @@ public String toSummaryString() {
public String toString() {
StringBuilder sb = new StringBuilder("JobSnapshot2{snapshotId=").append(snapshotId).append(", jobs=");
jobsById.forEach((id, job) -> {
List<Task> tasks = tasksByJobId.get(id);
Map<String, Task> tasks = tasksByJobId.get(id);
int tasksCount = tasks == null ? 0 : tasks.size();
sb.append(id).append('=').append(tasksCount).append(',');
});
Expand All @@ -184,7 +186,7 @@ private String computeSignature() {
public static class Builder {
private final String snapshotId;
private final Map<String, Job<?>> jobsById;
private final Map<String, List<Task>> tasksByJobId;
private final Map<String, Map<String, Task>> tasksByJobId;

private Builder(String snapshotId) {
this.snapshotId = snapshotId;
Expand All @@ -196,24 +198,22 @@ private Builder(LegacyJobSnapshot from) {
this(from.snapshotId, from.jobsById, from.tasksByJobId);
}

private Builder(String snapshotId, Map<String, Job<?>> jobsById, Map<String, List<Task>> tasksByJobId) {
private Builder(String snapshotId, Map<String, Job<?>> jobsById, Map<String, Map<String, Task>> tasksByJobId) {
this.snapshotId = snapshotId;
this.jobsById = new HashMap<>(jobsById);
HashMap<String, List<Task>> copy = new HashMap<>();
tasksByJobId.forEach((jobId, tasks) -> copy.put(jobId, new ArrayList<>(tasks)));
HashMap<String, Map<String, Task>> copy = new HashMap<>();
tasksByJobId.forEach((jobId, tasks) -> copy.put(jobId, new HashMap<>(tasks)));
this.tasksByJobId = copy;
}

public LegacyJobSnapshot build() {
List<Task> allTasks = new ArrayList<>();
Map<String, List<Task>> immutableTasksByJobId = new HashMap<>();
Map<String, Map<String, Task>> immutableTasksByJobId = new HashMap<>();
Map<String, Task> taskById = new HashMap<>();
this.tasksByJobId.forEach((jobId, tasks) -> {
allTasks.addAll(tasks);
immutableTasksByJobId.put(jobId, unmodifiableList(tasks));
for (Task task : tasks) {
taskById.put(task.getId(), task);
}
allTasks.addAll(tasks.values());
immutableTasksByJobId.put(jobId, unmodifiableMap(tasks));
taskById.putAll(tasks);
});

return new LegacyJobSnapshot(
Expand Down Expand Up @@ -245,8 +245,9 @@ public Builder removeTask(Task task, boolean movedFromAnotherJob) {
task.getJobId();
Preconditions.checkArgument(StringExt.isNotEmpty(jobIdIndexToUpdate));

if (tasksByJobId.containsKey(jobIdIndexToUpdate)) {
tasksByJobId.get(jobIdIndexToUpdate).removeIf(t -> t.getId().equals(task.getId()));
Map<String, Task> tasks = tasksByJobId.get(jobIdIndexToUpdate);
if (tasks != null) {
tasks.remove(task.getId());
}

return this;
Expand All @@ -256,23 +257,26 @@ public Builder addOrUpdateTask(Task task, boolean movedFromAnotherJob) {
if (movedFromAnotherJob) {
removeTask(task, true);
}
tasksByJobId.putIfAbsent(task.getJobId(), new ArrayList<>());
List<Task> jobTasks = tasksByJobId.get(task.getJobId());
jobTasks.removeIf(t -> t.getId().equals(task.getId()));
jobTasks.add(task);
Map<String, Task> jobTasks = tasksByJobId.get(task.getJobId());
if (jobTasks == null) {
jobTasks = new HashMap<>();
tasksByJobId.put(task.getJobId(), jobTasks);
}
jobTasks.put(task.getId(), task);

return this;
}

private static List<Pair<Job<?>, List<Task>>> buildAllJobsAndTasksList(Map<String, Job<?>> jobsById, Map<String, List<Task>> tasksByJobId) {
List<Pair<Job<?>, List<Task>>> result = new ArrayList<>();
private static List<Pair<Job<?>, Map<String, Task>>> buildAllJobsAndTasksList(Map<String, Job<?>> jobsById,
Map<String, Map<String, Task>> tasksByJobId) {
List<Pair<Job<?>, Map<String, Task>>> result = new ArrayList<>();

jobsById.values().forEach(job -> {
List<Task> tasks = tasksByJobId.get(job.getId());
Map<String, Task> tasks = tasksByJobId.get(job.getId());
if (CollectionsExt.isNullOrEmpty(tasks)) {
result.add(Pair.of(job, Collections.emptyList()));
result.add(Pair.of(job, Collections.emptyMap()));
} else {
result.add(Pair.of(job, unmodifiableList(tasks)));
result.add(Pair.of(job, unmodifiableMap(tasks)));
}
});

Expand Down
Loading

0 comments on commit e119187

Please sign in to comment.