88import org .iot .dsa .node .DSInfo ;
99import org .iot .dsa .node .DSLong ;
1010import org .iot .dsa .node .DSMap ;
11- import org .iot .dsa .node .DSNode ;
1211import org .iot .dsa .node .DSString ;
1312import org .iot .dsa .node .DSValueType ;
1413import org .iot .dsa .node .action .ActionInvocation ;
1514import org .iot .dsa .node .action .ActionResult ;
1615import org .iot .dsa .node .action .DSAction ;
17- import org .iot .dsa .node .event .DSIEvent ;
18- import org .iot .dsa .node .event .DSISubscriber ;
19- import org .iot .dsa .node .event .DSITopic ;
16+ import org .iot .dsa .node .event .DSEventFilter ;
2017import org .iot .dsa .util .DSException ;
2118
2219/**
2320 * The root node of this link.
2421 */
2522public class MainNode extends DSMainNode implements PurgeSettings {
23+
2624 private static final Object requesterLock = new Object ();
2725 private static DSIRequester requester ;
2826 public static MainNode instance ;
29-
30- private DSInfo purgeEnabled = getInfo (Constants .BUFFER_PURGE_ENABLED );
3127 private DSInfo maxBufferSize = getInfo (Constants .BUFFER_MAX_SIZE );
28+ private DSInfo purgeEnabled = getInfo (Constants .BUFFER_PURGE_ENABLED );
3229
3330 public MainNode () {
3431 }
3532
33+ public long getMaxSizeInBytes () {
34+ return maxBufferSize .getValue ().toElement ().toLong ();
35+ }
36+
3637 public static DSIRequester getRequester () {
3738 synchronized (requesterLock ) {
3839 while (requester == null ) {
@@ -46,6 +47,10 @@ public static DSIRequester getRequester() {
4647 }
4748 }
4849
50+ public boolean isPurgeEnabled () {
51+ return purgeEnabled .getValue ().toElement ().toBoolean ();
52+ }
53+
4954 public static void setRequester (DSIRequester requester ) {
5055 synchronized (requesterLock ) {
5156 MainNode .requester = requester ;
@@ -63,30 +68,23 @@ protected void declareDefaults() {
6368 declareDefault (Constants .ACT_ADD_BASIC_CONN , makeAddBasicConnectionAction ());
6469 declareDefault (Constants .ACT_ADD_OAUTH_CLIENT_CONN , makeAddOauthClientConnectionAction ());
6570 declareDefault (Constants .ACT_ADD_OAUTH_PASSWORD_CONN , makeAddOauthPassConnectionAction ());
66- declareDefault (Constants .BUFFER_PURGE_ENABLED , DSBool .FALSE , "Whether old unsent records should automatically be purged from the buffer when the buffer gets too large" );
67- declareDefault (Constants .BUFFER_MAX_SIZE , DSLong .valueOf (1074000000 ), "Maximum size of buffer in bytes; only applies if auto-purge is enabled" );
68- declareDefault ("Docs" , DSString .valueOf ("https://github.com/iot-dsa-v2/dslink-java-v2-restadapter/blob/develop/docs/Usage_Guide.md" )).setTransient (true ).setReadOnly (true );
69- }
70-
71- public boolean isPurgeEnabled () {
72- return purgeEnabled .getValue ().toElement ().toBoolean ();
73- }
74-
75- public long getMaxSizeInBytes () {
76- return maxBufferSize .getValue ().toElement ().toLong ();
71+ declareDefault (Constants .BUFFER_PURGE_ENABLED , DSBool .FALSE ,
72+ "Whether old unsent records should automatically be purged from the buffer when the buffer gets too large" );
73+ declareDefault (Constants .BUFFER_MAX_SIZE , DSLong .valueOf (1074000000 ),
74+ "Maximum size of buffer in bytes; only applies if auto-purge is enabled" );
75+ declareDefault ("Docs" , DSString .valueOf (
76+ "https://github.com/iot-dsa-v2/dslink-java-v2-restadapter/blob/develop/docs/Usage_Guide.md" ))
77+ .setTransient (true ).setReadOnly (true );
7778 }
7879
7980 @ Override
8081 protected void onStarted () {
8182 instance = this ;
82- getLink ().getConnection ().subscribe (
83- DSLinkConnection .CONNECTED , null , null , new DSISubscriber () {
84- @ Override
85- public void onEvent (DSNode node , DSInfo child ,
86- DSIEvent event ) {
87- MainNode .setRequester (getLink ().getConnection ().getRequester ());
88- }
89- });
83+ getLink ().getUpstream ().subscribe (new DSEventFilter (
84+ ((event , node , child , data ) -> MainNode .setRequester (
85+ getLink ().getUpstream ().getRequester ())),
86+ DSLinkConnection .CONNECTED_EVENT ,
87+ null ));
9088 }
9189
9290 private void addBasicConnection (DSMap parameters ) {
0 commit comments