diff --git a/README.md b/README.md index 96f92e35..b7773eae 100644 --- a/README.md +++ b/README.md @@ -89,7 +89,7 @@ MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The modul ] } ``` - The *‘Timestamp’* and *‘Unit’* fields may be omitted. The fields inside *‘Unit’* may also be omitted. Example: + The *’Timestamp’* and *’Unit’* fields may be omitted. The fields inside *’Unit’* may also be omitted. The `"Value"` and `"Timestamp"` fields support dot-separated paths for accessing nested JSON fields, e.g. `"data.temperature"` or `"info.ts"` (see section 4 for details). Example: ```json { "/mirip/UNet3AC2/sensor/data":[ @@ -130,10 +130,17 @@ MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The modul - **Where**: *include/mqtt_streaming_module/mqtt_json_decoder_fb_impl.h, src/mqtt_json_decoder_fb_impl.cpp* - **Purpose**: To parse JSON string data to extract a value and a timestamp, and to send data and domain samples based on this data. - **Main properties**: - - *ValueKey* (string) — Specifies the JSON field name from which value data will be extracted. This property is required. It should be contained in the incoming JSON messages. Otherwise, a parsing error will occur. + - *ValueKey* (string) — Specifies the JSON field name (or dot-separated path for nested objects) from which value data will be extracted. Use `.` to access nested fields, e.g. `"data.temperature"` extracts the `temperature` field from inside the `data` object. This property is required. It should be contained in the incoming JSON messages. Otherwise, a parsing error will occur. - *DomainMode* (list) — Defines how the timestamp of the decoded signal is generated. By default it is set to *None* (0), which means that the decoded signal doesn't have a timestamp. If set to *Extract from message* (1), the JSON decoder will try to extract the timestamp from the incoming JSON messages (see *DomainKey* property). If set to *System time* (2), the timestamp of the decoded signal is set to the system time when the JSON message is received. - - *DomainKey* (string) — Specifies the JSON field name from which timestamp will be extracted. This property is optional. If it is set it should be contained in the incoming JSON messages. Otherwise, a parsing error will occur. + - *DomainKey* (string) — Specifies the JSON field name (or dot-separated path for nested objects) from which the timestamp will be extracted. Dot notation is supported, e.g. `"info.timestamp"` extracts `timestamp` from inside the `info` object. This property is optional. If it is set it should be contained in the incoming JSON messages. Otherwise, a parsing error will occur. - *Unit* (string) — Specifies the unit symbol for the decoded value. This property is optional. + + Dot-notation paths support arbitrary nesting depth. For example, `"sensor.values.temperature"` traverses `sensor` → `values` → `temperature`. + Example of a nested JSON MQTT message and the corresponding property values: + ```json + {"data": {"temperature": 25.68, "humidity": 72.1}, "info": {"timestamp": 1776332277}} + ``` + For this message, set *ValueKey* to `"data.temperature"` and *DomainKey* to `"info.timestamp"`. --- ## Building MQTTStreamingModule diff --git a/modules/mqtt_streaming_module/tests/test_data.h b/modules/mqtt_streaming_module/tests/test_data.h index 5a0998d4..8f3d6416 100644 --- a/modules/mqtt_streaming_module/tests/test_data.h +++ b/modules/mqtt_streaming_module/tests/test_data.h @@ -222,6 +222,31 @@ inline const std::string MISSING_FIELD_JSON_DATA_2 = R"json({ } )json"; +inline const std::string NESTED_JSON_DATA_0 = R"json({ + "data": {"temperature": }, + "info": {"timestamp": } +} +)json"; + +inline const std::string NESTED_JSON_DATA_1 = R"json({ + "data": {"temperature": } +} +)json"; + +inline const std::string NESTED_JSON_DATA_2 = R"json({ + "data": {"temperature": }, + "timestamp": +} +)json"; + +inline const std::string DEEP_NESTED_JSON_DATA = R"json({ + "sensor": { + "values": {"temperature": }, + "metadata": {"unit": "°C", "ts": , "location": "lab"} + } +} +)json"; + inline const std::vector> DATA_DOUBLE_INT_0 = {{23.50000001, 1761567115}, {-0.00000005583, 1761567116}, {19.84916651651, 1761567117}, diff --git a/modules/mqtt_streaming_module/tests/test_mqtt_json_decoder_fb.cpp b/modules/mqtt_streaming_module/tests/test_mqtt_json_decoder_fb.cpp index 630c4c3d..df6c40cc 100644 --- a/modules/mqtt_streaming_module/tests/test_mqtt_json_decoder_fb.cpp +++ b/modules/mqtt_streaming_module/tests/test_mqtt_json_decoder_fb.cpp @@ -200,20 +200,86 @@ std::vector replacePlaceholders(const std::vector size_t + { + const size_t closeQ = jsonTemplate.rfind('"', from); + if (closeQ == std::string::npos) + return std::string::npos; + const size_t openQ = jsonTemplate.rfind('"', closeQ - 1); + if (openQ == std::string::npos) + return std::string::npos; + key = jsonTemplate.substr(openQ + 1, closeQ - openQ - 1); + return openQ; + }; + + std::vector segments; + size_t scanFrom = std::string::npos; + + { + std::string key; + scanFrom = extractKeyBefore(pos, key); + if (scanFrom == std::string::npos) + return ""; + segments.push_back(key); + } + + // walk up ancestor objects by finding the '{' that opens each level + while (scanFrom > 0) + { + // scan backwards for the '{' at depth 0 that opens the current object + int depth = 0; + size_t p = scanFrom; + size_t bracePos = std::string::npos; + while (p > 0) + { + --p; + const char c = jsonTemplate[p]; + if (c == '}') + ++depth; + else if (c == '{') + { + if (depth == 0) + { + bracePos = p; + break; + } + --depth; + } + } + if (bracePos == std::string::npos) + break; + + size_t q = bracePos; + while (q > 0 && std::isspace(static_cast(jsonTemplate[q - 1]))) + --q; + if (q == 0 || jsonTemplate[q - 1] != ':') + break; // root object — no parent key + --q; // skip ':' + + std::string ancestorKey; + const size_t ancestorOpenQ = extractKeyBefore(q, ancestorKey); + if (ancestorOpenQ == std::string::npos) + break; + + segments.push_back(ancestorKey); + scanFrom = ancestorOpenQ; + } + + std::reverse(segments.begin(), segments.end()); + + std::string result; + for (size_t i = 0; i < segments.size(); ++i) + { + if (i > 0) + result += '.'; + result += segments[i]; + } return result; } @@ -1181,3 +1247,65 @@ TEST_F(MqttJsonDecoderFbTest, PacketWithTheSameTS) EXPECT_EQ(getComponentStatus(), ok); EXPECT_EQ(getStatusMsg().find(warnMsg), std::string::npos); } + +TEST_F(MqttJsonDecoderFbTest, NestedValueFieldWithDomain) +{ + auto dataToReceive = transferData(DATA_DOUBLE_INT_0, NESTED_JSON_DATA_0); + ASSERT_EQ(DATA_DOUBLE_INT_0.size(), dataToReceive.size()); + ASSERT_TRUE(compareData(DATA_DOUBLE_INT_0, dataToReceive)); + ASSERT_EQ(decoderObj.getStatusContainer().getStatus("ComponentStatus"), + Enumeration("ComponentStatusType", "Ok", decoderObj.getContext().getTypeManager())); + EXPECT_NE(decoderObj.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Parsing succeeded"), std::string::npos); +} + +TEST_F(MqttJsonDecoderFbTest, NestedValueMixedDepth) +{ + auto dataToReceive = transferData(DATA_DOUBLE_INT_2, NESTED_JSON_DATA_2); + ASSERT_EQ(DATA_DOUBLE_INT_2.size(), dataToReceive.size()); + ASSERT_TRUE(compareData(DATA_DOUBLE_INT_2, dataToReceive)); + ASSERT_EQ(decoderObj.getStatusContainer().getStatus("ComponentStatus"), + Enumeration("ComponentStatusType", "Ok", decoderObj.getContext().getTypeManager())); + EXPECT_NE(decoderObj.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Parsing succeeded"), std::string::npos); +} + +TEST_F(MqttJsonDecoderFbTest, DeepNestedValueField) +{ + auto dataToReceive = transferData(DATA_DOUBLE_INT_1, DEEP_NESTED_JSON_DATA); + ASSERT_EQ(DATA_DOUBLE_INT_1.size(), dataToReceive.size()); + ASSERT_TRUE(compareData(DATA_DOUBLE_INT_1, dataToReceive)); + ASSERT_EQ(decoderObj.getStatusContainer().getStatus("ComponentStatus"), + Enumeration("ComponentStatusType", "Ok", decoderObj.getContext().getTypeManager())); + EXPECT_NE(decoderObj.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Parsing succeeded"), std::string::npos); +} + +TEST_F(MqttJsonDecoderFbTest, NestedValueFieldWithoutDomain) +{ + auto dataToReceive = transferDataWithoutDomain(DATA_DOUBLE_INT_1, NESTED_JSON_DATA_1); + ASSERT_EQ(DATA_DOUBLE_INT_1.size(), dataToReceive.size()); + ASSERT_TRUE(compareData(DATA_DOUBLE_INT_1, dataToReceive)); + ASSERT_EQ(decoderObj.getStatusContainer().getStatus("ComponentStatus"), + Enumeration("ComponentStatusType", "Ok", decoderObj.getContext().getTypeManager())); + EXPECT_NE(decoderObj.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Parsing succeeded"), std::string::npos); +} + +TEST_F(MqttJsonDecoderFbTest, NestedMissingField) +{ + const auto topic = buildTopicName(); + CreateDecoderFB(topic, "data.nonexistent", DDSM::None, ""); + + auto signal = getSignals()[0]; + auto reader = daq::PacketReader(signal); + + auto msgs = replacePlaceholders(DATA_DOUBLE_INT_0, DEEP_NESTED_JSON_DATA); + for (const auto& str : msgs) + { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + onSignalsMessage({topic, std::vector(str.begin(), str.end()), 1, 0}); + } + + auto dataToReceive = read(reader, signal, 1000); + ASSERT_EQ(0u, dataToReceive.size()); + ASSERT_EQ(decoderObj.getStatusContainer().getStatus("ComponentStatus"), + Enumeration("ComponentStatusType", "Error", decoderObj.getContext().getTypeManager())); + EXPECT_NE(decoderObj.getStatusContainer().getStatusMessage("ComponentStatus").toStdString().find("Parsing failed"), std::string::npos); +} \ No newline at end of file diff --git a/shared/mqtt_streaming_protocol/include/mqtt_streaming_protocol/MqttDataWrapper.h b/shared/mqtt_streaming_protocol/include/mqtt_streaming_protocol/MqttDataWrapper.h index 882dd087..ea7c3157 100644 --- a/shared/mqtt_streaming_protocol/include/mqtt_streaming_protocol/MqttDataWrapper.h +++ b/shared/mqtt_streaming_protocol/include/mqtt_streaming_protocol/MqttDataWrapper.h @@ -104,8 +104,8 @@ class MqttDataWrapper final bool parseJson(ExtractionContext& ctx, const std::string& json); bool parseJsonFields(ExtractionContext& ctx); - bool extractValue(ExtractionContext& ctx, const std::string& jsonFieldName); - bool extractTimestamp(ExtractionContext& ctx, const std::string& jsonFieldName); + bool extractValue(ExtractionContext& ctx, const rapidjson::Value& node); + bool extractTimestamp(ExtractionContext& ctx, const rapidjson::Value& node); bool validateExtractionResult(ExtractionContext& ctx); bool buildPackets(ExtractionContext& ctx, const uint64_t externalTs); diff --git a/shared/mqtt_streaming_protocol/src/MqttDataWrapper.cpp b/shared/mqtt_streaming_protocol/src/MqttDataWrapper.cpp index 5dfc11a4..5fbbb8b2 100644 --- a/shared/mqtt_streaming_protocol/src/MqttDataWrapper.cpp +++ b/shared/mqtt_streaming_protocol/src/MqttDataWrapper.cpp @@ -85,6 +85,24 @@ std::pair> parseHomogeneousArray(const } return result; } + +const rapidjson::Value* resolveJsonPath(const rapidjson::Value& root, const std::string& dotPath) +{ + const rapidjson::Value* cur = &root; + std::string::size_type start = 0; + while (start < dotPath.size()) + { + auto dot = dotPath.find('.', start); + if (dot == std::string::npos) + dot = dotPath.size(); + const std::string segment(dotPath, start, dot - start); + if (!cur->IsObject() || !cur->HasMember(segment)) + return nullptr; + cur = &(*cur)[segment]; + start = dot + 1; + } + return cur; +} } // namespace namespace mqtt @@ -159,19 +177,17 @@ bool MqttDataWrapper::parseJsonFields(ExtractionContext& ctx) return false; try { - for (auto it = ctx.jsonDocument.MemberBegin(); it != ctx.jsonDocument.MemberEnd(); ++it) + if (!msgDescriptor.valueFieldName.empty()) { - const std::string name = it->name.GetString(); - bool processed = false; - processed |= extractValue(ctx, name); - if (domainSignalMode == DomainSignalMode::ExtractFromMessage && !processed) - { - processed |= extractTimestamp(ctx, name); - } - if (!processed) - { - // the field is not precessed - } + const auto* node = resolveJsonPath(ctx.jsonDocument, msgDescriptor.valueFieldName); + if (node) + extractValue(ctx, *node); + } + if (domainSignalMode == DomainSignalMode::ExtractFromMessage && !msgDescriptor.tsFieldName.empty()) + { + const auto* node = resolveJsonPath(ctx.jsonDocument, msgDescriptor.tsFieldName); + if (node) + extractTimestamp(ctx, *node); } } catch (...) @@ -182,121 +198,111 @@ bool MqttDataWrapper::parseJsonFields(ExtractionContext& ctx) return true; } -bool MqttDataWrapper::extractValue(ExtractionContext& ctx, const std::string& jsonFieldName) +bool MqttDataWrapper::extractValue(ExtractionContext& ctx, const rapidjson::Value& node) { - bool fieldFound = (!msgDescriptor.valueFieldName.empty() && jsonFieldName == msgDescriptor.valueFieldName); - if (fieldFound) + const auto fillContext = [&ctx](mqtt::CmdResult& parsingStatus, auto&& out) + { + ctx.result.merge(parsingStatus); + ctx.valueExtracted = parsingStatus.success; + if (parsingStatus.success) + ctx.value = std::move(out); + }; + if (node.IsArray()) { - const auto fillContext = [&ctx](mqtt::CmdResult& parsingStatus, auto&& out) + const auto& arr = node.GetArray(); + if (arr.Empty()) { - ctx.result.merge(parsingStatus); - ctx.valueExtracted = parsingStatus.success; - if (parsingStatus.success) - ctx.value = std::move(out); - }; - const auto& v = ctx.jsonDocument[jsonFieldName]; - if (v.IsArray()) + ctx.result.addError("Value field is an array but it is empty. "); + } + else if (arr[0].IsInt64() || arr[0].IsUint64()) { - const auto& arr = v.GetArray(); - if (arr.Empty()) - { - ctx.result.addError("Value field is an array but it is empty. "); - } - else if (arr[0].IsInt64() || arr[0].IsUint64()) - { - auto [parsingStatus, out] = parseHomogeneousArray(arr); - fillContext(parsingStatus, std::move(out)); - } - else if (arr[0].IsDouble()) - { - auto [parsingStatus, out] = parseHomogeneousArray(arr); - fillContext(parsingStatus, std::move(out)); - } - else if (arr[0].IsString()) - { - auto [parsingStatus, out] = parseHomogeneousArray(arr); - fillContext(parsingStatus, std::move(out)); - } - else - { - ctx.result.addError(fmt::format("Unsupported value type for '{}' array. ", jsonFieldName)); - } + auto [parsingStatus, out] = parseHomogeneousArray(arr); + fillContext(parsingStatus, std::move(out)); + } + else if (arr[0].IsDouble()) + { + auto [parsingStatus, out] = parseHomogeneousArray(arr); + fillContext(parsingStatus, std::move(out)); + } + else if (arr[0].IsString()) + { + auto [parsingStatus, out] = parseHomogeneousArray(arr); + fillContext(parsingStatus, std::move(out)); } else { - ctx.valueExtracted = true; - if (v.IsInt64()) - ctx.value = std::vector{v.GetInt64()}; - else if (v.IsUint64()) - ctx.value = std::vector{static_cast(v.GetUint64())}; - else if (v.IsDouble()) - ctx.value = std::vector{v.GetDouble()}; - else if (v.IsString()) - ctx.value = std::vector{std::string(v.GetString())}; - else - { - ctx.result.addError(fmt::format("Unsupported value type for '{}'. ", jsonFieldName)); - ctx.valueExtracted = false; - } + ctx.result.addError(fmt::format("Unsupported value type for '{}' array. ", msgDescriptor.valueFieldName)); + } + } + else + { + ctx.valueExtracted = true; + if (node.IsInt64()) + ctx.value = std::vector{node.GetInt64()}; + else if (node.IsUint64()) + ctx.value = std::vector{static_cast(node.GetUint64())}; + else if (node.IsDouble()) + ctx.value = std::vector{node.GetDouble()}; + else if (node.IsString()) + ctx.value = std::vector{std::string(node.GetString())}; + else + { + ctx.result.addError(fmt::format("Unsupported value type for '{}'. ", msgDescriptor.valueFieldName)); + ctx.valueExtracted = false; } } - return fieldFound; + return ctx.valueExtracted; } -bool MqttDataWrapper::extractTimestamp(ExtractionContext& ctx, const std::string& jsonFieldName) +bool MqttDataWrapper::extractTimestamp(ExtractionContext& ctx, const rapidjson::Value& node) { - bool fieldFound = (!msgDescriptor.tsFieldName.empty() && jsonFieldName == msgDescriptor.tsFieldName); - if (fieldFound) + if (node.IsArray()) { - const auto& tsField = ctx.jsonDocument[jsonFieldName]; - if (tsField.IsArray()) + const auto& arr = node.GetArray(); + if (arr.Empty()) { - const auto& arr = tsField.GetArray(); - if (arr.Empty()) - { - ctx.result.addError("Timestamp field is an array but it is empty. "); - } - else if (arr[0].IsInt() || arr[0].IsUint64() || arr[0].IsInt64()) - { - auto [parsingStatus, out] = parseHomogeneousArray(arr); - ctx.result.merge(parsingStatus); - ctx.tsExtracted = parsingStatus.success; - if (parsingStatus.success) - ctx.ts = std::move(out); + ctx.result.addError("Timestamp field is an array but it is empty. "); + } + else if (arr[0].IsInt() || arr[0].IsUint64() || arr[0].IsInt64()) + { + auto [parsingStatus, out] = parseHomogeneousArray(arr); + ctx.result.merge(parsingStatus); + ctx.tsExtracted = parsingStatus.success; + if (parsingStatus.success) + ctx.ts = std::move(out); - std::for_each(ctx.ts.begin(), ctx.ts.end(), [](auto& val) { val = mqtt::utils::numericToMicroseconds(val); }); - } - else if (arr[0].IsString()) - { - auto [parsingStatus, out] = parseHomogeneousArray(arr); - ctx.result.merge(parsingStatus); - ctx.tsExtracted = parsingStatus.success; - if (parsingStatus.success) - { - ctx.ts.reserve(out.size()); - std::for_each(out.cbegin(), out.cend(), [&ctx](const auto& val) { ctx.ts.push_back(utils::toUnixTicks(val)); }); - } - } - else + std::for_each(ctx.ts.begin(), ctx.ts.end(), [](auto& val) { val = mqtt::utils::numericToMicroseconds(val); }); + } + else if (arr[0].IsString()) + { + auto [parsingStatus, out] = parseHomogeneousArray(arr); + ctx.result.merge(parsingStatus); + ctx.tsExtracted = parsingStatus.success; + if (parsingStatus.success) { - ctx.result.addError("Timestamp value type is not supported. "); + ctx.ts.reserve(out.size()); + std::for_each(out.cbegin(), out.cend(), [&ctx](const auto& val) { ctx.ts.push_back(utils::toUnixTicks(val)); }); } } else { - ctx.tsExtracted = true; - if (tsField.IsInt() || tsField.IsUint64() || tsField.IsInt64()) - ctx.ts.push_back(mqtt::utils::numericToMicroseconds(tsField.GetUint64())); - else if (tsField.IsString()) - ctx.ts.push_back(utils::toUnixTicks(tsField.GetString())); - else - { - ctx.result.addError("Timestamp value type is not supported. "); - ctx.tsExtracted = false; - } + ctx.result.addError("Timestamp value type is not supported. "); + } + } + else + { + ctx.tsExtracted = true; + if (node.IsInt() || node.IsUint64() || node.IsInt64()) + ctx.ts.push_back(mqtt::utils::numericToMicroseconds(node.GetUint64())); + else if (node.IsString()) + ctx.ts.push_back(utils::toUnixTicks(node.GetString())); + else + { + ctx.result.addError("Timestamp value type is not supported. "); + ctx.tsExtracted = false; } } - return fieldFound; + return ctx.tsExtracted; } bool MqttDataWrapper::validateExtractionResult(ExtractionContext& ctx)