Skip to content

Commit b4257c5

Browse files
committed
add support for TopK
1 parent 141cb42 commit b4257c5

File tree

4 files changed

+158
-3
lines changed

4 files changed

+158
-3
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>com.redislabs</groupId>
88
<artifactId>jrebloom</artifactId>
9-
<version>1.3.0-SNAPSHOT</version>
9+
<version>2.0.0-SNAPSHOT</version>
1010

1111
<name>JReBloom</name>
1212
<description>Official client for ReBloom</description>

src/main/java/io/rebloom/client/Client.java

Lines changed: 116 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,15 @@
44
import redis.clients.jedis.Jedis;
55
import redis.clients.jedis.JedisPool;
66
import redis.clients.jedis.JedisPoolConfig;
7+
import redis.clients.jedis.Protocol;
8+
import redis.clients.jedis.commands.ProtocolCommand;
79
import redis.clients.jedis.exceptions.JedisException;
810
import redis.clients.jedis.util.Pool;
11+
import redis.clients.jedis.util.SafeEncoder;
912

1013
import java.io.Closeable;
1114
import java.util.*;
15+
import java.util.stream.Collectors;
1216

1317
/**
1418
* Client is the main ReBloom client class, wrapping connection management and all ReBloom commands
@@ -20,13 +24,20 @@ Jedis _conn() {
2024
return pool.getResource();
2125
}
2226

23-
private Connection sendCommand(Jedis conn, Command command, String ...args) {
27+
private Connection sendCommand(Jedis conn, String key, ProtocolCommand command, String ...args) {
28+
String[] fullArgs = new String[args.length + 1];
29+
fullArgs[0] = key;
30+
System.arraycopy(args, 0, fullArgs, 1, args.length);
31+
return sendCommand(conn, command, fullArgs);
32+
}
33+
34+
private Connection sendCommand(Jedis conn, ProtocolCommand command, String ...args) {
2435
Connection client = conn.getClient();
2536
client.sendCommand(command, args);
2637
return client;
2738
}
2839

29-
private Connection sendCommand(Jedis conn, Command command, byte[]... args) {
40+
private Connection sendCommand(Jedis conn, ProtocolCommand command, byte[]... args) {
3041
Connection client = conn.getClient();
3142
client.sendCommand(command, args);
3243
return client;
@@ -204,6 +215,109 @@ public boolean delete(String name) {
204215
return conn.del(name) != 0;
205216
}
206217
}
218+
219+
/**
220+
* TOPK.RESERVE key topk width depth decay
221+
*
222+
* Reserve a topk filter.
223+
* @param key The key of the filter
224+
* @param topk
225+
* @param width
226+
* @param depth
227+
* @param decay
228+
*
229+
* Note that if a filter is not reserved, a new one is created when {@link #add(String, byte[])}
230+
* is called.
231+
*/
232+
public void topkCreateFilter(String key, long topk, long width, long depth, double decay) {
233+
try (Jedis conn = _conn()) {
234+
String rep = sendCommand(conn, TopKCommand.RESERVE, SafeEncoder.encode(key), Protocol.toByteArray(topk),
235+
Protocol.toByteArray(width), Protocol.toByteArray(depth),Protocol.toByteArray(decay))
236+
.getStatusCodeReply();
237+
238+
if (!rep.equals("OK")) {
239+
throw new JedisException(rep);
240+
}
241+
}
242+
}
243+
244+
/**
245+
* TOPK.ADD key item [item ...]
246+
*
247+
* Adds an item to the filter
248+
* @param key The key of the filter
249+
* @param items The items to add to the filter
250+
* @return list of items dropped from the list.
251+
*/
252+
public List<String> topkAdd(String key, String ...items) {
253+
try (Jedis conn = _conn()) {
254+
return sendCommand(conn, key, TopKCommand.ADD, items).getMultiBulkReply();
255+
}
256+
}
257+
258+
/**
259+
* TOPK.INCRBY key item increment [item increment ...]
260+
*
261+
* Adds an item to the filter
262+
* @param key The key of the filter
263+
* @param item The item to to increment
264+
* @return list of items dropped from the list.
265+
*/
266+
public String topkIncrBy(String key, String item, long increment) {
267+
try (Jedis conn = _conn()) {
268+
return sendCommand(conn, TopKCommand.INCRBY, SafeEncoder.encode(key), SafeEncoder.encode(item), Protocol.toByteArray(increment))
269+
.getMultiBulkReply().get(0);
270+
}
271+
}
272+
273+
/**
274+
* TOPK.QUERY key item [item ...]
275+
*
276+
* Checks whether an item is one of Top-K items.
277+
*
278+
* @param key The key of the filter
279+
* @param items The items to check in the list
280+
* @return list of indicator for each item requested
281+
*/
282+
public List<Boolean> topkQuery(String key, String ...items) {
283+
try (Jedis conn = _conn()) {
284+
return sendCommand(conn, key, TopKCommand.QUERY, items)
285+
.getIntegerMultiBulkReply()
286+
.stream().map(s -> s!=0)
287+
.collect(Collectors.toList());
288+
}
289+
}
290+
291+
/**
292+
* TOPK.COUNT key item [item ...]
293+
*
294+
* Returns count for an item.
295+
*
296+
* @param key The key of the filter
297+
* @param items The items to check in the list
298+
* @return list of counters per item.
299+
*/
300+
public List<Long> topkCount(String key, String ...items) {
301+
try (Jedis conn = _conn()) {
302+
return sendCommand(conn, key, TopKCommand.COUNT, items)
303+
.getIntegerMultiBulkReply();
304+
}
305+
}
306+
307+
/**
308+
* TOPK.LIST key
309+
*
310+
* Return full list of items in Top K list.
311+
*
312+
* @param key The key of the filter
313+
* @return list of items in the list.
314+
*/
315+
public List<String> topkList(String key) {
316+
try (Jedis conn = _conn()) {
317+
return sendCommand(conn, TopKCommand.LIST, key)
318+
.getMultiBulkReply();
319+
}
320+
}
207321

208322
@Override
209323
public void close(){
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.rebloom.client;
2+
3+
import redis.clients.jedis.commands.ProtocolCommand;
4+
import redis.clients.jedis.util.SafeEncoder;
5+
6+
public enum TopKCommand implements ProtocolCommand {
7+
RESERVE("TOPK.RESERVE"),
8+
ADD("TOPK.ADD"),
9+
INCRBY("TOPK.INCRBY"),
10+
QUERY("TOPK.QUERY"),
11+
COUNT("TOPK.COUNT"),
12+
LIST("TOPK.LIST"),
13+
INFO("TOPK.INFO");
14+
15+
private final byte[] raw;
16+
17+
TopKCommand(String alt) {
18+
raw = SafeEncoder.encode(alt);
19+
}
20+
21+
public byte[] getRaw() {
22+
return raw;
23+
}
24+
}

src/test/java/io/rebloom/client/ClientTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,5 +132,22 @@ public void testExample() {
132132
client.createFilter("specialBloom", 10000, 0.0001);
133133
client.add("specialBloom", "foo");
134134
}
135+
136+
@Test
137+
public void createTopKFilter() {
138+
cl.topkCreateFilter("aaa", 30, 2000, 7, 0.925);
139+
140+
assertEquals(Arrays.asList(null, null), cl.topkAdd("aaa", "bb", "cc"));
141+
142+
assertEquals(Arrays.asList(true, false, true), cl.topkQuery("aaa", "bb", "gg", "cc"));
143+
144+
assertEquals(Arrays.asList(1L, 0L, 1L), cl.topkCount("aaa", "bb", "gg", "cc"));
145+
146+
assertTrue( cl.topkList("aaa").stream().allMatch( s -> Arrays.asList("bb", "cc").contains(s) || s == null));
147+
148+
assertEquals(null, cl.topkIncrBy("aaa", "ff", 10));
149+
150+
assertTrue( cl.topkList("aaa").stream().allMatch( s -> Arrays.asList("bb", "cc", "ff").contains(s) || s == null));
151+
}
135152

136153
}

0 commit comments

Comments
 (0)