From 405e62b08b24f890ce6fc036d3168ebbd9c2b663 Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Thu, 30 Jan 2025 09:21:14 +0300 Subject: [PATCH] IGNITE-24323 SQL Calcite: Add query blocking tasks executor (allows to execute SQL inside UDF) - Fixes #11833. Signed-off-by: Aleksey Plekhanov --- .../query/calcite/CalciteQueryProcessor.java | 22 +- .../query/calcite/exec/rel/AbstractNode.java | 9 - .../exec/task/AbstractQueryTaskExecutor.java | 89 +++++ .../calcite/exec/task/QueryAwareTask.java | 26 ++ .../exec/task/QueryBlockingTaskExecutor.java | 111 ++++++ .../query/calcite/exec/task/QueryKey.java | 63 +++ .../calcite/exec/task/QueryTasksQueue.java | 369 ++++++++++++++++++ .../StripedQueryTaskExecutor.java} | 53 +-- .../exec/rel/AbstractExecutionTest.java | 67 +++- .../task/QueryBlockingTaskExecutorTest.java | 118 ++++++ .../exec/task/QueryTasksQueueTest.java | 279 +++++++++++++ .../AbstractBasicIntegrationTest.java | 8 +- ...ryBlockingTaskExecutorIntegrationTest.java | 76 ++++ .../SqlDiagnosticIntegrationTest.java | 4 +- .../UserDefinedFunctionsIntegrationTest.java | 48 +++ .../calcite/planner/AbstractPlannerTest.java | 6 +- .../query/calcite/planner/PlannerTest.java | 4 +- .../testsuites/IgniteCalciteTestSuite.java | 11 +- .../testsuites/IntegrationTestSuite.java | 2 + .../ignite/testsuites/UtilTestSuite.java | 44 +++ 20 files changed, 1309 insertions(+), 100 deletions(-) create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/AbstractQueryTaskExecutor.java create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryAwareTask.java create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryKey.java create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java rename modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/{QueryTaskExecutorImpl.java => task/StripedQueryTaskExecutor.java} (61%) create mode 100644 modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutorTest.java create mode 100644 modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueueTest.java create mode 100644 modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryBlockingTaskExecutorIntegrationTest.java create mode 100644 modules/calcite/src/test/java/org/apache/ignite/testsuites/UtilTestSuite.java diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java index 7e9763e161fe7..83f1c7e7418f2 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java @@ -79,10 +79,11 @@ import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry; import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistryImpl; import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutor; -import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl; import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutService; import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl; import org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorImpl; +import org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor; +import org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.hint.HintsConfig; import org.apache.ignite.internal.processors.query.calcite.message.MessageService; import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl; @@ -124,6 +125,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.IgniteSystemProperties.getBoolean; import static org.apache.ignite.IgniteSystemProperties.getLong; import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION; @@ -141,6 +143,15 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query defaults = "" + DFLT_IGNITE_CALCITE_PLANNER_TIMEOUT) public static final String IGNITE_CALCITE_PLANNER_TIMEOUT = "IGNITE_CALCITE_PLANNER_TIMEOUT"; + /** + * Use query blocking executor property name. + */ + @SystemProperty(value = "Calcite-based SQL engine. Use query blocking task executor instead of striped task " + + "executor. Query blocking executor allows to run SQL queries inside user-defined functions at the cost of " + + "some performance penalty", defaults = "" + false) + public static final String IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR = + "IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR"; + /** */ public static final FrameworkConfig FRAMEWORK_CONFIG = Frameworks.newConfigBuilder() .executor(new RexExecutorImpl(DataContexts.EMPTY)) @@ -186,8 +197,7 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query private final FrameworkConfig frameworkCfg; /** Query planner timeout. */ - private final long queryPlannerTimeout = getLong(IGNITE_CALCITE_PLANNER_TIMEOUT, - DFLT_IGNITE_CALCITE_PLANNER_TIMEOUT); + private final long qryPlannerTimeout = getLong(IGNITE_CALCITE_PLANNER_TIMEOUT, DFLT_IGNITE_CALCITE_PLANNER_TIMEOUT); /** */ private final QueryPlanCache qryPlanCache; @@ -254,7 +264,9 @@ public CalciteQueryProcessor(GridKernalContext ctx) { qryPlanCache = new QueryPlanCacheImpl(ctx); parserMetrics = new QueryParserMetricsHolder(ctx.metric()); mailboxRegistry = new MailboxRegistryImpl(ctx); - taskExecutor = new QueryTaskExecutorImpl(ctx); + taskExecutor = getBoolean(IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR) + ? new QueryBlockingTaskExecutor(ctx) + : new StripedQueryTaskExecutor(ctx); executionSvc = new ExecutionServiceImpl<>(ctx, ArrayRowHandler.INSTANCE); partSvc = new AffinityServiceImpl(ctx); msgSvc = new MessageServiceImpl(ctx); @@ -666,7 +678,7 @@ private T processQuery( exchangeSvc, (q, ex) -> qryReg.unregister(q.id(), ex), log, - queryPlannerTimeout, + qryPlannerTimeout, timeout ); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java index 5450992ac050c..f27548c0eb7c9 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java @@ -49,9 +49,6 @@ public abstract class AbstractNode implements Node { /** */ protected static final int IO_BATCH_CNT = IgniteSystemProperties.getInteger(IGNITE_CALCITE_EXEC_IO_BATCH_CNT, 4); - /** for debug purpose */ - private volatile Thread thread; - /** * {@link Inbox} node may not have proper context at creation time in case it * creates on first message received from a remote source. This case the context @@ -186,12 +183,6 @@ protected void checkState() throws Exception { throw new QueryCancelledException("The query was timed out."); if (Thread.interrupted()) throw new IgniteInterruptedCheckedException("Thread was interrupted."); - if (!U.assertionsEnabled()) - return; - if (thread == null) - thread = Thread.currentThread(); - else - assert thread == Thread.currentThread(); } /** */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/AbstractQueryTaskExecutor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/AbstractQueryTaskExecutor.java new file mode 100644 index 0000000000000..ffed07350852c --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/AbstractQueryTaskExecutor.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.task; + +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutor; +import org.apache.ignite.internal.processors.query.calcite.util.AbstractService; +import org.apache.ignite.internal.processors.security.SecurityContext; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Abstract query task executor. + */ +public abstract class AbstractQueryTaskExecutor extends AbstractService implements QueryTaskExecutor, Thread.UncaughtExceptionHandler { + /** */ + public static final String THREAD_POOL_NAME = "CalciteQueryExecutor"; + + /** */ + protected final GridKernalContext ctx; + + /** */ + protected Thread.UncaughtExceptionHandler eHnd; + + /** */ + protected AbstractQueryTaskExecutor(GridKernalContext ctx) { + super(ctx); + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public void uncaughtException(Thread t, Throwable e) { + if (eHnd != null) + eHnd.uncaughtException(t, e); + } + + /** {@inheritDoc} */ + @Override public void onStart(GridKernalContext ctx) { + eHnd = ctx.uncaughtExceptionHandler(); + + super.onStart(ctx); + } + + /** */ + protected class SecurityAwareTask implements Runnable { + /** */ + private final SecurityContext secCtx; + + /** */ + private final Runnable qryTask; + + /** */ + public SecurityAwareTask(SecurityContext secCtx, Runnable qryTask) { + this.secCtx = secCtx; + this.qryTask = qryTask; + } + + /** {@inheritDoc} */ + @Override public void run() { + try (AutoCloseable ignored = ctx.security().withContext(secCtx)) { + qryTask.run(); + } + catch (Throwable e) { + U.warn(log, "Uncaught exception", e); + + /* + * No exceptions are rethrown here to preserve the current thread from being destroyed, + * because other queries may be pinned to the current thread id. + * However, unrecoverable errors must be processed by FailureHandler. + */ + uncaughtException(Thread.currentThread(), e); + } + } + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryAwareTask.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryAwareTask.java new file mode 100644 index 0000000000000..b04f6706306fa --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryAwareTask.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.task; + +/** + * Query aware task. + */ +interface QueryAwareTask extends Runnable { + /** */ + public QueryKey queryKey(); +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java new file mode 100644 index 0000000000000..c19b3dad31a85 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.task; + +import java.util.UUID; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.processors.security.SecurityContext; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.thread.IgniteThreadPoolExecutor; + +import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName; +import static org.apache.ignite.internal.processors.pool.PoolProcessor.THREAD_POOLS; + +/** + * Query task executor based on queue with query blocking. + */ +public class QueryBlockingTaskExecutor extends AbstractQueryTaskExecutor { + /** */ + private final QueryTasksQueue tasksQueue = new QueryTasksQueue(); + + /** */ + private IgniteThreadPoolExecutor executor; + + /** */ + public QueryBlockingTaskExecutor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public void execute(UUID qryId, long fragmentId, Runnable qryTask) { + SecurityContext secCtx = ctx.security().securityContext(); + + QueryKey qryKey = new QueryKey(qryId, fragmentId); + + executor.execute(new QueryAndSecurityAwareTask(qryKey, secCtx, qryTask)); + } + + /** {@inheritDoc} */ + @Override public void onStart(GridKernalContext ctx) { + super.onStart(ctx); + + executor = new IgniteThreadPoolExecutor( + "calciteQry", + ctx.igniteInstanceName(), + ctx.config().getQueryThreadPoolSize(), + ctx.config().getQueryThreadPoolSize(), + IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME, + tasksQueue.blockingQueue(), + GridIoPolicy.CALLER_THREAD, + eHnd + ) { + @Override protected void afterExecute(Runnable r, Throwable t) { + tasksQueue.unblockQuery(((QueryAwareTask)r).queryKey()); + + super.afterExecute(r, t); + } + }; + + // Prestart threads to ensure that all threads always use queue to poll tasks (without this call worker can + // get its first task directly from 'execute' method, bypassing tasks queue). + executor.prestartAllCoreThreads(); + + executor.registerMetrics(ctx.metric().registry(metricName(THREAD_POOLS, THREAD_POOL_NAME))); + } + + /** {@inheritDoc} */ + @Override public void tearDown() { + U.shutdownNow(getClass(), executor, log); + } + + /** */ + private class QueryAndSecurityAwareTask extends SecurityAwareTask implements QueryAwareTask { + /** */ + private final QueryKey qryKey; + + /** */ + public QueryAndSecurityAwareTask(QueryKey qryKey, SecurityContext secCtx, Runnable qryTask) { + super(secCtx, qryTask); + + this.qryKey = qryKey; + } + + /** {@inheritDoc} */ + @Override public QueryKey queryKey() { + return qryKey; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryAndSecurityAwareTask.class, this); + } + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryKey.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryKey.java new file mode 100644 index 0000000000000..411b0fdc52a07 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryKey.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.task; + +import java.util.Objects; +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Query key. + */ +class QueryKey { + /** */ + private final UUID qryId; + + /** */ + private final long fragmentId; + + /** */ + QueryKey(UUID qryId, long fragmentId) { + this.qryId = qryId; + this.fragmentId = fragmentId; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryKey key = (QueryKey)o; + + return fragmentId == key.fragmentId && Objects.equals(qryId, key.qryId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return U.safeAbs(31 * (31 + (qryId != null ? qryId.hashCode() : 0)) + Long.hashCode(fragmentId)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryKey.class, this); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java new file mode 100644 index 0000000000000..6bdc044076a1b --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.task; + +import java.lang.reflect.Array; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * A tasks queue with filtering (based on linked nodes). + */ +class QueryTasksQueue { + /** + * Linked list node class. + */ + private static class Node { + /** */ + QueryAwareTask item; + + /** Next node in chain. */ + Node next; + + /** */ + Node(QueryAwareTask item) { + this.item = item; + } + } + + /** Current number of elements */ + private final AtomicInteger cnt = new AtomicInteger(); + + /** Head of linked list. */ + Node head; + + /** Tail of linked list. */ + private Node last; + + /** */ + private final ReentrantLock lock = new ReentrantLock(); + + /** Wait condition for waiting takes. */ + private final Condition notEmpty = lock.newCondition(); + + /** Set of blocked (currently running) queries. */ + private final Set blockedQrys = new HashSet<>(); + + /** + * Creates a {@code LinkedBlockingQueue}. + */ + QueryTasksQueue() { + last = head = new Node(null); + } + + /** Queue size. */ + public int size() { + return cnt.get(); + } + + /** Add a task to the queue. */ + public void addTask(QueryAwareTask task) { + lock.lock(); + + try { + assert last.next == null : "Unexpected last.next: " + last.next; + + last = last.next = new Node(task); + + cnt.getAndIncrement(); + + notEmpty.signal(); + } + finally { + lock.unlock(); + } + } + + /** Poll task and block query. */ + public QueryAwareTask pollTaskAndBlockQuery(long timeout, TimeUnit unit) throws InterruptedException { + lock.lockInterruptibly(); + + try { + QueryAwareTask res; + + long nanos = unit.toNanos(timeout); + + while (cnt.get() == 0 || (res = dequeue()) == null) { + if (nanos <= 0L) + return null; + + nanos = notEmpty.awaitNanos(nanos); + } + + boolean added = blockedQrys.add(res.queryKey()); + + assert added; + + return res; + } + finally { + lock.unlock(); + } + } + + /** + * Removes a first non-blocked task from the head of the queue. + * + * @return The task. + */ + private QueryAwareTask dequeue() { + assert lock.isHeldByCurrentThread(); + assert head.item == null : "Unexpected head.item: " + head.item; + + for (Node pred = head, cur = pred.next; cur != null; pred = cur, cur = cur.next) { + if (!blockedQrys.contains(cur.item.queryKey())) { // Skip tasks for blocked queries. + QueryAwareTask res = cur.item; + + unlink(pred, cur); + + if (cnt.decrementAndGet() > 0) + notEmpty.signal(); + + return res; + } + } + + return null; + } + + /** Unblock query. */ + public void unblockQuery(QueryKey qryKey) { + lock.lock(); + + try { + boolean removed = blockedQrys.remove(qryKey); + + assert removed; + + if (cnt.get() > 0) + notEmpty.signal(); + } + finally { + lock.unlock(); + } + } + + /** Remove task */ + public boolean removeTask(QueryAwareTask task) { + if (task == null) + return false; + + lock.lock(); + + try { + for (Node pred = head, cur = pred.next; cur != null; pred = cur, cur = cur.next) { + if (task.equals(cur.item)) { + unlink(pred, cur); + + cnt.getAndDecrement(); + + return true; + } + } + + return false; + } + finally { + lock.unlock(); + } + } + + /** + * Unlinks interior Node cur with predecessor pred. + */ + private void unlink(Node pred, Node cur) { + cur.item = null; + pred.next = cur.next; + + if (last == cur) + last = pred; + } + + /** */ + public T[] toArray(T[] a) { + lock.lock(); + + try { + int size = cnt.get(); + + if (a.length < size) + a = (T[])Array.newInstance(a.getClass().getComponentType(), size); + + int k = 0; + + for (Node cur = head.next; cur != null; cur = cur.next) + a[k++] = (T)cur.item; + + while (a.length > k) + a[k++] = null; + + return a; + } + finally { + lock.unlock(); + } + } + + /** */ + public int drainTo(Collection c, int maxElements) { + Objects.requireNonNull(c); + + if (maxElements <= 0) + return 0; + + lock.lock(); + + try { + int n = Math.min(maxElements, cnt.get()); + int i = 0; + + for (Node cur = head.next; i < n && cur != null; cur = cur.next, i++) { + c.add(cur.item); + + unlink(head, cur); + + cnt.getAndDecrement(); + } + + return i; + } + finally { + lock.unlock(); + } + } + + /** + * @return {@code BlockingQueue} on top of {@code QueryTasksQueue}. This blocking queue implements only methods + * required by {@code ThreadPoolExecutor}. + */ + public BlockingQueue blockingQueue() { + return new BlockingQueue<>() { + @Override public boolean add(@NotNull Runnable runnable) { + addTask((QueryAwareTask)runnable); + + return true; + } + + @Override public boolean offer(@NotNull Runnable runnable) { + return add(runnable); + } + + @Override public boolean offer(Runnable runnable, long timeout, @NotNull TimeUnit unit) { + return add(runnable); + } + + @Override public void put(@NotNull Runnable runnable) { + add(runnable); + } + + @Override public int remainingCapacity() { + return Integer.MAX_VALUE; + } + + @Override public boolean remove(Object o) { + return removeTask((QueryAwareTask)o); + } + + @Override public @NotNull Runnable take() throws InterruptedException { + return pollTaskAndBlockQuery(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } + + @Override public @Nullable Runnable poll(long timeout, @NotNull TimeUnit unit) throws InterruptedException { + return pollTaskAndBlockQuery(0, TimeUnit.NANOSECONDS); + } + + @Override public Runnable remove() { + throw new UnsupportedOperationException(); + } + + @Override public Runnable poll() { + throw new UnsupportedOperationException(); + } + + @Override public int size() { + return QueryTasksQueue.this.size(); + } + + @Override public boolean isEmpty() { + return size() == 0; + } + + @Override public @NotNull Object[] toArray() { + return toArray(new Object[size()]); + } + + @Override public @NotNull T[] toArray(@NotNull T[] a) { + return QueryTasksQueue.this.toArray(a); + } + + @Override public int drainTo(@NotNull Collection c) { + return drainTo(c, Integer.MAX_VALUE); + } + + @Override public int drainTo(@NotNull Collection c, int maxElements) { + return QueryTasksQueue.this.drainTo(c, maxElements); + } + + @Override public boolean contains(Object o) { + throw new UnsupportedOperationException(); + } + + @Override public Runnable element() { + throw new UnsupportedOperationException(); + } + + @Override public Runnable peek() { + throw new UnsupportedOperationException(); + } + + @Override public @NotNull Iterator iterator() { + throw new UnsupportedOperationException(); + } + + @Override public boolean containsAll(@NotNull Collection c) { + throw new UnsupportedOperationException(); + } + + @Override public boolean addAll(@NotNull Collection c) { + throw new UnsupportedOperationException(); + } + + @Override public boolean removeAll(@NotNull Collection c) { + throw new UnsupportedOperationException(); + } + + @Override public boolean retainAll(@NotNull Collection c) { + throw new UnsupportedOperationException(); + } + + @Override public void clear() { + throw new UnsupportedOperationException(); + } + }; + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/StripedQueryTaskExecutor.java similarity index 61% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/StripedQueryTaskExecutor.java index b489d05192cd1..213c7cbdda033 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/StripedQueryTaskExecutor.java @@ -15,11 +15,10 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.calcite.exec; +package org.apache.ignite.internal.processors.query.calcite.exec.task; import java.util.UUID; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.processors.query.calcite.util.AbstractService; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; @@ -28,25 +27,15 @@ import static org.apache.ignite.internal.processors.pool.PoolProcessor.THREAD_POOLS; /** - * Query task executor. + * Query task executor based on striped pool. */ -public class QueryTaskExecutorImpl extends AbstractService implements QueryTaskExecutor, Thread.UncaughtExceptionHandler { - /** */ - public static final String THREAD_POOL_NAME = "CalciteQueryExecutor"; - - /** */ - private final GridKernalContext ctx; - +public class StripedQueryTaskExecutor extends AbstractQueryTaskExecutor { /** */ private IgniteStripedThreadPoolExecutor stripedThreadPoolExecutor; /** */ - private Thread.UncaughtExceptionHandler eHnd; - - /** */ - public QueryTaskExecutorImpl(GridKernalContext ctx) { + public StripedQueryTaskExecutor(GridKernalContext ctx) { super(ctx); - this.ctx = ctx; } /** @@ -56,40 +45,16 @@ public void stripedThreadPoolExecutor(IgniteStripedThreadPoolExecutor stripedThr this.stripedThreadPoolExecutor = stripedThreadPoolExecutor; } - /** - * @param eHnd Uncaught exception handler. - */ - public void exceptionHandler(Thread.UncaughtExceptionHandler eHnd) { - this.eHnd = eHnd; - } - /** {@inheritDoc} */ @Override public void execute(UUID qryId, long fragmentId, Runnable qryTask) { SecurityContext secCtx = ctx.security().securityContext(); - stripedThreadPoolExecutor.execute( - () -> { - try (AutoCloseable ignored = ctx.security().withContext(secCtx)) { - qryTask.run(); - } - catch (Throwable e) { - U.warn(log, "Uncaught exception", e); - - /* - * No exceptions are rethrown here to preserve the current thread from being destroyed, - * because other queries may be pinned to the current thread id. - * However, unrecoverable errors must be processed by FailureHandler. - */ - uncaughtException(Thread.currentThread(), e); - } - }, - hash(qryId, fragmentId) - ); + stripedThreadPoolExecutor.execute(new SecurityAwareTask(secCtx, qryTask), hash(qryId, fragmentId)); } /** {@inheritDoc} */ @Override public void onStart(GridKernalContext ctx) { - exceptionHandler(ctx.uncaughtExceptionHandler()); + super.onStart(ctx); IgniteStripedThreadPoolExecutor executor = new IgniteStripedThreadPoolExecutor( ctx.config().getQueryThreadPoolSize(), @@ -110,12 +75,6 @@ public void exceptionHandler(Thread.UncaughtExceptionHandler eHnd) { U.shutdownNow(getClass(), stripedThreadPoolExecutor, log); } - /** {@inheritDoc} */ - @Override public void uncaughtException(Thread t, Throwable e) { - if (eHnd != null) - eHnd.uncaughtException(t, e); - } - /** */ private static int hash(UUID qryId, long fragmentId) { // inlined Objects.hash(...) diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java index 6e416d64295ec..03275e190368a 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java @@ -33,7 +33,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; - import com.google.common.collect.ImmutableMap; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; @@ -48,8 +47,10 @@ import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry; import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistryImpl; import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutor; -import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl; import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl; +import org.apache.ignite.internal.processors.query.calcite.exec.task.AbstractQueryTaskExecutor; +import org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor; +import org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker; import org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage; @@ -78,16 +79,16 @@ @RunWith(Parameterized.class) public class AbstractExecutionTest extends GridCommonAbstractTest { /** Last parameter number. */ - protected static final int LAST_PARAM_NUM = 0; + protected static final int LAST_PARAM_NUM = 1; /** Params string. */ - protected static final String PARAMS_STRING = "Execution strategy = {0}"; + protected static final String PARAMS_STRING = "Task executor = {0}, Execution strategy = {1}"; /** */ private Throwable lastE; /** */ - private Map taskExecutors; + private Map taskExecutors; /** */ private Map exchangeServices; @@ -101,6 +102,15 @@ public class AbstractExecutionTest extends GridCommonAbstractTest { /** */ protected int nodesCnt = 3; + /** */ + enum TaskExecutorType { + /** */ + STRIPED, + + /** */ + QUERY_BLOCKING + } + /** */ enum ExecutionStrategy { /** */ @@ -141,9 +151,19 @@ public T2 nextTask(Deque> tasks) { /** */ @Parameterized.Parameters(name = PARAMS_STRING) public static List parameters() { - return Stream.of(ExecutionStrategy.values()).map(s -> new Object[]{s}).collect(Collectors.toList()); + List params = Stream.of(ExecutionStrategy.values()) + .map(s -> new Object[] {TaskExecutorType.STRIPED, s}) + .collect(Collectors.toList()); + + params.add(new Object[] {TaskExecutorType.QUERY_BLOCKING, ExecutionStrategy.FIFO}); + + return params; } + /** Task executor. */ + @Parameterized.Parameter + public TaskExecutorType taskExecutorType; + /** Execution direction. */ @Parameterized.Parameter(LAST_PARAM_NUM) public ExecutionStrategy execStgy; @@ -167,16 +187,29 @@ public void setup() throws Exception { kernal.add(new NoOpIgniteSecurityProcessor(kernal)); kernal.add(new GridCacheProcessor(kernal)); - QueryTaskExecutorImpl taskExecutor = new QueryTaskExecutorImpl(kernal); - taskExecutor.stripedThreadPoolExecutor(new IgniteTestStripedThreadPoolExecutor( - execStgy, - kernal.config().getQueryThreadPoolSize(), - kernal.igniteInstanceName(), - "calciteQry", - this::handle, - true, - DFLT_THREAD_KEEP_ALIVE_TIME - )); + AbstractQueryTaskExecutor taskExecutor; + + if (taskExecutorType == TaskExecutorType.STRIPED) { + StripedQueryTaskExecutor executor = new StripedQueryTaskExecutor(kernal); + + executor.stripedThreadPoolExecutor(new IgniteTestStripedThreadPoolExecutor( + execStgy, + kernal.config().getQueryThreadPoolSize(), + kernal.igniteInstanceName(), + "calciteQry", + this::handle, + true, + DFLT_THREAD_KEEP_ALIVE_TIME + )); + + taskExecutor = executor; + } + else { + taskExecutor = new QueryBlockingTaskExecutor(kernal); + + taskExecutor.onStart(kernal); + } + taskExecutors.put(uuid, taskExecutor); MailboxRegistryImpl mailboxRegistry = new MailboxRegistryImpl(kernal); @@ -263,7 +296,7 @@ public IgniteTestStripedThreadPoolExecutor( /** */ @After public void tearDown() { - taskExecutors.values().forEach(QueryTaskExecutorImpl::tearDown); + taskExecutors.values().forEach(AbstractQueryTaskExecutor::tearDown); if (lastE != null) throw new AssertionError(lastE); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutorTest.java new file mode 100644 index 0000000000000..9f4a7a6c8a1fe --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutorTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.task; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.security.NoOpIgniteSecurityProcessor; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridTestKernalContext; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +/** */ +public class QueryBlockingTaskExecutorTest extends GridCommonAbstractTest { + /** Tests that tasks for different queries can be executed concurrently. */ + @Test + public void testConcurrentTasks() throws Exception { + GridTestKernalContext ctx = newContext(new IgniteConfiguration().setQueryThreadPoolSize(10)); + ctx.add(new NoOpIgniteSecurityProcessor(ctx)); + QueryBlockingTaskExecutor executor = new QueryBlockingTaskExecutor(ctx); + executor.onStart(ctx); + + CountDownLatch latch = new CountDownLatch(1); + + AtomicInteger cnt = new AtomicInteger(); + AtomicBoolean fail = new AtomicBoolean(); + + UUID qryId1 = UUID.randomUUID(); + UUID qryId2 = UUID.randomUUID(); + + Runnable task = () -> { + cnt.incrementAndGet(); + + try { + if (!latch.await(1_000L, TimeUnit.MILLISECONDS)) + fail.set(true); + } + catch (InterruptedException e) { + fail.set(true); + } + }; + + executor.execute(qryId1, 0, task); + executor.execute(qryId1, 1, task); + executor.execute(qryId2, 0, task); + + assertTrue("Failed to wait for tasks completion", + GridTestUtils.waitForCondition(() -> cnt.get() == 3, 1_000L)); + + latch.countDown(); + + assertFalse("Failed to wait for latch", fail.get()); + } + + /** Tests that tasks for the same query can't be executed concurrently. */ + @Test + public void testSameQueryTasks() throws Exception { + GridTestKernalContext ctx = newContext(new IgniteConfiguration().setQueryThreadPoolSize(10)); + ctx.add(new NoOpIgniteSecurityProcessor(ctx)); + QueryBlockingTaskExecutor executor = new QueryBlockingTaskExecutor(ctx); + executor.onStart(ctx); + + int qryCnt = 20; + int taskCnt = 10_000; + + UUID[] qryIds = new UUID[qryCnt]; + AtomicBoolean[] blocked = new AtomicBoolean[qryCnt]; + + for (int i = 0; i < qryCnt; i += 2) + qryIds[i] = qryIds[i + 1] = UUID.randomUUID(); + + for (int i = 0; i < qryCnt; i++) + blocked[i] = new AtomicBoolean(); + + AtomicInteger cnt = new AtomicInteger(); + AtomicBoolean fail = new AtomicBoolean(); + + for (int i = 0; i < taskCnt; i++) { + int qryIdx = ThreadLocalRandom.current().nextInt(qryCnt); + + executor.execute(qryIds[qryIdx], qryIdx, () -> { + if (!blocked[qryIdx].compareAndSet(false, true)) + fail.set(true); + + doSleep(ThreadLocalRandom.current().nextLong(10L)); + + blocked[qryIdx].set(false); + + cnt.incrementAndGet(); + }); + } + + assertTrue("Failed to wait for tasks completion", + GridTestUtils.waitForCondition(() -> cnt.get() == taskCnt, getTestTimeout())); + + assertFalse("Tasks for the same query executed concurrently", fail.get()); + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueueTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueueTest.java new file mode 100644 index 0000000000000..134bf1d15e81c --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueueTest.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.task; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +/** Tests QueryTasksQueue data structure. */ +public class QueryTasksQueueTest extends GridCommonAbstractTest { + /** */ + @Test + public void testQueryBlockingUnblocking() throws Exception { + long waitTimeout = 10_000L; + + QueryTasksQueue queue = new QueryTasksQueue(); + UUID qryId1 = UUID.randomUUID(); + UUID qryId2 = UUID.randomUUID(); + QueryKey qryKey1 = new QueryKey(qryId1, 0); + QueryKey qryKey2 = new QueryKey(qryId2, 0); + QueryKey qryKey3 = new QueryKey(qryId1, 1); + + queue.addTask(new TestQueryAwareTask(qryKey1)); + queue.addTask(new TestQueryAwareTask(qryKey1)); + queue.addTask(new TestQueryAwareTask(qryKey2)); + queue.addTask(new TestQueryAwareTask(qryKey2)); + queue.addTask(new TestQueryAwareTask(qryKey1)); + queue.addTask(new TestQueryAwareTask(qryKey3)); + + QueryAwareTask task = queue.pollTaskAndBlockQuery(waitTimeout, TimeUnit.MILLISECONDS); + assertEquals(qryKey1, task.queryKey()); + + task = queue.pollTaskAndBlockQuery(waitTimeout, TimeUnit.MILLISECONDS); + assertEquals(qryKey2, task.queryKey()); + + task = queue.pollTaskAndBlockQuery(waitTimeout, TimeUnit.MILLISECONDS); + assertEquals(qryKey3, task.queryKey()); + + // Test threads parking and unparking. + QueryAwareTask[] res = new TestQueryAwareTask[1]; + + Runnable pollAndStoreResult = () -> { + try { + res[0] = queue.pollTaskAndBlockQuery(waitTimeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + }; + + // Unparking on unblock query. + Thread thread1 = new Thread(pollAndStoreResult); + + thread1.start(); + + assertTrue(GridTestUtils.waitForCondition(() -> thread1.getState() == Thread.State.TIMED_WAITING, waitTimeout)); + + queue.unblockQuery(qryKey2); + + thread1.join(waitTimeout); + + assertFalse(thread1.isAlive()); + + assertEquals(qryKey2, res[0].queryKey()); + + // Unparking on new task. + queue.unblockQuery(qryKey3); + + Thread thread2 = new Thread(pollAndStoreResult); + + thread2.start(); + + assertTrue(GridTestUtils.waitForCondition(() -> thread2.getState() == Thread.State.TIMED_WAITING, waitTimeout)); + + queue.addTask(new TestQueryAwareTask(qryKey3)); + + thread2.join(waitTimeout); + + assertFalse(thread2.isAlive()); + + assertEquals(qryKey3, res[0].queryKey()); + + // Get next task after unblocking all pending locks. + queue.unblockQuery(qryKey1); + queue.unblockQuery(qryKey2); + queue.unblockQuery(qryKey3); + + task = queue.pollTaskAndBlockQuery(waitTimeout, TimeUnit.MILLISECONDS); + assertEquals(qryKey1, task.queryKey()); + + // Unparking on unblock query second time. + Thread thread3 = new Thread(pollAndStoreResult); + + thread3.start(); + + assertTrue(GridTestUtils.waitForCondition(() -> thread3.getState() == Thread.State.TIMED_WAITING, waitTimeout)); + + queue.unblockQuery(qryKey1); + + thread3.join(waitTimeout); + + assertFalse(thread3.isAlive()); + + assertEquals(qryKey1, res[0].queryKey()); + + assertEquals(0, queue.size()); + } + + /** */ + @Test + public void testToArray() { + QueryTasksQueue queue = new QueryTasksQueue(); + + QueryKey qryKey1 = new QueryKey(UUID.randomUUID(), 0); + QueryKey qryKey2 = new QueryKey(UUID.randomUUID(), 1); + QueryKey qryKey3 = new QueryKey(UUID.randomUUID(), 2); + + queue.addTask(new TestQueryAwareTask(qryKey1)); + queue.addTask(new TestQueryAwareTask(qryKey2)); + queue.addTask(new TestQueryAwareTask(qryKey3)); + queue.addTask(new TestQueryAwareTask(qryKey1)); + + assertEquals(4, queue.size()); + + QueryAwareTask[] tasks = queue.toArray(new TestQueryAwareTask[2]); + + assertEquals(4, queue.size()); + assertEquals(4, tasks.length); + assertEquals(qryKey1, tasks[0].queryKey()); + assertEquals(qryKey2, tasks[1].queryKey()); + assertEquals(qryKey3, tasks[2].queryKey()); + assertEquals(qryKey1, tasks[3].queryKey()); + + tasks = queue.toArray(new TestQueryAwareTask[] { + null, null, null, null, + new TestQueryAwareTask(qryKey1), + new TestQueryAwareTask(qryKey2) + }); + + assertEquals(4, queue.size()); + assertEquals(6, tasks.length); + assertEquals(qryKey1, tasks[0].queryKey()); + assertEquals(qryKey2, tasks[1].queryKey()); + assertEquals(qryKey3, tasks[2].queryKey()); + assertEquals(qryKey1, tasks[3].queryKey()); + assertNull(tasks[4]); + assertNull(tasks[5]); + } + + /** */ + @Test + public void testDrainTo() { + QueryTasksQueue queue = new QueryTasksQueue(); + + QueryKey qryKey1 = new QueryKey(UUID.randomUUID(), 0); + QueryKey qryKey2 = new QueryKey(UUID.randomUUID(), 1); + QueryKey qryKey3 = new QueryKey(UUID.randomUUID(), 2); + + queue.addTask(new TestQueryAwareTask(qryKey1)); + queue.addTask(new TestQueryAwareTask(qryKey2)); + queue.addTask(new TestQueryAwareTask(qryKey3)); + queue.addTask(new TestQueryAwareTask(qryKey1)); + queue.addTask(new TestQueryAwareTask(qryKey2)); + queue.addTask(new TestQueryAwareTask(qryKey3)); + + List tasks = new ArrayList<>(); + assertEquals(2, queue.drainTo(tasks, 2)); + + assertEquals(4, queue.size()); + assertEquals(2, tasks.size()); + assertEquals(qryKey1, tasks.get(0).queryKey()); + assertEquals(qryKey2, tasks.get(1).queryKey()); + + assertEquals(1, queue.drainTo(tasks, 1)); + + assertEquals(3, queue.size()); + assertEquals(3, tasks.size()); + assertEquals(qryKey3, tasks.get(2).queryKey()); + + tasks.clear(); + + assertEquals(3, queue.drainTo(tasks, Integer.MAX_VALUE)); + + assertEquals(0, queue.size()); + assertEquals(3, tasks.size()); + assertEquals(qryKey1, tasks.get(0).queryKey()); + assertEquals(qryKey2, tasks.get(1).queryKey()); + assertEquals(qryKey3, tasks.get(2).queryKey()); + } + + /** */ + @Test + public void testRemove() { + QueryTasksQueue queue = new QueryTasksQueue(); + + QueryKey qryKey1 = new QueryKey(UUID.randomUUID(), 0); + QueryKey qryKey2 = new QueryKey(UUID.randomUUID(), 1); + QueryAwareTask task1 = new TestQueryAwareTask(qryKey1); + QueryAwareTask task2 = new TestQueryAwareTask(qryKey1); + QueryAwareTask task3 = new TestQueryAwareTask(qryKey2); + QueryAwareTask task4 = new TestQueryAwareTask(qryKey2); + + queue.addTask(task1); + queue.addTask(task2); + queue.addTask(task3); + queue.addTask(task4); + + assertEquals(4, queue.size()); + + queue.removeTask(task2); + + assertEquals(3, queue.size()); + + assertEqualsArraysAware(new QueryAwareTask[] {task1, task3, task4}, queue.toArray(new TestQueryAwareTask[3])); + + queue.removeTask(task1); + + assertEquals(2, queue.size()); + + assertEqualsArraysAware(new QueryAwareTask[] {task3, task4}, queue.toArray(new TestQueryAwareTask[2])); + + queue.removeTask(task4); + + assertEquals(1, queue.size()); + + assertEqualsArraysAware(new QueryAwareTask[] {task3}, queue.toArray(new TestQueryAwareTask[1])); + + queue.removeTask(task3); + + assertEquals(0, queue.size()); + + // Check add after remove of all tasks. + queue.addTask(task1); + queue.addTask(task2); + assertEquals(2, queue.size()); + + assertEqualsArraysAware(new QueryAwareTask[] {task1, task2}, queue.toArray(new TestQueryAwareTask[2])); + } + + /** */ + private static class TestQueryAwareTask implements QueryAwareTask { + /** */ + private final QueryKey qryKey; + + /** */ + public TestQueryAwareTask(QueryKey qryKey) { + this.qryKey = qryKey; + } + + /** {@inheritDoc} */ + @Override public QueryKey queryKey() { + return qryKey; + } + + /** */ + @Override public void run() { + // No-op. + } + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java index 81e6024b56bc5..81c724c396275 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java @@ -187,12 +187,12 @@ protected void assertThrows(String sql, Class cls, String m } /** */ - protected IgniteCache createAndPopulateTable() { - return createAndPopulateTable(client, 2, CacheMode.PARTITIONED); + protected void createAndPopulateTable() { + createAndPopulateTable(client, 2, CacheMode.PARTITIONED); } /** */ - protected IgniteCache createAndPopulateTable(Ignite ignite, int backups, CacheMode cacheMode) { + protected void createAndPopulateTable(Ignite ignite, int backups, CacheMode cacheMode) { IgniteCache person = ignite.getOrCreateCache(this.cacheConfiguration() .setName(TABLE_NAME) .setSqlSchema("PUBLIC") @@ -212,8 +212,6 @@ protected IgniteCache createAndPopulateTable(Ignite ignite, i put(ignite, person, idx++, new Employer("Ilya", 15d)); put(ignite, person, idx++, new Employer("Roma", 10d)); put(ignite, person, idx, new Employer("Roma", 10d)); - - return person; } /** */ diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryBlockingTaskExecutorIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryBlockingTaskExecutorIntegrationTest.java new file mode 100644 index 0000000000000..c31b287caa8d3 --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryBlockingTaskExecutorIntegrationTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.integration; + +import org.apache.ignite.internal.processors.query.calcite.QueryChecker; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.WithSystemProperty; +import org.junit.Test; + +import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR; + +/** + * Integration test with common queries for query blocking task executor instead of striped task executor. + */ +@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR, value = "true") +public class QueryBlockingTaskExecutorIntegrationTest extends AbstractBasicIntegrationTransactionalTest { + /** */ + @Test + public void testSimpleScan() { + createAndPopulateTable(); + + assertQuery("SELECT * FROM person").resultSize(5).check(); + } + + /** */ + @Test + public void testJoinRehash() throws Exception { + sql("CREATE TABLE order_items (id varchar, orderId int, amount int, PRIMARY KEY (id))\n" + + " WITH \"cache_name=order_items,backups=1," + atomicity() + "\""); + + sql("CREATE TABLE orders (id int, region varchar, PRIMARY KEY (id))\n" + + " WITH \"cache_name=orders,backups=1," + atomicity() + "\""); + + sql("CREATE INDEX order_items_orderId ON order_items (orderId ASC)"); + sql("CREATE INDEX orders_region ON orders (region ASC)"); + + for (int i = 0; i < 500; i++) { + sql("INSERT INTO orders VALUES(?, ?)", i, "region" + i % 10); + for (int j = 0; j < 20; j++) + sql("INSERT INTO order_items VALUES(?, ?, ?)", i + "_" + j, i, j); + } + + String sql = "SELECT sum(i.amount)" + + " FROM order_items i JOIN orders o ON o.id=i.orderId" + + " WHERE o.region = ?"; + + assertQuery(sql) + .withParams("region0") + .matches(QueryChecker.containsSubPlan("IgniteMergeJoin")) + .matches(QueryChecker.containsSubPlan("IgniteExchange(distribution=[affinity")) + .returns(9500L) // 50 * sum(0 .. 19) + .check(); + + // Create concurrent queries with a lot of tasks (join rehashing also add a concurrency, + // since send batches between nodes) + GridTestUtils.runMultiThreaded(() -> { + for (int i = 0; i < 100; i++) + assertQuery(sql).withParams("region" + (i % 10)).returns(9500L).check(); + }, 10, "query_starter"); + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java index b24e8e66d3d22..1c33edd3b06f6 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java @@ -59,7 +59,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.calcite.Query; import org.apache.ignite.internal.processors.query.calcite.QueryRegistry; -import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl; +import org.apache.ignite.internal.processors.query.calcite.exec.task.AbstractQueryTaskExecutor; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; @@ -312,7 +312,7 @@ public void testUserQueriesMetrics() throws Exception { /** */ @Test public void testThreadPoolMetrics() { - String regName = metricName(PoolProcessor.THREAD_POOLS, QueryTaskExecutorImpl.THREAD_POOL_NAME); + String regName = metricName(PoolProcessor.THREAD_POOLS, AbstractQueryTaskExecutor.THREAD_POOL_NAME); MetricRegistry mreg = client.context().metric().registry(regName); LongMetric tasksCnt = mreg.findMetric("CompletedTaskCount"); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedFunctionsIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedFunctionsIntegrationTest.java index 2f5796393cf29..6772c0030dfc7 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedFunctionsIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedFunctionsIntegrationTest.java @@ -18,18 +18,35 @@ package org.apache.ignite.internal.processors.query.calcite.integration; import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlFunction; +import org.apache.ignite.calcite.CalciteQueryEngineConfiguration; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.WithSystemProperty; import org.junit.Test; +import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR; + /** * Integration test for user defined functions. */ +@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR, value = "true") public class UserDefinedFunctionsIntegrationTest extends AbstractBasicIntegrationTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.getSqlConfiguration().setQueryEnginesConfiguration(new CalciteQueryEngineConfiguration()); + + return cfg; + } + /** */ @Test public void testFunctions() throws Exception { @@ -132,6 +149,25 @@ public static int addFour(int a, int b, int c, int d) { } } + /** */ + @Test + public void testInnerSql() { + IgniteCache emp4 = client.getOrCreateCache(this.cacheConfiguration() + .setName("emp4") + .setSqlFunctionClasses(InnerSqlFunctionsLibrary.class) + .setSqlSchema("PUBLIC") + .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("emp4"))) + ); + + for (int i = 0; i < 100; i++) + put(client, emp4, i, new Employer("Name" + i, (double)i)); + + assertQuery(grid(0), "SELECT sum(salary(?, _key)) FROM emp4") + .withParams(grid(0).name()) + .returns(4950d) + .check(); + } + /** */ public static class MulFunctionsLibrary { /** */ @@ -167,4 +203,16 @@ public static String echo(String s) { return s; } } + + /** */ + public static class InnerSqlFunctionsLibrary { + /** */ + @QuerySqlFunction + public static double salary(String ignite, int key) { + return (double)Ignition.ignite(ignite) + .cache("emp4") + .query(new SqlFieldsQuery("SELECT salary FROM emp4 WHERE _key = ?").setArgs(key)) + .getAll().get(0).get(0); + } + } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java index 56c23704a510b..95a8c3454e45c 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java @@ -62,8 +62,8 @@ import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; -import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl; import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader; import org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage; import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl; @@ -123,7 +123,7 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest { protected List nodes; /** */ - protected List executors; + protected List executors; /** */ protected volatile Throwable lastE; @@ -147,7 +147,7 @@ public void setup() { @After public void tearDown() throws Throwable { if (!F.isEmpty(executors)) - executors.forEach(QueryTaskExecutorImpl::tearDown); + executors.forEach(StripedQueryTaskExecutor::tearDown); if (lastE != null) throw lastE; diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java index 512861139f514..b3b52d4b840dd 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java @@ -39,10 +39,10 @@ import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.processors.query.calcite.exec.LogicalRelImplementor; import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistryImpl; -import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl; import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node; import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox; import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode; +import org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker; import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl; @@ -380,7 +380,7 @@ private Node implementFragment( kernal.add(new NoOpIgniteSecurityProcessor(kernal)); kernal.add(new GridCacheProcessor(kernal)); - QueryTaskExecutorImpl taskExecutor = new QueryTaskExecutorImpl(kernal); + StripedQueryTaskExecutor taskExecutor = new StripedQueryTaskExecutor(kernal); taskExecutor.stripedThreadPoolExecutor(new IgniteStripedThreadPoolExecutor( kernal.config().getQueryThreadPoolSize(), kernal.igniteInstanceName(), diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java index 5bdf5d417b510..b9f1914f9e10d 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java @@ -17,13 +17,8 @@ package org.apache.ignite.testsuites; -import org.apache.ignite.internal.processors.query.calcite.QueryCheckerTest; -import org.apache.ignite.internal.processors.query.calcite.exec.ClosableIteratorsHolderTest; -import org.apache.ignite.internal.processors.query.calcite.exec.KeyFilteringCursorTest; import org.apache.ignite.internal.processors.query.calcite.exec.LogicalRelImplementorTest; import org.apache.ignite.internal.processors.query.calcite.exec.NumericTypesPrecisionsTest; -import org.apache.ignite.internal.processors.query.calcite.exec.exp.IgniteSqlFunctionsTest; -import org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTrackerTest; import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcConnectionEnabledPropertyTest; import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcSetClientInfoTest; import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcThinTransactionalSelfTest; @@ -43,13 +38,10 @@ PlannerTestSuite.class, ExecutionTestSuite.class, IntegrationTestSuite.class, + UtilTestSuite.class, - ClosableIteratorsHolderTest.class, - MemoryTrackerTest.class, - QueryCheckerTest.class, SqlCustomParserTest.class, SqlReservedWordsTest.class, - IgniteSqlFunctionsTest.class, LogicalRelImplementorTest.class, ScriptTestSuite.class, @@ -57,7 +49,6 @@ NumericTypesPrecisionsTest.class, - KeyFilteringCursorTest.class, SqlTransactionsIsolationTest.class, SqlTransactionsUnsupportedModesTest.class, diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java index 8e64b50f6d923..50032d9253f5e 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.query.calcite.integration.OperatorsExtensionIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.PartitionPruneTest; import org.apache.ignite.internal.processors.query.calcite.integration.PartitionsReservationIntegrationTest; +import org.apache.ignite.internal.processors.query.calcite.integration.QueryBlockingTaskExecutorIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.QueryEngineConfigurationIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.QueryMetadataIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.QueryWithPartitionsIntegrationTest; @@ -151,6 +152,7 @@ OperatorsExtensionIntegrationTest.class, SessionContextSqlFunctionTest.class, SqlPlanHistoryIntegrationTest.class, + QueryBlockingTaskExecutorIntegrationTest.class, }) public class IntegrationTestSuite { } diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/UtilTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/UtilTestSuite.java new file mode 100644 index 0000000000000..527f0240060d6 --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/UtilTestSuite.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testsuites; + +import org.apache.ignite.internal.processors.query.calcite.QueryCheckerTest; +import org.apache.ignite.internal.processors.query.calcite.exec.ClosableIteratorsHolderTest; +import org.apache.ignite.internal.processors.query.calcite.exec.KeyFilteringCursorTest; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.IgniteSqlFunctionsTest; +import org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutorTest; +import org.apache.ignite.internal.processors.query.calcite.exec.task.QueryTasksQueueTest; +import org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTrackerTest; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +/** + * Calcite utility classes tests. + */ +@RunWith(Suite.class) +@Suite.SuiteClasses({ + ClosableIteratorsHolderTest.class, + MemoryTrackerTest.class, + QueryCheckerTest.class, + IgniteSqlFunctionsTest.class, + KeyFilteringCursorTest.class, + QueryBlockingTaskExecutorTest.class, + QueryTasksQueueTest.class, +}) +public class UtilTestSuite { +}