diff --git a/k8s-scheduler/src/main/java/com/vmware/dcm/PodEventsToDatabase.java b/k8s-scheduler/src/main/java/com/vmware/dcm/PodEventsToDatabase.java index b807c0c6..6bc6555e 100644 --- a/k8s-scheduler/src/main/java/com/vmware/dcm/PodEventsToDatabase.java +++ b/k8s-scheduler/src/main/java/com/vmware/dcm/PodEventsToDatabase.java @@ -29,6 +29,7 @@ import io.fabric8.kubernetes.api.model.Quantity; import io.fabric8.kubernetes.api.model.ResourceRequirements; import io.fabric8.kubernetes.api.model.Toleration; +import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.PublishSubject; import org.jooq.DSLContext; @@ -72,6 +73,7 @@ class PodEventsToDatabase { .build(); private final AtomicLong expressionIds = new AtomicLong(); private final PublishSubject eventStream = PublishSubject.create(); + private enum Operators { In, Exists, @@ -81,27 +83,28 @@ private enum Operators { PodEventsToDatabase(final IConnectionPool dbConnectionPool) { this.dbConnectionPool = dbConnectionPool; - eventStream.subscribeOn(Schedulers.single()) - .buffer(BATCH_INTERVAL_IN_MS, TimeUnit.MILLISECONDS, BATCH_COUNT) - .subscribe(podEvents -> { - if (podEvents.isEmpty()) { - return; - } - final List queries = new ArrayList<>(); - for (final BatchedTask task: podEvents) { - queries.addAll(task.queries()); - } - final long now = System.nanoTime(); - dbConnectionPool.getConnectionToDb().batch(queries).execute(); - LOG.info("Inserted {} queries from a batch of {} events in time {}", queries.size(), podEvents.size(), - System.nanoTime() - now); - for (final BatchedTask task: podEvents) { - task.future().set(true); - } - }); + final Disposable subscribe = eventStream.subscribeOn(Schedulers.single()) + .buffer(BATCH_INTERVAL_IN_MS, TimeUnit.MILLISECONDS, BATCH_COUNT) + .subscribe(podEvents -> { + if (podEvents.isEmpty()) { + return; + } + final List queries = new ArrayList<>(); + for (final BatchedTask task : podEvents) { + queries.addAll(task.queries()); + } + final long now = System.nanoTime(); + dbConnectionPool.getConnectionToDb().batch(queries).execute(); + LOG.info("Inserted {} queries from a batch of {} events in time {}", queries.size(), podEvents.size(), + System.nanoTime() - now); + for (final BatchedTask task : podEvents) { + task.future().set(true); + } + }); + LOG.trace("Subscription: {}", subscribe); } - record BatchedTask (List queries, SettableFuture future) {} + record BatchedTask(List queries, SettableFuture future) { } PodEvent handle(final PodEvent event) { final List queries = switch (event.action()) { @@ -308,8 +311,6 @@ static List insertPodRecord(final Pod pod, final DSLContext conn) { p.HAS_POD_ANTI_AFFINITY_REQUIREMENTS, p.HAS_NODE_PORT_REQUIREMENTS, p.HAS_TOPOLOGY_SPREAD_CONSTRAINTS, - p.PRIORITY, - p.SCHEDULER_NAME, p.EQUIVALENCE_CLASS, p.QOS_CLASS, p.RESOURCEVERSION, diff --git a/k8s-scheduler/src/main/java/com/vmware/dcm/Policies.java b/k8s-scheduler/src/main/java/com/vmware/dcm/Policies.java index 7772d8f9..34524807 100644 --- a/k8s-scheduler/src/main/java/com/vmware/dcm/Policies.java +++ b/k8s-scheduler/src/main/java/com/vmware/dcm/Policies.java @@ -244,9 +244,9 @@ static Policy taintsAndTolerations() { "from pods_to_assign " + "join nodes_that_have_tolerations" + " on pods_to_assign.controllable__node_name = nodes_that_have_tolerations.node_name " + - "check pods_to_assign.controllable__node_name IN " + - " (select node_name from pods_that_tolerate_node_taints AS A " + - " where A.pod_uid = pods_to_assign.uid)"; + "check exists(select * from pods_that_tolerate_node_taints as A " + + " where A.pod_uid = pods_to_assign.uid" + + " and A.node_name = pods_to_assign.controllable__node_name) = true"; return new Policy("NodeTaintsPredicate", constraint); }