Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions example/runway/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//runway/controller",
"//runway/controller/merge",
"//runway/controller/mergeconflictcheck",
"//runway/extension/merger/noop",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_uber_go_tally//:tally",
"@org_golang_google_grpc//:grpc",
Expand Down
42 changes: 30 additions & 12 deletions example/runway/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/uber/submitqueue/runway/controller"
"github.com/uber/submitqueue/runway/controller/merge"
"github.com/uber/submitqueue/runway/controller/mergeconflictcheck"
"github.com/uber/submitqueue/runway/extension/merger/noop"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
Expand Down Expand Up @@ -152,21 +153,29 @@ func run() error {
),
)

noOpMerger := noop.New()

mergeConflictCheckController := mergeconflictcheck.NewController(mergeconflictcheck.Params{
Logger: logger.Sugar(),
Scope: scope,
TopicKey: runwaymq.TopicKeyMergeConflictCheck,
ConsumerGroup: "runway-mergeconflictcheck",
Logger: logger.Sugar(),
Scope: scope,
TopicKey: runwaymq.TopicKeyMergeConflictCheck,
SignalTopicKey: runwaymq.TopicKeyMergeConflictCheckSignal,
ConsumerGroup: "runway-mergeconflictcheck",
Merger: noOpMerger,
Registry: registry,
})
if err := primaryConsumer.Register(mergeConflictCheckController); err != nil {
return fmt.Errorf("failed to register merge-conflict-check controller: %w", err)
}

mergeController := merge.NewController(merge.Params{
Logger: logger.Sugar(),
Scope: scope,
TopicKey: runwaymq.TopicKeyMerge,
ConsumerGroup: "runway-merge",
Logger: logger.Sugar(),
Scope: scope,
TopicKey: runwaymq.TopicKeyMerge,
SignalTopicKey: runwaymq.TopicKeyMergeSignal,
ConsumerGroup: "runway-merge",
Merger: noOpMerger,
Registry: registry,
})
if err := primaryConsumer.Register(mergeController); err != nil {
return fmt.Errorf("failed to register merge controller: %w", err)
Expand Down Expand Up @@ -235,10 +244,9 @@ func run() error {
return err
}

// newTopicRegistry builds the TopicRegistry for Runway's consumed merge queues.
// Runway is the consumer of the merge-conflict-check and merge queues; each is
// registered with a consuming subscription. The corresponding signal queues
// (where results are published) are not wired yet.
// newTopicRegistry builds the TopicRegistry for Runway's merge queues. Runway
// consumes the merge-conflict-check and merge queues and publishes results to
// the corresponding signal queues.
func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) {
return consumer.NewTopicRegistry([]consumer.TopicConfig{
{
Expand All @@ -257,5 +265,15 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
subscriberName, "runway-merge",
),
},
{
Key: runwaymq.TopicKeyMergeConflictCheckSignal,
Name: "merge-conflict-check-signal",
Queue: q,
},
{
Key: runwaymq.TopicKeyMergeSignal,
Name: "merge-signal",
Queue: q,
},
})
}
15 changes: 15 additions & 0 deletions example/submitqueue/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,18 @@ services:
condition: service_healthy
mysql-queue:
condition: service_healthy

runway-service:
build:
context: ${REPO_ROOT}
dockerfile: example/runway/server/Dockerfile
ports:
- "8080" # Random ephemeral port to avoid conflicts
environment:
- PORT=:8080
# Queue infrastructure connection shared with SubmitQueue services.
- QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true
- HOSTNAME=runway-dev
depends_on:
mysql-queue:
condition: service_healthy
13 changes: 13 additions & 0 deletions runway/controller/internal/signal/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "signal",
srcs = ["result.go"],
importpath = "github.com/uber/submitqueue/runway/controller/internal/signal",
visibility = ["//visibility:public"],
deps = [
"//api/runway/messagequeue",
"//platform/base/messagequeue",
"//platform/consumer",
],
)
54 changes: 54 additions & 0 deletions runway/controller/internal/signal/result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package signal holds helpers for Runway result signal publication.
package signal

import (
"context"
"fmt"

runwaymq "github.com/uber/submitqueue/api/runway/messagequeue"
entityqueue "github.com/uber/submitqueue/platform/base/messagequeue"
"github.com/uber/submitqueue/platform/consumer"
)

// PublishMergeResult publishes a Runway MergeResult to a signal topic.
func PublishMergeResult(ctx context.Context, registry consumer.TopicRegistry, topicKey consumer.TopicKey, result *runwaymq.MergeResult, partitionKey string) error {
if result == nil {
return fmt.Errorf("merge result is required")
}

payload, err := runwaymq.Marshal(result)
if err != nil {
return fmt.Errorf("failed to serialize merge result: %w", err)
}

q, ok := registry.Queue(topicKey)
if !ok {
return fmt.Errorf("no queue registered for topic key %s", topicKey)
}

topicName, ok := registry.TopicName(topicKey)
if !ok {
return fmt.Errorf("no topic name registered for topic key %s", topicKey)
}

msg := entityqueue.NewMessage(result.Id, payload, partitionKey, nil)
if err := q.Publisher().Publish(ctx, topicName, msg); err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}

return nil
}
7 changes: 7 additions & 0 deletions runway/controller/merge/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//api/runway/messagequeue",
"//api/runway/messagequeue/protopb",
"//platform/consumer",
"//platform/metrics",
"//runway/controller/internal/signal",
"//runway/extension/merger",
"@com_github_uber_go_tally//:tally",
"@org_uber_go_zap//:zap",
],
Expand All @@ -20,8 +23,12 @@ go_test(
embed = [":merge"],
deps = [
"//api/runway/messagequeue",
"//api/runway/messagequeue/protopb",
"//platform/base/messagequeue",
"//platform/consumer",
"//platform/extension/messagequeue/mock",
"//runway/extension/merger",
"//runway/extension/merger/mock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally//:tally",
Expand Down
63 changes: 46 additions & 17 deletions runway/controller/merge/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@
// request asks Runway to apply an ordered sequence of merge steps onto the target
// branch and commit the result.
//
// Currently a parse-and-log stub: it deserializes the MergeRequest off the queue
// and logs it. The real merge (apply and commit the steps, then publish a
// MergeResult with the produced revisions to the merge-signal queue) is not wired
// yet.
// It deserializes the MergeRequest off the queue, runs the configured merger,
// and publishes a MergeResult with produced revisions to the merge-signal queue.
package merge

import (
"context"
"errors"
"fmt"

"github.com/uber-go/tally"
runwaymq "github.com/uber/submitqueue/api/runway/messagequeue"
runwaypb "github.com/uber/submitqueue/api/runway/messagequeue/protopb"
"github.com/uber/submitqueue/platform/consumer"
"github.com/uber/submitqueue/platform/metrics"
"github.com/uber/submitqueue/runway/controller/internal/signal"
"github.com/uber/submitqueue/runway/extension/merger"
"go.uber.org/zap"
)

Expand All @@ -38,16 +40,22 @@ var _ consumer.Controller = (*Controller)(nil)

// Controller handles merge queue messages.
type Controller struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
topicKey consumer.TopicKey
consumerGroup string
logger *zap.SugaredLogger
metricsScope tally.Scope
merger merger.Merger
registry consumer.TopicRegistry
topicKey consumer.TopicKey
signalTopicKey consumer.TopicKey
consumerGroup string
}

// Params are the parameters for creating a new merge controller.
type Params struct {
TopicKey consumer.TopicKey
ConsumerGroup string
TopicKey consumer.TopicKey
SignalTopicKey consumer.TopicKey
ConsumerGroup string
Merger merger.Merger
Registry consumer.TopicRegistry

Scope tally.Scope
Logger *zap.SugaredLogger
Expand All @@ -56,10 +64,13 @@ type Params struct {
// NewController creates a new merge controller for the runway service.
func NewController(p Params) *Controller {
return &Controller{
logger: p.Logger.Named("merge_controller"),
metricsScope: p.Scope.SubScope("merge_controller"),
topicKey: p.TopicKey,
consumerGroup: p.ConsumerGroup,
logger: p.Logger.Named("merge_controller"),
metricsScope: p.Scope.SubScope("merge_controller"),
merger: p.Merger,
registry: p.Registry,
topicKey: p.TopicKey,
signalTopicKey: p.SignalTopicKey,
consumerGroup: p.ConsumerGroup,
}
}

Expand All @@ -80,9 +91,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
return fmt.Errorf("failed to deserialize merge request: %w", err)
}

// TODO: apply and commit the ordered merge steps and publish a MergeResult
// with the produced revisions to the merge-signal queue. For now the request
// is only logged after parsing.
c.logger.Infow("received merge request",
"id", request.Id,
"queue_name", request.QueueName,
Expand All @@ -91,6 +99,27 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
"partition_key", msg.PartitionKey,
)

if c.merger == nil {
return fmt.Errorf("merger is required")
}

result, err := c.merger.Merge(ctx, request)
if errors.Is(err, merger.ErrConflict) {
result = &runwaymq.MergeResult{
Id: request.Id,
Outcome: runwaypb.Outcome_FAILED,
Reason: err.Error(),
}
} else if err != nil {
metrics.NamedCounter(c.metricsScope, opName, "merger_errors", 1)
return fmt.Errorf("failed to merge request: %w", err)
}

if err := signal.PublishMergeResult(ctx, c.registry, c.signalTopicKey, result, msg.PartitionKey); err != nil {
metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1)
return fmt.Errorf("failed to publish merge result: %w", err)
}
Comment thread
albertywu marked this conversation as resolved.

return nil
}

Expand Down
Loading