Skip to content

Commit 55cc423

Browse files
committed
feat(compress): add the compress method for large content
Signed-off-by: kaixuan xu <[email protected]>
1 parent bca013d commit 55cc423

File tree

4 files changed

+83
-28
lines changed

4 files changed

+83
-28
lines changed

opengemini-client-api/src/main/java/io/opengemini/client/api/Configuration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class Configuration {
4141

4242
CompressMethod compressMethod;
4343

44-
// deprecated
44+
// deprecated, will use compressMethod and contentType
4545
boolean gzipEnabled;
4646

4747
HttpClientConfig httpConfig;

opengemini-client-common/src/main/java/io/opengemini/client/common/BaseClient.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ public BaseClient(Configuration conf) {
5555
contentEncodingHeader.add("gzip");
5656
headers.put("Content-Encoding", contentEncodingHeader);
5757
}
58+
59+
applyCodec(conf);
60+
5861
String httpPrefix;
5962
if (conf.getHttpConfig().tlsConfig() != null) {
6063
httpPrefix = "https://";
@@ -77,6 +80,37 @@ public BaseClient(Configuration conf) {
7780
scheduler.ifPresent(this::startHealthCheck);
7881
}
7982

83+
private void applyCodec(Configuration config) {
84+
if (headers == null) {
85+
throw new IllegalStateException("Headers map is not initialized");
86+
}
87+
88+
List<String> acceptHeader = new ArrayList<>();
89+
switch (config.getContentType()) {
90+
case MSGPACK:
91+
acceptHeader.add("application/msgpack");
92+
break;
93+
case JSON:
94+
acceptHeader.add("application/json");
95+
break;
96+
}
97+
headers.put("Accept", acceptHeader);
98+
99+
List<String> acceptEncodingHeader = new ArrayList<>();
100+
switch (config.getCompressMethod()) {
101+
case GZIP:
102+
acceptEncodingHeader.add("gzip");
103+
break;
104+
case ZSTD:
105+
acceptEncodingHeader.add("zstd");
106+
break;
107+
case SNAPPY:
108+
acceptEncodingHeader.add("snappy");
109+
break;
110+
}
111+
headers.put("Accept-Encoding", acceptEncodingHeader);
112+
}
113+
80114
/**
81115
* Health Check
82116
* Start schedule task(period 10s) to ping all server url

opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,19 @@
1616

1717
package io.opengemini.client.impl;
1818

19+
import com.sun.org.apache.bcel.internal.generic.RETURN;
1920
import io.github.openfacade.http.BasicAuthRequestFilter;
2021
import io.github.openfacade.http.HttpClient;
2122
import io.github.openfacade.http.HttpClientConfig;
2223
import io.github.openfacade.http.HttpClientFactory;
2324
import io.github.openfacade.http.HttpResponse;
24-
import io.opengemini.client.api.AuthConfig;
25-
import io.opengemini.client.api.AuthType;
26-
import io.opengemini.client.api.Configuration;
27-
import io.opengemini.client.api.OpenGeminiException;
28-
import io.opengemini.client.api.Pong;
29-
import io.opengemini.client.api.Query;
30-
import io.opengemini.client.api.QueryResult;
25+
import io.opengemini.client.api.*;
3126
import io.opengemini.client.common.BaseAsyncClient;
3227
import io.opengemini.client.common.HeaderConst;
3328
import io.opengemini.client.common.JacksonService;
29+
import io.opengemini.client.common.compress.GzipCompressor;
30+
import io.opengemini.client.common.compress.SnappyCompressor;
31+
import io.opengemini.client.common.compress.ZstdCompressor;
3432
import org.jetbrains.annotations.NotNull;
3533

3634
import java.io.IOException;
@@ -122,22 +120,37 @@ protected CompletableFuture<Pong> executePing() {
122120
}
123121

124122
private <T> T processResponseBody(HttpResponse response, Class<T> type) throws IOException {
125-
String contentType = response.headers().get("Content-Type").get(0);
126-
byte[] body = response.body();
127-
if (contentType.contains("application/x-gzip")) {
128-
// Handle gzip content type
129-
// body = GzipService.decompress(body);
123+
String contentType = response.headers().get("Content-Type") != null ? response.headers().get("Content-Type").get(0) : null;
124+
String contentEncoding = response.headers().get("Content-Encoding") != null ? response.headers().get("Content-Encoding").get(0) : null;
125+
byte[] body = processCompression(contentEncoding, response.body(), type);
126+
127+
return processContentType(contentType, body, type);
128+
}
129+
130+
private <T> byte[] processCompression(String compressMethod, byte[] body, Class<T> type ) throws IOException {
131+
byte[] decompressedBody = null;
132+
if (CompressMethod.GZIP.getValue().equals(compressMethod)) {
133+
GzipCompressor compressor = new GzipCompressor();
134+
decompressedBody = compressor.decompress(body);
135+
} else if (CompressMethod.SNAPPY.getValue().equals(compressMethod)) {
136+
SnappyCompressor compressor = new SnappyCompressor();
137+
decompressedBody = compressor.decompress(body);
138+
139+
} else if (CompressMethod.ZSTD.getValue().equals(compressMethod)) {
140+
ZstdCompressor compressor = new ZstdCompressor();
141+
decompressedBody = compressor.decompress(body);
130142
}
131143

132-
if (contentType.contains("application/msgpack")) {
133-
// Handle msgpack content type
134-
//return MsgPackService.toObject(body, type);
135-
} else if (contentType.contains("application/json")) {
136-
// Handle JSON content type
144+
return decompressedBody != null ? decompressedBody : body;
145+
}
146+
147+
private <T> T processContentType(String contentType, byte[] body, Class<T> type) throws IOException {
148+
if (ContentType.JSON.getValue().equals(contentType)) {
137149
return JacksonService.toObject(body, type);
150+
} else if (ContentType.MSGPACK.getValue().equals(contentType)) {
151+
throw new IOException("Unsupported content type: " + contentType);
138152
}
139-
// Default handling
140-
return JacksonService.toObject(body, type);
153+
return JacksonService.toObject(body, type);
141154
}
142155

143156
public CompletableFuture<HttpResponse> get(String url) {

opengemini-client/src/test/java/io/opengemini/client/impl/OpenGeminiClientWriteTest.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,7 @@
1818

1919
import io.github.openfacade.http.HttpClientConfig;
2020
import io.github.openfacade.http.HttpClientEngine;
21-
import io.opengemini.client.api.Address;
22-
import io.opengemini.client.api.Configuration;
23-
import io.opengemini.client.api.OpenGeminiException;
24-
import io.opengemini.client.api.Point;
25-
import io.opengemini.client.api.Query;
26-
import io.opengemini.client.api.QueryResult;
27-
import io.opengemini.client.api.RpConfig;
28-
import io.opengemini.client.api.Series;
21+
import io.opengemini.client.api.*;
2922
import org.junit.jupiter.api.AfterAll;
3023
import org.junit.jupiter.api.Assertions;
3124
import org.junit.jupiter.api.TestInstance;
@@ -64,6 +57,21 @@ protected List<OpenGeminiClient> clientList() throws OpenGeminiException {
6457
.build();
6558
clients.add(OpenGeminiClientFactory.create(configuration));
6659
}
60+
61+
List<CompressMethod> compressMethods = Arrays.asList(CompressMethod.GZIP, CompressMethod.ZSTD, CompressMethod.SNAPPY);
62+
for (CompressMethod compressMethod : compressMethods) {
63+
HttpClientConfig httpConfig = new HttpClientConfig.Builder()
64+
.engine(HttpClientEngine.AsyncHttpClient)
65+
.connectTimeout(Duration.ofSeconds(3))
66+
.timeout(Duration.ofSeconds(3))
67+
.build();
68+
Configuration configuration = Configuration.builder()
69+
.addresses(Collections.singletonList(new Address("127.0.0.1", 8086)))
70+
.httpConfig(httpConfig)
71+
.compressMethod(compressMethod)
72+
.build();
73+
clients.add(OpenGeminiClientFactory.create(configuration));
74+
}
6775
return clients;
6876
}
6977

0 commit comments

Comments
 (0)