pulp.tasking¶
pulp.tasking.constants¶
pulp.tasking.services.manage_workers¶
This module manages creation, deletion, starting, and stopping of the systemd unit files for Pulp’s RQ workers. It accepts one parameter, which must be start or stop.
-
pulpcore.tasking.services.manage_workers.
main
()¶ This function is executed by the systemd unit file to manage the worker units.
pulp.tasking.services.storage¶
-
class
pulpcore.tasking.services.storage.
WorkerDirectory
(hostname)¶ The directory associated with an RQ worker.
Path format: <root>/<worker-hostname>
- Variables
_path (str) – The absolute path.
- Parameters
hostname (str) – The worker hostname.
-
create
()¶ Create the directory.
If the directory already exists, it is deleted and recreated.
-
delete
()¶ Delete the directory.
On a permission-denied error, an attempt is made to recursively fix the permissions on the tree, and the delete is retried.
-
MODE
= 448¶
-
property
path
¶ The absolute path to the directory.
- Returns
The absolute directory path.
- Return type
str
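The create and delete behaviour described above can be sketched roughly as follows. This is a minimal illustration on a POSIX filesystem, not the pulpcore implementation: the class name `WorkerDirectorySketch`, the explicit `root` argument, and the `_fix_permissions` helper are assumptions made for the example. Note that `MODE = 448` is the decimal form of the octal mode `0o700`.

```python
import os
import shutil
import stat
import tempfile


class WorkerDirectorySketch:
    """Hypothetical stand-in for WorkerDirectory; not the pulpcore code."""

    MODE = 0o700  # 448 in decimal: read/write/execute for the owner only

    def __init__(self, root, hostname):
        # Path format from the docs: <root>/<worker-hostname>
        self._path = os.path.join(root, hostname)

    @property
    def path(self):
        return self._path

    def create(self):
        # An existing directory is deleted first, then recreated empty.
        if os.path.isdir(self._path):
            self.delete()
        os.makedirs(self._path, mode=self.MODE)

    def delete(self):
        try:
            shutil.rmtree(self._path)
        except PermissionError:
            # Fix permissions on the whole tree, then retry once.
            self._fix_permissions()
            shutil.rmtree(self._path)

    def _fix_permissions(self):
        # Make the root traversable before walking it, then make each
        # child directory traversable before os.walk descends into it.
        os.chmod(self._path, self.MODE)
        for dirpath, dirnames, filenames in os.walk(self._path):
            for name in dirnames:
                os.chmod(os.path.join(dirpath, name), self.MODE)
            for name in filenames:
                os.chmod(os.path.join(dirpath, name),
                         stat.S_IRUSR | stat.S_IWUSR)
```

Calling `create()` twice in a row leaves an empty directory, which matches the "deleted and recreated" wording above.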
-
class
pulpcore.tasking.services.storage.
WorkingDirectory
¶ RQ Job working directory.
Path format: <worker-dir>/<task-id>
Examples
>>> with WorkingDirectory() as working_dir:
>>>     # directory created.
>>>     # process CWD = working_dir.path.
>>>     ....
>>> # directory deleted.
>>> # process CWD restored.
Args: hostname (str): The worker hostname.
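A context manager with the behaviour shown in the example (create the directory, change into it, then restore the CWD and delete the directory on exit) might look like the sketch below. The function name `working_directory` and its explicit `worker_dir` and `task_id` arguments are assumptions for illustration; the real class derives both from the running worker and job.

```python
import os
import shutil
import tempfile
from contextlib import contextmanager


@contextmanager
def working_directory(worker_dir, task_id):
    """Hypothetical sketch: create <worker-dir>/<task-id> and chdir into it."""
    path = os.path.join(worker_dir, task_id)
    previous_cwd = os.getcwd()
    os.makedirs(path, mode=0o700)
    os.chdir(path)
    try:
        yield path
    finally:
        # Restore the CWD before deleting, even if the body raised.
        os.chdir(previous_cwd)
        shutil.rmtree(path, ignore_errors=True)
```

Restoring the CWD in a `finally` clause keeps the process out of a deleted directory when the task body fails.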
pulp.tasking.services.worker_watcher¶
-
pulpcore.tasking.services.worker_watcher.
check_worker_processes
()¶ Look for missing Pulp worker processes, then log and clean up as needed.
To find a missing Worker process, filter the Workers model for entries older than utcnow() - WORKER_TTL. The heartbeat times are stored in native UTC, so this is a comparable datetime. For each missing worker found, call mark_worker_offline() synchronously for cleanup.
This method also checks that at least one resource-manager and one worker process are present. If there are zero of either, it logs at the error level that Pulp will not operate correctly.
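The heartbeat cutoff and the presence check can be illustrated without a database. In this sketch the `WorkerRecord` dataclass, both function names, and the 30-second `WORKER_TTL` value are assumptions; the real function filters the Worker model instead of a list.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

WORKER_TTL = timedelta(seconds=30)  # assumed value, for illustration only


@dataclass
class WorkerRecord:
    """Hypothetical stand-in for a row in the Worker model."""
    name: str
    last_heartbeat: datetime  # UTC timestamp of the last heartbeat


def find_missing_workers(workers, now):
    """Return workers whose last heartbeat is older than now - WORKER_TTL."""
    cutoff = now - WORKER_TTL
    return [w for w in workers if w.last_heartbeat < cutoff]


def has_required_processes(workers):
    """True when at least one resource-manager and one other worker exist."""
    managers = [w for w in workers if w.name.startswith("resource-manager")]
    others = [w for w in workers if not w.name.startswith("resource-manager")]
    return bool(managers) and bool(others)
```

Passing `now` in explicitly, rather than reading the clock inside the function, keeps the comparison deterministic and easy to test.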
-
pulpcore.tasking.services.worker_watcher.
handle_worker_heartbeat
(worker_name)¶ This is a generic function for updating worker heartbeat records.
Existing Worker objects are searched for one to update. If an existing one is found, it is updated; otherwise a new Worker entry is created. The update is also logged at the info level.
- Parameters
worker_name (str) – The hostname of the worker
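The update-or-create pattern described here can be sketched with a plain dictionary standing in for the Worker table. The function name `record_heartbeat`, the `registry` dictionary, and the log messages are assumptions for illustration only.

```python
import logging
from datetime import datetime, timezone

log = logging.getLogger(__name__)


def record_heartbeat(registry, worker_name, now=None):
    """Update the heartbeat for worker_name, creating the entry if needed.

    Returns True when a new entry was created, False when one was updated.
    """
    now = now if now is not None else datetime.now(timezone.utc)
    created = worker_name not in registry
    registry[worker_name] = now
    if created:
        log.info("New worker '%s' discovered", worker_name)
    else:
        log.info("Worker heartbeat from '%s' at time %s", worker_name, now)
    return created
```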
-
pulpcore.tasking.services.worker_watcher.
handle_worker_offline
(worker_name)¶ This is a generic function for handling workers going offline.
The _delete_worker() task is called to handle any cleanup associated with a worker going offline. The event is also logged at the info level.
- Parameters
worker_name (str) – The hostname of the worker
-
pulpcore.tasking.services.worker_watcher.
mark_worker_offline
(worker_name, normal_shutdown=False)¶ Mark the Worker as offline and cancel associated tasks.
If the worker shut down normally, no message is logged; otherwise an error-level message is logged. The default is to assume the worker did not shut down normally.
Any resource reservations associated with this worker are cleaned up by this function.
Any tasks associated with this worker are explicitly canceled.
- Parameters
worker_name (str) – The hostname of the worker
normal_shutdown (bool) – True if the worker shut down normally, False otherwise. Defaults to False.
-
pulpcore.tasking.services.worker_watcher.
mark_worker_online
(worker_name)¶ Sets some bookkeeping values on the worker record for tracking worker state.
- Parameters
worker_name (str) – The hostname of the worker
pulp.tasking.tasks¶
-
pulpcore.tasking.tasks.
enqueue_with_reservation
(func, resources, args=None, kwargs=None, options=None)¶ Enqueue a message to Pulp workers with a reservation.
This method provides normal enqueue functionality, while also requesting necessary locks for serialized urls. No two tasks that claim the same resource can execute concurrently. It accepts resources which it transforms into a list of urls (one for each resource).
This does not dispatch the task directly, but instead promises to dispatch it later by encapsulating the desired task through a call to a _queue_reserved_task() task. See the docblock on _queue_reserved_task() for more information on this.
This method creates a pulpcore.app.models.Task object. Pulp expects to poll on a task just after calling this method, so a Task entry needs to exist for it before it returns.
- Parameters
func (callable) – The function to be run by RQ when the necessary locks are acquired.
resources (list) – A list of resources to reserve, guaranteeing that only one task reserves these resources at a time. Each resource can be either a (str) resource URL or a (django.db.models.Model) resource instance.
args (tuple) – The positional arguments to pass on to the task.
kwargs (dict) – The keyword arguments to pass on to the task.
options (dict) – The options to be passed on to the task.
- Returns
An RQ Job instance as returned by RQ’s enqueue function
- Return type
rq.job.Job
- Raises
ValueError – When resources is an unsupported type.
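The step that turns `resources` into a list of URLs, including the `ValueError` for unsupported types, might look like the sketch below. `FakeModel`, `fake_get_url`, and `resources_to_urls` are hypothetical names so that the example runs without Django; in Pulp the model branch would check for a Django model instance and call `get_url()`.

```python
class FakeModel:
    """Hypothetical stand-in for a Django model instance."""

    def __init__(self, url):
        self.url = url


def fake_get_url(model):
    """Hypothetical stand-in for pulpcore.tasking.util.get_url."""
    return model.url


def resources_to_urls(resources, model_type=FakeModel, get_url=fake_get_url):
    """Return one URL string per resource, preserving order."""
    urls = []
    for resource in resources:
        if isinstance(resource, str):
            urls.append(resource)  # already a resource URL
        elif isinstance(resource, model_type):
            urls.append(get_url(resource))  # model instance -> URL
        else:
            raise ValueError(
                f"Unsupported resource type: {type(resource).__name__}"
            )
    return urls
```

For example, `resources_to_urls(["/repos/1/", FakeModel("/remotes/2/")])` yields the two URL strings in order, while passing an integer raises `ValueError`.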
pulp.tasking.util¶
-
pulpcore.tasking.util.
cancel
(task_id)¶ Cancel the task that is represented by the given task_id.
This method cancels only the task with the given task_id, not the tasks it spawned. It also updates the task’s state to ‘canceled’.
- Parameters
task_id (basestring) – The ID of the task you wish to cancel
- Raises
MissingResource – if a task with the given task_id does not exist
-
pulpcore.tasking.util.
get_current_worker
()¶ Get the rq worker assigned to the current job.
- Returns
The worker assigned to the current job
- Return type
rq.worker.Worker
-
pulpcore.tasking.util.
get_url
(model)¶ Get a resource url for the specified model object. This returns the path component of the resource URI. This is used in our resource locking/reservation code to identify resources.
- Parameters
model (django.db.models.Model) – A model object.
- Returns
The path component of the resource url
- Return type
str
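To make "the path component of the resource URI" concrete, the standard library can split a full URI into its parts. The helper name `path_component` and the example host are assumptions; how Pulp obtains the URI for a model is not shown here.

```python
from urllib.parse import urlparse


def path_component(resource_uri):
    """Return only the path of a URI, dropping scheme, host, and query."""
    return urlparse(resource_uri).path


# Hypothetical URI, used only to show which part is kept.
example = "https://pulp.example.com/pulp/api/v3/repositories/1/?page=2"
example_path = path_component(example)
```

Using only the path means the same resource produces the same lock key regardless of the hostname or scheme used to reach it.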
pulp.tasking.worker¶
-
class
pulpcore.tasking.worker.
PulpWorker
(queues, **kwargs)¶ A Pulp worker for both the resource manager and generic workers
This worker is customized in the following ways:
Replaces the string ‘%h’ in the worker name with the fqdn
If the name starts with ‘reserved-resource-worker’ the worker ignores any other Queue configuration and only subscribes to a queue of the same name as the worker name
If the name starts with ‘resource-manager’ the worker ignores any other Queue configuration and only subscribes to the ‘resource-manager’ queue
Sets the worker TTL
Supports the killing of a job that is already running
Closes the database connection before forking so it is not process shared
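The naming and queue-selection rules in this list can be sketched as a small pure function. The name `resolve_worker` and the injectable `fqdn` argument are assumptions for illustration; the real worker applies these rules inside its constructor.

```python
import socket


def resolve_worker(name, queues, fqdn=None):
    """Return (worker_name, queue_names) following the rules listed above."""
    fqdn = fqdn if fqdn is not None else socket.getfqdn()
    # Rule 1: '%h' in the worker name is replaced with the FQDN.
    name = name.replace("%h", fqdn)
    # Rule 2: reserved workers listen only on a queue named after themselves.
    if name.startswith("reserved-resource-worker"):
        return name, [name]
    # Rule 3: the resource manager listens only on 'resource-manager'.
    if name.startswith("resource-manager"):
        return name, ["resource-manager"]
    # Otherwise the configured queues are used unchanged.
    return name, list(queues)
```

With `fqdn="host.example.com"`, the name `reserved-resource-worker-1@%h` resolves to a single queue called `reserved-resource-worker-1@host.example.com`, whatever queues were passed in.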
-
execute_job
(*args, **kwargs)¶ Close the database connection before forking, so that it is not shared
-
handle_job_failure
(job, **kwargs)¶ Set the pulpcore.app.models.Task to failed and record the exception.
This method is called by rq to handle a job failure.
- Parameters
job (rq.job.Job) – The job that experienced the failure
kwargs (dict) – Unused parameters
-
handle_job_success
(job, queue, started_job_registry)¶ Set the pulpcore.app.models.Task to completed.
This method is called by rq to handle a job success.
- Parameters
job (rq.job.Job) – The job that experienced the success
queue (rq.queue.Queue) – The Queue associated with the job
started_job_registry (rq.registry.StartedJobRegistry) – The RQ registry of started jobs
-
handle_warm_shutdown_request
(*args, **kwargs)¶ Handle the warm shutdown of an RQ worker.
This cleans up any leftover records and marks the pulpcore.app.models.Worker record as being a clean shutdown.
- Parameters
args (tuple) – unused positional arguments
kwargs (dict) – unused keyword arguments
-
heartbeat
(*args, **kwargs)¶ Handle the heartbeat of an RQ worker.
This writes the heartbeat to the pulpcore.app.models.Worker records.
- Parameters
args (tuple) – unused positional arguments
kwargs (dict) – unused keyword arguments
-
perform_job
(job, queue)¶ Set the pulpcore.app.models.Task to running and install a kill monitor Thread.
This method is called by the worker’s work horse process (the forked child) just before the task begins executing. It creates a Thread that monitors a special Redis key; if that key is created, the task is killed with SIGKILL.
- Parameters
job (rq.job.Job) – The job to perform
queue (rq.queue.Queue) – The Queue associated with the job
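The kill-monitor idea can be sketched as a daemon thread that polls for a key and invokes a kill action when the key appears. To keep the example runnable without Redis or real signals, the key check and the kill action are injected as callables; `start_kill_monitor`, `key_exists`, `kill`, and `poll_interval` are all hypothetical names. In a real worker, `key_exists` would be a Redis lookup and `kill` would be something like `os.kill(os.getpid(), signal.SIGKILL)`.

```python
import threading


def start_kill_monitor(key_exists, kill, poll_interval=0.05):
    """Start a daemon thread that calls kill() once key_exists() is true.

    Returns (thread, stop_event); set the event to stop monitoring early.
    """
    stop = threading.Event()

    def monitor():
        while not stop.is_set():
            if key_exists():
                kill()
                return
            # Sleep between polls, waking immediately if stop is set.
            stop.wait(poll_interval)

    thread = threading.Thread(target=monitor, daemon=True)
    thread.start()
    return thread, stop
```

SIGKILL cannot be caught by the task, so any cleanup has to happen in the parent worker process afterwards.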
-
register_birth
(*args, **kwargs)¶ Handle the birth of an RQ worker.
This creates the working directory and removes any vestigial records left by a previous worker with the same name.
- Parameters
args (tuple) – unused positional arguments
kwargs (dict) – unused keyword arguments
-
log_result_lifespan
= False¶