Skip to content
2 changes: 1 addition & 1 deletion src/google/adk/tools/mcp_tool/mcp_session_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ async def before_request(
)
return

if 'Authorization' in headers:
if any(key.lower() == 'authorization' for key in headers):
logger.debug('Authorization header already present, not overwriting')
return

Expand Down
19 changes: 19 additions & 0 deletions tests/unittests/tools/mcp_tool/test_mcp_session_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1298,6 +1298,25 @@ def mock_refresh(req):

assert headers["Authorization"] == "Bearer refreshed_token"

@pytest.mark.skipif(not AIO_SUPPORTED, reason="google.auth.aio not supported")
@pytest.mark.asyncio
async def test_before_request_preserves_lowercase_authorization_header(self):
"""An existing lowercase authorization header prevents token injection."""
from google.adk.tools.mcp_tool.mcp_session_manager import _RefreshableAsyncCredentials

mock_creds = Mock()
mock_creds.expired = True
mock_creds.token = "service_account_token"
mock_creds.refresh = Mock()

credentials = _RefreshableAsyncCredentials(mock_creds)
headers = {"authorization": "Bearer user_token"}

await credentials.before_request(None, "GET", "http://example.com", headers)

assert headers == {"authorization": "Bearer user_token"}
mock_creds.refresh.assert_not_called()


class TestGoogleAuthAsyncByteStream:

Expand Down