77import org .iot .dsa .DSRuntime ;
88import org .iot .dsa .DSRuntime .Timer ;
99import org .iot .dsa .dslink .DSIRequester ;
10+ import org .iot .dsa .dslink .requester .AbstractSubscribeHandler ;
1011import org .iot .dsa .dslink .requester .ErrorType ;
1112import org .iot .dsa .dslink .requester .OutboundStream ;
12- import org .iot .dsa .dslink .requester .OutboundSubscribeHandler ;
13- import org .iot .dsa .logging .DSLogger ;
1413import org .iot .dsa .node .DSElement ;
1514import org .iot .dsa .node .DSIValue ;
15+ import org .iot .dsa .node .DSLong ;
1616import org .iot .dsa .node .DSMap ;
1717import org .iot .dsa .node .DSMap .Entry ;
1818import org .iot .dsa .node .DSStatus ;
1919import org .iot .dsa .time .DSDateTime ;
20- import org .iot .dsa .util .DSException ;
2120import okhttp3 .Response ;
2221
23- public class SubscriptionRule extends DSLogger implements OutboundSubscribeHandler , UpdateSender {
22+ public class SubscriptionRule extends AbstractSubscribeHandler implements UpdateSender {
2423
2524 private AbstractRuleNode node ;
26- private OutboundStream stream ;
25+ // private OutboundStream stream;
2726 private long lastUpdateTime = -1 ;
2827 private Timer future = null ;
2928 private SubUpdate storedUpdate ;
@@ -66,7 +65,7 @@ public void run() {
6665 private void init () {
6766 DSIRequester requester = MainNode .getRequester ();
6867 int qos = 0 ;
69- requester .subscribe (this .subPath , qos , this );
68+ requester .subscribe (this .subPath , DSLong . valueOf ( qos ) , this );
7069 }
7170
7271 private void learnPattern () {
@@ -103,25 +102,28 @@ private void learnPattern() {
103102
104103 @ Override
105104 public void onClose () {
106- info ("Rule with sub path " + subPath + ": onClose called" );
107- close ();
105+ super .onClose ();
106+ node .info ("Rule with sub path " + subPath + ": onClose called" );
107+ // close();
108108 }
109109
110110 @ Override
111111 public void onError (ErrorType type , String msg ) {
112- info ("Rule with sub path " + subPath + ": onError called with msg " + msg );
113- DSException .throwRuntime (new RuntimeException (msg ));
112+ super .onError (type , msg );
113+ node .info ("Rule with sub path " + subPath + ": onError called with msg " + msg );
114+ // DSException.throwRuntime(new RuntimeException(msg));
114115 }
115116
116117 @ Override
117118 public void onInit (String path , DSIValue qos , OutboundStream stream ) {
118- info ("Rule with sub path " + subPath + ": onInit called" );
119- this .stream = stream ;
119+ super .onInit (path , qos , stream );
120+ node .info ("Rule with sub path " + subPath + ": onInit called" );
121+ //this.stream = stream;
120122 }
121123
122124 @ Override
123125 public void onUpdate (DSDateTime dateTime , DSElement value , DSStatus status ) {
124- info ("Rule with sub path " + subPath + ": onUpdate called with value " + (value !=null ? value : "Null" ));
126+ node . info ("Rule with sub path " + subPath + ": onUpdate called with value " + (value !=null ? value : "Null" ));
125127 storedUpdate = new SubUpdate (dateTime .toString (), value .toString (), status .toString (), dateTime .timeInMillis ());
126128 if (lastUpdateTime < 0 || System .currentTimeMillis () - lastUpdateTime >= minRefreshRate ) {
127129 if (future != null ) {
@@ -188,7 +190,7 @@ private boolean sendUpdate(final SubUpdate update) {
188190 body = body .replaceAll (Constants .PLACEHOLDER_BLOCK_END , "" );
189191 }
190192
191- info ("Rule with sub path " + subPath + ": sending Update with value " + (update .value !=null ? update .value : "Null" ));
193+ node . info ("Rule with sub path " + subPath + ": sending Update with value " + (update .value !=null ? update .value : "Null" ));
192194
193195 Response resp = restInvoke (urlParams , body );
194196 return resp != null && resp .code () == 200 ;
@@ -227,7 +229,7 @@ public Queue<SubUpdate> sendBatchUpdate(Queue<SubUpdate> updates) {
227229 }
228230 sb .append (suffix );
229231 String body = sb .toString ();
230- info ("Rule with sub path " + subPath + ": sending batch update" );
232+ node . info ("Rule with sub path " + subPath + ": sending batch update" );
231233
232234 Response resp = restInvoke (urlParams , body );
233235 if (resp != null && resp .code () == 200 ) {
@@ -243,16 +245,16 @@ private Response restInvoke(DSMap urlParams, String body) {
243245 try {
244246 resp = getWebClientProxy ().invoke (method , restUrl , urlParams , body );
245247 } catch (Exception e ) {
246- warn ("" , e );
248+ node . warn ("" , e );
247249 }
248250 node .responseRecieved (resp , rowNum );
249251 return resp ;
250252 }
251253
252254 public void close () {
253- if (stream != null && stream . isStreamOpen () ) {
254- info ("Rule with sub path " + subPath + ": closing Stream" );
255- stream .closeStream ();
255+ if (! isClosed () && getStream () != null ) {
256+ node . info ("Rule with sub path " + subPath + ": closing Stream" );
257+ getStream () .closeStream ();
256258 }
257259 }
258260
0 commit comments