Skip to content

Commit 05b2480

Browse files
feat(mcp_server): add workflow run and rerun tools (#728)
## 📝 Description - Added dedicated MCP workflow tooling: extracted shared helpers, introduced workflows.Register, and split workflows_search into its own file with refreshed tests/fixtures. - Implemented two new MCP tools: workflows_run for scheduling fresh workflow executions (with branch/ref validation, parameter handling, repo lookups, feature-gate + permission checks) and workflows_rerun for rescheduling existing workflows with idempotent request tokens. - Expanded test coverage for both tools plus shared helpers; added rich workflow/project/RBAC/user stubs so workflow tool tests can exercise RPC flows deterministically. - Updated feature-gating utilities to differentiate read vs write MCP tooling flags, ensuring rerun/run flows are guarded by the new write flag. ## ✅ Checklist - [x] I have tested this change - [x] This change requires documentation update --------- Co-authored-by: Amir Hasanbasic <43892661+hamir-suspect@users.noreply.github.com>
1 parent 6332755 commit 05b2480

File tree

11 files changed

+1945
-374
lines changed

11 files changed

+1945
-374
lines changed

mcp_server/pkg/tools/internal/shared/features.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ import (
88
"github.com/semaphoreio/semaphore/mcp_server/pkg/internalapi"
99
)
1010

11-
const readToolsFeatureFlag = "mcp_server_read_tools"
11+
const (
12+
readToolsFeatureFlag = "mcp_server_read_tools"
13+
writeToolsFeatureFlag = "mcp_server_write_tools"
14+
)
1215

1316
// EnsureReadToolsFeature verifies that the organization has the AI read tools feature enabled.
1417
func EnsureReadToolsFeature(ctx context.Context, api internalapi.Provider, orgID string) error {
@@ -23,7 +26,26 @@ func EnsureReadToolsFeature(ctx context.Context, api internalapi.Provider, orgID
2326
}
2427

2528
if state != feature.Enabled {
26-
return fmt.Errorf("Semaphore MCP tools are disabled for this organization. Please contact support if you believe this is an error.")
29+
return fmt.Errorf("Semaphore MCP read tools are disabled for this organization. Please contact support if you believe this is an error.")
30+
}
31+
32+
return nil
33+
}
34+
35+
// EnsureWriteToolsFeature verifies that the organization has the AI write tools feature enabled.
36+
func EnsureWriteToolsFeature(ctx context.Context, api internalapi.Provider, orgID string) error {
37+
featureClient := api.Features()
38+
if featureClient == nil {
39+
return fmt.Errorf("Semaphore MCP tools are temporarily unavailable. Please try again later.")
40+
}
41+
42+
state, err := featureClient.FeatureState(orgID, writeToolsFeatureFlag)
43+
if err != nil {
44+
return fmt.Errorf("We couldn't verify access to Semaphore MCP tools right now. Please try again in a few moments.")
45+
}
46+
47+
if state != feature.Enabled {
48+
return fmt.Errorf("Semaphore MCP write tools are disabled for this organization. Please contact support if you believe this is an error.")
2749
}
2850

2951
return nil
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package workflows
2+
3+
import "strings"
4+
5+
// summary represents workflow metadata returned by workflows_search.
6+
type summary struct {
7+
ID string `json:"id"`
8+
InitialPipeline string `json:"initialPipelineId,omitempty"`
9+
ProjectID string `json:"projectId,omitempty"`
10+
OrganizationID string `json:"organizationId,omitempty"`
11+
Branch string `json:"branch,omitempty"`
12+
CommitSHA string `json:"commitSha,omitempty"`
13+
RequesterID string `json:"requesterId,omitempty"`
14+
TriggeredBy string `json:"triggeredBy,omitempty"`
15+
CreatedAt string `json:"createdAt,omitempty"`
16+
RerunOf string `json:"rerunOf,omitempty"`
17+
RepositoryID string `json:"repositoryId,omitempty"`
18+
}
19+
20+
type listResult struct {
21+
Workflows []summary `json:"workflows"`
22+
NextCursor string `json:"nextCursor,omitempty"`
23+
}
24+
25+
type runResult struct {
26+
WorkflowID string `json:"workflowId"`
27+
PipelineID string `json:"pipelineId"`
28+
Reference string `json:"reference"`
29+
CommitSHA string `json:"commitSha,omitempty"`
30+
PipelineFile string `json:"pipelineFile"`
31+
}
32+
33+
type rerunResult struct {
34+
WorkflowID string `json:"workflowId"`
35+
PipelineID string `json:"pipelineId"`
36+
RerunOf string `json:"rerunOf"`
37+
ProjectID string `json:"projectId"`
38+
OrgID string `json:"organizationId"`
39+
}
40+
41+
func humanizeTriggeredBy(value string) string {
42+
value = strings.TrimSpace(value)
43+
if value == "" {
44+
return "Unspecified"
45+
}
46+
parts := strings.Split(value, "_")
47+
for i, part := range parts {
48+
if part == "" {
49+
continue
50+
}
51+
part = strings.ToLower(part)
52+
parts[i] = strings.ToUpper(part[:1]) + part[1:]
53+
}
54+
return strings.Join(parts, " ")
55+
}
56+
57+
func shortenCommit(sha string) string {
58+
sha = strings.TrimSpace(sha)
59+
if len(sha) > 12 {
60+
return sha[:12]
61+
}
62+
return sha
63+
}
64+
65+
func normalizeID(value string) string {
66+
return strings.ToLower(strings.TrimSpace(value))
67+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package workflows
2+
3+
import (
4+
"github.com/mark3labs/mcp-go/server"
5+
6+
"github.com/semaphoreio/semaphore/mcp_server/pkg/internalapi"
7+
)
8+
9+
const (
10+
searchToolName = "workflows_search"
11+
runToolName = "workflows_run"
12+
rerunToolName = "workflows_rerun"
13+
defaultLimit = 20
14+
maxLimit = 100
15+
missingWorkflowError = "workflow gRPC endpoint is not configured"
16+
projectViewPermission = "project.view"
17+
projectRunPermission = "project.job.rerun"
18+
defaultPipelineFile = ".semaphore/semaphore.yml"
19+
)
20+
21+
// Register wires workflow tooling into the MCP server.
22+
func Register(s *server.MCPServer, api internalapi.Provider) {
23+
s.AddTool(newSearchTool(searchToolName, searchFullDescription()), listHandler(api))
24+
s.AddTool(newRunTool(runToolName, runFullDescription()), runHandler(api))
25+
s.AddTool(newRerunTool(rerunToolName, rerunFullDescription()), rerunHandler(api))
26+
}
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
package workflows
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
8+
"github.com/google/uuid"
9+
"github.com/mark3labs/mcp-go/mcp"
10+
"github.com/mark3labs/mcp-go/server"
11+
"github.com/sirupsen/logrus"
12+
13+
"github.com/semaphoreio/semaphore/mcp_server/pkg/authz"
14+
workflowpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/plumber_w_f.workflow"
15+
"github.com/semaphoreio/semaphore/mcp_server/pkg/internalapi"
16+
"github.com/semaphoreio/semaphore/mcp_server/pkg/logging"
17+
"github.com/semaphoreio/semaphore/mcp_server/pkg/tools/internal/shared"
18+
)
19+
20+
func rerunFullDescription() string {
21+
return `Rerun an existing workflow.
22+
23+
Use this when you need to:
24+
- Rerun a previously completed or failed workflow
25+
- Restart a workflow without changing its parameters
26+
27+
Required inputs:
28+
- workflow_id: ID of the workflow to rerun
29+
30+
The authenticated user must have permission to rerun workflows for the originating project.`
31+
}
32+
33+
func newRerunTool(name, description string) mcp.Tool {
34+
return mcp.NewTool(
35+
name,
36+
mcp.WithDescription(description),
37+
mcp.WithString(
38+
"workflow_id",
39+
mcp.Required(),
40+
mcp.Description("Workflow ID to rerun."),
41+
),
42+
mcp.WithIdempotentHintAnnotation(false),
43+
)
44+
}
45+
46+
func rerunHandler(api internalapi.Provider) server.ToolHandlerFunc {
47+
return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
48+
workflowClient := api.Workflow()
49+
if workflowClient == nil {
50+
return mcp.NewToolResultError(missingWorkflowError), nil
51+
}
52+
53+
workflowIDRaw, err := req.RequireString("workflow_id")
54+
if err != nil {
55+
return mcp.NewToolResultError(`Missing required argument: workflow_id. Provide the workflow ID to rerun.`), nil
56+
}
57+
workflowID, err := sanitizeWorkflowID(workflowIDRaw, "workflow_id")
58+
if err != nil {
59+
return mcp.NewToolResultError(err.Error()), nil
60+
}
61+
62+
userID := strings.ToLower(strings.TrimSpace(req.Header.Get("X-Semaphore-User-ID")))
63+
if err := shared.ValidateUUID(userID, "x-semaphore-user-id header"); err != nil {
64+
return mcp.NewToolResultError(fmt.Sprintf(`%v
65+
66+
The authentication layer must inject the X-Semaphore-User-ID header so we can authorize workflow reruns.`, err)), nil
67+
}
68+
69+
describeCtx, cancelDescribe := context.WithTimeout(ctx, api.CallTimeout())
70+
defer cancelDescribe()
71+
72+
describeResp, err := workflowClient.Describe(describeCtx, &workflowpb.DescribeRequest{WfId: workflowID})
73+
if err != nil {
74+
logging.ForComponent("rpc").
75+
WithFields(logrus.Fields{
76+
"rpc": "workflow.Describe",
77+
"wfId": workflowID,
78+
}).
79+
WithError(err).
80+
Error("workflow describe RPC failed")
81+
return mcp.NewToolResultError("Unable to load workflow details. Confirm the workflow exists and try again."), nil
82+
}
83+
84+
if err := shared.CheckStatus(describeResp.GetStatus()); err != nil {
85+
return mcp.NewToolResultError(fmt.Sprintf("Unable to load workflow details: %v", err)), nil
86+
}
87+
88+
workflow := describeResp.GetWorkflow()
89+
if workflow == nil {
90+
return mcp.NewToolResultError("Workflow details are missing from the response. Please retry."), nil
91+
}
92+
93+
orgID := strings.TrimSpace(workflow.GetOrganizationId())
94+
if err := shared.ValidateUUID(orgID, "workflow organization_id"); err != nil {
95+
return mcp.NewToolResultError("Unable to determine workflow organization. Please try again later."), nil
96+
}
97+
98+
projectID := strings.TrimSpace(workflow.GetProjectId())
99+
if err := shared.ValidateUUID(projectID, "workflow project_id"); err != nil {
100+
return mcp.NewToolResultError("Unable to determine workflow project. Please try again later."), nil
101+
}
102+
103+
if err := shared.EnsureWriteToolsFeature(ctx, api, orgID); err != nil {
104+
return mcp.NewToolResultError(err.Error()), nil
105+
}
106+
107+
tracker := shared.TrackToolExecution(ctx, rerunToolName, orgID)
108+
defer tracker.Cleanup()
109+
110+
if err := authz.CheckProjectPermission(ctx, api, userID, orgID, projectID, projectRunPermission); err != nil {
111+
return shared.ProjectAuthorizationError(err, orgID, projectID, projectRunPermission), nil
112+
}
113+
114+
rescheduleCtx, cancelReschedule := context.WithTimeout(ctx, api.CallTimeout())
115+
defer cancelReschedule()
116+
117+
requestToken := uuid.NewString()
118+
119+
rescheduleReq := &workflowpb.RescheduleRequest{
120+
WfId: workflowID,
121+
RequesterId: userID,
122+
RequestToken: requestToken,
123+
}
124+
125+
rescheduleResp, err := workflowClient.Reschedule(rescheduleCtx, rescheduleReq)
126+
if err != nil {
127+
logging.ForComponent("rpc").
128+
WithFields(logrus.Fields{
129+
"rpc": "workflow.Reschedule",
130+
"wfId": workflowID,
131+
"projectId": projectID,
132+
"orgId": orgID,
133+
}).
134+
WithError(err).
135+
Error("workflow reschedule RPC failed")
136+
return mcp.NewToolResultError("Workflow rerun failed. Confirm the workflow exists and try again."), nil
137+
}
138+
139+
if err := shared.CheckStatus(rescheduleResp.GetStatus()); err != nil {
140+
return mcp.NewToolResultError(fmt.Sprintf("Workflow rerun failed: %v", err)), nil
141+
}
142+
143+
result := rerunResult{
144+
WorkflowID: strings.TrimSpace(rescheduleResp.GetWfId()),
145+
PipelineID: strings.TrimSpace(rescheduleResp.GetPplId()),
146+
RerunOf: workflowID,
147+
ProjectID: projectID,
148+
OrgID: orgID,
149+
}
150+
151+
markdown := formatRerunMarkdown(result)
152+
markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars)
153+
154+
tracker.MarkSuccess()
155+
return &mcp.CallToolResult{
156+
Content: []mcp.Content{mcp.NewTextContent(markdown)},
157+
StructuredContent: result,
158+
}, nil
159+
}
160+
}
161+
162+
func sanitizeWorkflowID(raw, field string) (string, error) {
163+
value := strings.TrimSpace(raw)
164+
if value == "" {
165+
return "", fmt.Errorf("%s is required", field)
166+
}
167+
if strings.ContainsAny(value, " \t\r\n") {
168+
return "", fmt.Errorf("%s must not contain whitespace", field)
169+
}
170+
if len(value) > 128 {
171+
return "", fmt.Errorf("%s must not exceed 128 characters", field)
172+
}
173+
return value, nil
174+
}
175+
176+
func formatRerunMarkdown(result rerunResult) string {
177+
mb := shared.NewMarkdownBuilder()
178+
mb.H1("Workflow Rerun Scheduled")
179+
if result.WorkflowID != "" {
180+
mb.KeyValue("Workflow ID", fmt.Sprintf("`%s`", result.WorkflowID))
181+
}
182+
if result.PipelineID != "" {
183+
mb.KeyValue("Initial Pipeline", fmt.Sprintf("`%s`", result.PipelineID))
184+
}
185+
if result.RerunOf != "" {
186+
mb.KeyValue("Rerun Of", fmt.Sprintf("`%s`", result.RerunOf))
187+
}
188+
if result.ProjectID != "" {
189+
mb.KeyValue("Project ID", fmt.Sprintf("`%s`", result.ProjectID))
190+
}
191+
if result.OrgID != "" {
192+
mb.KeyValue("Organization ID", fmt.Sprintf("`%s`", result.OrgID))
193+
}
194+
return mb.String()
195+
}

0 commit comments

Comments
 (0)