From 5756aeb463b19cc8e71f6365ef6c1ff2f8f9465c Mon Sep 17 00:00:00 2001 From: Albert Wu Date: Wed, 1 Jul 2026 14:31:27 -0700 Subject: [PATCH] test(e2e): land submitqueue requests through runway --- example/runway/server/BUILD.bazel | 1 + example/runway/server/main.go | 42 ++++-- example/submitqueue/docker-compose.yml | 15 ++ runway/controller/internal/signal/BUILD.bazel | 13 ++ runway/controller/internal/signal/result.go | 54 +++++++ runway/controller/merge/BUILD.bazel | 7 + runway/controller/merge/merge.go | 63 ++++++--- runway/controller/merge/merge_test.go | 132 ++++++++++++++++-- .../controller/mergeconflictcheck/BUILD.bazel | 7 + .../mergeconflictcheck/mergeconflictcheck.go | 63 ++++++--- .../mergeconflictcheck_test.go | 128 +++++++++++++++-- test/e2e/submitqueue/suite_test.go | 55 ++++++-- 12 files changed, 505 insertions(+), 75 deletions(-) create mode 100644 runway/controller/internal/signal/BUILD.bazel create mode 100644 runway/controller/internal/signal/result.go diff --git a/example/runway/server/BUILD.bazel b/example/runway/server/BUILD.bazel index e8bb1200..e85f27a1 100644 --- a/example/runway/server/BUILD.bazel +++ b/example/runway/server/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//runway/controller", "//runway/controller/merge", "//runway/controller/mergeconflictcheck", + "//runway/extension/merger/noop", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_uber_go_tally//:tally", "@org_golang_google_grpc//:grpc", diff --git a/example/runway/server/main.go b/example/runway/server/main.go index e5b4d485..e47253a8 100644 --- a/example/runway/server/main.go +++ b/example/runway/server/main.go @@ -39,6 +39,7 @@ import ( "github.com/uber/submitqueue/runway/controller" "github.com/uber/submitqueue/runway/controller/merge" "github.com/uber/submitqueue/runway/controller/mergeconflictcheck" + "github.com/uber/submitqueue/runway/extension/merger/noop" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/reflection" @@ -152,21 +153,29 @@ func run() error { ), ) + noOpMerger := noop.New() + mergeConflictCheckController := mergeconflictcheck.NewController(mergeconflictcheck.Params{ - Logger: logger.Sugar(), - Scope: scope, - TopicKey: runwaymq.TopicKeyMergeConflictCheck, - ConsumerGroup: "runway-mergeconflictcheck", + Logger: logger.Sugar(), + Scope: scope, + TopicKey: runwaymq.TopicKeyMergeConflictCheck, + SignalTopicKey: runwaymq.TopicKeyMergeConflictCheckSignal, + ConsumerGroup: "runway-mergeconflictcheck", + Merger: noOpMerger, + Registry: registry, }) if err := primaryConsumer.Register(mergeConflictCheckController); err != nil { return fmt.Errorf("failed to register merge-conflict-check controller: %w", err) } mergeController := merge.NewController(merge.Params{ - Logger: logger.Sugar(), - Scope: scope, - TopicKey: runwaymq.TopicKeyMerge, - ConsumerGroup: "runway-merge", + Logger: logger.Sugar(), + Scope: scope, + TopicKey: runwaymq.TopicKeyMerge, + SignalTopicKey: runwaymq.TopicKeyMergeSignal, + ConsumerGroup: "runway-merge", + Merger: noOpMerger, + Registry: registry, }) if err := primaryConsumer.Register(mergeController); err != nil { return fmt.Errorf("failed to register merge controller: %w", err) @@ -235,10 +244,9 @@ func run() error { return err } -// newTopicRegistry builds the TopicRegistry for Runway's consumed merge queues. -// Runway is the consumer of the merge-conflict-check and merge queues; each is -// registered with a consuming subscription. The corresponding signal queues -// (where results are published) are not wired yet. +// newTopicRegistry builds the TopicRegistry for Runway's merge queues. Runway +// consumes the merge-conflict-check and merge queues and publishes results to +// the corresponding signal queues. func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) { return consumer.NewTopicRegistry([]consumer.TopicConfig{ { @@ -257,5 +265,15 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe subscriberName, "runway-merge", ), }, + { + Key: runwaymq.TopicKeyMergeConflictCheckSignal, + Name: "merge-conflict-check-signal", + Queue: q, + }, + { + Key: runwaymq.TopicKeyMergeSignal, + Name: "merge-signal", + Queue: q, + }, }) } diff --git a/example/submitqueue/docker-compose.yml b/example/submitqueue/docker-compose.yml index 1b3ed04b..97bc95e5 100644 --- a/example/submitqueue/docker-compose.yml +++ b/example/submitqueue/docker-compose.yml @@ -81,3 +81,18 @@ services: condition: service_healthy mysql-queue: condition: service_healthy + + runway-service: + build: + context: ${REPO_ROOT} + dockerfile: example/runway/server/Dockerfile + ports: + - "8080" # Random ephemeral port to avoid conflicts + environment: + - PORT=:8080 + # Queue infrastructure connection shared with SubmitQueue services. + - QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true + - HOSTNAME=runway-dev + depends_on: + mysql-queue: + condition: service_healthy diff --git a/runway/controller/internal/signal/BUILD.bazel b/runway/controller/internal/signal/BUILD.bazel new file mode 100644 index 00000000..c61cabd1 --- /dev/null +++ b/runway/controller/internal/signal/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "signal", + srcs = ["result.go"], + importpath = "github.com/uber/submitqueue/runway/controller/internal/signal", + visibility = ["//visibility:public"], + deps = [ + "//api/runway/messagequeue", + "//platform/base/messagequeue", + "//platform/consumer", + ], +) diff --git a/runway/controller/internal/signal/result.go b/runway/controller/internal/signal/result.go new file mode 100644 index 00000000..32750541 --- /dev/null +++ b/runway/controller/internal/signal/result.go @@ -0,0 +1,54 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package signal holds helpers for Runway result signal publication. +package signal + +import ( + "context" + "fmt" + + runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" +) + +// PublishMergeResult publishes a Runway MergeResult to a signal topic. +func PublishMergeResult(ctx context.Context, registry consumer.TopicRegistry, topicKey consumer.TopicKey, result *runwaymq.MergeResult, partitionKey string) error { + if result == nil { + return fmt.Errorf("merge result is required") + } + + payload, err := runwaymq.Marshal(result) + if err != nil { + return fmt.Errorf("failed to serialize merge result: %w", err) + } + + q, ok := registry.Queue(topicKey) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", topicKey) + } + + topicName, ok := registry.TopicName(topicKey) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", topicKey) + } + + msg := entityqueue.NewMessage(result.Id, payload, partitionKey, nil) + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} diff --git a/runway/controller/merge/BUILD.bazel b/runway/controller/merge/BUILD.bazel index 3053e8f7..aa002e62 100644 --- a/runway/controller/merge/BUILD.bazel +++ b/runway/controller/merge/BUILD.bazel @@ -7,8 +7,11 @@ go_library( visibility = ["//visibility:public"], deps = [ "//api/runway/messagequeue", + "//api/runway/messagequeue/protopb", "//platform/consumer", "//platform/metrics", + "//runway/controller/internal/signal", + "//runway/extension/merger", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", ], @@ -20,8 +23,12 @@ go_test( embed = [":merge"], deps = [ "//api/runway/messagequeue", + "//api/runway/messagequeue/protopb", "//platform/base/messagequeue", + "//platform/consumer", "//platform/extension/messagequeue/mock", + "//runway/extension/merger", + "//runway/extension/merger/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally//:tally", diff --git a/runway/controller/merge/merge.go b/runway/controller/merge/merge.go index d3f889ae..59bc56e3 100644 --- a/runway/controller/merge/merge.go +++ b/runway/controller/merge/merge.go @@ -16,20 +16,22 @@ // request asks Runway to apply an ordered sequence of merge steps onto the target // branch and commit the result. // -// Currently a parse-and-log stub: it deserializes the MergeRequest off the queue -// and logs it. The real merge (apply and commit the steps, then publish a -// MergeResult with the produced revisions to the merge-signal queue) is not wired -// yet. +// It deserializes the MergeRequest off the queue, runs the configured merger, +// and publishes a MergeResult with produced revisions to the merge-signal queue. package merge import ( "context" + "errors" "fmt" "github.com/uber-go/tally" runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + runwaypb "github.com/uber/submitqueue/api/runway/messagequeue/protopb" "github.com/uber/submitqueue/platform/consumer" "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/runway/controller/internal/signal" + "github.com/uber/submitqueue/runway/extension/merger" "go.uber.org/zap" ) @@ -38,16 +40,22 @@ var _ consumer.Controller = (*Controller)(nil) // Controller handles merge queue messages. type Controller struct { - logger *zap.SugaredLogger - metricsScope tally.Scope - topicKey consumer.TopicKey - consumerGroup string + logger *zap.SugaredLogger + metricsScope tally.Scope + merger merger.Merger + registry consumer.TopicRegistry + topicKey consumer.TopicKey + signalTopicKey consumer.TopicKey + consumerGroup string } // Params are the parameters for creating a new merge controller. type Params struct { - TopicKey consumer.TopicKey - ConsumerGroup string + TopicKey consumer.TopicKey + SignalTopicKey consumer.TopicKey + ConsumerGroup string + Merger merger.Merger + Registry consumer.TopicRegistry Scope tally.Scope Logger *zap.SugaredLogger @@ -56,10 +64,13 @@ type Params struct { // NewController creates a new merge controller for the runway service. func NewController(p Params) *Controller { return &Controller{ - logger: p.Logger.Named("merge_controller"), - metricsScope: p.Scope.SubScope("merge_controller"), - topicKey: p.TopicKey, - consumerGroup: p.ConsumerGroup, + logger: p.Logger.Named("merge_controller"), + metricsScope: p.Scope.SubScope("merge_controller"), + merger: p.Merger, + registry: p.Registry, + topicKey: p.TopicKey, + signalTopicKey: p.SignalTopicKey, + consumerGroup: p.ConsumerGroup, } } @@ -80,9 +91,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return fmt.Errorf("failed to deserialize merge request: %w", err) } - // TODO: apply and commit the ordered merge steps and publish a MergeResult - // with the produced revisions to the merge-signal queue. For now the request - // is only logged after parsing. c.logger.Infow("received merge request", "id", request.Id, "queue_name", request.QueueName, @@ -91,6 +99,27 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r "partition_key", msg.PartitionKey, ) + if c.merger == nil { + return fmt.Errorf("merger is required") + } + + result, err := c.merger.Merge(ctx, request) + if errors.Is(err, merger.ErrConflict) { + result = &runwaymq.MergeResult{ + Id: request.Id, + Outcome: runwaypb.Outcome_FAILED, + Reason: err.Error(), + } + } else if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "merger_errors", 1) + return fmt.Errorf("failed to merge request: %w", err) + } + + if err := signal.PublishMergeResult(ctx, c.registry, c.signalTopicKey, result, msg.PartitionKey); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) + return fmt.Errorf("failed to publish merge result: %w", err) + } + return nil } diff --git a/runway/controller/merge/merge_test.go b/runway/controller/merge/merge_test.go index c74135eb..b2acaab5 100644 --- a/runway/controller/merge/merge_test.go +++ b/runway/controller/merge/merge_test.go @@ -16,14 +16,19 @@ package merge import ( "context" + "errors" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + runwaypb "github.com/uber/submitqueue/api/runway/messagequeue/protopb" entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + "github.com/uber/submitqueue/runway/extension/merger" + mergermock "github.com/uber/submitqueue/runway/extension/merger/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) @@ -34,14 +39,25 @@ const ( testPartitionKey = "test-queue" ) -func newController(t *testing.T) *Controller { +func newController(t *testing.T, ctrl *gomock.Controller) (*Controller, *mergermock.MockMerger, *queuemock.MockQueue, *queuemock.MockPublisher) { t.Helper() - return NewController(Params{ - Logger: zaptest.NewLogger(t).Sugar(), - Scope: tally.NoopScope, - TopicKey: runwaymq.TopicKeyMerge, - ConsumerGroup: "runway-merge", + q := queuemock.NewMockQueue(ctrl) + publisher := queuemock.NewMockPublisher(ctrl) + mockMerger := mergermock.NewMockMerger(ctrl) + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: runwaymq.TopicKeyMergeSignal, Name: "merge-signal", Queue: q}, }) + require.NoError(t, err) + + return NewController(Params{ + Logger: zaptest.NewLogger(t).Sugar(), + Scope: tally.NoopScope, + TopicKey: runwaymq.TopicKeyMerge, + SignalTopicKey: runwaymq.TopicKeyMergeSignal, + ConsumerGroup: "runway-merge", + Merger: mockMerger, + Registry: registry, + }), mockMerger, q, publisher } func newDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte) *queuemock.MockDelivery { @@ -61,16 +77,17 @@ func requestPayload(t *testing.T, req runwaymq.MergeRequest) []byte { } func TestNewController(t *testing.T) { - controller := newController(t) + ctrl := gomock.NewController(t) + controller, _, _, _ := newController(t, ctrl) require.NotNil(t, controller) assert.Equal(t, runwaymq.TopicKeyMerge, controller.TopicKey()) assert.Equal(t, "runway-merge", controller.ConsumerGroup()) assert.Equal(t, "merge", controller.Name()) } -func TestProcess_LogsParsedRequest(t *testing.T) { +func TestProcess_PublishesMergeSignal(t *testing.T) { ctrl := gomock.NewController(t) - controller := newController(t) + controller, mockMerger, q, publisher := newController(t, ctrl) req := runwaymq.MergeRequest{ Id: testID, @@ -79,14 +96,109 @@ func TestProcess_LogsParsedRequest(t *testing.T) { } delivery := newDelivery(t, ctrl, requestPayload(t, req)) + mockMerger.EXPECT(). + Merge(gomock.Any(), gomock.Any()). + Return(&runwaymq.MergeResult{ + Id: testID, + Outcome: runwaypb.Outcome_SUCCEEDED, + Steps: []*runwaymq.StepResult{{ + StepId: "step-1", + Outputs: []*runwaymq.StepOutput{{Id: "fake-output"}}, + }}, + }, nil) + expectPublish(t, q, publisher, "merge-signal", func(result *runwaymq.MergeResult) { + assert.Equal(t, testID, result.Id) + assert.Equal(t, runwaypb.Outcome_SUCCEEDED, result.Outcome) + require.Len(t, result.Steps, 1) + assert.Equal(t, "step-1", result.Steps[0].StepId) + require.Len(t, result.Steps[0].Outputs, 1) + assert.Equal(t, "fake-output", result.Steps[0].Outputs[0].Id) + }, nil) + require.NoError(t, controller.Process(context.Background(), delivery)) } +func TestProcess_PublishesFailedSignalOnConflict(t *testing.T) { + ctrl := gomock.NewController(t) + controller, mockMerger, q, publisher := newController(t, ctrl) + + req := runwaymq.MergeRequest{ + Id: testID, + QueueName: testQueue, + Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, + } + delivery := newDelivery(t, ctrl, requestPayload(t, req)) + + mockMerger.EXPECT().Merge(gomock.Any(), gomock.Any()).Return(nil, merger.ErrConflict) + expectPublish(t, q, publisher, "merge-signal", func(result *runwaymq.MergeResult) { + assert.Equal(t, testID, result.Id) + assert.Equal(t, runwaypb.Outcome_FAILED, result.Outcome) + assert.Equal(t, merger.ErrConflict.Error(), result.Reason) + }, nil) + + require.NoError(t, controller.Process(context.Background(), delivery)) +} + +func TestProcess_ReturnsErrorOnMergeError(t *testing.T) { + ctrl := gomock.NewController(t) + controller, mockMerger, _, _ := newController(t, ctrl) + + req := runwaymq.MergeRequest{ + Id: testID, + QueueName: testQueue, + Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, + } + delivery := newDelivery(t, ctrl, requestPayload(t, req)) + + mockMerger.EXPECT().Merge(gomock.Any(), gomock.Any()).Return(nil, errors.New("merge backend down")) + + require.Error(t, controller.Process(context.Background(), delivery)) +} + +func TestProcess_ReturnsErrorOnPublishError(t *testing.T) { + ctrl := gomock.NewController(t) + controller, mockMerger, q, publisher := newController(t, ctrl) + + req := runwaymq.MergeRequest{ + Id: testID, + QueueName: testQueue, + Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, + } + delivery := newDelivery(t, ctrl, requestPayload(t, req)) + + mockMerger.EXPECT(). + Merge(gomock.Any(), gomock.Any()). + Return(&runwaymq.MergeResult{Id: testID, Outcome: runwaypb.Outcome_SUCCEEDED}, nil) + expectPublish(t, q, publisher, "merge-signal", func(result *runwaymq.MergeResult) { + assert.Equal(t, testID, result.Id) + assert.Equal(t, runwaypb.Outcome_SUCCEEDED, result.Outcome) + }, errors.New("queue publish failed")) + + require.Error(t, controller.Process(context.Background(), delivery)) +} + func TestProcess_DeserializeError(t *testing.T) { ctrl := gomock.NewController(t) - controller := newController(t) + controller, _, _, _ := newController(t, ctrl) delivery := newDelivery(t, ctrl, []byte(`{"id": not json}`)) require.Error(t, controller.Process(context.Background(), delivery)) } + +func expectPublish(t *testing.T, q *queuemock.MockQueue, publisher *queuemock.MockPublisher, topic string, assertResult func(*runwaymq.MergeResult), publishErr error) { + t.Helper() + + q.EXPECT().Publisher().Return(publisher) + publisher.EXPECT(). + Publish(gomock.Any(), topic, gomock.Any()). + DoAndReturn(func(_ context.Context, _ string, msg entityqueue.Message) error { + assert.Equal(t, testID, msg.ID) + assert.Equal(t, testPartitionKey, msg.PartitionKey) + + result := &runwaymq.MergeResult{} + require.NoError(t, runwaymq.Unmarshal(msg.Payload, result)) + assertResult(result) + return publishErr + }) +} diff --git a/runway/controller/mergeconflictcheck/BUILD.bazel b/runway/controller/mergeconflictcheck/BUILD.bazel index 6fa3c304..9132df37 100644 --- a/runway/controller/mergeconflictcheck/BUILD.bazel +++ b/runway/controller/mergeconflictcheck/BUILD.bazel @@ -7,8 +7,11 @@ go_library( visibility = ["//visibility:public"], deps = [ "//api/runway/messagequeue", + "//api/runway/messagequeue/protopb", "//platform/consumer", "//platform/metrics", + "//runway/controller/internal/signal", + "//runway/extension/merger", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", ], @@ -20,8 +23,12 @@ go_test( embed = [":mergeconflictcheck"], deps = [ "//api/runway/messagequeue", + "//api/runway/messagequeue/protopb", "//platform/base/messagequeue", + "//platform/consumer", "//platform/extension/messagequeue/mock", + "//runway/extension/merger", + "//runway/extension/merger/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally//:tally", diff --git a/runway/controller/mergeconflictcheck/mergeconflictcheck.go b/runway/controller/mergeconflictcheck/mergeconflictcheck.go index 689a64e9..cca0c84d 100644 --- a/runway/controller/mergeconflictcheck/mergeconflictcheck.go +++ b/runway/controller/mergeconflictcheck/mergeconflictcheck.go @@ -16,19 +16,23 @@ // Runway's merge-conflict-check queue. A request asks whether an ordered sequence // of merge steps applies cleanly onto the target branch without committing. // -// Currently a parse-and-log stub: it deserializes the MergeRequest off the queue -// and logs it. The real check (attempt the merge without committing and publish a -// MergeResult to the merge-conflict-check-signal queue) is not wired yet. +// It deserializes the MergeRequest off the queue, checks mergeability through +// the configured merger, and publishes a MergeResult to the +// merge-conflict-check-signal queue. package mergeconflictcheck import ( "context" + "errors" "fmt" "github.com/uber-go/tally" runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + runwaypb "github.com/uber/submitqueue/api/runway/messagequeue/protopb" "github.com/uber/submitqueue/platform/consumer" "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/runway/controller/internal/signal" + "github.com/uber/submitqueue/runway/extension/merger" "go.uber.org/zap" ) @@ -37,16 +41,22 @@ var _ consumer.Controller = (*Controller)(nil) // Controller handles merge-conflict-check queue messages. type Controller struct { - logger *zap.SugaredLogger - metricsScope tally.Scope - topicKey consumer.TopicKey - consumerGroup string + logger *zap.SugaredLogger + metricsScope tally.Scope + merger merger.Merger + registry consumer.TopicRegistry + topicKey consumer.TopicKey + signalTopicKey consumer.TopicKey + consumerGroup string } // Params are the parameters for creating a new merge-conflict-check controller. type Params struct { - TopicKey consumer.TopicKey - ConsumerGroup string + TopicKey consumer.TopicKey + SignalTopicKey consumer.TopicKey + ConsumerGroup string + Merger merger.Merger + Registry consumer.TopicRegistry Scope tally.Scope Logger *zap.SugaredLogger @@ -55,10 +65,13 @@ type Params struct { // NewController creates a new merge-conflict-check controller for the runway service. func NewController(p Params) *Controller { return &Controller{ - logger: p.Logger.Named("mergeconflictcheck_controller"), - metricsScope: p.Scope.SubScope("mergeconflictcheck_controller"), - topicKey: p.TopicKey, - consumerGroup: p.ConsumerGroup, + logger: p.Logger.Named("mergeconflictcheck_controller"), + metricsScope: p.Scope.SubScope("mergeconflictcheck_controller"), + merger: p.Merger, + registry: p.Registry, + topicKey: p.TopicKey, + signalTopicKey: p.SignalTopicKey, + consumerGroup: p.ConsumerGroup, } } @@ -79,9 +92,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return fmt.Errorf("failed to deserialize merge request: %w", err) } - // TODO: attempt the ordered merge steps without committing and publish a - // MergeResult to the merge-conflict-check-signal queue. For now the request - // is only logged after parsing. c.logger.Infow("received merge-conflict-check request", "id", request.Id, "queue_name", request.QueueName, @@ -90,6 +100,27 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r "partition_key", msg.PartitionKey, ) + if c.merger == nil { + return fmt.Errorf("merger is required") + } + + result, err := c.merger.CheckMergeability(ctx, request) + if errors.Is(err, merger.ErrConflict) { + result = &runwaymq.MergeResult{ + Id: request.Id, + Outcome: runwaypb.Outcome_FAILED, + Reason: err.Error(), + } + } else if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "merger_errors", 1) + return fmt.Errorf("failed to check mergeability: %w", err) + } + + if err := signal.PublishMergeResult(ctx, c.registry, c.signalTopicKey, result, msg.PartitionKey); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) + return fmt.Errorf("failed to publish merge-conflict-check result: %w", err) + } + return nil } diff --git a/runway/controller/mergeconflictcheck/mergeconflictcheck_test.go b/runway/controller/mergeconflictcheck/mergeconflictcheck_test.go index 70fd41ad..6ad5bb66 100644 --- a/runway/controller/mergeconflictcheck/mergeconflictcheck_test.go +++ b/runway/controller/mergeconflictcheck/mergeconflictcheck_test.go @@ -16,14 +16,19 @@ package mergeconflictcheck import ( "context" + "errors" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + runwaypb "github.com/uber/submitqueue/api/runway/messagequeue/protopb" entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + "github.com/uber/submitqueue/runway/extension/merger" + mergermock "github.com/uber/submitqueue/runway/extension/merger/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) @@ -34,14 +39,25 @@ const ( testPartitionKey = "test-queue" ) -func newController(t *testing.T) *Controller { +func newController(t *testing.T, ctrl *gomock.Controller) (*Controller, *mergermock.MockMerger, *queuemock.MockQueue, *queuemock.MockPublisher) { t.Helper() - return NewController(Params{ - Logger: zaptest.NewLogger(t).Sugar(), - Scope: tally.NoopScope, - TopicKey: runwaymq.TopicKeyMergeConflictCheck, - ConsumerGroup: "runway-mergeconflictcheck", + q := queuemock.NewMockQueue(ctrl) + publisher := queuemock.NewMockPublisher(ctrl) + mockMerger := mergermock.NewMockMerger(ctrl) + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: runwaymq.TopicKeyMergeConflictCheckSignal, Name: "merge-conflict-check-signal", Queue: q}, }) + require.NoError(t, err) + + return NewController(Params{ + Logger: zaptest.NewLogger(t).Sugar(), + Scope: tally.NoopScope, + TopicKey: runwaymq.TopicKeyMergeConflictCheck, + SignalTopicKey: runwaymq.TopicKeyMergeConflictCheckSignal, + ConsumerGroup: "runway-mergeconflictcheck", + Merger: mockMerger, + Registry: registry, + }), mockMerger, q, publisher } func newDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte) *queuemock.MockDelivery { @@ -61,16 +77,17 @@ func requestPayload(t *testing.T, req runwaymq.MergeRequest) []byte { } func TestNewController(t *testing.T) { - controller := newController(t) + ctrl := gomock.NewController(t) + controller, _, _, _ := newController(t, ctrl) require.NotNil(t, controller) assert.Equal(t, runwaymq.TopicKeyMergeConflictCheck, controller.TopicKey()) assert.Equal(t, "runway-mergeconflictcheck", controller.ConsumerGroup()) assert.Equal(t, "merge-conflict-check", controller.Name()) } -func TestProcess_LogsParsedRequest(t *testing.T) { +func TestProcess_PublishesMergeConflictCheckSignal(t *testing.T) { ctrl := gomock.NewController(t) - controller := newController(t) + controller, mockMerger, q, publisher := newController(t, ctrl) req := runwaymq.MergeRequest{ Id: testID, @@ -79,14 +96,105 @@ func TestProcess_LogsParsedRequest(t *testing.T) { } delivery := newDelivery(t, ctrl, requestPayload(t, req)) + mockMerger.EXPECT(). + CheckMergeability(gomock.Any(), gomock.Any()). + Return(&runwaymq.MergeResult{ + Id: testID, + Outcome: runwaypb.Outcome_SUCCEEDED, + Steps: []*runwaymq.StepResult{{StepId: "step-1"}}, + }, nil) + expectPublish(t, q, publisher, "merge-conflict-check-signal", func(result *runwaymq.MergeResult) { + assert.Equal(t, testID, result.Id) + assert.Equal(t, runwaypb.Outcome_SUCCEEDED, result.Outcome) + require.Len(t, result.Steps, 1) + assert.Equal(t, "step-1", result.Steps[0].StepId) + assert.Empty(t, result.Steps[0].Outputs) + }, nil) + require.NoError(t, controller.Process(context.Background(), delivery)) } +func TestProcess_PublishesFailedSignalOnConflict(t *testing.T) { + ctrl := gomock.NewController(t) + controller, mockMerger, q, publisher := newController(t, ctrl) + + req := runwaymq.MergeRequest{ + Id: testID, + QueueName: testQueue, + Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, + } + delivery := newDelivery(t, ctrl, requestPayload(t, req)) + + mockMerger.EXPECT().CheckMergeability(gomock.Any(), gomock.Any()).Return(nil, merger.ErrConflict) + expectPublish(t, q, publisher, "merge-conflict-check-signal", func(result *runwaymq.MergeResult) { + assert.Equal(t, testID, result.Id) + assert.Equal(t, runwaypb.Outcome_FAILED, result.Outcome) + assert.Equal(t, merger.ErrConflict.Error(), result.Reason) + }, nil) + + require.NoError(t, controller.Process(context.Background(), delivery)) +} + +func TestProcess_ReturnsErrorOnCheckError(t *testing.T) { + ctrl := gomock.NewController(t) + controller, mockMerger, _, _ := newController(t, ctrl) + + req := runwaymq.MergeRequest{ + Id: testID, + QueueName: testQueue, + Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, + } + delivery := newDelivery(t, ctrl, requestPayload(t, req)) + + mockMerger.EXPECT().CheckMergeability(gomock.Any(), gomock.Any()).Return(nil, errors.New("merge backend down")) + + require.Error(t, controller.Process(context.Background(), delivery)) +} + +func TestProcess_ReturnsErrorOnPublishError(t *testing.T) { + ctrl := gomock.NewController(t) + controller, mockMerger, q, publisher := newController(t, ctrl) + + req := runwaymq.MergeRequest{ + Id: testID, + QueueName: testQueue, + Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, + } + delivery := newDelivery(t, ctrl, requestPayload(t, req)) + + mockMerger.EXPECT(). + CheckMergeability(gomock.Any(), gomock.Any()). + Return(&runwaymq.MergeResult{Id: testID, Outcome: runwaypb.Outcome_SUCCEEDED}, nil) + expectPublish(t, q, publisher, "merge-conflict-check-signal", func(result *runwaymq.MergeResult) { + assert.Equal(t, testID, result.Id) + assert.Equal(t, runwaypb.Outcome_SUCCEEDED, result.Outcome) + }, errors.New("queue publish failed")) + + require.Error(t, controller.Process(context.Background(), delivery)) +} + func TestProcess_DeserializeError(t *testing.T) { ctrl := gomock.NewController(t) - controller := newController(t) + controller, _, _, _ := newController(t, ctrl) delivery := newDelivery(t, ctrl, []byte(`{"id": not json}`)) require.Error(t, controller.Process(context.Background(), delivery)) } + +func expectPublish(t *testing.T, q *queuemock.MockQueue, publisher *queuemock.MockPublisher, topic string, assertResult func(*runwaymq.MergeResult), publishErr error) { + t.Helper() + + q.EXPECT().Publisher().Return(publisher) + publisher.EXPECT(). + Publish(gomock.Any(), topic, gomock.Any()). + DoAndReturn(func(_ context.Context, _ string, msg entityqueue.Message) error { + assert.Equal(t, testID, msg.ID) + assert.Equal(t, testPartitionKey, msg.PartitionKey) + + result := &runwaymq.MergeResult{} + require.NoError(t, runwaymq.Unmarshal(msg.Payload, result)) + assertResult(result) + return publishErr + }) +} diff --git a/test/e2e/submitqueue/suite_test.go b/test/e2e/submitqueue/suite_test.go index e124452a..28aea875 100644 --- a/test/e2e/submitqueue/suite_test.go +++ b/test/e2e/submitqueue/suite_test.go @@ -67,6 +67,7 @@ func TestE2EIntegration(t *testing.T) { const ( persistTimeout = 30 * time.Second persistPollInterval = 500 * time.Millisecond + workflowTimeout = 60 * time.Second ) func (s *E2EIntegrationSuite) SetupSuite() { @@ -135,6 +136,7 @@ func (s *E2EIntegrationSuite) TearDownSuite() { gatewayStopErr := s.stack.StopService("gateway-service", stopTimeoutSec) orchestratorStopErr := s.stack.StopService("orchestrator-service", stopTimeoutSec) + runwayStopErr := s.stack.StopService("runway-service", stopTimeoutSec) if assert.NoError(t, gatewayStopErr, "failed to stop gateway service") { exitCode, err := s.stack.ServiceExitCode("gateway-service") @@ -152,6 +154,14 @@ func (s *E2EIntegrationSuite) TearDownSuite() { } } + if assert.NoError(t, runwayStopErr, "failed to stop runway service") { + exitCode, err := s.stack.ServiceExitCode("runway-service") + if assert.NoError(t, err, "failed to get runway exit code") { + assert.Equal(t, wantExitCode, exitCode, + "runway should exit with 128+SIGTERM (%d) on graceful shutdown", wantExitCode) + } + } + // Compose stack cleanup handled automatically by t.Cleanup } @@ -183,18 +193,18 @@ func (s *E2EIntegrationSuite) TestLandRequest_SinglePR() { s.log.Logf("Land request (single PR) succeeded: sqid=%s", resp.Sqid) } -// TestLandRequest_PersistsStartedLogViaGatewayConsumer verifies the request-log +// TestLandRequest_AdvancesBeyondAcceptedViaGatewayConsumer verifies the request-log // ownership invariant end-to-end: the orchestrator only *publishes* request log // entries to the log topic (it never writes the request log itself), and the // gateway's log consumer drains that topic and persists them to storage. // // We observe this through the gateway Status RPC: immediately after Land the // status is "accepted" (the gateway's synchronous direct write), and once the -// orchestrator's start controller publishes "started" to the log topic, the -// gateway consumer persists it and Status advances to "started". Seeing -// "started" therefore proves the publish→consume→persist path works across both -// services. -func (s *E2EIntegrationSuite) TestLandRequest_PersistsStartedLogViaGatewayConsumer() { +// orchestrator publishes downstream statuses to the log topic, the gateway +// consumer persists them and Status advances beyond "accepted". Because the full +// local workflow can now advance quickly, the latest visible status may already +// be later than "started" by the time this poll observes it. +func (s *E2EIntegrationSuite) TestLandRequest_AdvancesBeyondAcceptedViaGatewayConsumer() { t := s.T() landResp, err := s.gatewayClient.Land(s.ctx, &gatewaypb.LandRequest{ @@ -205,7 +215,7 @@ func (s *E2EIntegrationSuite) TestLandRequest_PersistsStartedLogViaGatewayConsum require.NoError(t, err, "Land request failed") require.NotEmpty(t, landResp.Sqid, "SQID should not be empty") sqid := landResp.Sqid - s.log.Logf("Land succeeded: sqid=%s; waiting for gateway consumer to persist 'started'", sqid) + s.log.Logf("Land succeeded: sqid=%s; waiting for gateway consumer to persist downstream status", sqid) require.Eventually(t, func() bool { resp, statusErr := s.gatewayClient.Status(s.ctx, &gatewaypb.StatusRequest{Sqid: sqid}) @@ -214,11 +224,36 @@ func (s *E2EIntegrationSuite) TestLandRequest_PersistsStartedLogViaGatewayConsum return false } s.log.Logf("Status(%s) = %q", sqid, resp.Status) - return resp.Status == string(entity.RequestStatusStarted) + return resp.Status != "" && resp.Status != string(entity.RequestStatusAccepted) }, persistTimeout, persistPollInterval, - "request %s should reach status %q via the gateway log consumer", sqid, entity.RequestStatusStarted) + "request %s should advance beyond %q via the gateway log consumer", sqid, entity.RequestStatusAccepted) + + s.log.Logf("Gateway consumer persisted downstream status for sqid=%s", sqid) +} - s.log.Logf("Gateway consumer persisted orchestrator-published 'started' log for sqid=%s", sqid) +func (s *E2EIntegrationSuite) TestLandRequest_ReachesLandedStatus() { + t := s.T() + + landResp, err := s.gatewayClient.Land(s.ctx, &gatewaypb.LandRequest{ + Queue: "e2e-test-queue", + Change: &changepb.Change{Uris: []string{"github://uber/e2e-landed/pull/777/abcdef0123456789abcdef0123456789abcdef01"}}, + Strategy: mergestrategypb.Strategy_REBASE, + }) + require.NoError(t, err, "Land request failed") + require.NotEmpty(t, landResp.Sqid, "SQID should not be empty") + sqid := landResp.Sqid + s.log.Logf("Land succeeded: sqid=%s; waiting for terminal landed status", sqid) + + require.Eventually(t, func() bool { + resp, statusErr := s.gatewayClient.Status(s.ctx, &gatewaypb.StatusRequest{Sqid: sqid}) + if statusErr != nil { + s.log.Logf("Status(%s) not ready yet: %v", sqid, statusErr) + return false + } + s.log.Logf("Status(%s) = %q", sqid, resp.Status) + return resp.Status == string(entity.RequestStatusLanded) + }, workflowTimeout, persistPollInterval, + "request %s should reach terminal status %q", sqid, entity.RequestStatusLanded) } // TestCancelRequest_InvalidSqid verifies the gateway rejects an empty sqid