Adds 'redis' WORKER_TYPE #7296

Open
dkliban wants to merge 3 commits into pulp:main from dkliban:7210

Conversation

Member

@dkliban dkliban commented Feb 8, 2026

This adds a WORKER_TYPE setting. The default value is 'pulpcore'. When 'redis' is selected, the tasking system uses Redis to lock resources. Redis workers put less load on the PostgreSQL database.
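
For illustration, a minimal sketch of opting a deployment into the new worker type via the Pulp settings file. The setting name and its default come from this PR; the REDIS_URL line is only an assumption about how the Redis connection would be configured.

```python
# settings.py sketch: switch the tasking system to the Redis-backed workers.
# 'pulpcore' (the default) keeps the existing PostgreSQL-based locking.
WORKER_TYPE = "redis"

# Assumed for illustration: the Redis workers need a reachable Redis instance.
REDIS_URL = "redis://localhost:6379/0"
```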

closes: #7210

Generated By: Claude Code.

📜 Checklist

  • Commits are cleanly separated with meaningful messages (simple features and bug fixes should be squashed to one commit)
  • A changelog entry or entries have been added for any significant changes
  • Follows the Pulp policy on AI Usage
  • (For new features) - User documentation and test coverage have been added

See: Pull Request Walkthrough

This adds a WORKER_TYPE setting. The default value is 'pulpcore'. When 'redis' is selected,
the tasking system uses Redis to lock resources. Redis workers put less load on the
PostgreSQL database.

closes: pulp#7210

Generated By: Claude Code.
@dkliban dkliban force-pushed the 7210 branch 2 times, most recently from da96f66 to dd46ad1 on February 13, 2026 at 16:11
Added redis connection checks to the worker so it shuts down if the connection is broken.
Contributor

@gerrod3 gerrod3 left a comment

There's still a lot I haven't deeply reviewed yet, but this was getting long and I had a big idea around dispatch that I want to discuss.

Comment on lines 192 to 203
current_app = AppStatus.objects.current()
if current_app:
    _logger.info(
        "TASK EXECUTION: Task %s being executed by %s (app_type=%s)",
        task.pk,
        current_app.name,
        current_app.app_type,
    )
else:
    _logger.info(
        "TASK EXECUTION: Task %s being executed with no AppStatus.current()", task.pk
    )
Contributor

Is this needed? Can this be moved to log_task_start? The value should be set on the task object after set_running is called.

Member Author

@dkliban dkliban Feb 17, 2026

This is not needed. This was added when I was doing some debugging. I am going to remove this logging.

Comment on lines 237 to 240
finally:
    # Safety net: if we crashed before reaching the lock release above,
    # still try to release locks here (e.g., if crash during task execution)
    if safe_release_task_locks(task):
Contributor

I'm pretty sure this is wrong. finally always runs regardless of exception or returning early.
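
A quick illustration of that point (not code from the PR): finally runs on a normal return, an early return, and an exception alike, so a comment implying it only fires after a crash is misleading.

```python
def run(fail):
    try:
        if fail:
            raise RuntimeError("task crashed")
        return "finished"  # even on this early return, the finally block still runs
    finally:
        print("finally: releasing locks")


run(False)        # prints the message, then returns "finished"
try:
    run(True)     # prints the message, then the exception propagates
except RuntimeError:
    pass
```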

Member Author

The comment is not right, but the finally block is still needed in certain exception scenarios. I'll update the comment to reflect this.

Comment on lines +183 to +186
def execute_task(task):
    """Redis-aware task execution that releases Redis locks for immediate tasks."""
    # This extra stack is needed to isolate the current_task ContextVar
    contextvars.copy_context().run(_execute_task, task)
Contributor

Reading through this version and the base version, there is not much difference between the two besides that this one calls safe_release_task_locks. Could this be a wrapper of the original with a try/finally to release the Redis locks?
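
A rough sketch of the suggested wrapper. The import paths and the safe_release_task_locks signature are assumptions based on the diff, not a confirmed module layout.

```python
import contextvars

# Hypothetical import paths; the real module layout in this PR may differ.
from pulpcore.tasking.tasks import _execute_task as _base_execute_task
from pulpcore.tasking.redis_locks import safe_release_task_locks


def execute_task(task):
    """Run the stock task execution, then always release any Redis locks held for the task."""
    try:
        # Isolate the current_task ContextVar, exactly as the base version does.
        contextvars.copy_context().run(_base_execute_task, task)
    finally:
        # Runs whether the task finished, failed, or crashed mid-execution.
        safe_release_task_locks(task)
```

This keeps the Redis-specific behavior in one place instead of duplicating the whole base implementation.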

Comment on lines +488 to +489
current_app = AppStatus.objects.current()
lock_owner = current_app.name if current_app else f"immediate-{task.pk}"
Contributor

Probably same question as before in _execute_task, but when is this ever None?

Member Author

@dkliban dkliban Feb 17, 2026

It should never be None. Once again, this is a remnant of code left over from debugging initial issues.

Contributor

Let's remove it then.

except Exception:
    # Exception during execute_task()
    # Atomically release all locks as safety net
    safe_release_task_locks(task, lock_owner)
Contributor

Not needed, execute_task should already handle letting go of the locks.

Member Author

That is correct. I'll update the comment to more accurately state that the except block is for the case where using_workdir() fails before the execute function gets a chance to run and release the locks itself.

Contributor

Can you create a redis_using_workdir(task, app_lock) that handles this in a finally block?
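
One possible shape for that helper, sketched under the assumptions that using_workdir() is a context manager (per the discussion above) and that safe_release_task_locks(task, lock_owner) is safe to call even when the locks were already released.

```python
import contextlib


@contextlib.contextmanager
def redis_using_workdir(task, lock_owner):
    """Wrap using_workdir() so the task's Redis locks are released even if
    setting up or tearing down the working directory raises."""
    try:
        with using_workdir():  # assumed existing context manager from the tasking code
            yield
    finally:
        # Also covers the case where using_workdir() fails before the execute
        # function ever gets a chance to run and release the locks itself.
        safe_release_task_locks(task, lock_owner)
```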

Comment on lines +466 to +470
task = Task.objects.create(**task_payload)
if execute_now:
    # Try to atomically acquire task lock and resource locks
    # are_resources_available() now acquires ALL locks atomically
    if are_resources_available(task):
Contributor

@gerrod3 gerrod3 Feb 13, 2026

Crazy idea, not sure if you want to try it, but how about trying to acquire the Redis locks before the task hits the DB?

In the default worker scenario, dispatch acquires the lock on the task on creation because app_lock is set to the current task dispatcher (usually the API worker), and the task worker's fetch_task can only select from tasks that have this field set to null.

The new Redis worker selects from any task that is waiting, and thus there is a time window between the task object hitting the DB and line 470's are_resources_available that you have to account for inside dispatch. Instead we can acquire the task lock first, then create the task, then try to acquire the task's needed resource locks; if successful, execute, else defer, and finally do a safe release of the task lock. This way dispatch shouldn't be fighting against task workers to get the task lock.

Suggested change
task = Task.objects.create(**task_payload)
if execute_now:
    # Try to atomically acquire task lock and resource locks
    # are_resources_available() now acquires ALL locks atomically
    if are_resources_available(task):

# note that the pulp_id is set once the object is instantiated even if not saved to the DB yet!
task = Task(**task_payload)
# new function to just acquire the lock on the task
acquire_lock(task.pulp_id)
task.save()
if execute_now:
    # Change this function to only get the task's resources since we already hold the task lock
    if are_resources_available(resources):
        # now guaranteed to have task + resource locks

Member Author

I like this idea. I would be interested in implementing it as a follow-up PR.

Contributor

OK, instead of pre-acquiring the task lock before it hits the DB, we can just do what the normal worker does and have fetch_task only select tasks that have app_lock=None. Then we can remove all this task-refreshing logic because there won't be any worker able to grab the lock before us.

Contributor

@gerrod3 gerrod3 left a comment

Lots of comment/logging statements to remove. Need more specificity on the try/except blocks. And finally there are gaps in the task logic that need to be addressed.

local resource_name = ARGV[2 + i]

-- Remove from set
local removed = redis.call("srem", key, lock_owner)
Contributor

This call can fail if the item at key is no longer a set, i.e. is now a string for an exclusive lock.
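
One way to guard against that, as a sketch only (not code from the PR; it assumes exclusive locks are stored as plain strings at the same key, per the comment above): check the key's type before calling SREM.

```python
# Sketch: avoid WRONGTYPE when the key holds a plain string (exclusive lock)
# rather than a set of shared-lock holders.
GUARDED_RELEASE = """
local key = KEYS[1]
local lock_owner = ARGV[1]
if redis.call("type", key).ok == "set" then
    return redis.call("srem", key, lock_owner)
elseif redis.call("get", key) == lock_owner then
    -- an exclusive lock held by this owner: drop the whole key instead
    return redis.call("del", key)
end
return 0
"""

# Usage with redis-py, assuming redis_conn is an existing client:
# guarded_release = redis_conn.register_script(GUARDED_RELEASE)
# guarded_release(keys=[resource_key], args=[lock_owner])
```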

# Determine lock owner
if lock_owner is None:
    current_app = AppStatus.objects.current()
    lock_owner = current_app.name if current_app else f"immediate-{task.pk}"
Contributor

Is this ever set to immediate-{task.pk}? Shouldn't AppStatus.objects.current() always return an object?

# Determine lock owner
if lock_owner is None:
    current_app = await sync_to_async(AppStatus.objects.current)()
    lock_owner = current_app.name if current_app else f"immediate-{task.pk}"
Contributor

Same here.

return ["error"] # Return non-empty list to indicate failure


def release_resource_locks(redis_conn, lock_owner, task_lock_key, resources, shared_resources=None):
Contributor

Suggested change
def release_resource_locks(redis_conn, lock_owner, task_lock_key, resources, shared_resources=None):
def release_resource_locks(redis_conn, lock_owner, task_lock_key, resources=None, shared_resources=None):

Comment on lines +391 to +399
# Log debug for successful releases
num_released_exclusive = len(exclusive_resources) - len(not_owned_exclusive)
num_released_shared = len(shared_resources) - len(not_in_shared)
if num_released_exclusive > 0:
    _logger.debug("Released %d exclusive lock(s)", num_released_exclusive)
if num_released_shared > 0:
    _logger.debug("Released %d shared lock(s)", num_released_shared)
if not task_lock_not_owned:
    _logger.debug("Released task lock %s", task_lock_key)
Contributor

Do we still need these debug logs?

than 5 seconds, then subtracts the number of active workers to get the
number of tasks waiting to be picked up by workers.
"""
# Calculate the cutoff time (5 seconds ago)
Contributor

Can you remove the inline comments from this method? The code should speak for itself.
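
For context, the computation that docstring describes is roughly the following (a sketch only, not the PR's code; the exact filter and the active_worker_count placeholder are assumptions):

```python
from datetime import timedelta

from django.utils import timezone

from pulpcore.app.models import Task          # existing pulpcore model
from pulpcore.constants import TASK_STATES    # existing pulpcore constants

cutoff = timezone.now() - timedelta(seconds=5)
# Waiting tasks that have been unblocked for more than 5 seconds...
pending = Task.objects.filter(state=TASK_STATES.WAITING, unblocked_at__lt=cutoff).count()
# ...minus the workers currently able to pick them up (placeholder variable).
backlog = max(0, pending - active_worker_count)
```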

Comment on lines +427 to +440
def _release_resource_locks(self, task_lock_key, resources, shared_resources=None):
    """
    Atomically release task lock and resource locks.

    Uses a Lua script to ensure we only release locks that we own.

    Args:
        task_lock_key (str): Redis key for the task lock (e.g., "task:{task_id}")
        resources (list): List of exclusive resource names to release locks for
        shared_resources (list): Optional list of shared resource names
    """
    release_resource_locks(
        self.redis_conn, self.name, task_lock_key, resources, shared_resources
    )
Contributor

Remove this method, it doesn't do anything meaningful.

"""
# Query waiting tasks, sorted by creation time, limited
waiting_tasks = (
Task.objects.filter(state=TASK_STATES.WAITING)
Contributor

Suggested change
    Task.objects.filter(state=TASK_STATES.WAITING)
    Task.objects.filter(state=TASK_STATES.WAITING, app_lock=None)

OK, thinking about it some more, this is super important! We need to change the dispatch and set_running code that handles app_lock back to how the normal worker does it. So much of our task logic depends on this that changing it is a fool's errand. cleanup_redis_locks_for_worker is completely inoperable without app_lock being correct!

    ):
        self.ignored_task_ids.remove(pk)

    def cleanup_redis_locks_for_worker(self, app_worker):
Contributor

See my comment about fixing the app_lock logic. This method currently can't be reasoned about or expected to work correctly without app_lock behaving exactly like the normal pulpcore worker.

Development

Successfully merging this pull request may close these issues.

Add a Redis based worker to the tasking system
