From d5d3f76152137ad978c165e86e3c4ee7e28f675d Mon Sep 17 00:00:00 2001 From: Ckemplen <87258247+Ckemplen@users.noreply.github.com> Date: Mon, 16 Jun 2025 16:48:02 +0100 Subject: [PATCH] Add thread lifecycle management --- layers/CognitiveLayer.py | 20 +++++++++++ orchestration/CognitiveArchitecture.py | 49 +++++++++++++------------- ui/app.py | 39 ++++++++++++++++++-- 3 files changed, 80 insertions(+), 28 deletions(-) diff --git a/layers/CognitiveLayer.py b/layers/CognitiveLayer.py index 3bd1cc3..b002d46 100644 --- a/layers/CognitiveLayer.py +++ b/layers/CognitiveLayer.py @@ -22,6 +22,8 @@ import logging import queue import pathlib +import threading +import time from reasoning_engines.GPTModels import GPTModel from capability_manager import CapabilityManager @@ -68,6 +70,9 @@ def __init__(self, name): self.total_tokens: int = 0 self.total_cost: float = 0 + self._stop_event = threading.Event() + self.running = False + # Set up logging self.logger = logging.getLogger(name) # Create a logger for this layer self.logger.setLevel(logging.DEBUG) # Set the logging level @@ -251,3 +256,18 @@ def create_product(self): raise NotImplementedError("Subclasses must implement create_product method.") + def stop(self): + """Signal the main loop to stop.""" + self._stop_event.set() + + def main_loop(self): + """Run the layer until stop is requested.""" + self.running = True + while not self._stop_event.is_set(): + try: + self.execute() + except Exception as e: + self.logger.error(f"Error in main loop: {e}") + time.sleep(0.1) + self.running = False + diff --git a/orchestration/CognitiveArchitecture.py b/orchestration/CognitiveArchitecture.py index 2fdf4ab..a085386 100644 --- a/orchestration/CognitiveArchitecture.py +++ b/orchestration/CognitiveArchitecture.py @@ -96,43 +96,42 @@ def _check_thread_status(self): print(f'{key}: running={thread.running()}, done={thread.done()}') def start_execution(self): - """ - Start the execution of all layers in separate threads. - """ - # Create a thread for each layer - aspirational_thread = threading.Thread(target=self.aspirational_layer.main_loop) + """Start all layers in separate threads.""" + aspirational_thread = threading.Thread(target=self.aspirational_layer.main_loop, daemon=True) self.threads[LayerHierarchy.ASPIRATIONAL] = aspirational_thread - global_strategy_thread = threading.Thread(target=self.global_strategy_layer.main_loop) + global_strategy_thread = threading.Thread(target=self.global_strategy_layer.main_loop, daemon=True) self.threads[LayerHierarchy.GLOBAL_STRATEGY] = global_strategy_thread - agent_model_thread = threading.Thread(target=self.agent_model_layer.main_loop) + agent_model_thread = threading.Thread(target=self.agent_model_layer.main_loop, daemon=True) self.threads[LayerHierarchy.AGENT_MODEL] = agent_model_thread - executive_function_thread = threading.Thread(target=self.executive_function_layer.main_loop) + executive_function_thread = threading.Thread(target=self.executive_function_layer.main_loop, daemon=True) self.threads[LayerHierarchy.EXECUTIVE_FUNCTION] = executive_function_thread - cognitive_control_thread = threading.Thread(target=self.cognitive_control_layer.main_loop) + cognitive_control_thread = threading.Thread(target=self.cognitive_control_layer.main_loop, daemon=True) self.threads[LayerHierarchy.COGNITIVE_CONTROL] = cognitive_control_thread - task_prosecution_thread = threading.Thread(target=self.task_prosecution_layer.main_loop) + task_prosecution_thread = threading.Thread(target=self.task_prosecution_layer.main_loop, daemon=True) self.threads[LayerHierarchy.TASK_PROSECUTION] = task_prosecution_thread - # Start all threads - aspirational_thread.start() - global_strategy_thread.start() - agent_model_thread.start() - executive_function_thread.start() - cognitive_control_thread.start() - task_prosecution_thread.start() - - # Wait for all threads to finish - aspirational_thread.join() - global_strategy_thread.join() - agent_model_thread.join() - executive_function_thread.join() - cognitive_control_thread.join() - task_prosecution_thread.join() + for t in self.threads.values(): + t.start() + + def stop_execution(self): + """Signal all layers to stop and wait for threads to finish.""" + for layer in [ + self.aspirational_layer, + self.global_strategy_layer, + self.agent_model_layer, + self.executive_function_layer, + self.cognitive_control_layer, + self.task_prosecution_layer, + ]: + layer.stop() + + for t in self.threads.values(): + t.join() def process_input(self, input_data): """ diff --git a/ui/app.py b/ui/app.py index bb72c6d..2f55325 100644 --- a/ui/app.py +++ b/ui/app.py @@ -1,6 +1,8 @@ from flask import Flask, request, redirect, url_for, render_template_string import threading import time +import signal +import atexit from orchestration import CognitiveArchitecture @@ -9,6 +11,11 @@ # Instantiate the cognitive architecture architecture = CognitiveArchitecture() +# Control events for background threads +stop_event = threading.Event() +arch_thread = None +resource_thread = None + # Simple in-memory list of tasks tasks = [] @@ -20,12 +27,14 @@ def run_architecture(): """Start the cognitive architecture execution.""" architecture.start_execution() + stop_event.wait() + architecture.stop_execution() def update_resources(): """Periodically update resource status.""" global remaining_currency - while True: + while not stop_event.is_set(): if currency_resource: try: remaining_currency = currency_resource.get_remaining() @@ -34,9 +43,33 @@ def update_resources(): time.sleep(5) +def start_threads(): + global arch_thread, resource_thread + arch_thread = threading.Thread(target=run_architecture, daemon=True) + resource_thread = threading.Thread(target=update_resources, daemon=True) + arch_thread.start() + resource_thread.start() + + +def stop_threads(*args): + stop_event.set() + if arch_thread: + arch_thread.join() + if resource_thread: + resource_thread.join() + + # Start background threads -threading.Thread(target=run_architecture, daemon=True).start() -threading.Thread(target=update_resources, daemon=True).start() +start_threads() + +@app.teardown_appcontext +def shutdown_session(exception=None): + stop_threads() + +# Ensure background threads shut down with the Flask app +atexit.register(stop_threads) +signal.signal(signal.SIGINT, stop_threads) +signal.signal(signal.SIGTERM, stop_threads) @app.route('/tasks')