Skip to content

Commit 6325a81

Browse files
authored
Merge pull request #33 from AmedeeBulle/queues
feat: ✨ add queues / streaming sample
2 parents 0cfdfce + 1bb65be commit 6325a81

File tree

7 files changed

+891
-0
lines changed

7 files changed

+891
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ as well as the messages received.
5959
| Publish telemetry (WSS - Secure MQTT over WebSocket) | | [Sample](./samples/python/publish-websockets/) |
6060
| Raw command-response scenario | | [Sample](./samples/python/command-response/) |
6161
| Direct database connection — query telemetry | [Sample](./samples/script/query-db/) | [Sample](./samples/python/query-db/) |
62+
| Streaming IoT Platform data (Database queues) | | [Sample](./samples/python/queues/) |
6263

6364
## Documentation
6465

samples/python/queues/README.md

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
# Streaming IoT Platform data
2+
3+
This example demonstrates how to use
4+
[Transactional Event Queues](https://www.oracle.com/database/advanced-queuing/)
5+
to stream IoT Platform data.
6+
7+
You should already be familiar with connecting to the IoT Platform database.
8+
If not, review the [Direct database connection example](../query-db/README.md)
9+
in this repository.
10+
11+
The provided scripts allow you to subscribe and stream from the raw and normalized
12+
message queues.
13+
14+
## Concepts
15+
16+
Digital Twin Instances data is available through data tables, but it can also be
17+
streamed using database Transactional Event Queues.
18+
19+
The following queues are available:
20+
21+
| Queue name | Data type | Description |
22+
| ---------------- | --------------------- | -------------------------------- |
23+
| raw_data_in | raw_data_in_type | Incoming raw messages |
24+
| raw_data_out | raw_data_out_type | Outgoing raw messages (commands) |
25+
| normalized_data | JSON | Normalized data |
26+
| rejected_data_in | rejected_data_in_type | Rejected incoming messages |
27+
28+
The `normalized_data` queues is a JSON queue, while the others use an Abstract Data Type
29+
(ADT).
30+
More information on the data model is available in the
31+
[Transactional Event Queues](https://docs.oracle.com/en-us/iaas/Content/internet-of-things/iot-domain-database-schema.htm#queues)
32+
section of the IoT Platform documentation.
33+
34+
Queue subscribers can be implemented in a durable or non-durable way:
35+
36+
- Durable subscribers: messages are kept in the queue until a client connects and read
37+
available messages.
38+
Note that the retention for the IoT Platform queues is set to 24 hours.
39+
- Non-durable subscribers: only receive messages issued while the client is
40+
connected.
41+
The Python SDK does not support non-durable subscribers as such, but this can be
42+
emulated by registering an ephemeral subscriber when a client connects.
43+
44+
## Sample scripts
45+
46+
Two sample scripts are provided:
47+
48+
- `sub-raw`: subscribe and stream all incoming raw data (ADT). It is implemented
49+
as a non-durable subscriber.
50+
- `sub-norm`: subscribe and stream the normalized data (JSON), using a durable subscriber.
51+
52+
Both scripts demonstrate how to use rules to filter data based on the Digital Twin
53+
Instance Id and/or the endpoint (raw data) or content path (normalized data).
54+
55+
More information on using queues with the Python SDK is available on
56+
[Using Oracle Transactional Event Queues and Advanced Queuing](https://python-oracledb.readthedocs.io/en/stable/user_guide/aq.html).
57+
58+
## Prerequisites
59+
60+
Install the Python dependencies.
61+
(Using a [Python virtual environment](https://docs.python.org/3/library/venv.html) is recommended):
62+
63+
```sh
64+
pip install -r requirements.txt
65+
```
66+
67+
When using `oracledb` in _Thick_ mode, the
68+
[Oracle Instant Client](https://www.oracle.com/europe/database/technologies/instant-client.html)
69+
must be installed (the 23ai Release Update or newer is recommended).
70+
The `sqlnet.ora` parameter `SSL_SERVER_DN_MATCH` must also be set to `true`.
71+
72+
## Configure and run the scripts
73+
74+
Copy `config.distr.py` to `config.py` and set the following variables:
75+
76+
- `db_connect_string`: The `dbConnectionString` property of your IoT Domain Group.
77+
- `db_token_scope`: The `dbTokenScope` property of your IoT Domain Group.
78+
- `iot_domain_short_name`: The hostname part of the `deviceHost` property of your IoT Domain.
79+
- `oci_auth_type`: The OCI authentication type. Use "ConfigFileAuthentication"
80+
for API key authentication, or "InstancePrincipal".
81+
- `oci_profile`: OCI CLI profile to use for token retrieval (API key authentication only).
82+
- `thick_mode`: Set to `True` to use the `oracledb` thick mode driver.
83+
- `subscriber_name`: Name of the durable subscriber for the `sub-norm` sample.
84+
85+
### `sub-raw`
86+
87+
Run the script. Without parameter, it will show all messages.
88+
You can filter by Digital Twin Instance OCID, display name, or endpoint (MQTT topic).
89+
90+
```text
91+
$ ./sub-raw.py --help
92+
usage: sub-raw.py [-h] [-v] [-d] [--id ID | --display-name DISPLAY_NAME] [--endpoint ENDPOINT]
93+
94+
Subscribe to the raw messages stream from IoT Platform.
95+
96+
options:
97+
-h, --help show this help message and exit
98+
-v, --verbose Enable verbose (INFO level) logging.
99+
-d, --debug Enable debug (DEBUG level) logging.
100+
--id ID The Digital Twin Instance OCID (mutually exclusive with --display-name).
101+
--display-name DISPLAY_NAME
102+
The Digital Twin Instance display name (mutually exclusive with --id).
103+
--endpoint ENDPOINT The message endpoint (topic).
104+
./sub-raw.py -v
105+
2025-11-10 14:24:03,316 - INFO - sub-raw.py - Connected
106+
2025-11-10 14:24:06,785 - INFO - sub-raw.py - Subscriber aq_sub_183a199f_8820_449e_a912_99339877c23a registered
107+
2025-11-10 14:24:06,807 - INFO - sub-raw.py - Listening for messages
108+
....
109+
OCID : ocid1.iotdigitaltwininstance.oc1.<redacted>
110+
Time received: 2025-11-10 13:24:52.109055
111+
Endpoint : zigbee2mqtt/sonoff-temp-04
112+
Content : {"temperature":19.8,"humidity":64.9,"battery":71,"linkquality":51}
113+
.....
114+
OCID : ocid1.iotdigitaltwininstance.oc1.<redacted>
115+
Time received: 2025-11-10 13:25:48.182738
116+
Endpoint : ttn/devices/bulles-minilora-01/up
117+
Content : {"temperature":16.1,"humidity":11,"battery":96,"rssi":-76,"snr":12}
118+
119+
OCID : ocid1.iotdigitaltwininstance.oc1.<redacted>
120+
Time received: 2025-11-10 13:25:50.742284
121+
Endpoint : zigbee2mqtt/sonoff-temp-06
122+
Content : {"temperature":21.2,"humidity":51.7,"battery":80,"linkquality":58}
123+
^C
124+
Interrupted
125+
2025-11-10 14:25:51,831 - INFO - sub-raw.py - Subscriber aq_sub_183a199f_8820_449e_a912_99339877c23a unregistered
126+
2025-11-10 14:25:51,855 - INFO - sub-raw.py - Disconnected
127+
```
128+
129+
### `sub-norm`
130+
131+
The `sub-norm` script is similar and provides additional commands to manage
132+
the durable subscription:
133+
134+
```text
135+
$ ./sub-norm.py --help
136+
Usage: sub-norm.py [OPTIONS] COMMAND [ARGS]...
137+
138+
Stream Digital Twin normalized data.
139+
140+
This example illustrate the use of "durable subscribers": once the
141+
subscriber has been created, messages are retained and returned when the
142+
client connects.
143+
144+
Options:
145+
-v, --verbose Verbose mode
146+
-d, --debug Debug mode
147+
--help Show this message and exit.
148+
149+
Commands:
150+
stream Stream data.
151+
subscribe Subscribe to the normalized queue.
152+
unsubscribe Unsubscribe to the normalized queue.
153+
$ ./sub-norm.py subscribe --help
154+
Usage: sub-norm.py subscribe [OPTIONS]
155+
156+
Subscribe to the normalized queue.
157+
158+
Options:
159+
--id TEXT Digital Twin Instance ID (mutually exclusive with
160+
--display-name)
161+
--display-name TEXT Digital Twin Instance display name (mutually exclusive
162+
with --id)
163+
--content-path TEXT Path to the content
164+
--help Show this message and exit.
165+
$ ./sub-norm.py -v subscribe --content-path temperature
166+
2025-11-10 14:30:19,053 - INFO - sub-norm.py - Connected
167+
2025-11-10 14:30:22,026 - INFO - sub-norm.py - Subscriber sub_norm_subscriber registered
168+
2025-11-10 14:30:22,048 - INFO - sub-norm.py - Disconnected
169+
$ ./sub-norm.py -v stream
170+
2025-11-10 14:30:32,240 - INFO - sub-norm.py - Connected
171+
2025-11-10 14:30:32,242 - INFO - sub-norm.py - Listening for messages
172+
.
173+
OCID : ocid1.iotdigitaltwininstance.oc1.<redacted>
174+
Time observed: 2025-11-10T13:30:51.351412Z
175+
Content path : temperature
176+
Value : 15.9
177+
..
178+
OCID : ocid1.iotdigitaltwininstance.oc1.<redacted>
179+
Time observed: 2025-11-10T13:31:14.526270Z
180+
Content path : temperature
181+
Value : 21.0
182+
^C
183+
Interrupted
184+
2025-11-10 14:31:17,731 - INFO - sub-norm.py - Disconnected
185+
$ ./sub-norm.py -v unsubscribe
186+
2025-11-10 14:31:26,963 - INFO - sub-norm.py - Connected
187+
2025-11-10 14:31:27,275 - INFO - sub-norm.py - Subscriber sub_norm_subscriber unregistered
188+
2025-11-10 14:31:27,296 - INFO - sub-norm.py - Disconnected
189+
```
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#!/usr/bin/env python3
2+
"""Configuration constants for the IoT Platform queue samples.
3+
4+
This file defines database and authentication parameters. Copy and rename to
5+
config.py, then edit values as needed.
6+
7+
See in-file comments for details on each variable.
8+
"""
9+
import os
10+
11+
# Database connect string and token scope as provided by the OCI IoT Platform.
12+
# These are the dbConnectionString and dbTokenScope properties of your IoT Domain Group.
13+
# They can be retrieved with:
14+
# oci iot domain-group get --iot-domain-group-id <IoT Domain Group OCID> \
15+
# --query 'data.["db-connection-string", "db-token-scope"]'
16+
db_connect_string = "tcps:adb.<region>.oraclecloud.com:1521/<redacted>"
17+
db_token_scope = "urn:oracle:db::id::<Compartment OCID>"
18+
19+
# Domain short name.
20+
# This is the hostname part of the IoT Domain device host and can be retrieved using:
21+
# oci iot domain get --iot-domain-id <IoT Domain OCID> |
22+
# jq -r '.'data."device-host"' | split(".")[0]'
23+
iot_domain_short_name = "<Domain SHort Name>"
24+
25+
# OCI Authentication type. Must be either "ConfigFileAuthentication" or "InstancePrincipal"
26+
# oci_auth_type = "ConfigFileAuthentication"
27+
oci_auth_type = "InstancePrincipal"
28+
29+
# OCI CLI profile to use for token retrieval when authentication type is "ConfigFileAuthentication"
30+
oci_profile = os.getenv("OCI_CLI_PROFILE", "DEFAULT")
31+
32+
# Select Thick or Thin mode for oracledb.
33+
# TL;DR: use Thin mode unless you specifically need the Thick driver.
34+
# See
35+
# https://python-oracledb.readthedocs.io/en/latest/user_guide/appendix_b.html
36+
# for a detailed explanation.
37+
thick_mode = False
38+
# In Thick mode, if the Oracle Client libraries can't be found, set the location below. See
39+
# https://python-oracledb.readthedocs.io/en/latest/user_guide/initialization.html#enabling-python-oracledb-thick-mode
40+
# for more information on setting lib_dir for your operating system.
41+
lib_dir = None
42+
43+
# For the "durable" sample (sub_norm.py), the name of the durable subscriber
44+
subscriber_name = "sub_norm_subscriber"
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
click~=8.2.0
2+
oci~=2.0,>=2.161
3+
oracledb>=3.2.0

samples/python/queues/sqlnet.ora

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
SSL_SERVER_DN_MATCH=TRUE

0 commit comments

Comments
 (0)