diff --git a/backend/alembic/versions/add_workflow_crm_tables.py b/backend/alembic/versions/add_workflow_crm_tables.py new file mode 100644 index 00000000..9548a33a --- /dev/null +++ b/backend/alembic/versions/add_workflow_crm_tables.py @@ -0,0 +1,107 @@ +"""Add workflow and CRM tables. + +Revision ID: add_workflow_crm +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import UUID, JSON + +revision = "add_workflow_crm" +down_revision = None # standalone migration, safe to run in any order +branch_labels = None +depends_on = None + + +def upgrade(): + conn = op.get_bind() + + # ─── Workflows ───────────────────────────────────── + if not conn.dialect.has_table(conn, "workflows"): + op.create_table( + "workflows", + sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")), + sa.Column("title", sa.String(500), nullable=False), + sa.Column("user_instruction", sa.Text, nullable=False), + sa.Column("status", sa.String(20), nullable=False, server_default="planning"), + sa.Column("created_by", UUID(as_uuid=True), sa.ForeignKey("users.id"), nullable=False), + sa.Column("tenant_id", UUID(as_uuid=True), sa.ForeignKey("tenants.id"), nullable=False), + sa.Column("summary", sa.Text), + sa.Column("next_steps", sa.Text), + sa.Column("plan_data", JSON), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()), + sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()), + sa.Column("completed_at", sa.DateTime(timezone=True)), + ) + + if not conn.dialect.has_table(conn, "workflow_steps"): + op.create_table( + "workflow_steps", + sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")), + sa.Column("workflow_id", UUID(as_uuid=True), sa.ForeignKey("workflows.id", ondelete="CASCADE"), nullable=False), + sa.Column("agent_id", UUID(as_uuid=True), sa.ForeignKey("agents.id")), + sa.Column("task_id", UUID(as_uuid=True), sa.ForeignKey("tasks.id")), + sa.Column("step_order", sa.Integer, nullable=False), + sa.Column("title", sa.String(500), nullable=False), + sa.Column("instruction", sa.Text), + sa.Column("agent_name", sa.String(100)), + sa.Column("status", sa.String(20), nullable=False, server_default="pending"), + sa.Column("deliverable_type", sa.String(50), server_default="markdown"), + sa.Column("deliverable_data", JSON), + sa.Column("raw_output", sa.Text), + sa.Column("started_at", sa.DateTime(timezone=True)), + sa.Column("completed_at", sa.DateTime(timezone=True)), + ) + + # ─── CRM ─────────────────────────────────────────── + if not conn.dialect.has_table(conn, "crm_contacts"): + op.create_table( + "crm_contacts", + sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")), + sa.Column("tenant_id", UUID(as_uuid=True), sa.ForeignKey("tenants.id"), nullable=False, index=True), + sa.Column("name", sa.String(200), nullable=False), + sa.Column("company", sa.String(300)), + sa.Column("email", sa.String(300)), + sa.Column("phone", sa.String(100)), + sa.Column("country", sa.String(100)), + sa.Column("industry", sa.String(200)), + sa.Column("source", sa.String(100)), + sa.Column("tags", JSON, server_default=sa.text("'[]'")), + sa.Column("chatwoot_contact_id", sa.Integer), + sa.Column("notes", sa.Text), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()), + sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()), + ) + + if not conn.dialect.has_table(conn, "crm_deals"): + op.create_table( + "crm_deals", + sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")), + sa.Column("tenant_id", UUID(as_uuid=True), sa.ForeignKey("tenants.id"), nullable=False, index=True), + sa.Column("contact_id", UUID(as_uuid=True), sa.ForeignKey("crm_contacts.id", ondelete="CASCADE"), nullable=False), + sa.Column("title", sa.String(300), nullable=False), + sa.Column("stage", sa.String(50), nullable=False, server_default="lead"), + sa.Column("value", sa.Numeric(12, 2)), + sa.Column("currency", sa.String(10), server_default="USD"), + sa.Column("notes", sa.Text), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()), + sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()), + ) + + if not conn.dialect.has_table(conn, "crm_activities"): + op.create_table( + "crm_activities", + sa.Column("id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")), + sa.Column("tenant_id", UUID(as_uuid=True), sa.ForeignKey("tenants.id"), nullable=False, index=True), + sa.Column("contact_id", UUID(as_uuid=True), sa.ForeignKey("crm_contacts.id", ondelete="CASCADE"), nullable=False), + sa.Column("type", sa.String(50), nullable=False), + sa.Column("summary", sa.Text, nullable=False), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()), + ) + + +def downgrade(): + op.drop_table("crm_activities") + op.drop_table("crm_deals") + op.drop_table("crm_contacts") + op.drop_table("workflow_steps") + op.drop_table("workflows") diff --git a/backend/app/api/crm.py b/backend/app/api/crm.py new file mode 100644 index 00000000..0425b3c4 --- /dev/null +++ b/backend/app/api/crm.py @@ -0,0 +1,322 @@ +"""CRM REST API — contacts, deals, activities with batch operations.""" + +import uuid + +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel +from sqlalchemy import select, or_, delete as sql_delete, func +from sqlalchemy.orm import selectinload + +from app.database import async_session +from app.api.auth import get_current_user +from app.models.user import User +from app.models.crm import CRMContact, CRMDeal, CRMActivity + +router = APIRouter(prefix="/crm", tags=["crm"]) + +VALID_STAGES = ("lead", "contacted", "qualified", "proposal", "negotiation", "won", "lost") + + +class ContactCreate(BaseModel): + name: str + company: str | None = None + email: str | None = None + phone: str | None = None + country: str | None = None + industry: str | None = None + source: str | None = None + tags: list[str] | None = None + notes: str | None = None + +class ContactUpdate(BaseModel): + name: str | None = None + company: str | None = None + email: str | None = None + phone: str | None = None + country: str | None = None + industry: str | None = None + source: str | None = None + tags: list[str] | None = None + notes: str | None = None + +class DealCreate(BaseModel): + contact_id: uuid.UUID + title: str + stage: str = "lead" + value: float | None = None + currency: str = "USD" + notes: str | None = None + +class DealUpdate(BaseModel): + title: str | None = None + stage: str | None = None + value: float | None = None + currency: str | None = None + notes: str | None = None + +class ActivityCreate(BaseModel): + contact_id: uuid.UUID + type: str + summary: str + +class BatchStageUpdate(BaseModel): + deal_ids: list[uuid.UUID] + stage: str + +class BatchIds(BaseModel): + ids: list[uuid.UUID] + + +def _contact_dict(c: CRMContact) -> dict: + d = { + "id": str(c.id), "name": c.name, "company": c.company, + "email": c.email, "phone": c.phone, "country": c.country, + "industry": c.industry, "source": c.source, "tags": c.tags or [], + "notes": c.notes, + "created_at": c.created_at.isoformat() if c.created_at else None, + } + if hasattr(c, "deals") and c.deals is not None: + d["deals"] = [{"id": str(dl.id), "title": dl.title, "stage": dl.stage, + "value": float(dl.value) if dl.value else None, "currency": dl.currency} + for dl in c.deals] + else: + d["deals"] = [] + return d + +def _deal_dict(d: CRMDeal) -> dict: + return { + "id": str(d.id), "contact_id": str(d.contact_id), "title": d.title, + "stage": d.stage, "value": float(d.value) if d.value else None, + "currency": d.currency, "notes": d.notes, + "created_at": d.created_at.isoformat() if d.created_at else None, + "contact_name": d.contact.name if d.contact else None, + "contact_company": d.contact.company if d.contact else None, + "contact_email": d.contact.email if d.contact else None, + } + + +# ── Contacts ── + +@router.get("/contacts") +async def list_contacts(search: str = "", country: str = "", source: str = "", + page: int = 1, size: int = 100, + current_user: User = Depends(get_current_user)): + async with async_session() as db: + q = select(CRMContact).where(CRMContact.tenant_id == current_user.tenant_id + ).options(selectinload(CRMContact.deals)) + if search: + q = q.where(or_(CRMContact.name.ilike(f"%{search}%"), + CRMContact.company.ilike(f"%{search}%"), + CRMContact.email.ilike(f"%{search}%"))) + if country: + q = q.where(CRMContact.country.ilike(f"%{country}%")) + if source: + q = q.where(CRMContact.source == source) + total = (await db.execute(select(func.count()).select_from(q.subquery()))).scalar() or 0 + q = q.order_by(CRMContact.created_at.desc()).offset((page - 1) * size).limit(size) + contacts = (await db.execute(q)).scalars().all() + return {"items": [_contact_dict(c) for c in contacts], "total": total, "page": page} + + +@router.get("/contacts/{contact_id}") +async def get_contact(contact_id: uuid.UUID, current_user: User = Depends(get_current_user)): + async with async_session() as db: + c = (await db.execute( + select(CRMContact).where(CRMContact.id == contact_id, CRMContact.tenant_id == current_user.tenant_id + ).options(selectinload(CRMContact.deals), selectinload(CRMContact.activities)) + )).scalar_one_or_none() + if not c: + raise HTTPException(404, "Contact not found") + d = _contact_dict(c) + d["activities"] = [{"id": str(a.id), "type": a.type, "summary": a.summary, + "created_at": a.created_at.isoformat() if a.created_at else None} + for a in sorted(c.activities, key=lambda x: x.created_at, reverse=True)[:50]] + return d + + +@router.post("/contacts", status_code=201) +async def create_contact(body: ContactCreate, current_user: User = Depends(get_current_user)): + async with async_session() as db: + c = CRMContact(tenant_id=current_user.tenant_id, **body.model_dump(exclude_none=True)) + db.add(c) + await db.commit() + await db.refresh(c) + return {"id": str(c.id), "name": c.name, "company": c.company, "email": c.email, + "phone": c.phone, "country": c.country, "industry": c.industry, + "source": c.source, "tags": c.tags or [], "deals": [], + "created_at": c.created_at.isoformat() if c.created_at else None} + + +@router.patch("/contacts/{contact_id}") +async def update_contact(contact_id: uuid.UUID, body: ContactUpdate, current_user: User = Depends(get_current_user)): + async with async_session() as db: + c = (await db.execute(select(CRMContact).where( + CRMContact.id == contact_id, CRMContact.tenant_id == current_user.tenant_id))).scalar_one_or_none() + if not c: + raise HTTPException(404, "Contact not found") + for k, v in body.model_dump(exclude_none=True).items(): + setattr(c, k, v) + await db.commit() + await db.refresh(c) + return {"id": str(c.id), "name": c.name, "company": c.company, "email": c.email, + "phone": c.phone, "country": c.country, "industry": c.industry, + "source": c.source, "tags": c.tags or [], "deals": [], + "created_at": c.created_at.isoformat() if c.created_at else None} + + +@router.delete("/contacts/{contact_id}") +async def delete_contact(contact_id: uuid.UUID, current_user: User = Depends(get_current_user)): + async with async_session() as db: + c = (await db.execute(select(CRMContact).where( + CRMContact.id == contact_id, CRMContact.tenant_id == current_user.tenant_id))).scalar_one_or_none() + if not c: + raise HTTPException(404, "Contact not found") + await db.execute(sql_delete(CRMActivity).where(CRMActivity.contact_id == contact_id)) + await db.execute(sql_delete(CRMDeal).where(CRMDeal.contact_id == contact_id)) + await db.execute(sql_delete(CRMContact).where(CRMContact.id == contact_id)) + await db.commit() + return {"message": "deleted"} + + +@router.post("/contacts/batch-delete") +async def batch_delete_contacts(body: BatchIds, current_user: User = Depends(get_current_user)): + async with async_session() as db: + for cid in body.ids: + await db.execute(sql_delete(CRMActivity).where(CRMActivity.contact_id == cid, CRMActivity.tenant_id == current_user.tenant_id)) + await db.execute(sql_delete(CRMDeal).where(CRMDeal.contact_id == cid, CRMDeal.tenant_id == current_user.tenant_id)) + await db.execute(sql_delete(CRMContact).where(CRMContact.id == cid, CRMContact.tenant_id == current_user.tenant_id)) + await db.commit() + return {"deleted": len(body.ids)} + + +# ── Deals ── + +@router.get("/deals") +async def list_deals(stage: str = "", current_user: User = Depends(get_current_user)): + async with async_session() as db: + q = select(CRMDeal).where(CRMDeal.tenant_id == current_user.tenant_id).options(selectinload(CRMDeal.contact)) + if stage: + q = q.where(CRMDeal.stage == stage) + deals = (await db.execute(q.order_by(CRMDeal.created_at.desc()).limit(500))).scalars().all() + return [_deal_dict(d) for d in deals] + + +@router.post("/deals", status_code=201) +async def create_deal(body: DealCreate, current_user: User = Depends(get_current_user)): + if body.stage not in VALID_STAGES: + raise HTTPException(400, f"Invalid stage: {body.stage}") + async with async_session() as db: + contact = (await db.execute(select(CRMContact).where( + CRMContact.id == body.contact_id, CRMContact.tenant_id == current_user.tenant_id))).scalar_one_or_none() + if not contact: + raise HTTPException(404, "Contact not found") + d = CRMDeal(tenant_id=current_user.tenant_id, **body.model_dump()) + db.add(d) + db.add(CRMActivity(tenant_id=current_user.tenant_id, contact_id=body.contact_id, + type="deal_update", summary=f"Created deal: {body.title}")) + await db.commit() + await db.refresh(d) + return {"id": str(d.id), "title": d.title, "stage": d.stage, + "value": float(d.value) if d.value else None, + "contact_name": contact.name, "contact_company": contact.company, "contact_email": contact.email} + + +@router.patch("/deals/{deal_id}") +async def update_deal(deal_id: uuid.UUID, body: DealUpdate, current_user: User = Depends(get_current_user)): + if body.stage and body.stage not in VALID_STAGES: + raise HTTPException(400, f"Invalid stage: {body.stage}") + async with async_session() as db: + d = (await db.execute(select(CRMDeal).where( + CRMDeal.id == deal_id, CRMDeal.tenant_id == current_user.tenant_id))).scalar_one_or_none() + if not d: + raise HTTPException(404, "Deal not found") + old_stage = d.stage + for k, v in body.model_dump(exclude_none=True).items(): + setattr(d, k, v) + if body.stage and body.stage != old_stage: + db.add(CRMActivity(tenant_id=current_user.tenant_id, contact_id=d.contact_id, + type="deal_update", summary=f"Stage: {old_stage} -> {body.stage}")) + await db.commit() + return {"id": str(d.id), "title": d.title, "stage": d.stage} + + +@router.delete("/deals/{deal_id}") +async def delete_deal(deal_id: uuid.UUID, current_user: User = Depends(get_current_user)): + async with async_session() as db: + d = (await db.execute(select(CRMDeal).where( + CRMDeal.id == deal_id, CRMDeal.tenant_id == current_user.tenant_id))).scalar_one_or_none() + if not d: + raise HTTPException(404, "Deal not found") + await db.execute(sql_delete(CRMDeal).where(CRMDeal.id == deal_id)) + await db.commit() + return {"message": "deleted"} + + +@router.post("/deals/batch-stage") +async def batch_update_stage(body: BatchStageUpdate, current_user: User = Depends(get_current_user)): + if body.stage not in VALID_STAGES: + raise HTTPException(400, f"Invalid stage") + async with async_session() as db: + n = 0 + for did in body.deal_ids: + d = (await db.execute(select(CRMDeal).where( + CRMDeal.id == did, CRMDeal.tenant_id == current_user.tenant_id))).scalar_one_or_none() + if d and d.stage != body.stage: + old = d.stage + d.stage = body.stage + db.add(CRMActivity(tenant_id=current_user.tenant_id, contact_id=d.contact_id, + type="deal_update", summary=f"Batch: {old} -> {body.stage}")) + n += 1 + await db.commit() + return {"updated": n} + + +@router.post("/deals/batch-delete") +async def batch_delete_deals(body: BatchIds, current_user: User = Depends(get_current_user)): + async with async_session() as db: + for did in body.ids: + await db.execute(sql_delete(CRMDeal).where(CRMDeal.id == did, CRMDeal.tenant_id == current_user.tenant_id)) + await db.commit() + return {"deleted": len(body.ids)} + + +# ── Activities ── + +@router.get("/activities/{contact_id}") +async def list_activities(contact_id: uuid.UUID, current_user: User = Depends(get_current_user)): + async with async_session() as db: + acts = (await db.execute( + select(CRMActivity).where(CRMActivity.contact_id == contact_id, CRMActivity.tenant_id == current_user.tenant_id + ).order_by(CRMActivity.created_at.desc()).limit(100))).scalars().all() + return [{"id": str(a.id), "type": a.type, "summary": a.summary, + "created_at": a.created_at.isoformat() if a.created_at else None} for a in acts] + + +@router.post("/activities", status_code=201) +async def create_activity(body: ActivityCreate, current_user: User = Depends(get_current_user)): + async with async_session() as db: + cr = (await db.execute(select(CRMContact).where( + CRMContact.id == body.contact_id, CRMContact.tenant_id == current_user.tenant_id))).scalar_one_or_none() + if not cr: + raise HTTPException(404, "Contact not found") + a = CRMActivity(tenant_id=current_user.tenant_id, **body.model_dump()) + db.add(a) + await db.commit() + await db.refresh(a) + return {"id": str(a.id), "type": a.type, "summary": a.summary} + + +# ── Stats ── + +@router.get("/stats") +async def crm_stats(current_user: User = Depends(get_current_user)): + async with async_session() as db: + tid = current_user.tenant_id + contacts = (await db.execute(select(func.count()).select_from(CRMContact).where(CRMContact.tenant_id == tid))).scalar() or 0 + deals_total = (await db.execute(select(func.count()).select_from(CRMDeal).where(CRMDeal.tenant_id == tid))).scalar() or 0 + pipeline = {} + for s in VALID_STAGES: + row = (await db.execute(select(func.count(), func.coalesce(func.sum(CRMDeal.value), 0)).where( + CRMDeal.tenant_id == tid, CRMDeal.stage == s))).one() + pipeline[s] = {"count": row[0], "value": float(row[1])} + return {"contacts": contacts, "deals": deals_total, "pipeline": pipeline} diff --git a/backend/app/api/workflows.py b/backend/app/api/workflows.py new file mode 100644 index 00000000..416a8c06 --- /dev/null +++ b/backend/app/api/workflows.py @@ -0,0 +1,594 @@ +"""Workflow API routes.""" + +import csv +import io +import json +import re +import uuid + +from fastapi import APIRouter, Depends, HTTPException +from fastapi.responses import StreamingResponse +from pydantic import BaseModel, Field +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database import get_db +from app.api.auth import get_current_user +from app.models.user import User + +router = APIRouter(prefix="/workflows", tags=["workflows"]) + + +def _can_access_workflow(wf: "Workflow | None", user: User) -> bool: + """Check user can access workflow: owner + same tenant, or platform_admin.""" + if not wf: + return False + if user.role == "platform_admin": + return True + return wf.created_by == user.id and wf.tenant_id == user.tenant_id + + +class WorkflowChat(BaseModel): + message: str = Field(min_length=1, max_length=5000) + + +class WorkflowCreate(BaseModel): + instruction: str = Field(min_length=2, max_length=5000) + + + + +class WorkflowUpdate(BaseModel): + title: str | None = None + +class WorkflowStepOut(BaseModel): + id: uuid.UUID + step_order: int + title: str + agent_name: str | None = None + status: str + deliverable_type: str + deliverable_data: dict | None = None + raw_output: str | None = None + started_at: str | None = None + completed_at: str | None = None + + model_config = {"from_attributes": True} + + +class WorkflowOut(BaseModel): + id: uuid.UUID + title: str + user_instruction: str + status: str + summary: str | None = None + next_steps: str | None = None + steps: list[WorkflowStepOut] = [] + created_at: str + completed_at: str | None = None + + model_config = {"from_attributes": True} + + +class WorkflowListItem(BaseModel): + id: uuid.UUID + title: str + status: str + created_at: str + completed_at: str | None = None + + model_config = {"from_attributes": True} + + +@router.post("/", status_code=201) +async def create_workflow( + body: WorkflowCreate, + current_user: User = Depends(get_current_user), +): + from app.services.workflow_orchestrator import create_and_run_workflow + wf_id = await create_and_run_workflow( + instruction=body.instruction, + user_id=current_user.id, + tenant_id=current_user.tenant_id, + ) + return {"id": str(wf_id), "message": "Workflow started"} + + +@router.get("/") +async def list_workflows( + page: int = 1, + size: int = 20, + tenant_id: str | None = None, + current_user: User = Depends(get_current_user), +): + from app.services.workflow_orchestrator import list_workflows as _list + effective_tenant = uuid.UUID(tenant_id) if tenant_id and current_user.role == "platform_admin" else current_user.tenant_id + workflows, total = await _list(effective_tenant, current_user.id, page, size) + return { + "items": [ + { + "id": str(w.id), "title": w.title, "status": w.status, + "created_at": w.created_at.isoformat() if w.created_at else None, + "completed_at": w.completed_at.isoformat() if w.completed_at else None, + } + for w in workflows + ], + "total": total, "page": page, "page_size": size, + } + + +@router.get("/{workflow_id}") +async def get_workflow( + workflow_id: uuid.UUID, + current_user: User = Depends(get_current_user), +): + from app.services.workflow_orchestrator import get_workflow_detail + wf = await get_workflow_detail(workflow_id) + if not _can_access_workflow(wf, current_user): + raise HTTPException(404, "Workflow not found") + + steps = sorted(wf.steps, key=lambda s: s.step_order) + return { + "id": str(wf.id), "title": wf.title, "user_instruction": wf.user_instruction, + "status": wf.status, "summary": wf.summary, "next_steps": wf.next_steps, + "created_at": wf.created_at.isoformat() if wf.created_at else None, + "completed_at": wf.completed_at.isoformat() if wf.completed_at else None, + "steps": [ + { + "id": str(s.id), "step_order": s.step_order, "title": s.title, + "agent_name": s.agent_name, "status": s.status, + "deliverable_type": s.deliverable_type, + "deliverable_data": s.deliverable_data, + "raw_output": s.raw_output, + "started_at": s.started_at.isoformat() if s.started_at else None, + "completed_at": s.completed_at.isoformat() if s.completed_at else None, + } + for s in steps + ], + } + + +@router.post("/{workflow_id}/steps/{step_id}/export") +async def export_step_csv( + workflow_id: uuid.UUID, + step_id: uuid.UUID, + current_user: User = Depends(get_current_user), +): + from app.services.workflow_orchestrator import get_workflow_detail + wf = await get_workflow_detail(workflow_id) + if not _can_access_workflow(wf, current_user): + raise HTTPException(404, "Workflow not found") + + step = next((s for s in wf.steps if s.id == step_id), None) + if not step or not step.raw_output: + raise HTTPException(404, "Step not found or no output") + + # Parse markdown table to CSV + lines = [l.strip() for l in step.raw_output.split("\n") if l.strip().startswith("|")] + if len(lines) < 2: + raise HTTPException(400, "No table found in output") + + rows = [] + for line in lines: + cells = [c.strip() for c in line.strip("|").split("|")] + if all(re.match(r"^[-:]+$", c) for c in cells): + continue + rows.append(cells) + + output = io.StringIO() + writer = csv.writer(output) + for row in rows: + writer.writerow(row) + output.seek(0) + + return StreamingResponse( + iter([output.getvalue()]), + media_type="text/csv", + headers={"Content-Disposition": f"attachment; filename=step-{step_id}.csv"}, + ) + + + + +@router.put("/{workflow_id}") +async def update_workflow( + workflow_id: uuid.UUID, + body: WorkflowUpdate, + current_user: User = Depends(get_current_user), +): + from app.database import async_session + from app.models.workflow import Workflow + from sqlalchemy import select + + async with async_session() as db: + result = await db.execute(select(Workflow).where(Workflow.id == workflow_id)) + wf = result.scalar_one_or_none() + if not _can_access_workflow(wf, current_user): + raise HTTPException(404, "Workflow not found") + if body.title is not None: + wf.title = body.title + await db.commit() + return {"message": "Workflow updated"} + + +@router.post("/{workflow_id}/retry") +async def retry_workflow( + workflow_id: uuid.UUID, + current_user: User = Depends(get_current_user), +): + """Retry all failed steps in a workflow.""" + import asyncio + from app.database import async_session + from app.models.workflow import Workflow, WorkflowStep + from app.services.workflow_orchestrator import _execute_step, _summarize_workflow + from sqlalchemy import select + from sqlalchemy.orm import selectinload + + async with async_session() as db: + result = await db.execute( + select(Workflow).where(Workflow.id == workflow_id).options(selectinload(Workflow.steps)) + ) + wf = result.scalar_one_or_none() + if not _can_access_workflow(wf, current_user): + raise HTTPException(404, "Workflow not found") + + failed_steps = [s for s in wf.steps if s.status == "failed"] + if not failed_steps: + return {"message": "No failed steps to retry", "retried": 0} + + # Reset failed steps + for s in failed_steps: + s.status = "pending" + s.raw_output = None + s.deliverable_data = None + wf.status = "running" + await db.commit() + + async def _retry_bg(): + from datetime import datetime, timezone + async with async_session() as db: + result = await db.execute( + select(Workflow).where(Workflow.id == workflow_id).options(selectinload(Workflow.steps)) + ) + wf = result.scalar_one() + steps = sorted(wf.steps, key=lambda s: s.step_order) + + prev_outputs = [] + for step in steps: + if step.status == "done" and step.raw_output: + prev_outputs.append(f"[{step.title}]\n{step.raw_output[:2000]}") + continue + if step.status != "pending": + continue + + async with async_session() as db: + result = await db.execute(select(WorkflowStep).where(WorkflowStep.id == step.id)) + s = result.scalar_one() + s.status = "running" + s.started_at = datetime.now(timezone.utc) + await db.commit() + + try: + output = await _execute_step(step, prev_outputs, wf.user_instruction) + prev_outputs.append(f"[{step.title}]\n{output[:2000]}") + async with async_session() as db: + result = await db.execute(select(WorkflowStep).where(WorkflowStep.id == step.id)) + s = result.scalar_one() + s.status = "done" + s.raw_output = output + s.deliverable_data = {"content": output} + s.completed_at = datetime.now(timezone.utc) + await db.commit() + await asyncio.sleep(3) + except Exception as e: + import logging + logging.getLogger(__name__).error(f"Retry step {step.id} failed: {e}") + async with async_session() as db: + result = await db.execute(select(WorkflowStep).where(WorkflowStep.id == step.id)) + s = result.scalar_one() + s.status = "failed" + s.raw_output = f"Error: {str(e)[:500]}" + s.completed_at = datetime.now(timezone.utc) + await db.commit() + + await _summarize_workflow(workflow_id) + + asyncio.create_task(_retry_bg(), name=f"workflow-retry-{workflow_id}") + return {"message": f"Retrying {len(failed_steps)} failed steps", "retried": len(failed_steps)} + + +@router.delete("/{workflow_id}") +async def delete_workflow( + workflow_id: uuid.UUID, + current_user: User = Depends(get_current_user), +): + from app.database import async_session + from app.models.workflow import Workflow, WorkflowStep + from sqlalchemy import select, delete as sql_delete + + async with async_session() as db: + result = await db.execute(select(Workflow).where(Workflow.id == workflow_id)) + wf = result.scalar_one_or_none() + if not _can_access_workflow(wf, current_user): + raise HTTPException(404, "Workflow not found") + await db.execute(sql_delete(WorkflowStep).where(WorkflowStep.workflow_id == workflow_id)) + await db.execute(sql_delete(Workflow).where(Workflow.id == workflow_id)) + await db.commit() + + return {"message": "Workflow deleted"} + + + +@router.post("/{workflow_id}/steps/{step_id}/import-to-crm") +async def import_step_to_crm( + workflow_id: uuid.UUID, + step_id: uuid.UUID, + current_user: User = Depends(get_current_user), +): + """Parse a workflow step's table output and import contacts into CRM.""" + import re as _re + from app.services.workflow_orchestrator import get_workflow_detail + from app.models.crm import CRMContact + + wf = await get_workflow_detail(workflow_id) + if not _can_access_workflow(wf, current_user): + raise HTTPException(404, "Workflow not found") + + step = next((s for s in wf.steps if s.id == step_id), None) + if not step or not step.raw_output: + raise HTTPException(404, "Step not found or no output") + + raw = step.raw_output + + # Parse markdown table: find rows with enough columns that contain real company data + table_lines = [l.strip() for l in raw.split("\n") if l.strip().startswith("|")] + + contacts = [] + for line in table_lines: + cells = [c.strip().replace("**", "").strip() for c in line.strip("|").split("|")] + + # Skip separator rows + if all(_re.match(r"^[-:]+$", c) for c in cells): + continue + # Skip header rows + if any(kw in line.lower() for kw in ["公司名称", "company", "国家", "country", "优先级", "priority"]): + continue + # Skip sub-category headers (e.g. | **太阳能EPC** |) + non_empty = [c for c in cells if c and c not in ("-", "")] + if len(non_empty) <= 2: + continue + # Must have at least 5 columns to be a real data row + if len(cells) < 5: + continue + # First cell might be a number, skip it to get company name + idx = 0 + if cells[0].isdigit() or cells[0] in ("", "#"): + idx = 1 + if idx >= len(cells): + continue + + company = cells[idx].strip() + # Skip if company name looks like a category or number + if not company or len(company) < 3 or company.isdigit(): + continue + if any(kw in company for kw in ["太阳能", "电信运营商", "矿业公司", "微型电网", "进口商", "分销商", "排名", "公司"]): + continue + + # Extract fields by position after company + country = cells[idx+1].strip() if idx+1 < len(cells) else "" + contact_person = cells[idx+2].strip() if idx+2 < len(cells) else "" + title = cells[idx+3].strip() if idx+3 < len(cells) else "" + email = cells[idx+4].strip() if idx+4 < len(cells) else "" + phone = cells[idx+5].strip() if idx+5 < len(cells) else "" + + # Clean empty/placeholder values + for val_name in ["country", "contact_person", "title", "email", "phone"]: + v = locals()[val_name] + if v in ("-", "", "?", "N/A", "Management", "Sales", "Procurement", "Operations", "BD Team", "Sales Team", "Energy Team", "Sustainability"): + locals()[val_name] = "" + + # Re-read cleaned values + clean_email = email if email not in ("-", "", "?", "N/A") else "" + clean_phone = phone if phone not in ("-", "", "?", "N/A") else "" + clean_country = country if country not in ("-", "", "?") else "" + clean_contact = contact_person if contact_person not in ("-", "", "?", "Management", "Sales", "Procurement", "Operations", "BD Team", "Sales Team", "Energy Team", "Sustainability", "CEO Office") else "" + + contacts.append({ + "company": company, + "name": clean_contact or company, + "country": clean_country, + "email": clean_email if "@" in (clean_email or "") else None, + "phone": clean_phone if clean_phone and len(clean_phone) > 5 else None, + "title": title if title not in ("-", "", "?") else None, + }) + + if not contacts: + raise HTTPException(400, "No valid contact data found in step output") + + from app.database import async_session + from sqlalchemy import select + imported = 0 + skipped = 0 + + async with async_session() as db: + for c in contacts: + # Check duplicate by company name + existing = await db.execute( + select(CRMContact).where( + CRMContact.tenant_id == current_user.tenant_id, + CRMContact.company == c["company"], + ) + ) + if existing.scalar_one_or_none(): + skipped += 1 + continue + + contact = CRMContact( + tenant_id=current_user.tenant_id, + name=c["name"], + company=c["company"], + email=c["email"], + phone=c["phone"], + country=c["country"], + industry=step.title, + source="workflow", + tags=["auto-import", wf.title[:50]], + ) + db.add(contact) + imported += 1 + + if imported > 0: + await db.commit() + + return {"imported": imported, "skipped": skipped, "total": len(contacts)} + + + +@router.post("/{workflow_id}/steps/{step_id}/export-pdf") +async def export_step_pdf( + workflow_id: uuid.UUID, + step_id: uuid.UUID, + current_user: User = Depends(get_current_user), +): + """Export a workflow step's markdown output as a styled PDF.""" + from app.services.workflow_orchestrator import get_workflow_detail + import subprocess, tempfile, os + + wf = await get_workflow_detail(workflow_id) + if not _can_access_workflow(wf, current_user): + raise HTTPException(404, "Workflow not found") + + step = next((s for s in wf.steps if s.id == step_id), None) + if not step or not step.raw_output: + raise HTTPException(404, "Step not found or no output") + + md_content = step.raw_output + title = step.title or "Report" + + # Build HTML from markdown + try: + import markdown + html_body = markdown.markdown( + md_content, + extensions=["tables", "fenced_code", "nl2br"], + ) + except ImportError: + # Fallback: basic conversion + html_body = md_content.replace("\n", "
") + + html = f""" + + +
+
PulseAgent Workflow Report
+
{title}
+
+

{title}

+{html_body} +
+ Generated by PulseAgent AI Workflow Engine +
+""" + + # Try weasyprint first, then wkhtmltopdf, then return HTML + pdf_bytes = None + + # Method 1: weasyprint + try: + from weasyprint import HTML as WeasyHTML + pdf_bytes = WeasyHTML(string=html).write_pdf() + except ImportError: + pass + + # Method 2: wkhtmltopdf + if not pdf_bytes: + try: + with tempfile.NamedTemporaryFile(suffix=".html", delete=False, mode="w") as f: + f.write(html) + html_path = f.name + pdf_path = html_path.replace(".html", ".pdf") + result = subprocess.run( + ["wkhtmltopdf", "--quiet", "--encoding", "utf-8", "--page-size", "A4", html_path, pdf_path], + capture_output=True, timeout=30, + ) + if result.returncode == 0 and os.path.exists(pdf_path): + with open(pdf_path, "rb") as f: + pdf_bytes = f.read() + os.unlink(html_path) + if os.path.exists(pdf_path): + os.unlink(pdf_path) + except (FileNotFoundError, subprocess.TimeoutExpired): + pass + + # Method 3: Return HTML as downloadable file + if not pdf_bytes: + return StreamingResponse( + iter([html.encode("utf-8")]), + media_type="text/html", + headers={"Content-Disposition": f"attachment; filename={title}.html"}, + ) + + return StreamingResponse( + iter([pdf_bytes]), + media_type="application/pdf", + headers={"Content-Disposition": f"attachment; filename={title}.pdf"}, + ) + + +@router.post("/{workflow_id}/chat") +async def workflow_chat( + workflow_id: uuid.UUID, + body: WorkflowChat, + current_user: User = Depends(get_current_user), +): + """Chat with AI in the context of a workflow — can dispatch agent actions.""" + from app.services.workflow_orchestrator import get_workflow_detail, call_llm_simple + + wf = await get_workflow_detail(workflow_id) + if not _can_access_workflow(wf, current_user): + raise HTTPException(404, "Workflow not found") + + # Build context from workflow state + steps_ctx = "" + for s in sorted(wf.steps, key=lambda x: x.step_order): + status_icon = "✅" if s.status == "done" else "❌" if s.status == "failed" else "🔄" if s.status == "running" else "⏳" + output_preview = (s.raw_output or "")[:600] + steps_ctx += f"\n### {status_icon} 步骤{s.step_order + 1}: {s.title} ({s.agent_name})\n状态: {s.status}\n产出摘要: {output_preview}\n" + + system = f"""你是 PulseAgent 工作流助手。用户正在查看一个外贸工作流的执行结果,你需要基于工作流上下文回答问题、提供建议、或帮助用户采取下一步行动。 + +## 工作流信息 +- 标题: {wf.title} +- 用户指令: {wf.user_instruction} +- 状态: {wf.status} +{f'- 汇总: {wf.summary}' if wf.summary else ''} +{f'- 建议下一步: {wf.next_steps}' if wf.next_steps else ''} + +## 各步骤执行结果 +{steps_ctx} + +## 你的职责 +1. 回答用户关于工作流结果的任何问题 +2. 基于产出数据提供深入分析和建议 +3. 如果用户要求采取行动(发邮件、发社媒、重试失败步骤等),告知具体操作方式 +4. 用中文回答,简洁专业""" + + try: + reply = await call_llm_simple(system, body.message) + return {"reply": reply} + except Exception as e: + raise HTTPException(500, f"AI 回复失败: {str(e)[:200]}") diff --git a/backend/app/main.py b/backend/app/main.py index d76c2be6..358e69fa 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -277,6 +277,8 @@ def _bg_task_error(t): from app.api.gateway import router as gateway_router from app.api.admin import router as admin_router from app.api.pages import router as pages_router, public_router as pages_public_router +from app.api.workflows import router as workflows_router +from app.api.crm import router as crm_router app.include_router(auth_router, prefix=settings.API_PREFIX) app.include_router(agents_router, prefix=settings.API_PREFIX) @@ -315,6 +317,8 @@ def _bg_task_error(t): app.include_router(admin_router, prefix=settings.API_PREFIX) app.include_router(pages_router, prefix=settings.API_PREFIX) app.include_router(pages_public_router) # Public endpoint for /p/{short_id}, no API prefix +app.include_router(workflows_router, prefix=settings.API_PREFIX) +app.include_router(crm_router, prefix=settings.API_PREFIX) @app.get("/api/health", response_model=HealthResponse, tags=["health"]) diff --git a/backend/app/models/crm.py b/backend/app/models/crm.py new file mode 100644 index 00000000..f315f06d --- /dev/null +++ b/backend/app/models/crm.py @@ -0,0 +1,62 @@ +"""CRM models — lightweight CRM for trade contacts and deals.""" + +import uuid +from datetime import datetime + +from sqlalchemy import DateTime, ForeignKey, Integer, Numeric, String, Text, func +from sqlalchemy.dialects.postgresql import ARRAY, JSON, UUID +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from app.database import Base + + +class CRMContact(Base): + __tablename__ = "crm_contacts" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + tenant_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("tenants.id"), nullable=False, index=True) + name: Mapped[str] = mapped_column(String(200), nullable=False) + company: Mapped[str | None] = mapped_column(String(300)) + email: Mapped[str | None] = mapped_column(String(300)) + phone: Mapped[str | None] = mapped_column(String(100)) + country: Mapped[str | None] = mapped_column(String(100)) + industry: Mapped[str | None] = mapped_column(String(200)) + source: Mapped[str | None] = mapped_column(String(100)) # google_maps, apollo, manual, chatwoot + tags: Mapped[list | None] = mapped_column(JSON, default=list) + chatwoot_contact_id: Mapped[int | None] = mapped_column(Integer) + notes: Mapped[str | None] = mapped_column(Text) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) + + deals: Mapped[list["CRMDeal"]] = relationship(back_populates="contact", cascade="all, delete-orphan") + activities: Mapped[list["CRMActivity"]] = relationship(back_populates="contact", cascade="all, delete-orphan") + + +class CRMDeal(Base): + __tablename__ = "crm_deals" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + tenant_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("tenants.id"), nullable=False, index=True) + contact_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("crm_contacts.id"), nullable=False) + title: Mapped[str] = mapped_column(String(300), nullable=False) + stage: Mapped[str] = mapped_column(String(50), default="lead", nullable=False) # lead/contacted/qualified/proposal/negotiation/won/lost + value: Mapped[float | None] = mapped_column(Numeric(12, 2)) + currency: Mapped[str] = mapped_column(String(10), default="USD") + notes: Mapped[str | None] = mapped_column(Text) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) + + contact: Mapped["CRMContact"] = relationship(back_populates="deals") + + +class CRMActivity(Base): + __tablename__ = "crm_activities" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + tenant_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("tenants.id"), nullable=False, index=True) + contact_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("crm_contacts.id"), nullable=False) + type: Mapped[str] = mapped_column(String(50), nullable=False) # email/call/meeting/note/deal_update + summary: Mapped[str] = mapped_column(Text, nullable=False) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + + contact: Mapped["CRMContact"] = relationship(back_populates="activities") diff --git a/backend/app/models/workflow.py b/backend/app/models/workflow.py new file mode 100644 index 00000000..89bd1a8c --- /dev/null +++ b/backend/app/models/workflow.py @@ -0,0 +1,55 @@ +"""Workflow models — multi-agent orchestrated workflows.""" + +import uuid +from datetime import datetime + +from sqlalchemy import DateTime, ForeignKey, Integer, String, Text, func +from sqlalchemy.dialects.postgresql import JSON, UUID +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from app.database import Base + + +class Workflow(Base): + __tablename__ = "workflows" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + title: Mapped[str] = mapped_column(String(500), nullable=False) + user_instruction: Mapped[str] = mapped_column(Text, nullable=False) + status: Mapped[str] = mapped_column(String(20), default="planning", nullable=False) # planning/running/done/failed + created_by: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False) + tenant_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("tenants.id"), nullable=False) + summary: Mapped[str | None] = mapped_column(Text) + next_steps: Mapped[str | None] = mapped_column(Text) + plan_data: Mapped[dict | None] = mapped_column(JSON) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) + completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + + steps: Mapped[list["WorkflowStep"]] = relationship(back_populates="workflow", cascade="all, delete-orphan", order_by="WorkflowStep.step_order") + creator: Mapped["User"] = relationship("User", foreign_keys=[created_by]) + + +class WorkflowStep(Base): + __tablename__ = "workflow_steps" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + workflow_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("workflows.id"), nullable=False) + agent_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), ForeignKey("agents.id")) + task_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), ForeignKey("tasks.id")) + step_order: Mapped[int] = mapped_column(Integer, nullable=False) + title: Mapped[str] = mapped_column(String(500), nullable=False) + instruction: Mapped[str | None] = mapped_column(Text) + agent_name: Mapped[str | None] = mapped_column(String(100)) + status: Mapped[str] = mapped_column(String(20), default="pending", nullable=False) # pending/running/done/failed + deliverable_type: Mapped[str] = mapped_column(String(50), default="markdown") # table/markdown/email_template/pi/bant + deliverable_data: Mapped[dict | None] = mapped_column(JSON) + raw_output: Mapped[str | None] = mapped_column(Text) + started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + + workflow: Mapped["Workflow"] = relationship(back_populates="steps") + + +from app.models.agent import Agent # noqa +from app.models.user import User # noqa diff --git a/backend/app/services/workflow_orchestrator.py b/backend/app/services/workflow_orchestrator.py new file mode 100644 index 00000000..375c3177 --- /dev/null +++ b/backend/app/services/workflow_orchestrator.py @@ -0,0 +1,562 @@ +"""Workflow Orchestrator — plans and executes multi-agent trade workflows. + +Three phases: +1. Planning: LLM generates execution plan from user instruction + agent capabilities +2. Execution: Sequential step execution with full tool-calling loop +3. Summary: LLM summarizes all deliverables and suggests next steps +""" + +import asyncio +import json +import logging +import uuid +from datetime import datetime, timezone + +from sqlalchemy import select +from sqlalchemy.orm import selectinload + +from app.database import async_session +from app.models.agent import Agent +from app.models.llm import LLMModel +from app.models.workflow import Workflow, WorkflowStep + +logger = logging.getLogger(__name__) + + + +async def _retry_llm_call(fn, max_retries=3, base_delay=5.0): + """Retry LLM calls on transient errors with exponential backoff.""" + retryable = ("429", "overloaded", "rate_limit", "disconnected", "ConnectError", "timeout", "502", "503", "524") + for attempt in range(max_retries + 1): + try: + return await fn() + except Exception as e: + err_str = str(e) + if any(k in err_str for k in retryable) and attempt < max_retries: + delay = base_delay * (2 ** attempt) + logger.warning(f"LLM transient error (attempt {attempt+1}/{max_retries+1}): {err_str[:100]}, retrying in {delay}s...") + await asyncio.sleep(delay) + continue + raise + +# Agent name -> role mapping for the planning prompt +AGENT_ROLES = { + "Leo": "客户挖掘专家 — google_maps_search, find_emails, enrich_lead, web_search, jina_search, jina_read", + "Ava": "情报研究员 — enrich_lead, jina_search, jina_read, recall, remember, web_search", + "Marco": "跨文化开发信+社媒营销专家 — send_email, publish_social, social_plan_content, social_batch_schedule, social_list_posts, social_get_analytics, jina_search", + "Iris": "市场战略师 — web_search, jina_search, jina_read, social_get_analytics", + "Serena": "SDR需求分析 — recall, remember, jina_search, send_email, read_emails, reply_email", + "Orion": "成交专家 — generate_pi, send_email, write_file, execute_code", + "Meeseeks": "任务执行者 — write_file, execute_code, list_files, web_search", +} + +PLANNING_PROMPT = """你是 PulseAgent 外贸全流程工作流编排器。 + +用户输入了一个外贸业务目标,你需要编排多个 AI 数字员工协作完成全流程。 + +## 可用数字员工 +{agent_list} + +## 可用工具清单 +- web_search: Tavily 互联网搜索(行业报告、公司信息、市场数据) +- jina_search: Jina AI 搜索(结构化搜索结果) +- jina_read: 读取网页正文内容 +- google_maps_search: 按关键词+地区搜索 Google Maps 企业(名称、地址、电话、网站) +- find_emails: 查找公司域名关联邮箱 (Hunter.io) +- enrich_lead: 企业画像增强(规模、行业、联系人、社媒) +- send_email: 发送邮件(SMTP) +- read_emails: 读取收件箱 +- reply_email: 回复邮件 +- publish_social: 发布社媒到 LinkedIn/Twitter/Facebook 等(Postiz,真实发布) +- social_plan_content: AI 生成多天多平台社媒内容排期 +- social_batch_schedule: 批量排期发布社媒 +- social_list_posts: 查看已排期/已发布帖子 +- social_get_analytics: 获取社媒互动/曝光数据分析 +- generate_pi: 生成 Proforma Invoice 形式发票 +- write_file: 保存文件到工作空间 +- execute_code: 执行 Python 代码 +- remember / recall: 长期语义记忆存取 + +## 标准外贸全流程步骤(按需裁剪) +1. **市场调研** (Iris) → web_search + jina_search 搜索目标市场数据,输出市场分析报告 +2. **客户挖掘** (Leo) → google_maps_search + find_emails + enrich_lead 挖掘潜在客户,输出客户列表 +3. **客户画像** (Ava) → enrich_lead + jina_read 深入调研重点客户,输出客户画像 +4. **社媒营销** (Marco) → social_plan_content 生成内容计划 + publish_social 真实发布到社媒平台 +5. **开发信** (Marco) → 基于客户数据撰写个性化开发信 + send_email 真实发送 +6. **SDR 跟进** (Serena) → BANT 需求分析话术 + send_email 首次触达 +7. **报价单** (Orion) → generate_pi 生成 Proforma Invoice + +## 输出格式 +严格返回 JSON(无其他文字): +{{ + "title": "工作流标题(简短中文)", + "steps": [ + {{ + "agent_name": "员工名字(必须是上面列表中的)", + "title": "步骤标题", + "instruction": "给该员工的具体指令,要包含用户的产品/行业/目标市场等上下文。指令要足够详细,让员工能独立完成。", + "deliverable_type": "table|markdown|email_template|pi|social_post|report" + }} + ] +}} + +规则: +- 根据用户目标选择 4-7 个步骤,不必包含所有步骤 +- 不要添加"综合报告"或"汇总"步骤,工作流引擎会自动生成汇总 +- instruction 必须具体,包含用户提供的所有上下文(产品、公司、目标市场等) +- 如果用户提供了公司官网、产品信息,在每步 instruction 中都要传递 +- table 类型用于客户列表,email_template 用于开发信,report 用于分析报告,markdown 用于通用内容 +""" + +SUMMARY_PROMPT = """以下是一个外贸全流程工作流的执行结果: + +用户指令:{instruction} + +各步骤交付物: +{deliverables} + +请生成汇总,严格返回 JSON: +{{ + "summary": "200字以内的关键成果汇总", + "next_steps": "- 建议1(具体可操作)\\n- 建议2\\n- 建议3\\n- 建议4\\n- 建议5" +}} +""" + + +async def get_available_agents(tenant_id: uuid.UUID) -> list[dict]: + async with async_session() as db: + result = await db.execute( + select(Agent).where(Agent.tenant_id == tenant_id, Agent.status.in_(["running", "idle"])) + ) + agents = result.scalars().all() + return [ + { + "name": a.name, + "id": str(a.id), + "role": a.role_description or "", + "tools": AGENT_ROLES.get(a.name, ""), + } + for a in agents + ] + + +async def get_default_llm_model() -> LLMModel | None: + async with async_session() as db: + # Prefer non-free models to avoid rate limiting + result = await db.execute( + select(LLMModel).where(LLMModel.enabled == True) + .order_by(LLMModel.model.contains(":free").asc()) + .limit(1) + ) + return result.scalar_one_or_none() + + +async def call_llm_simple(system_prompt: str, user_prompt: str) -> str: + model = await get_default_llm_model() + if not model: + raise RuntimeError("No LLM model configured") + + from app.services.llm_client import create_llm_client + from app.services.llm_utils import LLMMessage + + client = create_llm_client( + provider=model.provider, + api_key=model.api_key_encrypted, + model=model.model, + base_url=model.base_url, + timeout=120.0, + ) + try: + async def _do_call(): + return await client.complete( + messages=[ + LLMMessage(role="system", content=system_prompt), + LLMMessage(role="user", content=user_prompt), + ], + temperature=0.3, + max_tokens=4096, + ) + response = await _retry_llm_call(_do_call, max_retries=3, base_delay=8.0) + return response.content or "" + finally: + await client.close() + + +async def plan_workflow(instruction: str, tenant_id: uuid.UUID) -> dict: + agents = await get_available_agents(tenant_id) + if not agents: + raise ValueError("No active agents found — please ensure at least one agent is running") + + agent_list = "\n".join( + f"- **{a['name']}**: {a['role']}" + (f" (工具: {a['tools']})" if a['tools'] else "") + for a in agents + ) + system = PLANNING_PROMPT.format(agent_list=agent_list) + + raw = await call_llm_simple(system, instruction) + raw = raw.strip() + if raw.startswith("```"): + raw = raw.split("\n", 1)[-1].rsplit("```", 1)[0] + + try: + plan = json.loads(raw) + except json.JSONDecodeError as e: + logger.error(f"Plan JSON parse error: {e}, raw: {raw[:500]}") + raise ValueError(f"LLM returned invalid plan: {str(e)[:100]}") + + # Map agent names to IDs + agent_map = {a["name"]: a["id"] for a in agents} + for step in plan.get("steps", []): + name = step.get("agent_name", "") + step["agent_id"] = agent_map.get(name) + if not step["agent_id"]: + for aname, aid in agent_map.items(): + if name.lower() in aname.lower() or aname.lower() in name.lower(): + step["agent_id"] = aid + step["agent_name"] = aname + break + if not step.get("agent_id") and agents: + step["agent_id"] = agents[0]["id"] + step["agent_name"] = agents[0]["name"] + + return plan + + +async def create_and_run_workflow( + instruction: str, user_id: uuid.UUID, tenant_id: uuid.UUID +) -> uuid.UUID: + # Create workflow record immediately, plan + execute in background + async with async_session() as db: + workflow = Workflow( + title=instruction[:80], + user_instruction=instruction, + status="planning", + created_by=user_id, + tenant_id=tenant_id, + ) + db.add(workflow) + await db.flush() + wf_id = workflow.id + await db.commit() + + asyncio.create_task(_plan_and_execute(wf_id, instruction, tenant_id), name=f"workflow-{wf_id}") + return wf_id + + +async def _plan_and_execute(workflow_id: uuid.UUID, instruction: str, tenant_id: uuid.UUID) -> None: + try: + plan = await plan_workflow(instruction, tenant_id) + title = plan.get("title", instruction[:80]) + + async with async_session() as db: + result = await db.execute(select(Workflow).where(Workflow.id == workflow_id)) + wf = result.scalar_one() + wf.title = title + wf.plan_data = plan + + for i, step_data in enumerate(plan.get("steps", [])): + step = WorkflowStep( + workflow_id=workflow_id, + agent_id=uuid.UUID(step_data["agent_id"]) if step_data.get("agent_id") else None, + step_order=i, + title=step_data.get("title", f"步骤 {i+1}"), + instruction=step_data.get("instruction", ""), + agent_name=step_data.get("agent_name", ""), + deliverable_type=step_data.get("deliverable_type", "markdown"), + ) + db.add(step) + + wf.status = "running" + await db.commit() + + await _execute_workflow(workflow_id) + + except Exception as e: + logger.error(f"Workflow {workflow_id} plan+execute error: {e}") + import traceback + traceback.print_exc() + try: + async with async_session() as db: + result = await db.execute(select(Workflow).where(Workflow.id == workflow_id)) + wf = result.scalar_one_or_none() + if wf: + wf.status = "failed" + wf.summary = f"规划失败: {str(e)[:200]}" + await db.commit() + except Exception as db_err: + logger.error(f"Failed to update workflow {workflow_id} status: {db_err}") + + +async def _execute_workflow(workflow_id: uuid.UUID) -> None: + try: + async with async_session() as db: + result = await db.execute( + select(Workflow).where(Workflow.id == workflow_id).options(selectinload(Workflow.steps)) + ) + workflow = result.scalar_one_or_none() + if not workflow: + return + steps = sorted(workflow.steps, key=lambda s: s.step_order) + + prev_outputs: list[str] = [] + + for step in steps: + async with async_session() as db: + result = await db.execute(select(WorkflowStep).where(WorkflowStep.id == step.id)) + s = result.scalar_one() + s.status = "running" + s.started_at = datetime.now(timezone.utc) + await db.commit() + + try: + output = await _execute_step(step, prev_outputs, workflow.user_instruction) + prev_outputs.append(f"[{step.title}]\n{output[:2000]}") + + async with async_session() as db: + result = await db.execute(select(WorkflowStep).where(WorkflowStep.id == step.id)) + s = result.scalar_one() + s.status = "done" + s.raw_output = output + s.deliverable_data = {"content": output} + s.completed_at = datetime.now(timezone.utc) + await db.commit() + + print(f"[Workflow] Step {step.step_order+1} '{step.title}' done ({len(output)} chars)") + + except Exception as e: + logger.error(f"Workflow step {step.id} failed: {e}") + async with async_session() as db: + result = await db.execute(select(WorkflowStep).where(WorkflowStep.id == step.id)) + s = result.scalar_one() + s.status = "failed" + s.raw_output = f"Error: {str(e)[:500]}" + s.completed_at = datetime.now(timezone.utc) + await db.commit() + + # Phase 3: Summary + await _summarize_workflow(workflow_id) + + except Exception as e: + logger.error(f"Workflow {workflow_id} execution error: {e}") + import traceback + traceback.print_exc() + try: + async with async_session() as db: + result = await db.execute(select(Workflow).where(Workflow.id == workflow_id)) + wf = result.scalar_one_or_none() + if wf: + wf.status = "failed" + await db.commit() + except Exception as db_err: + logger.error(f"Failed to update workflow {workflow_id} status: {db_err}") + + +async def _execute_step(step: WorkflowStep, prev_outputs: list[str], user_instruction: str) -> str: + if not step.agent_id: + return "No agent assigned" + + context_from_prev = "\n\n".join(prev_outputs[-3:]) if prev_outputs else "" + + deliverable_hints = { + "table": "输出 Markdown 表格,包含完整数据(公司名、联系人、邮箱、电话、网站等列)", + "markdown": "输出结构化 Markdown 报告", + "email_template": "输出 3 封不同风格的个性化开发信模板(邮件主题 + 正文),可直接发送", + "pi": "使用 generate_pi 工具生成 Proforma Invoice", + "social_post": """发布社媒内容。必须按以下步骤调用 publish_social 工具: +第一步:调用 publish_social(action="list_channels") 获取已连接的社媒渠道列表 +第二步:根据返回的 integration ID,调用 publish_social(action="post", content="帖子内容", integration_ids=["id1","id2"]) +注意:action 和 content 是必填参数。如果 list_channels 返回空,说明未连接社媒,改为输出帖子文案即可。""", + "report": "输出详细的分析报告,包含数据、洞察和建议", + "bant": "输出 BANT 客户分析(Budget/Authority/Need/Timeline)", + } + + hint = deliverable_hints.get(step.deliverable_type, "输出结构化 Markdown") + + task_prompt = f"""你正在执行一个外贸全流程工作流的一个环节。请充分使用你的工具来完成任务。 + +## 用户的总体目标 +{user_instruction} + +## 你的当前任务 +{step.title} + +## 具体指令 +{step.instruction} + +## 交付物要求 +{hint} + +{f'## 前置步骤产出(供参考)\n{context_from_prev}' if context_from_prev else ''} + +重要: +1. 主动使用你的工具(搜索、数据库查询、邮件发送等)获取真实数据 +2. 不要编造数据,如果工具调用失败,说明原因 +3. 直接输出交付物内容""" + + async with async_session() as db: + agent_result = await db.execute(select(Agent).where(Agent.id == step.agent_id)) + agent = agent_result.scalar_one_or_none() + if not agent: + return "Agent not found" + + model_id = agent.primary_model_id or agent.fallback_model_id + if not model_id: + return f"{agent.name} 未配置 LLM 模型" + + model_result = await db.execute(select(LLMModel).where(LLMModel.id == model_id)) + model = model_result.scalar_one_or_none() + if not model: + return "LLM model not found" + + creator_id = agent.creator_id + agent_name = agent.name + agent_role = agent.role_description or "" + + from app.services.llm_client import create_llm_client + from app.services.llm_utils import LLMMessage, get_max_tokens + from app.services.agent_context import build_agent_context + from app.services.agent_tools import get_agent_tools_for_llm, execute_tool + + system_prompt = await build_agent_context(step.agent_id, agent_name, agent_role) + + client = create_llm_client( + provider=model.provider, + api_key=model.api_key_encrypted, + model=model.model, + base_url=model.base_url, + timeout=600.0, + ) + + tools = await get_agent_tools_for_llm(step.agent_id) + + messages = [ + LLMMessage(role="system", content=system_prompt), + LLMMessage(role="user", content=task_prompt), + ] + + try: + reply = "" + tool_results_collected = [] + last_content = "" + repeat_count = 0 + for round_i in range(10): # Up to 10 tool rounds (reduced from 15) + async def _do_step_call(): + return await client.complete( + messages=messages, + tools=tools if tools else None, + temperature=0.5, + max_tokens=get_max_tokens(model.provider, model.model), + ) + response = await _retry_llm_call(_do_step_call, max_retries=3, base_delay=8.0) + + # Detect repeated content (loop prevention) + current_content = (response.content or "")[:200] + if current_content and current_content == last_content: + repeat_count += 1 + if repeat_count >= 2: + print(f"[Workflow] Step '{step.title}' detected loop (same output {repeat_count+1}x), stopping") + reply = response.content or "" + if tool_results_collected: + reply += "\n\n## 工具调用结果\n\n" + "\n\n".join(tool_results_collected[-3:]) + break + else: + repeat_count = 0 + last_content = current_content + + if response.tool_calls: + messages.append(LLMMessage( + role="assistant", content=response.content or None, + tool_calls=[{"id": tc["id"], "type": "function", "function": tc["function"]} for tc in response.tool_calls], + )) + for tc in response.tool_calls: + fn = tc["function"] + tool_name = fn["name"] + try: + args = json.loads(fn.get("arguments", "{}")) if fn.get("arguments") else {} + except Exception: + args = {} + print(f"[Workflow] Step '{step.title}' calling tool: {tool_name}") + tool_result = await execute_tool(tool_name, args, step.agent_id, creator_id) + tool_result_str = str(tool_result)[:8000] + tool_results_collected.append(f"[{tool_name}]: {tool_result_str[:2000]}") + messages.append(LLMMessage(role="tool", tool_call_id=tc["id"], content=tool_result_str)) + else: + reply = response.content or "" + break + else: + if tool_results_collected: + reply = "## 工具调用结果汇总\n\n" + "\n\n".join(tool_results_collected[-5:]) + else: + reply = response.content or "(max tool rounds reached)" + + return reply + except Exception as e: + logger.error(f"Step execution error: {e}") + raise + finally: + await client.close() + + +async def _summarize_workflow(workflow_id: uuid.UUID) -> None: + async with async_session() as db: + result = await db.execute( + select(Workflow).where(Workflow.id == workflow_id).options(selectinload(Workflow.steps)) + ) + workflow = result.scalar_one_or_none() + if not workflow: + return + + deliverables = "" + for s in sorted(workflow.steps, key=lambda x: x.step_order): + status_icon = "[完成]" if s.status == "done" else "[失败]" + output = (s.raw_output or "")[:800] + deliverables += f"\n### {status_icon} {s.title} ({s.agent_name})\n{output}\n" + + try: + system = "你是一个外贸工作流汇总助手。返回 JSON 格式。" + user_msg = SUMMARY_PROMPT.format(instruction=workflow.user_instruction, deliverables=deliverables) + raw = await call_llm_simple(system, user_msg) + raw = raw.strip() + if raw.startswith("```"): + raw = raw.split("\n", 1)[-1].rsplit("```", 1)[0] + result_data = json.loads(raw) + except Exception as e: + logger.error(f"Summary generation failed: {e}") + result_data = {"summary": "工作流已完成,请查看各步骤交付物", "next_steps": "- 审阅各步骤产出\n- 筛选高价值客户\n- 发送开发信"} + + async with async_session() as db: + result = await db.execute(select(Workflow).where(Workflow.id == workflow_id)) + wf = result.scalar_one() + wf.status = "done" + wf.summary = result_data.get("summary", "") + wf.next_steps = result_data.get("next_steps", "") + wf.completed_at = datetime.now(timezone.utc) + await db.commit() + + print(f"[Workflow] {workflow_id} completed with summary") + + +async def get_workflow_detail(workflow_id: uuid.UUID) -> Workflow | None: + async with async_session() as db: + result = await db.execute( + select(Workflow).where(Workflow.id == workflow_id).options(selectinload(Workflow.steps)) + ) + return result.scalar_one_or_none() + + +async def list_workflows(tenant_id: uuid.UUID, user_id: uuid.UUID, page: int = 1, size: int = 20) -> tuple[list[Workflow], int]: + async with async_session() as db: + from sqlalchemy import func as sqlfunc + count_result = await db.execute( + select(sqlfunc.count()).select_from(Workflow).where( + Workflow.tenant_id == tenant_id, Workflow.created_by == user_id + ) + ) + total = count_result.scalar() or 0 + + result = await db.execute( + select(Workflow).where( + Workflow.tenant_id == tenant_id, Workflow.created_by == user_id + ).order_by(Workflow.created_at.desc()).offset((page - 1) * size).limit(size) + ) + return list(result.scalars().all()), total diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 9531b273..f1682bba 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -14,6 +14,9 @@ import Messages from './pages/Messages'; import EnterpriseSettings from './pages/EnterpriseSettings'; import InvitationCodes from './pages/InvitationCodes'; import AdminCompanies from './pages/AdminCompanies'; +import WorkflowList from './pages/WorkflowList'; +import WorkflowDetail from './pages/WorkflowDetail'; +import CRMDashboard from './pages/CRMDashboard'; function ProtectedRoute({ children }: { children: React.ReactNode }) { const token = useAuthStore((s) => s.token); @@ -116,6 +119,9 @@ export default function App() { } /> } /> } /> + } /> + } /> + } /> diff --git a/frontend/src/i18n/en.json b/frontend/src/i18n/en.json index 51e9a1b5..5b93255e 100644 --- a/frontend/src/i18n/en.json +++ b/frontend/src/i18n/en.json @@ -53,7 +53,9 @@ "newAgent": "New Digital Employee", "enterprise": "Company Settings", "language": "Language", - "plaza": "Plaza" + "plaza": "Plaza", + "workflow": "Workflows", + "crm": "CRM" }, "auth": { @@ -920,5 +922,118 @@ "ws_step6": "Find the bot in WeCom and send a test message", "ws_note": "WebSocket mode requires no public IP, callback URL, or domain verification (ICP). The connection is managed automatically." } + }, + "workflow": { + "title": "Workflows", + "subtitle": "Enter your business goal and AI agents will collaborate to complete it", + "placeholder": "e.g. Help me develop the German LED lighting market", + "creating": "Starting...", + "create": "Start Workflow", + "loading": "Loading...", + "empty": "No workflows yet. Enter your goal to get started.", + "deleteConfirm": "Delete this workflow?", + "status": { + "planning": "Planning", + "running": "Running", + "done": "Completed", + "failed": "Failed" + }, + "detail": { + "loading": "Loading...", + "retrying": "Retrying...", + "retryFailed": "Retry Failed Steps", + "runningProgress": "Running {{done}}/{{total}}", + "chat": "Chat", + "hideChat": "Hide Chat", + "planningTitle": "Agents are planning", + "planningDesc": "AI team is analyzing your requirements, breaking down tasks and assigning to the best agents...", + "analyzing": "Analyzing requirements...", + "breakingDown": "Breaking down tasks...", + "assigning": "Assigning agents...", + "noSteps": "No task steps yet", + "summary": "Summary", + "copied": "Copied", + "copy": "Copy", + "importCRM": "Import to CRM", + "agentWorking": "{{name}} is working...", + "pendingExecution": "Waiting for execution", + "noContent": "No content", + "assembling": "Agent team assembling", + "assemblingDesc": "AI is analyzing \"{{instruction}}\", breaking it down into subtasks and assigning them to the best agents.", + "estimatedTime": "Usually takes 10-30 seconds", + "selectStep": "Select a step on the left to view deliverables" + }, + "chat": { + "title": "Workflow Assistant", + "context": "Based on current workflow context", + "empty": "Ask me anything about this workflow", + "suggestAnalyze": "Analyze the customer mining results", + "suggestPriority": "Which customers should I prioritize?", + "suggestOptimize": "Help me optimize outreach email content", + "suggestRetry": "Retry failed steps", + "thinking": "Thinking...", + "send": "Send", + "placeholder": "Type a message...", + "error": "Error: {{message}}" + }, + "importSuccess": "Import successful: {{imported}} contacts, skipped {{skipped}}" + }, + "crm": { + "title": "CRM", + "stats": "{{contacts}} contacts / {{deals}} deals", + "pipeline": "Pipeline", + "contacts": "Contacts", + "addContact": "+ Contact", + "searchPlaceholder": "Search name, company, email...", + "deleteSelected": "Delete {{count}}", + "deleteConfirm": "Delete {{count}} contacts?", + "loading": "Loading...", + "noContacts": "No contacts yet", + "showing": "Showing {{shown}} of {{total}}", + "newContact": "New Contact", + "editContact": "Edit Contact", + "fields": { + "name": "Name", + "nameRequired": "Name *", + "company": "Company", + "email": "Email", + "phone": "Phone", + "country": "Country", + "industry": "Industry", + "source": "Source", + "notes": "Notes", + "deals": "Deals", + "contactName": "Contact name" + }, + "deals": "Deals", + "addDeal": "+ Deal", + "newDeal": "New Deal", + "dealFields": { + "title": "Title", + "titleRequired": "Title *", + "value": "Value", + "currency": "Currency", + "stage": "Stage" + }, + "activity": "Activity", + "create": "Create", + "createDeal": "Create Deal", + "save": "Save", + "cancel": "Cancel", + "delete": "Delete", + "del": "Del", + "edit": "Edit", + "dropHere": "Drop here", + "deleteContactConfirm": "Delete this contact and all deals?", + "deleteDealConfirm": "Delete deal?", + "stages": { + "lead": "Lead", + "contacted": "Contacted", + "qualified": "Qualified", + "proposal": "Proposal", + "negotiation": "Negotiation", + "won": "Won", + "lost": "Lost" + } } } \ No newline at end of file diff --git a/frontend/src/i18n/zh.json b/frontend/src/i18n/zh.json index 08bef963..cd150875 100644 --- a/frontend/src/i18n/zh.json +++ b/frontend/src/i18n/zh.json @@ -35,7 +35,9 @@ "adminCompanies": "公司管理", "platformSettings": "平台设置", "language": "语言", - "plaza": "广场" + "plaza": "广场", + "workflow": "工作流", + "crm": "客户关系" }, "plaza": { "title": "智能体广场", @@ -1062,5 +1064,118 @@ "ws_step6": "在企业微信中找到机器人,发送消息测试", "ws_note": "WebSocket 长连接模式无需公网 IP、回调地址或域名备案(ICP),连接由系统自动管理。" } + }, + "workflow": { + "title": "工作流", + "subtitle": "输入你的外贸业务目标,AI 数字员工团队将自动协作完成", + "placeholder": "例如:帮我开发德国LED灯具市场", + "creating": "启动中...", + "create": "启动工作流", + "loading": "加载中...", + "empty": "还没有工作流,输入你的目标开始吧", + "deleteConfirm": "确定删除该工作流?", + "status": { + "planning": "规划中", + "running": "执行中", + "done": "已完成", + "failed": "失败" + }, + "detail": { + "loading": "加载中...", + "retrying": "重试中...", + "retryFailed": "重试失败步骤", + "runningProgress": "执行中 {{done}}/{{total}}", + "chat": "对话", + "hideChat": "收起对话", + "planningTitle": "数字员工正在规划中", + "planningDesc": "AI 团队正在分析您的需求,拆解任务并分配给最合适的数字员工,请稍候...", + "analyzing": "分析需求...", + "breakingDown": "拆解任务...", + "assigning": "分配员工...", + "noSteps": "暂无任务步骤", + "summary": "汇总", + "copied": "已复制", + "copy": "复制", + "importCRM": "导入CRM", + "agentWorking": "{{name}} 正在工作中...", + "pendingExecution": "等待执行", + "noContent": "无内容", + "assembling": "数字员工团队正在集结", + "assemblingDesc": "AI 正在分析「{{instruction}}」,将自动拆解为多个子任务并分配给最合适的数字员工执行。", + "estimatedTime": "通常需要 10-30 秒", + "selectStep": "选择左侧步骤查看交付物" + }, + "chat": { + "title": "工作流助手", + "context": "基于当前工作流上下文", + "empty": "可以问我关于这个工作流的任何问题", + "suggestAnalyze": "分析一下客户挖掘的结果", + "suggestPriority": "哪些客户值得优先跟进?", + "suggestOptimize": "帮我优化开发信内容", + "suggestRetry": "重试失败的步骤", + "thinking": "思考中...", + "send": "发送", + "placeholder": "输入消息...", + "error": "错误: {{message}}" + }, + "importSuccess": "导入成功: {{imported}} 条客户, 跳过 {{skipped}} 条" + }, + "crm": { + "title": "客户关系管理", + "stats": "{{contacts}} 联系人 / {{deals}} 商机", + "pipeline": "销售管道", + "contacts": "联系人", + "addContact": "+ 联系人", + "searchPlaceholder": "搜索姓名、公司、邮箱...", + "deleteSelected": "删除 {{count}}", + "deleteConfirm": "确定删除 {{count}} 个联系人?", + "loading": "加载中...", + "noContacts": "暂无联系人", + "showing": "显示 {{shown}} / {{total}}", + "newContact": "新建联系人", + "editContact": "编辑联系人", + "fields": { + "name": "姓名", + "nameRequired": "姓名 *", + "company": "公司", + "email": "邮箱", + "phone": "电话", + "country": "国家", + "industry": "行业", + "source": "来源", + "notes": "备注", + "deals": "商机", + "contactName": "联系人姓名" + }, + "deals": "商机", + "addDeal": "+ 商机", + "newDeal": "新建商机", + "dealFields": { + "title": "标题", + "titleRequired": "标题 *", + "value": "金额", + "currency": "币种", + "stage": "阶段" + }, + "activity": "活动记录", + "create": "创建", + "createDeal": "创建商机", + "save": "保存", + "cancel": "取消", + "delete": "删除", + "del": "删除", + "edit": "编辑", + "dropHere": "拖放到此处", + "deleteContactConfirm": "确定删除此联系人及所有商机?", + "deleteDealConfirm": "确定删除商机?", + "stages": { + "lead": "线索", + "contacted": "已联系", + "qualified": "已筛选", + "proposal": "方案中", + "negotiation": "谈判中", + "won": "成交", + "lost": "流失" + } } } diff --git a/frontend/src/pages/CRMDashboard.tsx b/frontend/src/pages/CRMDashboard.tsx new file mode 100644 index 00000000..90047d06 --- /dev/null +++ b/frontend/src/pages/CRMDashboard.tsx @@ -0,0 +1,408 @@ +import { useState, useEffect, useCallback, useRef, type DragEvent } from 'react'; +import { useTranslation } from 'react-i18next'; +import { crmApi } from '../services/api'; + +const inp: React.CSSProperties = { + width: '100%', padding: '8px 12px', borderRadius: 6, + border: '1px solid var(--border)', background: 'var(--bg-secondary)', + color: 'var(--text-primary)', fontSize: 13, +}; +const btn: React.CSSProperties = { + padding: '6px 14px', borderRadius: 6, border: 'none', + background: 'var(--accent)', color: '#fff', cursor: 'pointer', fontSize: 13, fontWeight: 500, +}; +const btnSec: React.CSSProperties = { ...btn, background: 'transparent', border: '1px solid var(--border)', color: 'var(--text-secondary)' }; + +export default function CRMDashboard() { + const { t } = useTranslation(); + + const STAGES = [ + { key: 'lead', label: t('crm.stages.lead', 'Lead'), color: '#94a3b8' }, + { key: 'contacted', label: t('crm.stages.contacted', 'Contacted'), color: '#60a5fa' }, + { key: 'qualified', label: t('crm.stages.qualified', 'Qualified'), color: '#a78bfa' }, + { key: 'proposal', label: t('crm.stages.proposal', 'Proposal'), color: '#f59e0b' }, + { key: 'negotiation', label: t('crm.stages.negotiation', 'Negotiation'), color: '#fb923c' }, + { key: 'won', label: t('crm.stages.won', 'Won'), color: '#10b981' }, + { key: 'lost', label: t('crm.stages.lost', 'Lost'), color: '#ef4444' }, + ]; + + const [view, setView] = useState<'pipeline' | 'contacts'>('pipeline'); + const [deals, setDeals] = useState([]); + const [contacts, setContacts] = useState([]); + const [total, setTotal] = useState(0); + const [search, setSearch] = useState(''); + const [loading, setLoading] = useState(false); + const [selected, setSelected] = useState>(new Set()); + const [detail, setDetail] = useState(null); + const [showAdd, setShowAdd] = useState(false); + const [showDeal, setShowDeal] = useState(null); + const [editContact, setEditContact] = useState(null); + const [stats, setStats] = useState(null); + const [dragId, setDragId] = useState(null); + const [dragOver, setDragOver] = useState(null); + const [form, setForm] = useState({ name: '', company: '', email: '', phone: '', country: '', industry: '', source: '', notes: '' }); + const [dealForm, setDealForm] = useState({ title: '', value: '', currency: 'USD', stage: 'lead', notes: '' }); + + const fetchDeals = useCallback(async () => { try { setDeals(await crmApi.listDeals()); } catch {} }, []); + const fetchContacts = useCallback(async () => { + setLoading(true); + try { + const res = await crmApi.listContacts({ search: search || undefined }); + setContacts(res.items || []); setTotal(res.total || 0); + } catch {} + setLoading(false); + }, [search]); + const fetchStats = useCallback(async () => { try { setStats(await crmApi.stats()); } catch {} }, []); + + useEffect(() => { fetchDeals(); fetchContacts(); fetchStats(); }, []); + useEffect(() => { const tm = setTimeout(fetchContacts, 300); return () => clearTimeout(tm); }, [search]); + + // -- Drag & Drop -- + const onDragStart = (e: DragEvent, dealId: string) => { setDragId(dealId); e.dataTransfer.effectAllowed = 'move'; }; + const onDragOver = (e: DragEvent, stageKey: string) => { e.preventDefault(); setDragOver(stageKey); }; + const onDragLeave = () => setDragOver(null); + const onDrop = async (e: DragEvent, stageKey: string) => { + e.preventDefault(); setDragOver(null); + if (!dragId) return; + const deal = deals.find(d => d.id === dragId); + if (deal && deal.stage !== stageKey) { + setDeals(prev => prev.map(d => d.id === dragId ? { ...d, stage: stageKey } : d)); + try { await crmApi.updateDeal(dragId, { stage: stageKey }); fetchStats(); } catch { fetchDeals(); } + } + setDragId(null); + }; + + // -- Batch -- + const toggleSelect = (id: string) => setSelected(prev => { const s = new Set(prev); s.has(id) ? s.delete(id) : s.add(id); return s; }); + const selectAll = () => { if (selected.size === contacts.length) setSelected(new Set()); else setSelected(new Set(contacts.map(c => c.id))); }; + const batchDelete = async () => { + if (!selected.size || !confirm(t('crm.deleteConfirm', 'Delete {{count}} contacts?', { count: selected.size }))) return; + await crmApi.batchDeleteContacts([...selected]); setSelected(new Set()); fetchContacts(); fetchDeals(); fetchStats(); + }; + + // -- CRUD -- + const addContact = async () => { + if (!form.name.trim()) return; + await crmApi.createContact(form); + setForm({ name: '', company: '', email: '', phone: '', country: '', industry: '', source: '', notes: '' }); + setShowAdd(false); fetchContacts(); fetchStats(); + }; + const saveEdit = async () => { + if (!editContact) return; + await crmApi.updateContact(editContact.id, editContact); + setEditContact(null); fetchContacts(); if (detail?.id === editContact.id) openDetail(editContact.id); + }; + const deleteContact = async (id: string) => { + if (!confirm(t('crm.deleteContactConfirm', 'Delete this contact and all deals?'))) return; + await crmApi.deleteContact(id); setDetail(null); fetchContacts(); fetchDeals(); fetchStats(); + }; + const openDetail = async (id: string) => { try { setDetail(await crmApi.getContact(id)); } catch {} }; + const addDeal = async () => { + if (!showDeal || !dealForm.title) return; + await crmApi.createDeal({ contact_id: showDeal, title: dealForm.title, value: dealForm.value ? +dealForm.value : null, currency: dealForm.currency, stage: dealForm.stage, notes: dealForm.notes }); + setDealForm({ title: '', value: '', currency: 'USD', stage: 'lead', notes: '' }); + setShowDeal(null); fetchDeals(); fetchStats(); if (detail) openDetail(detail.id); + }; + const deleteDeal = async (id: string) => { if (confirm(t('crm.deleteDealConfirm', 'Delete deal?'))) { await crmApi.deleteDeal(id); fetchDeals(); fetchStats(); if (detail) openDetail(detail.id); } }; + + const stageColor = (s: string) => STAGES.find(st => st.key === s)?.color || '#888'; + + const fieldLabels: Record = { + name: t('crm.fields.name', 'Name'), + company: t('crm.fields.company', 'Company'), + email: t('crm.fields.email', 'Email'), + phone: t('crm.fields.phone', 'Phone'), + country: t('crm.fields.country', 'Country'), + industry: t('crm.fields.industry', 'Industry'), + source: t('crm.fields.source', 'Source'), + notes: t('crm.fields.notes', 'Notes'), + }; + + const tableHeaders = [ + t('crm.fields.name', 'Name'), + t('crm.fields.company', 'Company'), + t('crm.fields.email', 'Email'), + t('crm.fields.phone', 'Phone'), + t('crm.fields.country', 'Country'), + t('crm.fields.source', 'Source'), + t('crm.fields.deals', 'Deals'), + '', + ]; + + return ( +
+ {/* Header */} +
+

{t('crm.title', 'CRM')}

+ {stats && {t('crm.stats', '{{contacts}} contacts / {{deals}} deals', { contacts: stats.contacts, deals: stats.deals })}} +
+
+ {(['pipeline', 'contacts'] as const).map(v => ( + + ))} +
+ +
+ +
+ {/* -- Pipeline View -- */} + {view === 'pipeline' && ( +
+ {STAGES.map(stage => { + const sd = deals.filter(d => d.stage === stage.key); + const tv = sd.reduce((s, d) => s + (d.value || 0), 0); + const isOver = dragOver === stage.key; + return ( +
onDragOver(e, stage.key)} + onDragLeave={onDragLeave} + onDrop={e => onDrop(e, stage.key)} + style={{ + minWidth: 210, flex: 1, display: 'flex', flexDirection: 'column', + background: isOver ? `${stage.color}10` : 'var(--bg-secondary)', + borderRadius: 8, border: isOver ? `2px dashed ${stage.color}` : '2px solid transparent', + transition: 'all 0.15s', + }}> +
+ {stage.label} + {sd.length}{tv > 0 ? ` \u00b7 $${(tv/1000).toFixed(0)}k` : ''} +
+
+ {sd.map(deal => ( +
onDragStart(e, deal.id)} + style={{ + padding: '10px 12px', borderRadius: 6, cursor: 'grab', + background: 'var(--bg-primary)', border: '1px solid var(--border)', + opacity: dragId === deal.id ? 0.4 : 1, transition: 'opacity 0.15s', + }}> +
{deal.title}
+
+ {deal.contact_name}{deal.contact_company ? ` \u00b7 ${deal.contact_company}` : ''} +
+ {deal.value != null && deal.value > 0 && ( +
{deal.currency} {deal.value.toLocaleString()}
+ )} +
+ +
+
+ ))} + {sd.length === 0 &&
{t('crm.dropHere', 'Drop here')}
} +
+
+ ); + })} +
+ )} + + {/* -- Contacts View -- */} + {view === 'contacts' && ( +
+
+ setSearch(e.target.value)} placeholder={t('crm.searchPlaceholder', 'Search name, company, email...')} + style={{ ...inp, maxWidth: 350 }} /> + {selected.size > 0 && ( + + )} +
+ {loading ?

{t('crm.loading', 'Loading...')}

: contacts.length === 0 ? ( +

{t('crm.noContacts', 'No contacts yet')}

+ ) : ( + + + + + {tableHeaders.map(h => ( + + ))} + + + + {contacts.map(c => ( + openDetail(c.id)}> + + + + + + + + + + + ))} + +
+ 0} onChange={selectAll} /> + {h}
e.stopPropagation()}> + toggleSelect(c.id)} /> + {c.name}{c.company || '-'}{c.email || '-'}{c.phone || '-'}{c.country || '-'}{c.source || '-'} + {(c.deals?.length || 0) > 0 ? {c.deals.length} : '-'} + e.stopPropagation()}> + +
+ )} + {total > contacts.length &&

{t('crm.showing', 'Showing {{shown}} of {{total}}', { shown: contacts.length, total })}

} +
+ )} +
+ + {/* -- Contact Detail Panel -- */} + {detail && ( +
setDetail(null)}> +
e.stopPropagation()}> +
+

{detail.name}

+
+ + + +
+
+
+ {[ + [t('crm.fields.company', 'Company'), detail.company], + [t('crm.fields.email', 'Email'), detail.email], + [t('crm.fields.phone', 'Phone'), detail.phone], + [t('crm.fields.country', 'Country'), detail.country], + [t('crm.fields.industry', 'Industry'), detail.industry], + [t('crm.fields.source', 'Source'), detail.source], + ].map(([l, v]) => ( +
{l as string}
{(v as string) || '-'}
+ ))} +
+ {detail.tags?.length > 0 &&
+ {detail.tags.map((tg: string) => {tg})} +
} + {detail.notes &&
{detail.notes}
} + + {/* Deals */} +
+
+

{t('crm.deals', 'Deals')}

+ +
+ {(detail.deals || []).map((d: any) => ( +
+
+
{d.title}
+
{d.currency} {d.value?.toLocaleString() || 0}
+
+
+ {d.stage} + +
+
+ ))} +
+ + {/* Activities */} +

{t('crm.activity', 'Activity')}

+ {(detail.activities || []).map((a: any) => ( +
+ {a.created_at?.slice(0, 10)} + {a.type} + {a.summary} +
+ ))} +
+
+ )} + + {/* -- Add Contact Modal -- */} + {showAdd && ( +
setShowAdd(false)}> +
e.stopPropagation()}> +

{t('crm.newContact', 'New Contact')}

+
+ {[{ k: 'name', l: t('crm.fields.nameRequired', 'Name *'), p: t('crm.fields.contactName', 'Contact name') }, + { k: 'company', l: t('crm.fields.company', 'Company'), p: t('crm.fields.company', 'Company') }, + { k: 'email', l: t('crm.fields.email', 'Email'), p: 'email@co.com' }, + { k: 'phone', l: t('crm.fields.phone', 'Phone'), p: '+1...' }, + { k: 'country', l: t('crm.fields.country', 'Country'), p: 'Germany' }, + { k: 'industry', l: t('crm.fields.industry', 'Industry'), p: 'LED Lighting' }, + { k: 'source', l: t('crm.fields.source', 'Source'), p: 'manual' }].map(f => ( +
+ + setForm(p => ({ ...p, [f.k]: e.target.value }))} placeholder={f.p} style={inp} /> +
+ ))} +
+ +