From 65f2babcec784cd99e598155565a85f6980b63ba Mon Sep 17 00:00:00 2001 From: exzile Date: Fri, 26 Jun 2026 17:24:41 -0400 Subject: [PATCH] Add idle model unload for mediapipe LLM graphs Adds an opt-in idle timeout that unloads an LLM graph's heavy resources (freeing GPU/CPU memory) after a period with no inference, and lazily reloads on the next request, so the GPU can be shared with other workloads. Follows llama.cpp's --sleep-idle-seconds model. - Config: idle_unload_timeout_seconds on the mediapipe config entry (0 = disabled, default). Added to the JSON schema. Phase-1 scope: restricted to LLM continuous-batching graphs (HttpLLMCalculator); rejected for graphs with Python nodes or non-LLM calculators at config validation. - State machine: new UNLOADED state + UnloadEvent. AVAILABLE->UNLOADED on idle; UNLOADED wakes via the existing reload path. isAvailable() is false for UNLOADED, convertToModelStatus() maps it to AVAILABLE so health/ readiness still see the servable (it auto-reloads). - Concurrency: a per-definition recursive lifecycleMtx serializes reload/retire/unload/wakeUp so the watcher and config threads never race on graph state or side packets. unload() is non-blocking, tears down only after confirming the AVAILABLE->UNLOADED transition, and skips while requests or inferences are in flight. The idle timeout is cached in an atomic for lock-free watcher reads. - In-flight guard: an RAII ActiveInferenceGuard (held for the executor's lifetime) keeps a shared_ptr inference counter so a generation that outlives the idle timeout is never unloaded mid-stream; completing an inference refreshes the activity timestamp. lastActivityTimeNs and the counter are shared_ptr so they outlive the definition if an executor is still running during retire. - Wake-up failure is retryable: if the lazy reload fails, the graph reverts to UNLOADED (not a wedged failed state) so the next request re-attempts the wake and self-heals once the underlying issue is resolved; the current request gets a clean error. - Metrics: ovms_graph_loaded gauge (1 loaded / 0 unloaded) per graph. - Composes with --cache_dir so wake-up is a cache import, not a recompile. Tested: state-machine + schema unit tests; in-flight-guard and wake-failure unit tests; functional unload/reload/idle-reset/disabled-default/concurrency with a real model; and end-to-end on an Intel Arc GPU (idle unload frees resources, long generations are not unloaded mid-stream, wake-up reloads and serves, soak shows no leak, failed wake self-heals). No regressions vs main. Implements #4141 Co-Authored-By: Claude Opus 4.8 --- docs/llm/reference.md | 10 + src/dags/pipelinedefinitionstatus.cpp | 56 +- src/dags/pipelinedefinitionstatus.hpp | 37 +- .../mediapipegraphconfig.cpp | 9 + .../mediapipegraphconfig.hpp | 16 + .../mediapipegraphdefinition.cpp | 205 +++++- .../mediapipegraphdefinition.hpp | 59 ++ .../mediapipegraphexecutor.cpp | 18 +- .../mediapipegraphexecutor.hpp | 58 +- src/metrics/metric_config.cpp | 1 + src/metrics/metric_config.hpp | 2 + src/model_metric_reporter.cpp | 12 + src/model_metric_reporter.hpp | 3 + src/modelmanager.cpp | 82 +++ src/modelmanager.hpp | 6 + src/schema.cpp | 4 + src/test/llm/llmnode_test.cpp | 638 ++++++++++++++++++ src/test/mediapipeflow_test.cpp | 97 +++ src/test/pipelinedefinitionstatus_test.cpp | 130 ++++ src/test/schema_test.cpp | 60 ++ src/test/test_utils.hpp | 17 + 21 files changed, 1509 insertions(+), 11 deletions(-) diff --git a/docs/llm/reference.md b/docs/llm/reference.md index 698d05031b..00a9602e3e 100644 --- a/docs/llm/reference.md +++ b/docs/llm/reference.md @@ -109,6 +109,16 @@ The calculator supports the following `node_options` for tuning the pipeline con - `optional string tool_parser` - name of the parser to use for tool calls extraction from model output before creating a response; - `optional bool enable_tool_guided_generation` - enable enforcing tool schema during generation. Requires setting response parser. [default = false]; - `optional SparseAttentionConfig sparse_attention_config` - Sparse attention configuration. Disabled if not specified. +- `optional int64 idle_unload_timeout_seconds` - unload the graph's model resources after this many seconds with no inference requests, freeing GPU/CPU memory; the model is reloaded automatically on the next request. `0` disables the feature [default = 0]. See [Idle model unload](#idle-model-unload). + +### Idle model unload +When `idle_unload_timeout_seconds` is set to a positive value, the model server unloads the LLM graph's heavy resources (the continuous batching pipeline, freeing GPU VRAM / host memory) after the configured period without any inference requests. The first request after an unload transparently reloads the model and is served once it is ready, so the GPU can be used by other workloads while a model is idle. + +Notes: +- Only inference requests reset the idle timer; status/metrics/health endpoints do not keep a model loaded. +- The first request after an idle unload pays the reload latency. Combine with [model caching](../model_cache.md) (`--cache_dir`) so the reload is a fast cache import rather than a full recompile. +- The graph reports as `AVAILABLE` while idle-unloaded (it auto-reloads on demand). The `ovms_graph_loaded` metric reports `1` when loaded and `0` when idle-unloaded. +- Supported for LLM continuous-batching graphs. Graphs containing Python nodes are not supported with this setting. ### Caching settings The value of `cache_size` might have performance and stability implications. It is used for storing LLM model KV cache data. Adjust it based on your environment capabilities, model size and expected level of concurrency. diff --git a/src/dags/pipelinedefinitionstatus.cpp b/src/dags/pipelinedefinitionstatus.cpp index 5fb27479b0..17afaa3a0e 100644 --- a/src/dags/pipelinedefinitionstatus.cpp +++ b/src/dags/pipelinedefinitionstatus.cpp @@ -35,7 +35,8 @@ const std::string& pipelineDefinitionStateCodeToString(PipelineDefinitionStateCo {PipelineDefinitionStateCode::LOADING_PRECONDITION_FAILED_REQUIRED_REVALIDATION, "LOADING_PRECONDITION_FAILED_REQUIRED_REVALIDATION"}, {PipelineDefinitionStateCode::AVAILABLE_REQUIRED_REVALIDATION, "AVAILABLE_REQUIRED_REVALIDATION"}, {PipelineDefinitionStateCode::AVAILABLE, "AVAILABLE"}, - {PipelineDefinitionStateCode::RETIRED, "RETIRED"}}; + {PipelineDefinitionStateCode::RETIRED, "RETIRED"}, + {PipelineDefinitionStateCode::UNLOADED, "UNLOADED"}}; return names.at(code); } @@ -62,6 +63,9 @@ StateKeeper BeginState::handle(const RetireEvent& e) const { throw std::logic_error(INVALID_TRANSITION_MESSAGE); return {}; } +StateKeeper BeginState::handle(const UnloadEvent& e) const { + return {}; // unload is a no-op when not yet loaded +} PipelineDefinitionStateCode ReloadState::getStateCode() const { return code; @@ -84,6 +88,9 @@ StateKeeper ReloadState::handle(const RetireEvent& e) const { throw std::logic_error(INVALID_TRANSITION_MESSAGE); return {}; } +StateKeeper ReloadState::handle(const UnloadEvent& e) const { + return {}; // unload is a no-op while reloading +} PipelineDefinitionStateCode AvailableState::getStateCode() const { return code; @@ -105,6 +112,9 @@ StateChanger AvailableState::handle(const UsedMod StateChanger AvailableState::handle(const RetireEvent& e) const { return {}; } +StateChanger AvailableState::handle(const UnloadEvent& e) const { + return {}; +} PipelineDefinitionStateCode AvailableRequiredRevalidation::getStateCode() const { return code; @@ -124,6 +134,9 @@ StateKeeper AvailableRequiredRevalidation::handle(const UsedModelChangedEvent& e StateChanger AvailableRequiredRevalidation::handle(const RetireEvent& e) const { return {}; } +StateKeeper AvailableRequiredRevalidation::handle(const UnloadEvent& e) const { + return {}; // unload is a no-op in AVAILABLE_REQUIRED_REVALIDATION +} PipelineDefinitionStateCode LoadingPreconditionFailedState::getStateCode() const { return code; @@ -145,6 +158,10 @@ StateChanger LoadingPreconditio StateChanger LoadingPreconditionFailedState::handle(const RetireEvent& e) const { return {}; } +StateChanger LoadingPreconditionFailedState::handle(const UnloadEvent& e) const { + // Revert a failed wake-up reload back to UNLOADED so the next request retries. + return {}; +} PipelineDefinitionStateCode LoadingFailedLastValidationRequiredRevalidation::getStateCode() const { return code; @@ -164,6 +181,9 @@ StateKeeper LoadingFailedLastValidationRequiredRevalidation::handle(const UsedMo StateChanger LoadingFailedLastValidationRequiredRevalidation::handle(const RetireEvent& e) const { return {}; } +StateKeeper LoadingFailedLastValidationRequiredRevalidation::handle(const UnloadEvent& e) const { + return {}; // unload is a no-op when loading already failed +} PipelineDefinitionStateCode RetiredState::getStateCode() const { return code; @@ -187,6 +207,31 @@ StateKeeper RetiredState::handle(const RetireEvent& e) const { throw std::logic_error(INVALID_TRANSITION_MESSAGE); return {}; } +StateKeeper RetiredState::handle(const UnloadEvent& e) const { + return {}; // unload is a no-op when already retired +} + +PipelineDefinitionStateCode UnloadedState::getStateCode() const { + return code; +} +StateChanger UnloadedState::handle(const ReloadEvent& e) const { + return {}; // wake-up: transition through reload path +} +StateChanger UnloadedState::handle(const RetireEvent& e) const { + return {}; // config removal while unloaded +} +StateChanger UnloadedState::handle(const ValidationPassedEvent& e) const { + return {}; // defensive: if validation passes directly, go available +} +StateKeeper UnloadedState::handle(const ValidationFailedEvent& e) const { + return {}; +} +StateKeeper UnloadedState::handle(const UsedModelChangedEvent& e) const { + return {}; +} +StateKeeper UnloadedState::handle(const UnloadEvent& e) const { + return {}; // already unloaded, idempotent +} PipelineDefinitionStatus::PipelineDefinitionStatus(const std::string& type, const std::string& name) : MachineState(type, name) {} @@ -233,6 +278,15 @@ std::tuple PipelineDefinitionSta ModelVersionState::END, ModelVersionStatusErrorCode::OK}; + case PipelineDefinitionStateCode::UNLOADED: + // Report AVAILABLE: the graph auto-reloads on the next inference request, + // so health checks and routing should treat it as available. Reporting END + // or UNLOADING would cause clients and load-balancers to permanently + // exclude this servable from their pools. + return { + ModelVersionState::AVAILABLE, + ModelVersionStatusErrorCode::OK}; + default: return {}; } diff --git a/src/dags/pipelinedefinitionstatus.hpp b/src/dags/pipelinedefinitionstatus.hpp index 59039f08a0..05366e8c99 100644 --- a/src/dags/pipelinedefinitionstatus.hpp +++ b/src/dags/pipelinedefinitionstatus.hpp @@ -34,7 +34,8 @@ enum class PipelineDefinitionStateCode { LOADING_PRECONDITION_FAILED_REQUIRED_REVALIDATION, AVAILABLE_REQUIRED_REVALIDATION, AVAILABLE, - RETIRED + RETIRED, + UNLOADED }; const std::string& pipelineDefinitionStateCodeToString(PipelineDefinitionStateCode code); @@ -112,6 +113,11 @@ struct LoadingFailedLastValidationRequiredRevalidation; * State in which pipeline is retired - removed from config */ struct RetiredState; +/** + * State in which pipeline is idle-unloaded (resources freed) but not retired. + * Auto-reloads on the next inference request. + */ +struct UnloadedState; #define EVENT_STRUCT_WITH_NAME(x) \ struct x { \ @@ -131,6 +137,7 @@ EVENT_STRUCT_WITH_NAME(ValidationFailedEvent); EVENT_STRUCT_WITH_NAME(ValidationPassedEvent); EVENT_STRUCT_WITH_NAME(UsedModelChangedEvent); EVENT_STRUCT_WITH_NAME(RetireEvent); +EVENT_STRUCT_WITH_NAME(UnloadEvent); template struct StateChanger { @@ -155,6 +162,7 @@ struct BeginState { StateChanger handle(const ValidationFailedEvent& e) const; StateKeeper handle(const UsedModelChangedEvent& e) const; StateKeeper handle(const RetireEvent& e) const; + StateKeeper handle(const UnloadEvent& e) const; }; struct ReloadState { @@ -165,6 +173,7 @@ struct ReloadState { StateChanger handle(const ValidationFailedEvent& e) const; StateKeeper handle(const UsedModelChangedEvent& e) const; StateKeeper handle(const RetireEvent& e) const; + StateKeeper handle(const UnloadEvent& e) const; }; struct AvailableState { @@ -175,6 +184,7 @@ struct AvailableState { StateKeeper handle(const ValidationFailedEvent& e) const; StateChanger handle(const UsedModelChangedEvent& e) const; StateChanger handle(const RetireEvent& e) const; + StateChanger handle(const UnloadEvent& e) const; }; struct AvailableRequiredRevalidation { @@ -185,6 +195,7 @@ struct AvailableRequiredRevalidation { StateChanger handle(const ValidationFailedEvent& e) const; StateKeeper handle(const UsedModelChangedEvent& e) const; StateChanger handle(const RetireEvent& e) const; + StateKeeper handle(const UnloadEvent& e) const; }; struct LoadingPreconditionFailedState { @@ -195,6 +206,11 @@ struct LoadingPreconditionFailedState { StateKeeper handle(const ValidationFailedEvent& e) const; StateChanger handle(const UsedModelChangedEvent& e) const; StateChanger handle(const RetireEvent& e) const; + // A failed wake-up reload of an idle graph reverts to UNLOADED so the next + // inference request can retry the wake (self-healing once the underlying issue + // is resolved). Only wakeUpIfUnloaded() sends UnloadEvent from this state; + // the watcher's unload() only does so from AVAILABLE. + StateChanger handle(const UnloadEvent& e) const; }; struct LoadingFailedLastValidationRequiredRevalidation { @@ -205,6 +221,7 @@ struct LoadingFailedLastValidationRequiredRevalidation { StateChanger handle(const ValidationFailedEvent& e) const; StateKeeper handle(const UsedModelChangedEvent& e) const; StateChanger handle(const RetireEvent& e) const; + StateKeeper handle(const UnloadEvent& e) const; }; struct RetiredState { @@ -215,9 +232,25 @@ struct RetiredState { StateChanger handle(const ValidationFailedEvent& e) const; StateKeeper handle(const UsedModelChangedEvent& e) const; StateKeeper handle(const RetireEvent& e) const; + StateKeeper handle(const UnloadEvent& e) const; +}; + +struct UnloadedState { + static const PipelineDefinitionStateCode code = PipelineDefinitionStateCode::UNLOADED; + PipelineDefinitionStateCode getStateCode() const; + // Wake-up: reuse the reload path + StateChanger handle(const ReloadEvent& e) const; + // Config removal while unloaded + StateChanger handle(const RetireEvent& e) const; + // Defensive: if validation somehow passes after an unload, go back to AVAILABLE + StateChanger handle(const ValidationPassedEvent& e) const; + // All other events are no-ops in UNLOADED + StateKeeper handle(const ValidationFailedEvent& e) const; + StateKeeper handle(const UsedModelChangedEvent& e) const; + StateKeeper handle(const UnloadEvent& e) const; }; -class PipelineDefinitionStatus : public MachineState { +class PipelineDefinitionStatus : public MachineState { public: PipelineDefinitionStatus(const std::string& type, const std::string& name); bool isAvailable() const; diff --git a/src/mediapipe_internal/mediapipegraphconfig.cpp b/src/mediapipe_internal/mediapipegraphconfig.cpp index 200de9c289..14930f115c 100644 --- a/src/mediapipe_internal/mediapipegraphconfig.cpp +++ b/src/mediapipe_internal/mediapipegraphconfig.cpp @@ -119,6 +119,15 @@ Status MediapipeGraphConfig::parseNode(const rapidjson::Value& v) { this->setSubconfigPath(DEFAULT_SUBCONFIG_FILENAME); this->setModelMeshSubconfigPath(DEFAULT_MODELMESH_SUBCONFIG_FILENAME); } + if (v.HasMember("idle_unload_timeout_seconds")) { + int timeoutSeconds = v["idle_unload_timeout_seconds"].GetInt(); + if (timeoutSeconds < 0) { + SPDLOG_ERROR("idle_unload_timeout_seconds must be >= 0 for mediapipe graph: {}", this->getGraphName()); + return StatusCode::JSON_INVALID; + } + this->setIdleUnloadTimeoutSeconds(timeoutSeconds); + SPDLOG_DEBUG("Mediapipe graph {} idle_unload_timeout_seconds set to {}", this->getGraphName(), timeoutSeconds); + } } catch (std::logic_error& e) { SPDLOG_DEBUG("Relative path error: {}", e.what()); return StatusCode::INTERNAL_ERROR; diff --git a/src/mediapipe_internal/mediapipegraphconfig.hpp b/src/mediapipe_internal/mediapipegraphconfig.hpp index a8237b1e0f..46c9240fe8 100644 --- a/src/mediapipe_internal/mediapipegraphconfig.hpp +++ b/src/mediapipe_internal/mediapipegraphconfig.hpp @@ -75,6 +75,14 @@ class MediapipeGraphConfig { */ GraphQueueSizeValue graphQueueSize; + /** + * @brief Idle unload timeout in seconds. + * 0 (default) = feature disabled. + * When > 0, the graph's heavy resources are freed after this many seconds + * of zero in-flight requests, and lazily reloaded on the next inference. + */ + int idleUnloadTimeoutSeconds = 0; + public: MediapipeGraphConfig(const std::string& graphName = "", const std::string& basePath = "", @@ -195,6 +203,14 @@ class MediapipeGraphConfig { return std::get(*this->graphQueueSize); } + int getIdleUnloadTimeoutSeconds() const { + return this->idleUnloadTimeoutSeconds; + } + + void setIdleUnloadTimeoutSeconds(int seconds) { + this->idleUnloadTimeoutSeconds = seconds; + } + bool isReloadRequired(const MediapipeGraphConfig& rhs) const; /** diff --git a/src/mediapipe_internal/mediapipegraphdefinition.cpp b/src/mediapipe_internal/mediapipegraphdefinition.cpp index 945dbf1d70..0302dc9308 100644 --- a/src/mediapipe_internal/mediapipegraphdefinition.cpp +++ b/src/mediapipe_internal/mediapipegraphdefinition.cpp @@ -210,6 +210,42 @@ Status MediapipeGraphDefinition::validate(const ServableNameChecker& checker) { if (!validationResult.ok()) { return validationResult; } + // Phase-1 restriction: idle unload is not supported for graphs with Python nodes. + // Python nodes may hold per-request iterator state (e.g. PythonExecutorCalculator) + // that cannot be safely reconstructed after a resource-free/reload cycle. + if (mgconfig.getIdleUnloadTimeoutSeconds() > 0) { + for (int i = 0; i < this->config.node_size(); ++i) { + const std::string& calculator = this->config.node(i).calculator(); + if (calculator == "PythonExecutorCalculator" || calculator == "PyTorchCalculator") { + SPDLOG_LOGGER_ERROR(modelmanager_logger, + "Mediapipe graph {}: idle_unload_timeout_seconds is not supported for graphs " + "containing Python calculator nodes ({}). " + "Remove idle_unload_timeout_seconds or remove the Python node.", + getName(), calculator); + return StatusCode::MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID; + } + } + // Phase-1 scope restriction: idle_unload_timeout_seconds is validated only for + // LLM/VLM continuous-batching graphs (graphs containing HttpLLMCalculator). + // Other node types (embeddings, rerank, STT, TTS, image-gen, plain passthrough) + // have not been validated for the idle-unload/lazy-reload cycle. + bool hasLlmCalculator = false; + for (int i = 0; i < this->config.node_size(); ++i) { + if (this->config.node(i).calculator() == "HttpLLMCalculator") { + hasLlmCalculator = true; + break; + } + } + if (!hasLlmCalculator) { + SPDLOG_LOGGER_ERROR(modelmanager_logger, + "Mediapipe graph {}: idle_unload_timeout_seconds is only supported for " + "LLM/VLM continuous-batching graphs (HttpLLMCalculator) in this release. " + "Remove idle_unload_timeout_seconds from non-LLM graph configurations.", + getName()); + return StatusCode::MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID; + } + } + validationResult = resolveGraphQueueSize(); if (!validationResult.ok()) { return validationResult; @@ -257,6 +293,8 @@ Status MediapipeGraphDefinition::validate(const ServableNameChecker& checker) { lock.unlock(); notifier.passed = true; + // Graph resources are now loaded (covers both initial load and wake-up reload). + SET_IF_ENABLED(this->reporter->graphLoaded, 1); SPDLOG_LOGGER_DEBUG(modelmanager_logger, "Finished validation of mediapipe: {}", getName()); SPDLOG_LOGGER_INFO(modelmanager_logger, "Mediapipe: {} inputs: {}", getName(), getTensorMapString(inputsInfo)); SPDLOG_LOGGER_INFO(modelmanager_logger, "Mediapipe: {} outputs: {}", getName(), getTensorMapString(outputsInfo)); @@ -294,7 +332,18 @@ MediapipeGraphDefinition::MediapipeGraphDefinition(const std::string name, pythonBackend(pythonBackend), reporter(std::make_unique(metricConfig, registry, name)) { mgconfig = config; + idleUnloadTimeoutSecondsCache.store(mgconfig.getIdleUnloadTimeoutSeconds(), std::memory_order_relaxed); passKfsRequestFlag = false; + // Allocate lastActivityTimeNs initialized to now so the idle timer starts from + // when the graph was loaded, not from the steady_clock epoch (which would + // cause immediate unload of a freshly loaded graph before any request arrives). + // Held as a shared_ptr so executors can safely refresh it even after the + // definition is retired/destroyed. + lastActivityTimeNs = std::make_shared>( + std::chrono::steady_clock::now().time_since_epoch().count()); + // Allocate the active-inference counter once; shared with every executor + // created by this definition so executors can decrement it on completion. + activeInferenceCount = std::make_shared>(0); } Status MediapipeGraphDefinition::createInputsInfo() { @@ -351,6 +400,13 @@ Status MediapipeGraphDefinition::createOutputsInfo() { } Status MediapipeGraphDefinition::create(std::unique_ptr& pipeline) { + // Update idle-tracking timestamp on every inference acquisition path. + // Status endpoints / health checks do not reach this method, so idle + // tracking is automatically inference-only. + lastActivityTimeNs->store( + std::chrono::steady_clock::now().time_since_epoch().count(), + std::memory_order_relaxed); + std::unique_ptr unloadGuard; Status status = waitForLoaded(unloadGuard); if (!status.ok()) { @@ -363,12 +419,14 @@ Status MediapipeGraphDefinition::create(std::unique_ptr& pipeline = std::make_unique(getName(), std::to_string(getVersion()), this->config, this->inputTypes, this->outputTypes, this->inputNames, this->outputNames, *this->sidePacketMaps, - this->pythonBackend, this->reporter.get(), std::move(graphIdGuard)); + this->pythonBackend, this->reporter.get(), std::move(graphIdGuard), + this->activeInferenceCount, this->lastActivityTimeNs); } else { pipeline = std::make_unique(getName(), std::to_string(getVersion()), this->config, this->inputTypes, this->outputTypes, this->inputNames, this->outputNames, *this->sidePacketMaps, - this->pythonBackend, this->reporter.get()); + this->pythonBackend, this->reporter.get(), + this->activeInferenceCount, this->lastActivityTimeNs); } SPDLOG_DEBUG("Created Mediapipe graph executor: {}", getName()); return status; @@ -438,18 +496,25 @@ Status MediapipeGraphDefinition::setStreamTypes() { } Status MediapipeGraphDefinition::reload(const ServableNameChecker& checker, const MediapipeGraphConfig& config) { + // Serialize against unload()/wakeUp() on the watcher/request threads. + // Recursive: wakeUpIfUnloaded() already holds this and calls reload(). + std::lock_guard lock(lifecycleMtx); // block creating new unloadGuards this->status.handle(ReloadEvent()); while (requestsHandlesCounter > 0) { std::this_thread::sleep_for(std::chrono::microseconds(1)); } this->mgconfig = config; + // Refresh the lock-free cache while we still hold lifecycleMtx. + idleUnloadTimeoutSecondsCache.store(this->mgconfig.getIdleUnloadTimeoutSeconds(), std::memory_order_relaxed); this->queue.reset(); this->sidePacketMaps = std::make_shared(); return validate(checker); } void MediapipeGraphDefinition::retire() { + // Serialize against unload()/wakeUp()/reload() on other threads. + std::lock_guard lock(lifecycleMtx); // Block creating new unloadGuards this->status.handle(RetireEvent()); while (requestsHandlesCounter > 0) { @@ -459,6 +524,142 @@ void MediapipeGraphDefinition::retire() { this->sidePacketMaps.reset(); } +bool MediapipeGraphDefinition::isIdleUnloadEnabled() const { + // Lock-free read of the cached timeout (mgconfig is only safe under lifecycleMtx). + return idleUnloadTimeoutSecondsCache.load(std::memory_order_relaxed) > 0; +} + +bool MediapipeGraphDefinition::shouldUnloadDueToIdle() const { + // Advisory pre-filter ONLY — reads no unsynchronized per-definition state. + // It must NOT read this->status (the state-machine variant) without the lock, + // since the config thread can mutate it concurrently. unload() performs the + // authoritative state==AVAILABLE check under lifecycleMtx. + // requestsHandlesCounter, lastActivityTimeNs and idleUnloadTimeoutSecondsCache + // are all atomics, so every read here is data-race-free. We never read mgconfig + // (only safe under lifecycleMtx) on this advisory path. + int64_t timeoutSeconds = idleUnloadTimeoutSecondsCache.load(std::memory_order_relaxed); + if (timeoutSeconds <= 0) { + return false; + } + if (requestsHandlesCounter.load(std::memory_order_relaxed) != 0) { + return false; + } + // Guard: if inferences are actively executing, never report idle. + // activeInferenceCount is bumped when a MediapipeGraphExecutor is created (in + // create()) by its RAII ActiveInferenceGuard, held for the executor's lifetime + // (which spans the inference), and decremented (with a lastActivityTimeNs refresh) + // when the executor is destroyed after the inference completes or throws. + if (activeInferenceCount && activeInferenceCount->load(std::memory_order_acquire) > 0) { + return false; + } + int64_t lastActivity = lastActivityTimeNs->load(std::memory_order_relaxed); + int64_t nowNs = std::chrono::steady_clock::now().time_since_epoch().count(); + int64_t timeoutNs = timeoutSeconds * 1'000'000'000LL; + return (nowNs - lastActivity) >= timeoutNs; +} + +Status MediapipeGraphDefinition::unload() { + // Serialize against wakeUpIfUnloaded()/reload()/retire() using the SAME lock so + // all lifecycle mutations are mutually exclusive. This prevents the watcher thread + // from tearing down resources while the config thread reloads/retires, or while a + // request thread is in the middle of a wake-up reload. + std::lock_guard lock(lifecycleMtx); + + // Re-check the preconditions under the lock. Only AVAILABLE graphs with no + // in-flight requests may be unloaded. If the state changed (e.g. a wake-up + // moved us to RELOADING/AVAILABLE) or a request arrived after the watcher's + // shouldUnloadDueToIdle() check, skip this cycle WITHOUT touching resources. + if (status.getStateCode() != PipelineDefinitionStateCode::AVAILABLE) { + SPDLOG_LOGGER_DEBUG(modelmanager_logger, + "Skipping idle-unload of mediapipe graph {}: state is no longer AVAILABLE", getName()); + return StatusCode::OK; + } + if (requestsHandlesCounter.load(std::memory_order_acquire) != 0) { + SPDLOG_LOGGER_DEBUG(modelmanager_logger, + "Skipping idle-unload of mediapipe graph {}: requests in flight", getName()); + return StatusCode::OK; + } + // Guard: if inferences are actively executing, skip this unload cycle. + // This catches the case where the executor's inference is running (past create()) + // but before ActiveInferenceGuard has decremented — i.e. a long generation that + // outlives idle_unload_timeout_seconds. Re-checked under the lock so the decision + // is consistent with the in-flight inference completing concurrently. + if (activeInferenceCount && activeInferenceCount->load(std::memory_order_acquire) > 0) { + SPDLOG_LOGGER_DEBUG(modelmanager_logger, + "Skipping idle-unload of mediapipe graph {}: active inferences in progress", getName()); + return StatusCode::OK; + } + + // Transition state: AVAILABLE -> UNLOADED (blocks new unloadGuards in waitForLoaded). + this->status.handle(UnloadEvent()); + + // Defensive: only tear down resources if the transition actually happened. + // (UnloadEvent is a no-op on any non-AVAILABLE state.) + if (status.getStateCode() != PipelineDefinitionStateCode::UNLOADED) { + SPDLOG_LOGGER_WARN(modelmanager_logger, + "Idle-unload of mediapipe graph {} aborted: state did not transition to UNLOADED (now {})", + getName(), pipelineDefinitionStateCodeToString(status.getStateCode())); + return StatusCode::OK; + } + + // Once UNLOADED, no new unloadGuards can be acquired and we verified + // requestsHandlesCounter == 0 above, so there is nothing to drain. + // Release queue (pooled graphs hold GPU/CPU resources). + this->queue.reset(); + // Release heavy side-packet resources (GenAI servables, embeddings, etc.) + // Keep the sidePacketMaps object itself — clear() drops the shared_ptrs inside, + // freeing GPU VRAM. validate()/initializeNodes() will repopulate it on wake-up. + this->sidePacketMaps->clear(); + + SET_IF_ENABLED(this->reporter->graphLoaded, 0); + SPDLOG_LOGGER_INFO(modelmanager_logger, + "Mediapipe graph {} idle-unloaded (freed GPU/CPU resources after {}s idle timeout)", + getName(), mgconfig.getIdleUnloadTimeoutSeconds()); + return StatusCode::OK; +} + +Status MediapipeGraphDefinition::wakeUpIfUnloaded(const ServableNameChecker& checker) { + // Recursive: this holds lifecycleMtx and then calls reload(), which re-acquires it. + std::lock_guard lock(lifecycleMtx); + // Double-check under lock: another thread may have already completed the reload. + if (status.getStateCode() != PipelineDefinitionStateCode::UNLOADED) { + return StatusCode::OK; + } + // Re-use the existing reload path: + // handle(ReloadEvent) -> fresh sidePacketMaps -> validate() -> initializeNodes() + // The stored mgconfig holds all required configuration. + SPDLOG_LOGGER_INFO(modelmanager_logger, + "Mediapipe graph {} is UNLOADED; triggering lazy wake-up reload", getName()); + auto start = std::chrono::steady_clock::now(); + Status reloadStatus = reload(checker, this->mgconfig); + auto elapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start); + if (reloadStatus.ok()) { + // Only reset the idle timer on a successful wake; on failure leave it so + // the existing failure-state handling applies and we don't mask the error. + lastActivityTimeNs->store( + std::chrono::steady_clock::now().time_since_epoch().count(), + std::memory_order_relaxed); + SPDLOG_LOGGER_INFO(modelmanager_logger, + "Mediapipe graph {} wake-up completed in {}ms", + getName(), elapsed.count()); + } else { + // Wake-up reload failed (e.g. model files temporarily unavailable). reload() + // ran validate() which left the state in LOADING_PRECONDITION_FAILED. Revert + // to UNLOADED so the NEXT inference request retries the wake — making a + // transient failure self-healing rather than permanently wedging a previously + // healthy idle graph. We are still holding lifecycleMtx here. + // (If validate() somehow ended elsewhere, UnloadEvent is a no-op on states + // other than AVAILABLE/LOADING_PRECONDITION_FAILED, so this is safe.) + this->status.handle(UnloadEvent()); + SPDLOG_LOGGER_ERROR(modelmanager_logger, + "Mediapipe graph {} wake-up failed after {}ms: {}. Reverted to UNLOADED; " + "next request will retry the wake.", + getName(), elapsed.count(), reloadStatus.string()); + } + return reloadStatus; +} + bool MediapipeGraphDefinition::isReloadRequired(const MediapipeGraphConfig& config) const { if (getStateCode() == PipelineDefinitionStateCode::RETIRED) { SPDLOG_LOGGER_DEBUG(modelmanager_logger, "Reloading previously retired mediapipe definition: {}", getName()); diff --git a/src/mediapipe_internal/mediapipegraphdefinition.hpp b/src/mediapipe_internal/mediapipegraphdefinition.hpp index b49ed7e456..5d4ea8d631 100644 --- a/src/mediapipe_internal/mediapipegraphdefinition.hpp +++ b/src/mediapipe_internal/mediapipegraphdefinition.hpp @@ -14,8 +14,11 @@ // limitations under the License. //***************************************************************************** #pragma once +#include +#include #include #include +#include #include #include #include @@ -78,6 +81,28 @@ class MediapipeGraphDefinition : public SingleVersionServableDefinition { Status initializeNodes(); bool isReloadRequired(const MediapipeGraphConfig& config) const; + // Idle unload feature + Status unload(); + // wakeUpIfUnloaded: thread-safe wrapper — holds lifecycleMtx, double-checks + // the UNLOADED state, and calls wakeUp() exactly once; other concurrent callers + // wait on the mutex then return immediately since the state is no longer UNLOADED. + Status wakeUpIfUnloaded(const ServableNameChecker& checker); + bool isIdleUnloadEnabled() const; + bool shouldUnloadDueToIdle() const; + + // Test-only: backdate the last-activity timestamp by the given number of seconds + // so idle-timeout behavior can be exercised deterministically without sleeping. + void backdateLastActivityForTest(int64_t seconds) { + int64_t nowNs = std::chrono::steady_clock::now().time_since_epoch().count(); + lastActivityTimeNs->store(nowNs - seconds * 1'000'000'000LL, std::memory_order_relaxed); + } + + // Returns the shared active-inference counter so create() can hand it to the executor. + // Not exposed in tests directly — use shouldUnloadDueToIdle() to observe the effect. + const std::shared_ptr>& getActiveInferenceCount() const { + return activeInferenceCount; + } + static const std::string SCHEDULER_CLASS_NAME; protected: @@ -148,5 +173,39 @@ class MediapipeGraphDefinition : public SingleVersionServableDefinition { std::unique_ptr reporter; std::shared_ptr queue; + + // Idle unload: timestamp (nanoseconds from steady_clock epoch) of the last + // inference activity. Updated in create() on every inference acquisition and + // when an in-flight inference finishes (via ActiveInferenceGuard destructor). + // Held as shared_ptr so executors can safely write to it even after the + // definition is retired/destroyed — the atomic outlives the definition. + std::shared_ptr> lastActivityTimeNs; + + // Count of inferences currently executing on this graph. Incremented when a + // MediapipeGraphExecutor is created (in create()) via the executor's RAII + // ActiveInferenceGuard, and decremented when that executor is destroyed (after + // the caller finishes infer()/inferStream()). The count is therefore held for + // the executor's lifetime, which spans the inference. A non-zero value prevents + // shouldUnloadDueToIdle()/unload() from tearing down the definition. + // Shared_ptr so MediapipeGraphExecutor can hold a copy safely beyond the + // create() call — the executor owns the counter reference for its lifetime. + std::shared_ptr> activeInferenceCount; + + // Cached copy of mgconfig.getIdleUnloadTimeoutSeconds() so the watcher thread can + // read it lock-free. mgconfig itself is only safe to read under lifecycleMtx + // (reload() reassigns it). Updated in the constructor and in reload() (under the + // lock) whenever mgconfig is assigned. + std::atomic idleUnloadTimeoutSecondsCache{0}; + + // Serializes ALL per-definition lifecycle mutations (reload/retire/unload/wakeUp) + // so they are mutually exclusive regardless of which thread runs them or which + // outer lock (ModelManager::configMtx) is held by the caller. This is required + // because unload() runs on the watcher thread (no configMtx) while reload()/retire() + // run on the config thread (under configMtx) and they mutate the same per-definition + // state (this->status variant, this->sidePacketMaps). + // Recursive because wakeUpIfUnloaded() holds it and calls reload(), which also takes it. + // Lock ordering is one-directional: configMtx -> lifecycleMtx. Nothing here ever + // acquires configMtx, so no deadlock is possible. + mutable std::recursive_mutex lifecycleMtx; }; } // namespace ovms diff --git a/src/mediapipe_internal/mediapipegraphexecutor.cpp b/src/mediapipe_internal/mediapipegraphexecutor.cpp index 26757de401..0ae0b5f4e9 100644 --- a/src/mediapipe_internal/mediapipegraphexecutor.cpp +++ b/src/mediapipe_internal/mediapipegraphexecutor.cpp @@ -47,7 +47,9 @@ MediapipeGraphExecutor::MediapipeGraphExecutor( const GraphSidePackets& sidePacketMaps, PythonBackend* pythonBackend, MediapipeServableMetricReporter* mediapipeServableMetricReporter, - GraphIdGuard&& guard) : + GraphIdGuard&& guard, + std::shared_ptr> activeInferenceCount, + std::shared_ptr> lastActivityTimeNs) : name(name), version(version), config(config), @@ -59,7 +61,10 @@ MediapipeGraphExecutor::MediapipeGraphExecutor( pythonBackend(pythonBackend), currentStreamTimestamp(::mediapipe::Timestamp(STARTING_TIMESTAMP_VALUE)), mediapipeServableMetricReporter(mediapipeServableMetricReporter), - guard(std::move(guard)) {} + guard(std::move(guard)), + activeInferenceGuard(activeInferenceCount + ? std::optional(ActiveInferenceGuard(std::move(activeInferenceCount), std::move(lastActivityTimeNs))) + : std::nullopt) {} MediapipeGraphExecutor::MediapipeGraphExecutor( const std::string& name, const std::string& version, @@ -70,7 +75,9 @@ MediapipeGraphExecutor::MediapipeGraphExecutor( std::vector outputNames, const GraphSidePackets& sidePacketMaps, PythonBackend* pythonBackend, - MediapipeServableMetricReporter* mediapipeServableMetricReporter) : + MediapipeServableMetricReporter* mediapipeServableMetricReporter, + std::shared_ptr> activeInferenceCount, + std::shared_ptr> lastActivityTimeNs) : name(name), version(version), config(config), @@ -81,6 +88,9 @@ MediapipeGraphExecutor::MediapipeGraphExecutor( sidePacketMaps(sidePacketMaps), pythonBackend(pythonBackend), currentStreamTimestamp(::mediapipe::Timestamp(STARTING_TIMESTAMP_VALUE)), - mediapipeServableMetricReporter(mediapipeServableMetricReporter) {} + mediapipeServableMetricReporter(mediapipeServableMetricReporter), + activeInferenceGuard(activeInferenceCount + ? std::optional(ActiveInferenceGuard(std::move(activeInferenceCount), std::move(lastActivityTimeNs))) + : std::nullopt) {} } // namespace ovms diff --git a/src/mediapipe_internal/mediapipegraphexecutor.hpp b/src/mediapipe_internal/mediapipegraphexecutor.hpp index 8dae471191..3948c6d722 100644 --- a/src/mediapipe_internal/mediapipegraphexecutor.hpp +++ b/src/mediapipe_internal/mediapipegraphexecutor.hpp @@ -14,6 +14,8 @@ // limitations under the License. //***************************************************************************** #pragma once +#include +#include #include #include #include @@ -48,6 +50,47 @@ namespace ovms { class PythonBackend; class ServableMetricReporter; + +// RAII guard that tracks an in-flight inference on a MediapipeGraphDefinition. +// Increments the counter on construction; decrements it and refreshes +// lastActivityTimeNs on destruction (even if the inference threw). Both are held as +// shared_ptrs so they remain valid even if the definition is reloaded or retired +// while the inference (and thus the owning executor) is still alive. +// The lastActivityTimeNs refresh on decrement ensures that completing a long +// generation resets the idle timer — preventing an immediate re-unload on the next +// watcher cycle. +struct ActiveInferenceGuard { + std::shared_ptr> counter; + std::shared_ptr> lastActivityTimeNs; + + ActiveInferenceGuard(std::shared_ptr> counter, + std::shared_ptr> lastActivityTimeNs) : + counter(std::move(counter)), + lastActivityTimeNs(std::move(lastActivityTimeNs)) { + if (this->counter) { + this->counter->fetch_add(1, std::memory_order_acq_rel); + } + } + + ~ActiveInferenceGuard() { + if (counter) { + // Refresh activity timestamp BEFORE decrementing so the watcher sees a + // recent activity time if it samples between the refresh and the decrement. + if (lastActivityTimeNs) { + lastActivityTimeNs->store( + std::chrono::steady_clock::now().time_since_epoch().count(), + std::memory_order_relaxed); + } + counter->fetch_sub(1, std::memory_order_acq_rel); + } + } + + // Non-copyable, movable. + ActiveInferenceGuard(const ActiveInferenceGuard&) = delete; + ActiveInferenceGuard& operator=(const ActiveInferenceGuard&) = delete; + ActiveInferenceGuard(ActiveInferenceGuard&&) = default; + ActiveInferenceGuard& operator=(ActiveInferenceGuard&&) = default; +}; class MediapipeGraphExecutor; inline StatusCode mediapipeAbslToOvmsStatus(absl::StatusCode code) { @@ -140,6 +183,13 @@ class MediapipeGraphExecutor { MediapipeServableMetricReporter* mediapipeServableMetricReporter; std::optional guard; + // RAII guard tracking this executor's active inference on the parent definition. + // Held for the entire lifetime of the executor so that the in-flight-inference + // check in shouldUnloadDueToIdle() / unload() sees a non-zero count while any + // inference method (infer / inferStream) is executing. On destruction (when the + // executor goes out of scope after inference completes) the counter decrements + // and lastActivityTimeNs is refreshed. + std::optional activeInferenceGuard; public: MediapipeGraphExecutor(const std::string& name, @@ -150,7 +200,9 @@ class MediapipeGraphExecutor { std::vector inputNames, std::vector outputNames, const GraphSidePackets& sidePacketMaps, PythonBackend* pythonBackend, - MediapipeServableMetricReporter* mediapipeServableMetricReporter, GraphIdGuard&& guard); + MediapipeServableMetricReporter* mediapipeServableMetricReporter, GraphIdGuard&& guard, + std::shared_ptr> activeInferenceCount = nullptr, + std::shared_ptr> lastActivityTimeNs = nullptr); // Constructor without graph queue (old path - graph created per-request) MediapipeGraphExecutor(const std::string& name, const std::string& version, @@ -160,7 +212,9 @@ class MediapipeGraphExecutor { std::vector inputNames, std::vector outputNames, const GraphSidePackets& sidePacketMaps, PythonBackend* pythonBackend, - MediapipeServableMetricReporter* mediapipeServableMetricReporter); + MediapipeServableMetricReporter* mediapipeServableMetricReporter, + std::shared_ptr> activeInferenceCount = nullptr, + std::shared_ptr> lastActivityTimeNs = nullptr); template Status infer(const RequestType* request, ResponseType* response, ExecutionContext executionContext) { diff --git a/src/metrics/metric_config.cpp b/src/metrics/metric_config.cpp index fedfb4a806..95997a4ab0 100644 --- a/src/metrics/metric_config.cpp +++ b/src/metrics/metric_config.cpp @@ -55,6 +55,7 @@ const std::string METRIC_NAME_WAIT_FOR_INFER_REQ_TIME = "ovms_wait_for_infer_req // MediaPipe const std::string METRIC_NAME_CURRENT_GRAPHS = "ovms_current_graphs"; +const std::string METRIC_NAME_GRAPH_LOADED = "ovms_graph_loaded"; const std::string METRIC_NAME_RESPONSES = "ovms_responses"; const std::string METRIC_NAME_REQUESTS_ACCEPTED = "ovms_requests_accepted"; diff --git a/src/metrics/metric_config.hpp b/src/metrics/metric_config.hpp index bf1430f9c2..365a5cdb67 100644 --- a/src/metrics/metric_config.hpp +++ b/src/metrics/metric_config.hpp @@ -41,6 +41,7 @@ extern const std::string METRIC_NAME_WAIT_FOR_INFER_REQ_TIME; // MediaPipe extern const std::string METRIC_NAME_CURRENT_GRAPHS; +extern const std::string METRIC_NAME_GRAPH_LOADED; extern const std::string METRIC_NAME_RESPONSES; extern const std::string METRIC_NAME_REQUESTS_ACCEPTED; @@ -100,6 +101,7 @@ class MetricConfig { {METRIC_NAME_INFERENCE_TIME}, {METRIC_NAME_WAIT_FOR_INFER_REQ_TIME}, {METRIC_NAME_CURRENT_GRAPHS}, + {METRIC_NAME_GRAPH_LOADED}, {METRIC_NAME_REQUESTS_ACCEPTED}, {METRIC_NAME_REQUESTS_REJECTED}, {METRIC_NAME_GRAPH_ERROR}, diff --git a/src/model_metric_reporter.cpp b/src/model_metric_reporter.cpp index 93d40140cc..7fd6378ae0 100644 --- a/src/model_metric_reporter.cpp +++ b/src/model_metric_reporter.cpp @@ -349,6 +349,18 @@ MediapipeServableMetricReporter::MediapipeServableMetricReporter(const MetricCon SPDLOG_INFO("DISABLED {}", METRIC_NAME_CURRENT_GRAPHS); } + familyName = METRIC_NAME_GRAPH_LOADED; + if (metricConfig->isFamilyEnabled(familyName)) { + auto family = registry->createFamily(familyName, + "Whether the MediaPipe graph resources are loaded (1) or idle-unloaded (0)."); + THROW_IF_NULL(family, "cannot create family"); + this->graphLoaded = family->addMetric( + {{"name", graphName}}); + THROW_IF_NULL(this->graphLoaded, "cannot create metric"); + } else { + SPDLOG_INFO("DISABLED {}", METRIC_NAME_GRAPH_LOADED); + } + familyName = METRIC_NAME_REQUESTS_ACCEPTED; if (metricConfig->isFamilyEnabled(familyName)) { auto family = registry->createFamily(familyName, diff --git a/src/model_metric_reporter.hpp b/src/model_metric_reporter.hpp index e300334742..c784b3f935 100644 --- a/src/model_metric_reporter.hpp +++ b/src/model_metric_reporter.hpp @@ -168,6 +168,9 @@ class MediapipeServableMetricReporter : public StatusMetricReporter { public: std::unique_ptr currentGraphs; + // 1 = graph resources loaded, 0 = idle-unloaded. Always 1 after a successful + // load for graphs that never enable idle unload. + std::unique_ptr graphLoaded; // KFS std::unique_ptr requestAcceptedGrpcModelInfer; diff --git a/src/modelmanager.cpp b/src/modelmanager.cpp index 25f6d49b2f..565fe36357 100644 --- a/src/modelmanager.cpp +++ b/src/modelmanager.cpp @@ -1052,6 +1052,37 @@ Status ModelManager::configFileReloadNeeded(bool& isNeeded) { return StatusCode::OK; } +void ModelManager::unloadIdleGraphs() { +#if (MEDIAPIPE_DISABLE == 0) + // Collect names of definitions that should be unloaded; iterate under + // a brief shared lock (inside the factory getters), then call unload() + // outside it. unload() re-checks all preconditions under lifecycleMtx + // and is non-blocking (skips graphs with in-flight requests). + std::vector toUnload; + { + const auto& names = mediapipeFactory->getMediapipePipelinesNames(); + for (const auto& name : names) { + MediapipeGraphDefinition* def = mediapipeFactory->findDefinitionByName(name); + if (def && def->shouldUnloadDueToIdle()) { + toUnload.push_back(name); + } + } + } + for (const auto& name : toUnload) { + MediapipeGraphDefinition* def = mediapipeFactory->findDefinitionByName(name); + if (def) { + // Re-check under the per-definition idle mutex to avoid racing with + // a concurrent wakeUp() that may have already transitioned the state. + auto status = def->unload(); + if (!status.ok()) { + SPDLOG_LOGGER_WARN(modelmanager_logger, + "Failed to idle-unload mediapipe graph {}: {}", name, status.string()); + } + } + } +#endif +} + void ModelManager::watcher(std::future exitSignal, bool watchConfigFile) { SPDLOG_LOGGER_INFO(modelmanager_logger, "Started model manager thread"); while (exitSignal.wait_for(std::chrono::milliseconds(this->watcherIntervalMillisec)) == std::future_status::timeout) { @@ -1066,6 +1097,12 @@ void ModelManager::watcher(std::future exitSignal, bool watchConfigFile) { } updateConfigurationWithoutConfigFile(); loadingLock.unlock(); + // Idle-unload sweep: free resources of graphs idle past their timeout. + // Done AFTER releasing configMtx — unload() only needs the factory's + // definitions lock and the per-definition lifecycleMtx, and is + // non-blocking (it skips graphs with in-flight requests rather than + // draining). This keeps configMtx hold time minimal. + unloadIdleGraphs(); SPDLOG_LOGGER_TRACE(modelmanager_logger, "Models configuration and filesystem check cycle end"); } SPDLOG_LOGGER_INFO(modelmanager_logger, "Stopped model manager thread"); @@ -1595,6 +1632,51 @@ const std::vector ModelManager::getNamesOfAvailableModels() const { Status ModelManager::createPipeline(std::unique_ptr& graph, const std::string& name) { #if (MEDIAPIPE_DISABLE == 0) + // Lazy wake-up with bounded retry. A request can observe state==AVAILABLE here, + // then have the watcher flip it to UNLOADED before create()->waitForLoaded() runs, + // which returns MEDIAPIPE_DEFINITION_NOT_LOADED_YET. We retry a bounded number of + // times: wake if UNLOADED, then create(); if create() fails specifically because + // the graph is not-loaded-yet AND it is currently UNLOADED, wake and retry. + // wakeUpIfUnloaded() serialises the transition internally so exactly one of N + // concurrent callers triggers the actual reload; the rest wait and then proceed. + constexpr int kMaxWakeAttempts = 3; + for (int attempt = 0; attempt < kMaxWakeAttempts; ++attempt) { + // Re-fetch the definition each iteration: a concurrent config reload may + // retire+erase it mid-loop, so a cached pointer could dangle. Bail cleanly + // if it is gone. + MediapipeGraphDefinition* def = this->mediapipeFactory->findDefinitionByName(name); + if (def == nullptr) { + SPDLOG_LOGGER_DEBUG(modelmanager_logger, + "Mediapipe graph {} no longer exists during wake-up loop", name); + return StatusCode::MEDIAPIPE_DEFINITION_NAME_MISSING; + } + if (def->getStateCode() == PipelineDefinitionStateCode::UNLOADED) { + auto wakeStatus = def->wakeUpIfUnloaded(*this); + if (!wakeStatus.ok()) { + SPDLOG_LOGGER_ERROR(modelmanager_logger, + "Mediapipe graph {} wake-up failed: {}", name, wakeStatus.string()); + return wakeStatus; + } + } + auto createStatus = this->mediapipeFactory->create(graph, name); + if (createStatus.ok()) { + return createStatus; + } + // Only retry the specific race: graph got idle-unloaded between our check and + // waitForLoaded(). Any other failure (genuine load failure, missing graph, etc.) + // is returned immediately. Re-fetch to avoid using a possibly-stale pointer. + MediapipeGraphDefinition* defAfter = this->mediapipeFactory->findDefinitionByName(name); + bool racedWithUnload = defAfter && + (createStatus == StatusCode::MEDIAPIPE_DEFINITION_NOT_LOADED_YET) && + (defAfter->getStateCode() == PipelineDefinitionStateCode::UNLOADED); + if (!racedWithUnload) { + return createStatus; + } + SPDLOG_LOGGER_DEBUG(modelmanager_logger, + "Mediapipe graph {} was idle-unloaded during request; retrying wake-up (attempt {}/{})", + name, attempt + 1, kMaxWakeAttempts); + } + // Exhausted retries — make one final attempt and return whatever it yields. return this->mediapipeFactory->create(graph, name); #else SPDLOG_ERROR("Mediapipe support was disabled during build process..."); diff --git a/src/modelmanager.hpp b/src/modelmanager.hpp index 22b80b5251..be4e9dcf06 100644 --- a/src/modelmanager.hpp +++ b/src/modelmanager.hpp @@ -128,6 +128,12 @@ class ModelManager : public ServableNameChecker, public MetricProvider, public M */ void watcher(std::future exitSignal, bool watchConfigFile); + /** + * @brief Sweep mediapipe graph definitions and unload any that have been + * idle past their configured idle_unload_timeout_seconds. + */ + void unloadIdleGraphs(); + /** * @brief Cleaner thread for resources cleanup */ diff --git a/src/schema.cpp b/src/schema.cpp index 7d76980dca..0c10a0da93 100644 --- a/src/schema.cpp +++ b/src/schema.cpp @@ -357,6 +357,10 @@ const std::string MODELS_CONFIG_SCHEMA = R"({ }, "subconfig": { "type": "string" + }, + "idle_unload_timeout_seconds": { + "type": "integer", + "minimum": 0 } }, "additionalProperties": false diff --git a/src/test/llm/llmnode_test.cpp b/src/test/llm/llmnode_test.cpp index e13cf29919..850215c88d 100644 --- a/src/test/llm/llmnode_test.cpp +++ b/src/test/llm/llmnode_test.cpp @@ -5178,3 +5178,641 @@ TEST(BaseGenerationConfigBuilderTest, SeedPreservedWhenExplicitlySet) { builder.parseConfigFromRequest(request); EXPECT_EQ(builder.getConfig().rng_seed, 42u); } + +// --------------------------------------------------------------------------- +// Idle unload feature: LLM graph lifecycle (issue #4141) +// These tests require the opt-125m model fixture. +// --------------------------------------------------------------------------- + +class LLMIdleUnloadTest : public ::testing::Test { +protected: + // Builds a minimal continuous-batching LLM graph pbtxt pointing at opt-125m. + static std::string buildOptGraphPbtxt() { + std::string modelsPath = getGenericFullPathForSrcTest("/ovms/src/test/llm_testing/facebook/opt-125m"); + std::string testPbtxt = R"( + input_stream: "HTTP_REQUEST_PAYLOAD:input" + output_stream: "HTTP_RESPONSE_PAYLOAD:output" + + node: { + name: "llmNode" + calculator: "HttpLLMCalculator" + input_stream: "LOOPBACK:loopback" + input_stream: "HTTP_REQUEST_PAYLOAD:input" + input_side_packet: "LLM_NODE_RESOURCES:llm" + output_stream: "LOOPBACK:loopback" + output_stream: "HTTP_RESPONSE_PAYLOAD:output" + input_stream_info: { + tag_index: 'LOOPBACK:0', + back_edge: true + } + node_options: { + [type.googleapis.com / mediapipe.LLMCalculatorOptions]: { + models_path: ")" + + modelsPath + R"(" + cache_size: 1 + } + } + input_stream_handler { + input_stream_handler: "SyncSetInputStreamHandler", + options { + [mediapipe.SyncSetInputStreamHandlerOptions.ext] { + sync_set { + tag_index: "LOOPBACK:0" + } + } + } + } + } + )"; + adjustConfigForTargetPlatform(testPbtxt); + return testPbtxt; + } +}; + +// Unload after idle: build LLM graph with small timeout, simulate idle, unload, assert freed. +TEST_F(LLMIdleUnloadTest, UnloadAfterIdleFreesResources) { + ConstructorEnabledModelManager manager; + std::string testPbtxt = buildOptGraphPbtxt(); + + ovms::MediapipeGraphConfig mgc{"mediaIdle", "", ""}; + mgc.setIdleUnloadTimeoutSeconds(10); + DummyMediapipeGraphDefinition def("mediaIdle", mgc, testPbtxt, nullptr); + def.inputConfig = testPbtxt; + ASSERT_EQ(def.validate(manager), StatusCode::OK); + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::AVAILABLE); + ASSERT_NE(def.getGenAiServable("llmNode"), nullptr); + ASSERT_TRUE(def.isIdleUnloadEnabled()); + + // Not yet idle -> should not unload. + ASSERT_FALSE(def.shouldUnloadDueToIdle()); + + // Backdate activity well past the timeout. + def.backdateLastActivityForTest(60); + ASSERT_TRUE(def.shouldUnloadDueToIdle()); + + ASSERT_EQ(def.unload(), StatusCode::OK); + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); + // Resources freed: the GenAi servable map should be empty. + ASSERT_TRUE(def.getGenAiServableMap().empty()); + ASSERT_FALSE(def.isAvailable()); +} + +// Lazy reload: after unload, wakeUpIfUnloaded brings it back to AVAILABLE with resources. +TEST_F(LLMIdleUnloadTest, WakeUpReloadsResources) { + ConstructorEnabledModelManager manager; + std::string testPbtxt = buildOptGraphPbtxt(); + + ovms::MediapipeGraphConfig mgc{"mediaIdle", "", ""}; + mgc.setIdleUnloadTimeoutSeconds(10); + DummyMediapipeGraphDefinition def("mediaIdle", mgc, testPbtxt, nullptr); + def.inputConfig = testPbtxt; + ASSERT_EQ(def.validate(manager), StatusCode::OK); + + def.backdateLastActivityForTest(60); + ASSERT_EQ(def.unload(), StatusCode::OK); + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); + ASSERT_TRUE(def.getGenAiServableMap().empty()); + + // Wake up. + ASSERT_EQ(def.wakeUpIfUnloaded(manager), StatusCode::OK); + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::AVAILABLE); + ASSERT_TRUE(def.isAvailable()); + ASSERT_NE(def.getGenAiServable("llmNode"), nullptr); + + // Wake-up while already AVAILABLE is a no-op success. + ASSERT_EQ(def.wakeUpIfUnloaded(manager), StatusCode::OK); + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::AVAILABLE); +} + +// Idle timer reset: acquiring the graph (create) refreshes lastActivity. +TEST_F(LLMIdleUnloadTest, CreateResetsIdleTimer) { + ConstructorEnabledModelManager manager; + std::string testPbtxt = buildOptGraphPbtxt(); + + ovms::MediapipeGraphConfig mgc{"mediaIdle", "", ""}; + mgc.setIdleUnloadTimeoutSeconds(10); + DummyMediapipeGraphDefinition def("mediaIdle", mgc, testPbtxt, nullptr); + def.inputConfig = testPbtxt; + ASSERT_EQ(def.validate(manager), StatusCode::OK); + + // Make it look idle. + def.backdateLastActivityForTest(60); + ASSERT_TRUE(def.shouldUnloadDueToIdle()); + + // Acquiring the graph updates lastActivity, so it is no longer idle. + std::unique_ptr executor; + ASSERT_EQ(def.create(executor), StatusCode::OK); + ASSERT_NE(executor, nullptr); + ASSERT_FALSE(def.shouldUnloadDueToIdle()); +} + +// Disabled by default: timeout 0 -> never idle-unloads. +TEST_F(LLMIdleUnloadTest, DisabledByDefaultNeverUnloads) { + ConstructorEnabledModelManager manager; + std::string testPbtxt = buildOptGraphPbtxt(); + + ovms::MediapipeGraphConfig mgc{"mediaIdle", "", ""}; + // idle_unload_timeout_seconds not set -> defaults to 0 (disabled) + DummyMediapipeGraphDefinition def("mediaIdle", mgc, testPbtxt, nullptr); + def.inputConfig = testPbtxt; + ASSERT_EQ(def.validate(manager), StatusCode::OK); + + ASSERT_FALSE(def.isIdleUnloadEnabled()); + def.backdateLastActivityForTest(100000); + ASSERT_FALSE(def.shouldUnloadDueToIdle()); +} + +// Python-node guard: a graph with a Python node + idle timeout > 0 fails validation. +#if (PYTHON_DISABLE == 0) +TEST_F(LLMIdleUnloadTest, PythonNodeWithIdleUnloadRejected) { + ConstructorEnabledModelManager manager; + std::string testPbtxt = R"( + input_stream: "HTTP_REQUEST_PAYLOAD:input" + output_stream: "HTTP_RESPONSE_PAYLOAD:output" + node: { + name: "pythonNode" + calculator: "PythonExecutorCalculator" + input_side_packet: "PYTHON_NODE_RESOURCES:py" + input_stream: "HTTP_REQUEST_PAYLOAD:input" + output_stream: "HTTP_RESPONSE_PAYLOAD:output" + } + )"; + adjustConfigForTargetPlatform(testPbtxt); + + ovms::MediapipeGraphConfig mgc{"mediaPy", "", ""}; + mgc.setIdleUnloadTimeoutSeconds(10); + DummyMediapipeGraphDefinition def("mediaPy", mgc, testPbtxt, nullptr); + def.inputConfig = testPbtxt; + auto status = def.validate(manager); + ASSERT_EQ(status, StatusCode::MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID) << status.string(); +} +#endif + +// Exactly-one-reload under concurrency: N threads call wakeUpIfUnloaded on an UNLOADED def. +// Best-effort: asserts all end AVAILABLE and the graph is loaded exactly once afterwards. +// Note: this verifies the end-state invariant (single AVAILABLE graph, resources present); +// the per-definition mutex guarantees a single reload, but counting reloads deterministically +// from the test would require instrumentation hooks not present, so we assert the observable +// post-condition instead. +TEST_F(LLMIdleUnloadTest, ConcurrentWakeUpEndsAvailable) { + ConstructorEnabledModelManager manager; + std::string testPbtxt = buildOptGraphPbtxt(); + + ovms::MediapipeGraphConfig mgc{"mediaIdle", "", ""}; + mgc.setIdleUnloadTimeoutSeconds(10); + DummyMediapipeGraphDefinition def("mediaIdle", mgc, testPbtxt, nullptr); + def.inputConfig = testPbtxt; + ASSERT_EQ(def.validate(manager), StatusCode::OK); + def.backdateLastActivityForTest(60); + ASSERT_EQ(def.unload(), StatusCode::OK); + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); + + constexpr int kThreads = 8; + std::vector threads; + std::vector results(kThreads, StatusCode::UNKNOWN_ERROR); + for (int i = 0; i < kThreads; ++i) { + threads.emplace_back([&def, &manager, &results, i]() { + results[i] = def.wakeUpIfUnloaded(manager); + }); + } + for (auto& t : threads) { + t.join(); + } + for (int i = 0; i < kThreads; ++i) { + ASSERT_EQ(results[i], StatusCode::OK) << "thread " << i << " status: " << results[i].string(); + } + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::AVAILABLE); + ASSERT_NE(def.getGenAiServable("llmNode"), nullptr); +} + +// Best-effort stress: interleave unload() (watcher role) and wakeUpIfUnloaded() +// (request role) repeatedly and assert the graph never ends in a torn state. +// lifecycleMtx makes unload and wake mutually exclusive, so every observed +// settled state must be internally consistent: AVAILABLE with resources, or +// cleanly UNLOADED (empty maps). Determinism is limited by thread scheduling; +// this exercises the FIX 1/FIX 2 serialization rather than asserting an exact +// sequence. +TEST_F(LLMIdleUnloadTest, ConcurrentUnloadWakeNeverTearsState) { + ConstructorEnabledModelManager manager; + std::string testPbtxt = buildOptGraphPbtxt(); + + ovms::MediapipeGraphConfig mgc{"mediaIdle", "", ""}; + mgc.setIdleUnloadTimeoutSeconds(10); + DummyMediapipeGraphDefinition def("mediaIdle", mgc, testPbtxt, nullptr); + def.inputConfig = testPbtxt; + ASSERT_EQ(def.validate(manager), StatusCode::OK); + + std::atomic stop{false}; + std::atomic errors{0}; + + // Unloader thread: keeps backdating + trying to unload. + std::thread unloader([&]() { + while (!stop.load()) { + def.backdateLastActivityForTest(60); + auto s = def.unload(); + if (!s.ok()) + errors.fetch_add(1); + std::this_thread::yield(); + } + }); + + // Several waker threads: keep waking it back up. + constexpr int kWakers = 4; + std::vector wakers; + for (int i = 0; i < kWakers; ++i) { + wakers.emplace_back([&]() { + while (!stop.load()) { + auto s = def.wakeUpIfUnloaded(manager); + if (!s.ok()) + errors.fetch_add(1); + std::this_thread::yield(); + } + }); + } + + // Run for a short bounded period. + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + stop.store(true); + unloader.join(); + for (auto& t : wakers) { + t.join(); + } + + ASSERT_EQ(errors.load(), 0); + + // Quiesce: ensure it ends AVAILABLE with resources (no torn RELOADING/null state). + ASSERT_EQ(def.wakeUpIfUnloaded(manager), StatusCode::OK); + auto finalState = def.getStateCode(); + // A settled state must be either AVAILABLE (with resources) or UNLOADED (empty). + if (finalState == ovms::PipelineDefinitionStateCode::AVAILABLE) { + ASSERT_NE(def.getGenAiServable("llmNode"), nullptr); + } else { + ASSERT_EQ(finalState, ovms::PipelineDefinitionStateCode::UNLOADED); + ASSERT_TRUE(def.getGenAiServableMap().empty()); + } +} + +// Best-effort: exercise unload() (watcher role) concurrently with reload() and +// retire() (config role) on the same definition. Verifies the lifecycleMtx +// serialization (NEW-1 fix): no crash, and a consistent final state. +// NOTE: data races are not deterministically catchable without TSAN (unavailable +// in this environment), so this is a smoke/stress test, not a proof of absence. +TEST_F(LLMIdleUnloadTest, ConcurrentUnloadReloadRetireNoCrash) { + ConstructorEnabledModelManager manager; + std::string testPbtxt = buildOptGraphPbtxt(); + + ovms::MediapipeGraphConfig mgc{"mediaIdle", "", ""}; + mgc.setIdleUnloadTimeoutSeconds(10); + DummyMediapipeGraphDefinition def("mediaIdle", mgc, testPbtxt, nullptr); + def.inputConfig = testPbtxt; + ASSERT_EQ(def.validate(manager), StatusCode::OK); + + std::atomic stop{false}; + std::atomic retired{false}; + + // Watcher-role thread: keep trying to idle-unload. + std::thread unloader([&]() { + while (!stop.load()) { + def.backdateLastActivityForTest(60); + (void)def.unload(); + std::this_thread::yield(); + } + }); + + // Config-role thread: keep reloading (re-bring it up after unload). + std::thread reloader([&]() { + while (!stop.load()) { + (void)def.reload(manager, def.getMediapipeGraphConfig()); + std::this_thread::yield(); + } + }); + + std::this_thread::sleep_for(std::chrono::milliseconds(400)); + stop.store(true); + unloader.join(); + reloader.join(); + + // Now retire concurrently is not needed for crash-safety beyond above, but + // exercise retire() once after the storm to confirm it serializes cleanly. + def.retire(); + retired.store(true); + ASSERT_TRUE(retired.load()); + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::RETIRED); +} + +// ───────────────────────────────────────────────────────────────────────────── +// TASK 1 tests: ActiveInferenceGuard — in-flight inference prevents idle unload +// ───────────────────────────────────────────────────────────────────────────── + +// Model-free unit test: directly exercise the activeInferenceCount atomic that +// shouldUnloadDueToIdle() and unload() consult. No LLM model required. +TEST(MediapipeIdleUnloadGuard, ActiveInferenceCountBlocksShouldUnload) { + // Build a minimal graph definition with idle unload enabled. + // Use buildOptGraphPbtxt() indirectly via LLMIdleUnloadTest helpers is not + // available here — we just need a definition with a non-zero timeout and + // a synthetic counter. We can use the shared_ptr that getActiveInferenceCount() + // returns directly, bypassing the executor machinery. + + // A standalone atomic acts as the counter. + auto counter = std::make_shared>(0); + auto lastActivity = std::make_shared>( + std::chrono::steady_clock::now().time_since_epoch().count() - 60LL * 1'000'000'000LL); + + // Simulate increment (inference start). + { + ovms::ActiveInferenceGuard guard(counter, lastActivity); + EXPECT_EQ(counter->load(), 1); + } + // After destruction, counter back to 0 and lastActivity refreshed. + EXPECT_EQ(counter->load(), 0); + int64_t nowNs = std::chrono::steady_clock::now().time_since_epoch().count(); + // lastActivity should be within 2 seconds of now (generous for slow machines). + EXPECT_GT(lastActivity->load(), nowNs - 2LL * 1'000'000'000LL); +} + +TEST(MediapipeIdleUnloadGuard, ActiveInferenceCountExceptionSafe) { + auto counter = std::make_shared>(0); + auto lastActivity = std::make_shared>(0); + + try { + ovms::ActiveInferenceGuard guard(counter, lastActivity); + EXPECT_EQ(counter->load(), 1); + throw std::runtime_error("simulated inference error"); + } catch (...) { + } + // Must be 0 even after exception path. + EXPECT_EQ(counter->load(), 0); +} + +TEST(MediapipeIdleUnloadGuard, MultipleGuardsNested) { + auto counter = std::make_shared>(0); + auto lastActivity = std::make_shared>(0); + { + ovms::ActiveInferenceGuard g1(counter, lastActivity); + EXPECT_EQ(counter->load(), 1); + { + ovms::ActiveInferenceGuard g2(counter, lastActivity); + EXPECT_EQ(counter->load(), 2); + } + EXPECT_EQ(counter->load(), 1); + } + EXPECT_EQ(counter->load(), 0); +} + +// Integration test: create() on a real definition increments the counter; +// when the executor is destroyed the counter returns to 0. +// Requires the LLM model (opt-125m). Guard under GTEST_SKIP for CI environments. +TEST_F(LLMIdleUnloadTest, ActiveInferenceGuardIntegration) { + ConstructorEnabledModelManager manager; + std::string testPbtxt = buildOptGraphPbtxt(); + const std::string testModelsPath = getGenericFullPathForSrcTest("/ovms/src/test/llm_testing/facebook/opt-125m"); + if (!std::filesystem::exists(testModelsPath)) { + GTEST_SKIP() << "opt-125m model not present; skipping integration guard test"; + } + + ovms::MediapipeGraphConfig mgc{"mediaGuard", "", ""}; + mgc.setIdleUnloadTimeoutSeconds(10); + DummyMediapipeGraphDefinition def("mediaGuard", mgc, testPbtxt, nullptr); + def.inputConfig = testPbtxt; + ASSERT_EQ(def.validate(manager), StatusCode::OK); + + auto counterPtr = def.getActiveInferenceCount(); + ASSERT_NE(counterPtr, nullptr); + EXPECT_EQ(counterPtr->load(), 0); + + { + std::unique_ptr executor; + ASSERT_EQ(def.create(executor), StatusCode::OK); + ASSERT_NE(executor, nullptr); + // Counter incremented: executor is alive. + EXPECT_EQ(counterPtr->load(), 1); + + // Backdate activity to look idle — should NOT unload because count > 0. + def.backdateLastActivityForTest(60); + EXPECT_FALSE(def.shouldUnloadDueToIdle()); + EXPECT_EQ(def.unload(), StatusCode::OK); + // unload() should have been skipped (counter > 0) so we stay AVAILABLE. + EXPECT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::AVAILABLE); + } // executor destroyed here -> counter decremented back to 0 + + EXPECT_EQ(counterPtr->load(), 0); + // Completing the inference refreshed lastActivityTimeNs (the ActiveInferenceGuard + // destructor resets the idle timer), so the graph is NOT idle immediately after — + // this is the key behavior preventing an immediate re-unload right after a long + // generation finishes. + EXPECT_FALSE(def.shouldUnloadDueToIdle()); + // After the idle period elapses again (post-inference), it should unload. + def.backdateLastActivityForTest(60); + EXPECT_TRUE(def.shouldUnloadDueToIdle()); + EXPECT_EQ(def.unload(), StatusCode::OK); + EXPECT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); +} + +// ───────────────────────────────────────────────────────────────────────────── +// Wake-failure recovery: a failed wake-up reload must leave the graph UNLOADED +// (retryable), NOT LOADING_PRECONDITION_FAILED (wedged). Then once the underlying +// problem is resolved, the next wake self-heals to AVAILABLE. +// ───────────────────────────────────────────────────────────────────────────── + +// Returns an LLM graph pbtxt whose models_path points at a nonexistent directory, +// so validate() fails (LLM_NODE_DIRECTORY_DOES_NOT_EXIST) — but it still contains +// HttpLLMCalculator, so the idle-unload scope check passes and we exercise the +// wake/reload/validate failure path. +static std::string buildBrokenOptGraphPbtxt() { + std::string testPbtxt = R"( + input_stream: "HTTP_REQUEST_PAYLOAD:input" + output_stream: "HTTP_RESPONSE_PAYLOAD:output" + + node: { + name: "llmNode" + calculator: "HttpLLMCalculator" + input_stream: "LOOPBACK:loopback" + input_stream: "HTTP_REQUEST_PAYLOAD:input" + input_side_packet: "LLM_NODE_RESOURCES:llm" + output_stream: "LOOPBACK:loopback" + output_stream: "HTTP_RESPONSE_PAYLOAD:output" + input_stream_info: { + tag_index: 'LOOPBACK:0', + back_edge: true + } + node_options: { + [type.googleapis.com / mediapipe.LLMCalculatorOptions]: { + models_path: "/this/path/definitely/does/not/exist/opt-125m" + cache_size: 1 + } + } + input_stream_handler { + input_stream_handler: "SyncSetInputStreamHandler", + options { + [mediapipe.SyncSetInputStreamHandlerOptions.ext] { + sync_set { + tag_index: "LOOPBACK:0" + } + } + } + } + } + )"; + adjustConfigForTargetPlatform(testPbtxt); + return testPbtxt; +} + +TEST_F(LLMIdleUnloadTest, FailedWakeLeavesGraphUnloadedAndRetryable) { + ConstructorEnabledModelManager manager; + std::string goodPbtxt = buildOptGraphPbtxt(); + std::string brokenPbtxt = buildBrokenOptGraphPbtxt(); + + ovms::MediapipeGraphConfig mgc{"mediaWakeFail", "", ""}; + mgc.setIdleUnloadTimeoutSeconds(10); + DummyMediapipeGraphDefinition def("mediaWakeFail", mgc, goodPbtxt, nullptr); + def.inputConfig = goodPbtxt; + ASSERT_EQ(def.validate(manager), StatusCode::OK); + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::AVAILABLE); + + // Idle-unload the healthy graph. + def.backdateLastActivityForTest(60); + ASSERT_EQ(def.unload(), StatusCode::OK); + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); + + // Simulate the model becoming temporarily unavailable: swap in a broken config + // so the wake-up reload's validate() fails. + def.inputConfig = brokenPbtxt; + auto failStatus = def.wakeUpIfUnloaded(manager); + EXPECT_FALSE(failStatus.ok()) << "expected wake-up to fail with broken model"; + // CRITICAL: the graph must be retryable, i.e. back in UNLOADED — not wedged in + // LOADING_PRECONDITION_FAILED. + EXPECT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); + + // A second attempt while still broken also fails but stays retryable. + auto failStatus2 = def.wakeUpIfUnloaded(manager); + EXPECT_FALSE(failStatus2.ok()); + EXPECT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); + + // Restore the model: the next wake self-heals to AVAILABLE. + def.inputConfig = goodPbtxt; + auto okStatus = def.wakeUpIfUnloaded(manager); + EXPECT_EQ(okStatus, StatusCode::OK) << okStatus.string(); + EXPECT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::AVAILABLE); + EXPECT_NE(def.getGenAiServable("llmNode"), nullptr); +} + +// ───────────────────────────────────────────────────────────────────────────── +// TASK 2 tests: non-LLM scope restriction — idle_unload_timeout on non-LLM graphs +// ───────────────────────────────────────────────────────────────────────────── + +// A plain passthrough graph (no LLM calculator) with idle_unload_timeout > 0 must fail +// validation with MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID. +TEST_F(LLMIdleUnloadTest, NonLlmGraphWithIdleUnloadRejected) { + ConstructorEnabledModelManager manager; + // Minimal graph with a generic passthrough-style calculator (not HttpLLMCalculator). + std::string testPbtxt = R"( + input_stream: "INPUT:input" + output_stream: "OUTPUT:output" + node: { + name: "passthroughNode" + calculator: "PassThroughCalculator" + input_stream: "INPUT:input" + output_stream: "OUTPUT:output" + } + )"; + + ovms::MediapipeGraphConfig mgc{"mediaPassthrough", "", ""}; + mgc.setIdleUnloadTimeoutSeconds(10); + DummyMediapipeGraphDefinition def("mediaPassthrough", mgc, testPbtxt, nullptr); + def.inputConfig = testPbtxt; + auto status = def.validate(manager); + EXPECT_EQ(status, StatusCode::MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID) + << "Expected non-LLM graph with idle_unload_timeout_seconds to be rejected, got: " << status.string(); +} + +// An embeddings-style graph (no HttpLLMCalculator) with idle_unload_timeout > 0 +// must also be rejected. +TEST_F(LLMIdleUnloadTest, EmbeddingsGraphWithIdleUnloadRejected) { + ConstructorEnabledModelManager manager; + std::string testPbtxt = R"( + input_stream: "HTTP_REQUEST_PAYLOAD:input" + output_stream: "HTTP_RESPONSE_PAYLOAD:output" + node: { + name: "embeddingNode" + calculator: "HttpOpenVINOEmbeddingsCalculator" + input_stream: "HTTP_REQUEST_PAYLOAD:input" + output_stream: "HTTP_RESPONSE_PAYLOAD:output" + input_side_packet: "LLM_NODE_RESOURCES:llm" + } + )"; + + ovms::MediapipeGraphConfig mgc{"mediaEmbed", "", ""}; + mgc.setIdleUnloadTimeoutSeconds(5); + DummyMediapipeGraphDefinition def("mediaEmbed", mgc, testPbtxt, nullptr); + def.inputConfig = testPbtxt; + auto status = def.validate(manager); + EXPECT_EQ(status, StatusCode::MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID) + << "Expected embeddings graph with idle_unload_timeout_seconds to be rejected, got: " << status.string(); +} + +// A valid LLM graph with idle_unload_timeout_seconds > 0 must still pass the calculator check. +// (The full validate() may fail if the model files are absent — that's fine; we test the +// calculator-guard path specifically by checking that the rejection is NOT due to the +// non-LLM guard. In CI without the model, validate() may fail with a different status, +// but it must NOT be MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID from our new guard.) +TEST_F(LLMIdleUnloadTest, LlmGraphPassesCalculatorGuard) { + ConstructorEnabledModelManager manager; + // Intentionally malformed LLM graph (no model path) — will fail downstream validation + // but must NOT fail with the non-LLM calculator guard. + std::string testPbtxt = R"( + input_stream: "HTTP_REQUEST_PAYLOAD:input" + output_stream: "HTTP_RESPONSE_PAYLOAD:output" + node: { + name: "llmNode" + calculator: "HttpLLMCalculator" + input_stream: "LOOPBACK:loopback" + input_stream: "HTTP_REQUEST_PAYLOAD:input" + input_side_packet: "LLM_NODE_RESOURCES:llm" + output_stream: "LOOPBACK:loopback" + output_stream: "HTTP_RESPONSE_PAYLOAD:output" + input_stream_info: { + tag_index: 'LOOPBACK:0', + back_edge: true + } + node_options: { + [type.googleapis.com / mediapipe.LLMCalculatorOptions]: { + models_path: "/nonexistent/path/to/model" + cache_size: 1 + } + } + input_stream_handler { + input_stream_handler: "SyncSetInputStreamHandler", + options { + [mediapipe.SyncSetInputStreamHandlerOptions.ext] { + sync_set { + tag_index: "LOOPBACK:0" + } + } + } + } + } + )"; + + ovms::MediapipeGraphConfig mgc{"mediaLlmGuard", "", ""}; + mgc.setIdleUnloadTimeoutSeconds(10); + DummyMediapipeGraphDefinition def("mediaLlmGuard", mgc, testPbtxt, nullptr); + def.inputConfig = testPbtxt; + auto status = def.validate(manager); + // The non-LLM calculator guard must NOT have fired (the status must not be the + // "idle_unload only supported for LLM" rejection). Any other failure (e.g. model + // path not found) is acceptable. + // We cannot distinguish the exact downstream error code without running the model, + // so we simply assert it's not the guard-specific code or (if it happens to pass + // on machines with the model) OK. + bool isNonLlmGuardRejection = (status == StatusCode::MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID); + if (isNonLlmGuardRejection) { + // If the code is MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID, it must be from a different + // guard (e.g. Python node or queue config), not our new non-LLM guard — verify by + // checking the error message doesn't contain our sentinel phrase. + // Since we can't inspect the message here, we check that HttpLLMCalculator is present + // (it is) and assert that the guard check passed (no rejection for non-LLM). + FAIL() << "LLM graph was unexpectedly rejected by idle_unload scope guard; status: " + << status.string(); + } + // Any other status (OK, model-not-found, etc.) is acceptable. +} diff --git a/src/test/mediapipeflow_test.cpp b/src/test/mediapipeflow_test.cpp index 459a0d5115..8c40b57b8c 100644 --- a/src/test/mediapipeflow_test.cpp +++ b/src/test/mediapipeflow_test.cpp @@ -55,6 +55,7 @@ #include "../model_service.hpp" #include "../ovms_exit_codes.hpp" #include "../precision.hpp" +#include "../servable_definition_unload_guard.hpp" #include "../servablemanagermodule.hpp" #include "../server.hpp" #include "../shape.hpp" @@ -4410,3 +4411,99 @@ TEST_F(UnaryQueueReinitTest, GraphIsReinitializedAfterCalculatorError) { ASSERT_TRUE(status.ok()); } } + +// --------------------------------------------------------------------------- +// Idle unload feature: unload() guard correctness (issue #4141, model-free) +// Verifies FIX 1: unload() must NOT tear down resources unless the state was +// actually AVAILABLE and the UnloadEvent transition really happened. +// --------------------------------------------------------------------------- + +// A trivial pbtxt is enough; these tests never reach validate(), they drive the +// state machine directly to exercise unload()'s preconditions. +static const std::string kIdleUnloadDummyPbtxt = R"( + input_stream: "in" + output_stream: "out" +)"; + +TEST(MediapipeIdleUnloadGuard, UnloadIsNoOpWhenStateBegin) { + ovms::MediapipeGraphConfig mgc{"idleGuard", "", ""}; + mgc.setIdleUnloadTimeoutSeconds(10); + DummyMediapipeGraphDefinition def("idleGuard", mgc, kIdleUnloadDummyPbtxt, nullptr); + // Fresh definition is in BEGIN (validate never called). + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::BEGIN); + def.insertSidePacketMarkerForTest("marker"); + const void* mapsBefore = def.sidePacketMapsPtrForTest(); + + ASSERT_EQ(def.unload(), ovms::StatusCode::OK); + + // State unchanged and resources untouched. + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::BEGIN); + ASSERT_TRUE(def.hasSidePacketMarkerForTest("marker")); + ASSERT_EQ(def.sidePacketMapsPtrForTest(), mapsBefore); +} + +TEST(MediapipeIdleUnloadGuard, UnloadIsNoOpWhenStateReloading) { + ovms::MediapipeGraphConfig mgc{"idleGuard", "", ""}; + mgc.setIdleUnloadTimeoutSeconds(10); + DummyMediapipeGraphDefinition def("idleGuard", mgc, kIdleUnloadDummyPbtxt, nullptr); + // Drive BEGIN -> AVAILABLE -> RELOADING. + def.forceValidationPassedEventForTest(); + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::AVAILABLE); + def.forceReloadEventForTest(); + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::RELOADING); + + def.insertSidePacketMarkerForTest("marker"); + const void* mapsBefore = def.sidePacketMapsPtrForTest(); + + ASSERT_EQ(def.unload(), ovms::StatusCode::OK); + + // Critical: unload() must NOT have cleared resources while RELOADING. + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::RELOADING); + ASSERT_TRUE(def.hasSidePacketMarkerForTest("marker")); + ASSERT_EQ(def.sidePacketMapsPtrForTest(), mapsBefore); +} + +TEST(MediapipeIdleUnloadGuard, UnloadTransitionsAndTearsDownWhenAvailable) { + ovms::MediapipeGraphConfig mgc{"idleGuard", "", ""}; + mgc.setIdleUnloadTimeoutSeconds(10); + DummyMediapipeGraphDefinition def("idleGuard", mgc, kIdleUnloadDummyPbtxt, nullptr); + def.forceValidationPassedEventForTest(); + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::AVAILABLE); + def.insertSidePacketMarkerForTest("marker"); + const void* mapsBefore = def.sidePacketMapsPtrForTest(); + + ASSERT_EQ(def.unload(), ovms::StatusCode::OK); + + // Now it should have transitioned and cleared (but kept the same object). + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); + ASSERT_FALSE(def.hasSidePacketMarkerForTest("marker")); + ASSERT_TRUE(def.sidePacketMapsEmptyForTest()); + ASSERT_EQ(def.sidePacketMapsPtrForTest(), mapsBefore); // clear(), not reset() +} + +TEST(MediapipeIdleUnloadGuard, UnloadSkipsWhenRequestsInFlight) { + ovms::MediapipeGraphConfig mgc{"idleGuard", "", ""}; + mgc.setIdleUnloadTimeoutSeconds(10); + DummyMediapipeGraphDefinition def("idleGuard", mgc, kIdleUnloadDummyPbtxt, nullptr); + def.forceValidationPassedEventForTest(); + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::AVAILABLE); + def.insertSidePacketMarkerForTest("marker"); + const void* mapsBefore = def.sidePacketMapsPtrForTest(); + + { + // Simulate an in-flight request by holding an unload guard (bumps the counter). + ovms::ServableDefinitionUnloadGuard guard(def); + ASSERT_EQ(def.requestsHandlesCounterForTest(), 1u); + + // unload() must skip without tearing down because counter > 0. + ASSERT_EQ(def.unload(), ovms::StatusCode::OK); + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::AVAILABLE); + ASSERT_TRUE(def.hasSidePacketMarkerForTest("marker")); + ASSERT_EQ(def.sidePacketMapsPtrForTest(), mapsBefore); + } + // After the guard releases, unload() now proceeds. + ASSERT_EQ(def.requestsHandlesCounterForTest(), 0u); + ASSERT_EQ(def.unload(), ovms::StatusCode::OK); + ASSERT_EQ(def.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); + ASSERT_TRUE(def.sidePacketMapsEmptyForTest()); +} diff --git a/src/test/pipelinedefinitionstatus_test.cpp b/src/test/pipelinedefinitionstatus_test.cpp index 578ad38295..e920af07dd 100644 --- a/src/test/pipelinedefinitionstatus_test.cpp +++ b/src/test/pipelinedefinitionstatus_test.cpp @@ -302,3 +302,133 @@ TEST(PipelineDefinitionStatus, ConvertToModelStatus) { ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::RETIRED); ASSERT_EQ((std::tuple(ModelVersionState::END, ModelVersionStatusErrorCode::OK)), pds.convertToModelStatus()); } + +// --------------------------------------------------------------------------- +// Idle unload feature: UNLOADED state transitions (issue #4141) +// --------------------------------------------------------------------------- + +TEST(PipelineDefinitionStatus, AvailableThenUnload) { + PipelineDefinitionStatus pds(unusedPipelineType, unusedPipelineName); + pds.handle(ValidationPassedEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::AVAILABLE); + pds.handle(UnloadEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); +} + +TEST(PipelineDefinitionStatus, UnloadedThenReloadGoesToReloading) { + PipelineDefinitionStatus pds(unusedPipelineType, unusedPipelineName); + pds.handle(ValidationPassedEvent()); + pds.handle(UnloadEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); + pds.handle(ReloadEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::RELOADING); +} + +TEST(PipelineDefinitionStatus, UnloadedThenReloadThenValidationPassGoesToAvailable) { + PipelineDefinitionStatus pds(unusedPipelineType, unusedPipelineName); + pds.handle(ValidationPassedEvent()); + pds.handle(UnloadEvent()); + pds.handle(ReloadEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::RELOADING); + pds.handle(ValidationPassedEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::AVAILABLE); +} + +TEST(PipelineDefinitionStatus, UnloadedThenValidationPassDefensiveGoesToAvailable) { + PipelineDefinitionStatus pds(unusedPipelineType, unusedPipelineName); + pds.handle(ValidationPassedEvent()); + pds.handle(UnloadEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); + pds.handle(ValidationPassedEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::AVAILABLE); +} + +TEST(PipelineDefinitionStatus, UnloadedThenRetireGoesToRetired) { + PipelineDefinitionStatus pds(unusedPipelineType, unusedPipelineName); + pds.handle(ValidationPassedEvent()); + pds.handle(UnloadEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); + pds.handle(RetireEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::RETIRED); +} + +TEST(PipelineDefinitionStatus, UnloadedIsNotAvailable) { + PipelineDefinitionStatus pds(unusedPipelineType, unusedPipelineName); + pds.handle(ValidationPassedEvent()); + ASSERT_TRUE(pds.isAvailable()); + pds.handle(UnloadEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); + ASSERT_FALSE(pds.isAvailable()); +} + +TEST(PipelineDefinitionStatus, UnloadedConvertsToModelStatusAvailable) { + PipelineDefinitionStatus pds(unusedPipelineType, unusedPipelineName); + pds.handle(ValidationPassedEvent()); + pds.handle(UnloadEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); + // UNLOADED must report AVAILABLE so health checks / routing do not exclude the + // servable (it auto-reloads on the next inference request). + ASSERT_EQ((std::tuple(ModelVersionState::AVAILABLE, ModelVersionStatusErrorCode::OK)), pds.convertToModelStatus()); +} + +TEST(PipelineDefinitionStatus, UnloadEventOnBeginIsNoOp) { + PipelineDefinitionStatus pds(unusedPipelineType, unusedPipelineName); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::BEGIN); + pds.handle(UnloadEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::BEGIN); +} + +TEST(PipelineDefinitionStatus, UnloadEventOnReloadingIsNoOp) { + PipelineDefinitionStatus pds(unusedPipelineType, unusedPipelineName); + pds.handle(ValidationPassedEvent()); + pds.handle(ReloadEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::RELOADING); + pds.handle(UnloadEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::RELOADING); +} + +TEST(PipelineDefinitionStatus, UnloadEventOnRetiredIsNoOp) { + PipelineDefinitionStatus pds(unusedPipelineType, unusedPipelineName); + pds.handle(ValidationPassedEvent()); + pds.handle(RetireEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::RETIRED); + pds.handle(UnloadEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::RETIRED); +} + +TEST(PipelineDefinitionStatus, UnloadEventOnLoadingPreconditionFailedRevertsToUnloaded) { + // A failed wake-up reload (validate -> LOADING_PRECONDITION_FAILED) is reverted + // to UNLOADED by wakeUpIfUnloaded() via UnloadEvent so the next request retries. + PipelineDefinitionStatus pds(unusedPipelineType, unusedPipelineName); + pds.handle(ValidationFailedEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::LOADING_PRECONDITION_FAILED); + pds.handle(UnloadEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); +} + +TEST(PipelineDefinitionStatus, UnloadedAfterFailedWakeIsRetryableViaReload) { + // Full retry path: AVAILABLE -> UNLOADED -> (wake) RELOADING -> (fail) FAILED + // -> (revert) UNLOADED -> (retry wake) RELOADING -> (pass) AVAILABLE. + PipelineDefinitionStatus pds(unusedPipelineType, unusedPipelineName); + pds.handle(ValidationPassedEvent()); + pds.handle(UnloadEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); + pds.handle(ReloadEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::RELOADING); + pds.handle(ValidationFailedEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::LOADING_PRECONDITION_FAILED); + pds.handle(UnloadEvent()); // wakeUpIfUnloaded reverts on failure + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); + pds.handle(ReloadEvent()); + pds.handle(ValidationPassedEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::AVAILABLE); +} + +TEST(PipelineDefinitionStatus, UnloadEventOnUnloadedIsIdempotent) { + PipelineDefinitionStatus pds(unusedPipelineType, unusedPipelineName); + pds.handle(ValidationPassedEvent()); + pds.handle(UnloadEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); + pds.handle(UnloadEvent()); + ASSERT_EQ(pds.getStateCode(), ovms::PipelineDefinitionStateCode::UNLOADED); +} diff --git a/src/test/schema_test.cpp b/src/test/schema_test.cpp index 2b5aaf11fd..fad87515f2 100644 --- a/src/test/schema_test.cpp +++ b/src/test/schema_test.cpp @@ -2003,6 +2003,66 @@ TEST(SchemaTest, MediapipeConfigInModelConfigPositive) { auto result = ovms::validateJsonAgainstSchema(configDoc, ovms::MODELS_CONFIG_SCHEMA.c_str()); EXPECT_EQ(result, ovms::StatusCode::OK); } + +TEST(SchemaTest, MediapipeConfigIdleUnloadTimeoutPositive) { + const char* mediapipeConfigPositive = R"( + { + "model_config_list": [], + "mediapipe_config_list": [ + { + "name": "dummy_model", + "graph_path": "graph.pbtxt", + "base_path": "dummy_path_base", + "idle_unload_timeout_seconds": 300 + } + ] + })"; + + rapidjson::Document configDoc; + configDoc.Parse(mediapipeConfigPositive); + auto result = ovms::validateJsonAgainstSchema(configDoc, ovms::MODELS_CONFIG_SCHEMA.c_str()); + EXPECT_EQ(result, ovms::StatusCode::OK); +} + +TEST(SchemaTest, MediapipeConfigIdleUnloadTimeoutNegativeValueRejected) { + const char* mediapipeConfigNegative = R"( + { + "model_config_list": [], + "mediapipe_config_list": [ + { + "name": "dummy_model", + "graph_path": "graph.pbtxt", + "base_path": "dummy_path_base", + "idle_unload_timeout_seconds": -5 + } + ] + })"; + + rapidjson::Document configDoc; + configDoc.Parse(mediapipeConfigNegative); + auto result = ovms::validateJsonAgainstSchema(configDoc, ovms::MODELS_CONFIG_SCHEMA.c_str()); + EXPECT_EQ(result, ovms::StatusCode::JSON_INVALID); +} + +TEST(SchemaTest, MediapipeConfigIdleUnloadTimeoutWrongTypeRejected) { + const char* mediapipeConfigNegative = R"( + { + "model_config_list": [], + "mediapipe_config_list": [ + { + "name": "dummy_model", + "graph_path": "graph.pbtxt", + "base_path": "dummy_path_base", + "idle_unload_timeout_seconds": "notAnInteger" + } + ] + })"; + + rapidjson::Document configDoc; + configDoc.Parse(mediapipeConfigNegative); + auto result = ovms::validateJsonAgainstSchema(configDoc, ovms::MODELS_CONFIG_SCHEMA.c_str()); + EXPECT_EQ(result, ovms::StatusCode::JSON_INVALID); +} #endif TEST(SchemaTest, MediapipeConfigNegativeAdditionalMediapipeConfigField) { diff --git a/src/test/test_utils.hpp b/src/test/test_utils.hpp index 2510ca2b60..eeb00f3d73 100644 --- a/src/test/test_utils.hpp +++ b/src/test/test_utils.hpp @@ -853,6 +853,23 @@ class DummyMediapipeGraphDefinition : public ovms::MediapipeGraphDefinition { ovms::GenAiServableMap& getGenAiServableMap() { return this->sidePacketMaps->genAiServableMap; } + // Test seams for idle-unload concurrency tests. + // Drive the underlying state machine directly. + void forceReloadEventForTest() { this->status.handle(ovms::ReloadEvent()); } + void forceValidationPassedEventForTest() { this->status.handle(ovms::ValidationPassedEvent()); } + // Identity of the sidePacketMaps shared_ptr, so a test can detect whether it + // was reset/swapped (unload uses clear(), not reset(), so the pointer must be stable). + const void* sidePacketMapsPtrForTest() const { return static_cast(this->sidePacketMaps.get()); } + bool sidePacketMapsEmptyForTest() { return this->sidePacketMaps->empty(); } + // Insert a harmless marker into a side-packet map so we can detect teardown. + void insertSidePacketMarkerForTest(const std::string& key) { + this->sidePacketMaps->genAiServableMap.insert({key, nullptr}); + } + bool hasSidePacketMarkerForTest(const std::string& key) { + return this->sidePacketMaps->genAiServableMap.count(key) > 0; + } + uint64_t requestsHandlesCounterForTest() const { return this->requestsHandlesCounter.load(); } + DummyMediapipeGraphDefinition(const std::string name, const ovms::MediapipeGraphConfig& config, std::string inputConfig,