88import lombok .Getter ;
99import lombok .Setter ;
1010import net .swofty .redisapi .exceptions .ChannelAlreadyRegisteredException ;
11+ import net .swofty .redisapi .exceptions .MessageFailureException ;
1112import redis .clients .jedis .Jedis ;
1213import redis .clients .jedis .JedisPool ;
1314import redis .clients .jedis .JedisPoolConfig ;
@@ -53,22 +54,30 @@ public static RedisAPI generateInstance(String uri, String password) {
5354 throw new CouldNotConnectToRedisException ("Either invalid Redis URI passed through; '" + uri + "' OR invalid Redis Password passed through; '" + password + "'" );
5455 }
5556
56- Jedis jedis = api .getPool ().getResource ();
57- jedis .connect ();
58-
59- EventRegistry .pubSub = new JedisPubSub () {
60- @ Override
61- public void onMessage (String channel , String message ) {
62- System .out .println ("Received " + message );
63- EventRegistry .handleAll (channel , message );
64- super .onMessage (channel , message );
65- }
66- };
67-
6857 instance = api ;
6958 return api ;
7059 }
7160
61+ /**
62+ * Starts listeners for the Redis Pub/Sub channels
63+ */
64+ public void startListeners () {
65+ new Thread (() -> {
66+ try (Jedis jedis = getPool ().getResource ()) {
67+ EventRegistry .pubSub = new JedisPubSub () {
68+ @ Override
69+ public void onMessage (String channel , String message ) {
70+ EventRegistry .handleAll (channel , message );
71+ }
72+ };
73+ jedis .subscribe (EventRegistry .pubSub , ChannelRegistry .registeredChannels .stream ().map ((e ) -> e .channelName ).toArray (String []::new ));
74+ getPool ().returnResource (jedis );
75+ } catch (Exception e ) {
76+ e .printStackTrace ();
77+ }
78+ }).start ();
79+ }
80+
7281 /**
7382 * Creates a new main Redis pool instance, there will only ever be one at a time so #getInstance should be used after generation
7483 * @param uri the URI used to connect to the Redis server running
@@ -94,9 +103,11 @@ public void setFilterID(String filterId) {
94103 * @param message the message being sent across that channel
95104 */
96105 public void publishMessage (RedisChannel channel , String message ) {
97- Jedis jedis = pool .getResource ();
98- jedis .connect ();
99- jedis .publish (channel .channelName , "all" + ";" + message );
106+ try (Jedis jedis = pool .getResource ()) {
107+ jedis .publish (channel .channelName , "none" + ";" + message );
108+ } catch (Exception ex ) {
109+ throw new MessageFailureException ("Failed to send message to redis" , ex );
110+ }
100111 }
101112
102113 /**
@@ -107,9 +118,11 @@ public void publishMessage(RedisChannel channel, String message) {
107118 * @param message the message being sent across that channel
108119 */
109120 public void publishMessage (String filterId , RedisChannel channel , String message ) {
110- Jedis jedis = pool .getResource ();
111- jedis .connect ();
112- jedis .publish (channel .channelName , filterId + ";" + message );
121+ try (Jedis jedis = pool .getResource ()) {
122+ jedis .publish (channel .channelName , filterId + ";" + message );
123+ } catch (Exception ex ) {
124+ throw new MessageFailureException ("Failed to send message to redis" , ex );
125+ }
113126 }
114127
115128 /**
@@ -139,4 +152,4 @@ public RedisChannel registerChannel(String channelName, @NonNull Consumer<RedisM
139152 ChannelRegistry .registerChannel (channel );
140153 return channel ;
141154 }
142- }
155+ }
0 commit comments