5
5
6
6
package com .aws .greengrass .localdebugconsole ;
7
7
8
+ import com .aws .greengrass .builtin .services .pubsub .PubSubIPCEventStreamAgent ;
9
+ import com .aws .greengrass .builtin .services .pubsub .PublishEvent ;
10
+ import com .aws .greengrass .builtin .services .pubsub .SubscribeRequest ;
8
11
import com .aws .greengrass .deployment .DeviceConfiguration ;
9
12
import com .aws .greengrass .lifecyclemanager .Kernel ;
13
+ import com .aws .greengrass .localdebugconsole .messageutils .CommunicationMessage ;
10
14
import com .aws .greengrass .localdebugconsole .messageutils .DeviceDetails ;
11
15
import com .aws .greengrass .localdebugconsole .messageutils .Message ;
12
16
import com .aws .greengrass .localdebugconsole .messageutils .MessageType ;
13
17
import com .aws .greengrass .localdebugconsole .messageutils .PackedRequest ;
14
18
import com .aws .greengrass .localdebugconsole .messageutils .Request ;
15
19
import com .aws .greengrass .logging .api .Logger ;
20
+ import com .aws .greengrass .mqttclient .MqttClient ;
21
+ import com .aws .greengrass .mqttclient .MqttRequestException ;
22
+ import com .aws .greengrass .mqttclient .v5 .Publish ;
23
+ import com .aws .greengrass .mqttclient .v5 .Subscribe ;
24
+ import com .aws .greengrass .mqttclient .v5 .Unsubscribe ;
16
25
import com .aws .greengrass .util .DefaultConcurrentHashMap ;
17
26
import com .aws .greengrass .util .Pair ;
18
27
import com .fasterxml .jackson .core .JsonProcessingException ;
28
+ import com .fasterxml .jackson .databind .JsonNode ;
19
29
import com .fasterxml .jackson .databind .ObjectMapper ;
20
30
import lombok .AccessLevel ;
21
31
import lombok .Getter ;
22
32
import org .java_websocket .WebSocket ;
23
33
import org .java_websocket .exceptions .WebsocketNotConnectedException ;
24
34
import org .java_websocket .handshake .ClientHandshake ;
25
35
import org .java_websocket .server .WebSocketServer ;
36
+ import software .amazon .awssdk .aws .greengrass .model .ReceiveMode ;
26
37
27
38
import java .net .InetSocketAddress ;
28
39
import java .util .HashSet ;
29
40
import java .util .Map ;
30
41
import java .util .Set ;
31
42
import java .util .concurrent .CompletableFuture ;
43
+ import java .util .concurrent .ConcurrentHashMap ;
32
44
import java .util .concurrent .CopyOnWriteArraySet ;
45
+ import java .util .concurrent .ExecutionException ;
46
+ import java .util .function .Consumer ;
33
47
import javax .inject .Provider ;
34
48
import javax .inject .Singleton ;
35
49
import javax .net .ssl .SSLEngine ;
36
50
37
51
@ Singleton
38
52
public class DashboardServer extends WebSocketServer implements KernelMessagePusher {
39
53
static final String SERVER_START_MESSAGE = "Server started successfully" ;
54
+ private static final String IOT_CORE_SOURCE = "iotcore" ;
40
55
41
56
private final DashboardAPI dashboardAPI ;
42
57
private final Logger logger ;
@@ -47,18 +62,29 @@ public class DashboardServer extends WebSocketServer implements KernelMessagePus
47
62
new DefaultConcurrentHashMap <>(HashSet ::new );
48
63
private final DefaultConcurrentHashMap <String , Set <WebSocket >> logWatchlist =
49
64
new DefaultConcurrentHashMap <>(HashSet ::new );
65
+ private final DefaultConcurrentHashMap <WebSocket , Map <String , SubscribeRequest >> pubSubWatchList =
66
+ new DefaultConcurrentHashMap <>(ConcurrentHashMap ::new );
67
+ private final DefaultConcurrentHashMap <WebSocket , Map <String , Subscribe >> mqttWatchList =
68
+ new DefaultConcurrentHashMap <>(ConcurrentHashMap ::new );
50
69
@ Getter (AccessLevel .PACKAGE )
51
70
private final CompletableFuture <Object > started = new CompletableFuture <>();
52
71
private final Authenticator authenticator ;
72
+ private final MqttClient mqttClient ;
73
+
74
+ PubSubIPCEventStreamAgent pubSubIPCAgent ;
75
+ private final String SERVICE_NAME = "LocalDebugConsole" ;
53
76
54
77
public DashboardServer (InetSocketAddress address , Logger logger , Kernel root , DeviceConfiguration deviceConfig ,
55
78
Authenticator authenticator , Provider <SSLEngine > engineProvider ) {
56
- this (address , logger , new KernelCommunicator (root , logger , deviceConfig ), authenticator , engineProvider );
79
+ this (address , logger , new KernelCommunicator (root , logger , deviceConfig ), authenticator , engineProvider ,
80
+ root .getContext ().get (PubSubIPCEventStreamAgent .class ),
81
+ root .getContext ().get (MqttClient .class ));
57
82
}
58
83
59
84
// constructor for unit testing
60
85
DashboardServer (InetSocketAddress address , Logger logger , DashboardAPI dashboardAPI , Authenticator authenticator ,
61
- Provider <SSLEngine > engineProvider ) {
86
+ Provider <SSLEngine > engineProvider , PubSubIPCEventStreamAgent pubSubIPCAgent ,
87
+ MqttClient mqttClient ) {
62
88
super (address );
63
89
setReuseAddr (true );
64
90
setTcpNoDelay (true );
@@ -69,6 +95,8 @@ public DashboardServer(InetSocketAddress address, Logger logger, Kernel root, De
69
95
this .dashboardAPI = dashboardAPI ;
70
96
this .authenticator = authenticator ;
71
97
this .logger .atInfo ().log ("Starting dashboard server on address: {}" , address );
98
+ this .pubSubIPCAgent = pubSubIPCAgent ;
99
+ this .mqttClient = mqttClient ;
72
100
}
73
101
74
102
// links the API impl and starts the socket server
@@ -180,7 +208,6 @@ public void onMessage(WebSocket conn, String msg) {
180
208
dashboardAPI .updateConfig (req .args [0 ], req .args [1 ])));
181
209
break ;
182
210
}
183
-
184
211
case subscribeToComponent : {
185
212
statusWatchlist .get (req .args [0 ]).add (conn );
186
213
pushComponentChange (req .args [0 ]);
@@ -212,6 +239,18 @@ public void onMessage(WebSocket conn, String msg) {
212
239
sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , true ));
213
240
break ;
214
241
}
242
+ case subscribeToPubSubTopic : {
243
+ subscribeToPubSubTopic (conn , packedRequest , req );
244
+ break ;
245
+ }
246
+ case publishToPubSubTopic : {
247
+ publishToPubSubTopic (conn , packedRequest , req );
248
+ break ;
249
+ }
250
+ case unsubscribeToPubSubTopic : {
251
+ unsubscribeFromPubSubTopic (conn , packedRequest , req );
252
+ break ;
253
+ }
215
254
default : { // echo
216
255
sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , req .call ));
217
256
break ;
@@ -220,11 +259,117 @@ public void onMessage(WebSocket conn, String msg) {
220
259
}
221
260
}
222
261
262
+ private void subscribeToPubSubTopic (WebSocket conn , PackedRequest packedRequest , Request req ) {
263
+ JsonNode tree ;
264
+ try {
265
+ tree = jsonMapper .readTree (req .args [0 ]);
266
+ } catch (JsonProcessingException e ) {
267
+ sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , e .getMessage ()));
268
+ return ;
269
+ }
270
+
271
+ String topicFilter = tree .get ("topicFilter" ).textValue ();
272
+ String source = tree .get ("source" ).textValue ();
273
+ String subId = tree .get ("subId" ).textValue ();
274
+ try {
275
+ if (IOT_CORE_SOURCE .equals (source )) {
276
+ mqttWatchList .get (conn ).computeIfAbsent (subId , (a ) -> {
277
+ Consumer <Publish > cb = (c ) -> {
278
+ CommunicationMessage resMessage =
279
+ new CommunicationMessage (subId , topicFilter , c .getTopic (),
280
+ new String (c .getPayload ()));
281
+ sendIfOpen (conn , new Message (MessageType .PUB_SUB_MSG , resMessage ));
282
+ };
283
+ Subscribe subReq = Subscribe .builder ().callback (cb ).topic (topicFilter ).build ();
284
+ try {
285
+ mqttClient .subscribe (subReq ).get ();
286
+ } catch (MqttRequestException | InterruptedException | ExecutionException e ) {
287
+ throw new RuntimeException (e );
288
+ }
289
+ return subReq ;
290
+ });
291
+ } else {
292
+ pubSubWatchList .get (conn ).computeIfAbsent (subId , (a ) -> {
293
+ Consumer <PublishEvent > cb = (c ) -> {
294
+ CommunicationMessage resMessage =
295
+ new CommunicationMessage (subId , topicFilter , c .getTopic (),
296
+ new String (c .getPayload ()));
297
+ sendIfOpen (conn , new Message (MessageType .PUB_SUB_MSG , resMessage ));
298
+ };
299
+ SubscribeRequest subReq = SubscribeRequest .builder ().callback (cb )
300
+ .receiveMode (ReceiveMode .RECEIVE_ALL_MESSAGES ).topic (topicFilter )
301
+ .serviceName (SERVICE_NAME ).build ();
302
+ pubSubIPCAgent .subscribe (subReq );
303
+ return subReq ;
304
+ });
305
+ }
306
+ sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , true ));
307
+ } catch (Exception e ) {
308
+ sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , e .getMessage ()));
309
+ }
310
+ }
311
+
312
+ private void unsubscribeFromPubSubTopic (WebSocket conn , PackedRequest packedRequest , Request req ) {
313
+ SubscribeRequest subReq = pubSubWatchList .get (conn ).remove (req .args [0 ]);
314
+ if (subReq != null ) {
315
+ pubSubIPCAgent .unsubscribe (subReq );
316
+ }
317
+ Subscribe mqttSub = mqttWatchList .get (conn ).remove (req .args [0 ]);
318
+ if (mqttSub != null ) {
319
+ try {
320
+ mqttClient .unsubscribe (Unsubscribe .builder ().topic (mqttSub .getTopic ())
321
+ .subscriptionCallback (mqttSub .getCallback ()).build ());
322
+ } catch (MqttRequestException e ) {
323
+ sendIfOpen (conn ,
324
+ new Message (MessageType .RESPONSE , packedRequest .requestID , e .getMessage ()));
325
+ return ;
326
+ }
327
+ }
328
+ sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , true ));
329
+ }
330
+
331
+ private void publishToPubSubTopic (WebSocket conn , PackedRequest packedRequest , Request req ) {
332
+ JsonNode tree ;
333
+ try {
334
+ tree = jsonMapper .readTree (req .args [0 ]);
335
+ } catch (JsonProcessingException e ) {
336
+ sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , e .getMessage ()));
337
+ return ;
338
+ }
339
+
340
+ String topic = tree .get ("topic" ).textValue ();
341
+ String destination = tree .get ("destination" ).textValue ();
342
+ String payload = tree .get ("payload" ).textValue ();
343
+ try {
344
+ if (IOT_CORE_SOURCE .equals (destination )) {
345
+ mqttClient .publish (Publish .builder ()
346
+ .topic (topic )
347
+ .payload (payload .getBytes ())
348
+ .build ());
349
+ } else {
350
+ pubSubIPCAgent .publish (topic , payload .getBytes (), SERVICE_NAME );
351
+ }
352
+ sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , true ));
353
+ } catch (Exception e ) {
354
+ sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , e .getMessage ()));
355
+ }
356
+ }
357
+
223
358
@ Override
224
359
public void onClose (WebSocket conn , int code , String reason , boolean remote ) {
225
360
connections .remove (conn );
226
361
statusWatchlist .forEach ((name , set ) -> set .remove (conn ));
227
362
logWatchlist .forEach ((name , set ) -> set .remove (conn ));
363
+ pubSubWatchList .get (conn ).forEach ((topic , sub ) -> pubSubIPCAgent .unsubscribe (sub ));
364
+ mqttWatchList .get (conn ).forEach ((topic , sub ) -> {
365
+ try {
366
+ mqttClient .unsubscribe (Unsubscribe .builder ()
367
+ .subscriptionCallback (sub .getCallback ())
368
+ .topic (sub .getTopic ()).build ());
369
+ } catch (MqttRequestException e ) {
370
+ logger .error ("failed to unsubscribe" , e );
371
+ }
372
+ });
228
373
logger .atInfo ()
229
374
.log ("closed {} with exit code {}, additional info: {}" , conn .getRemoteSocketAddress (), code , reason );
230
375
}
@@ -244,7 +389,7 @@ public void onStart() {
244
389
@ Override
245
390
public void pushComponentListUpdate () {
246
391
for (WebSocket conn : connections ) {
247
- sendIfOpen (conn , new Message (MessageType .COMPONENT_LIST , - 1 , dashboardAPI .getComponentList ()));
392
+ sendIfOpen (conn , new Message (MessageType .COMPONENT_LIST , dashboardAPI .getComponentList ()));
248
393
}
249
394
}
250
395
@@ -253,7 +398,7 @@ public void pushComponentChange(String name) {
253
398
if (statusWatchlist .containsKey (name )) {
254
399
statusWatchlist .computeIfPresent (name , (k ,set ) -> {
255
400
for (WebSocket conn : set ) {
256
- sendIfOpen (conn , new Message (MessageType .COMPONENT_CHANGE , - 1 , dashboardAPI .getComponent (name )));
401
+ sendIfOpen (conn , new Message (MessageType .COMPONENT_CHANGE , dashboardAPI .getComponent (name )));
257
402
}
258
403
return set ;
259
404
});
@@ -263,7 +408,7 @@ public void pushComponentChange(String name) {
263
408
@ Override
264
409
public void pushDependencyGraphUpdate () {
265
410
for (WebSocket conn : connections ) {
266
- sendIfOpen (conn , new Message (MessageType .DEPS_GRAPH , - 1 , dashboardAPI .getDependencyGraph ()));
411
+ sendIfOpen (conn , new Message (MessageType .DEPS_GRAPH , dashboardAPI .getDependencyGraph ()));
267
412
}
268
413
}
269
414
0 commit comments