Skip to content

Commit eaa2f7e

Browse files
committed
Add new endpoint
1 parent aff5eb6 commit eaa2f7e

File tree

4 files changed

+170
-33
lines changed

4 files changed

+170
-33
lines changed

src/main/java/com/lbry/globe/handler/HTTPHandler.java

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
import com.lbry.globe.Main;
44
import com.lbry.globe.api.API;
55

6+
import com.lbry.globe.util.DHT;
7+
import com.lbry.globe.util.Hex;
8+
import com.lbry.globe.util.TimeoutFutureManager;
9+
import com.lbry.globe.util.UDP;
610
import io.netty.buffer.ByteBuf;
711
import io.netty.buffer.Unpooled;
812
import io.netty.channel.ChannelHandlerContext;
@@ -13,9 +17,11 @@
1317
import java.io.ByteArrayOutputStream;
1418
import java.io.InputStream;
1519
import java.io.IOException;
20+
import java.net.InetSocketAddress;
1621
import java.net.URI;
1722
import java.security.MessageDigest;
1823
import java.util.*;
24+
import java.util.concurrent.CompletableFuture;
1925
import java.util.logging.Level;
2026
import java.util.logging.Logger;
2127

@@ -85,6 +91,106 @@ private void handleResponse(ChannelHandlerContext ctx){
8591
ctx.write(response);
8692
return;
8793
}
94+
if("/api/command".equals(uri.getPath())){
95+
JSONObject json = new JSONObject();
96+
97+
String[] queryParts = uri.getQuery()!=null?uri.getQuery().split(";"):new String[]{""};
98+
if("ping".equals(queryParts[0]) || "findNode".equals(queryParts[0]) || "findValue".equals(queryParts[0])){
99+
//STORE IS NOT SUPPORTED
100+
json.put("query",queryParts);
101+
102+
Map<InetSocketAddress,Boolean> peers = DHT.getPeers();
103+
CompletableFuture<UDP.Packet>[] futures = new CompletableFuture[peers.size()];
104+
int i=0;
105+
for(Map.Entry<InetSocketAddress,Boolean> entry : peers.entrySet()){
106+
try{
107+
if("ping".equals(queryParts[0])){
108+
futures[i] = DHT.ping(DHT.getSocket(),entry.getKey());
109+
}
110+
if("findNode".equals(queryParts[0])){
111+
futures[i] = DHT.findNode(DHT.getSocket(),entry.getKey(),queryParts.length>=2?Hex.decode(queryParts[1]):new byte[48]);
112+
}
113+
if("findValue".equals(queryParts[0])){
114+
futures[i] = DHT.findValue(DHT.getSocket(),entry.getKey(),queryParts.length>=2?Hex.decode(queryParts[1]):new byte[48]);
115+
}
116+
}catch(IOException ignored){}
117+
i++;
118+
}
119+
120+
CompletableFuture<List<UDP.Packet>> total = TimeoutFutureManager.getBulk(futures);
121+
122+
JSONObject jsonData = new JSONObject();
123+
json.put("data",jsonData);
124+
try{
125+
List<UDP.Packet> responses = total.get();
126+
for(UDP.Packet resp : responses){
127+
if(resp!=null){
128+
DHT.Message<?> message = DHT.Message.fromBencode(resp.getData());
129+
if("ping".equals(queryParts[0])){
130+
String pong = (String) message.getPayload();
131+
jsonData.put(resp.getAddress().getAddress().getHostAddress()+":"+resp.getAddress().getPort(),pong);
132+
}
133+
if("findNode".equals(queryParts[0])){
134+
JSONArray payload = new JSONArray();
135+
List<List<Object>> nodes = (List<List<Object>>) message.getPayload();
136+
for(List<Object> node : nodes){
137+
JSONObject p = new JSONObject();
138+
p.put("nodeID", Hex.encode((byte[]) node.get(0)));
139+
p.put("hostname",node.get(1));
140+
p.put("port",node.get(2));
141+
payload.put(p);
142+
}
143+
jsonData.put(resp.getAddress().getAddress().getHostAddress()+":"+resp.getAddress().getPort(),payload);
144+
}
145+
if("findValue".equals(queryParts[0])){
146+
Map<String,Object> map = (Map<String, Object>) message.getPayload();
147+
JSONObject payload = new JSONObject();
148+
payload.put("p",map.get("p"));
149+
payload.put("protocolVersion",map.get("protocolVersion"));
150+
JSONArray contacts = new JSONArray();
151+
List<List<Object>> nodes = (List<List<Object>>) map.get("contacts");
152+
for(List<Object> node : nodes){
153+
JSONObject p = new JSONObject();
154+
p.put("nodeID", Hex.encode((byte[]) node.get(0)));
155+
p.put("hostname",node.get(1));
156+
p.put("port",node.get(2));
157+
contacts.put(p);
158+
}
159+
payload.put("contacts",contacts);
160+
//payload.put("token",Hex.encode((byte[]) map.get("token")));
161+
jsonData.put(resp.getAddress().getAddress().getHostAddress()+":"+resp.getAddress().getPort(),payload);
162+
}
163+
}
164+
}
165+
}catch(Exception e){
166+
e.printStackTrace();
167+
}
168+
169+
170+
171+
// for(Map.Entry<InetSocketAddress,Boolean> dest : DHT.getPeers().entrySet()){
172+
// if(!dest.getValue()){
173+
// try{
174+
//
175+
// UDP.Packet packet = DHT.ping(DHT.getSocket(),dest.getKey()).get(1, TimeUnit.SECONDS);
176+
// DHT.Message<?> message = DHT.Message.fromBencode(packet.getData());
177+
// json.put(dest.getKey().toString(),message.getPayload());
178+
// }catch(Exception e){
179+
// json.put(dest.getKey().toString(),e.toString());
180+
// }
181+
// }
182+
// }
183+
}else{
184+
json.put("error","Expecting one of 'ping','findNode' or 'findValue'.");
185+
}
186+
187+
ByteBuf responseContent = Unpooled.copiedBuffer(json.toString().getBytes());
188+
FullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(),HttpResponseStatus.OK,responseContent);
189+
response.headers().add("Content-Length",responseContent.capacity());
190+
response.headers().add("Content-Type","application/json");
191+
ctx.write(response);
192+
return;
193+
}
88194
byte[] fileData = null;
89195
try{
90196
fileData = HTTPHandler.readResource(HTTPHandler.getResource(uri.getPath().substring(1)));

src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import java.io.IOException;
1111
import java.net.*;
1212
import java.util.*;
13-
import java.util.concurrent.ConcurrentHashMap;
1413
import java.util.concurrent.ConcurrentLinkedQueue;
1514

1615
import org.json.JSONObject;
@@ -29,25 +28,13 @@ public class DHTNodeFinderThread implements Runnable{
2928
"s2.lbry.network:4444",
3029
};
3130

32-
private static final DatagramSocket SOCKET;
33-
34-
static{
35-
try{
36-
SOCKET = new DatagramSocket();
37-
}catch(SocketException e){
38-
throw new RuntimeException(e);
39-
}
40-
}
41-
42-
private final Map<InetSocketAddress,Boolean> pingableDHTs = new ConcurrentHashMap<>();
43-
4431
private final Queue<UDP.Packet> incoming = new ConcurrentLinkedQueue<>();
4532

4633
@Override
4734
public void run(){
4835
for(String bootstrap : DHTNodeFinderThread.BOOTSTRAP){
4936
URI uri = URI.create("udp://"+bootstrap);
50-
this.pingableDHTs.put(new InetSocketAddress(uri.getHost(),uri.getPort()),true);
37+
DHT.getPeers().put(new InetSocketAddress(uri.getHost(),uri.getPort()),true);
5138
}
5239

5340
this.startSender();
@@ -59,7 +46,7 @@ private void startSender(){
5946
new Thread(() -> {
6047
while(true){
6148
System.out.println("[BULK PING]");
62-
for(InetSocketAddress socketAddress : DHTNodeFinderThread.this.pingableDHTs.keySet()){
49+
for(InetSocketAddress socketAddress : DHT.getPeers().keySet()){
6350
String hostname = socketAddress.getHostName();
6451
int port = socketAddress.getPort();
6552
try{
@@ -82,7 +69,7 @@ private void startSender(){
8269
}
8370

8471
private void doPing(InetSocketAddress destination) throws IOException{
85-
DHT.ping(DHTNodeFinderThread.SOCKET,destination).thenAccept((UDP.Packet packet) -> {
72+
DHT.ping(DHT.getSocket(),destination).thenAccept((UDP.Packet packet) -> {
8673
byte[] receivingBytes = packet.getData();
8774
DHT.Message<?> message = DHT.Message.fromBencode(receivingBytes);
8875
System.out.println(" - [Ping Response] "+message);
@@ -119,7 +106,7 @@ private void doPing(InetSocketAddress destination) throws IOException{
119106
}
120107

121108
private void doFindNode(InetSocketAddress destination) throws IOException{
122-
DHT.findNode(DHTNodeFinderThread.SOCKET,destination).thenAccept((UDP.Packet packet) -> {
109+
DHT.findNode(DHT.getSocket(),destination,new byte[48]).thenAccept((UDP.Packet packet) -> {
123110
byte[] receivingBytes = packet.getData();
124111
DHT.Message<?> message = DHT.Message.fromBencode(receivingBytes);
125112
System.out.println(" - [FindNode Response] "+message);
@@ -129,13 +116,13 @@ private void doFindNode(InetSocketAddress destination) throws IOException{
129116
String hostname = (String) n.get(1);
130117
int port = (int) ((long) n.get(2));
131118
InetSocketAddress existingSocketAddr = null;
132-
for(InetSocketAddress addr : this.pingableDHTs.keySet()){
119+
for(InetSocketAddress addr : DHT.getPeers().keySet()){
133120
if(addr.getHostName().equals(hostname) && addr.getPort()==port){
134121
existingSocketAddr = addr;
135122
}
136123
}
137124
if(existingSocketAddr==null){
138-
this.pingableDHTs.put(new InetSocketAddress(hostname,port),false);
125+
DHT.getPeers().put(new InetSocketAddress(hostname,port),false);
139126
}
140127
}
141128
}).exceptionally((Throwable e) -> null);
@@ -145,7 +132,7 @@ private void startReceiver(){
145132
new Thread(() -> {
146133
while(true) {
147134
try {
148-
UDP.Packet receiverPacket = UDP.receive(DHTNodeFinderThread.SOCKET);
135+
UDP.Packet receiverPacket = UDP.receive(DHT.getSocket());
149136
DHTNodeFinderThread.this.incoming.add(receiverPacket);
150137

151138
byte[] receivingBytes = receiverPacket.getData();
@@ -161,7 +148,7 @@ private void startReceiver(){
161148
}
162149

163150
private void handleIncomingMessages(){
164-
while(DHTNodeFinderThread.SOCKET.isBound()){
151+
while(DHT.getSocket().isBound()){
165152
while(this.incoming.peek()!=null){
166153
UDP.Packet receiverPacket = this.incoming.poll();
167154
byte[] receivingBytes = receiverPacket.getData();

src/main/java/com/lbry/globe/util/DHT.java

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,44 +3,71 @@
33
import java.io.IOException;
44
import java.net.DatagramSocket;
55
import java.net.InetSocketAddress;
6+
import java.net.SocketException;
67
import java.nio.ByteBuffer;
78
import java.util.*;
8-
import java.util.concurrent.CompletableFuture;
9-
import java.util.concurrent.Executors;
10-
import java.util.concurrent.ScheduledExecutorService;
11-
import java.util.concurrent.TimeUnit;
9+
import java.util.concurrent.*;
1210

1311
public class DHT{
1412

15-
private static final ScheduledExecutorService ses = Executors.newScheduledThreadPool(12);
16-
private static final TimeoutFutureManager<RPCID,UDP.Packet> futureManager = new TimeoutFutureManager<>(ses);
13+
public static byte[] NODE_ID = new byte[48];
14+
15+
private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
16+
private static final TimeoutFutureManager<RPCID,UDP.Packet> futureManager = new TimeoutFutureManager<>(executor);
17+
private static final Map<InetSocketAddress,Boolean> peers = new ConcurrentHashMap<>();
18+
private static final DatagramSocket socket;
19+
20+
static{
21+
try{
22+
socket = new DatagramSocket();
23+
}catch(SocketException e){
24+
throw new RuntimeException(e);
25+
}
26+
}
1727

1828
public static TimeoutFutureManager<RPCID,UDP.Packet> getFutureManager(){
1929
return DHT.futureManager;
2030
}
2131

32+
public static DatagramSocket getSocket(){
33+
return DHT.socket;
34+
}
35+
36+
public static Map<InetSocketAddress,Boolean> getPeers(){
37+
return DHT.peers;
38+
}
39+
2240
public static CompletableFuture<UDP.Packet> ping(DatagramSocket socket,InetSocketAddress destination) throws IOException {
2341
byte[] rpcID = new byte[20];
2442
new Random().nextBytes(rpcID);
2543

26-
DHT.Message<String> pingMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,new byte[48],"ping",Collections.singletonList(Collections.singletonMap("protocolVersion",1)));
44+
DHT.Message<String> pingMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,DHT.NODE_ID,"ping",Collections.singletonList(Collections.singletonMap("protocolVersion",1)));
2745

2846
return DHT.sendWithFuture(socket,destination,pingMessage);
2947
}
3048

31-
public static CompletableFuture<UDP.Packet> findNode(DatagramSocket socket,InetSocketAddress destination) throws IOException{
49+
public static CompletableFuture<UDP.Packet> findNode(DatagramSocket socket,InetSocketAddress destination,byte[] key) throws IOException{
3250
byte[] rpcID = new byte[20];
3351
new Random().nextBytes(rpcID);
3452

35-
DHT.Message<String> findNodeMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,new byte[48],"findNode",Arrays.asList(new byte[48],Collections.singletonMap("protocolVersion",1)));
53+
DHT.Message<String> findNodeMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,DHT.NODE_ID,"findNode",Arrays.asList(key,Collections.singletonMap("protocolVersion",1)));
54+
55+
return DHT.sendWithFuture(socket,destination,findNodeMessage);
56+
}
57+
58+
public static CompletableFuture<UDP.Packet> findValue(DatagramSocket socket,InetSocketAddress destination,byte[] key) throws IOException{
59+
byte[] rpcID = new byte[20];
60+
new Random().nextBytes(rpcID);
61+
62+
DHT.Message<String> findNodeMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,DHT.NODE_ID,"findValue",Arrays.asList(key,Collections.singletonMap("protocolVersion",1)));
3663

3764
return DHT.sendWithFuture(socket,destination,findNodeMessage);
3865
}
3966

4067
protected static CompletableFuture<UDP.Packet> sendWithFuture(DatagramSocket socket,InetSocketAddress destination, DHT.Message<?> message) throws IOException{
4168
UDP.send(socket,new UDP.Packet(destination,message.toBencode()));
4269
RPCID key = new RPCID(message);
43-
return DHT.futureManager.createFuture(key,5,TimeUnit.SECONDS);
70+
return DHT.futureManager.createFuture(key,1,TimeUnit.SECONDS);
4471
}
4572

4673
public static class Message<P>{
@@ -96,7 +123,7 @@ public List<?> getArguments(){
96123
return this.arguments;
97124
}
98125

99-
public byte[] toBencode(){
126+
public Map<String,Object> toMap(){
100127
Map<String,Object> dictionary = new HashMap<>();
101128
dictionary.put("0",this.type);
102129
dictionary.put("1",this.rpcID);
@@ -107,7 +134,11 @@ public byte[] toBencode(){
107134
if(this.arguments!=null){
108135
dictionary.put("4",this.arguments);
109136
}
110-
return BencodeConverter.encode(dictionary);
137+
return dictionary;
138+
}
139+
140+
public byte[] toBencode(){
141+
return BencodeConverter.encode(this.toMap());
111142
}
112143

113144
private DHT.Message<P> setFromBencode(byte[] data){

src/main/java/com/lbry/globe/util/TimeoutFutureManager.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package com.lbry.globe.util;
22

3+
import java.util.List;
34
import java.util.concurrent.*;
5+
import java.util.stream.Collectors;
6+
import java.util.stream.Stream;
47

58
public class TimeoutFutureManager<K,V>{
69

@@ -33,4 +36,14 @@ public void finishFuture(K key,V value){
3336
}
3437
}
3538

39+
public static <V> CompletableFuture<List<V>> getBulk(CompletableFuture<V>[] futures){
40+
return CompletableFuture.allOf(futures).exceptionally((t) -> null).thenApply((v) -> Stream.of(futures).map(future -> {
41+
try{
42+
return future.join();
43+
}catch(Exception e){
44+
return null;
45+
}
46+
}).collect(Collectors.toList()));
47+
}
48+
3649
}

0 commit comments

Comments
 (0)