Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions mcp_server/pkg/tools/internal/shared/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import (
"github.com/semaphoreio/semaphore/mcp_server/pkg/internalapi"
)

const readToolsFeatureFlag = "mcp_server_read_tools"
const (
readToolsFeatureFlag = "mcp_server_read_tools"
writeToolsFeatureFlag = "mcp_server_write_tools"
)

// EnsureReadToolsFeature verifies that the organization has the AI read tools feature enabled.
func EnsureReadToolsFeature(ctx context.Context, api internalapi.Provider, orgID string) error {
Expand All @@ -23,7 +26,26 @@ func EnsureReadToolsFeature(ctx context.Context, api internalapi.Provider, orgID
}

if state != feature.Enabled {
return fmt.Errorf("Semaphore MCP tools are disabled for this organization. Please contact support if you believe this is an error.")
return fmt.Errorf("Semaphore MCP read tools are disabled for this organization. Please contact support if you believe this is an error.")
}

return nil
}

// EnsureWriteToolsFeature verifies that the organization has the AI write tools feature enabled.
func EnsureWriteToolsFeature(ctx context.Context, api internalapi.Provider, orgID string) error {
featureClient := api.Features()
if featureClient == nil {
return fmt.Errorf("Semaphore MCP tools are temporarily unavailable. Please try again later.")
}

state, err := featureClient.FeatureState(orgID, writeToolsFeatureFlag)
if err != nil {
return fmt.Errorf("We couldn't verify access to Semaphore MCP tools right now. Please try again in a few moments.")
}

if state != feature.Enabled {
return fmt.Errorf("Semaphore MCP write tools are disabled for this organization. Please contact support if you believe this is an error.")
}

return nil
Expand Down
67 changes: 67 additions & 0 deletions mcp_server/pkg/tools/workflows/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package workflows

import "strings"

// summary represents workflow metadata returned by workflows_search.
type summary struct {
ID string `json:"id"`
InitialPipeline string `json:"initialPipelineId,omitempty"`
ProjectID string `json:"projectId,omitempty"`
OrganizationID string `json:"organizationId,omitempty"`
Branch string `json:"branch,omitempty"`
CommitSHA string `json:"commitSha,omitempty"`
RequesterID string `json:"requesterId,omitempty"`
TriggeredBy string `json:"triggeredBy,omitempty"`
CreatedAt string `json:"createdAt,omitempty"`
RerunOf string `json:"rerunOf,omitempty"`
RepositoryID string `json:"repositoryId,omitempty"`
}

type listResult struct {
Workflows []summary `json:"workflows"`
NextCursor string `json:"nextCursor,omitempty"`
}

type runResult struct {
WorkflowID string `json:"workflowId"`
PipelineID string `json:"pipelineId"`
Reference string `json:"reference"`
CommitSHA string `json:"commitSha,omitempty"`
PipelineFile string `json:"pipelineFile"`
}

type rerunResult struct {
WorkflowID string `json:"workflowId"`
PipelineID string `json:"pipelineId"`
RerunOf string `json:"rerunOf"`
ProjectID string `json:"projectId"`
OrgID string `json:"organizationId"`
}

func humanizeTriggeredBy(value string) string {
value = strings.TrimSpace(value)
if value == "" {
return "Unspecified"
}
parts := strings.Split(value, "_")
for i, part := range parts {
if part == "" {
continue
}
part = strings.ToLower(part)
parts[i] = strings.ToUpper(part[:1]) + part[1:]
}
return strings.Join(parts, " ")
}

func shortenCommit(sha string) string {
sha = strings.TrimSpace(sha)
if len(sha) > 12 {
return sha[:12]
}
return sha
}

func normalizeID(value string) string {
return strings.ToLower(strings.TrimSpace(value))
}
26 changes: 26 additions & 0 deletions mcp_server/pkg/tools/workflows/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package workflows

import (
"github.com/mark3labs/mcp-go/server"

"github.com/semaphoreio/semaphore/mcp_server/pkg/internalapi"
)

const (
searchToolName = "workflows_search"
runToolName = "workflows_run"
rerunToolName = "workflows_rerun"
defaultLimit = 20
maxLimit = 100
missingWorkflowError = "workflow gRPC endpoint is not configured"
projectViewPermission = "project.view"
projectRunPermission = "project.job.rerun"
defaultPipelineFile = ".semaphore/semaphore.yml"
)

// Register wires workflow tooling into the MCP server.
func Register(s *server.MCPServer, api internalapi.Provider) {
s.AddTool(newSearchTool(searchToolName, searchFullDescription()), listHandler(api))
s.AddTool(newRunTool(runToolName, runFullDescription()), runHandler(api))
s.AddTool(newRerunTool(rerunToolName, rerunFullDescription()), rerunHandler(api))
}
195 changes: 195 additions & 0 deletions mcp_server/pkg/tools/workflows/rerun_tool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package workflows

import (
"context"
"fmt"
"strings"

"github.com/google/uuid"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
"github.com/sirupsen/logrus"

"github.com/semaphoreio/semaphore/mcp_server/pkg/authz"
workflowpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/plumber_w_f.workflow"
"github.com/semaphoreio/semaphore/mcp_server/pkg/internalapi"
"github.com/semaphoreio/semaphore/mcp_server/pkg/logging"
"github.com/semaphoreio/semaphore/mcp_server/pkg/tools/internal/shared"
)

func rerunFullDescription() string {
return `Rerun an existing workflow.

Use this when you need to:
- Rerun a previously completed or failed workflow
- Restart a workflow without changing its parameters

Required inputs:
- workflow_id: ID of the workflow to rerun

The authenticated user must have permission to rerun workflows for the originating project.`
}

func newRerunTool(name, description string) mcp.Tool {
return mcp.NewTool(
name,
mcp.WithDescription(description),
mcp.WithString(
"workflow_id",
mcp.Required(),
mcp.Description("Workflow ID to rerun."),
),
mcp.WithIdempotentHintAnnotation(false),
)
}

func rerunHandler(api internalapi.Provider) server.ToolHandlerFunc {
return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
workflowClient := api.Workflow()
if workflowClient == nil {
return mcp.NewToolResultError(missingWorkflowError), nil
}

workflowIDRaw, err := req.RequireString("workflow_id")
if err != nil {
return mcp.NewToolResultError(`Missing required argument: workflow_id. Provide the workflow ID to rerun.`), nil
}
workflowID, err := sanitizeWorkflowID(workflowIDRaw, "workflow_id")
if err != nil {
return mcp.NewToolResultError(err.Error()), nil
}

userID := strings.ToLower(strings.TrimSpace(req.Header.Get("X-Semaphore-User-ID")))
if err := shared.ValidateUUID(userID, "x-semaphore-user-id header"); err != nil {
return mcp.NewToolResultError(fmt.Sprintf(`%v

The authentication layer must inject the X-Semaphore-User-ID header so we can authorize workflow reruns.`, err)), nil
}

describeCtx, cancelDescribe := context.WithTimeout(ctx, api.CallTimeout())
defer cancelDescribe()

describeResp, err := workflowClient.Describe(describeCtx, &workflowpb.DescribeRequest{WfId: workflowID})
if err != nil {
logging.ForComponent("rpc").
WithFields(logrus.Fields{
"rpc": "workflow.Describe",
"wfId": workflowID,
}).
WithError(err).
Error("workflow describe RPC failed")
return mcp.NewToolResultError("Unable to load workflow details. Confirm the workflow exists and try again."), nil
}

if err := shared.CheckStatus(describeResp.GetStatus()); err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Unable to load workflow details: %v", err)), nil
}

workflow := describeResp.GetWorkflow()
if workflow == nil {
return mcp.NewToolResultError("Workflow details are missing from the response. Please retry."), nil
}

orgID := strings.TrimSpace(workflow.GetOrganizationId())
if err := shared.ValidateUUID(orgID, "workflow organization_id"); err != nil {
return mcp.NewToolResultError("Unable to determine workflow organization. Please try again later."), nil
}

projectID := strings.TrimSpace(workflow.GetProjectId())
if err := shared.ValidateUUID(projectID, "workflow project_id"); err != nil {
return mcp.NewToolResultError("Unable to determine workflow project. Please try again later."), nil
}

if err := shared.EnsureWriteToolsFeature(ctx, api, orgID); err != nil {
return mcp.NewToolResultError(err.Error()), nil
}

tracker := shared.TrackToolExecution(ctx, rerunToolName, orgID)
defer tracker.Cleanup()

if err := authz.CheckProjectPermission(ctx, api, userID, orgID, projectID, projectRunPermission); err != nil {
return shared.ProjectAuthorizationError(err, orgID, projectID, projectRunPermission), nil
}

rescheduleCtx, cancelReschedule := context.WithTimeout(ctx, api.CallTimeout())
defer cancelReschedule()

requestToken := uuid.NewString()

rescheduleReq := &workflowpb.RescheduleRequest{
WfId: workflowID,
RequesterId: userID,
RequestToken: requestToken,
}

rescheduleResp, err := workflowClient.Reschedule(rescheduleCtx, rescheduleReq)
if err != nil {
logging.ForComponent("rpc").
WithFields(logrus.Fields{
"rpc": "workflow.Reschedule",
"wfId": workflowID,
"projectId": projectID,
"orgId": orgID,
}).
WithError(err).
Error("workflow reschedule RPC failed")
return mcp.NewToolResultError("Workflow rerun failed. Confirm the workflow exists and try again."), nil
}

if err := shared.CheckStatus(rescheduleResp.GetStatus()); err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Workflow rerun failed: %v", err)), nil
}

result := rerunResult{
WorkflowID: strings.TrimSpace(rescheduleResp.GetWfId()),
PipelineID: strings.TrimSpace(rescheduleResp.GetPplId()),
RerunOf: workflowID,
ProjectID: projectID,
OrgID: orgID,
}

markdown := formatRerunMarkdown(result)
markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars)

tracker.MarkSuccess()
return &mcp.CallToolResult{
Content: []mcp.Content{mcp.NewTextContent(markdown)},
StructuredContent: result,
}, nil
}
}

func sanitizeWorkflowID(raw, field string) (string, error) {
value := strings.TrimSpace(raw)
if value == "" {
return "", fmt.Errorf("%s is required", field)
}
if strings.ContainsAny(value, " \t\r\n") {
return "", fmt.Errorf("%s must not contain whitespace", field)
}
if len(value) > 128 {
return "", fmt.Errorf("%s must not exceed 128 characters", field)
}
return value, nil
}

func formatRerunMarkdown(result rerunResult) string {
mb := shared.NewMarkdownBuilder()
mb.H1("Workflow Rerun Scheduled")
if result.WorkflowID != "" {
mb.KeyValue("Workflow ID", fmt.Sprintf("`%s`", result.WorkflowID))
}
if result.PipelineID != "" {
mb.KeyValue("Initial Pipeline", fmt.Sprintf("`%s`", result.PipelineID))
}
if result.RerunOf != "" {
mb.KeyValue("Rerun Of", fmt.Sprintf("`%s`", result.RerunOf))
}
if result.ProjectID != "" {
mb.KeyValue("Project ID", fmt.Sprintf("`%s`", result.ProjectID))
}
if result.OrgID != "" {
mb.KeyValue("Organization ID", fmt.Sprintf("`%s`", result.OrgID))
}
return mb.String()
}
Loading