diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 0c23f2146c007..7e64f3735759e 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -44,8 +44,8 @@
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -236,7 +236,7 @@ protected List> sql(String sql, Object... params) {
protected List> sqlAsRoot(IgniteEx ignite, String sql) throws Exception {
SecurityContext secCtx = authenticate(grid(0), DFAULT_USER_NAME, "ignite");
- try (OperationSecurityContext ignored = ignite.context().security().withContext(secCtx)) {
+ try (Scope ignored = ignite.context().security().withContext(secCtx)) {
return sql(ignite, sql);
}
}
diff --git a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
index 02321a705a2d8..7634b43564594 100644
--- a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
@@ -54,12 +54,12 @@
import org.apache.ignite.internal.management.cache.VerifyBackupPartitionsTask;
import org.apache.ignite.internal.processors.security.AbstractSecurityTest;
import org.apache.ignite.internal.processors.security.AbstractTestSecurityPluginProvider;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.PublicAccessJob;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.security.compute.ComputePermissionCheckTest;
import org.apache.ignite.internal.processors.security.impl.TestSecurityData;
import org.apache.ignite.internal.processors.security.impl.TestSecurityPluginProvider;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.lang.ConsumerX;
import org.apache.ignite.internal.util.lang.RunnableX;
import org.apache.ignite.internal.util.lang.gridfunc.AtomicIntegerFactoryCallable;
@@ -476,7 +476,7 @@ public void testSystemTaskCancel() throws Exception {
SecurityContext initiatorSecCtx = securityContext("no-permissions-login-0");
SupplierX> starter = () -> {
- try (OperationSecurityContext ignored1 = grid(0).context().security().withContext(initiatorSecCtx)) {
+ try (Scope ignored1 = grid(0).context().security().withContext(initiatorSecCtx)) {
return new TestFutureAdapter<>(
grid(0).context().closure().runAsync(
BROADCAST,
@@ -525,7 +525,7 @@ private void checkTaskCancel(
assertTrue(taskStartedLatch.await(getTestTimeout(), MILLISECONDS));
try (
- OperationSecurityContext ignored = initiator == null
+ Scope ignored = initiator == null
? null
: grid(0).context().security().withContext(initiator)
) {
diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java
new file mode 100644
index 0000000000000..6953d8b853891
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.thread.context.concurrent.OperationContextAwareExecutor;
+import org.apache.ignite.internal.thread.context.function.OperationContextAwareCallable;
+import org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.thread.context.Scope.NOOP_SCOPE;
+
+/**
+ * Represents a storage of {@link OperationContextAttribute}s and their corresponding values bound to the JVM thread.
+ * The state of {@link OperationContext} is determined by a sequence of {@link Update}s applied to it. Each Update
+ * stores the updated or newly added {@link OperationContextAttribute} values and link to the previous Update.
+ *
+ * {@link OperationContext} Updates can be undone in the same order they were applied by closing the {@link Scope}
+ * associated with each update (see {@link #set(OperationContextAttribute, Object)} and related methods).
+ *
+ * {@link OperationContext} bound to one JVM thread can be saved and restored in another thread using the snapshot
+ * mechanism (see {@link #createSnapshot()} and {@link #restoreSnapshot(OperationContextSnapshot) methods}). This
+ * provides basic functionality for implementing asynchronous executors that automatically propagate
+ * {@link OperationContext} data between JVM threads.
+ *
+ *
+ * @see Scope
+ * @see OperationContextSnapshot
+ * @see OperationContextAwareCallable
+ * @see OperationContextAwareRunnable
+ * @see OperationContextAwareExecutor
+ */
+public class OperationContext {
+ /** */
+ private static final ThreadLocal INSTANCE = ThreadLocal.withInitial(OperationContext::new);
+
+ /**
+ * Sequence of updated applied to the {@link OperationContext}. Each update holds a link to the previous Update,
+ * so we store only the reference to the last one.
+ */
+ @Nullable private Update lastUpd;
+
+ /** */
+ private OperationContext() {
+ // No-op.
+ }
+
+ /**
+ * Retrieves value associated with specified attribute by accessing {@link OperationContext} bound to the thread
+ * this method is called from. If no value is explicitly associated with specified attribute,
+ * {@link OperationContextAttribute#initialValue()} is returned.
+ *
+ * @param attr Context Attribute.
+ * @return Context Attribute Value.
+ */
+ @Nullable public static T get(OperationContextAttribute attr) {
+ return INSTANCE.get().getInternal(attr);
+ }
+
+ /**
+ * Updates the value of the specified attribute for the {@link OperationContext} bound to the thread this method
+ * is called from.
+ *
+ * @param attr Context Attribute.
+ * @return Scope instance that, when closed, undoes the applied update. It is crucial to undo all applied
+ * {@link OperationContext} updates to free up thread-bound resources and avoid memory leaks, so it is highly
+ * encouraged to use a try-with-resource block to close the returned Scope. Note, updates must be undone in the
+ * same order and in the same thread they were applied.
+ */
+ public static Scope set(OperationContextAttribute attr, T val) {
+ OperationContext ctx = INSTANCE.get();
+
+ return ctx.getInternal(attr) == val ? NOOP_SCOPE : ctx.applyAttributeUpdates(new AttributeValueHolder<>(attr, val));
+ }
+
+ /**
+ * Updates the values of the specified attributes for the {@link OperationContext} bound to the thread this method
+ * is called from.
+ *
+ * @param attr1 First Context Attribute.
+ * @param val1 Values associated with first Context Attribute.
+ * @param attr2 Second Context Attribute.
+ * @param val2 Values associated with second Context Attribute.
+ * @return Scope instance that, when closed, undoes the applied update. It is crucial to undo all applied
+ * {@link OperationContext} updates to free up thread-bound resources and avoid memory leaks, so it is highly
+ * encouraged to use a try-with-resource block to close the returned Scope. Note, updates must be undone in the
+ * same order and in the same thread they were applied.
+ */
+ public static Scope set(
+ OperationContextAttribute attr1, T1 val1,
+ OperationContextAttribute attr2, T2 val2
+ ) {
+ return ContextUpdater.create().set(attr1, val1).set(attr2, val2).apply();
+ }
+
+ /**
+ * Updates the values of the specified attributes for the {@link OperationContext} bound to the thread this method
+ * is called from.
+ *
+ * @param attr1 First Context Attribute.
+ * @param val1 Values associated with first Context Attribute.
+ * @param attr2 Second Context Attribute.
+ * @param val2 Values associated with second Context Attribute.
+ * @param attr3 Third Context Attribute.
+ * @param val3 Values associated with third Context Attribute.
+ * @return Scope instance that, when closed, undoes the applied update. It is crucial to undo all applied
+ * {@link OperationContext} updates to free up thread-bound resources and avoid memory leaks, so it is highly
+ * encouraged to use a try-with-resource block to close the returned Scope. Note, updates must be undone in the
+ * same order and in the same thread they were applied.
+ */
+ public static Scope set(
+ OperationContextAttribute attr1, T1 val1,
+ OperationContextAttribute attr2, T2 val2,
+ OperationContextAttribute attr3, T3 val3
+ ) {
+ return ContextUpdater.create().set(attr1, val1).set(attr2, val2).set(attr3, val3).apply();
+ }
+
+ /**
+ * Creates Snapshot of all attributes and their corresponding values stored in the {@link OperationContext} bound
+ * the thread this method is called from.
+ *
+ * @return Context Snapshot.
+ */
+ public static OperationContextSnapshot createSnapshot() {
+ return INSTANCE.get().createSnapshotInternal();
+ }
+
+ /**
+ * Restores values of all attributes for {@link OperationContext} bound to the thread this method is called from.
+ *
+ * @param snp Context Snapshot.
+ * @return Scope instance that, when closed, undoes the applied operation. It is crucial to undo all applied
+ * {@link OperationContext} updates to free up thread-bound resources and avoid memory leaks, so it is highly
+ * encouraged to use a try-with-resource block to close the returned Scope. Note, updates must be undone in the
+ * same order and in the same thread they were applied.
+ */
+ public static Scope restoreSnapshot(OperationContextSnapshot snp) {
+ return INSTANCE.get().restoreSnapshotInternal(snp);
+ }
+
+ /**
+ * Retrieves value for the specified attribute from the current {@link OperationContext}. If no value is explicitly
+ * associated with specified attribute, {@link OperationContextAttribute#initialValue()} is returned.
+ */
+ @Nullable private T getInternal(OperationContextAttribute attr) {
+ if (lastUpd == null || (lastUpd.storedAttrBits & attr.bitmask()) == 0)
+ return attr.initialValue(); // OperationContext does not store value for the specified attribute.
+
+ AttributeValueHolder valHolder = findAttributeValue(attr);
+
+ assert valHolder != null;
+ assert valHolder.attr.equals(attr);
+
+ return valHolder.val;
+ }
+
+ /** Updates the current context with the specified attributes and their corresponding values. */
+ private Scope applyAttributeUpdates(AttributeValueHolder>... attrVals) {
+ lastUpd = new Update(attrVals, lastUpd);
+
+ return lastUpd;
+ }
+
+ /** Undoes the latest updated. */
+ private void undo(Update upd) {
+ assert lastUpd == upd;
+
+ lastUpd = lastUpd.prev;
+ }
+
+ /** Iterates over the currently applied context updates and finds the latest value associated with the specified attribute. */
+ private AttributeValueHolder findAttributeValue(OperationContextAttribute attr) {
+ for (Update upd = lastUpd; upd != null; upd = upd.prev) {
+ if (!upd.holdsValueFor(attr))
+ continue;
+
+ return upd.value(attr);
+ }
+
+ return null;
+ }
+
+ /** */
+ private OperationContextSnapshot createSnapshotInternal() {
+ // The sequence of updates defines the state of the OperationContext. Each update is linked to the previous
+ // one and immutable. Therefore, to restore the context state elsewhere, we only need to share a reference to
+ // the most recent update.
+ return lastUpd;
+ }
+
+ /** */
+ private Scope restoreSnapshotInternal(OperationContextSnapshot newSnp) {
+ OperationContextSnapshot prevSnp = createSnapshotInternal();
+
+ if (newSnp == prevSnp)
+ return NOOP_SCOPE;
+
+ changeState(prevSnp, newSnp);
+
+ return () -> changeState(newSnp, prevSnp);
+ }
+
+ /** */
+ private void changeState(OperationContextSnapshot expState, OperationContextSnapshot newState) {
+ assert lastUpd == expState;
+
+ lastUpd = (Update)newState;
+ }
+
+ /** Represents Update applied to the {@link OperationContext}. */
+ private class Update implements Scope, OperationContextSnapshot {
+ /** Updated attributes and their corresponding values. */
+ private final AttributeValueHolder>[] attrVals;
+
+ /**
+ * Bits representing all attributes which values were changed by this update.
+ *
+ * @see OperationContextAttribute#bitmask()
+ */
+ private final int updAttrBits;
+
+ /**
+ * Bits representing all attributes stored in the current {@link OperationContext} after this Update and all
+ * preceding are applied. We need this for two purposes:
+ *
+ *
fast check whether any of the currently applied {@link OperationContext} Updates store value for the
+ * particular attribute
+ *
do not recalculate state of all attributes when update is undone
+ *
+ *
+ * @see OperationContextAttribute#bitmask()
+ */
+ private final int storedAttrBits;
+
+ /** Link to the previous update. */
+ private final Update prev;
+
+ /** */
+ Update(AttributeValueHolder>[] attrVals, Update prev) {
+ this.attrVals = attrVals;
+ this.prev = prev;
+
+ updAttrBits = mergeUpdatedAttributeBits(attrVals);
+ storedAttrBits = prev == null ? updAttrBits : prev.storedAttrBits | updAttrBits;
+ }
+
+ /** @return Whether current update contains value for the specified attribute. */
+ boolean holdsValueFor(OperationContextAttribute> attr) {
+ return (updAttrBits & attr.bitmask()) != 0;
+ }
+
+ /**
+ * @return Attribute value that was set by the current update for the specified attribute. {@code null} if
+ * specified Attribute was not changed by this update.
+ */
+ @Nullable AttributeValueHolder value(OperationContextAttribute attr) {
+ // We iterate in reverse order to correctly handle the case when the value for the same attribute is
+ // specified multiple times.
+ for (int i = attrVals.length - 1; i >= 0; i--) {
+ AttributeValueHolder> valHolder = attrVals[i];
+
+ if (valHolder.attr.equals(attr))
+ return ((AttributeValueHolder)valHolder);
+ }
+
+ return null;
+ }
+
+ /** */
+ private int mergeUpdatedAttributeBits(AttributeValueHolder>[] attrVals) {
+ int res = 0;
+
+ for (AttributeValueHolder> valHolder : attrVals)
+ res |= valHolder.attr.bitmask();
+
+ return res;
+ }
+
+ /** */
+ @Override public void close() {
+ undo(this);
+ }
+ }
+
+ /** Immutable container that stores an attribute and its corresponding value. */
+ private static class AttributeValueHolder {
+ /** */
+ private final OperationContextAttribute attr;
+
+ /** */
+ private final T val;
+
+ /** */
+ AttributeValueHolder(OperationContextAttribute attr, T val) {
+ this.attr = attr;
+ this.val = val;
+ }
+ }
+
+ /** Allows to change multiple attribute values in a single update operation and skip updates that changes nothing. */
+ private static class ContextUpdater {
+ /** */
+ private static final int INIT_UPDATES_CAPACITY = 3;
+
+ /** */
+ private final OperationContext ctx;
+
+ /** */
+ private List> updates;
+
+ /** */
+ private ContextUpdater(OperationContext ctx) {
+ this.ctx = ctx;
+ }
+
+ /** */
+ ContextUpdater set(OperationContextAttribute attr, T val) {
+ if (ctx.getInternal(attr) == val)
+ return this;
+
+ if (updates == null)
+ updates = new ArrayList<>(INIT_UPDATES_CAPACITY);
+
+ updates.add(new AttributeValueHolder<>(attr, val));
+
+ return this;
+ }
+
+ /** */
+ Scope apply() {
+ if (F.isEmpty(updates))
+ return NOOP_SCOPE;
+
+ AttributeValueHolder>[] sealedUpdates = new AttributeValueHolder[updates.size()];
+
+ updates.toArray(sealedUpdates);
+
+ return ctx.applyAttributeUpdates(sealedUpdates);
+ }
+
+ /** */
+ static ContextUpdater create() {
+ return new ContextUpdater(INSTANCE.get());
+ }
+ }
+}
diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java
new file mode 100644
index 0000000000000..499d241d9ccba
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents a key to access and modify {@link OperationContext} records.
+ *
+ * @see OperationContext
+ * @see OperationContext#get(OperationContextAttribute)
+ * @see OperationContext#set(OperationContextAttribute, Object)
+ */
+public class OperationContextAttribute {
+ /** */
+ static final AtomicInteger ID_GEN = new AtomicInteger();
+
+ /** */
+ static final int MAX_ATTR_CNT = Integer.SIZE;
+
+ /** */
+ private final int bitmask;
+
+ /** */
+ @Nullable private final T initVal;
+
+ /** */
+ private OperationContextAttribute(int bitmask, @Nullable T initVal) {
+ this.bitmask = bitmask;
+ this.initVal = initVal;
+ }
+
+ /**
+ * Initial Value associated with the current Attribute. Initial value will be automatically returned by the
+ * {@link OperationContext#get} method if Attribute's value has not been previously set.
+ * @see OperationContext#get(OperationContextAttribute)
+ */
+ @Nullable public T initialValue() {
+ return initVal;
+ }
+
+ /**
+ * Unique attribute bitmask calculated by shifting one from 0 to {@link Integer#SIZE}. It provides an ability to
+ * use {@link OperationContext} Attribute with bit fields.
+ */
+ int bitmask() {
+ return bitmask;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object other) {
+ if (this == other)
+ return true;
+
+ if (!(other instanceof OperationContextAttribute))
+ return false;
+
+ return bitmask == ((OperationContextAttribute>)other).bitmask;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return bitmask;
+ }
+
+ /**
+ * Creates new instance of the {@link OperationContext} Attribute with Initial Value set to {@code null}.
+ *
+ * Note, that the maximum number of attribute instances that can be created is currently limited to
+ * {@link #MAX_ATTR_CNT} for implementation reasons.
+ *
+ */
+ public static OperationContextAttribute newInstance() {
+ return newInstance(null);
+ }
+
+ /**
+ * Creates new instance of the {@link OperationContext} Attribute with the specified Initial Value. The Initial
+ * Value is returned by {@link OperationContext#get} method if the Attribute's value is not explicitly set in the
+ * {@link OperationContext}.
+ *
+ * Note, that the maximum number of attribute instances that can be created is currently limited to
+ * {@link #MAX_ATTR_CNT} for implementation reasons.
+ *
+ */
+ public static OperationContextAttribute newInstance(T initVal) {
+ int id = ID_GEN.getAndIncrement();
+
+ assert id < MAX_ATTR_CNT : "Exceeded maximum supported number of created Attributes instances [maxCnt=" + MAX_ATTR_CNT + ']';
+
+ return new OperationContextAttribute<>(1 << id, initVal);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/OperationSecurityContext.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextSnapshot.java
similarity index 51%
rename from modules/core/src/main/java/org/apache/ignite/internal/processors/security/OperationSecurityContext.java
rename to modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextSnapshot.java
index 7f65d942948ad..d863aa3f11a39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/OperationSecurityContext.java
+++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextSnapshot.java
@@ -15,32 +15,22 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.security;
+package org.apache.ignite.internal.thread.context;
+
+import org.apache.ignite.internal.thread.context.function.OperationContextAwareCallable;
+import org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable;
/**
+ * Represents snapshot of all Attributes and their corresponding values for a particular {@link OperationContext}
+ * instance. Its main purpose to save {@link OperationContext} state and restore it later, possible for
+ * {@link OperationContext} bound to another thread.
*
+ * @see OperationContext
+ * @see OperationContext#createSnapshot()
+ * @see OperationContext#restoreSnapshot(OperationContextSnapshot)
+ * @see OperationContextAwareCallable
+ * @see OperationContextAwareRunnable
*/
-public class OperationSecurityContext implements AutoCloseable {
- /** Ignite Security. */
- private final IgniteSecurity proc;
-
- /** Security context. */
- private final SecurityContext secCtx;
-
- /**
- * @param proc Ignite Security.
- * @param secCtx Security context.
- */
- OperationSecurityContext(IgniteSecurity proc, SecurityContext secCtx) {
- this.proc = proc;
- this.secCtx = secCtx;
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- if (secCtx == null)
- ((IgniteSecurityProcessor)proc).restoreDefaultContext();
- else
- proc.withContext(secCtx);
- }
+public interface OperationContextSnapshot {
+ // No-op.
}
diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/Scope.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/Scope.java
new file mode 100644
index 0000000000000..a48cf2e57aedb
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/Scope.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context;
+
+/**
+ * Represents the Scope of {@link OperationContext} attributes update. Explicitly calling {@link #close()} method undoes
+ * the applied changes and restores previous attribute values, if any. Note that every Scope relating to a specific
+ * {@link OperationContext} update must be closed to free up thread-bound resources and avoid memory leaks, so it is
+ * highly encouraged to use a try-with-resource block with Scope instances.
+ *
+ * Scope is result of the following {@link OperationContext} update operations:
+ *
+ *
{@link OperationContext#set(OperationContextAttribute, Object)} - creates a new or update an existing mapping
+ * between specified {@link OperationContextAttribute} and its value
+ *
{@link OperationContext#restoreSnapshot(OperationContextSnapshot)} - updates {@link OperationContextAttribute}
+ * values to match the values stored in {@link OperationContextSnapshot}
+ *
+ *
+ *
+ * @see OperationContext#set(OperationContextAttribute, Object)
+ * @see OperationContext#restoreSnapshot(OperationContextSnapshot)
+ */
+public interface Scope extends AutoCloseable {
+ /** Scope instance that does nothing when closed. */
+ Scope NOOP_SCOPE = () -> {};
+
+ /** Closes the scope. This operation cannot fail. */
+ @Override void close();
+}
diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/concurrent/OperationContextAwareExecutor.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/concurrent/OperationContextAwareExecutor.java
new file mode 100644
index 0000000000000..22ef5141739d5
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/concurrent/OperationContextAwareExecutor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.concurrent;
+
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable;
+import org.jetbrains.annotations.NotNull;
+
+/** */
+public class OperationContextAwareExecutor implements Executor {
+ /** */
+ private final Executor delegate;
+
+ /** */
+ private OperationContextAwareExecutor(Executor delegate) {
+ this.delegate = delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void execute(@NotNull Runnable command) {
+ delegate.execute(OperationContextAwareRunnable.wrap(command));
+ }
+
+ /**
+ * Creates executor wrapper that automatically captures {@link OperationContextSnapshot} of {@link OperationContext}
+ * for the thread that invokes task execution. Captured {@link OperationContextSnapshot} will be restored before
+ * task execution, potentially in another thread.
+ */
+ public static Executor wrap(Executor delegate) {
+ return delegate == null ? null : new OperationContextAwareExecutor(delegate);
+ }
+}
diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareCallable.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareCallable.java
new file mode 100644
index 0000000000000..909a4d88dcb83
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareCallable.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.function;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+
+/** */
+public class OperationContextAwareCallable extends OperationContextAwareWrapper> implements Callable {
+ /** */
+ private OperationContextAwareCallable(Callable delegate, OperationContextSnapshot snapshot) {
+ super(delegate, snapshot);
+ }
+
+ /** {@inheritDoc} */
+ @Override public T call() throws Exception {
+ try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+ return delegate.call();
+ }
+ }
+
+ /**
+ * Creates a wrapper that stores a specified {@link Callable} along with the {@link OperationContextSnapshot} of
+ * {@link OperationContext} bound to the thread when this method is called. Captured
+ * {@link OperationContextSnapshot} will be restored before {@link Callable} execution, potentially in another
+ * thread.
+ */
+ public static Callable wrap(Callable delegate) {
+ return wrap(delegate, OperationContextAwareCallable::new);
+ }
+
+ /**
+ * Creates a wrapper that stores a specified {@link Callable} along with the {@link OperationContextSnapshot} of
+ * {@link OperationContext} bound to the thread when this method is called. Captured
+ * {@link OperationContextSnapshot} will be restored before {@link Callable} execution, potentially in another
+ * thread.
+ * If {@link OperationContext} holds no data when this method is called, it does nothing and returns original
+ * {@link Callable}.
+ */
+ public static Callable wrapIfContextNotEmpty(Callable delegate) {
+ return wrap(delegate, OperationContextAwareCallable::new, true);
+ }
+
+ /** The same as {@link #wrap(Callable)} but wraps each specified {@link Callable}. */
+ public static Collection> wrap(Collection extends Callable> tasks) {
+ return tasks == null ? null : tasks.stream().map(OperationContextAwareCallable::wrap).collect(Collectors.toList());
+ }
+
+ /** The same as {@link #wrapIfContextNotEmpty(Callable)} but wraps each specified {@link Callable}. */
+ public static Collection> wrapIfContextNotEmpty(Collection extends Callable> tasks) {
+ return tasks == null ? null : tasks.stream().map(OperationContextAwareCallable::wrapIfContextNotEmpty).collect(Collectors.toList());
+ }
+}
diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareRunnable.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareRunnable.java
new file mode 100644
index 0000000000000..04f11ba710768
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareRunnable.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.function;
+
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+
+/** */
+public class OperationContextAwareRunnable extends OperationContextAwareWrapper implements Runnable {
+ /** */
+ public OperationContextAwareRunnable(Runnable delegate, OperationContextSnapshot snapshot) {
+ super(delegate, snapshot);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+ delegate.run();
+ }
+ }
+
+ /**
+ * Creates a wrapper that stores a specified {@link Runnable} along with the {@link OperationContextSnapshot} of
+ * {@link OperationContext} bound to the thread when this method is called. Captured
+ * {@link OperationContextSnapshot} will be restored before {@link Runnable} execution, potentially in another
+ * thread.
+ */
+ public static Runnable wrap(Runnable delegate) {
+ return wrap(delegate, OperationContextAwareRunnable::new);
+ }
+
+ /**
+ * Creates a wrapper that stores a specified {@link Runnable} along with the {@link OperationContextSnapshot} of
+ * {@link OperationContext} bound to the thread when this method is called. Captured
+ * {@link OperationContextSnapshot} will be restored before {@link Runnable} execution, potentially in another
+ * thread.
+ * If {@link OperationContext} holds no data when this method is called, it does nothing and returns original
+ * {@link Runnable}.
+ */
+ public static Runnable wrapIfContextNotEmpty(Runnable delegate) {
+ return wrap(delegate, OperationContextAwareRunnable::new, true);
+ }
+}
diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
new file mode 100644
index 0000000000000..a741d3b6d3cfc
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.function;
+
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.IgniteInternalWrapper;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+
+/** */
+abstract class OperationContextAwareWrapper implements IgniteInternalWrapper {
+ /** */
+ protected final T delegate;
+
+ /** */
+ protected final OperationContextSnapshot snapshot;
+
+ /** */
+ @Override public T delegate() {
+ return delegate;
+ }
+
+ /** */
+ protected OperationContextAwareWrapper(T delegate, OperationContextSnapshot snapshot) {
+ this.delegate = delegate;
+ this.snapshot = snapshot;
+ }
+
+ /** */
+ protected static T wrap(T delegate, BiFunction wrapper) {
+ return wrap(delegate, wrapper, false);
+ }
+
+ /** */
+ protected static T wrap(T delegate, BiFunction wrapper, boolean ignoreEmptyContext) {
+ if (delegate == null || delegate instanceof OperationContextAwareWrapper)
+ return delegate;
+
+ OperationContextSnapshot snapshot = OperationContext.createSnapshot();
+
+ if (ignoreEmptyContext && snapshot == null)
+ return delegate;
+
+ return wrapper.apply(delegate, snapshot);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java
index 2092e1256ad8c..9456ecfcd0ad7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java
@@ -27,8 +27,8 @@
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteScheduler;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.SecurityUtils;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
@@ -211,7 +211,7 @@ private SecurityAwareClosure(UUID secSubjId, Callable c) {
@Override public void run() {
assert runnable != null;
- try (OperationSecurityContext c = ctx.security().withContext(secSubjId)) {
+ try (Scope ignored = ctx.security().withContext(secSubjId)) {
runnable.run();
}
}
@@ -220,7 +220,7 @@ private SecurityAwareClosure(UUID secSubjId, Callable c) {
@Override public T call() throws Exception {
assert call != null;
- try (OperationSecurityContext c = ctx.security().withContext(secSubjId)) {
+ try (Scope ignored = ctx.security().withContext(secSubjId)) {
return call.call();
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwareBiPredicate.java b/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwareBiPredicate.java
index 5ae1ee1f673a2..51f75b823baaf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwareBiPredicate.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwareBiPredicate.java
@@ -21,8 +21,8 @@
import java.util.UUID;
import org.apache.ignite.internal.processors.security.AbstractSecurityAwareExternalizable;
import org.apache.ignite.internal.processors.security.IgniteSecurity;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.lang.IgniteBiPredicate;
/**
@@ -52,7 +52,7 @@ public SecurityAwareBiPredicate(UUID subjectId, IgniteBiPredicate origin
@Override public boolean apply(E1 e1, E2 e2) {
IgniteSecurity security = ignite.context().security();
- try (OperationSecurityContext c = security.withContext(subjectId)) {
+ try (Scope ignored = security.withContext(subjectId)) {
IgniteSandbox sandbox = security.sandbox();
return sandbox.enabled() ? sandbox.execute(() -> original.apply(e1, e2)) : original.apply(e1, e2);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwarePredicate.java b/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwarePredicate.java
index 4c412db01aeda..60723b00e8f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwarePredicate.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwarePredicate.java
@@ -21,8 +21,8 @@
import java.util.UUID;
import org.apache.ignite.internal.processors.security.AbstractSecurityAwareExternalizable;
import org.apache.ignite.internal.processors.security.IgniteSecurity;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.lang.IgnitePredicate;
/**
@@ -52,7 +52,7 @@ public SecurityAwarePredicate(UUID subjectId, IgnitePredicate original) {
@Override public boolean apply(E evt) {
IgniteSecurity security = ignite.context().security();
- try (OperationSecurityContext c = security.withContext(subjectId)) {
+ try (Scope ignored = security.withContext(subjectId)) {
IgniteSandbox sandbox = security.sandbox();
return sandbox.enabled() ? sandbox.execute(() -> original.apply(evt)) : original.apply(evt);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/consistency/ConsistencyRepairTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/consistency/ConsistencyRepairTask.java
index 0a46c30d73bd7..8cbdf15845856 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/management/consistency/ConsistencyRepairTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/management/consistency/ConsistencyRepairTask.java
@@ -40,7 +40,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.near.consistency.IgniteIrreparableConsistencyViolationException;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
+import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.F;
@@ -273,7 +273,7 @@ private String processPartition(int p, ConsistencyRepairCommandArg arg) {
* @param keys Keys.
*/
private void repair(IgniteCache