Skip to content

Commit 364866d

Browse files
committed
Improve pinging hubs
1 parent d4eeff7 commit 364866d

File tree

4 files changed

+70
-41
lines changed

4 files changed

+70
-41
lines changed

src/main/java/com/lbry/globe/Main.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.netty.channel.socket.nio.NioServerSocketChannel;
1818
import io.netty.handler.codec.http.HttpRequestDecoder;
1919
import io.netty.handler.codec.http.HttpResponseEncoder;
20+
import io.netty.util.concurrent.DefaultThreadFactory;
2021

2122
import java.util.Arrays;
2223
import java.util.logging.Logger;
@@ -35,7 +36,7 @@ public Main(String... args){
3536

3637
@Override
3738
public void run(){
38-
EventLoopGroup group = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
39+
EventLoopGroup group = new MultiThreadIoEventLoopGroup(new DefaultThreadFactory("Netty Event Loop"),NioIoHandler.newFactory());
3940
this.runTCPServerHTTP(group);
4041
}
4142

@@ -59,17 +60,17 @@ protected void initChannel(SocketChannel socketChannel){
5960

6061
public static void main(String... args){
6162
Main.LOGGER.info("Loading nodes cache");
62-
Runtime.getRuntime().addShutdownHook(new Thread(API::saveNodes));
63+
Runtime.getRuntime().addShutdownHook(new Thread(API::saveNodes,"Save Nodes"));
6364
API.loadNodes();
6465
Main.LOGGER.info("Loading GeoIP cache");
65-
Runtime.getRuntime().addShutdownHook(new Thread(GeoIP::saveCache));
66+
Runtime.getRuntime().addShutdownHook(new Thread(GeoIP::saveCache,"Save Cache"));
6667
GeoIP.loadCache();
6768
Main.LOGGER.info("Starting finder thread for blockchain nodes");
68-
new Thread(new BlockchainNodeFinderThread()).start();
69+
new Thread(new BlockchainNodeFinderThread(),"Block Node Finder").start();
6970
Main.LOGGER.info("Starting finder thread for DHT nodes");
70-
new Thread(new DHTNodeFinderThread()).start();
71+
new DHTNodeFinderThread().run();
7172
Main.LOGGER.info("Starting finder thread for hub nodes");
72-
new Thread(new HubNodeFinderThread()).start();
73+
new HubNodeFinderThread().run();
7374
Main.LOGGER.info("Starting server");
7475
new Main(args).run();
7576
}

src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.concurrent.Executors;
1515
import java.util.concurrent.TimeUnit;
1616

17+
import io.netty.util.concurrent.DefaultThreadFactory;
1718
import org.json.JSONObject;
1819

1920
public class DHTNodeFinderThread implements Runnable{
@@ -45,8 +46,8 @@ public void run(){
4546
}
4647

4748
private void startSender(){
48-
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(() -> {
49-
System.out.println("[BULK PING]");
49+
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("DHT Sender")).scheduleWithFixedDelay(() -> {
50+
System.out.println("[DHT] BULK PING");
5051
API.saveNodes();
5152
for(InetSocketAddress socketAddress : DHT.getPeers().keySet()){
5253
String hostname = socketAddress.getHostName();
@@ -139,7 +140,7 @@ private void startReceiver(){
139140
e.printStackTrace();
140141
}
141142
}
142-
}).start();
143+
},"DHT Receiver").start();
143144
}
144145

145146
private void handleIncomingMessages(){

src/main/java/com/lbry/globe/thread/HubNodeFinderThread.java

Lines changed: 56 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,20 @@
77

88
import java.io.InputStream;
99
import java.net.InetAddress;
10+
import java.net.InetSocketAddress;
1011
import java.net.Socket;
1112
import java.util.*;
13+
import java.util.concurrent.CompletableFuture;
14+
import java.util.concurrent.Executors;
15+
import java.util.concurrent.TimeUnit;
1216

17+
import io.netty.util.concurrent.DefaultThreadFactory;
18+
import io.netty.util.concurrent.ThreadPerTaskExecutor;
1319
import org.json.JSONArray;
1420
import org.json.JSONObject;
1521

22+
import javax.net.SocketFactory;
23+
1624
public class HubNodeFinderThread implements Runnable{
1725

1826
public static final String[] HUBS = {
@@ -34,36 +42,59 @@ public class HubNodeFinderThread implements Runnable{
3442

3543
private static final Map<InetAddress,Long> LAST_SEEN = new TreeMap<>(Comparator.comparing(InetAddress::getHostAddress));
3644

45+
private static final Map<InetAddress,Socket> SOCKETS = new TreeMap<>(Comparator.comparing(InetAddress::getHostAddress));
46+
3747
@Override
38-
public void run() {
39-
while(true){
48+
public void run(){
49+
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("Hub Sender")).scheduleWithFixedDelay(() -> {
50+
System.out.println("[HUB] BULK PING");
4051
for(String hostname : HubNodeFinderThread.HUBS){
4152
try{
4253
for(InetAddress ip : InetAddress.getAllByName(hostname)){
43-
new Thread(() -> {
44-
try{
45-
Socket s = new Socket(ip,50001);
46-
JSONObject obj = new JSONObject();
47-
obj.put("id",new Random().nextInt());
48-
obj.put("method","server.banner");
49-
obj.put("params",new JSONArray());
50-
s.getOutputStream().write((obj+"\n").getBytes());
51-
s.getOutputStream().flush();
52-
InputStream in = s.getInputStream();
53-
StringBuilder sb = new StringBuilder();
54-
int b;
55-
while((b = in.read())!='\n'){
56-
sb.append(new String(new byte[]{(byte) (b & 0xFF)}));
57-
}
58-
in.close();
59-
JSONObject respObj = new JSONObject(sb.toString());
60-
boolean successful = respObj.has("result") && !respObj.isNull("result");
61-
if(successful){
62-
LAST_SEEN.put(ip,System.currentTimeMillis());
54+
if(!HubNodeFinderThread.SOCKETS.containsKey(ip)){
55+
HubNodeFinderThread.SOCKETS.put(ip,new Socket());
56+
}
57+
try{
58+
if(!HubNodeFinderThread.SOCKETS.get(ip).isConnected() || HubNodeFinderThread.SOCKETS.get(ip).isClosed()){
59+
if(HubNodeFinderThread.SOCKETS.get(ip).isClosed()){
60+
HubNodeFinderThread.SOCKETS.put(ip,new Socket());
6361
}
64-
}catch(Exception e){
62+
HubNodeFinderThread.SOCKETS.get(ip).connect(new InetSocketAddress(ip,50001),1000);
6563
}
66-
}).start();
64+
}catch(Exception ignored){}
65+
66+
Socket s = HubNodeFinderThread.SOCKETS.get(ip);
67+
if(s==null || !s.isConnected() || s.isClosed()){
68+
continue;
69+
}
70+
System.out.println(" - [Hub] To: "+s.getRemoteSocketAddress());
71+
72+
JSONObject obj = new JSONObject();
73+
obj.put("id",new Random().nextInt());
74+
obj.put("method","server.banner");
75+
obj.put("params",new JSONArray());
76+
s.getOutputStream().write((obj+"\n").getBytes());
77+
s.getOutputStream().flush();
78+
}
79+
for(InetAddress ip : InetAddress.getAllByName(hostname)){
80+
Socket s = HubNodeFinderThread.SOCKETS.get(ip);
81+
if(s==null || !s.isConnected() || s.isClosed()){
82+
continue;
83+
}
84+
System.out.println(" - [Hub] From: "+s.getRemoteSocketAddress());
85+
86+
InputStream in = s.getInputStream();
87+
StringBuilder sb = new StringBuilder();
88+
int b;
89+
while((b = in.read())!='\n'){
90+
sb.append(new String(new byte[]{(byte) (b & 0xFF)}));
91+
}
92+
in.close();
93+
JSONObject respObj = new JSONObject(sb.toString());
94+
boolean successful = respObj.has("result") && !respObj.isNull("result");
95+
if(successful){
96+
LAST_SEEN.put(ip,System.currentTimeMillis());
97+
}
6798
}
6899
}catch(Exception e){
69100
e.printStackTrace();
@@ -123,13 +154,7 @@ public void run() {
123154
}
124155

125156
API.saveNodes();
126-
127-
try {
128-
Thread.sleep(10_000);
129-
} catch (InterruptedException e) {
130-
throw new RuntimeException(e);
131-
}
132-
}
157+
},0,10,TimeUnit.SECONDS);
133158
}
134159

135160
}

src/main/java/com/lbry/globe/util/DHT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.lbry.globe.util;
22

3+
import io.netty.util.concurrent.DefaultThreadFactory;
4+
35
import java.io.IOException;
46
import java.net.DatagramSocket;
57
import java.net.InetSocketAddress;
@@ -12,7 +14,7 @@ public class DHT{
1214

1315
public static byte[] NODE_ID = new byte[48];
1416

15-
private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
17+
private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("Timeout Future"));
1618
private static final TimeoutFutureManager<RPCID,UDP.Packet> futureManager = new TimeoutFutureManager<>(executor);
1719
private static final Map<InetSocketAddress,Boolean> peers = new ConcurrentHashMap<>();
1820
private static final DatagramSocket socket;

0 commit comments

Comments
 (0)