Skip to content
This repository has been archived by the owner on May 10, 2024. It is now read-only.

Commit

Permalink
k8s-scheduler: postfixes after rebase with master
Browse files Browse the repository at this point in the history
Signed-off-by: Lalith Suresh <[email protected]>
  • Loading branch information
lalithsuresh committed Feb 2, 2022
1 parent 3075716 commit bedb6aa
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 24 deletions.
43 changes: 22 additions & 21 deletions k8s-scheduler/src/main/java/com/vmware/dcm/PodEventsToDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.api.model.Toleration;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import org.jooq.DSLContext;
Expand Down Expand Up @@ -72,6 +73,7 @@ class PodEventsToDatabase {
.build();
private final AtomicLong expressionIds = new AtomicLong();
private final PublishSubject<BatchedTask> eventStream = PublishSubject.create();

private enum Operators {
In,
Exists,
Expand All @@ -81,27 +83,28 @@ private enum Operators {

PodEventsToDatabase(final IConnectionPool dbConnectionPool) {
this.dbConnectionPool = dbConnectionPool;
eventStream.subscribeOn(Schedulers.single())
.buffer(BATCH_INTERVAL_IN_MS, TimeUnit.MILLISECONDS, BATCH_COUNT)
.subscribe(podEvents -> {
if (podEvents.isEmpty()) {
return;
}
final List<Query> queries = new ArrayList<>();
for (final BatchedTask task: podEvents) {
queries.addAll(task.queries());
}
final long now = System.nanoTime();
dbConnectionPool.getConnectionToDb().batch(queries).execute();
LOG.info("Inserted {} queries from a batch of {} events in time {}", queries.size(), podEvents.size(),
System.nanoTime() - now);
for (final BatchedTask task: podEvents) {
task.future().set(true);
}
});
final Disposable subscribe = eventStream.subscribeOn(Schedulers.single())
.buffer(BATCH_INTERVAL_IN_MS, TimeUnit.MILLISECONDS, BATCH_COUNT)
.subscribe(podEvents -> {
if (podEvents.isEmpty()) {
return;
}
final List<Query> queries = new ArrayList<>();
for (final BatchedTask task : podEvents) {
queries.addAll(task.queries());
}
final long now = System.nanoTime();
dbConnectionPool.getConnectionToDb().batch(queries).execute();
LOG.info("Inserted {} queries from a batch of {} events in time {}", queries.size(), podEvents.size(),
System.nanoTime() - now);
for (final BatchedTask task : podEvents) {
task.future().set(true);
}
});
LOG.trace("Subscription: {}", subscribe);
}

record BatchedTask (List<Query> queries, SettableFuture<Boolean> future) {}
record BatchedTask(List<Query> queries, SettableFuture<Boolean> future) { }

PodEvent handle(final PodEvent event) {
final List<Query> queries = switch (event.action()) {
Expand Down Expand Up @@ -308,8 +311,6 @@ static List<Query> insertPodRecord(final Pod pod, final DSLContext conn) {
p.HAS_POD_ANTI_AFFINITY_REQUIREMENTS,
p.HAS_NODE_PORT_REQUIREMENTS,
p.HAS_TOPOLOGY_SPREAD_CONSTRAINTS,
p.PRIORITY,
p.SCHEDULER_NAME,
p.EQUIVALENCE_CLASS,
p.QOS_CLASS,
p.RESOURCEVERSION,
Expand Down
6 changes: 3 additions & 3 deletions k8s-scheduler/src/main/java/com/vmware/dcm/Policies.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,9 @@ static Policy taintsAndTolerations() {
"from pods_to_assign " +
"join nodes_that_have_tolerations" +
" on pods_to_assign.controllable__node_name = nodes_that_have_tolerations.node_name " +
"check pods_to_assign.controllable__node_name IN " +
" (select node_name from pods_that_tolerate_node_taints AS A " +
" where A.pod_uid = pods_to_assign.uid)";
"check exists(select * from pods_that_tolerate_node_taints as A " +
" where A.pod_uid = pods_to_assign.uid" +
" and A.node_name = pods_to_assign.controllable__node_name) = true";
return new Policy("NodeTaintsPredicate", constraint);
}

Expand Down

0 comments on commit bedb6aa

Please sign in to comment.