Skip to content

Commit f40e0cd

Browse files
committed
- included message throttling capability and auto throttling capability to address processor bog-down during heavy XML operations
1 parent 5005d0e commit f40e0cd

File tree

4 files changed

+131
-35
lines changed

4 files changed

+131
-35
lines changed

SweLib/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ android {
99
defaultConfig {
1010
minSdkVersion 21
1111
targetSdkVersion 28
12-
versionCode 7
13-
versionName '1.0.7'
12+
versionCode 8
13+
versionName '1.0.8'
1414
}
1515
buildTypes {
1616
release {

SweLib/src/main/java/org/sofwerx/ogc/sos/OperationGetResults.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ public ArrayList<Pair<String,String>> getPairs() {
3939
pairs.add(new Pair("request","GetResult"));
4040
pairs.add(new Pair("offering",sensor.getAssignedOffering()));
4141
pairs.add(new Pair("observedProperty",sensor.getFirstObservableProperty()));
42+
//TODO pairs.add(new Pair("temporalFilter","phenomenonTime,now")); //this sisnt working for some reason
4243
pairs.add(new Pair("responseFormat","application/json"));
44+
4345
return pairs;
4446
}
4547

SweLib/src/main/java/org/sofwerx/ogc/sos/SosIpcTransceiver.java

Lines changed: 67 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,42 @@ public class SosIpcTransceiver extends BroadcastReceiver {
4545
public static final String ACTION_SQAN_BROADCAST = "org.sofwerx.sqan.pkt";
4646
private final static String SQAN_PACKET_BYTES = "bytes";
4747
private final static String SQAN_PACKET_CHANNEL = "channel";
48+
final static long DEFAULT_OUTGOING_THROTTLE_RATE = 1000l * 5l;
49+
final static long DEFAULT_INCOMING_THROTTLE_RATE = 1000l * 10l;
4850
private static boolean enableSqAN = true;
4951
private static String channel = SosService.DEFAULT_SWE_CHANNEL;
5052
private SosMessageListener listener;
5153

54+
//Since XML is expensive to marshall/unmarshall, include throttling to ignore messages that come in or go out too fast
55+
private static long throttleRate = -1l;
56+
private static long nextAvailableIntake = Long.MIN_VALUE;
57+
5258
public SosIpcTransceiver(SosMessageListener listener) {
5359
this.listener = listener;
5460
}
5561

5662
public static void setChannel(String channel) { SosIpcTransceiver.channel = channel; }
5763
public static void setEnableSqAN(boolean enable) { SosIpcTransceiver.enableSqAN = enable; }
5864

65+
/**
66+
* Sets a throttle rate (i.e. a min amo8nt of time between messages in ms); all messages
67+
* received in excess of the throttle rate will be dropped (done to prevent bogging
68+
* down the processor in XML marshallig/unmarshalling operations).
69+
* @param rate rate in ms (or -1l if no throttling is needed)
70+
*/
71+
public static void setThrottleRate(long rate) {
72+
if (throttleRate != rate) {
73+
throttleRate = rate;
74+
if (rate > 0l)
75+
Log.d(TAG, "Setting throttle interval to " + Long.toString(rate) + "ms");
76+
else {
77+
Log.d(TAG, "Removing throttle");
78+
}
79+
}
80+
}
81+
82+
public static void clearThrottle() { setThrottleRate(-1l); }
83+
5984
@Override
6085
public void onReceive(Context context, Intent intent) {
6186
if ((context != null) && (intent != null)) {
@@ -98,22 +123,27 @@ public void onMessageReceived(final Context context,final String source, final S
98123
Log.e(TAG, "Null operation received from SOS broadcast IPC");
99124
return;
100125
}
101-
new Thread(() -> {
102-
try {
103-
DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance();
104-
DocumentBuilder docBuilder = builderFactory.newDocumentBuilder();
105-
Document doc = docBuilder.parse(new InputSource(new ByteArrayInputStream(input.getBytes("utf-8"))));
106-
if (doc != null) {
107-
AbstractSosOperation operation = AbstractSosOperation.newFromXML(doc);
108-
if (operation != null) {
109-
if (listener != null)
110-
listener.onSosOperationReceived(operation);
126+
if ((throttleRate <= 0l) || (System.currentTimeMillis() > nextAvailableIntake)) {
127+
if (throttleRate > 0l)
128+
nextAvailableIntake = System.currentTimeMillis() + throttleRate;
129+
new Thread(() -> {
130+
try {
131+
DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance();
132+
DocumentBuilder docBuilder = builderFactory.newDocumentBuilder();
133+
Document doc = docBuilder.parse(new InputSource(new ByteArrayInputStream(input.getBytes("utf-8"))));
134+
if (doc != null) {
135+
AbstractSosOperation operation = AbstractSosOperation.newFromXML(doc);
136+
if (operation != null) {
137+
if (listener != null)
138+
listener.onSosOperationReceived(operation);
139+
}
111140
}
141+
} catch (ParserConfigurationException | IOException | SAXException e) {
142+
Log.e(TAG, "SOS IPC broadcast was not XML: " + input);
112143
}
113-
} catch (ParserConfigurationException | IOException | SAXException e) {
114-
Log.e(TAG,"SOS IPC broadcast was not XML: "+input);
115-
}
116-
}).start();
144+
}).start();
145+
} else
146+
Log.d(TAG,"Dropping message from "+source+" due to flooding: "+input);
117147
}
118148

119149
/**
@@ -122,25 +152,30 @@ public void onMessageReceived(final Context context,final String source, final S
122152
* @param operation
123153
*/
124154
public void broadcast(final Context context, final AbstractSosOperation operation) throws SosException {
125-
if (operation != null) {
126-
if (!operation.isValid()) {
127-
throw new SosException(operation.getClass().getSimpleName()+" does not have all required information");
128-
}
129-
new Thread(() -> {
130-
Document doc = null;
131-
try {
132-
doc = operation.toXML();
133-
} catch (ParserConfigurationException e) {
134-
//throw new SosException("Unable to create document: " + e.getMessage());
135-
}
136-
try {
137-
if (doc != null)
138-
broadcast(context, toString(doc));
139-
} catch (Exception ex) {
140-
//throw new SosException("Unable to convert XML document to string: " + ex.getMessage());
155+
if ((throttleRate <= 0l) || (System.currentTimeMillis() > nextAvailableIntake)) {
156+
if (throttleRate > 0l)
157+
nextAvailableIntake = System.currentTimeMillis() + throttleRate;
158+
if (operation != null) {
159+
if (!operation.isValid()) {
160+
throw new SosException(operation.getClass().getSimpleName() + " does not have all required information");
141161
}
142-
}).start();
143-
}
162+
new Thread(() -> {
163+
Document doc = null;
164+
try {
165+
doc = operation.toXML();
166+
} catch (ParserConfigurationException e) {
167+
//throw new SosException("Unable to create document: " + e.getMessage());
168+
}
169+
try {
170+
if (doc != null)
171+
broadcast(context, toString(doc));
172+
} catch (Exception ex) {
173+
//throw new SosException("Unable to convert XML document to string: " + ex.getMessage());
174+
}
175+
}).start();
176+
}
177+
} else
178+
Log.d(TAG,operation.getClass().getSimpleName()+" operation received but ignored since the current throttle rate of "+Long.toString(throttleRate)+"ms is being exceeded");
144179
}
145180

146181
public final static String toString(Document doc) throws TransformerException {

SweLib/src/main/java/org/sofwerx/ogc/sos/SosService.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ public class SosService implements SosMessageListener {
4040
private boolean sensorMode = true;
4141
private AtomicBoolean shouldPollServer = new AtomicBoolean(false);
4242
private long pollInterval = 1000l * 15l; //default server polling interval
43+
private boolean autoThrottle = false;
44+
private long outgoingThrottleRate = SosIpcTransceiver.DEFAULT_OUTGOING_THROTTLE_RATE;
45+
private long incomingThrottleRate = SosIpcTransceiver.DEFAULT_INCOMING_THROTTLE_RATE;
4346

4447
/**
4548
* Creates a new SosService
@@ -274,13 +277,17 @@ public void registerSensor() {
274277
handler.post(() -> {
275278
if (sosSensor.getAssignedProcedure() == null) {
276279
if (sosSensor.isReadyToRegisterSensor()) {
280+
if (autoThrottle)
281+
SosIpcTransceiver.clearThrottle();
277282
Log.d(SosIpcTransceiver.TAG,"Sensor has all required info to register; contacting server...");
278283
OperationInsertSensor operation = new OperationInsertSensor(sosSensor);
279284
broadcast(operation);
280285
} else
281286
Log.w(SosIpcTransceiver.TAG, "sosSensor does not yet have enough information to register with the SOS server");
282287
} else if (sosSensor.getAssignedTemplate() == null) {
283288
if (sosSensor.isReadyToRegisterResultTemplate()) {
289+
if (autoThrottle)
290+
SosIpcTransceiver.clearThrottle();
284291
OperationInsertResultTemplate operation = new OperationInsertResultTemplate(sosSensor);
285292
broadcast(operation);
286293
} else
@@ -339,6 +346,8 @@ public void onSosOperationReceived(AbstractSosOperation operation) {
339346
Log.i(SosIpcTransceiver.TAG,"InsertSensorResponse received, but it was for sensor "+response.getAssignedProcedure());
340347
}
341348
} else if (operation instanceof OperationInsertResultTemplateResponse) {
349+
if (autoThrottle)
350+
SosIpcTransceiver.setThrottleRate(outgoingThrottleRate);
342351
if (sosSensor != null) {
343352
OperationInsertResultTemplateResponse response = (OperationInsertResultTemplateResponse) operation;
344353
if ((response.getAcceptedTemplate() != null) && (sosSensor.getAssignedProcedure() != null)
@@ -349,8 +358,14 @@ public void onSosOperationReceived(AbstractSosOperation operation) {
349358
if (listener != null)
350359
listener.onSosConfigurationSuccess();
351360
} else
352-
Log.i(SosIpcTransceiver.TAG,"InsertResultTemplateResponse received, but it was for template "+response.getAcceptedTemplate());
361+
Log.i(SosIpcTransceiver.TAG, "InsertResultTemplateResponse received, but it was for template " + response.getAcceptedTemplate());
353362
}
363+
} else if (operation instanceof OperationInsertResult) {
364+
if (autoThrottle)
365+
SosIpcTransceiver.setThrottleRate(incomingThrottleRate);
366+
} else if (operation instanceof OperationInsertResultResponse) {
367+
if (autoThrottle)
368+
SosIpcTransceiver.setThrottleRate(outgoingThrottleRate);
354369
}
355370
if (listener != null)
356371
listener.onSosOperationReceived(operation);
@@ -402,4 +417,48 @@ public boolean isSensorMode() {
402417
*/
403418
public void setSensorMode(boolean sensorMode) { this.sensorMode = sensorMode; }
404419
public void setSensorMode() { setSensorMode(true); }
420+
421+
/**
422+
* Is the service set to automatically throttle incoming messages to offset
423+
* cost on flooding XML unmarshalling.
424+
* @return true == will automatically throttle
425+
*/
426+
public boolean isAutoThrottle() { return autoThrottle; }
427+
428+
/**
429+
* Sets the service to automatically throttle incoming messages (usually done as
430+
* a way to prevent flooding the processor with a bunch of costly XML unmarshalling)
431+
* @param autoThrottle true == the service will automatically throttle
432+
*/
433+
public void setAutoThrottle(boolean autoThrottle) { this.autoThrottle = autoThrottle; }
434+
435+
/**
436+
* Gets the set throttle rate on outgoing messages
437+
* @return interval between messages in ms
438+
*/
439+
public long getOutgoingThrottleRate() { return outgoingThrottleRate; }
440+
441+
/**
442+
* Sets the set throttle rate on outgoing messages
443+
* @param outgoingThrottleRate interval between messages in ms
444+
*/
445+
public void setOutgoingThrottleRate(long outgoingThrottleRate) {
446+
if (outgoingThrottleRate > 0l)
447+
this.outgoingThrottleRate = outgoingThrottleRate;
448+
}
449+
450+
/**
451+
* Gets the set throttle rate on incoming messages
452+
* @return interval between messages in ms
453+
*/
454+
public long getIncomingThrottleRate() { return incomingThrottleRate; }
455+
456+
/**
457+
* Sets the set throttle rate on incoming messages
458+
* @param incomingThrottleRate interval between messages in ms
459+
*/
460+
public void setIncomingThrottleRate(long incomingThrottleRate) {
461+
if (incomingThrottleRate > 0l)
462+
this.incomingThrottleRate = incomingThrottleRate;
463+
}
405464
}

0 commit comments

Comments
 (0)