diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 47dfca0..4bb5f09 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -5,21 +5,6 @@ on: branches: [ main ] jobs: - commit: - name: Commit Message Validation - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - with: - fetch-depth: 0 - - run: git show-ref - - uses: actions-rs/install@v0.1 - with: - crate: git-cz - version: latest - - name: Validate commit messages - run: git-cz check ${{ github.event.pull_request.base.sha }}..${{ github.event.pull_request.head.sha }} - test: name: Test runs-on: ubuntu-latest @@ -38,7 +23,7 @@ jobs: uses: gradle/actions/setup-gradle@v3 - name: Start the cluster - run: ./scripts/quick_start.sh + run: docker compose -f ci/docker-compose.yml up -d - name: Run test run: ./gradlew test diff --git a/build.gradle.kts b/build.gradle.kts index ed8d2aa..9302f6f 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -2,7 +2,7 @@ import com.adarshr.gradle.testlogger.TestLoggerExtension import com.adarshr.gradle.testlogger.theme.ThemeType group = "cloud.xline" -version = "1.0-SNAPSHOT" +version = "0.1.0-SNAPSHOT" buildscript { repositories { @@ -27,6 +27,7 @@ subprojects { apply(plugin = "java-library") apply(plugin = "org.gradle.test-retry") apply(plugin = "com.adarshr.test-logger") + apply(plugin = "maven-publish") tasks { named("compileJava") { @@ -47,8 +48,26 @@ subprojects { } } - extensions.getByType().apply { + configure { + publications { + create("maven") { + groupId = rootProject.group.toString() + version = rootProject.version.toString() + + from(components["java"]) + + } + } + } + + configure { + withSourcesJar() + withJavadocJar() + } + + configure { theme = ThemeType.MOCHA_PARALLEL showStandardStreams = false } + } diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml new file mode 100644 index 0000000..8c9b23b --- /dev/null +++ b/ci/docker-compose.yml @@ -0,0 +1,71 @@ +version: '3.9' + +networks: + xline_network: + driver: bridge + ipam: + driver: default + config: + - subnet: "172.18.0.0/16" + +services: + node1: + image: ghcr.io/xline-kv/xline:latest + networks: + xline_network: + ipv4_address: 172.18.0.2 + volumes: + - .:/mnt + ports: + - "2379:2379" + environment: + RUST_LOG: curp=debug,xline=debug + command: > + xline + --name node1 + --members node1=http://172.18.0.2:2379,node2=http://172.18.0.3:2379,node3=http://172.18.0.4:2379 + --storage-engine rocksdb + --data-dir /usr/local/xline/data-dir + --auth-public-key /mnt/public.pem + --auth-private-key /mnt/private.pem + --is-leader + + node2: + image: ghcr.io/xline-kv/xline:latest + networks: + xline_network: + ipv4_address: 172.18.0.3 + volumes: + - .:/mnt + ports: + - "2380:2379" + environment: + RUST_LOG: curp=debug,xline=debug + command: > + xline + --name node2 + --members node1=http://172.18.0.2:2379,node2=http://172.18.0.3:2379,node3=http://172.18.0.4:2379 + --storage-engine rocksdb + --data-dir /usr/local/xline/data-dir + --auth-public-key /mnt/public.pem + --auth-private-key /mnt/private.pem + + node3: + image: ghcr.io/xline-kv/xline:latest + networks: + xline_network: + ipv4_address: 172.18.0.4 + volumes: + - .:/mnt + ports: + - "2381:2379" + environment: + RUST_LOG: curp=debug,xline=debug + command: > + xline + --name node3 + --members node1=http://172.18.0.2:2379,node2=http://172.18.0.3:2379,node3=http://172.18.0.4:2379 + --storage-engine rocksdb + --data-dir /usr/local/xline/data-dir + --auth-public-key /mnt/public.pem + --auth-private-key /mnt/private.pem diff --git a/ci/private.pem b/ci/private.pem new file mode 100644 index 0000000..a888426 --- /dev/null +++ b/ci/private.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCnAxxSXJYWZCKr +6f6j0HRUwkhX/0+GXjEclWoLA5+KZAuWMSu8bz6X+IScv4vNwORlGSWOnrz+8mb2 +I0F6teVZWfWFqsnyWk7IxM+h9yTg7aY/8685YfWTL7fpWq1/3Fniz4QbsYFuzB1V +gaZ5fD2CSYIKzSD+qVSlXF25JDFHV7b2OdHrX0UKZOTWY/VE//STt+PJKdX9R3pl +kGwAzJIkkcAZy0vhvqT3ASTgXchNeN8wGYYb3YirkqIsQB5Xcs1R1W+yz+IrVa6/ +0WMcyE6qtJPZ0lviyT0nHV/pZjXuD4B0aja/1fk/HmXDPMjpK1BuCBTStM/KlcrA +oAxo+YDhAgMBAAECggEAIyJhY+Y8YMuCC753JkklH+ubQn/gX/kSxduc6mJBvuBb +G6aOd97DQT8zzrHxHEDXC3ml0AIO6mdeR6uVC9aWQBzPrOYIA+cBqfTVZVJTvMnh +7pQ6KY01F1izjPDZjQtzEWbseNL30rI3/ZP/zJDZc745EEKlDU3cE8mBogA+Ka6w +GLozT9qQf8knBrtzxH6SvrZpfaRlP95is82b4IuPhqYdG7dVYFTALE1MyVrCbS4Y +KytjNLgwp1bIQtWrzMebBGoiU+DvDcRY8zvOfFupDwpYCt3p1aU5wyYYdr74esV7 +jjqHj89Ua65JHJ3XnMAaMc4dHM2FsGqMsOv/DDKInQKBgQDawckQEekx0QuP3eJP +GWdZ87oc+FVjDe3bYhAnCf/yXRJoqcs5vr1m1yCXFfsjbQFYHWXR9AUtNn5HCwOZ +zoT1Mv96fXBVGQORgzvlUWS43uKpfIPDVv2I6ZcKSIQAGOgcWYvmBDhYqPHgmx3o +VSrNGWtLdyw3rD1J6O+1RwtbiwKBgQDDchmY59EXBiTvlyT3Qjl0vZFMHa+TElbh +ikNtYltbUHtamOXZzpdk/KA7X2dYi0QpVfbbpfP/ly5lYvgZwl8h90Obopru+ACM +ndlKBfNQYArmWY6bJ2CwF7j1aTCCHZuVuX6/pzFVStRcssn15uoVaIyKd/MhJzLF +S3ertQkSwwKBgAniMYRhWsjeaghQ/RWXzzyYL3N5oNn92h5MWvB4mjDIFbnW2hC8 +1m/cDmPlIVijZyklAuGuhcFaMfBhxgLf+s/dQv+0xSuDGs8rP7yHpeZYY6NGtelQ +d9oEu8dCKXybo3kMbq6wyB7xWyRLvdkuZ+WmXVumgb/uL0K0nIfzMscrAoGAeA1e +K845YSslBQaSbk7/e/X1iguyDWT2eRO01zvTYgPNwZipl2CPHjkPM2km0fy5oaps +N/94IUd7+EsSmsAKL5LytGbtRFyR+c376rw8+OIFz/iy4BsQCRqJQjWa1lHZf96x +PIg2hW2xhD9OTv3IS94sdeG4NmUdipMQryhEqoECgYEAkvXOg66IAVTrO6qgoyl5 +42oufa/QE+qOAYoQEpmx3SZx6tMkycfAQqUHYcXhW1HNjyGbbg/sl13yddnPQqig ++ObtQNSIqGZWCc/HIqM//pPI3MHPhWARMOmAbk0I1mT0QKhuFfSugV2xb1Dj/Rvf +0VdB8txY+5Wz6zP1F2g46gM= +-----END PRIVATE KEY----- diff --git a/ci/public.pem b/ci/public.pem new file mode 100644 index 0000000..4c52eb6 --- /dev/null +++ b/ci/public.pem @@ -0,0 +1,9 @@ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEApwMcUlyWFmQiq+n+o9B0 +VMJIV/9Phl4xHJVqCwOfimQLljErvG8+l/iEnL+LzcDkZRkljp68/vJm9iNBerXl +WVn1harJ8lpOyMTPofck4O2mP/OvOWH1ky+36Vqtf9xZ4s+EG7GBbswdVYGmeXw9 +gkmCCs0g/qlUpVxduSQxR1e29jnR619FCmTk1mP1RP/0k7fjySnV/Ud6ZZBsAMyS +JJHAGctL4b6k9wEk4F3ITXjfMBmGG92Iq5KiLEAeV3LNUdVvss/iK1Wuv9FjHMhO +qrST2dJb4sk9Jx1f6WY17g+AdGo2v9X5Px5lwzzI6StQbggU0rTPypXKwKAMaPmA +4QIDAQAB +-----END PUBLIC KEY----- diff --git a/jxline-core/src/main/java/cloud/xline/jxline/Txn.java b/jxline-core/src/main/java/cloud/xline/jxline/Txn.java index 1f631d0..635fb5e 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/Txn.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/Txn.java @@ -17,8 +17,8 @@ package cloud.xline.jxline; import cloud.xline.jxline.kv.TxnResponse; -import io.etcd.jetcd.op.Cmp; -import io.etcd.jetcd.op.Op; +import cloud.xline.jxline.op.Cmp; +import cloud.xline.jxline.op.Op; import java.util.concurrent.CompletableFuture; diff --git a/jxline-core/src/main/java/cloud/xline/jxline/exceptions/CurpException.java b/jxline-core/src/main/java/cloud/xline/jxline/exceptions/CurpException.java index 73f736f..c3b5dc7 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/exceptions/CurpException.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/exceptions/CurpException.java @@ -105,4 +105,9 @@ public static CurpException toCurpException(Status status, @Nullable Metadata tr CurpError curpError = trailers.get(STATUS_DETAILS_KEY); return new CurpException(curpError); } + + @Override + public String toString() { + return "CurpError(" + error.toString().replace("\n", "") + ")"; + } } diff --git a/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java index da2379d..cf58546 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java @@ -4,6 +4,7 @@ import cloud.xline.jxline.ProtocolClient; import cloud.xline.jxline.Txn; import cloud.xline.jxline.kv.*; +import cloud.xline.jxline.op.TxnImpl; import cloud.xline.jxline.support.Requests; import com.xline.protobuf.Command; import io.etcd.jetcd.ByteSequence; @@ -16,7 +17,7 @@ import static java.util.Objects.requireNonNull; -public class KVImpl extends Impl implements KV { +class KVImpl extends Impl implements KV { private final ProtocolClient protocolClient; @@ -37,7 +38,7 @@ public CompletableFuture put( requireNonNull(value, "value should not be null"); requireNonNull(option, "option should not be null"); Command cmd = - Requests.mapPutRequest(key, value, option, this.connectionManager().getNamespace()); + Requests.mapPutCommand(key, value, option, this.connectionManager().getNamespace()); return protocolClient.propose( cmd, true, @@ -55,7 +56,7 @@ public CompletableFuture get(ByteSequence key, GetOption option) { requireNonNull(key, "key should not be null"); requireNonNull(option, "option should not be null"); Command cmd = - Requests.mapRangeRequest(key, option, this.connectionManager().getNamespace()); + Requests.mapRangeCommand(key, option, this.connectionManager().getNamespace()); return protocolClient.propose( cmd, true, @@ -73,7 +74,7 @@ public CompletableFuture delete(ByteSequence key, DeleteOption o requireNonNull(key, "key should not be null"); requireNonNull(option, "option should not be null"); Command cmd = - Requests.mapDeleteRequest(key, option, this.connectionManager().getNamespace()); + Requests.mapDeleteCommand(key, option, this.connectionManager().getNamespace()); return protocolClient.propose( cmd, true, @@ -94,6 +95,14 @@ public CompletableFuture compact(long revision, CompactOption o @Override public Txn txn() { - return null; + return TxnImpl.newTxn( + this.connectionManager().getNamespace(), + cmd -> + protocolClient.propose( + cmd, + true, + (sr, asr) -> + new TxnResponse( + sr, asr, this.connectionManager().getNamespace()))); } } diff --git a/jxline-core/src/main/java/cloud/xline/jxline/impl/ProtocolClientImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/impl/ProtocolClientImpl.java index 93f6fdb..85d6747 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/impl/ProtocolClientImpl.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/impl/ProtocolClientImpl.java @@ -191,6 +191,10 @@ CommandResponse fastRound(ProposeId id, Command cmd) { throw XlineException.toXlineException(e); } catch (ExecutionException e) { Throwable cause = e.getCause(); + // extract the most inner exception + while (cause instanceof ExecutionException) { + cause = cause.getCause(); + } if (!(cause instanceof CurpException)) { throw XlineException.toXlineException(cause); } @@ -368,8 +372,8 @@ void checkUpdate(FetchClusterResponse res) { if (res.hasLeaderId() && this.term < res.getTerm()) { this.term = res.getTerm(); this.leaderId = res.getLeaderId(); - logger().info("client term updates to " + this.term); - logger().info("client leader id updates to " + this.leaderId); + logger().info("client term updates to {}", this.term); + logger().info("client leader id updates to {}", this.leaderId); } if (res.getClusterVersion() == this.clusterVersion) { return; diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java b/jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java new file mode 100644 index 0000000..31fd569 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java @@ -0,0 +1,89 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.op; + +import cloud.xline.jxline.Txn; +import com.google.protobuf.ByteString; +import com.xline.protobuf.Compare; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.support.Util; + + +/** The compare predicate in {@link Txn}. */ +public class Cmp { + + public enum Op { + EQUAL, + GREATER, + LESS, + NOT_EQUAL + } + + private final ByteString key; + private final Op op; + private final CmpTarget target; + + public Cmp(ByteSequence key, Op compareOp, CmpTarget target) { + this.key = ByteString.copyFrom(key.getBytes()); + this.op = compareOp; + this.target = target; + } + + Compare toCompare(ByteSequence namespace) { + Compare.Builder compareBuilder = + Compare.newBuilder().setKey(Util.prefixNamespace(this.key, namespace)); + switch (this.op) { + case EQUAL: + compareBuilder.setResult(Compare.CompareResult.EQUAL); + break; + case GREATER: + compareBuilder.setResult(Compare.CompareResult.GREATER); + break; + case LESS: + compareBuilder.setResult(Compare.CompareResult.LESS); + break; + case NOT_EQUAL: + compareBuilder.setResult(Compare.CompareResult.NOT_EQUAL); + break; + default: + throw new IllegalArgumentException("Unexpected compare type (" + this.op + ")"); + } + + Compare.CompareTarget target = this.target.getTarget(); + Object value = this.target.getTargetValue(); + + compareBuilder.setTarget(target); + switch (target) { + case VERSION: + compareBuilder.setVersion((Long) value); + break; + case VALUE: + compareBuilder.setValue((ByteString) value); + break; + case MOD: + compareBuilder.setModRevision((Long) value); + break; + case CREATE: + compareBuilder.setCreateRevision((Long) value); + break; + default: + throw new IllegalArgumentException("Unexpected target type (" + target + ")"); + } + + return compareBuilder.build(); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java b/jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java new file mode 100644 index 0000000..170caf2 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java @@ -0,0 +1,123 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.op; + +import cloud.xline.jxline.Txn; +import com.google.protobuf.ByteString; +import com.xline.protobuf.Compare; +import io.etcd.jetcd.ByteSequence; + +/** + * Cmp target used in {@link Txn}. + */ +public abstract class CmpTarget { + + /** + * Cmp on a given version. + * + * @param version version to compare + * @return the version compare target + */ + public static VersionCmpTarget version(long version) { + return new VersionCmpTarget(version); + } + + /** + * Cmp on the create revision. + * + * @param revision the create revision + * @return the create revision compare target + */ + public static CreateRevisionCmpTarget createRevision(long revision) { + return new CreateRevisionCmpTarget(revision); + } + + /** + * Cmp on the modification revision. + * + * @param revision the modification revision + * @return the modification revision compare target + */ + public static ModRevisionCmpTarget modRevision(long revision) { + return new ModRevisionCmpTarget(revision); + } + + /** + * Cmp on the value. + * + * @param value the value to compare + * @return the value compare target + */ + public static ValueCmpTarget value(ByteSequence value) { + return new ValueCmpTarget(ByteString.copyFrom(value.getBytes())); + } + + private final Compare.CompareTarget target; + private final T targetValue; + + protected CmpTarget(Compare.CompareTarget target, T targetValue) { + this.target = target; + this.targetValue = targetValue; + } + + /** + * Get the compare target used for this compare. + * + * @return the compare target used for this compare + */ + public Compare.CompareTarget getTarget() { + return target; + } + + /** + * Get the compare target value of this compare. + * + * @return the compare target value of this compare. + */ + public T getTargetValue() { + return targetValue; + } + + public static final class VersionCmpTarget extends CmpTarget { + + VersionCmpTarget(Long targetValue) { + super(Compare.CompareTarget.VERSION, targetValue); + } + } + + public static final class CreateRevisionCmpTarget extends CmpTarget { + + CreateRevisionCmpTarget(Long targetValue) { + super(Compare.CompareTarget.CREATE, targetValue); + } + } + + public static final class ModRevisionCmpTarget extends CmpTarget { + + ModRevisionCmpTarget(Long targetValue) { + super(Compare.CompareTarget.MOD, targetValue); + } + } + + public static final class ValueCmpTarget extends CmpTarget { + + ValueCmpTarget(ByteString targetValue) { + super(Compare.CompareTarget.VALUE, targetValue); + } + } + +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/Op.java b/jxline-core/src/main/java/cloud/xline/jxline/op/Op.java new file mode 100644 index 0000000..8117923 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/Op.java @@ -0,0 +1,167 @@ +/* + * Copyright 2016-2021 The jetcd authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.xline.jxline.op; + +import cloud.xline.jxline.support.Requests; +import com.google.protobuf.ByteString; +import com.xline.protobuf.DeleteRangeRequest; +import com.xline.protobuf.RequestOp; +import com.xline.protobuf.TxnRequest; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.options.DeleteOption; +import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.options.PutOption; +import io.etcd.jetcd.support.Util; + +/** Copied From Etcd Operation. */ +public abstract class Op { + + /** Operation type. */ + public enum Type { + PUT, + RANGE, + DELETE_RANGE, + TXN + } + + protected final Type type; + protected final ByteString key; + + protected Op(Type type, ByteString key) { + this.type = type; + this.key = key; + } + + abstract RequestOp toRequestOp(ByteSequence namespace); + + public static PutOp put(ByteSequence key, ByteSequence value, PutOption option) { + return new PutOp( + ByteString.copyFrom(key.getBytes()), ByteString.copyFrom(value.getBytes()), option); + } + + public static GetOp get(ByteSequence key, GetOption option) { + return new GetOp(ByteString.copyFrom(key.getBytes()), option); + } + + public static DeleteOp delete(ByteSequence key, DeleteOption option) { + return new DeleteOp(ByteString.copyFrom(key.getBytes()), option); + } + + public static TxnOp txn(Cmp[] cmps, Op[] thenOps, Op[] elseOps) { + return new TxnOp(cmps, thenOps, elseOps); + } + + public static final class PutOp extends Op { + + private final ByteString value; + private final PutOption option; + + private PutOp(ByteString key, ByteString value, PutOption option) { + super(Type.PUT, key); + this.value = value; + this.option = option; + } + + @Override + RequestOp toRequestOp(ByteSequence namespace) { + return RequestOp.newBuilder() + .setRequestPut( + Requests.mapPutRequest( + ByteSequence.from(key), + ByteSequence.from(value), + option, + namespace)) + .build(); + } + } + + public static final class GetOp extends Op { + + private final GetOption option; + + private GetOp(ByteString key, GetOption option) { + super(Type.RANGE, key); + this.option = option; + } + + @Override + RequestOp toRequestOp(ByteSequence namespace) { + return RequestOp.newBuilder() + .setRequestRange( + Requests.mapRangeRequest(ByteSequence.from(key), option, namespace)) + .build(); + } + } + + public static final class DeleteOp extends Op { + + private final DeleteOption option; + + DeleteOp(ByteString key, DeleteOption option) { + super(Type.DELETE_RANGE, key); + this.option = option; + } + + @Override + RequestOp toRequestOp(ByteSequence namespace) { + return RequestOp.newBuilder() + .setRequestDeleteRange( + DeleteRangeRequest.newBuilder() + .setKey(Util.prefixNamespace(key, namespace)) + .setPrevKv(option.isPrevKV())) + .build(); + } + } + + public static final class TxnOp extends Op { + private final Cmp[] cmps; + private final Op[] thenOps; + private final Op[] elseOps; + + private TxnOp(Cmp[] cmps, Op[] thenOps, Op[] elseOps) { + super(Type.TXN, null); + this.cmps = cmps; + this.thenOps = thenOps; + this.elseOps = elseOps; + } + + @Override + RequestOp toRequestOp(ByteSequence namespace) { + TxnRequest.Builder txn = TxnRequest.newBuilder(); + + if (cmps != null) { + for (Cmp cmp : cmps) { + txn.addCompare(cmp.toCompare(namespace)); + } + } + + if (thenOps != null) { + for (Op thenOp : thenOps) { + txn.addSuccess(thenOp.toRequestOp(namespace)); + } + } + + if (elseOps != null) { + for (Op elseOp : elseOps) { + txn.addFailure(elseOp.toRequestOp(namespace)); + } + } + + return RequestOp.newBuilder().setRequestTxn(txn).build(); + } + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java new file mode 100644 index 0000000..f84d241 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java @@ -0,0 +1,161 @@ +package cloud.xline.jxline.op; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Stream; + +import cloud.xline.jxline.Txn; +import cloud.xline.jxline.kv.TxnResponse; +import com.xline.protobuf.*; +import io.etcd.jetcd.ByteSequence; + +import com.google.common.annotations.VisibleForTesting; + +/** Build a transaction. */ +public class TxnImpl implements Txn { + + public static TxnImpl newTxn( + ByteSequence namespace, Function> f) { + return new TxnImpl(namespace, f); + } + + @VisibleForTesting + static TxnImpl newTxn(Function> f) { + return newTxn(ByteSequence.EMPTY, f); + } + + private final ByteSequence namespace; + + private final List cmpList = new ArrayList<>(); + private final List successOpList = new ArrayList<>(); + private final List failureOpList = new ArrayList<>(); + private final Function> requestF; + + private boolean seenThen = false; + private boolean seenElse = false; + + private TxnImpl(ByteSequence namespace, Function> f) { + this.requestF = f; + this.namespace = namespace; + } + + @Override + public TxnImpl If(Cmp... cmps) { + return If(Arrays.asList(cmps)); + } + + TxnImpl If(List cmps) { + if (this.seenThen) { + throw new IllegalArgumentException("cannot call If after Then!"); + } + if (this.seenElse) { + throw new IllegalArgumentException("cannot call If after Else!"); + } + + cmpList.addAll(cmps); + return this; + } + + @Override + public TxnImpl Then(Op... ops) { + return Then(Arrays.asList(ops)); + } + + TxnImpl Then(List ops) { + if (this.seenElse) { + throw new IllegalArgumentException("cannot call Then after Else!"); + } + + this.seenThen = true; + + successOpList.addAll(ops); + return this; + } + + @Override + public TxnImpl Else(Op... ops) { + return Else(Arrays.asList(ops)); + } + + TxnImpl Else(List ops) { + this.seenElse = true; + + failureOpList.addAll(ops); + return this; + } + + @Override + public CompletableFuture commit() { + return this.requestF.apply(this.toTxnRequest()); + } + + private Command toTxnRequest() { + TxnRequest.Builder requestBuilder = TxnRequest.newBuilder(); + + for (Cmp c : this.cmpList) { + requestBuilder.addCompare(c.toCompare(namespace)); + } + + for (Op o : this.successOpList) { + requestBuilder.addSuccess(o.toRequestOp(namespace)); + } + + for (Op o : this.failureOpList) { + requestBuilder.addFailure(o.toRequestOp(namespace)); + } + + TxnRequest txnReq = requestBuilder.build(); + + return Command.newBuilder() + .addAllKeys(getTxnReqKeyRanges(txnReq)) + .setRequest(RequestWithToken.newBuilder().setTxnRequest(txnReq).build()) + .build(); + } + + private static List getTxnReqKeyRanges(TxnRequest req) { + List keyRanges = new ArrayList<>(); + req.getCompareList() + .forEach( + cmp -> + keyRanges.add( + KeyRange.newBuilder() + .setKey(cmp.getKey()) + .setRangeEnd(cmp.getRangeEnd()) + .build())); + Stream.concat(req.getSuccessList().stream(), req.getFailureList().stream()) + .forEach( + op -> { + if (op.hasRequestRange()) { + keyRanges.add( + KeyRange.newBuilder() + .setKey(op.getRequestRange().getKey()) + .setRangeEnd(op.getRequestRange().getRangeEnd()) + .build()); + return; + } + if (op.hasRequestPut()) { + keyRanges.add( + KeyRange.newBuilder() + .setKey(op.getRequestPut().getKey()) + .build()); + return; + } + if (op.hasRequestDeleteRange()) { + keyRanges.add( + KeyRange.newBuilder() + .setKey(op.getRequestDeleteRange().getKey()) + .setRangeEnd( + op.getRequestDeleteRange().getRangeEnd()) + .build()); + return; + } + if (op.hasRequestTxn()) { + keyRanges.addAll(getTxnReqKeyRanges(op.getRequestTxn())); + } + }); + return keyRanges; + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java b/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java index cba5775..3def5ee 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java @@ -20,17 +20,21 @@ public final class Requests { * @param namespace the namespace binding to the command * @return the command */ - public static Command mapPutRequest( + public static PutRequest mapPutRequest( ByteSequence key, ByteSequence value, PutOption option, ByteSequence namespace) { - PutRequest req = - PutRequest.newBuilder() - .setKey(Util.prefixNamespace(key, namespace)) - .setValue(ByteString.copyFrom(value.getBytes())) - .setLease(option.getLeaseId()) - .setPrevKv(option.getPrevKV()) - .build(); + return PutRequest.newBuilder() + .setKey(Util.prefixNamespace(key, namespace)) + .setValue(ByteString.copyFrom(value.getBytes())) + .setLease(option.getLeaseId()) + .setPrevKv(option.getPrevKV()) + .build(); + } + + public static Command mapPutCommand( + ByteSequence key, ByteSequence value, PutOption option, ByteSequence namespace) { + PutRequest req = mapPutRequest(key, value, option, namespace); return Command.newBuilder() - .addKeys(KeyRange.newBuilder().setKey(ByteString.copyFrom(key.getBytes())).build()) + .addKeys(KeyRange.newBuilder().setKey(Util.prefixNamespace(key, namespace))) .setRequest( RequestWithToken.newBuilder().setPutRequest(req).build()) // TODO: add token .build(); @@ -44,27 +48,40 @@ public static Command mapPutRequest( * @param namespace the namespace binding to the command * @return the command */ - public static Command mapRangeRequest( + public static RangeRequest.Builder mapRangeRequest( + ByteSequence key, GetOption option, ByteSequence namespace) { + return RangeRequest.newBuilder() + .setKey(Util.prefixNamespace(key, namespace)) + .setCountOnly(option.isCountOnly()) + .setLimit(option.getLimit()) + .setRevision(option.getRevision()) + .setKeysOnly(option.isKeysOnly()) + .setSerializable(option.isSerializable()) + .setSortOrder(toRangeRequestSortOrder(option.getSortOrder())) + .setSortTarget(toRangeRequestSortTarget(option.getSortField())) + .setMinCreateRevision(option.getMinCreateRevision()) + .setMaxCreateRevision(option.getMaxCreateRevision()) + .setMinModRevision(option.getMinModRevision()) + .setMaxModRevision(option.getMaxModRevision()); + } + + public static Command mapRangeCommand( ByteSequence key, GetOption option, ByteSequence namespace) { - RangeRequest.Builder builder = - RangeRequest.newBuilder() - .setKey(Util.prefixNamespace(key, namespace)) - .setCountOnly(option.isCountOnly()) - .setLimit(option.getLimit()) - .setRevision(option.getRevision()) - .setKeysOnly(option.isKeysOnly()) - .setSerializable(option.isSerializable()) - .setSortOrder(toRangeRequestSortOrder(option.getSortOrder())) - .setSortTarget(toRangeRequestSortTarget(option.getSortField())) - .setMinCreateRevision(option.getMinCreateRevision()) - .setMaxCreateRevision(option.getMaxCreateRevision()) - .setMinModRevision(option.getMinModRevision()) - .setMaxModRevision(option.getMaxModRevision()); + RangeRequest.Builder builder = mapRangeRequest(key, option, namespace); + KeyRange.Builder keyRange = + KeyRange.newBuilder().setKey(Util.prefixNamespace(key, namespace)); defineRangeRequestEnd( - key, option.getEndKey(), option.isPrefix(), namespace, builder::setRangeEnd); + key, + option.getEndKey(), + option.isPrefix(), + namespace, + endKey -> { + builder.setRangeEnd(endKey); + keyRange.setRangeEnd(endKey); + }); return Command.newBuilder() - .addKeys(KeyRange.newBuilder().setKey(ByteString.copyFrom(key.getBytes())).build()) + .addKeys(keyRange) .setRequest( RequestWithToken.newBuilder() .setRangeRequest(builder.build()) @@ -80,18 +97,30 @@ public static Command mapRangeRequest( * @param namespace the namespace binding to the command * @return the command */ - public static Command mapDeleteRequest( + public static DeleteRangeRequest.Builder mapDeleteRequest( + ByteSequence key, DeleteOption option, ByteSequence namespace) { + return DeleteRangeRequest.newBuilder() + .setKey(Util.prefixNamespace(key, namespace)) + .setPrevKv(option.isPrevKV()); + } + + public static Command mapDeleteCommand( ByteSequence key, DeleteOption option, ByteSequence namespace) { - DeleteRangeRequest.Builder builder = - DeleteRangeRequest.newBuilder() - .setKey(Util.prefixNamespace(key, namespace)) - .setPrevKv(option.isPrevKV()); + DeleteRangeRequest.Builder builder = mapDeleteRequest(key, option, namespace); + KeyRange.Builder keyRange = + KeyRange.newBuilder().setKey(Util.prefixNamespace(key, namespace)); defineRangeRequestEnd( - key, option.getEndKey(), option.isPrefix(), namespace, builder::setRangeEnd); - + key, + option.getEndKey(), + option.isPrefix(), + namespace, + endKey -> { + builder.setRangeEnd(endKey); + keyRange.setRangeEnd(endKey); + }); return Command.newBuilder() - .addKeys(KeyRange.newBuilder().setKey(ByteString.copyFrom(key.getBytes())).build()) + .addKeys(keyRange) .setRequest( RequestWithToken.newBuilder() .setDeleteRangeRequest(builder.build()) diff --git a/jxline-core/src/test/java/KVTest.java b/jxline-core/src/test/java/KVTest.java new file mode 100644 index 0000000..beedf91 --- /dev/null +++ b/jxline-core/src/test/java/KVTest.java @@ -0,0 +1,237 @@ +import cloud.xline.jxline.Client; +import cloud.xline.jxline.KV; +import cloud.xline.jxline.Txn; +import cloud.xline.jxline.kv.DeleteResponse; +import cloud.xline.jxline.kv.GetResponse; +import cloud.xline.jxline.kv.PutResponse; +import cloud.xline.jxline.kv.TxnResponse; +import cloud.xline.jxline.op.Cmp; +import cloud.xline.jxline.op.CmpTarget; +import cloud.xline.jxline.op.Op; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.options.DeleteOption; +import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.options.PutOption; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import static org.assertj.core.api.Assertions.*; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +@Timeout(value = 20) +public class KVTest { + private static KV kvClient; + + private static final ByteSequence SAMPLE_KEY = bytesOf("sample_key"); + private static final ByteSequence SAMPLE_VALUE = bytesOf("sample_value"); + private static final ByteSequence SAMPLE_KEY_2 = bytesOf("sample_key2"); + private static final ByteSequence SAMPLE_VALUE_2 = bytesOf("sample_value2"); + private static final ByteSequence SAMPLE_KEY_3 = bytesOf("sample_key3"); + + private static final String INIT_ENDPOINT = "http://127.0.0.1:2379"; + + @BeforeAll + static void onConnect() { + kvClient = Client.builder().endpoints(INIT_ENDPOINT).build().getKVClient(); + } + + public static ByteSequence bytesOf(final String string) { + return ByteSequence.from(string, StandardCharsets.UTF_8); + } + + public static String randomString() { + return java.util.UUID.randomUUID().toString(); + } + + @Test + void testItWorks() throws Exception { + ByteSequence key = ByteSequence.from("Hello Xline", Charset.defaultCharset()); + ByteSequence value = ByteSequence.from("Hi", Charset.defaultCharset()); + PutResponse putResponse = kvClient.put(key, value).get(); + assertThat(putResponse).isNotNull(); + GetResponse getResponse = kvClient.get(key).get(); + assertThat(getResponse).isNotNull(); + assertThat(getResponse.getCount()).isEqualTo(1); + assertThat(getResponse.getKvs().get(0).getValue()).isEqualTo(value); + } + + @Test + public void testByteSequence() { + ByteSequence prefix = bytesOf("/test-service/"); + ByteSequence subPrefix = bytesOf("uuids/"); + + String keyString = randomString(); + ByteSequence key = bytesOf(keyString); + ByteSequence prefixedKey = prefix.concat(subPrefix).concat(key); + assertThat(prefixedKey.startsWith(prefix)).isTrue(); + assertThat( + prefixedKey + .substring(prefix.size() + subPrefix.size()) + .toString(StandardCharsets.UTF_8)) + .isEqualTo(keyString); + assertThat(prefixedKey.substring(prefix.size(), prefix.size() + subPrefix.size())) + .isEqualTo(subPrefix); + } + + @Test + public void testPut() throws Exception { + CompletableFuture feature = kvClient.put(SAMPLE_KEY, SAMPLE_VALUE); + PutResponse response = feature.get(); + assertThat(response.getHeader()).isNotNull(); + assertThat(!response.hasPrevKv()).isTrue(); + } + + @Test + public void testGet() throws Exception { + CompletableFuture feature = kvClient.put(SAMPLE_KEY_2, SAMPLE_VALUE_2); + feature.get(); + CompletableFuture getFeature = kvClient.get(SAMPLE_KEY_2); + GetResponse response = getFeature.get(); + assertThat(response.getKvs()).hasSize(1); + assertThat(response.getKvs().get(0).getValue().toString(StandardCharsets.UTF_8)) + .isEqualTo(SAMPLE_VALUE_2.toString(StandardCharsets.UTF_8)); + assertThat(!response.isMore()).isTrue(); + } + + @Test + public void testGetWithRev() throws Exception { + CompletableFuture feature = kvClient.put(SAMPLE_KEY_3, SAMPLE_VALUE); + PutResponse putResp = feature.get(); + kvClient.put(SAMPLE_KEY_3, SAMPLE_VALUE_2).get(); + GetOption option = + GetOption.builder().withRevision(putResp.getHeader().getRevision()).build(); + CompletableFuture getFeature = kvClient.get(SAMPLE_KEY_3, option); + GetResponse response = getFeature.get(); + assertThat(response.getKvs()).hasSize(1); + assertThat(response.getKvs().get(0).getValue().toString(StandardCharsets.UTF_8)) + .isEqualTo(SAMPLE_VALUE.toString(StandardCharsets.UTF_8)); + } + + @Test + public void testDelete() throws Exception { + // Put content so that we actually have something to delete + testPut(); + + ByteSequence keyToDelete = SAMPLE_KEY; + + // count keys about to delete + CompletableFuture getFeature = kvClient.get(keyToDelete); + GetResponse resp = getFeature.get(); + + // delete the keys + CompletableFuture deleteFuture = kvClient.delete(keyToDelete); + DeleteResponse delResp = deleteFuture.get(); + assertThat(delResp.getDeleted()).isEqualTo(resp.getKvs().size()); + } + + @Test + public void testGetSortedPrefix() throws Exception { + String prefix = randomString(); + int numPrefix = 3; + putKeysWithPrefix(prefix, numPrefix); + + GetOption option = + GetOption.builder() + .withSortField(GetOption.SortTarget.KEY) + .withSortOrder(GetOption.SortOrder.DESCEND) + .isPrefix(true) + .build(); + CompletableFuture getFeature = kvClient.get(bytesOf(prefix), option); + GetResponse response = getFeature.get(); + + assertThat(response.getKvs()).hasSize(numPrefix); + for (int i = 0; i < numPrefix; i++) { + assertThat(response.getKvs().get(i).getKey().toString(StandardCharsets.UTF_8)) + .isEqualTo(prefix + (numPrefix - i - 1)); + assertThat(response.getKvs().get(i).getValue().toString(StandardCharsets.UTF_8)) + .isEqualTo(String.valueOf(numPrefix - i - 1)); + } + } + + @Test + public void testGetAndDeleteWithPrefix() throws Exception { + String prefix = randomString(); + ByteSequence key = bytesOf(prefix); + int numPrefixes = 10; + + putKeysWithPrefix(prefix, numPrefixes); + + // verify get withPrefix. + CompletableFuture getFuture = + kvClient.get(key, GetOption.builder().isPrefix(true).build()); + GetResponse getResp = getFuture.get(); + assertThat(getResp.getCount()).isEqualTo(numPrefixes); + + // verify del withPrefix. + DeleteOption deleteOpt = DeleteOption.builder().isPrefix(true).build(); + CompletableFuture delFuture = kvClient.delete(key, deleteOpt); + DeleteResponse delResp = delFuture.get(); + assertThat(delResp.getDeleted()).isEqualTo(numPrefixes); + } + + private static void putKeysWithPrefix(String prefix, int numPrefixes) + throws ExecutionException, InterruptedException { + for (int i = 0; i < numPrefixes; i++) { + ByteSequence key = bytesOf(prefix + i); + ByteSequence value = bytesOf("" + i); + kvClient.put(key, value).get(); + } + } + + @Test + public void testTxn() throws Exception { + ByteSequence sampleKey = bytesOf("txn_key"); + ByteSequence sampleValue = bytesOf("xyz"); + ByteSequence cmpValue = bytesOf("abc"); + ByteSequence putValue = bytesOf("XYZ"); + ByteSequence putValueNew = bytesOf("ABC"); + // put the original txn key value pair + kvClient.put(sampleKey, sampleValue).get(); + + // construct txn operation + Txn txn = kvClient.txn(); + Cmp cmp = new Cmp(sampleKey, Cmp.Op.GREATER, CmpTarget.value(cmpValue)); + CompletableFuture txnResp = + txn.If(cmp) + .Then(Op.put(sampleKey, putValue, PutOption.DEFAULT)) + .Else(Op.put(sampleKey, putValueNew, PutOption.DEFAULT)) + .commit(); + txnResp.get(); + // get the value + GetResponse getResp = kvClient.get(sampleKey).get(); + assertThat(getResp.getKvs()).hasSize(1); + assertThat(getResp.getKvs().get(0).getValue().toString(StandardCharsets.UTF_8)) + .isEqualTo(putValue.toString(StandardCharsets.UTF_8)); + } + + @Test + public void testTxnForCmpOpNotEqual() throws Exception { + ByteSequence sampleKey = bytesOf("txn_key"); + ByteSequence sampleValue = bytesOf("xyz"); + ByteSequence cmpValue = bytesOf("abc"); + ByteSequence putValue = bytesOf("XYZ"); + ByteSequence putValueNew = bytesOf("ABC"); + // put the original txn key value pair + kvClient.put(sampleKey, sampleValue).get(); + + // construct txn operation + Txn txn = kvClient.txn(); + Cmp cmp = new Cmp(sampleKey, Cmp.Op.NOT_EQUAL, CmpTarget.value(cmpValue)); + CompletableFuture txnResp = + txn.If(cmp) + .Then(Op.put(sampleKey, putValue, PutOption.DEFAULT)) + .Else(Op.put(sampleKey, putValueNew, PutOption.DEFAULT)) + .commit(); + txnResp.get(); + // get the value + GetResponse getResp = kvClient.get(sampleKey).get(); + assertThat(getResp.getKvs()).hasSize(1); + assertThat(getResp.getKvs().get(0).getValue().toString(StandardCharsets.UTF_8)) + .isEqualTo(putValue.toString(StandardCharsets.UTF_8)); + } +} diff --git a/jxline-core/src/test/java/ProtocolTest.java b/jxline-core/src/test/java/ProtocolTest.java index 3e56ab7..ffdaaf3 100644 --- a/jxline-core/src/test/java/ProtocolTest.java +++ b/jxline-core/src/test/java/ProtocolTest.java @@ -10,9 +10,9 @@ @Timeout(value = 20) public class ProtocolTest { - static ProtocolClient client; + private static ProtocolClient client; - static String INIT_ENDPOINT = "http://172.20.0.5:2379"; + private static final String INIT_ENDPOINT = "http://127.0.0.1:2379"; @BeforeAll static void onConnect() { diff --git a/scripts/quick_start.sh b/scripts/quick_start.sh deleted file mode 100755 index f522f54..0000000 --- a/scripts/quick_start.sh +++ /dev/null @@ -1,71 +0,0 @@ -#!/bin/bash -DIR=$( - cd "$(dirname "$0")" - pwd -) -SERVERS=("172.20.0.2" "172.20.0.3" "172.20.0.4" "172.20.0.5") -MEMBERS="node1=${SERVERS[1]}:2379,node2=${SERVERS[2]}:2379,node3=${SERVERS[3]}:2379" - -# run xline node by index -# args: -# $1: index of the node -run_xline() { - cmd="/usr/local/bin/xline \ - --name node${1} \ - --members ${MEMBERS} \ - --storage-engine rocksdb \ - --data-dir /usr/local/xline/data-dir \ - --auth-public-key /mnt/public.pem \ - --auth-private-key /mnt/private.pem" - - if [ ${1} -eq 1 ]; then - cmd="${cmd} --is-leader" - fi - - docker exec -e RUST_LOG=debug -d node${1} ${cmd} - echo "command is: docker exec -e RUST_LOG=debug -d node${1} ${cmd}" -} - -# run cluster of xline/etcd in container -run_cluster() { - echo cluster starting - run_xline 1 & - run_xline 2 & - run_xline 3 & - wait - echo cluster started -} - -# stop all containers -stop_all() { - echo stopping - for name in "node1" "node2" "node3" "node4"; do - docker_id=$(docker ps -qf "name=${name}") - if [ -n "$docker_id" ]; then - docker stop $docker_id - fi - done - sleep 1 - echo stopped -} - -# run container of xline/etcd use specified image -# args: -# $1: size of cluster -run_container() { - echo container starting - size=${1} - image="ghcr.io/xline-kv/xline:latest" - for ((i = 1; i <= ${size}; i++)); do - docker run -d -it --rm --name=node${i} --net=xline_net --ip=${SERVERS[$i]} --cap-add=NET_ADMIN --cpu-shares=1024 -m=512M -v ${DIR}:/mnt ${image} bash & - done - docker run -d -it --rm --name=node4 --net=xline_net --ip=${SERVERS[0]} --cap-add=NET_ADMIN --cpu-shares=1024 -m=512M -v ${DIR}:/mnt gcr.io/etcd-development/etcd:v3.5.5 bash & - wait - echo container started -} - -stop_all -docker network create --subnet=172.20.0.0/24 xline_net >/dev/null 2>&1 - -run_container 3 -run_cluster