2929from pulpcore .app .apps import pulp_plugin_configs
3030from pulpcore .app .models import Worker , Task , ApiAppStatus , ContentAppStatus
3131from pulpcore .app .util import PGAdvisoryLock , get_domain
32+ from pulpcore .app import pubsub
3233from pulpcore .exceptions import AdvisoryLockError
3334
3435from pulpcore .tasking .storage import WorkerDirectory
5657THRESHOLD_UNBLOCKED_WAITING_TIME = 5
5758
5859
59- class BasePubSubBackend :
60- WORKER_WAKEUP = "pulp_worker_wakeup"
61- TASK_CANCELLATION = "pulp_worker_cancel"
62- WORKER_METRIC = "pulp_worker_metrics_heartbeat"
63-
64- def wakeup_workers (self ):
65- self .publish (self .WORKER_WAKEUP )
66-
67- def cancel_task (self ):
68- self .publish (self .TASK_CANCELLATION )
69-
70- def record_worker_metrics (self , now ):
71- self .publish (self .WORKER_METRIC , str (now ))
72-
73- # Specific implementation
74- def subscribe (self , channel , callback ):
75- raise NotImplementedError ()
76-
77- def unsubscribe (self , channel ):
78- raise NotImplementedError ()
79-
80- def publish (self , channel , message = None ):
81- raise NotImplementedError ()
82-
83- def fileno (self ):
84- """Add support for being used in select loop."""
85- raise NotImplementedError ()
86-
87- def fetch (self ):
88- """Fetch messages new message, if required."""
89- raise NotImplementedError ()
90-
91-
92- class PostgresPubSub (BasePubSubBackend ):
93-
94- def __init__ (self ):
95- self .cursor = connection .cursor ()
96- self .listening_callback = {}
97-
98- def _notification_handler (self , notification ):
99- callback = self .listening_callback [notification .channel ]
100- callback (message = notification .payload )
101-
102- def subscribe (self , channel , callback ):
103- self .listening_callback [channel ] = callback
104- self .cursor .execute (f"LISTEN { channel } " )
105- connection .connection .add_notify_handler (self ._notification_handler )
106-
107- def unsubscribe (self , channel ):
108- self .cursor .execute (f"UNLISTEN { channel } " )
109-
110- def publish (self , channel , message = None ):
111- if not message :
112- self .cursor .execute (f"NOTIFY { channel } " )
113- else :
114- self .cursor .execute (f"NOTIFY { channel } , { message } " )
115-
116- def fileno (self ):
117- return connection .connection .fileno ()
118-
119- def fetch (self ):
120- connection .connection .execute ("SELECT 1" )
121-
122-
12360class PulpcoreWorker :
12461 def __init__ (self ):
12562 # Notification states from several signal handlers
@@ -132,7 +69,7 @@ def __init__(self):
13269 self .heartbeat_period = timedelta (seconds = settings .WORKER_TTL / 3 )
13370 self .last_metric_heartbeat = timezone .now ()
13471 self .versions = {app .label : app .version for app in pulp_plugin_configs ()}
135- self .pubsub_backend = PostgresPubSub ()
72+ self .pubsub_backend = pubsub . PostgresPubSub (connection )
13673 self .worker = self .handle_worker_heartbeat ()
13774 # This defaults to immediate task cancellation.
13875 # It will be set into the future on moderately graceful worker shutdown,
@@ -277,7 +214,7 @@ def cancel_abandoned_task(self, task, final_state, reason=None):
277214 delete_incomplete_resources (task )
278215 task .set_canceled (final_state = final_state , reason = reason )
279216 if task .reserved_resources_record :
280- self .pubsub_backend . wakeup_workers ( )
217+ pubsub . wakeup_worker ( self .pubsub_backend )
281218 return True
282219
283220 def is_compatible (self , task ):
@@ -524,7 +461,7 @@ def supervise_task(self, task):
524461 if cancel_state :
525462 self .cancel_abandoned_task (task , cancel_state , cancel_reason )
526463 if task .reserved_resources_record :
527- self .pubsub_backend . wakeup_workers ( )
464+ pubsub . wakeup_worker ( self .pubsub_backend )
528465 self .task = None
529466
530467 def handle_available_tasks (self ):
@@ -581,11 +518,11 @@ def _record_unblocked_waiting_tasks_metric(self):
581518 unblocked_tasks_stats ["longest_unblocked_waiting_time" ].seconds
582519 )
583520
584- self . pubsub_backend . record_worker_metrics (str ( now ) )
521+ pubsub . record_worker_metrics (self . pubsub_backend , now )
585522
586523 def pubsub_setup (self ):
587524 def cancellation_callback (message ):
588- if message == str (self .task .pk ):
525+ if self . task and message == str (self .task .pk ):
589526 self .cancel_task = True
590527
591528 def wakeup_callback (message ):
0 commit comments