Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-24323 SQL Calcite: Add query blocking tasks executor (allows t… #11833

Closed
wants to merge 7 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@
import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry;
import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistryImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutor;
import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutService;
import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor;
import org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
import org.apache.ignite.internal.processors.query.calcite.hint.HintsConfig;
import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
Expand Down Expand Up @@ -124,6 +125,7 @@
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.IgniteSystemProperties.getLong;
import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;

Expand All @@ -141,6 +143,15 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
defaults = "" + DFLT_IGNITE_CALCITE_PLANNER_TIMEOUT)
public static final String IGNITE_CALCITE_PLANNER_TIMEOUT = "IGNITE_CALCITE_PLANNER_TIMEOUT";

/**
* Planner timeout property name.
*/
@SystemProperty(value = "Calcite-based SQL engine. Use query blocking task executor instead of striped task " +
"executor. Query blocking executor allows to run SQL queries inside user-defined functions at the cost of " +
"some performance penalty", defaults = "" + false)
public static final String IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR =
"IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR";

/** */
public static final FrameworkConfig FRAMEWORK_CONFIG = Frameworks.newConfigBuilder()
.executor(new RexExecutorImpl(DataContexts.EMPTY))
Expand Down Expand Up @@ -186,8 +197,10 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
private final FrameworkConfig frameworkCfg;

/** Query planner timeout. */
private final long queryPlannerTimeout = getLong(IGNITE_CALCITE_PLANNER_TIMEOUT,
DFLT_IGNITE_CALCITE_PLANNER_TIMEOUT);
private final long qryPlannerTimeout = getLong(IGNITE_CALCITE_PLANNER_TIMEOUT, DFLT_IGNITE_CALCITE_PLANNER_TIMEOUT);

/** Use query blocking task executor. */
private final boolean useQryBlockingTaskExecutor = getBoolean(IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR);
alex-plekhanov marked this conversation as resolved.
Show resolved Hide resolved

/** */
private final QueryPlanCache qryPlanCache;
Expand Down Expand Up @@ -254,7 +267,9 @@ public CalciteQueryProcessor(GridKernalContext ctx) {
qryPlanCache = new QueryPlanCacheImpl(ctx);
parserMetrics = new QueryParserMetricsHolder(ctx.metric());
mailboxRegistry = new MailboxRegistryImpl(ctx);
taskExecutor = new QueryTaskExecutorImpl(ctx);
taskExecutor = useQryBlockingTaskExecutor
? new QueryBlockingTaskExecutor(ctx)
: new StripedQueryTaskExecutor(ctx);
executionSvc = new ExecutionServiceImpl<>(ctx, ArrayRowHandler.INSTANCE);
partSvc = new AffinityServiceImpl(ctx);
msgSvc = new MessageServiceImpl(ctx);
Expand Down Expand Up @@ -666,7 +681,7 @@ private <T> T processQuery(
exchangeSvc,
(q, ex) -> qryReg.unregister(q.id(), ex),
log,
queryPlannerTimeout,
qryPlannerTimeout,
timeout
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ public abstract class AbstractNode<Row> implements Node<Row> {
/** */
protected static final int IO_BATCH_CNT = IgniteSystemProperties.getInteger(IGNITE_CALCITE_EXEC_IO_BATCH_CNT, 4);

/** for debug purpose */
private volatile Thread thread;

/**
* {@link Inbox} node may not have proper context at creation time in case it
* creates on first message received from a remote source. This case the context
Expand Down Expand Up @@ -186,12 +183,6 @@ protected void checkState() throws Exception {
throw new QueryCancelledException("The query was timed out.");
if (Thread.interrupted())
throw new IgniteInterruptedCheckedException("Thread was interrupted.");
if (!U.assertionsEnabled())
return;
if (thread == null)
thread = Thread.currentThread();
else
assert thread == Thread.currentThread();
}

/** */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.calcite.exec.task;

import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutor;
import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;

/**
* Abstract query task executor.
*/
public abstract class AbstractQueryTaskExecutor extends AbstractService implements QueryTaskExecutor, Thread.UncaughtExceptionHandler {
/** */
public static final String THREAD_POOL_NAME = "CalciteQueryExecutor";

/** */
protected final GridKernalContext ctx;

/** */
protected Thread.UncaughtExceptionHandler eHnd;

/** */
protected AbstractQueryTaskExecutor(GridKernalContext ctx) {
super(ctx);
this.ctx = ctx;
}

/** {@inheritDoc} */
@Override public void uncaughtException(Thread t, Throwable e) {
if (eHnd != null)
eHnd.uncaughtException(t, e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.calcite.exec.task;

import org.apache.ignite.internal.util.typedef.internal.S;

/**
* Query aware task.
*/
public class QueryAwareTask implements Runnable {
alex-plekhanov marked this conversation as resolved.
Show resolved Hide resolved
/** */
private final QueryKey qryKey;

/** */
private final Runnable runnable;

/** */
public QueryAwareTask(QueryKey qryKey, Runnable runnable) {
this.qryKey = qryKey;
this.runnable = runnable;
}

/** */
public QueryKey queryKey() {
return qryKey;
}

/** {@inheritDoc} */
@Override public void run() {
runnable.run();
}

/** */
@Override public String toString() {
return S.toString(QueryAwareTask.class, this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.calcite.exec.task;

import java.util.UUID;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;

import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static org.apache.ignite.internal.processors.pool.PoolProcessor.THREAD_POOLS;

/**
* Query task executor based on queue with query blocking.
*/
public class QueryBlockingTaskExecutor extends AbstractQueryTaskExecutor {
/** */
private final QueryTasksQueue tasksQueue = new QueryTasksQueue();

/** */
private IgniteThreadPoolExecutor executor;

/** */
public QueryBlockingTaskExecutor(GridKernalContext ctx) {
super(ctx);
}

/** {@inheritDoc} */
@Override public void execute(UUID qryId, long fragmentId, Runnable qryTask) {
SecurityContext secCtx = ctx.security().securityContext();

QueryKey qryKey = new QueryKey(qryId, fragmentId);

executor.execute(
new QueryAwareTask(qryKey,
() -> {
try (AutoCloseable ignored = ctx.security().withContext(secCtx)) {
qryTask.run();
}
catch (Throwable e) {
U.warn(log, "Uncaught exception", e);

/*
* No exceptions are rethrown here to preserve the current thread from being destroyed,
* because other queries may be pinned to the current thread id.
* However, unrecoverable errors must be processed by FailureHandler.
*/
uncaughtException(Thread.currentThread(), e);
}
finally {
tasksQueue.unblockQuery(qryKey);
}
}
alex-plekhanov marked this conversation as resolved.
Show resolved Hide resolved
)
);
}

/** {@inheritDoc} */
@Override public void onStart(GridKernalContext ctx) {
eHnd = ctx.uncaughtExceptionHandler();

executor = new IgniteThreadPoolExecutor(
"calciteQry",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All other pools names starts with capital letter.
Let's call it CalciteQueryAwareExecutor and the default one CalciteExecutor.

Copy link
Contributor Author

@alex-plekhanov alex-plekhanov Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a pool name, it's thread prefix. All other thread prefixes start with non-capital letters.

ctx.igniteInstanceName(),
ctx.config().getQueryThreadPoolSize(),
ctx.config().getQueryThreadPoolSize(),
IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
tasksQueue.blockingQueue(),
GridIoPolicy.CALLER_THREAD,
eHnd
);

// Prestart threads to ensure that all threads always use queue to poll tasks (without this call worker can
// get its first task directly from 'execute' method, bypassing tasks queue).
executor.prestartAllCoreThreads();

executor.registerMetrics(ctx.metric().registry(metricName(THREAD_POOLS, THREAD_POOL_NAME)));
}

/** {@inheritDoc} */
@Override public void tearDown() {
U.shutdownNow(getClass(), executor, log);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.calcite.exec.task;

import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;

/**
* Query key.
*/
public class QueryKey {
alex-plekhanov marked this conversation as resolved.
Show resolved Hide resolved
/** */
private final UUID qryId;

/** */
private final long fragmentId;

/** */
public QueryKey(UUID qryId, long fragmentId) {
this.qryId = qryId;
this.fragmentId = fragmentId;
}

/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;

if (o == null || getClass() != o.getClass())
return false;

QueryKey key = (QueryKey)o;

return fragmentId == key.fragmentId && Objects.equals(qryId, key.qryId);
}

/** {@inheritDoc} */
@Override public int hashCode() {
return U.safeAbs(31 * (31 + (qryId != null ? qryId.hashCode() : 0)) + Long.hashCode(fragmentId));
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(QueryKey.class, this);
}
}
Loading
Loading