diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java index 0c23f2146c007..7e64f3735759e 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java @@ -44,8 +44,8 @@ import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex; import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable; import org.apache.ignite.internal.processors.query.calcite.util.Commons; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.SecurityContext; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -236,7 +236,7 @@ protected List> sql(String sql, Object... params) { protected List> sqlAsRoot(IgniteEx ignite, String sql) throws Exception { SecurityContext secCtx = authenticate(grid(0), DFAULT_USER_NAME, "ignite"); - try (OperationSecurityContext ignored = ignite.context().security().withContext(secCtx)) { + try (Scope ignored = ignite.context().security().withContext(secCtx)) { return sql(ignite, sql); } } diff --git a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java index 02321a705a2d8..7634b43564594 100644 --- a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java @@ -54,12 +54,12 @@ import org.apache.ignite.internal.management.cache.VerifyBackupPartitionsTask; import org.apache.ignite.internal.processors.security.AbstractSecurityTest; import org.apache.ignite.internal.processors.security.AbstractTestSecurityPluginProvider; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.PublicAccessJob; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.security.compute.ComputePermissionCheckTest; import org.apache.ignite.internal.processors.security.impl.TestSecurityData; import org.apache.ignite.internal.processors.security.impl.TestSecurityPluginProvider; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.lang.ConsumerX; import org.apache.ignite.internal.util.lang.RunnableX; import org.apache.ignite.internal.util.lang.gridfunc.AtomicIntegerFactoryCallable; @@ -476,7 +476,7 @@ public void testSystemTaskCancel() throws Exception { SecurityContext initiatorSecCtx = securityContext("no-permissions-login-0"); SupplierX> starter = () -> { - try (OperationSecurityContext ignored1 = grid(0).context().security().withContext(initiatorSecCtx)) { + try (Scope ignored1 = grid(0).context().security().withContext(initiatorSecCtx)) { return new TestFutureAdapter<>( grid(0).context().closure().runAsync( BROADCAST, @@ -525,7 +525,7 @@ private void checkTaskCancel( assertTrue(taskStartedLatch.await(getTestTimeout(), MILLISECONDS)); try ( - OperationSecurityContext ignored = initiator == null + Scope ignored = initiator == null ? null : grid(0).context().security().withContext(initiator) ) { diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java new file mode 100644 index 0000000000000..6953d8b853891 --- /dev/null +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.thread.context; + +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.internal.thread.context.concurrent.OperationContextAwareExecutor; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareCallable; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.thread.context.Scope.NOOP_SCOPE; + +/** + * Represents a storage of {@link OperationContextAttribute}s and their corresponding values bound to the JVM thread. + * The state of {@link OperationContext} is determined by a sequence of {@link Update}s applied to it. Each Update + * stores the updated or newly added {@link OperationContextAttribute} values and link to the previous Update. + *
+ *         +-----------+   +-----------+
+ *         |           |   | A1 -> V2  |
+ * null <--| A1 -> V1  |<--|           |
+ *         |           |   | A2 -> V3  |
+ *         +-----------+   +-----------+
+ *
+ * {@link OperationContext} Updates can be undone in the same order they were applied by closing the {@link Scope} + * associated with each update (see {@link #set(OperationContextAttribute, Object)} and related methods). + *

+ * {@link OperationContext} bound to one JVM thread can be saved and restored in another thread using the snapshot + * mechanism (see {@link #createSnapshot()} and {@link #restoreSnapshot(OperationContextSnapshot) methods}). This + * provides basic functionality for implementing asynchronous executors that automatically propagate + * {@link OperationContext} data between JVM threads. + *

+ * + * @see Scope + * @see OperationContextSnapshot + * @see OperationContextAwareCallable + * @see OperationContextAwareRunnable + * @see OperationContextAwareExecutor + */ +public class OperationContext { + /** */ + private static final ThreadLocal INSTANCE = ThreadLocal.withInitial(OperationContext::new); + + /** + * Sequence of updated applied to the {@link OperationContext}. Each update holds a link to the previous Update, + * so we store only the reference to the last one. + */ + @Nullable private Update lastUpd; + + /** */ + private OperationContext() { + // No-op. + } + + /** + * Retrieves value associated with specified attribute by accessing {@link OperationContext} bound to the thread + * this method is called from. If no value is explicitly associated with specified attribute, + * {@link OperationContextAttribute#initialValue()} is returned. + * + * @param attr Context Attribute. + * @return Context Attribute Value. + */ + @Nullable public static T get(OperationContextAttribute attr) { + return INSTANCE.get().getInternal(attr); + } + + /** + * Updates the value of the specified attribute for the {@link OperationContext} bound to the thread this method + * is called from. + * + * @param attr Context Attribute. + * @return Scope instance that, when closed, undoes the applied update. It is crucial to undo all applied + * {@link OperationContext} updates to free up thread-bound resources and avoid memory leaks, so it is highly + * encouraged to use a try-with-resource block to close the returned Scope. Note, updates must be undone in the + * same order and in the same thread they were applied. + */ + public static Scope set(OperationContextAttribute attr, T val) { + OperationContext ctx = INSTANCE.get(); + + return ctx.getInternal(attr) == val ? NOOP_SCOPE : ctx.applyAttributeUpdates(new AttributeValueHolder<>(attr, val)); + } + + /** + * Updates the values of the specified attributes for the {@link OperationContext} bound to the thread this method + * is called from. + * + * @param attr1 First Context Attribute. + * @param val1 Values associated with first Context Attribute. + * @param attr2 Second Context Attribute. + * @param val2 Values associated with second Context Attribute. + * @return Scope instance that, when closed, undoes the applied update. It is crucial to undo all applied + * {@link OperationContext} updates to free up thread-bound resources and avoid memory leaks, so it is highly + * encouraged to use a try-with-resource block to close the returned Scope. Note, updates must be undone in the + * same order and in the same thread they were applied. + */ + public static Scope set( + OperationContextAttribute attr1, T1 val1, + OperationContextAttribute attr2, T2 val2 + ) { + return ContextUpdater.create().set(attr1, val1).set(attr2, val2).apply(); + } + + /** + * Updates the values of the specified attributes for the {@link OperationContext} bound to the thread this method + * is called from. + * + * @param attr1 First Context Attribute. + * @param val1 Values associated with first Context Attribute. + * @param attr2 Second Context Attribute. + * @param val2 Values associated with second Context Attribute. + * @param attr3 Third Context Attribute. + * @param val3 Values associated with third Context Attribute. + * @return Scope instance that, when closed, undoes the applied update. It is crucial to undo all applied + * {@link OperationContext} updates to free up thread-bound resources and avoid memory leaks, so it is highly + * encouraged to use a try-with-resource block to close the returned Scope. Note, updates must be undone in the + * same order and in the same thread they were applied. + */ + public static Scope set( + OperationContextAttribute attr1, T1 val1, + OperationContextAttribute attr2, T2 val2, + OperationContextAttribute attr3, T3 val3 + ) { + return ContextUpdater.create().set(attr1, val1).set(attr2, val2).set(attr3, val3).apply(); + } + + /** + * Creates Snapshot of all attributes and their corresponding values stored in the {@link OperationContext} bound + * the thread this method is called from. + * + * @return Context Snapshot. + */ + public static OperationContextSnapshot createSnapshot() { + return INSTANCE.get().createSnapshotInternal(); + } + + /** + * Restores values of all attributes for {@link OperationContext} bound to the thread this method is called from. + * + * @param snp Context Snapshot. + * @return Scope instance that, when closed, undoes the applied operation. It is crucial to undo all applied + * {@link OperationContext} updates to free up thread-bound resources and avoid memory leaks, so it is highly + * encouraged to use a try-with-resource block to close the returned Scope. Note, updates must be undone in the + * same order and in the same thread they were applied. + */ + public static Scope restoreSnapshot(OperationContextSnapshot snp) { + return INSTANCE.get().restoreSnapshotInternal(snp); + } + + /** + * Retrieves value for the specified attribute from the current {@link OperationContext}. If no value is explicitly + * associated with specified attribute, {@link OperationContextAttribute#initialValue()} is returned. + */ + @Nullable private T getInternal(OperationContextAttribute attr) { + if (lastUpd == null || (lastUpd.storedAttrBits & attr.bitmask()) == 0) + return attr.initialValue(); // OperationContext does not store value for the specified attribute. + + AttributeValueHolder valHolder = findAttributeValue(attr); + + assert valHolder != null; + assert valHolder.attr.equals(attr); + + return valHolder.val; + } + + /** Updates the current context with the specified attributes and their corresponding values. */ + private Scope applyAttributeUpdates(AttributeValueHolder... attrVals) { + lastUpd = new Update(attrVals, lastUpd); + + return lastUpd; + } + + /** Undoes the latest updated. */ + private void undo(Update upd) { + assert lastUpd == upd; + + lastUpd = lastUpd.prev; + } + + /** Iterates over the currently applied context updates and finds the latest value associated with the specified attribute. */ + private AttributeValueHolder findAttributeValue(OperationContextAttribute attr) { + for (Update upd = lastUpd; upd != null; upd = upd.prev) { + if (!upd.holdsValueFor(attr)) + continue; + + return upd.value(attr); + } + + return null; + } + + /** */ + private OperationContextSnapshot createSnapshotInternal() { + // The sequence of updates defines the state of the OperationContext. Each update is linked to the previous + // one and immutable. Therefore, to restore the context state elsewhere, we only need to share a reference to + // the most recent update. + return lastUpd; + } + + /** */ + private Scope restoreSnapshotInternal(OperationContextSnapshot newSnp) { + OperationContextSnapshot prevSnp = createSnapshotInternal(); + + if (newSnp == prevSnp) + return NOOP_SCOPE; + + changeState(prevSnp, newSnp); + + return () -> changeState(newSnp, prevSnp); + } + + /** */ + private void changeState(OperationContextSnapshot expState, OperationContextSnapshot newState) { + assert lastUpd == expState; + + lastUpd = (Update)newState; + } + + /** Represents Update applied to the {@link OperationContext}. */ + private class Update implements Scope, OperationContextSnapshot { + /** Updated attributes and their corresponding values. */ + private final AttributeValueHolder[] attrVals; + + /** + * Bits representing all attributes which values were changed by this update. + * + * @see OperationContextAttribute#bitmask() + */ + private final int updAttrBits; + + /** + * Bits representing all attributes stored in the current {@link OperationContext} after this Update and all + * preceding are applied. We need this for two purposes: + *
    + *
  • fast check whether any of the currently applied {@link OperationContext} Updates store value for the + * particular attribute
  • + *
  • do not recalculate state of all attributes when update is undone
  • + *
+ * + * @see OperationContextAttribute#bitmask() + */ + private final int storedAttrBits; + + /** Link to the previous update. */ + private final Update prev; + + /** */ + Update(AttributeValueHolder[] attrVals, Update prev) { + this.attrVals = attrVals; + this.prev = prev; + + updAttrBits = mergeUpdatedAttributeBits(attrVals); + storedAttrBits = prev == null ? updAttrBits : prev.storedAttrBits | updAttrBits; + } + + /** @return Whether current update contains value for the specified attribute. */ + boolean holdsValueFor(OperationContextAttribute attr) { + return (updAttrBits & attr.bitmask()) != 0; + } + + /** + * @return Attribute value that was set by the current update for the specified attribute. {@code null} if + * specified Attribute was not changed by this update. + */ + @Nullable AttributeValueHolder value(OperationContextAttribute attr) { + // We iterate in reverse order to correctly handle the case when the value for the same attribute is + // specified multiple times. + for (int i = attrVals.length - 1; i >= 0; i--) { + AttributeValueHolder valHolder = attrVals[i]; + + if (valHolder.attr.equals(attr)) + return ((AttributeValueHolder)valHolder); + } + + return null; + } + + /** */ + private int mergeUpdatedAttributeBits(AttributeValueHolder[] attrVals) { + int res = 0; + + for (AttributeValueHolder valHolder : attrVals) + res |= valHolder.attr.bitmask(); + + return res; + } + + /** */ + @Override public void close() { + undo(this); + } + } + + /** Immutable container that stores an attribute and its corresponding value. */ + private static class AttributeValueHolder { + /** */ + private final OperationContextAttribute attr; + + /** */ + private final T val; + + /** */ + AttributeValueHolder(OperationContextAttribute attr, T val) { + this.attr = attr; + this.val = val; + } + } + + /** Allows to change multiple attribute values in a single update operation and skip updates that changes nothing. */ + private static class ContextUpdater { + /** */ + private static final int INIT_UPDATES_CAPACITY = 3; + + /** */ + private final OperationContext ctx; + + /** */ + private List> updates; + + /** */ + private ContextUpdater(OperationContext ctx) { + this.ctx = ctx; + } + + /** */ + ContextUpdater set(OperationContextAttribute attr, T val) { + if (ctx.getInternal(attr) == val) + return this; + + if (updates == null) + updates = new ArrayList<>(INIT_UPDATES_CAPACITY); + + updates.add(new AttributeValueHolder<>(attr, val)); + + return this; + } + + /** */ + Scope apply() { + if (F.isEmpty(updates)) + return NOOP_SCOPE; + + AttributeValueHolder[] sealedUpdates = new AttributeValueHolder[updates.size()]; + + updates.toArray(sealedUpdates); + + return ctx.applyAttributeUpdates(sealedUpdates); + } + + /** */ + static ContextUpdater create() { + return new ContextUpdater(INSTANCE.get()); + } + } +} diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java new file mode 100644 index 0000000000000..499d241d9ccba --- /dev/null +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextAttribute.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.thread.context; + +import java.util.concurrent.atomic.AtomicInteger; +import org.jetbrains.annotations.Nullable; + +/** + * Represents a key to access and modify {@link OperationContext} records. + * + * @see OperationContext + * @see OperationContext#get(OperationContextAttribute) + * @see OperationContext#set(OperationContextAttribute, Object) + */ +public class OperationContextAttribute { + /** */ + static final AtomicInteger ID_GEN = new AtomicInteger(); + + /** */ + static final int MAX_ATTR_CNT = Integer.SIZE; + + /** */ + private final int bitmask; + + /** */ + @Nullable private final T initVal; + + /** */ + private OperationContextAttribute(int bitmask, @Nullable T initVal) { + this.bitmask = bitmask; + this.initVal = initVal; + } + + /** + * Initial Value associated with the current Attribute. Initial value will be automatically returned by the + * {@link OperationContext#get} method if Attribute's value has not been previously set. + * @see OperationContext#get(OperationContextAttribute) + */ + @Nullable public T initialValue() { + return initVal; + } + + /** + * Unique attribute bitmask calculated by shifting one from 0 to {@link Integer#SIZE}. It provides an ability to + * use {@link OperationContext} Attribute with bit fields. + */ + int bitmask() { + return bitmask; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object other) { + if (this == other) + return true; + + if (!(other instanceof OperationContextAttribute)) + return false; + + return bitmask == ((OperationContextAttribute)other).bitmask; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return bitmask; + } + + /** + * Creates new instance of the {@link OperationContext} Attribute with Initial Value set to {@code null}. + *

+ * Note, that the maximum number of attribute instances that can be created is currently limited to + * {@link #MAX_ATTR_CNT} for implementation reasons. + *

+ */ + public static OperationContextAttribute newInstance() { + return newInstance(null); + } + + /** + * Creates new instance of the {@link OperationContext} Attribute with the specified Initial Value. The Initial + * Value is returned by {@link OperationContext#get} method if the Attribute's value is not explicitly set in the + * {@link OperationContext}. + *

+ * Note, that the maximum number of attribute instances that can be created is currently limited to + * {@link #MAX_ATTR_CNT} for implementation reasons. + *

+ */ + public static OperationContextAttribute newInstance(T initVal) { + int id = ID_GEN.getAndIncrement(); + + assert id < MAX_ATTR_CNT : "Exceeded maximum supported number of created Attributes instances [maxCnt=" + MAX_ATTR_CNT + ']'; + + return new OperationContextAttribute<>(1 << id, initVal); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/OperationSecurityContext.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextSnapshot.java similarity index 51% rename from modules/core/src/main/java/org/apache/ignite/internal/processors/security/OperationSecurityContext.java rename to modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextSnapshot.java index 7f65d942948ad..d863aa3f11a39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/OperationSecurityContext.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContextSnapshot.java @@ -15,32 +15,22 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.security; +package org.apache.ignite.internal.thread.context; + +import org.apache.ignite.internal.thread.context.function.OperationContextAwareCallable; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable; /** + * Represents snapshot of all Attributes and their corresponding values for a particular {@link OperationContext} + * instance. Its main purpose to save {@link OperationContext} state and restore it later, possible for + * {@link OperationContext} bound to another thread. * + * @see OperationContext + * @see OperationContext#createSnapshot() + * @see OperationContext#restoreSnapshot(OperationContextSnapshot) + * @see OperationContextAwareCallable + * @see OperationContextAwareRunnable */ -public class OperationSecurityContext implements AutoCloseable { - /** Ignite Security. */ - private final IgniteSecurity proc; - - /** Security context. */ - private final SecurityContext secCtx; - - /** - * @param proc Ignite Security. - * @param secCtx Security context. - */ - OperationSecurityContext(IgniteSecurity proc, SecurityContext secCtx) { - this.proc = proc; - this.secCtx = secCtx; - } - - /** {@inheritDoc} */ - @Override public void close() { - if (secCtx == null) - ((IgniteSecurityProcessor)proc).restoreDefaultContext(); - else - proc.withContext(secCtx); - } +public interface OperationContextSnapshot { + // No-op. } diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/Scope.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/Scope.java new file mode 100644 index 0000000000000..a48cf2e57aedb --- /dev/null +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/Scope.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.thread.context; + +/** + * Represents the Scope of {@link OperationContext} attributes update. Explicitly calling {@link #close()} method undoes + * the applied changes and restores previous attribute values, if any. Note that every Scope relating to a specific + * {@link OperationContext} update must be closed to free up thread-bound resources and avoid memory leaks, so it is + * highly encouraged to use a try-with-resource block with Scope instances. + *

+ * Scope is result of the following {@link OperationContext} update operations: + *

    + *
  • {@link OperationContext#set(OperationContextAttribute, Object)} - creates a new or update an existing mapping + * between specified {@link OperationContextAttribute} and its value
  • + *
  • {@link OperationContext#restoreSnapshot(OperationContextSnapshot)} - updates {@link OperationContextAttribute} + * values to match the values stored in {@link OperationContextSnapshot}
  • + *
+ *

+ * + * @see OperationContext#set(OperationContextAttribute, Object) + * @see OperationContext#restoreSnapshot(OperationContextSnapshot) + */ +public interface Scope extends AutoCloseable { + /** Scope instance that does nothing when closed. */ + Scope NOOP_SCOPE = () -> {}; + + /** Closes the scope. This operation cannot fail. */ + @Override void close(); +} diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/concurrent/OperationContextAwareExecutor.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/concurrent/OperationContextAwareExecutor.java new file mode 100644 index 0000000000000..22ef5141739d5 --- /dev/null +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/concurrent/OperationContextAwareExecutor.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.thread.context.concurrent; + +import java.util.concurrent.Executor; +import org.apache.ignite.internal.thread.context.OperationContext; +import org.apache.ignite.internal.thread.context.OperationContextSnapshot; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable; +import org.jetbrains.annotations.NotNull; + +/** */ +public class OperationContextAwareExecutor implements Executor { + /** */ + private final Executor delegate; + + /** */ + private OperationContextAwareExecutor(Executor delegate) { + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Override public void execute(@NotNull Runnable command) { + delegate.execute(OperationContextAwareRunnable.wrap(command)); + } + + /** + * Creates executor wrapper that automatically captures {@link OperationContextSnapshot} of {@link OperationContext} + * for the thread that invokes task execution. Captured {@link OperationContextSnapshot} will be restored before + * task execution, potentially in another thread. + */ + public static Executor wrap(Executor delegate) { + return delegate == null ? null : new OperationContextAwareExecutor(delegate); + } +} diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareCallable.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareCallable.java new file mode 100644 index 0000000000000..909a4d88dcb83 --- /dev/null +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareCallable.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.thread.context.function; + +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.stream.Collectors; +import org.apache.ignite.internal.thread.context.OperationContext; +import org.apache.ignite.internal.thread.context.OperationContextSnapshot; +import org.apache.ignite.internal.thread.context.Scope; + +/** */ +public class OperationContextAwareCallable extends OperationContextAwareWrapper> implements Callable { + /** */ + private OperationContextAwareCallable(Callable delegate, OperationContextSnapshot snapshot) { + super(delegate, snapshot); + } + + /** {@inheritDoc} */ + @Override public T call() throws Exception { + try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) { + return delegate.call(); + } + } + + /** + * Creates a wrapper that stores a specified {@link Callable} along with the {@link OperationContextSnapshot} of + * {@link OperationContext} bound to the thread when this method is called. Captured + * {@link OperationContextSnapshot} will be restored before {@link Callable} execution, potentially in another + * thread. + */ + public static Callable wrap(Callable delegate) { + return wrap(delegate, OperationContextAwareCallable::new); + } + + /** + * Creates a wrapper that stores a specified {@link Callable} along with the {@link OperationContextSnapshot} of + * {@link OperationContext} bound to the thread when this method is called. Captured + * {@link OperationContextSnapshot} will be restored before {@link Callable} execution, potentially in another + * thread. + * If {@link OperationContext} holds no data when this method is called, it does nothing and returns original + * {@link Callable}. + */ + public static Callable wrapIfContextNotEmpty(Callable delegate) { + return wrap(delegate, OperationContextAwareCallable::new, true); + } + + /** The same as {@link #wrap(Callable)} but wraps each specified {@link Callable}. */ + public static Collection> wrap(Collection> tasks) { + return tasks == null ? null : tasks.stream().map(OperationContextAwareCallable::wrap).collect(Collectors.toList()); + } + + /** The same as {@link #wrapIfContextNotEmpty(Callable)} but wraps each specified {@link Callable}. */ + public static Collection> wrapIfContextNotEmpty(Collection> tasks) { + return tasks == null ? null : tasks.stream().map(OperationContextAwareCallable::wrapIfContextNotEmpty).collect(Collectors.toList()); + } +} diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareRunnable.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareRunnable.java new file mode 100644 index 0000000000000..04f11ba710768 --- /dev/null +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareRunnable.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.thread.context.function; + +import org.apache.ignite.internal.thread.context.OperationContext; +import org.apache.ignite.internal.thread.context.OperationContextSnapshot; +import org.apache.ignite.internal.thread.context.Scope; + +/** */ +public class OperationContextAwareRunnable extends OperationContextAwareWrapper implements Runnable { + /** */ + public OperationContextAwareRunnable(Runnable delegate, OperationContextSnapshot snapshot) { + super(delegate, snapshot); + } + + /** {@inheritDoc} */ + @Override public void run() { + try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) { + delegate.run(); + } + } + + /** + * Creates a wrapper that stores a specified {@link Runnable} along with the {@link OperationContextSnapshot} of + * {@link OperationContext} bound to the thread when this method is called. Captured + * {@link OperationContextSnapshot} will be restored before {@link Runnable} execution, potentially in another + * thread. + */ + public static Runnable wrap(Runnable delegate) { + return wrap(delegate, OperationContextAwareRunnable::new); + } + + /** + * Creates a wrapper that stores a specified {@link Runnable} along with the {@link OperationContextSnapshot} of + * {@link OperationContext} bound to the thread when this method is called. Captured + * {@link OperationContextSnapshot} will be restored before {@link Runnable} execution, potentially in another + * thread. + * If {@link OperationContext} holds no data when this method is called, it does nothing and returns original + * {@link Runnable}. + */ + public static Runnable wrapIfContextNotEmpty(Runnable delegate) { + return wrap(delegate, OperationContextAwareRunnable::new, true); + } +} diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java new file mode 100644 index 0000000000000..a741d3b6d3cfc --- /dev/null +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.thread.context.function; + +import java.util.function.BiFunction; +import org.apache.ignite.internal.IgniteInternalWrapper; +import org.apache.ignite.internal.thread.context.OperationContext; +import org.apache.ignite.internal.thread.context.OperationContextSnapshot; + +/** */ +abstract class OperationContextAwareWrapper implements IgniteInternalWrapper { + /** */ + protected final T delegate; + + /** */ + protected final OperationContextSnapshot snapshot; + + /** */ + @Override public T delegate() { + return delegate; + } + + /** */ + protected OperationContextAwareWrapper(T delegate, OperationContextSnapshot snapshot) { + this.delegate = delegate; + this.snapshot = snapshot; + } + + /** */ + protected static T wrap(T delegate, BiFunction wrapper) { + return wrap(delegate, wrapper, false); + } + + /** */ + protected static T wrap(T delegate, BiFunction wrapper, boolean ignoreEmptyContext) { + if (delegate == null || delegate instanceof OperationContextAwareWrapper) + return delegate; + + OperationContextSnapshot snapshot = OperationContext.createSnapshot(); + + if (ignoreEmptyContext && snapshot == null) + return delegate; + + return wrapper.apply(delegate, snapshot); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java index 2092e1256ad8c..9456ecfcd0ad7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java @@ -27,8 +27,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteScheduler; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.SecurityUtils; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.lang.GridPlainRunnable; @@ -211,7 +211,7 @@ private SecurityAwareClosure(UUID secSubjId, Callable c) { @Override public void run() { assert runnable != null; - try (OperationSecurityContext c = ctx.security().withContext(secSubjId)) { + try (Scope ignored = ctx.security().withContext(secSubjId)) { runnable.run(); } } @@ -220,7 +220,7 @@ private SecurityAwareClosure(UUID secSubjId, Callable c) { @Override public T call() throws Exception { assert call != null; - try (OperationSecurityContext c = ctx.security().withContext(secSubjId)) { + try (Scope ignored = ctx.security().withContext(secSubjId)) { return call.call(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwareBiPredicate.java b/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwareBiPredicate.java index 5ae1ee1f673a2..51f75b823baaf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwareBiPredicate.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwareBiPredicate.java @@ -21,8 +21,8 @@ import java.util.UUID; import org.apache.ignite.internal.processors.security.AbstractSecurityAwareExternalizable; import org.apache.ignite.internal.processors.security.IgniteSecurity; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.lang.IgniteBiPredicate; /** @@ -52,7 +52,7 @@ public SecurityAwareBiPredicate(UUID subjectId, IgniteBiPredicate origin @Override public boolean apply(E1 e1, E2 e2) { IgniteSecurity security = ignite.context().security(); - try (OperationSecurityContext c = security.withContext(subjectId)) { + try (Scope ignored = security.withContext(subjectId)) { IgniteSandbox sandbox = security.sandbox(); return sandbox.enabled() ? sandbox.execute(() -> original.apply(e1, e2)) : original.apply(e1, e2); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwarePredicate.java b/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwarePredicate.java index 4c412db01aeda..60723b00e8f93 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwarePredicate.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/SecurityAwarePredicate.java @@ -21,8 +21,8 @@ import java.util.UUID; import org.apache.ignite.internal.processors.security.AbstractSecurityAwareExternalizable; import org.apache.ignite.internal.processors.security.IgniteSecurity; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.lang.IgnitePredicate; /** @@ -52,7 +52,7 @@ public SecurityAwarePredicate(UUID subjectId, IgnitePredicate original) { @Override public boolean apply(E evt) { IgniteSecurity security = ignite.context().security(); - try (OperationSecurityContext c = security.withContext(subjectId)) { + try (Scope ignored = security.withContext(subjectId)) { IgniteSandbox sandbox = security.sandbox(); return sandbox.enabled() ? sandbox.execute(() -> original.apply(evt)) : original.apply(evt); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/consistency/ConsistencyRepairTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/consistency/ConsistencyRepairTask.java index 0a46c30d73bd7..8cbdf15845856 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/consistency/ConsistencyRepairTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/consistency/ConsistencyRepairTask.java @@ -40,7 +40,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.near.consistency.IgniteIrreparableConsistencyViolationException; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.F; @@ -273,7 +273,7 @@ private String processPartition(int p, ConsistencyRepairCommandArg arg) { * @param keys Keys. */ private void repair(IgniteCache cache, Set keys) { - try (OperationSecurityContext ignored = ignite.context().security().withContext(ignite.localNode().id())) { + try (Scope ignored = ignite.context().security().withContext(ignite.localNode().id())) { cache.getAll(keys); // Repair. } catch (CacheException e) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 633ebef235b02..dddb30bb6ae59 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -99,12 +99,12 @@ import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter; import org.apache.ignite.internal.processors.pool.PoolProcessor; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.tracing.MTC; import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings; import org.apache.ignite.internal.processors.tracing.Span; import org.apache.ignite.internal.processors.tracing.SpanTags; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; @@ -1876,7 +1876,7 @@ private void invokeListener(Byte plc, GridMessageListener lsnr, UUID nodeId, Obj UUID newSecSubjId = secSubjId != null ? secSubjId : nodeId; - try (OperationSecurityContext s = ctx.security().withContext(newSecSubjId)) { + try (Scope ignored = ctx.security().withContext(newSecSubjId)) { lsnr.onMessage(nodeId, msg, plc); } finally { @@ -3644,7 +3644,7 @@ private class GridUserMessageListener implements GridMessageListener { if (msgBody != null) { if (predLsnr != null) { - try (OperationSecurityContext s = ctx.security().withContext(initNodeId)) { + try (Scope ignored = ctx.security().withContext(initNodeId)) { if (!predLsnr.apply(nodeId, msgBody)) removeMessageListener(TOPIC_COMM_USER, this); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index f3b93fe947e51..de9a08f18f316 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -91,12 +91,12 @@ import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.processors.cluster.IGridClusterStateProcessor; import org.apache.ignite.internal.processors.security.IgniteSecurity; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.systemview.ClusterNodeViewWalker; import org.apache.ignite.internal.systemview.NodeAttributeViewWalker; import org.apache.ignite.internal.systemview.NodeMetricsViewWalker; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.GridSpinBusyLock; @@ -938,7 +938,7 @@ public SecurityAwareNotificationTask(DiscoveryNotification notification) { if (customMsg instanceof SecurityAwareCustomMessageWrapper) { UUID secSubjId = ((SecurityAwareCustomMessageWrapper)customMsg).securitySubjectId(); - try (OperationSecurityContext ignored = ctx.security().withContext(secSubjId)) { + try (Scope ignored = ctx.security().withContext(secSubjId)) { super.run(); } } @@ -949,7 +949,7 @@ public SecurityAwareNotificationTask(DiscoveryNotification notification) { notification.getNode() ); - try (OperationSecurityContext ignored = ctx.security().withContext(initiatorNodeSecCtx)) { + try (Scope ignored = ctx.security().withContext(initiatorNodeSecCtx)) { super.run(); } } @@ -3119,7 +3119,7 @@ private void body0() throws InterruptedException { blockingSectionEnd(); } - try (OperationSecurityContext ignored = withRemoteSecurityContext(ctx, evt.secCtx)) { + try (Scope ignored = withRemoteSecurityContext(ctx, evt.secCtx)) { int type = evt.type; AffinityTopologyVersion topVer = evt.topVer; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index e0fd3c3ff4296..90a9972779d94 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -112,9 +112,9 @@ import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; import org.apache.ignite.internal.processors.platform.client.cache.ImmutableArrayMap; import org.apache.ignite.internal.processors.platform.client.cache.ImmutableArraySet; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; @@ -3914,7 +3914,7 @@ else if (ctx.gate().isStopped()) ctx.operationContextPerCall(opCtx); ctx.shared().txContextReset(); - try (OperationSecurityContext ignored = ctx.kernalContext().security().withContext(secCtx)) { + try (Scope ignored = ctx.kernalContext().security().withContext(secCtx)) { opFut = op.op(tx0).chain(clo); } catch (Throwable e) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index bbc2e78bbb4c0..5918023afff9b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -108,11 +108,11 @@ import org.apache.ignite.internal.processors.metric.impl.BooleanMetricImpl; import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl; import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.tracing.Span; import org.apache.ignite.internal.processors.tracing.SpanTags; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.GridListSet; import org.apache.ignite.internal.util.GridPartitionStateMap; import org.apache.ignite.internal.util.GridStringBuilder; @@ -3065,7 +3065,7 @@ private void body0() throws InterruptedException, IgniteCheckedException { if (task == null) continue; // Main while loop. - try (OperationSecurityContext c = withRemoteSecurityContext(cctx.kernalContext(), task.securityContext())) { + try (Scope ignored = withRemoteSecurityContext(cctx.kernalContext(), task.securityContext())) { if (!isExchangeTask(task)) { processCustomTask(task); @@ -3191,7 +3191,7 @@ else if (task instanceof ForceRebalanceExchangeTask) { break; } - catch (IgniteFutureTimeoutCheckedException ignored) { + catch (IgniteFutureTimeoutCheckedException ignoredEx) { updateHeartbeat(); if (nextDumpTime <= U.currentTimeMillis()) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java index 96c6d8e8aff50..6b420ad7f14e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java @@ -52,8 +52,8 @@ import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.processors.query.QuerySchemaPatch; import org.apache.ignite.internal.processors.query.QueryUtils; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.SecurityContext; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; @@ -153,7 +153,7 @@ public class ValidationOnNodeJoinUtils { for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : nodeData.caches().values()) { if (secCtx != null && cacheInfo.cacheType() == CacheType.USER) { - try (OperationSecurityContext s = ctx.security().withContext(secCtx)) { + try (Scope ignored = ctx.security().withContext(secCtx)) { GridCacheProcessor.authorizeCacheCreate(ctx.security(), cacheInfo.cacheData().config()); } catch (SecurityException ex) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilter.java index 1f15faa34d8b6..91d53798b0e0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareFilter.java @@ -25,8 +25,8 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.internal.processors.security.AbstractSecurityAwareExternalizable; import org.apache.ignite.internal.processors.security.IgniteSecurity; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox; +import org.apache.ignite.internal.thread.context.Scope; /** * Security aware remote filter. @@ -60,7 +60,7 @@ public SecurityAwareFilter(UUID subjectId, CacheEntryEventFilter original) IgniteSecurity security = ignite.context().security(); - try (OperationSecurityContext c = security.withContext(subjectId)) { + try (Scope ignored = security.withContext(subjectId)) { IgniteSandbox sandbox = security.sandbox(); return sandbox.enabled() ? sandbox.execute(() -> original.evaluate(evt)) : original.evaluate(evt); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareTransformerFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareTransformerFactory.java index 8e4e8585304c5..9e958a4ff5aaf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareTransformerFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/SecurityAwareTransformerFactory.java @@ -22,8 +22,8 @@ import javax.cache.configuration.Factory; import org.apache.ignite.internal.processors.security.AbstractSecurityAwareExternalizable; import org.apache.ignite.internal.processors.security.IgniteSecurity; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.lang.IgniteClosure; /** @@ -58,7 +58,7 @@ public SecurityAwareTransformerFactory(UUID subjectId, Factory cl.apply(e)) : cl.apply(e); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index 3f481dd8eff07..90f32db551fff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -50,12 +50,12 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.processors.query.GridQueryProcessor; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.security.SecurityUtils; import org.apache.ignite.internal.processors.service.GridServiceNotFoundException; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.LT; @@ -540,7 +540,7 @@ private void execute0(boolean skipNtf) { SqlFieldsQuery.setThreadedQueryInitiatorId("task:" + ses.getTaskName() + ":" + getJobId()); - try (OperationSecurityContext ignored = ctx.security().withContext(secCtx)) { + try (Scope ignored = ctx.security().withContext(secCtx)) { if (partsReservation != null) { try { if (!partsReservation.reserve()) { @@ -760,7 +760,7 @@ public void cancel(boolean sys) { status = CANCELLED; U.wrapThreadLoader(dep.classLoader(), (IgniteRunnable)() -> { - try (OperationSecurityContext c = ctx.security().withContext(secCtx)) { + try (Scope ignored = ctx.security().withContext(secCtx)) { job0.cancel(); } }); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java index f56867a0eebbf..3d6f6ab75c3be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java @@ -39,7 +39,7 @@ import org.apache.ignite.internal.processors.odbc.odbc.OdbcConnectionContext; import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; import org.apache.ignite.internal.processors.platform.client.ClientStatus; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter; import org.apache.ignite.internal.util.nio.GridNioSession; @@ -230,7 +230,7 @@ public ClientListenerNioListener( ClientListenerResponse resp; - try (OperationSecurityContext ignored = ctx.security().withContext(connCtx.securityContext())) { + try (Scope ignored = ctx.security().withContext(connCtx.securityContext())) { resp = hnd.handle(req); } @@ -569,7 +569,7 @@ private void ensureConnectionAllowed(ClientListenerConnectionContext connCtx) th // When security is enabled, only an administrator can connect and execute commands. if (connCtx.securityContext() != null) { - try (OperationSecurityContext ignored = ctx.security().withContext(connCtx.securityContext())) { + try (Scope ignored = ctx.security().withContext(connCtx.securityContext())) { ctx.security().authorize(SecurityPermission.ADMIN_OPS); } catch (SecurityException e) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java index 7c40e9d583cf7..79b31b9f07205 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java @@ -43,12 +43,12 @@ import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; import org.apache.ignite.internal.processors.security.IgniteSecurity; -import org.apache.ignite.internal.processors.security.thread.SecurityAwareIoPool; -import org.apache.ignite.internal.processors.security.thread.SecurityAwareStripedExecutor; -import org.apache.ignite.internal.processors.security.thread.SecurityAwareStripedThreadPoolExecutor; -import org.apache.ignite.internal.processors.security.thread.SecurityAwareThreadPoolExecutor; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.systemview.StripedExecutorTaskViewWalker; +import org.apache.ignite.internal.thread.pool.OperationContextAwareIoPool; +import org.apache.ignite.internal.thread.pool.OperationContextAwareStripedExecutor; +import org.apache.ignite.internal.thread.pool.OperationContextAwareStripedThreadPoolExecutor; +import org.apache.ignite.internal.thread.pool.OperationContextAwareThreadPoolExecutor; import org.apache.ignite.internal.util.StripedExecutor; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; @@ -280,7 +280,7 @@ public PoolProcessor(GridKernalContext ctx) { throw new IgniteException("Failed to register IO executor pool because its ID as " + "already used: " + id); - extPools[id] = ctx.security().enabled() ? new SecurityAwareIoPool(ctx.security(), ex) : ex; + extPools[id] = ctx.security().enabled() ? OperationContextAwareIoPool.wrap(ex) : ex; } } } @@ -1216,8 +1216,7 @@ private IgniteStripedThreadPoolExecutor createStripedThreadPoolExecutor( long keepAliveTime ) { return ctx.security().enabled() - ? new SecurityAwareStripedThreadPoolExecutor( - ctx.security(), + ? new OperationContextAwareStripedThreadPoolExecutor( concurrentLvl, igniteInstanceName, threadNamePrefix, @@ -1245,8 +1244,7 @@ private StripedExecutor createStripedExecutor( long failureDetectionTimeout ) { return ctx.security().enabled() - ? new SecurityAwareStripedExecutor( - ctx.security(), + ? new OperationContextAwareStripedExecutor( cnt, igniteInstanceName, poolName, @@ -1270,8 +1268,7 @@ private IgniteThreadPoolExecutor createExecutorService( UncaughtExceptionHandler eHnd ) { return ctx.security().enabled() - ? new SecurityAwareThreadPoolExecutor( - ctx.security(), + ? new OperationContextAwareThreadPoolExecutor( threadNamePrefix, igniteInstanceName, corePoolSize, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java index 0c521ceef9b73..f26b3af040e2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java @@ -68,8 +68,8 @@ import org.apache.ignite.internal.processors.rest.request.GridRestRequest; import org.apache.ignite.internal.processors.rest.request.GridRestTaskRequest; import org.apache.ignite.internal.processors.rest.request.RestQueryRequest; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.SecurityContext; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.GridSpinReadWriteLock; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.C1; @@ -290,7 +290,7 @@ else if (!(req instanceof GridRestAuthenticationRequest) && startLatch.getCount( if (secCtx0 == null || ses.isTokenExpired(sesTokTtl)) ses.secCtx = secCtx0 = authenticate(req, ses); - try (OperationSecurityContext s = ctx.security().withContext(secCtx0)) { + try (Scope ignored = ctx.security().withContext(secCtx0)) { authorize(req); return handleRequest0(req); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurity.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurity.java index bdcaa966de1cd..b673aaa383d9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurity.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurity.java @@ -22,6 +22,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.plugin.security.AuthenticationContext; import org.apache.ignite.plugin.security.SecurityCredentials; import org.apache.ignite.plugin.security.SecurityException; @@ -42,30 +43,30 @@ */ public interface IgniteSecurity { /** - * Creates {@link OperationSecurityContext}. All calls of methods {@link #authorize(String, SecurityPermission)} or {@link + * Creates {@link Scope}. All calls of methods {@link #authorize(String, SecurityPermission)} or {@link * #authorize(SecurityPermission)} will be processed into the context of passed {@link SecurityContext} until - * holder {@link OperationSecurityContext} will be closed. + * holder {@link Scope} will be closed. * * @param secCtx Security Context. * @return Security context holder. */ - public OperationSecurityContext withContext(SecurityContext secCtx); + public Scope withContext(SecurityContext secCtx); /** - * Creates {@link OperationSecurityContext}. All calls of methods {@link #authorize(String, SecurityPermission)} or {@link + * Creates {@link Scope}. All calls of methods {@link #authorize(String, SecurityPermission)} or {@link * #authorize(SecurityPermission)} will be processed into the context of {@link SecurityContext} that is owned by - * the node with given nodeId until holder {@link OperationSecurityContext} will be closed. + * the node with given nodeId until holder {@link Scope} will be closed. * * @param nodeId Node id. * @return Security context holder. */ - public OperationSecurityContext withContext(UUID nodeId); + public Scope withContext(UUID nodeId); /** @return {@code True} if current thread executed in default security context. */ public boolean isDefaultContext(); /** - * @return SecurityContext of holder {@link OperationSecurityContext}. + * @return SecurityContext of holder {@link Scope}. */ public SecurityContext securityContext(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessor.java index 39721c717f1f4..7b34ed75db2dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessor.java @@ -32,6 +32,9 @@ import org.apache.ignite.internal.processors.security.sandbox.AccessControllerSandbox; import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox; import org.apache.ignite.internal.processors.security.sandbox.NoOpSandbox; +import org.apache.ignite.internal.thread.context.OperationContext; +import org.apache.ignite.internal.thread.context.OperationContextAttribute; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; @@ -85,8 +88,8 @@ static boolean hasSandboxedNodes() { return SANDBOXED_NODES_COUNTER.get() > 0; } - /** Current security context if differs from {@link #dfltSecCtx}. */ - private final ThreadLocal curSecCtx = new ThreadLocal<>(); + /** Context attribute that holds Security Context. */ + private static final OperationContextAttribute SEC_CTX = OperationContextAttribute.newInstance(); /** Security processor. */ private final GridSecurityProcessor secPrc; @@ -106,13 +109,6 @@ static boolean hasSandboxedNodes() { /** Default security context. */ private volatile SecurityContext dfltSecCtx; - /** Default operation security context for the case when current and new contexts are default. */ - private final OperationSecurityContext dfltOpCtx = new OperationSecurityContext(this, null) { - @Override public void close() { - // No-op. - } - }; - /** * @param ctx Grid kernal context. * @param secPrc Security processor. @@ -129,25 +125,12 @@ public IgniteSecurityProcessor(GridKernalContext ctx, GridSecurityProcessor secP } /** {@inheritDoc} */ - @Override public OperationSecurityContext withContext(SecurityContext secCtx) { - assert secCtx != null; - - SecurityContext dflt = dfltSecCtx; - SecurityContext cur = curSecCtx.get(); - - boolean isNewCtxDflt = secCtx == dflt; - boolean isCurCtxDflt = cur == null; - - if (isCurCtxDflt && isNewCtxDflt) - return dfltOpCtx; - - curSecCtx.set(isNewCtxDflt ? null : secCtx); - - return new OperationSecurityContext(this, isCurCtxDflt ? null : cur); + @Override public Scope withContext(SecurityContext secCtx) { + return OperationContext.set(SEC_CTX, secCtx == dfltSecCtx ? null : secCtx); } /** {@inheritDoc} */ - @Override public OperationSecurityContext withContext(UUID subjId) { + @Override public Scope withContext(UUID subjId) { try { SecurityContext res = secPrc.securityContext(subjId); @@ -187,19 +170,14 @@ public IgniteSecurityProcessor(GridKernalContext ctx, GridSecurityProcessor secP ); } - /** Restores local node context for the current thread. */ - void restoreDefaultContext() { - curSecCtx.set(null); - } - /** {@inheritDoc} */ @Override public boolean isDefaultContext() { - return curSecCtx.get() == null; + return OperationContext.get(SEC_CTX) == null; } /** {@inheritDoc} */ @Override public SecurityContext securityContext() { - SecurityContext res = curSecCtx.get(); + SecurityContext res = OperationContext.get(SEC_CTX); return res == null ? dfltSecCtx : res; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/NoOpIgniteSecurityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/NoOpIgniteSecurityProcessor.java index 0dc6151cf6cf6..4ad28405ce340 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/NoOpIgniteSecurityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/NoOpIgniteSecurityProcessor.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox; import org.apache.ignite.internal.processors.security.sandbox.NoOpSandbox; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.plugin.security.AuthenticationContext; import org.apache.ignite.plugin.security.SecurityCredentials; import org.apache.ignite.plugin.security.SecurityException; @@ -44,13 +45,6 @@ public class NoOpIgniteSecurityProcessor extends IgniteSecurityAdapter { /** Error message that occurs when trying to perform security operations if security disabled. */ public static final String SECURITY_DISABLED_ERROR_MSG = "Operation cannot be performed: Ignite security disabled."; - /** No operation security context. */ - private final OperationSecurityContext opSecCtx = new OperationSecurityContext(this, null) { - @Override public void close() { - // No-op. - } - }; - /** Instance of IgniteSandbox. */ private final IgniteSandbox sandbox = new NoOpSandbox(); @@ -62,13 +56,13 @@ public NoOpIgniteSecurityProcessor(GridKernalContext ctx) { } /** {@inheritDoc} */ - @Override public OperationSecurityContext withContext(SecurityContext secCtx) { - return opSecCtx; + @Override public Scope withContext(SecurityContext secCtx) { + return Scope.NOOP_SCOPE; } /** {@inheritDoc} */ - @Override public OperationSecurityContext withContext(UUID nodeId) { - return opSecCtx; + @Override public Scope withContext(UUID nodeId) { + return Scope.NOOP_SCOPE; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java index 24dfa4ca01246..0911df2a584eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/SecurityUtils.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.security.sandbox.IgniteDomainCombiner; import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; @@ -201,9 +202,9 @@ public static UUID securitySubjectId(GridCacheSharedContext cctx) { * context change is needed. * Note that this method is safe to use only when it is known to be called in the security context of the local node * (e.g. in system workers). - * @return {@link OperationSecurityContext} instance if new security context is set, otherwise {@code null}. + * @return {@link Scope} instance if new security context is set, otherwise {@code null}. */ - public static OperationSecurityContext withRemoteSecurityContext(GridKernalContext ctx, SecurityContext secCtx) { + public static Scope withRemoteSecurityContext(GridKernalContext ctx, SecurityContext secCtx) { if (secCtx == null) return null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareCallable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareCallable.java deleted file mode 100644 index 43845f198f8ae..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareCallable.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.security.thread; - -import java.util.Collection; -import java.util.concurrent.Callable; -import java.util.stream.Collectors; -import org.apache.ignite.internal.processors.security.IgniteSecurity; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; -import org.apache.ignite.internal.processors.security.SecurityContext; - -/** - * Represents a {@link Callable} wrapper that executes the original {@link Callable} with the security context - * current at the time the wrapper was created. - */ -class SecurityAwareCallable implements Callable { - /** Original callable. */ - private final Callable delegate; - - /** */ - private final IgniteSecurity security; - - /** */ - private final SecurityContext secCtx; - - /** */ - private SecurityAwareCallable(IgniteSecurity security, Callable delegate) { - assert security.enabled(); - assert delegate != null; - - this.delegate = delegate; - this.security = security; - secCtx = security.securityContext(); - } - - /** {@inheritDoc} */ - @Override public T call() throws Exception { - try (OperationSecurityContext ignored = security.withContext(secCtx)) { - return delegate.call(); - } - } - - /** */ - static Callable of(IgniteSecurity sec, Callable delegate) { - if (delegate == null || sec.isDefaultContext()) - return delegate; - - return new SecurityAwareCallable<>(sec, delegate); - } - - /** */ - static Collection> of( - IgniteSecurity sec, - Collection> tasks - ) { - if (tasks == null || sec.isDefaultContext()) - return tasks; - - return tasks.stream().map(t -> t == null ? null : new SecurityAwareCallable<>(sec, t)).collect(Collectors.toList()); - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareRunnable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareRunnable.java deleted file mode 100644 index 4ea8765d147e6..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareRunnable.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.security.thread; - -import org.apache.ignite.internal.processors.security.IgniteSecurity; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; -import org.apache.ignite.internal.processors.security.SecurityContext; - -/** - * Represents a {@link Runnable} wrapper that executes the original {@link Runnable} with the security context - * current at the time the wrapper was created. - */ -class SecurityAwareRunnable implements Runnable { - /** */ - private final Runnable delegate; - - /** */ - private final IgniteSecurity security; - - /** */ - private final SecurityContext secCtx; - - /** */ - private SecurityAwareRunnable(IgniteSecurity security, Runnable delegate) { - assert security.enabled(); - assert delegate != null; - - this.delegate = delegate; - this.security = security; - secCtx = security.securityContext(); - } - - /** {@inheritDoc} */ - @Override public void run() { - try (OperationSecurityContext ignored = security.withContext(secCtx)) { - delegate.run(); - } - } - - /** */ - static Runnable of(IgniteSecurity security, Runnable delegate) { - if (delegate == null || security.isDefaultContext()) - return delegate; - - return new SecurityAwareRunnable(security, delegate); - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java index aed2ac33acb1a..da4189d8f2eaa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java @@ -70,9 +70,9 @@ import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; import org.apache.ignite.internal.processors.platform.services.PlatformService; import org.apache.ignite.internal.processors.platform.services.PlatformServiceConfiguration; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.systemview.ServiceViewWalker; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -2080,7 +2080,7 @@ private SecurityException checkDeployPermissionDuringJoin(ClusterNode node, List return err; } - try (OperationSecurityContext ignored = ctx.security().withContext(secCtx)) { + try (Scope ignored = ctx.security().withContext(secCtx)) { for (ServiceInfo desc : svcs) { SecurityException err = checkPermissions(desc.name(), SERVICE_DEPLOY); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareIoPool.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareIoPool.java similarity index 58% rename from modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareIoPool.java rename to modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareIoPool.java index c9e2359c1ac37..6768d4a21ded3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareIoPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareIoPool.java @@ -15,18 +15,14 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.security.thread; +package org.apache.ignite.internal.thread.pool; import java.util.concurrent.Executor; -import org.apache.ignite.internal.processors.security.IgniteSecurity; +import org.apache.ignite.internal.thread.context.concurrent.OperationContextAwareExecutor; import org.apache.ignite.plugin.extensions.communication.IoPool; -import org.jetbrains.annotations.NotNull; - -/** Wrapper of {@link IoPool} that executes tasks in security context that was actual when task was added to pool queue. */ -public class SecurityAwareIoPool implements IoPool { - /** */ - private final IgniteSecurity security; +/** */ +public class OperationContextAwareIoPool implements IoPool { /** */ private final IoPool delegate; @@ -34,20 +30,9 @@ public class SecurityAwareIoPool implements IoPool { private final Executor executor; /** */ - public SecurityAwareIoPool(IgniteSecurity security, IoPool delegate) { - assert security.enabled(); - assert delegate != null; - - this.security = security; + private OperationContextAwareIoPool(IoPool delegate) { this.delegate = delegate; - - final Executor delegateExecutor = delegate.executor(); - - executor = delegateExecutor == null ? null : new Executor() { - @Override public void execute(@NotNull Runnable cmd) { - delegateExecutor.execute(SecurityAwareRunnable.of(SecurityAwareIoPool.this.security, cmd)); - } - }; + this.executor = OperationContextAwareExecutor.wrap(delegate.executor()); } /** {@inheritDoc} */ @@ -59,4 +44,9 @@ public SecurityAwareIoPool(IgniteSecurity security, IoPool delegate) { @Override public Executor executor() { return executor; } + + /** */ + public static IoPool wrap(IoPool pool) { + return pool == null ? null : new OperationContextAwareIoPool(pool); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareStripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareStripedExecutor.java similarity index 59% rename from modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareStripedExecutor.java rename to modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareStripedExecutor.java index f2239716cd839..268776a4d63e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareStripedExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareStripedExecutor.java @@ -15,42 +15,19 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.security.thread; +package org.apache.ignite.internal.thread.pool; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.processors.security.IgniteSecurity; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable; import org.apache.ignite.internal.util.StripedExecutor; import org.apache.ignite.internal.util.worker.GridWorkerListener; import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.NotNull; -/** - * Extends {@link StripedExecutor} with the ability to execute tasks in security context that was actual when task was - * added to executor's queue. - */ -public class SecurityAwareStripedExecutor extends StripedExecutor { - /** */ - private final IgniteSecurity security; - - /** */ - public SecurityAwareStripedExecutor( - IgniteSecurity security, - int cnt, - String igniteInstanceName, - String poolName, - IgniteLogger log, - IgniteInClosure errHnd, - GridWorkerListener gridWorkerLsnr, - long failureDetectionTimeout - ) { - super(cnt, igniteInstanceName, poolName, log, errHnd, gridWorkerLsnr, failureDetectionTimeout); - - this.security = security; - } - +/** */ +public class OperationContextAwareStripedExecutor extends StripedExecutor { /** */ - public SecurityAwareStripedExecutor( - IgniteSecurity security, + public OperationContextAwareStripedExecutor( int cnt, String igniteInstanceName, String poolName, @@ -61,17 +38,15 @@ public SecurityAwareStripedExecutor( long failureDetectionTimeout ) { super(cnt, igniteInstanceName, poolName, log, errHnd, stealTasks, gridWorkerLsnr, failureDetectionTimeout); - - this.security = security; } /** {@inheritDoc} */ - @Override public void execute(int idx, Runnable cmd) { - super.execute(idx, SecurityAwareRunnable.of(security, cmd)); + @Override public void execute(@NotNull Runnable cmd) { + super.execute(OperationContextAwareRunnable.wrapIfContextNotEmpty(cmd)); } /** {@inheritDoc} */ - @Override public void execute(@NotNull Runnable cmd) { - super.execute(SecurityAwareRunnable.of(security, cmd)); + @Override public void execute(int idx, Runnable cmd) { + super.execute(idx, OperationContextAwareRunnable.wrapIfContextNotEmpty(cmd)); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareStripedThreadPoolExecutor.java similarity index 59% rename from modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareStripedThreadPoolExecutor.java rename to modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareStripedThreadPoolExecutor.java index ff80f2972a8f5..5b70bfcc21f0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareStripedThreadPoolExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareStripedThreadPoolExecutor.java @@ -15,35 +15,27 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.security.thread; +package org.apache.ignite.internal.thread.pool; -import org.apache.ignite.internal.processors.security.IgniteSecurity; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable; import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; -/** - * Extends {@link IgniteStripedThreadPoolExecutor} with the ability to execute tasks in security context that was actual - * when task was added to executor's queue. - */ -public class SecurityAwareStripedThreadPoolExecutor extends IgniteStripedThreadPoolExecutor { - /** */ - private final IgniteSecurity security; - +/** */ +public class OperationContextAwareStripedThreadPoolExecutor extends IgniteStripedThreadPoolExecutor { /** */ - public SecurityAwareStripedThreadPoolExecutor( - IgniteSecurity security, - int concurrentLvl, - String igniteInstanceName, + public OperationContextAwareStripedThreadPoolExecutor( + int concurrentLvl, + String igniteInstanceName, String threadNamePrefix, - Thread.UncaughtExceptionHandler eHnd, - boolean allowCoreThreadTimeOut, + Thread.UncaughtExceptionHandler eHnd, + boolean allowCoreThreadTimeOut, long keepAliveTime ) { super(concurrentLvl, igniteInstanceName, threadNamePrefix, eHnd, allowCoreThreadTimeOut, keepAliveTime); - this.security = security; } /** {@inheritDoc} */ @Override public void execute(Runnable task, int idx) { - super.execute(SecurityAwareRunnable.of(security, task), idx); + super.execute(OperationContextAwareRunnable.wrapIfContextNotEmpty(task), idx); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareThreadPoolExecutor.java similarity index 65% rename from modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareThreadPoolExecutor.java rename to modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareThreadPoolExecutor.java index 16c3216ab9902..72888dd78a283 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/thread/SecurityAwareThreadPoolExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/pool/OperationContextAwareThreadPoolExecutor.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.security.thread; +package org.apache.ignite.internal.thread.pool; import java.util.Collection; import java.util.List; @@ -23,24 +23,17 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.ignite.internal.processors.security.IgniteSecurity; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareCallable; +import org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable; import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.NotNull; -/** - * Extends {@link ThreadPoolExecutor} with the ability to execute tasks in security context that was actual when task was - * added to executor's queue. - */ -public class SecurityAwareThreadPoolExecutor extends IgniteThreadPoolExecutor { - /** */ - private final IgniteSecurity security; - +/** */ +public class OperationContextAwareThreadPoolExecutor extends IgniteThreadPoolExecutor { /** */ - public SecurityAwareThreadPoolExecutor( - IgniteSecurity security, + public OperationContextAwareThreadPoolExecutor( String threadNamePrefix, String igniteInstanceName, int corePoolSize, @@ -51,51 +44,51 @@ public SecurityAwareThreadPoolExecutor( Thread.UncaughtExceptionHandler eHnd ) { super(threadNamePrefix, igniteInstanceName, corePoolSize, maxPoolSize, keepAliveTime, workQ, plc, eHnd); - - this.security = security; } /** {@inheritDoc} */ @NotNull @Override public Future submit(@NotNull Callable task) { - return super.submit(SecurityAwareCallable.of(security, task)); + return super.submit(OperationContextAwareCallable.wrapIfContextNotEmpty(task)); } /** {@inheritDoc} */ @NotNull @Override public Future submit(@NotNull Runnable task, T res) { - return super.submit(SecurityAwareRunnable.of(security, task), res); + return super.submit(OperationContextAwareRunnable.wrapIfContextNotEmpty(task), res); } /** {@inheritDoc} */ @NotNull @Override public Future submit(@NotNull Runnable task) { - return super.submit(SecurityAwareRunnable.of(security, task)); + return super.submit(OperationContextAwareRunnable.wrapIfContextNotEmpty(task)); } /** {@inheritDoc} */ - @NotNull @Override public List> invokeAll( - @NotNull Collection> tasks) throws InterruptedException { - return super.invokeAll(SecurityAwareCallable.of(security, tasks)); + @NotNull @Override public List> invokeAll(@NotNull Collection> tasks) throws InterruptedException { + return super.invokeAll(OperationContextAwareCallable.wrapIfContextNotEmpty(tasks)); } /** {@inheritDoc} */ - @NotNull @Override public List> invokeAll(@NotNull Collection> tasks, - long timeout, @NotNull TimeUnit unit) throws InterruptedException { - return super.invokeAll(SecurityAwareCallable.of(security, tasks), timeout, unit); + @NotNull @Override public List> invokeAll( + @NotNull Collection> tasks, + long timeout, + @NotNull TimeUnit unit + ) throws InterruptedException { + return super.invokeAll(OperationContextAwareCallable.wrapIfContextNotEmpty(tasks), timeout, unit); } /** {@inheritDoc} */ @NotNull @Override public T invokeAny(@NotNull Collection> tasks) throws InterruptedException, ExecutionException { - return super.invokeAny(SecurityAwareCallable.of(security, tasks)); + return super.invokeAny(OperationContextAwareCallable.wrapIfContextNotEmpty(tasks)); } /** {@inheritDoc} */ @Override public T invokeAny(@NotNull Collection> tasks, long timeout, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return super.invokeAny(SecurityAwareCallable.of(security, tasks), timeout, unit); + return super.invokeAny(OperationContextAwareCallable.wrapIfContextNotEmpty(tasks), timeout, unit); } /** {@inheritDoc} */ @Override public void execute(@NotNull Runnable cmd) { - super.execute(SecurityAwareRunnable.of(security, cmd)); + super.execute(OperationContextAwareRunnable.wrapIfContextNotEmpty(cmd)); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java index 9976502b213b6..06fa3cc2e7da6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java @@ -29,8 +29,8 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.security.IgniteSecurity; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.SecurityContext; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.lang.ConsumerX; import org.apache.ignite.internal.util.lang.RunnableX; import org.apache.ignite.internal.util.typedef.internal.U; @@ -190,7 +190,7 @@ public void testUserManagementPermission() throws Exception { for (int i = 0; i < NODES_COUNT; ++i) { final int nodeIdx = i; - try (OperationSecurityContext ignored = grid(nodeIdx).context().security().withContext(secCtx)) { + try (Scope ignored = grid(nodeIdx).context().security().withContext(secCtx)) { GridTestUtils.assertThrows(log, () -> { grid(nodeIdx).context().security().createUser("test1", "test1".toCharArray()); @@ -462,7 +462,7 @@ public void testRemoteNodeSecurityContext() throws Exception { for (int i = 1; i < NODES_COUNT; i++) { IgniteSecurity security = ignite(i).context().security(); - try (OperationSecurityContext ignored = security.withContext(subj.id())) { + try (Scope ignored = security.withContext(subj.id())) { SecuritySubject rmtSubj = security.securityContext().subject(); assertEquals(subj.id(), rmtSubj.id()); @@ -545,7 +545,7 @@ public static void asRoot(IgniteEx ignite, ConsumerX action, Str assertNotNull(secCtx); - try (OperationSecurityContext ignored = ignite.context().security().withContext(secCtx)) { + try (Scope ignored = ignite.context().security().withContext(secCtx)) { action.accept(ignite.context().security()); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java new file mode 100644 index 0000000000000..788321c9efff0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java @@ -0,0 +1,635 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.thread.context; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.thread.pool.OperationContextAwareStripedExecutor; +import org.apache.ignite.internal.thread.pool.OperationContextAwareStripedThreadPoolExecutor; +import org.apache.ignite.internal.thread.pool.OperationContextAwareThreadPoolExecutor; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; +import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause; + +/** */ +public class OperationContextAttributesTest extends GridCommonAbstractTest { + /** */ + private static final String DFLT_STR_VAL = "default"; + + /** */ + private static final int DFLT_INT_VAL = -1; + + /** */ + private static final OperationContextAttribute STR_ATTR = OperationContextAttribute.newInstance(DFLT_STR_VAL); + + /** */ + private static final OperationContextAttribute INT_ATTR = OperationContextAttribute.newInstance(DFLT_INT_VAL); + + /** */ + private ExecutorService poolToShutdownAfterTest; + + /** */ + private int beforeTestReservedAttrIds; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + AttributeValueChecker.CHECKS.clear(); + + beforeTestReservedAttrIds = OperationContextAttribute.ID_GEN.get(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + if (poolToShutdownAfterTest != null) + poolToShutdownAfterTest.shutdownNow(); + + // Releases attribute IDs reserved during the test. + OperationContextAttribute.ID_GEN.set(beforeTestReservedAttrIds); + } + + /** */ + @Test + public void testNotAttachedAttribute() { + // No opened scope. + assertEquals(DFLT_STR_VAL, OperationContext.get(STR_ATTR)); + + // Scope opened but testing attribute is not set. + try (Scope ignored = OperationContext.set(INT_ATTR, 0)) { + assertEquals(DFLT_STR_VAL, OperationContext.get(STR_ATTR)); + } + } + + /** */ + @Test + public void testAttachedAttribute() { + try (Scope ignored = OperationContext.set(STR_ATTR, "test")) { + assertEquals("test", OperationContext.get(STR_ATTR)); + } + } + + /** */ + @Test + public void testAttributeValueSearchUpScopeStack() { + try (Scope ignored1 = OperationContext.set(STR_ATTR, "test1")) { + try (Scope ignored2 = OperationContext.set(INT_ATTR, 2)) { + checkAttributeValues("test1", 2); + } + } + } + + /** */ + @Test + public void testAttributeValueOverwrite() { + try (Scope ignored = OperationContext.set(STR_ATTR, "test1", INT_ATTR, 1, STR_ATTR, "test2")) { + checkAttributeValues("test2", 1); + } + } + + /** */ + @Test + public void testConsequentScopes() { + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + + try (Scope ignored1 = OperationContext.set(STR_ATTR, "test1", INT_ATTR, 1)) { + checkAttributeValues("test1", 1); + } + + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + + try (Scope ignored2 = OperationContext.set(INT_ATTR, 2)) { + checkAttributeValues(DFLT_STR_VAL, 2); + } + + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + } + + /** */ + @Test + public void testNestedScopes() { + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + + try (Scope ignored1 = OperationContext.set(INT_ATTR, 1)) { + checkAttributeValues(DFLT_STR_VAL, 1); + + try (Scope ignored2 = OperationContext.set(STR_ATTR, "test2")) { + checkAttributeValues("test2", 1); + } + + checkAttributeValues(DFLT_STR_VAL, 1); + } + + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + } + + /** */ + @Test + public void testNestedScopesAttributeValueOverwriteAndInheritance() { + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + + try (Scope ignored1 = OperationContext.set(INT_ATTR, 1, STR_ATTR, "test1")) { + checkAttributeValues("test1", 1); + + try (Scope ignored2 = OperationContext.set(STR_ATTR, "test2")) { + checkAttributeValues("test2", 1); + } + + checkAttributeValues("test1", 1); + } + + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + } + + /** */ + @Test + public void testNullAttributeValue() { + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + + try (Scope ignored1 = OperationContext.set(INT_ATTR, null, STR_ATTR, null)) { + checkAttributeValues(null, null); + + try (Scope ignored2 = OperationContext.set(STR_ATTR, "test2")) { + checkAttributeValues("test2", null); + + try (Scope ignored3 = OperationContext.set(STR_ATTR, null)) { + checkAttributeValues(null, null); + } + + checkAttributeValues("test2", null); + } + + checkAttributeValues(null, null); + } + + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + } + + /** */ + @Test + public void testScopeWithInitialAttributeValue() { + try (Scope scope1 = OperationContext.set(INT_ATTR, DFLT_INT_VAL, STR_ATTR, DFLT_STR_VAL)) { + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + + assertTrue(scope1 == Scope.NOOP_SCOPE); + + try (Scope scope2 = OperationContext.set(INT_ATTR, DFLT_INT_VAL)) { + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + + assertTrue(scope2 == Scope.NOOP_SCOPE); + } + + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + } + } + + /** */ + @Test + public void testNestedScopeWithTheSameAttributeValue() { + try (Scope ignored1 = OperationContext.set(INT_ATTR, 1)) { + checkAttributeValues(DFLT_STR_VAL, 1); + + try (Scope scope = OperationContext.set(INT_ATTR, 1)) { + checkAttributeValues(DFLT_STR_VAL, 1); + + assertTrue(scope == Scope.NOOP_SCOPE); + } + + checkAttributeValues(DFLT_STR_VAL, 1); + } + } + + /** */ + @Test + public void testRuntimeAttributeCreation() { + try (Scope ignored1 = OperationContext.set(INT_ATTR, 1)) { + OperationContextAttribute attr = OperationContextAttribute.newInstance(); + + assertNull(OperationContext.get(attr)); + + try (Scope ignored2 = OperationContext.set(attr, "test")) { + assertEquals("test", OperationContext.get(attr)); + } + + assertNull(OperationContext.get(attr)); + } + } + + /** */ + @Test + public void testMaximumAttributesInstanceCount() { + int cnt = OperationContextAttribute.MAX_ATTR_CNT - OperationContextAttribute.ID_GEN.get(); + + List> attrs = new ArrayList<>(cnt); + LinkedList scopes = new LinkedList<>(); + + for (int i = 0; i < cnt; i++) { + attrs.add(OperationContextAttribute.newInstance()); + + scopes.push(OperationContext.set(attrs.get(i), i)); + } + + try { + for (int i = 0; i < cnt; i++) + assertTrue(i == OperationContext.get(attrs.get(i))); + } + finally { + scopes.forEach(Scope::close); + } + + assertTrue(attrs.stream().allMatch(attr -> OperationContext.get(attr) == null)); + + assertThrowsAnyCause( + log, + OperationContextAttribute::newInstance, + AssertionError.class, + "Exceeded maximum supported number of created Attributes instances" + ); + } + + /** */ + @Test + public void testUnorderedScopeClosing() { + Scope scope1 = OperationContext.set(INT_ATTR, 0); + + try { + try (Scope ignored = OperationContext.set(STR_ATTR, "test")) { + assertThrowsWithCause(scope1::close, AssertionError.class); + } + } + finally { + scope1.close(); + } + + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + + assertThrowsWithCause(scope1::close, AssertionError.class); + } + + /** */ + @Test + public void testEmptySnapshot() { + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + + OperationContextSnapshot snapshot = OperationContext.createSnapshot(); + + try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) { + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + } + + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + } + + /** */ + @Test + public void testSnapshot() { + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + + OperationContextSnapshot snapshot; + + try (Scope ignored = OperationContext.set(INT_ATTR, 1, STR_ATTR, "test1")) { + checkAttributeValues("test1", 1); + + snapshot = OperationContext.createSnapshot(); + } + + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + + try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) { + checkAttributeValues("test1", 1); + } + + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + } + + /** */ + @Test + public void testNestedScopeSnapshot() { + OperationContextSnapshot snapshot; + + try (Scope ignored1 = OperationContext.set(INT_ATTR, 1, STR_ATTR, "test1")) { + try (Scope ignored2 = OperationContext.set(STR_ATTR, "test2")) { + checkAttributeValues("test2", 1); + + snapshot = OperationContext.createSnapshot(); + } + } + + try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) { + checkAttributeValues("test2", 1); + } + + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + } + + /** */ + @Test + public void testNestedScopeInSnapshotScope() { + OperationContextSnapshot snapshot0; + + try (Scope ignored = OperationContext.set(INT_ATTR, 1, STR_ATTR, "test1")) { + checkAttributeValues("test1", 1); + + snapshot0 = OperationContext.createSnapshot(); + } + + OperationContextSnapshot snapshot1; + + try (Scope ignored1 = OperationContext.restoreSnapshot(snapshot0)) { + checkAttributeValues("test1", 1); + + try (Scope ignored2 = OperationContext.set(INT_ATTR, 2)) { + checkAttributeValues("test1", 2); + + snapshot1 = OperationContext.createSnapshot(); + } + + checkAttributeValues("test1", 1); + } + + try (Scope ignored0 = OperationContext.restoreSnapshot(snapshot1)) { + checkAttributeValues("test1", 2); + } + + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + } + + /** */ + @Test + public void testSnapshotRestoreInExistingScope() { + OperationContextSnapshot snapshot; + + try (Scope ignored = OperationContext.set(STR_ATTR, "test1")) { + checkAttributeValues("test1", DFLT_INT_VAL); + + snapshot = OperationContext.createSnapshot(); + } + + try (Scope ignored1 = OperationContext.set(INT_ATTR, 1)) { + checkAttributeValues(DFLT_STR_VAL, 1); + + // Note, snapshot restores the state of the entire context, including attributes that do not have a value set. + try (Scope ignored2 = OperationContext.restoreSnapshot(snapshot)) { + checkAttributeValues("test1", DFLT_INT_VAL); + } + + checkAttributeValues(DFLT_STR_VAL, 1); + } + + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + } + + /** */ + @Test + public void testSnapshotNotAffectedByConsequentContextUpdates() { + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + + OperationContextSnapshot snapshot; + + try (Scope ignored0 = OperationContext.set(INT_ATTR, 1)) { + checkAttributeValues(DFLT_STR_VAL, 1); + + snapshot = OperationContext.createSnapshot(); + + try (Scope ignored1 = OperationContext.set(STR_ATTR, "test")) { + checkAttributeValues("test", 1); + + try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) { + checkAttributeValues(DFLT_STR_VAL, 1); + } + + checkAttributeValues("test", 1); + } + } + + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + } + + /** */ + @Test + public void testSnapshotScopeUnorderedClosing() { + OperationContextSnapshot snapshot; + + try (Scope ignored = OperationContext.set(STR_ATTR, "test1")) { + checkAttributeValues("test1", DFLT_INT_VAL); + + snapshot = OperationContext.createSnapshot(); + } + + try (Scope snpScope = OperationContext.restoreSnapshot(snapshot)) { + try (Scope ignored1 = OperationContext.set(INT_ATTR, 2)) { + checkAttributeValues("test1", 2); + + assertThrowsWithCause(snpScope::close, AssertionError.class); + } + } + + checkAttributeValues(DFLT_STR_VAL, DFLT_INT_VAL); + } + + /** */ + @Test + public void testContextAwareThreadPool() throws Exception { + OperationContextAwareThreadPoolExecutor pool = deferShutdown(new OperationContextAwareThreadPoolExecutor( + "test", + null, + 1, + 1, + Long.MAX_VALUE, + new LinkedBlockingQueue<>(), + GridIoPolicy.UNDEFINED, + null)); + + doContextAwareExecutorServiceTest(pool); + } + + /** */ + @Test + public void testContextAwareStripedThreadPoolExecutor() throws Exception { + OperationContextAwareStripedThreadPoolExecutor pool = deferShutdown(new OperationContextAwareStripedThreadPoolExecutor( + 2, + getTestIgniteInstanceName(0), + "", + (t, e) -> log.error("", e), + false, + 0 + )); + + BiConsumerX checks = (s, i) -> pool.execute(new AttributeValueChecker(s, i), 1); + + createAttributeChecks(checks); + + AttributeValueChecker.assertAllCreatedChecksPassed(); + } + + /** */ + @Test + public void testContextAwareStripedExecutor() throws Exception { + OperationContextAwareStripedExecutor pool = deferShutdown(new OperationContextAwareStripedExecutor( + 2, + getTestIgniteInstanceName(0), + "", + log, + e -> {}, + false, + null, + getTestTimeout() + )); + + BiConsumerX checks = (s, i) -> { + pool.execute( new AttributeValueChecker(s, i)); + pool.execute(1, new AttributeValueChecker(s, i)); + }; + + createAttributeChecks(checks); + + AttributeValueChecker.assertAllCreatedChecksPassed(); + } + + /** */ + private void doContextAwareExecutorServiceTest(ExecutorService pool) throws Exception { + CountDownLatch poolUnblockedLatch = blockPool(pool); + + BiConsumerX asyncChecks = (s, i) -> { + pool.submit((Runnable)new AttributeValueChecker(s, i)); + pool.submit(new AttributeValueChecker(s, i), 0); + pool.submit((Callable)new AttributeValueChecker(s, i)); + }; + + BiConsumerX syncChecks = (s, i) -> { + pool.invokeAny(List.of((Callable)new AttributeValueChecker(s, i))); + pool.invokeAny(List.of((Callable)new AttributeValueChecker(s, i)), 1000, MILLISECONDS); + pool.invokeAll(List.of((Callable)new AttributeValueChecker(s, i))); + pool.invokeAll(List.of((Callable)new AttributeValueChecker(s, i)), 1000, MILLISECONDS); + }; + + createAttributeChecks(asyncChecks); + + poolUnblockedLatch.countDown(); + + createAttributeChecks(syncChecks); + + AttributeValueChecker.assertAllCreatedChecksPassed(); + } + + /** */ + private CountDownLatch blockPool(ExecutorService pool) { + CountDownLatch latch = new CountDownLatch(1); + + pool.submit(() -> { + try { + latch.await(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + return latch; + } + + /** */ + private T deferShutdown(T pool) { + poolToShutdownAfterTest = pool; + + return pool; + } + + /** */ + private void createAttributeChecks(BiConsumerX checkGenerator) throws Exception { + try (Scope ignored = OperationContext.set(STR_ATTR, "test1", INT_ATTR, 1)) { + checkGenerator.accept("test1", 1); + } + + try (Scope ignored = OperationContext.set(STR_ATTR, "test2", INT_ATTR, 2)) { + checkGenerator.accept("test2", 2); + } + + checkGenerator.accept(DFLT_STR_VAL, DFLT_INT_VAL); + } + + /** */ + private static void checkAttributeValues(String strAttrVal, Integer intAttrVal) { + assertEquals(intAttrVal, OperationContext.get(INT_ATTR)); + assertEquals(strAttrVal, OperationContext.get(STR_ATTR)); + } + + /** */ + private static class AttributeValueChecker extends CompletableFuture implements Runnable, Callable { + /** */ + static final List CHECKS = new ArrayList<>(); + + /** */ + private final String strAttrVal; + + /** */ + private final Integer intAttrVal; + + /** */ + public AttributeValueChecker(String strAttrVal, Integer intAttrVal) { + this.strAttrVal = strAttrVal; + this.intAttrVal = intAttrVal; + + CHECKS.add(this); + } + + /** {@inheritDoc} */ + @Override public void run() { + try { + checkAttributeValues(strAttrVal, intAttrVal); + + complete(null); + } + catch (Throwable e) { + completeExceptionally(e); + } + } + + /** {@inheritDoc} */ + @Override public Integer call() { + run(); + + return 0; + } + + /** */ + static void assertAllCreatedChecksPassed() throws Exception { + for (AttributeValueChecker check : CHECKS) { + check.get(1000, MILLISECONDS); + } + } + } + + /** */ + private interface BiConsumerX { + /** */ + void accept(T t, U u) throws Exception; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java index 52d0c722bb0bb..ba4e875162cad 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java @@ -73,6 +73,7 @@ import org.apache.ignite.internal.processors.security.service.ServiceAuthorizationTest; import org.apache.ignite.internal.processors.security.service.ServiceStaticConfigTest; import org.apache.ignite.internal.processors.security.snapshot.SnapshotPermissionCheckTest; +import org.apache.ignite.internal.thread.context.OperationContextAttributesTest; import org.apache.ignite.ssl.MultipleSSLContextsTest; import org.apache.ignite.tools.junit.JUnitTeamcityReporter; import org.junit.BeforeClass; @@ -145,6 +146,7 @@ ActivationOnJoinWithoutPermissionsWithPersistenceTest.class, SecurityContextInternalFuturePropagationTest.class, NodeConnectionCertificateCapturingTest.class, + OperationContextAttributesTest.class, }) public class SecurityTestSuite { /** */ diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/authentication/SqlUserCommandSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/authentication/SqlUserCommandSelfTest.java index 524e99927364e..201ea7140fe24 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/authentication/SqlUserCommandSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/authentication/SqlUserCommandSelfTest.java @@ -26,8 +26,8 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.processors.authentication.IgniteAccessControlException; import org.apache.ignite.internal.processors.authentication.UserManagementException; -import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.SecurityContext; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -268,7 +268,7 @@ private void doSqlAsRoot(int nodeIdx, String sql) throws Exception { private void doSqlAs(int nodeIdx, String sql, String login, String pwd) throws Exception { SecurityContext secCtx = authenticate(grid(0), login, pwd); - try (OperationSecurityContext ignored = grid(nodeIdx).context().security().withContext(secCtx)) { + try (Scope ignored = grid(nodeIdx).context().security().withContext(secCtx)) { doSql(nodeIdx, sql); } }