diff --git a/app/jobs/pollable_job_wrapper.rb b/app/jobs/pollable_job_wrapper.rb index eb1fe78fdab..6c1b20c1de1 100644 --- a/app/jobs/pollable_job_wrapper.rb +++ b/app/jobs/pollable_job_wrapper.rb @@ -70,6 +70,13 @@ def error(job, exception) end def failure(job) + begin + unwrapped_job = Enqueuer.unwrap_job(@handler) + unwrapped_job.recover_from_failure if unwrapped_job.respond_to?(:recover_from_failure) + rescue StandardError => e + logger.error("failure recovery failed for #{unwrapped_job.class.name}: #{e.class}: #{e.message}") + end + change_state(job, PollableJobModel::FAILED_STATE) end diff --git a/app/jobs/v3/services/update_broker_job.rb b/app/jobs/v3/services/update_broker_job.rb index f8e2c219d88..6463f6eb912 100644 --- a/app/jobs/v3/services/update_broker_job.rb +++ b/app/jobs/v3/services/update_broker_job.rb @@ -37,6 +37,14 @@ def display_name 'service_broker.update' end + def recover_from_failure + ServiceBroker.where(guid: broker_guid, state: ServiceBrokerStateEnum::SYNCHRONIZING). + update(state: previous_broker_state) + rescue StandardError => e + logger = Steno.logger('cc.background') + logger.error("Failed to recover broker state for #{broker_guid}: #{e.class}: #{e.message}") + end + private attr_reader :update_request_guid, :broker_guid, :previous_broker_state, :user_audit_info @@ -66,17 +74,32 @@ def perform @warnings rescue StandardError => e - ServiceBroker.where(id: update_request.service_broker_id).update(state: previous_broker_state) + rollback_broker_state raise V3::ServiceBrokerUpdate::InvalidServiceBroker.new(e.message) if e.is_a?(Sequel::ValidationFailed) - raise e + raise ensure - update_request.destroy + destroy_update_request end private + def rollback_broker_state + return unless update_request + + ServiceBroker.where(id: update_request.service_broker_id, state: ServiceBrokerStateEnum::SYNCHRONIZING). + update(state: previous_broker_state) + rescue StandardError + # Best effort only; wrapper failure hook will retry + end + + def destroy_update_request + update_request&.destroy + rescue StandardError + # Don't mask original failure + end + def update_params params = {} params[:name] = update_request.name unless update_request.name.nil? diff --git a/spec/unit/jobs/pollable_job_wrapper_spec.rb b/spec/unit/jobs/pollable_job_wrapper_spec.rb index 0db1f850068..f09ca359c30 100644 --- a/spec/unit/jobs/pollable_job_wrapper_spec.rb +++ b/spec/unit/jobs/pollable_job_wrapper_spec.rb @@ -201,6 +201,55 @@ class BigException < StandardError execute_all_jobs(expected_successes: 0, expected_failures: 1) end end + + context 'when the job implements recover_from_failure' do + let(:broker) do + VCAP::CloudController::ServiceBroker.create( + name: 'recovery-test-broker', + broker_url: 'http://example.org/broker-url', + auth_username: 'username', + auth_password: 'password', + state: VCAP::CloudController::ServiceBrokerStateEnum::SYNCHRONIZING + ) + end + + let(:update_request) do + VCAP::CloudController::ServiceBrokerUpdateRequest.create( + name: 'new-name', + service_broker_id: broker.id + ) + end + + let(:user_audit_info) { instance_double(VCAP::CloudController::UserAuditInfo, { user_guid: Sham.guid }) } + let(:update_job) { VCAP::CloudController::V3::UpdateBrokerJob.new(update_request.guid, broker.guid, 'AVAILABLE', user_audit_info:) } + let(:pollable_job) { PollableJobWrapper.new(update_job) } + + before do + allow_any_instance_of(VCAP::CloudController::V3::UpdateBrokerJob::Perform).to receive(:perform).and_raise(StandardError.new('job failed')) + end + + it 'calls recover_from_failure and reverts broker state to AVAILABLE' do + enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new.enqueue(pollable_job) + VCAP::CloudController::PollableJobModel.make(delayed_job_guid: enqueued_job.guid, state: 'PROCESSING') + + execute_all_jobs(expected_successes: 0, expected_failures: 1) + + broker.reload + expect(broker.state).to eq('AVAILABLE') + end + end + + context 'when the job does not implement recover_from_failure' do + it 'still marks the job as FAILED without error' do + enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new.enqueue(pollable_job) + job_model = VCAP::CloudController::PollableJobModel.make(delayed_job_guid: enqueued_job.guid, state: 'PROCESSING') + + execute_all_jobs(expected_successes: 0, expected_failures: 1) + + job_model.reload + expect(job_model.state).to eq('FAILED') + end + end end end diff --git a/spec/unit/jobs/v3/services/update_broker_job_spec.rb b/spec/unit/jobs/v3/services/update_broker_job_spec.rb index 0d3727f4afa..34e4a489163 100644 --- a/spec/unit/jobs/v3/services/update_broker_job_spec.rb +++ b/spec/unit/jobs/v3/services/update_broker_job_spec.rb @@ -446,6 +446,33 @@ module V3 end end + context 'when database disconnects during state rollback' do + let(:catalog_error) { StandardError.new('Catalog fetch failed') } + + before do + allow_any_instance_of(VCAP::CloudController::V3::ServiceBrokerCatalogUpdater).to receive(:refresh).and_raise(catalog_error) + + mock_dataset = instance_double(Sequel::Postgres::Dataset) + allow(mock_dataset).to receive(:update).and_raise(Sequel::DatabaseDisconnectError.new('connection lost')) + + allow(ServiceBroker).to receive(:where).and_call_original + allow(ServiceBroker).to receive(:where).with(id: broker.id).and_return(mock_dataset) + end + + it 'swallows the database error and re-raises the original catalog error' do + expect { job.perform }.to raise_error(catalog_error) + end + + it 'does not raise a database connection error' do + expect { job.perform }.not_to raise_error(Sequel::DatabaseDisconnectError) + end + + it 'still cleans up the update request' do + expect { job.perform }.to raise_error(catalog_error) + expect(ServiceBrokerUpdateRequest.where(id: update_broker_request.id).all).to be_empty + end + end + context 'when the broker ceases to exist during the job' do it 'raises a ServiceBrokerGone error' do broker.destroy @@ -457,6 +484,65 @@ module V3 end end + describe '#recover_from_failure' do + let(:previous_state) { ServiceBrokerStateEnum::AVAILABLE } + + subject(:job) do + UpdateBrokerJob.new(update_broker_request.guid, broker.guid, previous_state, user_audit_info:) + end + + context 'when broker is in SYNCHRONIZING state' do + before do + broker.update(state: ServiceBrokerStateEnum::SYNCHRONIZING) + end + + it 'reverts the broker to the previous state' do + job.recover_from_failure + + broker.reload + expect(broker.state).to eq(ServiceBrokerStateEnum::AVAILABLE) + end + end + + context 'when broker is not in SYNCHRONIZING state' do + before do + broker.update(state: ServiceBrokerStateEnum::AVAILABLE) + end + + it 'does not change the broker state' do + job.recover_from_failure + + broker.reload + expect(broker.state).to eq(ServiceBrokerStateEnum::AVAILABLE) + end + end + + context 'when broker state has changed to something else' do + before do + broker.update(state: ServiceBrokerStateEnum::DELETE_IN_PROGRESS) + end + + it 'does not overwrite the newer state' do + job.recover_from_failure + + broker.reload + expect(broker.state).to eq(ServiceBrokerStateEnum::DELETE_IN_PROGRESS) + end + end + + context 'when database error occurs during recovery' do + before do + broker.update(state: ServiceBrokerStateEnum::SYNCHRONIZING) + allow(ServiceBroker).to receive(:where).and_raise(Sequel::DatabaseError.new('connection lost')) + allow(Steno).to receive(:logger).and_return(double(error: nil)) + end + + it 'logs the error and does not raise' do + expect { job.recover_from_failure }.not_to raise_error + end + end + end + def setup_broker_with_invalid_catalog catalog = instance_double(Services::ServiceBrokers::V2::Catalog)