From 34e233bdb46caf5e89ac6657d3008868ff34d69e Mon Sep 17 00:00:00 2001 From: Daniel Fernandes Date: Wed, 28 Jan 2026 13:37:51 +0000 Subject: [PATCH 1/8] Add log_response_details middleware --- src/blueapi/service/main.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 9711f2005..ed1b70388 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -17,8 +17,9 @@ Response, status, ) +from fastapi.concurrency import iterate_in_threadpool from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import RedirectResponse +from fastapi.responses import RedirectResponse, StreamingResponse from fastapi.security import OAuth2AuthorizationCodeBearer from observability_utils.tracing import ( add_span_attributes, @@ -154,6 +155,7 @@ def get_app(config: ApplicationConfig): app.add_exception_handler(jwt.PyJWTError, on_token_error_401) app.middleware("http")(add_api_version_header) app.middleware("http")(inject_propagated_observability_context) + app.middleware("http")(log_response_details) app.middleware("http")(log_request_details) if config.api.cors: app.add_middleware( @@ -607,6 +609,23 @@ async def log_request_details( return response +async def log_response_details( + request: Request, call_next: Callable[[Request], Awaitable[StreamingResponse]] +) -> Response: + response = await call_next(request) + + response_body = [section async for section in response.body_iterator] + response.body_iterator = iterate_in_threadpool(iter(response_body)) + + msg = f"Response body: {response_body}" + if request.url.path == "/healthz": + LOGGER.debug(msg) + else: + LOGGER.info(msg) + + return response + + async def inject_propagated_observability_context( request: Request, call_next: Callable[[Request], Awaitable[Response]] ) -> Response: From 25b4f8a2724a7d299f6dd9ae713ad7de35190119 Mon Sep 17 00:00:00 2001 From: Daniel Fernandes Date: Wed, 28 Jan 2026 
13:43:20 +0000 Subject: [PATCH 2/8] Fix log statement, add comment --- src/blueapi/service/main.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index ed1b70388..9dfcc12c0 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -600,7 +600,8 @@ async def add_api_version_header( async def log_request_details( request: Request, call_next: Callable[[Request], Awaitable[Response]] ) -> Response: - msg = f"method: {request.method} url: {request.url} body: {await request.body()}" + msg = f"Request method: {request.method} url: {request.url} \ + body: {await request.body()}" if request.url.path == "/healthz": LOGGER.debug(msg) else: @@ -614,6 +615,7 @@ async def log_response_details( ) -> Response: response = await call_next(request) + # https://github.com/Kludex/starlette/issues/874#issuecomment-1027743996 response_body = [section async for section in response.body_iterator] response.body_iterator = iterate_in_threadpool(iter(response_body)) From b51e5352d79c7d42fd5c128a2b504b3c656f353e Mon Sep 17 00:00:00 2001 From: Daniel Fernandes Date: Wed, 28 Jan 2026 14:40:27 +0000 Subject: [PATCH 3/8] Aggregate response and request logs into single log --- src/blueapi/service/main.py | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 9dfcc12c0..f125c56b6 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -155,7 +155,6 @@ def get_app(config: ApplicationConfig): app.add_exception_handler(jwt.PyJWTError, on_token_error_401) app.middleware("http")(add_api_version_header) app.middleware("http")(inject_propagated_observability_context) - app.middleware("http")(log_response_details) app.middleware("http")(log_request_details) if config.api.cors: app.add_middleware( @@ -598,19 +597,6 @@ async def add_api_version_header( async def log_request_details( - 
request: Request, call_next: Callable[[Request], Awaitable[Response]] -) -> Response: - msg = f"Request method: {request.method} url: {request.url} \ - body: {await request.body()}" - if request.url.path == "/healthz": - LOGGER.debug(msg) - else: - LOGGER.info(msg) - response = await call_next(request) - return response - - -async def log_response_details( request: Request, call_next: Callable[[Request], Awaitable[StreamingResponse]] ) -> Response: response = await call_next(request) @@ -619,11 +605,19 @@ async def log_response_details( response_body = [section async for section in response.body_iterator] response.body_iterator = iterate_in_threadpool(iter(response_body)) - msg = f"Response body: {response_body}" + msg = ( + f"{getattr(request.client, 'host', 'NO_ADDRESS')} {request.method}" + f" {request.url.path} {response.status_code}" + ) + + extra = { + "request_body": request.body, + "response_body": response_body, + } if request.url.path == "/healthz": - LOGGER.debug(msg) + LOGGER.debug(msg, extra=extra) else: - LOGGER.info(msg) + LOGGER.info(msg, extra=extra) return response From 128a1f89954af26c2ab17986e4443bc5c12dfe59 Mon Sep 17 00:00:00 2001 From: Daniel Fernandes Date: Wed, 28 Jan 2026 15:10:25 +0000 Subject: [PATCH 4/8] Fix placement of request body consumption --- src/blueapi/service/main.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index f125c56b6..c3f6dff2d 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -599,25 +599,27 @@ async def add_api_version_header( async def log_request_details( request: Request, call_next: Callable[[Request], Awaitable[StreamingResponse]] ) -> Response: + request_body = await request.body() + response = await call_next(request) # https://github.com/Kludex/starlette/issues/874#issuecomment-1027743996 response_body = [section async for section in response.body_iterator] response.body_iterator = 
iterate_in_threadpool(iter(response_body)) - msg = ( + log_message = ( f"{getattr(request.client, 'host', 'NO_ADDRESS')} {request.method}" f" {request.url.path} {response.status_code}" ) extra = { - "request_body": request.body, + "request_body": request_body, "response_body": response_body, } if request.url.path == "/healthz": - LOGGER.debug(msg, extra=extra) + LOGGER.debug(log_message, extra=extra) else: - LOGGER.info(msg, extra=extra) + LOGGER.info(log_message, extra=extra) return response From 659381017f122488e36cb5d98aa9730a400951d9 Mon Sep 17 00:00:00 2001 From: Daniel Fernandes Date: Wed, 28 Jan 2026 15:10:37 +0000 Subject: [PATCH 5/8] Fix logging test --- tests/unit_tests/service/test_main.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/unit_tests/service/test_main.py b/tests/unit_tests/service/test_main.py index 92d7cd414..d31ae3b66 100644 --- a/tests/unit_tests/service/test_main.py +++ b/tests/unit_tests/service/test_main.py @@ -22,7 +22,11 @@ async def root(): assert response.status_code == 200 logger.info.assert_called_once_with( - "method: GET url: http://testserver/ body: b''" + "testclient GET / 200", + extra={ + "request_body": b"", + "response_body": [b'{"message":"Hello World"}'], + }, ) From 46738e18bd4c5bed002817f029ec63d58db98052 Mon Sep 17 00:00:00 2001 From: Daniel Fernandes Date: Wed, 28 Jan 2026 15:21:30 +0000 Subject: [PATCH 6/8] Add docstring --- src/blueapi/service/main.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index c3f6dff2d..90dd0a950 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -599,6 +599,8 @@ async def add_api_version_header( async def log_request_details( request: Request, call_next: Callable[[Request], Awaitable[StreamingResponse]] ) -> Response: + """Middleware to log all request's host, method, path, status and request and + response bodies""" request_body = await request.body() response = 
await call_next(request) From 610f0a108486770021dc1629eda98a3dbbd5f4d5 Mon Sep 17 00:00:00 2001 From: Daniel Fernandes Date: Wed, 28 Jan 2026 15:29:50 +0000 Subject: [PATCH 7/8] Make log test endpoint a POST --- tests/unit_tests/service/test_main.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/unit_tests/service/test_main.py b/tests/unit_tests/service/test_main.py index d31ae3b66..2a7e8cb65 100644 --- a/tests/unit_tests/service/test_main.py +++ b/tests/unit_tests/service/test_main.py @@ -13,18 +13,18 @@ async def test_log_request_details(): app = FastAPI() app.middleware("http")(log_request_details) - @app.get("/") + @app.post("/") async def root(): return {"message": "Hello World"} client = TestClient(app) - response = client.get("/") + response = client.post("/", content="foo") assert response.status_code == 200 logger.info.assert_called_once_with( - "testclient GET / 200", + "testclient POST / 200", extra={ - "request_body": b"", + "request_body": b"foo", "response_body": [b'{"message":"Hello World"}'], }, ) From 5d10315ef984db4f75664a17890164c574bc1028 Mon Sep 17 00:00:00 2001 From: Daniel Fernandes Date: Fri, 13 Feb 2026 15:11:49 +0000 Subject: [PATCH 8/8] Make request and response bodies truncatable --- helm/blueapi/README.md | 5 ++- helm/blueapi/values.schema.json | 4 ++ helm/blueapi/values.yaml | 2 + src/blueapi/config.py | 1 + src/blueapi/service/main.py | 66 ++++++++++++++++++++------------- 5 files changed, 51 insertions(+), 27 deletions(-) diff --git a/helm/blueapi/README.md b/helm/blueapi/README.md index e4fe2c162..91c6b6b78 100644 --- a/helm/blueapi/README.md +++ b/helm/blueapi/README.md @@ -47,9 +47,10 @@ A Helm chart deploying a worker pod that runs Bluesky plans | tracing | object | `{"otlp":{"enabled":false,"protocol":"http/protobuf","server":{"host":"http://opentelemetry-collector.tracing","port":4318}}}` | Configure tracing: opentelemetry-collector.tracing should be available in all Diamond clusters | | 
volumeMounts | list | `[{"mountPath":"/config","name":"worker-config","readOnly":true}]` | Additional volumeMounts on the output StatefulSet definition. Define how volumes are mounted to the container referenced by using the same name. | | volumes | list | `[]` | Additional volumes on the output StatefulSet definition. Define volumes from e.g. Secrets, ConfigMaps or the Filesystem | -| worker | object | `{"api":{"url":"http://0.0.0.0:8000/"},"env":{"sources":[{"kind":"planFunctions","module":"dodal.plans"},{"kind":"planFunctions","module":"dodal.plan_stubs.wrapped"}]},"logging":{"graylog":{"enabled":false,"url":"tcp://graylog-log-target.diamond.ac.uk:12231/"},"level":"INFO"},"scratch":{"repositories":[],"root":"/workspace"},"stomp":{"auth":{"password":"guest","username":"guest"},"enabled":false,"url":"tcp://rabbitmq:61613/"}}` | Config for the worker goes here, will be mounted into a config file | +| worker | object | `{"api":{"url":"http://0.0.0.0:8000/"},"env":{"sources":[{"kind":"planFunctions","module":"dodal.plans"},{"kind":"planFunctions","module":"dodal.plan_stubs.wrapped"}]},"logging":{"graylog":{"enabled":false,"url":"tcp://graylog-log-target.diamond.ac.uk:12231/"},"level":"INFO","truncateBodies":true},"scratch":{"repositories":[],"root":"/workspace"},"stomp":{"auth":{"password":"guest","username":"guest"},"enabled":false,"url":"tcp://rabbitmq:61613/"}}` | Config for the worker goes here, will be mounted into a config file | | worker.api.url | string | `"http://0.0.0.0:8000/"` | 0.0.0.0 required to allow non-loopback traffic If using hostNetwork, the port must be free on the host | | worker.env.sources | list | `[{"kind":"planFunctions","module":"dodal.plans"},{"kind":"planFunctions","module":"dodal.plan_stubs.wrapped"}]` | modules (must be installed in the venv) to fetch devices/plans from | -| worker.logging | object | `{"graylog":{"enabled":false,"url":"tcp://graylog-log-target.diamond.ac.uk:12231/"},"level":"INFO"}` | Configures logging. 
Port 12231 is the `dodal` input on graylog which will be renamed `blueapi` | +| worker.logging | object | `{"graylog":{"enabled":false,"url":"tcp://graylog-log-target.diamond.ac.uk:12231/"},"level":"INFO","truncateBodies":true}` | Configures logging. Port 12231 is the `dodal` input on graylog which will be renamed `blueapi` | +| worker.logging.truncateBodies | bool | `true` | Whether to truncate request and response bodies, which can be arbitrarily large | | worker.scratch | object | `{"repositories":[],"root":"/workspace"}` | If initContainer is enabled the default branch of python projects in this section are installed into the venv *without their dependencies* | | worker.stomp | object | `{"auth":{"password":"guest","username":"guest"},"enabled":false,"url":"tcp://rabbitmq:61613/"}` | Message bus configuration for returning status to GDA/forwarding documents downstream Password may be in the form ${ENV_VAR} to be fetched from an environment variable e.g. mounted from a SealedSecret | diff --git a/helm/blueapi/values.schema.json b/helm/blueapi/values.schema.json index 1578c0dad..16d77fae1 100644 --- a/helm/blueapi/values.schema.json +++ b/helm/blueapi/values.schema.json @@ -393,6 +393,10 @@ }, "level": { "type": "string" + }, + "truncateBodies": { + "description": "Whether to truncate request and response bodies, which can be arbitrarily large", + "type": "boolean" } } }, diff --git a/helm/blueapi/values.yaml b/helm/blueapi/values.yaml index ec6515921..22bc296b9 100644 --- a/helm/blueapi/values.yaml +++ b/helm/blueapi/values.yaml @@ -205,6 +205,8 @@ worker: # -- Configures logging. 
Port 12231 is the `dodal` input on graylog which will be renamed `blueapi` logging: level: "INFO" + # -- Whether to truncate request and response bodies, which can be arbitrarily large + truncateBodies: True graylog: enabled: False url: tcp://graylog-log-target.diamond.ac.uk:12231/ diff --git a/src/blueapi/config.py b/src/blueapi/config.py index 54baa500a..75aad7ba9 100644 --- a/src/blueapi/config.py +++ b/src/blueapi/config.py @@ -152,6 +152,7 @@ class GraylogConfig(BlueapiBaseModel): class LoggingConfig(BlueapiBaseModel): level: LogLevel = "INFO" + truncate_bodies: bool = True graylog: GraylogConfig = GraylogConfig() diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 6b653e693..1cbcd38ad 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -155,7 +155,7 @@ def get_app(config: ApplicationConfig): app.add_exception_handler(jwt.PyJWTError, on_token_error_401) app.middleware("http")(add_api_version_header) app.middleware("http")(inject_propagated_observability_context) - app.middleware("http")(log_request_details) + app.middleware("http")(log_request_details(config.logging.truncate_bodies)) if config.api.cors: app.add_middleware( CORSMiddleware, @@ -595,34 +595,50 @@ async def add_api_version_header( return response -async def log_request_details( - request: Request, call_next: Callable[[Request], Awaitable[StreamingResponse]] -) -> Response: - """Middleware to log all request's host, method, path, status and request and - response bodies""" - request_body = await request.body() - - response = await call_next(request) +def log_request_details(truncate_bodies: bool): + async def inner( + request: Request, call_next: Callable[[Request], Awaitable[StreamingResponse]] + ) -> Response: + """Middleware to log all request's host, method, path, status and request and + response bodies""" + request_body = await request.body() + + response = await call_next(request) + + # 
https://github.com/Kludex/starlette/issues/874#issuecomment-1027743996 + response_body_list = [section async for section in response.body_iterator] + response.body_iterator = iterate_in_threadpool(iter(response_body_list)) + + response_body = b"" + for r in response_body_list: + if type(r) is bytes: + response_body += r + elif type(r) is str: + response_body += r.encode("utf-8") + elif type(r) is memoryview[int]: + response_body += bytes(r) + + log_message = ( + f"{getattr(request.client, 'host', 'NO_ADDRESS')} {request.method}" + f" {request.url.path} {response.status_code}" + ) - # https://github.com/Kludex/starlette/issues/874#issuecomment-1027743996 - response_body = [section async for section in response.body_iterator] - response.body_iterator = iterate_in_threadpool(iter(response_body)) + if truncate_bodies: + request_body = request_body[:1024] + response_body = response_body[:1024] - log_message = ( - f"{getattr(request.client, 'host', 'NO_ADDRESS')} {request.method}" - f" {request.url.path} {response.status_code}" - ) + extra = { + "request_body": request_body, + "response_body": response_body, + } + if request.url.path == "/healthz": + LOGGER.debug(log_message, extra=extra) + else: + LOGGER.info(log_message, extra=extra) - extra = { - "request_body": request_body, - "response_body": response_body, - } - if request.url.path == "/healthz": - LOGGER.debug(log_message, extra=extra) - else: - LOGGER.info(log_message, extra=extra) + return response - return response + return inner async def inject_propagated_observability_context(