Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,25 @@
package org.apache.ignite.jdbc;

import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.CANCELED_QUERIES;
import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.FAILED_QUERIES;
import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.SUCCESSFUL_QUERIES;
import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.TIMED_OUT_QUERIES;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.ignite.internal.jdbc.JdbcStatement;
import org.apache.ignite.internal.metrics.LongMetric;
import org.apache.ignite.internal.metrics.MetricSet;
import org.apache.ignite.internal.sql.metrics.QueryMetrics;
import org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource;
import org.awaitility.Awaitility;
import org.hamcrest.Matchers;
import org.awaitility.core.ThrowingRunnable;
import org.junit.jupiter.api.Test;

/**
Expand All @@ -66,19 +61,13 @@ public void testScriptErrors() throws SQLException {
try (var stmt = conn.prepareStatement("SELECT 1; SELECT 1/?;")) {
stmt.setInt(1, 0);

long success0 = metricValue(SUCCESSFUL_QUERIES);
long failed0 = metricValue(FAILED_QUERIES);
long cancelled0 = metricValue(CANCELED_QUERIES);
long timedout0 = metricValue(TIMED_OUT_QUERIES);
QueryMetrics initialMetrics = currentMetrics();

// The first statement is OK
boolean rs1 = stmt.execute();
assertTrue(rs1);

assertEquals(success0 + 1, metricValue(SUCCESSFUL_QUERIES));
assertEquals(failed0, metricValue(FAILED_QUERIES));
assertEquals(cancelled0, metricValue(CANCELED_QUERIES));
assertEquals(timedout0, metricValue(TIMED_OUT_QUERIES));
awaitMetrics(initialMetrics, 1, 0, 0, 0);

// The second statement fails
assertThrows(SQLException.class, () -> {
Expand All @@ -88,10 +77,8 @@ public void testScriptErrors() throws SQLException {
rs.getInt(1);
}
});
assertEquals(success0 + 1, metricValue(SUCCESSFUL_QUERIES));
assertEquals(failed0 + 1, metricValue(FAILED_QUERIES));
assertEquals(cancelled0, metricValue(CANCELED_QUERIES));
assertEquals(timedout0, metricValue(TIMED_OUT_QUERIES));

awaitMetrics(initialMetrics, 1, 1, 0, 0);
}
}

Expand All @@ -100,17 +87,13 @@ public void testScriptCancellation() throws SQLException {
try (var stmt = conn.prepareStatement("SELECT 1; SELECT 1/?;")) {
stmt.setInt(1, 0);

long success0 = metricValue(SUCCESSFUL_QUERIES);
long failed0 = metricValue(FAILED_QUERIES);
long cancelled0 = metricValue(CANCELED_QUERIES);
QueryMetrics initialMetrics = currentMetrics();

// The first statement is OK
boolean rs1 = stmt.execute();
assertTrue(rs1);

assertEquals(success0 + 1, metricValue(SUCCESSFUL_QUERIES));
assertEquals(failed0, metricValue(FAILED_QUERIES));
assertEquals(cancelled0, metricValue(CANCELED_QUERIES));
awaitMetrics(initialMetrics, 1, 0, 0, 0);

// The second statement is cancelled
assertThrows(SQLException.class, () -> {
Expand All @@ -121,23 +104,15 @@ public void testScriptCancellation() throws SQLException {
rs.getInt(1);
}
});
assertEquals(success0 + 1, metricValue(SUCCESSFUL_QUERIES));
assertEquals(failed0 + 1, metricValue(FAILED_QUERIES));
assertEquals(cancelled0 + 1, metricValue(CANCELED_QUERIES));

awaitMetrics(initialMetrics, 1, 1, 1, 0);
}
}

@Test
public void testScriptTimeout() {
Callable<Map<String, Long>> runScript = () -> {

long success0 = metricValue(SUCCESSFUL_QUERIES);
long failed0 = metricValue(FAILED_QUERIES);
long cancelled0 = metricValue(CANCELED_QUERIES);
long timedout0 = metricValue(TIMED_OUT_QUERIES);

log.info("Initial Metrics: success: {}, failed: {}, cancelled: {}, timed out: ",
success0, failed0, cancelled0, timedout0);
ThrowingRunnable runScript = () -> {
QueryMetrics initialMetrics = currentMetrics();

SQLException err;

Expand Down Expand Up @@ -176,29 +151,14 @@ public void testScriptTimeout() {

log.info("Script timed out");

// Check the metrics

long success1 = metricValue(SUCCESSFUL_QUERIES);
long failed1 = metricValue(FAILED_QUERIES);
long cancelled1 = metricValue(CANCELED_QUERIES);
long timedout1 = metricValue(TIMED_OUT_QUERIES);

log.info("Metrics: success: {}, failed: {}, cancelled: {}, timed out: {}",
success1, failed1, cancelled1, timedout1);

return Map.of(
SUCCESSFUL_QUERIES, success1 - success0,
FAILED_QUERIES, failed1 - failed0,
CANCELED_QUERIES, cancelled1 - cancelled0,
TIMED_OUT_QUERIES, timedout1 - timedout0
);
assertThat(currentMetrics(), initialMetrics.hasDeltas(1, 2, 0, 2));
};

Map<String, Long> delta = Map.of(
SUCCESSFUL_QUERIES, 1L, FAILED_QUERIES, 2L, CANCELED_QUERIES, 0L, TIMED_OUT_QUERIES, 2L
);

Awaitility.await().ignoreExceptions().until(runScript, Matchers.equalTo(delta));
// We need to guard against first statement possible timeout due to the timing issues, so let's retry the test until success.
await()
.pollDelay(Duration.ZERO)
.ignoreExceptions()
.untilAsserted(runScript);
}

private long metricValue(String name) {
Expand All @@ -210,4 +170,18 @@ private long metricValue(String name) {
return metric.value();
}).sum();
}

private QueryMetrics currentMetrics() {
return new QueryMetrics(this::metricValue);
}

private void awaitMetrics(
QueryMetrics initialMetrics,
long succeededDelta,
long failedDelta,
long canceledDelta,
long timedOutDelta
) {
initialMetrics.awaitDeltas(this::metricValue, succeededDelta, failedDelta, canceledDelta, timedOutDelta);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,21 @@

import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.CANCELED_QUERIES;
import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.FAILED_QUERIES;
import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.SUCCESSFUL_QUERIES;
import static org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource.TIMED_OUT_QUERIES;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.ignite.internal.metrics.LongMetric;
import org.apache.ignite.internal.metrics.MetricSet;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.metrics.QueryMetrics;
import org.apache.ignite.internal.sql.metrics.SqlQueryMetricSource;
import org.apache.ignite.lang.CancelHandle;
import org.apache.ignite.lang.ErrorGroups.Sql;
Expand All @@ -51,7 +44,6 @@
import org.apache.ignite.sql.Statement;
import org.apache.ignite.sql.async.AsyncResultSet;
import org.apache.ignite.tx.Transaction;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -79,12 +71,7 @@ public void beforeAll() {
@ParameterizedTest
@MethodSource("singleSuccessful")
public void testSingle(String sqlString) {
Map<String, Long> metrics = Map.of(
SUCCESSFUL_QUERIES, 1L, FAILED_QUERIES, 0L,
CANCELED_QUERIES, 0L, TIMED_OUT_QUERIES, 0L
);

assertMetricIncreased(() -> sql(sqlString), metrics);
assertMetricIncreased(() -> sql(sqlString), 1, 0, 0, 0);
}

private static Stream<Arguments> singleSuccessful() {
Expand All @@ -100,12 +87,7 @@ private static Stream<Arguments> singleSuccessful() {
@ParameterizedTest
@MethodSource("singleUnsuccessful")
public void testSingleWithErrors(String sqlString, Object[] params) {
Map<String, Long> metrics = Map.of(
SUCCESSFUL_QUERIES, 0L, FAILED_QUERIES, 1L,
CANCELED_QUERIES, 0L, TIMED_OUT_QUERIES, 0L
);

assertMetricIncreased(() -> assertThrows(SqlException.class, () -> sql(sqlString, params)), metrics);
assertMetricIncreased(() -> assertThrows(SqlException.class, () -> sql(sqlString, params)), 0, 1, 0, 0);
}

private static Stream<Arguments> singleUnsuccessful() {
Expand All @@ -130,21 +112,14 @@ private static Stream<Arguments> singleUnsuccessful() {
public void testSingleCancellation() {
IgniteSql sql = igniteSql();

Map<String, Long> metrics = Map.of(
SUCCESSFUL_QUERIES, 0L, FAILED_QUERIES, 1L,
CANCELED_QUERIES, 1L, TIMED_OUT_QUERIES, 0L
);
CancelHandle cancelHandle = CancelHandle.create();

{
CancelHandle cancelHandle = CancelHandle.create();

assertMetricIncreased(() -> assertThrows(CompletionException.class, () -> {
CompletableFuture<AsyncResultSet<SqlRow>> f = sql.executeAsync((Transaction) null, cancelHandle.token(),
"SELECT x FROM system_range(1, 10000000000)");
cancelHandle.cancelAsync();
f.join();
}), metrics);
}
assertMetricIncreased(() -> assertThrows(CompletionException.class, () -> {
CompletableFuture<AsyncResultSet<SqlRow>> f = sql.executeAsync((Transaction) null, cancelHandle.token(),
"SELECT x FROM system_range(1, 10000000000)");
cancelHandle.cancelAsync();
f.join();
}), 0, 1, 1, 0);
}

@Test
Expand All @@ -154,11 +129,6 @@ public void testSingleTimeout() {
int timeoutSeconds = 100;
TimeUnit timeoutUnit = TimeUnit.MILLISECONDS;

Map<String, Long> metrics = Map.of(
SUCCESSFUL_QUERIES, 0L, FAILED_QUERIES, 1L,
CANCELED_QUERIES, 0L, TIMED_OUT_QUERIES, 1L
);

// Run multiple times to make the test case stable w/o setting large timeout values.
assertMetricIncreased(() -> assertThrowsSqlException(Sql.EXECUTION_CANCELLED_ERR, "", () -> {
Statement statement = sql.statementBuilder()
Expand All @@ -173,7 +143,7 @@ public void testSingleTimeout() {
assertNotNull(rs.next());
}
}
}), metrics);
}), 0, 1, 0, 1);
}

@ParameterizedTest
Expand All @@ -188,12 +158,7 @@ public void testScriptSuccessful(List<String> statements) {

log.info("Script:\n{}", script);

Map<String, Long> metrics = Map.of(
SUCCESSFUL_QUERIES, (long) statements.size(), FAILED_QUERIES, 0L,
CANCELED_QUERIES, 0L, TIMED_OUT_QUERIES, 0L
);

assertMetricIncreased(() -> sql.executeScript(script), metrics);
assertMetricIncreased(() -> sql.executeScript(script), statements.size(), 0, 0, 0);
}

private static Stream<List<String>> scriptsSuccessful() {
Expand All @@ -218,12 +183,7 @@ public void testScriptWithErrors(List<String> statements, Object[] params, int s

String script = String.join(";" + System.lineSeparator(), statements);

Map<String, Long> metrics = Map.of(
SUCCESSFUL_QUERIES, (long) success, FAILED_QUERIES, (long) error,
CANCELED_QUERIES, 0L, TIMED_OUT_QUERIES, 0L
);

assertMetricIncreased(() -> assertThrows(SqlException.class, () -> sql.executeScript(script, params)), metrics);
assertMetricIncreased(() -> assertThrows(SqlException.class, () -> sql.executeScript(script, params)), success, error, 0, 0);
}

private static Stream<Arguments> scriptsUnsuccessful() {
Expand All @@ -244,37 +204,18 @@ private static Stream<Arguments> scriptsUnsuccessful() {
);
}

private void assertMetricIncreased(Runnable task, Map<String, Long> deltas) {
Callable<Boolean> condition = () -> {
// Collect current metric values.
Map<String, Long> expected = new HashMap<>();
for (Entry<String, Long> e : deltas.entrySet()) {
String metricName = e.getKey();
long value = longMetricValue(metricName);
expected.put(metricName, value + e.getValue());
}

// Run inside the condition.
task.run();

// Collect actual metric values.
Map<String, Long> actual = new HashMap<>();
for (String metricName : expected.keySet()) {
long actualVal = longMetricValue(metricName);
actual.put(metricName, actualVal);
}
boolean ok = actual.equals(expected);

log.info("Expected: {}", expected);
log.info("Delta: {}", deltas);
log.info("Actual: {}", actual);
log.info("Check passes: {}", ok);
private void assertMetricIncreased(
Runnable task,
long succeededDelta,
long failedDelta,
long canceledDelta,
long timedOutDelta
) {
QueryMetrics initialMetrics = new QueryMetrics(this::longMetricValue);

return ok;
};
task.run();

// Checks multiple times until values match
Awaitility.await().ignoreExceptions().until(condition);
initialMetrics.awaitDeltas(this::longMetricValue, succeededDelta, failedDelta, canceledDelta, timedOutDelta);
}

private long longMetricValue(String metricName) {
Expand Down
Loading