This repository was archived by the owner on Sep 23, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
201 lines (174 loc) · 7.42 KB
/
main.py
File metadata and controls
201 lines (174 loc) · 7.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import os
import json
import hmac
import hashlib
import time
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel
from typing import List, Optional, Dict, Any, AsyncGenerator
import asyncio
import google.generativeai as genai
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
import httpx
from fastapi.middleware.cors import CORSMiddleware
# Pull environment variables (API keys, webhook secret) from a local .env file.
load_dotenv()
app = FastAPI()
# Permissive CORS so a browser-based client on any origin can reach the API.
# NOTE(review): allow_origins=["*"] together with allow_credentials=True is very
# open — confirm this is acceptable for production deployments.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Webhook signature verification
def verify_signature(request_body: bytes, signature_header: str, secret: str, timestamp_tolerance: int = 300) -> bool:
    """Verify a Layercode webhook signature header against the raw request body.

    The header is expected in the format ``t=<unix_ts>,v1=<hex_hmac>``, where the
    signature is an HMAC-SHA256 of ``"<ts>.<body>"`` keyed with *secret*.

    Args:
        request_body: Raw (undecoded) request body bytes.
        signature_header: Value of the ``layercode-signature`` header.
        secret: Shared webhook secret.
        timestamp_tolerance: Maximum allowed clock skew in seconds (replay guard).

    Returns:
        True only if the header parses, the timestamp is within tolerance, and
        the signature matches.
    """
    try:
        # split('=', 1): a value containing '=' must not break parsing into a
        # 3-tuple (plain split('=') made such headers fail unconditionally).
        parts = dict(item.split('=', 1) for item in signature_header.split(','))
        timestamp = int(parts['t'])
        signature = parts['v1']
    except (ValueError, KeyError):
        # Malformed header (missing '=', non-integer t, absent t/v1 keys).
        return False
    # Reject stale or future-dated timestamps to limit replay attacks.
    now = int(time.time())
    if abs(now - timestamp) > timestamp_tolerance:
        return False
    # Recompute the expected signature over "<timestamp>.<body>".
    signed_payload = f"{timestamp}.{request_body.decode()}"
    expected_signature = hmac.new(
        secret.encode(),
        signed_payload.encode(),
        hashlib.sha256,
    ).hexdigest()
    # Constant-time comparison avoids a timing side channel.
    return hmac.compare_digest(signature, expected_signature)
async def verify_webhook(request: Request):
    """FastAPI dependency: reject requests whose Layercode signature is absent or invalid.

    Raises HTTP 401 on failure; returns the raw request body on success so
    handlers can reuse it without re-reading the stream.
    """
    header = request.headers.get("layercode-signature")
    if not header:
        raise HTTPException(status_code=401, detail="Missing signature header")
    raw_body = await request.body()
    secret = os.getenv("LAYERCODE_WEBHOOK_SECRET", "")
    if verify_signature(raw_body, header, secret):
        return raw_body
    raise HTTPException(status_code=401, detail="Invalid signature")
class MessageContent(BaseModel):
    """A single content part of a chat message (only ``type == "text"`` parts reach Gemini)."""
    type: str  # content kind, e.g. "text"
    text: str  # textual payload
class Message(BaseModel):
    """One chat turn stored in the in-memory session history."""
    role: str  # "user" or "assistant" as stored locally
    content: List[MessageContent]  # ordered content parts of the turn
# In-memory chat history, keyed by Layercode session id.
# NOTE(review): process-local and unbounded — assumes a single worker and
# short-lived sessions; confirm against the deployment model.
session_messages: Dict[str, List[Message]] = {}
# Load prompt and welcome message from layercode.config.json
# (best-effort: a missing or invalid config falls back to the defaults below).
try:
    config_path = os.path.join(os.path.dirname(__file__), "layercode.config.json")
    with open(config_path, "r") as config_file:
        _layercode_config = json.load(config_file)
except Exception:
    _layercode_config = {}
# System prompt prepended to the first user message of each Gemini conversation.
SYSTEM_PROMPT = _layercode_config.get(
    "prompt",
    "You are a helpful conversation assistant. You should respond to the user's message in a conversational manner. Your output will be spoken by a TTS model. You should respond in a way that is easy for the TTS model to speak and sound natural.",
)
# Greeting streamed back on a session.start event.
WELCOME_MESSAGE = _layercode_config.get(
    "welcome_message",
    "Welcome to Layercode. How can I help you today?",
)
class RequestBody(BaseModel):
    """Payload of a Layercode agent webhook call."""
    text: str  # user utterance; presumably empty for non-message events — verify against webhook docs
    type: str  # event type, e.g. "session.start" or a message turn
    session_id: str  # conversation/session identifier used as the history key
    turn_id: str  # id of the current turn, echoed back in every SSE event
# Configure the Gemini client from the environment.
GOOGLE_API_KEY = os.getenv("GOOGLE_GENERATIVE_AI_API_KEY")
genai.configure(api_key=GOOGLE_API_KEY)
# Dedicated pool for the blocking google-generativeai calls.
gemini_executor = ThreadPoolExecutor(max_workers=2)
def to_gemini_messages(messages: "List[Message]") -> "List[Dict[str, Any]]":
    """Convert stored chat messages to Gemini's chat-history format.

    Gemini's chat API accepts only the roles "user" and "model", so the
    locally-stored "assistant" role is mapped to "model" (previously it was
    passed through unchanged, which the API rejects). Other roles pass
    through as-is. Non-text content parts are dropped.
    """
    return [
        {
            # Gemini has no "assistant" role; its counterpart is "model".
            "role": "model" if m.role == "assistant" else m.role,
            "parts": [part.text for part in m.content if part.type == "text"],
        }
        for m in messages
    ]
def gemini_stream_response(messages: "List[Message]", system_prompt: str):
    """Start a streaming Gemini generation for the given conversation.

    Blocking call; callers dispatch it to ``gemini_executor``.

    Fixes two defects of the previous implementation:
    * ``messages.copy()`` was a shallow copy, so prepending the system prompt
      mutated the stored session history in place — the prompt stacked onto
      the first user message again on every turn. We now edit only the
      converted plain dicts.
    * The history handed to ``start_chat`` included the latest user message,
      which was then sent *again* via ``send_message``, duplicating the turn.

    Returns:
        The iterable streaming response from ``chat.send_message``.
    """
    model = genai.GenerativeModel("gemini-2.0-flash-001")
    # Convert first so the session's Message objects are never mutated.
    gemini_messages = to_gemini_messages(messages)
    if gemini_messages and gemini_messages[0]["role"] == "user" and gemini_messages[0]["parts"]:
        gemini_messages[0]["parts"][0] = f"{system_prompt}\n\n{gemini_messages[0]['parts'][0]}"
    # The last message is sent explicitly; history must not repeat it.
    latest = gemini_messages.pop()
    chat = model.start_chat(history=gemini_messages)
    return chat.send_message(latest["parts"][0], stream=True)
async def stream_google_gemini(messages: "List[Message]", system_prompt: str) -> "AsyncGenerator[str, None]":
    """Yield Gemini response text chunks as they arrive.

    The google-generativeai client is synchronous, so both starting the
    stream and pulling each chunk are pushed onto ``gemini_executor``;
    iterating the stream directly in the coroutine (as the previous version
    did) blocked the event loop on every network read. Also uses
    ``get_running_loop()`` — ``get_event_loop()`` is deprecated inside
    coroutines.
    """
    loop = asyncio.get_running_loop()
    stream = await loop.run_in_executor(
        gemini_executor, gemini_stream_response, messages, system_prompt
    )
    chunk_iter = iter(stream)
    done = object()  # sentinel so a falsy chunk is not mistaken for exhaustion
    while True:
        chunk = await loop.run_in_executor(gemini_executor, next, chunk_iter, done)
        if chunk is done:
            break
        if hasattr(chunk, "text"):
            yield chunk.text
        elif isinstance(chunk, dict) and "text" in chunk:
            yield chunk["text"]
@app.post("/api/agent")
async def agent_endpoint(body: RequestBody, verified_body: bytes = Depends(verify_webhook)):
messages = session_messages.setdefault(body.session_id, [])
# Add user message
messages.append(Message(role="user", content=[MessageContent(type="text", text=body.text)]))
if body.type == "session.start":
async def welcome_stream():
data = json.dumps(
{
"type": "response.tts",
"content": WELCOME_MESSAGE,
"turn_id": body.turn_id,
}
)
yield f"data: {data}\n\n"
messages.append(Message(role="assistant", content=[MessageContent(type="text", text=WELCOME_MESSAGE)]))
session_messages[body.session_id] = messages
end_data = json.dumps({"type": "response.end", "turn_id": body.turn_id})
yield f"data: {end_data}\n\n"
return StreamingResponse(welcome_stream(), media_type="text/event-stream")
text_stream = stream_google_gemini(messages, SYSTEM_PROMPT)
async def streaming_and_save():
# Optionally send a data message (like in Next.js)
data = json.dumps({"textToBeShown": "Hello, how can I help you today?"})
yield f"data: {data}\n\n"
full_response = ""
async for chunk in text_stream:
full_response += chunk
data = json.dumps(
{"type": "response.tts", "content": chunk, "turn_id": body.turn_id}
)
yield f"data: {data}\n\n"
end_data = json.dumps({"type": "response.end", "turn_id": body.turn_id})
yield f"data: {end_data}\n\n"
# Save assistant's response to session
messages.append(Message(role="assistant", content=[MessageContent(type="text", text=full_response)]))
session_messages[body.session_id] = messages
return StreamingResponse(streaming_and_save(), media_type="text/event-stream")
@app.post("/api/authorize")
async def authorize_endpoint(request: Request):
api_key = os.getenv("LAYERCODE_API_KEY")
if not api_key:
return JSONResponse({"error": "LAYERCODE_API_KEY is not set."}, status_code=500)
try:
body = await request.json()
except Exception:
return JSONResponse({"error": "Invalid JSON body."}, status_code=400)
if not body or not body.get("agent_id"):
return JSONResponse({"error": "Missing agent_id in request body."}, status_code=400)
endpoint = "https://api.layercode.com/v1/agents/web/authorize_session"
try:
async with httpx.AsyncClient() as client:
response = await client.post(
endpoint,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
},
json=body,
)
if response.status_code != 200:
return JSONResponse({"error": response.text}, status_code=500)
return JSONResponse(response.json())
except Exception as error:
print("Layercode authorize session response error:", str(error))
return JSONResponse({"error": str(error)}, status_code=500)