Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 68 additions & 11 deletions c/src/ml-api-service-extension.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ _ml_extension_msg_thread (gpointer data)
int status;

g_mutex_lock (&mls->lock);
ext->running = TRUE;
g_cond_signal (&mls->cond);
g_mutex_unlock (&mls->lock);

Expand Down Expand Up @@ -233,6 +232,66 @@ _ml_extension_msg_thread (gpointer data)
return NULL;
}

/**
* @brief Internal function to lazily start the message thread.
* @note This avoids creating/joining threads for applications that only create/destroy handles.
*/
static int
_ml_extension_ensure_msg_thread (ml_service_s * mls)
{
ml_extension_s *ext;
g_autofree gchar *thread_name = NULL;
g_autoptr (GError) err = NULL;

if (!mls || !mls->priv)
return ML_ERROR_INVALID_PARAMETER;

ext = (ml_extension_s *) mls->priv;

/* Fast path: already created. */
if (ext->msg_thread && ext->msg_queue)
return ML_ERROR_NONE;

g_mutex_lock (&mls->lock);

/* Double-check under lock. */
if (ext->msg_thread && ext->msg_queue) {
g_mutex_unlock (&mls->lock);
return ML_ERROR_NONE;
}

if (!ext->msg_queue) {
ext->msg_queue = g_async_queue_new_full (_ml_extension_msg_free);
if (!ext->msg_queue) {
g_mutex_unlock (&mls->lock);
return ML_ERROR_OUT_OF_MEMORY;
}
}

if (!ext->msg_thread) {
ext->running = TRUE;

thread_name = g_strdup_printf ("ml-ext-msg-%d", getpid ());
ext->msg_thread = g_thread_try_new (thread_name, _ml_extension_msg_thread,
mls, &err);
if (!ext->msg_thread) {
ext->running = FALSE;
_ml_error_report ("Failed to create ml-service extension message thread: %s",
err ? err->message : "unknown error");

/* Keep msg_queue for a later retry, but stop the thread control flag. */
g_mutex_unlock (&mls->lock);
return ML_ERROR_OUT_OF_MEMORY;
}

/* Wait until the message thread has been initialized. */
g_cond_wait (&mls->cond, &mls->lock);
}

g_mutex_unlock (&mls->lock);
return ML_ERROR_NONE;
}

/**
* @brief Wrapper to release tensors-info handle.
*/
Expand Down Expand Up @@ -586,7 +645,6 @@ int
_ml_service_extension_create (ml_service_s * mls, JsonObject * object)
{
ml_extension_s *ext;
g_autofree gchar *thread_name = g_strdup_printf ("ml-ext-msg-%d", getpid ());
int status;

mls->priv = ext = g_try_new0 (ml_extension_s, 1);
Expand All @@ -601,22 +659,15 @@ _ml_service_extension_create (ml_service_s * mls, JsonObject * object)
ext->max_input = DEFAULT_MAX_INPUT;
ext->node_table = g_hash_table_new_full (g_str_hash, g_str_equal, g_free,
_ml_extension_node_info_free);
ext->msg_thread = NULL;
ext->msg_queue = NULL;

status = _ml_extension_conf_parse_json (mls, object);
if (status != ML_ERROR_NONE) {
_ml_error_report_return (status,
"Failed to parse the ml-service extension configuration.");
}

g_mutex_lock (&mls->lock);

ext->msg_queue = g_async_queue_new_full (_ml_extension_msg_free);
ext->msg_thread = g_thread_new (thread_name, _ml_extension_msg_thread, mls);

/* Wait until the message thread has been initialized. */
g_cond_wait (&mls->cond, &mls->lock);
g_mutex_unlock (&mls->lock);

return ML_ERROR_NONE;
}

Expand Down Expand Up @@ -826,6 +877,12 @@ _ml_service_extension_request (ml_service_s * mls, const char *name,
ml_extension_msg_s *msg;
int status, len;

status = _ml_extension_ensure_msg_thread (mls);
if (status != ML_ERROR_NONE) {
_ml_error_report_return (status,
"Failed to start ml-service extension internal message thread.");
}

if (ext->type == ML_EXTENSION_TYPE_PIPELINE) {
ml_service_node_info_s *node_info;

Expand Down
Loading