diff --git a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcQueryMetricsTest.java b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcQueryMetricsTest.java index ecad3c4a9521..f78d3f2ae6d1 100644 --- a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcQueryMetricsTest.java +++ b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcQueryMetricsTest.java @@ -18,30 +18,25 @@ package org.apache.ignite.jdbc; import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; -import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.CANCELED_QUERIES; -import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.FAILED_QUERIES; -import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.SUCCESSFUL_QUERIES; -import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.TIMED_OUT_QUERIES; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.sql.ResultSet; import java.sql.SQLException; +import java.time.Duration; import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.ignite.internal.jdbc.JdbcStatement; import org.apache.ignite.internal.metrics.LongMetric; import org.apache.ignite.internal.metrics.MetricSet; +import org.apache.ignite.internal.sql.metrics.QueryMetrics; import org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource; -import org.awaitility.Awaitility; -import org.hamcrest.Matchers; +import org.awaitility.core.ThrowingRunnable; import org.junit.jupiter.api.Test; /** @@ -66,19 +61,13 @@ public void testScriptErrors() throws SQLException { try (var stmt = conn.prepareStatement("SELECT 1; SELECT 1/?;")) { stmt.setInt(1, 0); - long success0 = metricValue(SUCCESSFUL_QUERIES); - long failed0 = metricValue(FAILED_QUERIES); - long cancelled0 = metricValue(CANCELED_QUERIES); - long timedout0 = metricValue(TIMED_OUT_QUERIES); + QueryMetrics initialMetrics = currentMetrics(); // The first statement is OK boolean rs1 = stmt.execute(); assertTrue(rs1); - assertEquals(success0 + 1, metricValue(SUCCESSFUL_QUERIES)); - assertEquals(failed0, metricValue(FAILED_QUERIES)); - assertEquals(cancelled0, metricValue(CANCELED_QUERIES)); - assertEquals(timedout0, metricValue(TIMED_OUT_QUERIES)); + awaitMetrics(initialMetrics, 1, 0, 0, 0); // The second statement fails assertThrows(SQLException.class, () -> { @@ -88,10 +77,8 @@ public void testScriptErrors() throws SQLException { rs.getInt(1); } }); - assertEquals(success0 + 1, metricValue(SUCCESSFUL_QUERIES)); - assertEquals(failed0 + 1, metricValue(FAILED_QUERIES)); - assertEquals(cancelled0, metricValue(CANCELED_QUERIES)); - assertEquals(timedout0, metricValue(TIMED_OUT_QUERIES)); + + awaitMetrics(initialMetrics, 1, 1, 0, 0); } } @@ -100,17 +87,13 @@ public void testScriptCancellation() throws SQLException { try (var stmt = conn.prepareStatement("SELECT 1; SELECT 1/?;")) { stmt.setInt(1, 0); - long success0 = metricValue(SUCCESSFUL_QUERIES); - long failed0 = metricValue(FAILED_QUERIES); - long cancelled0 = metricValue(CANCELED_QUERIES); + QueryMetrics initialMetrics = currentMetrics(); // The first statement is OK boolean rs1 = stmt.execute(); assertTrue(rs1); - assertEquals(success0 + 1, metricValue(SUCCESSFUL_QUERIES)); - assertEquals(failed0, metricValue(FAILED_QUERIES)); - assertEquals(cancelled0, metricValue(CANCELED_QUERIES)); + awaitMetrics(initialMetrics, 1, 0, 0, 0); // The second statement is cancelled assertThrows(SQLException.class, () -> { @@ -121,23 +104,15 @@ public void testScriptCancellation() throws SQLException { rs.getInt(1); } }); - assertEquals(success0 + 1, metricValue(SUCCESSFUL_QUERIES)); - assertEquals(failed0 + 1, metricValue(FAILED_QUERIES)); - assertEquals(cancelled0 + 1, metricValue(CANCELED_QUERIES)); + + awaitMetrics(initialMetrics, 1, 1, 1, 0); } } @Test public void testScriptTimeout() { - Callable> runScript = () -> { - - long success0 = metricValue(SUCCESSFUL_QUERIES); - long failed0 = metricValue(FAILED_QUERIES); - long cancelled0 = metricValue(CANCELED_QUERIES); - long timedout0 = metricValue(TIMED_OUT_QUERIES); - - log.info("Initial Metrics: success: {}, failed: {}, cancelled: {}, timed out: ", - success0, failed0, cancelled0, timedout0); + ThrowingRunnable runScript = () -> { + QueryMetrics initialMetrics = currentMetrics(); SQLException err; @@ -176,29 +151,14 @@ public void testScriptTimeout() { log.info("Script timed out"); - // Check the metrics - - long success1 = metricValue(SUCCESSFUL_QUERIES); - long failed1 = metricValue(FAILED_QUERIES); - long cancelled1 = metricValue(CANCELED_QUERIES); - long timedout1 = metricValue(TIMED_OUT_QUERIES); - - log.info("Metrics: success: {}, failed: {}, cancelled: {}, timed out: {}", - success1, failed1, cancelled1, timedout1); - - return Map.of( - SUCCESSFUL_QUERIES, success1 - success0, - FAILED_QUERIES, failed1 - failed0, - CANCELED_QUERIES, cancelled1 - cancelled0, - TIMED_OUT_QUERIES, timedout1 - timedout0 - ); + assertThat(currentMetrics(), initialMetrics.hasDeltas(1, 2, 0, 2)); }; - Map delta = Map.of( - SUCCESSFUL_QUERIES, 1L, FAILED_QUERIES, 2L, CANCELED_QUERIES, 0L, TIMED_OUT_QUERIES, 2L - ); - - Awaitility.await().ignoreExceptions().until(runScript, Matchers.equalTo(delta)); + // We need to guard against first statement possible timeout due to the timing issues, so let's retry the test until success. + await() + .pollDelay(Duration.ZERO) + .ignoreExceptions() + .untilAsserted(runScript); } private long metricValue(String name) { @@ -210,4 +170,18 @@ private long metricValue(String name) { return metric.value(); }).sum(); } + + private QueryMetrics currentMetrics() { + return new QueryMetrics(this::metricValue); + } + + private void awaitMetrics( + QueryMetrics initialMetrics, + long succeededDelta, + long failedDelta, + long canceledDelta, + long timedOutDelta + ) { + initialMetrics.awaitDeltas(this::metricValue, succeededDelta, failedDelta, canceledDelta, timedOutDelta); + } } diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlQueryExecutionMetricsTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlQueryExecutionMetricsTest.java index 9f1cb349c9f5..698b540e8fe7 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlQueryExecutionMetricsTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlQueryExecutionMetricsTest.java @@ -19,21 +19,13 @@ import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException; -import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.CANCELED_QUERIES; -import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.FAILED_QUERIES; -import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.SUCCESSFUL_QUERIES; -import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.TIMED_OUT_QUERIES; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; @@ -41,6 +33,7 @@ import org.apache.ignite.internal.metrics.LongMetric; import org.apache.ignite.internal.metrics.MetricSet; import org.apache.ignite.internal.sql.BaseSqlIntegrationTest; +import org.apache.ignite.internal.sql.metrics.QueryMetrics; import org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource; import org.apache.ignite.lang.CancelHandle; import org.apache.ignite.lang.ErrorGroups.Sql; @@ -51,7 +44,6 @@ import org.apache.ignite.sql.Statement; import org.apache.ignite.sql.async.AsyncResultSet; import org.apache.ignite.tx.Transaction; -import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -79,12 +71,7 @@ public void beforeAll() { @ParameterizedTest @MethodSource("singleSuccessful") public void testSingle(String sqlString) { - Map metrics = Map.of( - SUCCESSFUL_QUERIES, 1L, FAILED_QUERIES, 0L, - CANCELED_QUERIES, 0L, TIMED_OUT_QUERIES, 0L - ); - - assertMetricIncreased(() -> sql(sqlString), metrics); + assertMetricIncreased(() -> sql(sqlString), 1, 0, 0, 0); } private static Stream singleSuccessful() { @@ -100,12 +87,7 @@ private static Stream singleSuccessful() { @ParameterizedTest @MethodSource("singleUnsuccessful") public void testSingleWithErrors(String sqlString, Object[] params) { - Map metrics = Map.of( - SUCCESSFUL_QUERIES, 0L, FAILED_QUERIES, 1L, - CANCELED_QUERIES, 0L, TIMED_OUT_QUERIES, 0L - ); - - assertMetricIncreased(() -> assertThrows(SqlException.class, () -> sql(sqlString, params)), metrics); + assertMetricIncreased(() -> assertThrows(SqlException.class, () -> sql(sqlString, params)), 0, 1, 0, 0); } private static Stream singleUnsuccessful() { @@ -130,21 +112,14 @@ private static Stream singleUnsuccessful() { public void testSingleCancellation() { IgniteSql sql = igniteSql(); - Map metrics = Map.of( - SUCCESSFUL_QUERIES, 0L, FAILED_QUERIES, 1L, - CANCELED_QUERIES, 1L, TIMED_OUT_QUERIES, 0L - ); + CancelHandle cancelHandle = CancelHandle.create(); - { - CancelHandle cancelHandle = CancelHandle.create(); - - assertMetricIncreased(() -> assertThrows(CompletionException.class, () -> { - CompletableFuture> f = sql.executeAsync((Transaction) null, cancelHandle.token(), - "SELECT x FROM system_range(1, 10000000000)"); - cancelHandle.cancelAsync(); - f.join(); - }), metrics); - } + assertMetricIncreased(() -> assertThrows(CompletionException.class, () -> { + CompletableFuture> f = sql.executeAsync((Transaction) null, cancelHandle.token(), + "SELECT x FROM system_range(1, 10000000000)"); + cancelHandle.cancelAsync(); + f.join(); + }), 0, 1, 1, 0); } @Test @@ -154,11 +129,6 @@ public void testSingleTimeout() { int timeoutSeconds = 100; TimeUnit timeoutUnit = TimeUnit.MILLISECONDS; - Map metrics = Map.of( - SUCCESSFUL_QUERIES, 0L, FAILED_QUERIES, 1L, - CANCELED_QUERIES, 0L, TIMED_OUT_QUERIES, 1L - ); - // Run multiple times to make the test case stable w/o setting large timeout values. assertMetricIncreased(() -> assertThrowsSqlException(Sql.EXECUTION_CANCELLED_ERR, "", () -> { Statement statement = sql.statementBuilder() @@ -173,7 +143,7 @@ public void testSingleTimeout() { assertNotNull(rs.next()); } } - }), metrics); + }), 0, 1, 0, 1); } @ParameterizedTest @@ -188,12 +158,7 @@ public void testScriptSuccessful(List statements) { log.info("Script:\n{}", script); - Map metrics = Map.of( - SUCCESSFUL_QUERIES, (long) statements.size(), FAILED_QUERIES, 0L, - CANCELED_QUERIES, 0L, TIMED_OUT_QUERIES, 0L - ); - - assertMetricIncreased(() -> sql.executeScript(script), metrics); + assertMetricIncreased(() -> sql.executeScript(script), statements.size(), 0, 0, 0); } private static Stream> scriptsSuccessful() { @@ -218,12 +183,7 @@ public void testScriptWithErrors(List statements, Object[] params, int s String script = String.join(";" + System.lineSeparator(), statements); - Map metrics = Map.of( - SUCCESSFUL_QUERIES, (long) success, FAILED_QUERIES, (long) error, - CANCELED_QUERIES, 0L, TIMED_OUT_QUERIES, 0L - ); - - assertMetricIncreased(() -> assertThrows(SqlException.class, () -> sql.executeScript(script, params)), metrics); + assertMetricIncreased(() -> assertThrows(SqlException.class, () -> sql.executeScript(script, params)), success, error, 0, 0); } private static Stream scriptsUnsuccessful() { @@ -244,37 +204,18 @@ private static Stream scriptsUnsuccessful() { ); } - private void assertMetricIncreased(Runnable task, Map deltas) { - Callable condition = () -> { - // Collect current metric values. - Map expected = new HashMap<>(); - for (Entry e : deltas.entrySet()) { - String metricName = e.getKey(); - long value = longMetricValue(metricName); - expected.put(metricName, value + e.getValue()); - } - - // Run inside the condition. - task.run(); - - // Collect actual metric values. - Map actual = new HashMap<>(); - for (String metricName : expected.keySet()) { - long actualVal = longMetricValue(metricName); - actual.put(metricName, actualVal); - } - boolean ok = actual.equals(expected); - - log.info("Expected: {}", expected); - log.info("Delta: {}", deltas); - log.info("Actual: {}", actual); - log.info("Check passes: {}", ok); + private void assertMetricIncreased( + Runnable task, + long succeededDelta, + long failedDelta, + long canceledDelta, + long timedOutDelta + ) { + QueryMetrics initialMetrics = new QueryMetrics(this::longMetricValue); - return ok; - }; + task.run(); - // Checks multiple times until values match - Awaitility.await().ignoreExceptions().until(condition); + initialMetrics.awaitDeltas(this::longMetricValue, succeededDelta, failedDelta, canceledDelta, timedOutDelta); } private long longMetricValue(String metricName) { diff --git a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/metrics/QueryMetrics.java b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/metrics/QueryMetrics.java new file mode 100644 index 000000000000..b512704bac2e --- /dev/null +++ b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/metrics/QueryMetrics.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.metrics; + +import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.CANCELED_QUERIES; +import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.FAILED_QUERIES; +import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.SUCCESSFUL_QUERIES; +import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.TIMED_OUT_QUERIES; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.function.ToLongFunction; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +/** Snapshot of query execution metric counters captured at a point in time. */ +public class QueryMetrics { + private final long succeeded; + private final long failed; + private final long canceled; + private final long timedOut; + + /** + * Creates a snapshot by reading metric values using the given function. + * + * @param metricValue Function that returns the current value for a given metric name. + */ + public QueryMetrics(ToLongFunction metricValue) { + succeeded = metricValue.applyAsLong(SUCCESSFUL_QUERIES); + failed = metricValue.applyAsLong(FAILED_QUERIES); + canceled = metricValue.applyAsLong(CANCELED_QUERIES); + timedOut = metricValue.applyAsLong(TIMED_OUT_QUERIES); + } + + /** + * Polls the given metric value function until the metric deltas (relative to this snapshot) match the expected values. + * + * @param metricValue Function that returns the current value for a given metric name. + * @param succeededDelta Expected increase in succeeded count. + * @param failedDelta Expected increase in failed count. + * @param canceledDelta Expected increase in canceled count. + * @param timedOutDelta Expected increase in timed-out count. + */ + public void awaitDeltas( + ToLongFunction metricValue, + long succeededDelta, + long failedDelta, + long canceledDelta, + long timedOutDelta + ) { + await().pollDelay(Duration.ZERO).until( + () -> new QueryMetrics(metricValue), + hasDeltas(succeededDelta, failedDelta, canceledDelta, timedOutDelta) + ); + } + + /** + * Returns a Hamcrest matcher that checks whether the actual {@link QueryMetrics} equals this snapshot plus the given deltas. Intended + * for use with Awaitility to poll until asynchronously updated metrics reach the expected values. + * + * @param succeededDelta Expected increase in succeeded count. + * @param failedDelta Expected increase in failed count. + * @param canceledDelta Expected increase in canceled count. + * @param timedOutDelta Expected increase in timed-out count. + */ + public Matcher hasDeltas(long succeededDelta, long failedDelta, long canceledDelta, long timedOutDelta) { + long expectedSucceeded = succeeded + succeededDelta; + long expectedFailed = failed + failedDelta; + long expectedCanceled = canceled + canceledDelta; + long expectedTimedOut = timedOut + timedOutDelta; + + return new TypeSafeMatcher<>() { + @Override + protected boolean matchesSafely(QueryMetrics actual) { + return actual.succeeded == expectedSucceeded + && actual.failed == expectedFailed + && actual.canceled == expectedCanceled + && actual.timedOut == expectedTimedOut; + } + + @Override + public void describeTo(Description description) { + description.appendText("metrics [succeeded=").appendValue(expectedSucceeded) + .appendText(", failed=").appendValue(expectedFailed) + .appendText(", canceled=").appendValue(expectedCanceled) + .appendText(", timedOut=").appendValue(expectedTimedOut) + .appendText("]"); + } + + @Override + protected void describeMismatchSafely(QueryMetrics actual, Description mismatchDescription) { + boolean first = true; + if (actual.succeeded != expectedSucceeded) { + mismatchDescription.appendText("succeeded was ").appendValue(actual.succeeded); + first = false; + } + if (actual.failed != expectedFailed) { + if (!first) { + mismatchDescription.appendText(", "); + } + mismatchDescription.appendText("failed was ").appendValue(actual.failed); + first = false; + } + if (actual.canceled != expectedCanceled) { + if (!first) { + mismatchDescription.appendText(", "); + } + mismatchDescription.appendText("canceled was ").appendValue(actual.canceled); + first = false; + } + if (actual.timedOut != expectedTimedOut) { + if (!first) { + mismatchDescription.appendText(", "); + } + mismatchDescription.appendText("timedOut was ").appendValue(actual.timedOut); + } + } + }; + } +}