Skip to content

Commit ae006fe

Browse files
Merge branch 'main' into gl/fix/output-parser-exception-retry-logic
# Conflicts: # lib/crewai/src/crewai/llms/providers/anthropic/completion.py
2 parents 07ac8fb + 61ad1fb commit ae006fe

29 files changed

+3156
-1362
lines changed

docs/en/concepts/llms.mdx

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1200,6 +1200,52 @@ Learn how to get the most out of your LLM configuration:
12001200
)
12011201
```
12021202
</Accordion>
1203+
1204+
<Accordion title="Transport Interceptors">
1205+
CrewAI provides message interceptors for several providers, allowing you to hook into request/response cycles at the transport layer.
1206+
1207+
**Supported Providers:**
1208+
- ✅ OpenAI
1209+
- ✅ Anthropic
1210+
1211+
**Basic Usage:**
1212+
```python
1213+
import httpx
1214+
from crewai import LLM
1215+
from crewai.llms.hooks import BaseInterceptor
1216+
1217+
class CustomInterceptor(BaseInterceptor[httpx.Request, httpx.Response]):
1218+
"""Custom interceptor to modify requests and responses."""
1219+
1220+
def on_outbound(self, request: httpx.Request) -> httpx.Request:
1221+
"""Print request before sending to the LLM provider."""
1222+
print(request)
1223+
return request
1224+
1225+
def on_inbound(self, response: httpx.Response) -> httpx.Response:
1226+
"""Process response after receiving from the LLM provider."""
1227+
print(f"Status: {response.status_code}")
1228+
print(f"Response time: {response.elapsed}")
1229+
return response
1230+
1231+
# Use the interceptor with an LLM
1232+
llm = LLM(
1233+
model="openai/gpt-4o",
1234+
interceptor=CustomInterceptor()
1235+
)
1236+
```
1237+
1238+
**Important Notes:**
1239+
- Both methods must return an object of the same type as the one they received (ideally the received object itself).
1240+
- Modifying received objects may result in unexpected behavior or application crashes.
1241+
- Not all providers support interceptors — check the supported providers list above.
1242+
1243+
<Info>
1244+
Interceptors operate at the transport layer. This is particularly useful for:
1245+
- Message transformation and filtering
1246+
- Debugging API interactions
1247+
</Info>
1248+
</Accordion>
12031249
</AccordionGroup>
12041250

12051251
## Common Issues and Solutions

lib/crewai/src/crewai/llm.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
)
2121

2222
from dotenv import load_dotenv
23+
import httpx
2324
from pydantic import BaseModel, Field
2425
from typing_extensions import Self
2526

@@ -53,6 +54,7 @@
5354
from litellm.utils import supports_response_schema
5455

5556
from crewai.agent.core import Agent
57+
from crewai.llms.hooks.base import BaseInterceptor
5658
from crewai.task import Task
5759
from crewai.tools.base_tool import BaseTool
5860
from crewai.utilities.types import LLMMessage
@@ -334,6 +336,8 @@ def __new__(cls, model: str, is_litellm: bool = False, **kwargs: Any) -> LLM:
334336
return cast(
335337
Self, native_class(model=model_string, provider=provider, **kwargs)
336338
)
339+
except NotImplementedError:
340+
raise
337341
except Exception as e:
338342
raise ImportError(f"Error importing native provider: {e}") from e
339343

@@ -403,6 +407,7 @@ def __init__(
403407
callbacks: list[Any] | None = None,
404408
reasoning_effort: Literal["none", "low", "medium", "high"] | None = None,
405409
stream: bool = False,
410+
interceptor: BaseInterceptor[httpx.Request, httpx.Response] | None = None,
406411
**kwargs: Any,
407412
) -> None:
408413
"""Initialize LLM instance.
@@ -442,6 +447,7 @@ def __init__(
442447
self.additional_params = kwargs
443448
self.is_anthropic = self._is_anthropic_model(model)
444449
self.stream = stream
450+
self.interceptor = interceptor
445451

446452
litellm.drop_params = True
447453

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
"""Interceptor contracts for crewai"""
2+
3+
from crewai.llms.hooks.base import BaseInterceptor
4+
5+
6+
__all__ = ["BaseInterceptor"]
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""Base classes for LLM transport interceptors.
2+
3+
This module provides abstract base classes for intercepting and modifying
4+
outbound and inbound messages at the transport level.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
from abc import ABC, abstractmethod
10+
from typing import Generic, TypeVar
11+
12+
13+
T = TypeVar("T")
U = TypeVar("U")


class BaseInterceptor(ABC, Generic[T, U]):
    """Abstract base class for intercepting transport-level messages.

    Provides hooks to intercept and modify outbound and inbound messages
    at the transport layer.

    Type parameters:
        T: Outbound message type (e.g., httpx.Request)
        U: Inbound message type (e.g., httpx.Response)

    Example:
        >>> class CustomInterceptor(BaseInterceptor[httpx.Request, httpx.Response]):
        ...     def on_outbound(self, message: httpx.Request) -> httpx.Request:
        ...         message.headers["X-Custom-Header"] = "value"
        ...         return message
        ...
        ...     def on_inbound(self, message: httpx.Response) -> httpx.Response:
        ...         print(f"Status: {message.status_code}")
        ...         return message
    """

    @abstractmethod
    def on_outbound(self, message: T) -> T:
        """Intercept outbound message before sending.

        Args:
            message: Outbound message object.

        Returns:
            Modified message object.
        """
        ...

    @abstractmethod
    def on_inbound(self, message: U) -> U:
        """Intercept inbound message after receiving.

        Args:
            message: Inbound message object.

        Returns:
            Modified message object.
        """
        ...

    async def aon_outbound(self, message: T) -> T:
        """Async version of on_outbound.

        Defaults to delegating to the synchronous ``on_outbound`` hook, so a
        sync-only interceptor still works when used with an async transport
        (which awaits this method unconditionally). Override when genuinely
        asynchronous work is needed.

        Args:
            message: Outbound message object.

        Returns:
            Modified message object.
        """
        return self.on_outbound(message)

    async def aon_inbound(self, message: U) -> U:
        """Async version of on_inbound.

        Defaults to delegating to the synchronous ``on_inbound`` hook, so a
        sync-only interceptor still works when used with an async transport.
        Override when genuinely asynchronous work is needed.

        Args:
            message: Inbound message object.

        Returns:
            Modified message object.
        """
        return self.on_inbound(message)
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
"""HTTP transport implementations for LLM request/response interception.
2+
3+
This module provides internal transport classes that integrate with BaseInterceptor
4+
to enable request/response modification at the transport level.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
from typing import TYPE_CHECKING, Any
10+
11+
import httpx
12+
13+
14+
if TYPE_CHECKING:
15+
from crewai.llms.hooks.base import BaseInterceptor
16+
17+
18+
class HTTPTransport(httpx.HTTPTransport):
    """Synchronous HTTP transport that routes traffic through an interceptor.

    Created internally when a user supplies a ``BaseInterceptor`` to an LLM
    client; users should not instantiate this class themselves — passing the
    interceptor to the LLM client builds this transport automatically.
    """

    def __init__(
        self,
        interceptor: BaseInterceptor[httpx.Request, httpx.Response],
        **kwargs: Any,
    ) -> None:
        """Initialize the transport with an interceptor.

        Args:
            interceptor: HTTP interceptor for modifying raw request/response objects.
            **kwargs: Additional arguments forwarded to httpx.HTTPTransport.
        """
        super().__init__(**kwargs)
        self.interceptor = interceptor

    def handle_request(self, request: httpx.Request) -> httpx.Response:
        """Send a request, applying the interceptor on the way out and back in.

        Args:
            request: The HTTP request to handle.

        Returns:
            The (possibly modified) HTTP response.
        """
        # Hook order: outbound hook -> actual send -> inbound hook.
        outbound = self.interceptor.on_outbound(request)
        raw_response = super().handle_request(outbound)
        return self.interceptor.on_inbound(raw_response)
52+
53+
54+
class AsyncHTTPransport(httpx.AsyncHTTPTransport):
    """Async HTTP transport that uses an interceptor for request/response modification.

    This transport is used internally when a user provides a BaseInterceptor.
    Users should not instantiate this class directly - instead, pass an interceptor
    to the LLM client and this transport will be created automatically.

    NOTE(review): the class name is missing a "T" (should be ``AsyncHTTPTransport``).
    The original name is kept for backward compatibility; a correctly spelled
    alias is defined below for new code.
    """

    def __init__(
        self,
        interceptor: BaseInterceptor[httpx.Request, httpx.Response],
        **kwargs: Any,
    ) -> None:
        """Initialize async transport with interceptor.

        Args:
            interceptor: HTTP interceptor for modifying raw request/response objects.
            **kwargs: Additional arguments passed to httpx.AsyncHTTPTransport.
        """
        super().__init__(**kwargs)
        self.interceptor = interceptor

    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        """Handle async request with interception.

        Note: this awaits the interceptor's ``aon_outbound``/``aon_inbound``
        hooks, so the interceptor must provide usable async hooks (either
        overridden, or defaulted by the base class).

        Args:
            request: The HTTP request to handle.

        Returns:
            The HTTP response.
        """
        request = await self.interceptor.aon_outbound(request)
        response = await super().handle_async_request(request)
        return await self.interceptor.aon_inbound(response)


# Correctly spelled alias for the misspelled class name above; prefer this in new code.
AsyncHTTPTransport = AsyncHTTPransport

lib/crewai/src/crewai/llms/providers/anthropic/completion.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,28 @@
22

33
import logging
44
import os
5-
from typing import Any, cast
5+
from typing import TYPE_CHECKING, Any, cast
66

77
from pydantic import BaseModel
88

99
from crewai.events.types.llm_events import LLMCallType
1010
from crewai.llms.base_llm import BaseLLM
11+
from crewai.llms.hooks.transport import HTTPTransport
1112
from crewai.utilities.agent_utils import is_context_length_exceeded
1213
from crewai.utilities.exceptions.context_window_exceeding_exception import (
1314
LLMContextLengthExceededError,
1415
)
1516
from crewai.utilities.types import LLMMessage
1617

1718

19+
if TYPE_CHECKING:
20+
from crewai.llms.hooks.base import BaseInterceptor
21+
1822
try:
1923
from anthropic import Anthropic
2024
from anthropic.types import Message
2125
from anthropic.types.tool_use_block import ToolUseBlock
26+
import httpx
2227
except ImportError:
2328
raise ImportError(
2429
'Anthropic native provider not available, to install: uv add "crewai[anthropic]"'
@@ -45,6 +50,7 @@ def __init__(
4550
stop_sequences: list[str] | None = None,
4651
stream: bool = False,
4752
client_params: dict[str, Any] | None = None,
53+
interceptor: BaseInterceptor[httpx.Request, httpx.Response] | None = None,
4854
**kwargs: Any,
4955
):
5056
"""Initialize Anthropic chat completion client.
@@ -61,13 +67,15 @@ def __init__(
6167
stop_sequences: Stop sequences (Anthropic uses stop_sequences, not stop)
6268
stream: Enable streaming responses
6369
client_params: Additional parameters for the Anthropic client
70+
interceptor: HTTP interceptor for modifying requests/responses at transport level.
6471
**kwargs: Additional parameters
6572
"""
6673
super().__init__(
6774
model=model, temperature=temperature, stop=stop_sequences or [], **kwargs
6875
)
6976

7077
# Client params
78+
self.interceptor = interceptor
7179
self.client_params = client_params
7280
self.base_url = base_url
7381
self.timeout = timeout
@@ -100,6 +108,11 @@ def _get_client_params(self) -> dict[str, Any]:
100108
"max_retries": self.max_retries,
101109
}
102110

111+
if self.interceptor:
112+
transport = HTTPTransport(interceptor=self.interceptor)
113+
http_client = httpx.Client(transport=transport)
114+
client_params["http_client"] = http_client # type: ignore[assignment]
115+
103116
if self.client_params:
104117
client_params.update(self.client_params)
105118

0 commit comments

Comments
 (0)