Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions app/jobs/pollable_job_wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ def error(job, exception)
end

# Delayed::Job failure hook: runs once a job has exhausted its retries.
# Gives the wrapped job a chance at best-effort cleanup via an optional
# recover_from_failure method, then marks the pollable job record FAILED.
# Recovery errors are logged and swallowed so they can never prevent the
# FAILED state transition.
def failure(job)
  unwrapped_job = nil
  begin
    unwrapped_job = Enqueuer.unwrap_job(@handler)
    unwrapped_job.recover_from_failure if unwrapped_job.respond_to?(:recover_from_failure)
  rescue StandardError => e
    # unwrapped_job is nil when unwrap_job itself raised; avoid logging a
    # misleading "NilClass" in that case.
    job_name = unwrapped_job ? unwrapped_job.class.name : 'unknown job'
    logger.error("failure recovery failed for #{job_name}: #{e.class}: #{e.message}")
  end

  change_state(job, PollableJobModel::FAILED_STATE)
end

Expand Down
29 changes: 26 additions & 3 deletions app/jobs/v3/services/update_broker_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ def display_name
'service_broker.update'
end

# Best-effort rollback invoked by the pollable-job failure hook: if the
# broker is still stuck in SYNCHRONIZING after this job failed for good,
# restore the state it had before the update began. The WHERE guard on
# state means a broker that has since moved on (e.g. into a delete) is
# left untouched. Errors are logged, never raised.
def recover_from_failure
  stuck_broker = ServiceBroker.where(guid: broker_guid, state: ServiceBrokerStateEnum::SYNCHRONIZING)
  stuck_broker.update(state: previous_broker_state)
rescue StandardError => e
  Steno.logger('cc.background').error(
    "Failed to recover broker state for #{broker_guid}: #{e.class}: #{e.message}"
  )
end

private

attr_reader :update_request_guid, :broker_guid, :previous_broker_state, :user_audit_info
Expand Down Expand Up @@ -66,17 +74,32 @@ def perform

@warnings
rescue StandardError => e
ServiceBroker.where(id: update_request.service_broker_id).update(state: previous_broker_state)
rollback_broker_state

raise V3::ServiceBrokerUpdate::InvalidServiceBroker.new(e.message) if e.is_a?(Sequel::ValidationFailed)

raise e
raise
ensure
update_request.destroy
destroy_update_request
end

private

# Best-effort revert of the broker's state after a failed update. The
# state: SYNCHRONIZING guard ensures we never clobber a state that changed
# concurrently (e.g. a delete that started while this job was failing).
def rollback_broker_state
  return unless update_request

  ServiceBroker.where(id: update_request.service_broker_id, state: ServiceBrokerStateEnum::SYNCHRONIZING).
    update(state: previous_broker_state)
rescue StandardError => e
  # Best effort only; wrapper failure hook will retry. Log instead of
  # swallowing silently so a failed rollback is still visible to operators.
  Steno.logger('cc.background').error(
    "Failed to roll back broker state for #{broker_guid}: #{e.class}: #{e.message}"
  )
end

# Removes the update-request row. Called from the ensure block of perform,
# so it must never raise and mask the original failure.
def destroy_update_request
  update_request&.destroy
rescue StandardError => e
  # Don't mask original failure, but log so an orphaned row is diagnosable.
  Steno.logger('cc.background').error(
    "Failed to destroy update request #{update_request_guid}: #{e.class}: #{e.message}"
  )
end

def update_params
params = {}
params[:name] = update_request.name unless update_request.name.nil?
Expand Down
49 changes: 49 additions & 0 deletions spec/unit/jobs/pollable_job_wrapper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,55 @@ class BigException < StandardError
execute_all_jobs(expected_successes: 0, expected_failures: 1)
end
end

# Exercises the new failure-recovery hook end to end: a broker stuck in
# SYNCHRONIZING is reverted to its pre-update state ('AVAILABLE') when the
# wrapped UpdateBrokerJob fails for good.
context 'when the job implements recover_from_failure' do
# Broker starts in SYNCHRONIZING, i.e. mid-update when the job fails.
let(:broker) do
VCAP::CloudController::ServiceBroker.create(
name: 'recovery-test-broker',
broker_url: 'http://example.org/broker-url',
auth_username: 'username',
auth_password: 'password',
state: VCAP::CloudController::ServiceBrokerStateEnum::SYNCHRONIZING
)
end

let(:update_request) do
VCAP::CloudController::ServiceBrokerUpdateRequest.create(
name: 'new-name',
service_broker_id: broker.id
)
end

let(:user_audit_info) { instance_double(VCAP::CloudController::UserAuditInfo, { user_guid: Sham.guid }) }
# 'AVAILABLE' is the previous_broker_state the recovery hook should restore.
let(:update_job) { VCAP::CloudController::V3::UpdateBrokerJob.new(update_request.guid, broker.guid, 'AVAILABLE', user_audit_info:) }
let(:pollable_job) { PollableJobWrapper.new(update_job) }

before do
# Force the inner perform to fail so the wrapper's failure hook fires.
allow_any_instance_of(VCAP::CloudController::V3::UpdateBrokerJob::Perform).to receive(:perform).and_raise(StandardError.new('job failed'))
end

it 'calls recover_from_failure and reverts broker state to AVAILABLE' do
enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new.enqueue(pollable_job)
VCAP::CloudController::PollableJobModel.make(delayed_job_guid: enqueued_job.guid, state: 'PROCESSING')

execute_all_jobs(expected_successes: 0, expected_failures: 1)

broker.reload
expect(broker.state).to eq('AVAILABLE')
end
end

# Guards the respond_to? branch: the wrapper must still transition the job
# to FAILED when the wrapped job offers no recover_from_failure method.
# NOTE(review): relies on a `pollable_job` let from an enclosing describe
# scope (not the one defined in the sibling context above) — confirm it
# wraps a job that does NOT respond to recover_from_failure.
context 'when the job does not implement recover_from_failure' do
it 'still marks the job as FAILED without error' do
enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new.enqueue(pollable_job)
job_model = VCAP::CloudController::PollableJobModel.make(delayed_job_guid: enqueued_job.guid, state: 'PROCESSING')

execute_all_jobs(expected_successes: 0, expected_failures: 1)

job_model.reload
expect(job_model.state).to eq('FAILED')
end
end
end
end

Expand Down
86 changes: 86 additions & 0 deletions spec/unit/jobs/v3/services/update_broker_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,33 @@ module V3
end
end

# Simulates the DB connection dropping mid-rollback: the rollback must be
# swallowed (best effort), the original catalog error re-raised, and the
# update request still cleaned up by the ensure block.
context 'when database disconnects during state rollback' do
  let(:catalog_error) { StandardError.new('Catalog fetch failed') }

  before do
    allow_any_instance_of(VCAP::CloudController::V3::ServiceBrokerCatalogUpdater).to receive(:refresh).and_raise(catalog_error)

    mock_dataset = instance_double(Sequel::Postgres::Dataset)
    allow(mock_dataset).to receive(:update).and_raise(Sequel::DatabaseDisconnectError.new('connection lost'))

    allow(ServiceBroker).to receive(:where).and_call_original
    # The stub must match the rollback's full argument list (id AND state);
    # with(id: broker.id) alone never matches the two-key hash, the call
    # falls through to and_call_original, and the disconnect path is never
    # actually exercised.
    allow(ServiceBroker).to receive(:where).
      with(id: broker.id, state: ServiceBrokerStateEnum::SYNCHRONIZING).
      and_return(mock_dataset)
  end

  # Raising catalog_error (and not Sequel::DatabaseDisconnectError) proves
  # the DB error was swallowed — no separate negated raise_error example
  # needed (negated raise_error with a class risks false positives).
  it 'swallows the database error and re-raises the original catalog error' do
    expect { job.perform }.to raise_error(catalog_error)
  end

  it 'still cleans up the update request' do
    expect { job.perform }.to raise_error(catalog_error)
    expect(ServiceBrokerUpdateRequest.where(id: update_broker_request.id).all).to be_empty
  end
end

context 'when the broker ceases to exist during the job' do
it 'raises a ServiceBrokerGone error' do
broker.destroy
Expand All @@ -457,6 +484,65 @@ module V3
end
end

# Unit coverage for UpdateBrokerJob#recover_from_failure: the state-guarded
# UPDATE only fires when the broker is still SYNCHRONIZING, and DB errors
# during recovery are logged rather than raised.
describe '#recover_from_failure' do
let(:previous_state) { ServiceBrokerStateEnum::AVAILABLE }

subject(:job) do
UpdateBrokerJob.new(update_broker_request.guid, broker.guid, previous_state, user_audit_info:)
end

context 'when broker is in SYNCHRONIZING state' do
before do
broker.update(state: ServiceBrokerStateEnum::SYNCHRONIZING)
end

it 'reverts the broker to the previous state' do
job.recover_from_failure

broker.reload
expect(broker.state).to eq(ServiceBrokerStateEnum::AVAILABLE)
end
end

# Broker already in the target state: the guarded update matches no rows,
# and the state is (trivially) unchanged.
context 'when broker is not in SYNCHRONIZING state' do
before do
broker.update(state: ServiceBrokerStateEnum::AVAILABLE)
end

it 'does not change the broker state' do
job.recover_from_failure

broker.reload
expect(broker.state).to eq(ServiceBrokerStateEnum::AVAILABLE)
end
end

# Concurrent transition case: a delete began while the update job was
# failing; recovery must not clobber the newer state.
context 'when broker state has changed to something else' do
before do
broker.update(state: ServiceBrokerStateEnum::DELETE_IN_PROGRESS)
end

it 'does not overwrite the newer state' do
job.recover_from_failure

broker.reload
expect(broker.state).to eq(ServiceBrokerStateEnum::DELETE_IN_PROGRESS)
end
end

context 'when database error occurs during recovery' do
before do
broker.update(state: ServiceBrokerStateEnum::SYNCHRONIZING)
allow(ServiceBroker).to receive(:where).and_raise(Sequel::DatabaseError.new('connection lost'))
# Stub the logger so the rescue path's error call is a no-op here.
allow(Steno).to receive(:logger).and_return(double(error: nil))
end

it 'logs the error and does not raise' do
expect { job.recover_from_failure }.not_to raise_error
end
end
end

def setup_broker_with_invalid_catalog
catalog = instance_double(Services::ServiceBrokers::V2::Catalog)

Expand Down
Loading