From b002752029597bf03d5aef1fc852948ff8c21bd3 Mon Sep 17 00:00:00 2001 From: Katharina Przybill <30441792+kathap@users.noreply.github.com> Date: Tue, 7 Apr 2026 14:04:36 +0200 Subject: [PATCH 1/2] Fix broker stuck in SYNCHRONIZING on DB error during rollback When a service broker update job fails and attempts to revert the broker state, a database connection failure could cause the job to crash without properly handling the original error. This left the broker stuck in SYNCHRONIZING state with a FAILED job. This change wraps the state rollback operation in error handling to catch database errors and allow the original exception to be raised and the job to be retried properly. Changes: - app/jobs/v3/services/update_broker_job.rb: Add error handling around ServiceBroker.where().update() call in rescue block to gracefully handle database disconnections during state rollback - spec/unit/jobs/v3/services/update_broker_job_spec.rb: Add test case for database disconnect during state rollback --- app/jobs/v3/services/update_broker_job.rb | 11 +++++--- .../v3/services/update_broker_job_spec.rb | 27 +++++++++++++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/app/jobs/v3/services/update_broker_job.rb b/app/jobs/v3/services/update_broker_job.rb index f8e2c219d88..59f092acc19 100644 --- a/app/jobs/v3/services/update_broker_job.rb +++ b/app/jobs/v3/services/update_broker_job.rb @@ -66,13 +66,18 @@ def perform @warnings rescue StandardError => e - ServiceBroker.where(id: update_request.service_broker_id).update(state: previous_broker_state) + begin + ServiceBroker.where(id: update_request.service_broker_id).update(state: previous_broker_state) if update_request + rescue StandardError + # Swallow errors during state rollback (e.g., DB connection issues) + # so the original exception can be raised and the job retried + end raise V3::ServiceBrokerUpdate::InvalidServiceBroker.new(e.message) if e.is_a?(Sequel::ValidationFailed) - raise e + raise ensure - update_request.destroy + update_request.destroy if update_request end private diff --git a/spec/unit/jobs/v3/services/update_broker_job_spec.rb b/spec/unit/jobs/v3/services/update_broker_job_spec.rb index 0d3727f4afa..de0ab4df244 100644 --- a/spec/unit/jobs/v3/services/update_broker_job_spec.rb +++ b/spec/unit/jobs/v3/services/update_broker_job_spec.rb @@ -446,6 +446,33 @@ module V3 end end + context 'when database disconnects during state rollback' do + let(:catalog_error) { StandardError.new('Catalog fetch failed') } + + before do + allow_any_instance_of(VCAP::CloudController::V3::ServiceBrokerCatalogUpdater).to receive(:refresh).and_raise(catalog_error) + + mock_dataset = instance_double(Sequel::Postgres::Dataset) + allow(mock_dataset).to receive(:update).and_raise(Sequel::DatabaseDisconnectError.new('connection lost')) + + allow(ServiceBroker).to receive(:where).and_call_original + allow(ServiceBroker).to receive(:where).with(id: broker.id).and_return(mock_dataset) + end + + it 'swallows the database error and re-raises the original catalog error' do + expect { job.perform }.to raise_error(catalog_error) + end + + it 'does not raise a database connection error' do + expect { job.perform }.not_to raise_error(Sequel::DatabaseDisconnectError) + end + + it 'still cleans up the update request' do + expect { job.perform }.to raise_error(catalog_error) + expect(ServiceBrokerUpdateRequest.where(id: update_broker_request.id).all).to be_empty + end + end + context 'when the broker ceases to exist during the job' do it 'raises a ServiceBrokerGone error' do broker.destroy From 1c425fce49707d8e57a73317ad7bfe1a28ec4af1 Mon Sep 17 00:00:00 2001 From: Katharina Przybill <30441792+kathap@users.noreply.github.com> Date: Fri, 10 Apr 2026 08:51:56 +0200 Subject: [PATCH 2/2] Add failure recovery hook to prevent stuck brokers When UpdateBrokerJob exhausts retries and transitions to FAILED, invoke recover_from_failure to revert broker from SYNCHRONIZING back to previous state. This ensures brokers don't remain stuck when jobs fail during extended database outages. Changes: - UpdateBrokerJob: Add recover_from_failure method with conditional WHERE clause to safely revert SYNCHRONIZING brokers - PollableJobWrapper: Call recover_from_failure hook in failure method - Extract rollback_broker_state and destroy_update_request helpers for cleaner error handling - Add tests for recovery hook behavior and edge cases --- app/jobs/pollable_job_wrapper.rb | 7 +++ app/jobs/v3/services/update_broker_job.rb | 32 +++++++--- spec/unit/jobs/pollable_job_wrapper_spec.rb | 49 +++++++++++++++ .../v3/services/update_broker_job_spec.rb | 59 +++++++++++++++++++ 4 files changed, 140 insertions(+), 7 deletions(-) diff --git a/app/jobs/pollable_job_wrapper.rb b/app/jobs/pollable_job_wrapper.rb index eb1fe78fdab..6c1b20c1de1 100644 --- a/app/jobs/pollable_job_wrapper.rb +++ b/app/jobs/pollable_job_wrapper.rb @@ -70,6 +70,13 @@ def error(job, exception) end def failure(job) + begin + unwrapped_job = Enqueuer.unwrap_job(@handler) + unwrapped_job.recover_from_failure if unwrapped_job.respond_to?(:recover_from_failure) + rescue StandardError => e + logger.error("failure recovery failed for #{unwrapped_job.class.name}: #{e.class}: #{e.message}") + end + change_state(job, PollableJobModel::FAILED_STATE) end diff --git a/app/jobs/v3/services/update_broker_job.rb b/app/jobs/v3/services/update_broker_job.rb index 59f092acc19..6463f6eb912 100644 --- a/app/jobs/v3/services/update_broker_job.rb +++ b/app/jobs/v3/services/update_broker_job.rb @@ -37,6 +37,14 @@ def display_name 'service_broker.update' end + def recover_from_failure + ServiceBroker.where(guid: broker_guid, state: ServiceBrokerStateEnum::SYNCHRONIZING). + update(state: previous_broker_state) + rescue StandardError => e + logger = Steno.logger('cc.background') + logger.error("Failed to recover broker state for #{broker_guid}: #{e.class}: #{e.message}") + end + private attr_reader :update_request_guid, :broker_guid, :previous_broker_state, :user_audit_info @@ -66,22 +74,32 @@ def perform @warnings rescue StandardError => e - begin - ServiceBroker.where(id: update_request.service_broker_id).update(state: previous_broker_state) if update_request - rescue StandardError - # Swallow errors during state rollback (e.g., DB connection issues) - # so the original exception can be raised and the job retried - end + rollback_broker_state raise V3::ServiceBrokerUpdate::InvalidServiceBroker.new(e.message) if e.is_a?(Sequel::ValidationFailed) raise ensure - update_request.destroy if update_request + destroy_update_request end private + def rollback_broker_state + return unless update_request + + ServiceBroker.where(id: update_request.service_broker_id, state: ServiceBrokerStateEnum::SYNCHRONIZING). + update(state: previous_broker_state) + rescue StandardError + # Best effort only; wrapper failure hook will retry + end + + def destroy_update_request + update_request&.destroy + rescue StandardError + # Don't mask original failure + end + def update_params params = {} params[:name] = update_request.name unless update_request.name.nil? diff --git a/spec/unit/jobs/pollable_job_wrapper_spec.rb b/spec/unit/jobs/pollable_job_wrapper_spec.rb index 0db1f850068..f09ca359c30 100644 --- a/spec/unit/jobs/pollable_job_wrapper_spec.rb +++ b/spec/unit/jobs/pollable_job_wrapper_spec.rb @@ -201,6 +201,55 @@ class BigException < StandardError execute_all_jobs(expected_successes: 0, expected_failures: 1) end end + + context 'when the job implements recover_from_failure' do + let(:broker) do + VCAP::CloudController::ServiceBroker.create( + name: 'recovery-test-broker', + broker_url: 'http://example.org/broker-url', + auth_username: 'username', + auth_password: 'password', + state: VCAP::CloudController::ServiceBrokerStateEnum::SYNCHRONIZING + ) + end + + let(:update_request) do + VCAP::CloudController::ServiceBrokerUpdateRequest.create( + name: 'new-name', + service_broker_id: broker.id + ) + end + + let(:user_audit_info) { instance_double(VCAP::CloudController::UserAuditInfo, { user_guid: Sham.guid }) } + let(:update_job) { VCAP::CloudController::V3::UpdateBrokerJob.new(update_request.guid, broker.guid, 'AVAILABLE', user_audit_info:) } + let(:pollable_job) { PollableJobWrapper.new(update_job) } + + before do + allow_any_instance_of(VCAP::CloudController::V3::UpdateBrokerJob::Perform).to receive(:perform).and_raise(StandardError.new('job failed')) + end + + it 'calls recover_from_failure and reverts broker state to AVAILABLE' do + enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new.enqueue(pollable_job) + VCAP::CloudController::PollableJobModel.make(delayed_job_guid: enqueued_job.guid, state: 'PROCESSING') + + execute_all_jobs(expected_successes: 0, expected_failures: 1) + + broker.reload + expect(broker.state).to eq('AVAILABLE') + end + end + + context 'when the job does not implement recover_from_failure' do + it 'still marks the job as FAILED without error' do + enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new.enqueue(pollable_job) + job_model = VCAP::CloudController::PollableJobModel.make(delayed_job_guid: enqueued_job.guid, state: 'PROCESSING') + + execute_all_jobs(expected_successes: 0, expected_failures: 1) + + job_model.reload + expect(job_model.state).to eq('FAILED') + end + end end end diff --git a/spec/unit/jobs/v3/services/update_broker_job_spec.rb b/spec/unit/jobs/v3/services/update_broker_job_spec.rb index de0ab4df244..34e4a489163 100644 --- a/spec/unit/jobs/v3/services/update_broker_job_spec.rb +++ b/spec/unit/jobs/v3/services/update_broker_job_spec.rb @@ -484,6 +484,65 @@ module V3 end end + describe '#recover_from_failure' do + let(:previous_state) { ServiceBrokerStateEnum::AVAILABLE } + + subject(:job) do + UpdateBrokerJob.new(update_broker_request.guid, broker.guid, previous_state, user_audit_info:) + end + + context 'when broker is in SYNCHRONIZING state' do + before do + broker.update(state: ServiceBrokerStateEnum::SYNCHRONIZING) + end + + it 'reverts the broker to the previous state' do + job.recover_from_failure + + broker.reload + expect(broker.state).to eq(ServiceBrokerStateEnum::AVAILABLE) + end + end + + context 'when broker is not in SYNCHRONIZING state' do + before do + broker.update(state: ServiceBrokerStateEnum::AVAILABLE) + end + + it 'does not change the broker state' do + job.recover_from_failure + + broker.reload + expect(broker.state).to eq(ServiceBrokerStateEnum::AVAILABLE) + end + end + + context 'when broker state has changed to something else' do + before do + broker.update(state: ServiceBrokerStateEnum::DELETE_IN_PROGRESS) + end + + it 'does not overwrite the newer state' do + job.recover_from_failure + + broker.reload + expect(broker.state).to eq(ServiceBrokerStateEnum::DELETE_IN_PROGRESS) + end + end + + context 'when database error occurs during recovery' do + before do + broker.update(state: ServiceBrokerStateEnum::SYNCHRONIZING) + allow(ServiceBroker).to receive(:where).and_raise(Sequel::DatabaseError.new('connection lost')) + allow(Steno).to receive(:logger).and_return(double(error: nil)) + end + + it 'logs the error and does not raise' do + expect { job.recover_from_failure }.not_to raise_error + end + end + end + def setup_broker_with_invalid_catalog catalog = instance_double(Services::ServiceBrokers::V2::Catalog)