From ad56f538c560b24569740ddeb9167fdde11c02f8 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Tue, 20 Feb 2024 21:57:01 +0800 Subject: [PATCH 1/9] test: add test for kv client Signed-off-by: iGxnon --- jxline-core/src/test/java/KVTest.java | 185 ++++++++++++++++++++++++++ 1 file changed, 185 insertions(+) create mode 100644 jxline-core/src/test/java/KVTest.java diff --git a/jxline-core/src/test/java/KVTest.java b/jxline-core/src/test/java/KVTest.java new file mode 100644 index 0000000..e20f41a --- /dev/null +++ b/jxline-core/src/test/java/KVTest.java @@ -0,0 +1,185 @@ +import cloud.xline.jxline.Client; +import cloud.xline.jxline.KV; +import cloud.xline.jxline.exceptions.XlineException; +import cloud.xline.jxline.kv.DeleteResponse; +import cloud.xline.jxline.kv.GetResponse; +import cloud.xline.jxline.kv.PutResponse; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.options.DeleteOption; +import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.options.PutOption; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import static org.assertj.core.api.Assertions.*; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +@Timeout(value = 20) +public class KVTest { + private static KV kvClient; + + private static final ByteSequence SAMPLE_KEY = bytesOf("sample_key"); + private static final ByteSequence SAMPLE_VALUE = bytesOf("sample_value"); + private static final ByteSequence SAMPLE_KEY_2 = bytesOf("sample_key2"); + private static final ByteSequence SAMPLE_VALUE_2 = bytesOf("sample_value2"); + private static final ByteSequence SAMPLE_KEY_3 = bytesOf("sample_key3"); + + @BeforeAll + static void onConnect() { + kvClient = Client.builder().endpoints("http://172.20.0.5:2379").build().getKVClient(); + } + + public static ByteSequence bytesOf(final String string) { + return ByteSequence.from(string, StandardCharsets.UTF_8); + } + + public static String randomString() { + return java.util.UUID.randomUUID().toString(); + } + + @Test + void testItWorks() throws Exception { + ByteSequence key = ByteSequence.from("Hello Xline", Charset.defaultCharset()); + ByteSequence value = ByteSequence.from("Hi", Charset.defaultCharset()); + PutResponse putResponse = kvClient.put(key, value).get(); + assertThat(putResponse).isNotNull(); + GetResponse getResponse = kvClient.get(key).get(); + assertThat(getResponse).isNotNull(); + assertThat(getResponse.getCount()).isEqualTo(1); + assertThat(getResponse.getKvs().get(0).getValue()).isEqualTo(value); + } + + @Test + public void testByteSequence() { + ByteSequence prefix = bytesOf("/test-service/"); + ByteSequence subPrefix = bytesOf("uuids/"); + + String keyString = randomString(); + ByteSequence key = bytesOf(keyString); + ByteSequence prefixedKey = prefix.concat(subPrefix).concat(key); + assertThat(prefixedKey.startsWith(prefix)).isTrue(); + assertThat( + prefixedKey + .substring(prefix.size() + subPrefix.size()) + .toString(StandardCharsets.UTF_8)) + .isEqualTo(keyString); + assertThat(prefixedKey.substring(prefix.size(), prefix.size() + subPrefix.size())) + .isEqualTo(subPrefix); + } + + @Test + public void testPut() throws Exception { + CompletableFuture feature = kvClient.put(SAMPLE_KEY, SAMPLE_VALUE); + PutResponse response = feature.get(); + assertThat(response.getHeader()).isNotNull(); + assertThat(!response.hasPrevKv()).isTrue(); + } + + @Test + public void testPutWithNotExistLease() { + PutOption option = PutOption.builder().withLeaseId(99999).build(); + CompletableFuture future = kvClient.put(SAMPLE_KEY, SAMPLE_VALUE, option); + assertThatExceptionOfType(XlineException.class).isThrownBy(future::get); + } + + @Test + public void testGet() throws Exception { + CompletableFuture feature = kvClient.put(SAMPLE_KEY_2, SAMPLE_VALUE_2); + feature.get(); + CompletableFuture getFeature = kvClient.get(SAMPLE_KEY_2); + GetResponse response = getFeature.get(); + assertThat(response.getKvs()).hasSize(1); + assertThat(response.getKvs().get(0).getValue().toString(StandardCharsets.UTF_8)) + .isEqualTo(SAMPLE_VALUE_2.toString(StandardCharsets.UTF_8)); + assertThat(!response.isMore()).isTrue(); + } + + @Test + public void testGetWithRev() throws Exception { + CompletableFuture feature = kvClient.put(SAMPLE_KEY_3, SAMPLE_VALUE); + PutResponse putResp = feature.get(); + kvClient.put(SAMPLE_KEY_3, SAMPLE_VALUE_2).get(); + GetOption option = + GetOption.builder().withRevision(putResp.getHeader().getRevision()).build(); + CompletableFuture getFeature = kvClient.get(SAMPLE_KEY_3, option); + GetResponse response = getFeature.get(); + assertThat(response.getKvs()).hasSize(1); + assertThat(response.getKvs().get(0).getValue().toString(StandardCharsets.UTF_8)) + .isEqualTo(SAMPLE_VALUE.toString(StandardCharsets.UTF_8)); + } + + @Test + public void testGetSortedPrefix() throws Exception { + String prefix = randomString(); + int numPrefix = 3; + putKeysWithPrefix(prefix, numPrefix); + + GetOption option = + GetOption.builder() + .withSortField(GetOption.SortTarget.KEY) + .withSortOrder(GetOption.SortOrder.DESCEND) + .isPrefix(true) + .build(); + CompletableFuture getFeature = kvClient.get(bytesOf(prefix), option); + GetResponse response = getFeature.get(); + + assertThat(response.getKvs()).hasSize(numPrefix); + for (int i = 0; i < numPrefix; i++) { + assertThat(response.getKvs().get(i).getKey().toString(StandardCharsets.UTF_8)) + .isEqualTo(prefix + (numPrefix - i - 1)); + assertThat(response.getKvs().get(i).getValue().toString(StandardCharsets.UTF_8)) + .isEqualTo(String.valueOf(numPrefix - i - 1)); + } + } + + @Test + public void testDelete() throws Exception { + // Put content so that we actually have something to delete + testPut(); + + ByteSequence keyToDelete = SAMPLE_KEY; + + // count keys about to delete + CompletableFuture getFeature = kvClient.get(keyToDelete); + GetResponse resp = getFeature.get(); + + // delete the keys + CompletableFuture deleteFuture = kvClient.delete(keyToDelete); + DeleteResponse delResp = deleteFuture.get(); + assertThat(delResp.getDeleted()).isEqualTo(resp.getKvs().size()); + } + + @Test + public void testGetAndDeleteWithPrefix() throws Exception { + String prefix = randomString(); + ByteSequence key = bytesOf(prefix); + int numPrefixes = 10; + + putKeysWithPrefix(prefix, numPrefixes); + + // verify get withPrefix. + CompletableFuture getFuture = + kvClient.get(key, GetOption.builder().isPrefix(true).build()); + GetResponse getResp = getFuture.get(); + assertThat(getResp.getCount()).isEqualTo(numPrefixes); + + // verify del withPrefix. + DeleteOption deleteOpt = DeleteOption.builder().isPrefix(true).build(); + CompletableFuture delFuture = kvClient.delete(key, deleteOpt); + DeleteResponse delResp = delFuture.get(); + assertThat(delResp.getDeleted()).isEqualTo(numPrefixes); + } + + private static void putKeysWithPrefix(String prefix, int numPrefixes) + throws ExecutionException, InterruptedException { + for (int i = 0; i < numPrefixes; i++) { + ByteSequence key = bytesOf(prefix + i); + ByteSequence value = bytesOf("" + i); + kvClient.put(key, value).get(); + } + } +} From e09717a85387bf884a34024acd4a200afa8cd048 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Tue, 20 Feb 2024 22:29:02 +0800 Subject: [PATCH 2/9] test: fix test Signed-off-by: iGxnon --- .../cloud/xline/jxline/exceptions/CurpException.java | 5 +++++ .../java/cloud/xline/jxline/impl/ProtocolClientImpl.java | 4 ++++ jxline-core/src/test/java/KVTest.java | 9 --------- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/jxline-core/src/main/java/cloud/xline/jxline/exceptions/CurpException.java b/jxline-core/src/main/java/cloud/xline/jxline/exceptions/CurpException.java index 73f736f..c3b5dc7 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/exceptions/CurpException.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/exceptions/CurpException.java @@ -105,4 +105,9 @@ public static CurpException toCurpException(Status status, @Nullable Metadata tr CurpError curpError = trailers.get(STATUS_DETAILS_KEY); return new CurpException(curpError); } + + @Override + public String toString() { + return "CurpError(" + error.toString().replace("\n", "") + ")"; + } } diff --git a/jxline-core/src/main/java/cloud/xline/jxline/impl/ProtocolClientImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/impl/ProtocolClientImpl.java index 93f6fdb..9d68402 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/impl/ProtocolClientImpl.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/impl/ProtocolClientImpl.java @@ -191,6 +191,10 @@ CommandResponse fastRound(ProposeId id, Command cmd) { throw XlineException.toXlineException(e); } catch (ExecutionException e) { Throwable cause = e.getCause(); + // extract the most inner exception + while (cause instanceof ExecutionException) { + cause = cause.getCause(); + } if (!(cause instanceof CurpException)) { throw XlineException.toXlineException(cause); } diff --git a/jxline-core/src/test/java/KVTest.java b/jxline-core/src/test/java/KVTest.java index e20f41a..b283a31 100644 --- a/jxline-core/src/test/java/KVTest.java +++ b/jxline-core/src/test/java/KVTest.java @@ -1,13 +1,11 @@ import cloud.xline.jxline.Client; import cloud.xline.jxline.KV; -import cloud.xline.jxline.exceptions.XlineException; import cloud.xline.jxline.kv.DeleteResponse; import cloud.xline.jxline.kv.GetResponse; import cloud.xline.jxline.kv.PutResponse; import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.options.DeleteOption; import io.etcd.jetcd.options.GetOption; -import io.etcd.jetcd.options.PutOption; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -79,13 +77,6 @@ public void testPut() throws Exception { assertThat(!response.hasPrevKv()).isTrue(); } - @Test - public void testPutWithNotExistLease() { - PutOption option = PutOption.builder().withLeaseId(99999).build(); - CompletableFuture future = kvClient.put(SAMPLE_KEY, SAMPLE_VALUE, option); - assertThatExceptionOfType(XlineException.class).isThrownBy(future::get); - } - @Test public void testGet() throws Exception { CompletableFuture feature = kvClient.put(SAMPLE_KEY_2, SAMPLE_VALUE_2); From 50952092c098ac2e2bb7fa7bb082031cd08c097c Mon Sep 17 00:00:00 2001 From: iGxnon Date: Tue, 12 Mar 2024 00:41:32 +0800 Subject: [PATCH 3/9] ci: refactor ci test cluster bootstrap Signed-off-by: iGxnon --- .github/workflows/pr.yml | 2 +- ci/docker-compose.yml | 71 +++++++++++++++++++++ ci/private.pem | 28 ++++++++ ci/public.pem | 9 +++ jxline-core/src/test/java/KVTest.java | 58 +---------------- jxline-core/src/test/java/ProtocolTest.java | 4 +- scripts/quick_start.sh | 71 --------------------- 7 files changed, 114 insertions(+), 129 deletions(-) create mode 100644 ci/docker-compose.yml create mode 100644 ci/private.pem create mode 100644 ci/public.pem delete mode 100755 scripts/quick_start.sh diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 47dfca0..83d6479 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -38,7 +38,7 @@ jobs: uses: gradle/actions/setup-gradle@v3 - name: Start the cluster - run: ./scripts/quick_start.sh + run: docker compose -f ci/docker-compose.yml up -d - name: Run test run: ./gradlew test diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml new file mode 100644 index 0000000..8c9b23b --- /dev/null +++ b/ci/docker-compose.yml @@ -0,0 +1,71 @@ +version: '3.9' + +networks: + xline_network: + driver: bridge + ipam: + driver: default + config: + - subnet: "172.18.0.0/16" + +services: + node1: + image: ghcr.io/xline-kv/xline:latest + networks: + xline_network: + ipv4_address: 172.18.0.2 + volumes: + - .:/mnt + ports: + - "2379:2379" + environment: + RUST_LOG: curp=debug,xline=debug + command: > + xline + --name node1 + --members node1=http://172.18.0.2:2379,node2=http://172.18.0.3:2379,node3=http://172.18.0.4:2379 + --storage-engine rocksdb + --data-dir /usr/local/xline/data-dir + --auth-public-key /mnt/public.pem + --auth-private-key /mnt/private.pem + --is-leader + + node2: + image: ghcr.io/xline-kv/xline:latest + networks: + xline_network: + ipv4_address: 172.18.0.3 + volumes: + - .:/mnt + ports: + - "2380:2379" + environment: + RUST_LOG: curp=debug,xline=debug + command: > + xline + --name node2 + --members node1=http://172.18.0.2:2379,node2=http://172.18.0.3:2379,node3=http://172.18.0.4:2379 + --storage-engine rocksdb + --data-dir /usr/local/xline/data-dir + --auth-public-key /mnt/public.pem + --auth-private-key /mnt/private.pem + + node3: + image: ghcr.io/xline-kv/xline:latest + networks: + xline_network: + ipv4_address: 172.18.0.4 + volumes: + - .:/mnt + ports: + - "2381:2379" + environment: + RUST_LOG: curp=debug,xline=debug + command: > + xline + --name node3 + --members node1=http://172.18.0.2:2379,node2=http://172.18.0.3:2379,node3=http://172.18.0.4:2379 + --storage-engine rocksdb + --data-dir /usr/local/xline/data-dir + --auth-public-key /mnt/public.pem + --auth-private-key /mnt/private.pem diff --git a/ci/private.pem b/ci/private.pem new file mode 100644 index 0000000..a888426 --- /dev/null +++ b/ci/private.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCnAxxSXJYWZCKr +6f6j0HRUwkhX/0+GXjEclWoLA5+KZAuWMSu8bz6X+IScv4vNwORlGSWOnrz+8mb2 +I0F6teVZWfWFqsnyWk7IxM+h9yTg7aY/8685YfWTL7fpWq1/3Fniz4QbsYFuzB1V +gaZ5fD2CSYIKzSD+qVSlXF25JDFHV7b2OdHrX0UKZOTWY/VE//STt+PJKdX9R3pl +kGwAzJIkkcAZy0vhvqT3ASTgXchNeN8wGYYb3YirkqIsQB5Xcs1R1W+yz+IrVa6/ +0WMcyE6qtJPZ0lviyT0nHV/pZjXuD4B0aja/1fk/HmXDPMjpK1BuCBTStM/KlcrA +oAxo+YDhAgMBAAECggEAIyJhY+Y8YMuCC753JkklH+ubQn/gX/kSxduc6mJBvuBb +G6aOd97DQT8zzrHxHEDXC3ml0AIO6mdeR6uVC9aWQBzPrOYIA+cBqfTVZVJTvMnh +7pQ6KY01F1izjPDZjQtzEWbseNL30rI3/ZP/zJDZc745EEKlDU3cE8mBogA+Ka6w +GLozT9qQf8knBrtzxH6SvrZpfaRlP95is82b4IuPhqYdG7dVYFTALE1MyVrCbS4Y +KytjNLgwp1bIQtWrzMebBGoiU+DvDcRY8zvOfFupDwpYCt3p1aU5wyYYdr74esV7 +jjqHj89Ua65JHJ3XnMAaMc4dHM2FsGqMsOv/DDKInQKBgQDawckQEekx0QuP3eJP +GWdZ87oc+FVjDe3bYhAnCf/yXRJoqcs5vr1m1yCXFfsjbQFYHWXR9AUtNn5HCwOZ +zoT1Mv96fXBVGQORgzvlUWS43uKpfIPDVv2I6ZcKSIQAGOgcWYvmBDhYqPHgmx3o +VSrNGWtLdyw3rD1J6O+1RwtbiwKBgQDDchmY59EXBiTvlyT3Qjl0vZFMHa+TElbh +ikNtYltbUHtamOXZzpdk/KA7X2dYi0QpVfbbpfP/ly5lYvgZwl8h90Obopru+ACM +ndlKBfNQYArmWY6bJ2CwF7j1aTCCHZuVuX6/pzFVStRcssn15uoVaIyKd/MhJzLF +S3ertQkSwwKBgAniMYRhWsjeaghQ/RWXzzyYL3N5oNn92h5MWvB4mjDIFbnW2hC8 +1m/cDmPlIVijZyklAuGuhcFaMfBhxgLf+s/dQv+0xSuDGs8rP7yHpeZYY6NGtelQ +d9oEu8dCKXybo3kMbq6wyB7xWyRLvdkuZ+WmXVumgb/uL0K0nIfzMscrAoGAeA1e +K845YSslBQaSbk7/e/X1iguyDWT2eRO01zvTYgPNwZipl2CPHjkPM2km0fy5oaps +N/94IUd7+EsSmsAKL5LytGbtRFyR+c376rw8+OIFz/iy4BsQCRqJQjWa1lHZf96x +PIg2hW2xhD9OTv3IS94sdeG4NmUdipMQryhEqoECgYEAkvXOg66IAVTrO6qgoyl5 +42oufa/QE+qOAYoQEpmx3SZx6tMkycfAQqUHYcXhW1HNjyGbbg/sl13yddnPQqig ++ObtQNSIqGZWCc/HIqM//pPI3MHPhWARMOmAbk0I1mT0QKhuFfSugV2xb1Dj/Rvf +0VdB8txY+5Wz6zP1F2g46gM= +-----END PRIVATE KEY----- diff --git a/ci/public.pem b/ci/public.pem new file mode 100644 index 0000000..4c52eb6 --- /dev/null +++ b/ci/public.pem @@ -0,0 +1,9 @@ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEApwMcUlyWFmQiq+n+o9B0 +VMJIV/9Phl4xHJVqCwOfimQLljErvG8+l/iEnL+LzcDkZRkljp68/vJm9iNBerXl +WVn1harJ8lpOyMTPofck4O2mP/OvOWH1ky+36Vqtf9xZ4s+EG7GBbswdVYGmeXw9 +gkmCCs0g/qlUpVxduSQxR1e29jnR619FCmTk1mP1RP/0k7fjySnV/Ud6ZZBsAMyS +JJHAGctL4b6k9wEk4F3ITXjfMBmGG92Iq5KiLEAeV3LNUdVvss/iK1Wuv9FjHMhO +qrST2dJb4sk9Jx1f6WY17g+AdGo2v9X5Px5lwzzI6StQbggU0rTPypXKwKAMaPmA +4QIDAQAB +-----END PUBLIC KEY----- diff --git a/jxline-core/src/test/java/KVTest.java b/jxline-core/src/test/java/KVTest.java index b283a31..130683b 100644 --- a/jxline-core/src/test/java/KVTest.java +++ b/jxline-core/src/test/java/KVTest.java @@ -26,9 +26,11 @@ public class KVTest { private static final ByteSequence SAMPLE_VALUE_2 = bytesOf("sample_value2"); private static final ByteSequence SAMPLE_KEY_3 = bytesOf("sample_key3"); + private static final String INIT_ENDPOINT = "http://127.0.0.1:2379"; + @BeforeAll static void onConnect() { - kvClient = Client.builder().endpoints("http://172.20.0.5:2379").build().getKVClient(); + kvClient = Client.builder().endpoints(INIT_ENDPOINT).build().getKVClient(); } public static ByteSequence bytesOf(final String string) { @@ -103,30 +105,6 @@ public void testGetWithRev() throws Exception { .isEqualTo(SAMPLE_VALUE.toString(StandardCharsets.UTF_8)); } - @Test - public void testGetSortedPrefix() throws Exception { - String prefix = randomString(); - int numPrefix = 3; - putKeysWithPrefix(prefix, numPrefix); - - GetOption option = - GetOption.builder() - .withSortField(GetOption.SortTarget.KEY) - .withSortOrder(GetOption.SortOrder.DESCEND) - .isPrefix(true) - .build(); - CompletableFuture getFeature = kvClient.get(bytesOf(prefix), option); - GetResponse response = getFeature.get(); - - assertThat(response.getKvs()).hasSize(numPrefix); - for (int i = 0; i < numPrefix; i++) { - assertThat(response.getKvs().get(i).getKey().toString(StandardCharsets.UTF_8)) - .isEqualTo(prefix + (numPrefix - i - 1)); - assertThat(response.getKvs().get(i).getValue().toString(StandardCharsets.UTF_8)) - .isEqualTo(String.valueOf(numPrefix - i - 1)); - } - } - @Test public void testDelete() throws Exception { // Put content so that we actually have something to delete @@ -143,34 +121,4 @@ public void testDelete() throws Exception { DeleteResponse delResp = deleteFuture.get(); assertThat(delResp.getDeleted()).isEqualTo(resp.getKvs().size()); } - - @Test - public void testGetAndDeleteWithPrefix() throws Exception { - String prefix = randomString(); - ByteSequence key = bytesOf(prefix); - int numPrefixes = 10; - - putKeysWithPrefix(prefix, numPrefixes); - - // verify get withPrefix. - CompletableFuture getFuture = - kvClient.get(key, GetOption.builder().isPrefix(true).build()); - GetResponse getResp = getFuture.get(); - assertThat(getResp.getCount()).isEqualTo(numPrefixes); - - // verify del withPrefix. - DeleteOption deleteOpt = DeleteOption.builder().isPrefix(true).build(); - CompletableFuture delFuture = kvClient.delete(key, deleteOpt); - DeleteResponse delResp = delFuture.get(); - assertThat(delResp.getDeleted()).isEqualTo(numPrefixes); - } - - private static void putKeysWithPrefix(String prefix, int numPrefixes) - throws ExecutionException, InterruptedException { - for (int i = 0; i < numPrefixes; i++) { - ByteSequence key = bytesOf(prefix + i); - ByteSequence value = bytesOf("" + i); - kvClient.put(key, value).get(); - } - } } diff --git a/jxline-core/src/test/java/ProtocolTest.java b/jxline-core/src/test/java/ProtocolTest.java index 3e56ab7..ffdaaf3 100644 --- a/jxline-core/src/test/java/ProtocolTest.java +++ b/jxline-core/src/test/java/ProtocolTest.java @@ -10,9 +10,9 @@ @Timeout(value = 20) public class ProtocolTest { - static ProtocolClient client; + private static ProtocolClient client; - static String INIT_ENDPOINT = "http://172.20.0.5:2379"; + private static final String INIT_ENDPOINT = "http://127.0.0.1:2379"; @BeforeAll static void onConnect() { diff --git a/scripts/quick_start.sh b/scripts/quick_start.sh deleted file mode 100755 index f522f54..0000000 --- a/scripts/quick_start.sh +++ /dev/null @@ -1,71 +0,0 @@ -#!/bin/bash -DIR=$( - cd "$(dirname "$0")" - pwd -) -SERVERS=("172.20.0.2" "172.20.0.3" "172.20.0.4" "172.20.0.5") -MEMBERS="node1=${SERVERS[1]}:2379,node2=${SERVERS[2]}:2379,node3=${SERVERS[3]}:2379" - -# run xline node by index -# args: -# $1: index of the node -run_xline() { - cmd="/usr/local/bin/xline \ - --name node${1} \ - --members ${MEMBERS} \ - --storage-engine rocksdb \ - --data-dir /usr/local/xline/data-dir \ - --auth-public-key /mnt/public.pem \ - --auth-private-key /mnt/private.pem" - - if [ ${1} -eq 1 ]; then - cmd="${cmd} --is-leader" - fi - - docker exec -e RUST_LOG=debug -d node${1} ${cmd} - echo "command is: docker exec -e RUST_LOG=debug -d node${1} ${cmd}" -} - -# run cluster of xline/etcd in container -run_cluster() { - echo cluster starting - run_xline 1 & - run_xline 2 & - run_xline 3 & - wait - echo cluster started -} - -# stop all containers -stop_all() { - echo stopping - for name in "node1" "node2" "node3" "node4"; do - docker_id=$(docker ps -qf "name=${name}") - if [ -n "$docker_id" ]; then - docker stop $docker_id - fi - done - sleep 1 - echo stopped -} - -# run container of xline/etcd use specified image -# args: -# $1: size of cluster -run_container() { - echo container starting - size=${1} - image="ghcr.io/xline-kv/xline:latest" - for ((i = 1; i <= ${size}; i++)); do - docker run -d -it --rm --name=node${i} --net=xline_net --ip=${SERVERS[$i]} --cap-add=NET_ADMIN --cpu-shares=1024 -m=512M -v ${DIR}:/mnt ${image} bash & - done - docker run -d -it --rm --name=node4 --net=xline_net --ip=${SERVERS[0]} --cap-add=NET_ADMIN --cpu-shares=1024 -m=512M -v ${DIR}:/mnt gcr.io/etcd-development/etcd:v3.5.5 bash & - wait - echo container started -} - -stop_all -docker network create --subnet=172.20.0.0/24 xline_net >/dev/null 2>&1 - -run_container 3 -run_cluster From 5deb8040f947ee3ce9d12c8e31c2c2623b92dddf Mon Sep 17 00:00:00 2001 From: iGxnon Date: Tue, 12 Mar 2024 00:47:15 +0800 Subject: [PATCH 4/9] ci: remove commit message validation Signed-off-by: iGxnon --- .github/workflows/pr.yml | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 83d6479..4bb5f09 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -5,21 +5,6 @@ on: branches: [ main ] jobs: - commit: - name: Commit Message Validation - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - with: - fetch-depth: 0 - - run: git show-ref - - uses: actions-rs/install@v0.1 - with: - crate: git-cz - version: latest - - name: Validate commit messages - run: git-cz check ${{ github.event.pull_request.base.sha }}..${{ github.event.pull_request.head.sha }} - test: name: Test runs-on: ubuntu-latest From 8c9e49e681fa77e6ca81ae7cc9e4ddf6002dacd3 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Tue, 12 Mar 2024 00:52:10 +0800 Subject: [PATCH 5/9] chore: code smell Signed-off-by: iGxnon --- jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java | 2 +- .../main/java/cloud/xline/jxline/impl/ProtocolClientImpl.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java index da2379d..4a9c3dd 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java @@ -16,7 +16,7 @@ import static java.util.Objects.requireNonNull; -public class KVImpl extends Impl implements KV { +class KVImpl extends Impl implements KV { private final ProtocolClient protocolClient; diff --git a/jxline-core/src/main/java/cloud/xline/jxline/impl/ProtocolClientImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/impl/ProtocolClientImpl.java index 9d68402..85d6747 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/impl/ProtocolClientImpl.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/impl/ProtocolClientImpl.java @@ -372,8 +372,8 @@ void checkUpdate(FetchClusterResponse res) { if (res.hasLeaderId() && this.term < res.getTerm()) { this.term = res.getTerm(); this.leaderId = res.getLeaderId(); - logger().info("client term updates to " + this.term); - logger().info("client leader id updates to " + this.leaderId); + logger().info("client term updates to {}", this.term); + logger().info("client leader id updates to {}", this.leaderId); } if (res.getClusterVersion() == this.clusterVersion) { return; From 0ca498521a75624c2cce2f6673fda215ddf052ce Mon Sep 17 00:00:00 2001 From: iGxnon Date: Mon, 4 Mar 2024 10:32:36 +0800 Subject: [PATCH 6/9] feat: add txn classes Signed-off-by: iGxnon --- .../src/main/java/cloud/xline/jxline/Txn.java | 4 +- .../java/cloud/xline/jxline/impl/KVImpl.java | 6 +- .../main/java/cloud/xline/jxline/op/Cmp.java | 73 +++++++++ .../java/cloud/xline/jxline/op/CmpTarget.java | 107 +++++++++++++ .../main/java/cloud/xline/jxline/op/Op.java | 151 ++++++++++++++++++ .../java/cloud/xline/jxline/op/TxnImpl.java | 112 +++++++++++++ .../cloud/xline/jxline/support/Requests.java | 67 ++++---- 7 files changed, 487 insertions(+), 33 deletions(-) create mode 100644 jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java create mode 100644 jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java create mode 100644 jxline-core/src/main/java/cloud/xline/jxline/op/Op.java create mode 100644 jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java diff --git a/jxline-core/src/main/java/cloud/xline/jxline/Txn.java b/jxline-core/src/main/java/cloud/xline/jxline/Txn.java index 1f631d0..635fb5e 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/Txn.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/Txn.java @@ -17,8 +17,8 @@ package cloud.xline.jxline; import cloud.xline.jxline.kv.TxnResponse; -import io.etcd.jetcd.op.Cmp; -import io.etcd.jetcd.op.Op; +import cloud.xline.jxline.op.Cmp; +import cloud.xline.jxline.op.Op; import java.util.concurrent.CompletableFuture; diff --git a/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java index 4a9c3dd..efa3b51 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java @@ -37,7 +37,7 @@ public CompletableFuture put( requireNonNull(value, "value should not be null"); requireNonNull(option, "option should not be null"); Command cmd = - Requests.mapPutRequest(key, value, option, this.connectionManager().getNamespace()); + Requests.mapPutCommand(key, value, option, this.connectionManager().getNamespace()); return protocolClient.propose( cmd, true, @@ -55,7 +55,7 @@ public CompletableFuture get(ByteSequence key, GetOption option) { requireNonNull(key, "key should not be null"); requireNonNull(option, "option should not be null"); Command cmd = - Requests.mapRangeRequest(key, option, this.connectionManager().getNamespace()); + Requests.mapRangeCommand(key, option, this.connectionManager().getNamespace()); return protocolClient.propose( cmd, true, @@ -73,7 +73,7 @@ public CompletableFuture delete(ByteSequence key, DeleteOption o requireNonNull(key, "key should not be null"); requireNonNull(option, "option should not be null"); Command cmd = - Requests.mapDeleteRequest(key, option, this.connectionManager().getNamespace()); + Requests.mapDeleteCommand(key, option, this.connectionManager().getNamespace()); return protocolClient.propose( cmd, true, diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java b/jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java new file mode 100644 index 0000000..5f92dd6 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java @@ -0,0 +1,73 @@ +package cloud.xline.jxline.op; + +import cloud.xline.jxline.Txn; +import com.google.protobuf.ByteString; +import com.xline.protobuf.Compare; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.support.Util; + + +/** The compare predicate in {@link Txn}. */ +public class Cmp { + + public enum Op { + EQUAL, + GREATER, + LESS, + NOT_EQUAL + } + + private final ByteString key; + private final Op op; + private final CmpTarget target; + + public Cmp(ByteSequence key, Op compareOp, CmpTarget target) { + this.key = ByteString.copyFrom(key.getBytes()); + this.op = compareOp; + this.target = target; + } + + Compare toCompare(ByteSequence namespace) { + Compare.Builder compareBuilder = + Compare.newBuilder().setKey(Util.prefixNamespace(this.key, namespace)); + switch (this.op) { + case EQUAL: + compareBuilder.setResult(Compare.CompareResult.EQUAL); + break; + case GREATER: + compareBuilder.setResult(Compare.CompareResult.GREATER); + break; + case LESS: + compareBuilder.setResult(Compare.CompareResult.LESS); + break; + case NOT_EQUAL: + compareBuilder.setResult(Compare.CompareResult.NOT_EQUAL); + break; + default: + throw new IllegalArgumentException("Unexpected compare type (" + this.op + ")"); + } + + Compare.CompareTarget target = this.target.getTarget(); + Object value = this.target.getTargetValue(); + + compareBuilder.setTarget(target); + switch (target) { + case VERSION: + compareBuilder.setVersion((Long) value); + break; + case VALUE: + compareBuilder.setValue((ByteString) value); + break; + case MOD: + compareBuilder.setModRevision((Long) value); + break; + case CREATE: + compareBuilder.setCreateRevision((Long) value); + break; + default: + throw new IllegalArgumentException("Unexpected target type (" + target + ")"); + } + + return compareBuilder.build(); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java b/jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java new file mode 100644 index 0000000..72b3b8c --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java @@ -0,0 +1,107 @@ +package cloud.xline.jxline.op; + +import cloud.xline.jxline.Txn; +import com.google.protobuf.ByteString; +import com.xline.protobuf.Compare; +import io.etcd.jetcd.ByteSequence; + +/** + * Cmp target used in {@link Txn}. + */ +public abstract class CmpTarget { + + /** + * Cmp on a given version. + * + * @param version version to compare + * @return the version compare target + */ + public static VersionCmpTarget version(long version) { + return new VersionCmpTarget(version); + } + + /** + * Cmp on the create revision. + * + * @param revision the create revision + * @return the create revision compare target + */ + public static CreateRevisionCmpTarget createRevision(long revision) { + return new CreateRevisionCmpTarget(revision); + } + + /** + * Cmp on the modification revision. + * + * @param revision the modification revision + * @return the modification revision compare target + */ + public static ModRevisionCmpTarget modRevision(long revision) { + return new ModRevisionCmpTarget(revision); + } + + /** + * Cmp on the value. + * + * @param value the value to compare + * @return the value compare target + */ + public static ValueCmpTarget value(ByteSequence value) { + return new ValueCmpTarget(ByteString.copyFrom(value.getBytes())); + } + + private final Compare.CompareTarget target; + private final T targetValue; + + protected CmpTarget(Compare.CompareTarget target, T targetValue) { + this.target = target; + this.targetValue = targetValue; + } + + /** + * Get the compare target used for this compare. + * + * @return the compare target used for this compare + */ + public Compare.CompareTarget getTarget() { + return target; + } + + /** + * Get the compare target value of this compare. + * + * @return the compare target value of this compare. + */ + public T getTargetValue() { + return targetValue; + } + + public static final class VersionCmpTarget extends CmpTarget { + + VersionCmpTarget(Long targetValue) { + super(Compare.CompareTarget.VERSION, targetValue); + } + } + + public static final class CreateRevisionCmpTarget extends CmpTarget { + + CreateRevisionCmpTarget(Long targetValue) { + super(Compare.CompareTarget.CREATE, targetValue); + } + } + + public static final class ModRevisionCmpTarget extends CmpTarget { + + ModRevisionCmpTarget(Long targetValue) { + super(Compare.CompareTarget.MOD, targetValue); + } + } + + public static final class ValueCmpTarget extends CmpTarget { + + ValueCmpTarget(ByteString targetValue) { + super(Compare.CompareTarget.VALUE, targetValue); + } + } + +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/Op.java b/jxline-core/src/main/java/cloud/xline/jxline/op/Op.java new file mode 100644 index 0000000..e6c65e2 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/Op.java @@ -0,0 +1,151 @@ +package cloud.xline.jxline.op; + +import cloud.xline.jxline.support.Requests; +import com.google.protobuf.ByteString; +import com.xline.protobuf.DeleteRangeRequest; +import com.xline.protobuf.RequestOp; +import com.xline.protobuf.TxnRequest; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.options.DeleteOption; +import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.options.PutOption; +import io.etcd.jetcd.support.Util; + +/** Copied From Etcd Operation. */ +public abstract class Op { + + /** Operation type. */ + public enum Type { + PUT, + RANGE, + DELETE_RANGE, + TXN + } + + protected final Type type; + protected final ByteString key; + + protected Op(Type type, ByteString key) { + this.type = type; + this.key = key; + } + + abstract RequestOp toRequestOp(ByteSequence namespace); + + public static PutOp put(ByteSequence key, ByteSequence value, PutOption option) { + return new PutOp( + ByteString.copyFrom(key.getBytes()), ByteString.copyFrom(value.getBytes()), option); + } + + public static GetOp get(ByteSequence key, GetOption option) { + return new GetOp(ByteString.copyFrom(key.getBytes()), option); + } + + public static DeleteOp delete(ByteSequence key, DeleteOption option) { + return new DeleteOp(ByteString.copyFrom(key.getBytes()), option); + } + + public static TxnOp txn(Cmp[] cmps, Op[] thenOps, Op[] elseOps) { + return new TxnOp(cmps, thenOps, elseOps); + } + + public static final class PutOp extends Op { + + private final ByteString value; + private final PutOption option; + + private PutOp(ByteString key, ByteString value, PutOption option) { + super(Type.PUT, key); + this.value = value; + this.option = option; + } + + @Override + RequestOp toRequestOp(ByteSequence namespace) { + return RequestOp.newBuilder() + .setRequestPut( + Requests.mapPutRequest( + ByteSequence.from(key), + ByteSequence.from(value), + option, + namespace)) + .build(); + } + } + + public static final class GetOp extends Op { + + private final GetOption option; + + private GetOp(ByteString key, GetOption option) { + super(Type.RANGE, key); + this.option = option; + } + + @Override + RequestOp toRequestOp(ByteSequence namespace) { + return RequestOp.newBuilder() + .setRequestRange( + Requests.mapRangeRequest(ByteSequence.from(key), option, namespace)) + .build(); + } + } + + public static final class DeleteOp extends Op { + + private final DeleteOption option; + + DeleteOp(ByteString key, DeleteOption option) { + super(Type.DELETE_RANGE, key); + this.option = option; + } + + @Override + RequestOp toRequestOp(ByteSequence namespace) { + return RequestOp.newBuilder() + .setRequestDeleteRange( + DeleteRangeRequest.newBuilder() + .setKey(Util.prefixNamespace(key, namespace)) + .setPrevKv(option.isPrevKV())) + .build(); + } + } + + public static final class TxnOp extends Op { + private final Cmp[] cmps; + private final Op[] thenOps; + private final Op[] elseOps; + + private TxnOp(Cmp[] cmps, Op[] thenOps, Op[] elseOps) { + super(Type.TXN, null); + this.cmps = cmps; + this.thenOps = thenOps; + this.elseOps = elseOps; + } + + @Override + RequestOp toRequestOp(ByteSequence namespace) { + TxnRequest.Builder txn = TxnRequest.newBuilder(); + + if (cmps != null) { + for (Cmp cmp : cmps) { + txn.addCompare(cmp.toCompare(namespace)); + } + } + + if (thenOps != null) { + for (Op thenOp : thenOps) { + txn.addSuccess(thenOp.toRequestOp(namespace)); + } + } + + if (elseOps != null) { + for (Op elseOp : elseOps) { + txn.addFailure(elseOp.toRequestOp(namespace)); + } + } + + return RequestOp.newBuilder().setRequestTxn(txn).build(); + } + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java new file mode 100644 index 0000000..4cee5c0 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java @@ -0,0 +1,112 @@ +package cloud.xline.jxline.op; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import cloud.xline.jxline.Txn; +import cloud.xline.jxline.kv.TxnResponse; +import com.xline.protobuf.TxnRequest; +import io.etcd.jetcd.ByteSequence; + +import com.google.common.annotations.VisibleForTesting; + +/** Build a transaction. */ +public class TxnImpl implements Txn { + + public static TxnImpl newTxn( + Function> f, ByteSequence namespace) { + return new TxnImpl(f, namespace); + } + + @VisibleForTesting + static TxnImpl newTxn(Function> f) { + return newTxn(f, ByteSequence.EMPTY); + } + + private final ByteSequence namespace; + + private final List cmpList = new ArrayList<>(); + private final List successOpList = new ArrayList<>(); + private final List failureOpList = new ArrayList<>(); + private final Function> requestF; + + private boolean seenThen = false; + private boolean seenElse = false; + + private TxnImpl( + Function> f, ByteSequence namespace) { + this.requestF = f; + this.namespace = namespace; + } + + @Override + public TxnImpl If(Cmp... cmps) { + return If(Arrays.asList(cmps)); + } + + TxnImpl If(List cmps) { + if (this.seenThen) { + throw new IllegalArgumentException("cannot call If after Then!"); + } + if (this.seenElse) { + throw new IllegalArgumentException("cannot call If after Else!"); + } + + cmpList.addAll(cmps); + return this; + } + + @Override + public TxnImpl Then(Op... ops) { + return Then(Arrays.asList(ops)); + } + + TxnImpl Then(List ops) { + if (this.seenElse) { + throw new IllegalArgumentException("cannot call Then after Else!"); + } + + this.seenThen = true; + + successOpList.addAll(ops); + return this; + } + + @Override + public TxnImpl Else(Op... ops) { + return Else(Arrays.asList(ops)); + } + + TxnImpl Else(List ops) { + this.seenElse = true; + + failureOpList.addAll(ops); + return this; + } + + @Override + public CompletableFuture commit() { + return this.requestF.apply(this.toTxnRequest()); + } + + private TxnRequest toTxnRequest() { + TxnRequest.Builder requestBuilder = TxnRequest.newBuilder(); + + for (Cmp c : this.cmpList) { + requestBuilder.addCompare(c.toCompare(namespace)); + } + + for (Op o : this.successOpList) { + requestBuilder.addSuccess(o.toRequestOp(namespace)); + } + + for (Op o : this.failureOpList) { + requestBuilder.addFailure(o.toRequestOp(namespace)); + } + + return requestBuilder.build(); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java b/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java index cba5775..b4b6b5a 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java @@ -20,15 +20,19 @@ public final class Requests { * @param namespace the namespace binding to the command * @return the command */ - public static Command mapPutRequest( + public static PutRequest mapPutRequest( ByteSequence key, ByteSequence value, PutOption option, ByteSequence namespace) { - PutRequest req = - PutRequest.newBuilder() - .setKey(Util.prefixNamespace(key, namespace)) - .setValue(ByteString.copyFrom(value.getBytes())) - .setLease(option.getLeaseId()) - .setPrevKv(option.getPrevKV()) - .build(); + return PutRequest.newBuilder() + .setKey(Util.prefixNamespace(key, namespace)) + .setValue(ByteString.copyFrom(value.getBytes())) + .setLease(option.getLeaseId()) + .setPrevKv(option.getPrevKV()) + .build(); + } + + public static Command mapPutCommand( + ByteSequence key, ByteSequence value, PutOption option, ByteSequence namespace) { + PutRequest req = mapPutRequest(key, value, option, namespace); return Command.newBuilder() .addKeys(KeyRange.newBuilder().setKey(ByteString.copyFrom(key.getBytes())).build()) .setRequest( @@ -44,22 +48,26 @@ public static Command mapPutRequest( * @param namespace the namespace binding to the command * @return the command */ - public static Command mapRangeRequest( + public static RangeRequest.Builder mapRangeRequest( ByteSequence key, GetOption option, ByteSequence namespace) { - RangeRequest.Builder builder = - RangeRequest.newBuilder() - .setKey(Util.prefixNamespace(key, namespace)) - .setCountOnly(option.isCountOnly()) - .setLimit(option.getLimit()) - .setRevision(option.getRevision()) - .setKeysOnly(option.isKeysOnly()) - .setSerializable(option.isSerializable()) - .setSortOrder(toRangeRequestSortOrder(option.getSortOrder())) - .setSortTarget(toRangeRequestSortTarget(option.getSortField())) - .setMinCreateRevision(option.getMinCreateRevision()) - .setMaxCreateRevision(option.getMaxCreateRevision()) - .setMinModRevision(option.getMinModRevision()) - .setMaxModRevision(option.getMaxModRevision()); + return RangeRequest.newBuilder() + .setKey(Util.prefixNamespace(key, namespace)) + .setCountOnly(option.isCountOnly()) + .setLimit(option.getLimit()) + .setRevision(option.getRevision()) + .setKeysOnly(option.isKeysOnly()) + .setSerializable(option.isSerializable()) + .setSortOrder(toRangeRequestSortOrder(option.getSortOrder())) + .setSortTarget(toRangeRequestSortTarget(option.getSortField())) + .setMinCreateRevision(option.getMinCreateRevision()) + .setMaxCreateRevision(option.getMaxCreateRevision()) + .setMinModRevision(option.getMinModRevision()) + .setMaxModRevision(option.getMaxModRevision()); + } + + public static Command mapRangeCommand( + ByteSequence key, GetOption option, ByteSequence namespace) { + RangeRequest.Builder builder = mapRangeRequest(key, option, namespace); defineRangeRequestEnd( key, option.getEndKey(), option.isPrefix(), namespace, builder::setRangeEnd); @@ -80,13 +88,16 @@ public static Command mapRangeRequest( * @param namespace the namespace binding to the command * @return the command */ - public static Command mapDeleteRequest( + public static DeleteRangeRequest.Builder mapDeleteRequest( ByteSequence key, DeleteOption option, ByteSequence namespace) { - DeleteRangeRequest.Builder builder = - DeleteRangeRequest.newBuilder() - .setKey(Util.prefixNamespace(key, namespace)) - .setPrevKv(option.isPrevKV()); + return DeleteRangeRequest.newBuilder() + .setKey(Util.prefixNamespace(key, namespace)) + .setPrevKv(option.isPrevKV()); + } + public static Command mapDeleteCommand( + ByteSequence key, DeleteOption option, ByteSequence namespace) { + DeleteRangeRequest.Builder builder = mapDeleteRequest(key, option, namespace); defineRangeRequestEnd( key, option.getEndKey(), option.isPrefix(), namespace, builder::setRangeEnd); From 4520a1c6d726c95445ae5c1bb62067bdb06ffa21 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Mon, 4 Mar 2024 15:46:41 +0800 Subject: [PATCH 7/9] feat: implement txn interface Signed-off-by: iGxnon --- .../java/cloud/xline/jxline/impl/KVImpl.java | 11 +++++++++- .../java/cloud/xline/jxline/op/TxnImpl.java | 21 +++++++++++-------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java index efa3b51..cf58546 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java @@ -4,6 +4,7 @@ import cloud.xline.jxline.ProtocolClient; import cloud.xline.jxline.Txn; import cloud.xline.jxline.kv.*; +import cloud.xline.jxline.op.TxnImpl; import cloud.xline.jxline.support.Requests; import com.xline.protobuf.Command; import io.etcd.jetcd.ByteSequence; @@ -94,6 +95,14 @@ public CompletableFuture compact(long revision, CompactOption o @Override public Txn txn() { - return null; + return TxnImpl.newTxn( + this.connectionManager().getNamespace(), + cmd -> + protocolClient.propose( + cmd, + true, + (sr, asr) -> + new TxnResponse( + sr, asr, this.connectionManager().getNamespace()))); } } diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java index 4cee5c0..7999d44 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java @@ -8,6 +8,8 @@ import cloud.xline.jxline.Txn; import cloud.xline.jxline.kv.TxnResponse; +import com.xline.protobuf.Command; +import com.xline.protobuf.RequestWithToken; import com.xline.protobuf.TxnRequest; import io.etcd.jetcd.ByteSequence; @@ -17,13 +19,13 @@ public class TxnImpl implements Txn { public static TxnImpl newTxn( - Function> f, ByteSequence namespace) { - return new TxnImpl(f, namespace); + ByteSequence namespace, Function> f) { + return new TxnImpl(namespace, f); } @VisibleForTesting - static TxnImpl newTxn(Function> f) { - return newTxn(f, ByteSequence.EMPTY); + static TxnImpl newTxn(Function> f) { + return newTxn(ByteSequence.EMPTY, f); } private final ByteSequence namespace; @@ -31,13 +33,12 @@ static TxnImpl newTxn(Function> f) { private final List cmpList = new ArrayList<>(); private final List successOpList = new ArrayList<>(); private final List failureOpList = new ArrayList<>(); - private final Function> requestF; + private final Function> requestF; private boolean seenThen = false; private boolean seenElse = false; - private TxnImpl( - Function> f, ByteSequence namespace) { + private TxnImpl(ByteSequence namespace, Function> f) { this.requestF = f; this.namespace = namespace; } @@ -92,7 +93,7 @@ public CompletableFuture commit() { return this.requestF.apply(this.toTxnRequest()); } - private TxnRequest toTxnRequest() { + private Command toTxnRequest() { TxnRequest.Builder requestBuilder = TxnRequest.newBuilder(); for (Cmp c : this.cmpList) { @@ -107,6 +108,8 @@ private TxnRequest toTxnRequest() { requestBuilder.addFailure(o.toRequestOp(namespace)); } - return requestBuilder.build(); + return Command.newBuilder() + .setRequest(RequestWithToken.newBuilder().setTxnRequest(requestBuilder).build()) + .build(); } } From a5326ba0baba6bcfec2e840947bdae9228c8bde7 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Tue, 5 Mar 2024 23:57:53 +0800 Subject: [PATCH 8/9] chore: publish to local Signed-off-by: iGxnon --- build.gradle.kts | 23 +++++++++++++++++-- .../main/java/cloud/xline/jxline/op/Cmp.java | 16 +++++++++++++ .../java/cloud/xline/jxline/op/CmpTarget.java | 16 +++++++++++++ .../main/java/cloud/xline/jxline/op/Op.java | 16 +++++++++++++ 4 files changed, 69 insertions(+), 2 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index ed8d2aa..9302f6f 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -2,7 +2,7 @@ import com.adarshr.gradle.testlogger.TestLoggerExtension import com.adarshr.gradle.testlogger.theme.ThemeType group = "cloud.xline" -version = "1.0-SNAPSHOT" +version = "0.1.0-SNAPSHOT" buildscript { repositories { @@ -27,6 +27,7 @@ subprojects { apply(plugin = "java-library") apply(plugin = "org.gradle.test-retry") apply(plugin = "com.adarshr.test-logger") + apply(plugin = "maven-publish") tasks { named("compileJava") { @@ -47,8 +48,26 @@ subprojects { } } - extensions.getByType().apply { + configure { + publications { + create("maven") { + groupId = rootProject.group.toString() + version = rootProject.version.toString() + + from(components["java"]) + + } + } + } + + configure { + withSourcesJar() + withJavadocJar() + } + + configure { theme = ThemeType.MOCHA_PARALLEL showStandardStreams = false } + } diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java b/jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java index 5f92dd6..31fd569 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java @@ -1,3 +1,19 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cloud.xline.jxline.op; import cloud.xline.jxline.Txn; diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java b/jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java index 72b3b8c..170caf2 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java @@ -1,3 +1,19 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cloud.xline.jxline.op; import cloud.xline.jxline.Txn; diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/Op.java b/jxline-core/src/main/java/cloud/xline/jxline/op/Op.java index e6c65e2..8117923 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/op/Op.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/Op.java @@ -1,3 +1,19 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cloud.xline.jxline.op; import cloud.xline.jxline.support.Requests; From d3064435aa4399a3769925fefca6c02e8eef0f31 Mon Sep 17 00:00:00 2001 From: iGxnon Date: Tue, 12 Mar 2024 02:47:00 +0800 Subject: [PATCH 9/9] test: add txn test Signed-off-by: iGxnon --- .../java/cloud/xline/jxline/op/TxnImpl.java | 54 ++++++++- .../cloud/xline/jxline/support/Requests.java | 30 ++++- jxline-core/src/test/java/KVTest.java | 113 ++++++++++++++++++ 3 files changed, 187 insertions(+), 10 deletions(-) diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java index 7999d44..f84d241 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java @@ -5,12 +5,11 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import java.util.stream.Stream; import cloud.xline.jxline.Txn; import cloud.xline.jxline.kv.TxnResponse; -import com.xline.protobuf.Command; -import com.xline.protobuf.RequestWithToken; -import com.xline.protobuf.TxnRequest; +import com.xline.protobuf.*; import io.etcd.jetcd.ByteSequence; import com.google.common.annotations.VisibleForTesting; @@ -108,8 +107,55 @@ private Command toTxnRequest() { requestBuilder.addFailure(o.toRequestOp(namespace)); } + TxnRequest txnReq = requestBuilder.build(); + return Command.newBuilder() - .setRequest(RequestWithToken.newBuilder().setTxnRequest(requestBuilder).build()) + .addAllKeys(getTxnReqKeyRanges(txnReq)) + .setRequest(RequestWithToken.newBuilder().setTxnRequest(txnReq).build()) .build(); } + + private static List getTxnReqKeyRanges(TxnRequest req) { + List keyRanges = new ArrayList<>(); + req.getCompareList() + .forEach( + cmp -> + keyRanges.add( + KeyRange.newBuilder() + .setKey(cmp.getKey()) + .setRangeEnd(cmp.getRangeEnd()) + .build())); + Stream.concat(req.getSuccessList().stream(), req.getFailureList().stream()) + .forEach( + op -> { + if (op.hasRequestRange()) { + keyRanges.add( + KeyRange.newBuilder() + .setKey(op.getRequestRange().getKey()) + .setRangeEnd(op.getRequestRange().getRangeEnd()) + .build()); + return; + } + if (op.hasRequestPut()) { + keyRanges.add( + KeyRange.newBuilder() + .setKey(op.getRequestPut().getKey()) + .build()); + return; + } + if (op.hasRequestDeleteRange()) { + keyRanges.add( + KeyRange.newBuilder() + .setKey(op.getRequestDeleteRange().getKey()) + .setRangeEnd( + op.getRequestDeleteRange().getRangeEnd()) + .build()); + return; + } + if (op.hasRequestTxn()) { + keyRanges.addAll(getTxnReqKeyRanges(op.getRequestTxn())); + } + }); + return keyRanges; + } } diff --git a/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java b/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java index b4b6b5a..3def5ee 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java @@ -34,7 +34,7 @@ public static Command mapPutCommand( ByteSequence key, ByteSequence value, PutOption option, ByteSequence namespace) { PutRequest req = mapPutRequest(key, value, option, namespace); return Command.newBuilder() - .addKeys(KeyRange.newBuilder().setKey(ByteString.copyFrom(key.getBytes())).build()) + .addKeys(KeyRange.newBuilder().setKey(Util.prefixNamespace(key, namespace))) .setRequest( RequestWithToken.newBuilder().setPutRequest(req).build()) // TODO: add token .build(); @@ -68,11 +68,20 @@ public static RangeRequest.Builder mapRangeRequest( public static Command mapRangeCommand( ByteSequence key, GetOption option, ByteSequence namespace) { RangeRequest.Builder builder = mapRangeRequest(key, option, namespace); + KeyRange.Builder keyRange = + KeyRange.newBuilder().setKey(Util.prefixNamespace(key, namespace)); defineRangeRequestEnd( - key, option.getEndKey(), option.isPrefix(), namespace, builder::setRangeEnd); + key, + option.getEndKey(), + option.isPrefix(), + namespace, + endKey -> { + builder.setRangeEnd(endKey); + keyRange.setRangeEnd(endKey); + }); return Command.newBuilder() - .addKeys(KeyRange.newBuilder().setKey(ByteString.copyFrom(key.getBytes())).build()) + .addKeys(keyRange) .setRequest( RequestWithToken.newBuilder() .setRangeRequest(builder.build()) @@ -98,11 +107,20 @@ public static DeleteRangeRequest.Builder mapDeleteRequest( public static Command mapDeleteCommand( ByteSequence key, DeleteOption option, ByteSequence namespace) { DeleteRangeRequest.Builder builder = mapDeleteRequest(key, option, namespace); - defineRangeRequestEnd( - key, option.getEndKey(), option.isPrefix(), namespace, builder::setRangeEnd); + KeyRange.Builder keyRange = + KeyRange.newBuilder().setKey(Util.prefixNamespace(key, namespace)); + defineRangeRequestEnd( + key, + option.getEndKey(), + option.isPrefix(), + namespace, + endKey -> { + builder.setRangeEnd(endKey); + keyRange.setRangeEnd(endKey); + }); return Command.newBuilder() - .addKeys(KeyRange.newBuilder().setKey(ByteString.copyFrom(key.getBytes())).build()) + .addKeys(keyRange) .setRequest( RequestWithToken.newBuilder() .setDeleteRangeRequest(builder.build()) diff --git a/jxline-core/src/test/java/KVTest.java b/jxline-core/src/test/java/KVTest.java index 130683b..beedf91 100644 --- a/jxline-core/src/test/java/KVTest.java +++ b/jxline-core/src/test/java/KVTest.java @@ -1,11 +1,17 @@ import cloud.xline.jxline.Client; import cloud.xline.jxline.KV; +import cloud.xline.jxline.Txn; import cloud.xline.jxline.kv.DeleteResponse; import cloud.xline.jxline.kv.GetResponse; import cloud.xline.jxline.kv.PutResponse; +import cloud.xline.jxline.kv.TxnResponse; +import cloud.xline.jxline.op.Cmp; +import cloud.xline.jxline.op.CmpTarget; +import cloud.xline.jxline.op.Op; import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.options.DeleteOption; import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.options.PutOption; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -13,6 +19,7 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -121,4 +128,110 @@ public void testDelete() throws Exception { DeleteResponse delResp = deleteFuture.get(); assertThat(delResp.getDeleted()).isEqualTo(resp.getKvs().size()); } + + @Test + public void testGetSortedPrefix() throws Exception { + String prefix = randomString(); + int numPrefix = 3; + putKeysWithPrefix(prefix, numPrefix); + + GetOption option = + GetOption.builder() + .withSortField(GetOption.SortTarget.KEY) + .withSortOrder(GetOption.SortOrder.DESCEND) + .isPrefix(true) + .build(); + CompletableFuture getFeature = kvClient.get(bytesOf(prefix), option); + GetResponse response = getFeature.get(); + + assertThat(response.getKvs()).hasSize(numPrefix); + for (int i = 0; i < numPrefix; i++) { + assertThat(response.getKvs().get(i).getKey().toString(StandardCharsets.UTF_8)) + .isEqualTo(prefix + (numPrefix - i - 1)); + assertThat(response.getKvs().get(i).getValue().toString(StandardCharsets.UTF_8)) + .isEqualTo(String.valueOf(numPrefix - i - 1)); + } + } + + @Test + public void testGetAndDeleteWithPrefix() throws Exception { + String prefix = randomString(); + ByteSequence key = bytesOf(prefix); + int numPrefixes = 10; + + putKeysWithPrefix(prefix, numPrefixes); + + // verify get withPrefix. + CompletableFuture getFuture = + kvClient.get(key, GetOption.builder().isPrefix(true).build()); + GetResponse getResp = getFuture.get(); + assertThat(getResp.getCount()).isEqualTo(numPrefixes); + + // verify del withPrefix. + DeleteOption deleteOpt = DeleteOption.builder().isPrefix(true).build(); + CompletableFuture delFuture = kvClient.delete(key, deleteOpt); + DeleteResponse delResp = delFuture.get(); + assertThat(delResp.getDeleted()).isEqualTo(numPrefixes); + } + + private static void putKeysWithPrefix(String prefix, int numPrefixes) + throws ExecutionException, InterruptedException { + for (int i = 0; i < numPrefixes; i++) { + ByteSequence key = bytesOf(prefix + i); + ByteSequence value = bytesOf("" + i); + kvClient.put(key, value).get(); + } + } + + @Test + public void testTxn() throws Exception { + ByteSequence sampleKey = bytesOf("txn_key"); + ByteSequence sampleValue = bytesOf("xyz"); + ByteSequence cmpValue = bytesOf("abc"); + ByteSequence putValue = bytesOf("XYZ"); + ByteSequence putValueNew = bytesOf("ABC"); + // put the original txn key value pair + kvClient.put(sampleKey, sampleValue).get(); + + // construct txn operation + Txn txn = kvClient.txn(); + Cmp cmp = new Cmp(sampleKey, Cmp.Op.GREATER, CmpTarget.value(cmpValue)); + CompletableFuture txnResp = + txn.If(cmp) + .Then(Op.put(sampleKey, putValue, PutOption.DEFAULT)) + .Else(Op.put(sampleKey, putValueNew, PutOption.DEFAULT)) + .commit(); + txnResp.get(); + // get the value + GetResponse getResp = kvClient.get(sampleKey).get(); + assertThat(getResp.getKvs()).hasSize(1); + assertThat(getResp.getKvs().get(0).getValue().toString(StandardCharsets.UTF_8)) + .isEqualTo(putValue.toString(StandardCharsets.UTF_8)); + } + + @Test + public void testTxnForCmpOpNotEqual() throws Exception { + ByteSequence sampleKey = bytesOf("txn_key"); + ByteSequence sampleValue = bytesOf("xyz"); + ByteSequence cmpValue = bytesOf("abc"); + ByteSequence putValue = bytesOf("XYZ"); + ByteSequence putValueNew = bytesOf("ABC"); + // put the original txn key value pair + kvClient.put(sampleKey, sampleValue).get(); + + // construct txn operation + Txn txn = kvClient.txn(); + Cmp cmp = new Cmp(sampleKey, Cmp.Op.NOT_EQUAL, CmpTarget.value(cmpValue)); + CompletableFuture txnResp = + txn.If(cmp) + .Then(Op.put(sampleKey, putValue, PutOption.DEFAULT)) + .Else(Op.put(sampleKey, putValueNew, PutOption.DEFAULT)) + .commit(); + txnResp.get(); + // get the value + GetResponse getResp = kvClient.get(sampleKey).get(); + assertThat(getResp.getKvs()).hasSize(1); + assertThat(getResp.getKvs().get(0).getValue().toString(StandardCharsets.UTF_8)) + .isEqualTo(putValue.toString(StandardCharsets.UTF_8)); + } }