pulp.tasking

pulp.tasking.connection

pulpcore.tasking.connection.get_redis_connection()

pulp.tasking.constants

pulp.tasking.services.manage_workers

This module manages creation, deletion, starting, and stopping of the systemd unit files for Pulp’s RQ workers. It accepts one parameter, which must be start or stop.

pulpcore.tasking.services.manage_workers.main()

This function is executed by the systemd unit file to manage the worker units.

pulp.tasking.services.storage

class pulpcore.tasking.services.storage.WorkerDirectory(hostname)

The directory associated with a RQ worker.

Path format: <root>/<worker-hostname>

Variables

_path (str) – The absolute path.

Parameters

hostname (str) – The worker hostname.

create()

Create the directory.

The directory is deleted and recreated when already exists.

delete()

Delete the directory.

On permission denied - an attempt is made to recursively fix the permissions on the tree and the delete is retried.

MODE = 448
property path

The absolute path to the directory.

Returns

The absolute directory path.

Return type

str

class pulpcore.tasking.services.storage.WorkingDirectory

RQ Job working directory.

Path format: <worker-dir>/<task-id>

Examples

>>>
>>> with WorkingDirectory() as working_dir:
>>>     # directory created.
>>>     # process CWD = working_dir.path.
>>>     ....
>>> # directory deleted.
>>> # process CWD restored.
>>>

Args: hostname (str): The worker hostname.

pulp.tasking.services.worker_watcher

pulpcore.tasking.services.worker_watcher.check_worker_processes()

Look for missing Pulp worker processes, log and cleanup as needed.

To find a missing Worker process, filter the Workers model for entries older than utcnow() - WORKER_TTL. The heartbeat times are stored in native UTC, so this is a comparable datetime. For each missing worker found, call mark_worker_offline() synchronously for cleanup.

This method also checks that at least one resource-manager and one worker process is present. If there are zero of either, log at the error level that Pulp will not operate correctly.

pulpcore.tasking.services.worker_watcher.handle_worker_heartbeat(worker_name)

This is a generic function for updating worker heartbeat records.

Existing Worker objects are searched for one to update. If an existing one is found, it is updated. Otherwise a new Worker entry is created. Logging at the info level is also done.

Parameters

worker_name (str) – The hostname of the worker

pulpcore.tasking.services.worker_watcher.handle_worker_offline(worker_name)

This is a generic function for handling workers going offline.

_delete_worker() task is called to handle any work cleanup associated with a worker going offline. Logging at the info level is also done.

Parameters

worker_name (str) – The hostname of the worker

pulpcore.tasking.services.worker_watcher.mark_worker_offline(worker_name, normal_shutdown=False)

Mark the Worker as offline and cancel associated tasks.

If the worker shutdown normally, no message is logged, otherwise an error level message is logged. Default is to assume the worker did not shut down normally.

Any resource reservations associated with this worker are cleaned up by this function.

Any tasks associated with this worker are explicitly canceled.

Parameters
  • worker_name (str) –

  • normal_shutdown (bool) – True if the worker shutdown normally, False otherwise. Defaults to False.

pulpcore.tasking.services.worker_watcher.mark_worker_online(worker_name)

Sets some bookkeeping values on the worker record for tracking worker state

Parameters

worker_name (str) – The hostname of the worker

pulp.tasking.tasks

pulpcore.tasking.tasks.enqueue_with_reservation(func, resources, args=None, kwargs=None, options=None)

Enqueue a message to Pulp workers with a reservation.

This method provides normal enqueue functionality, while also requesting necessary locks for serialized urls. No two tasks that claim the same resource can execute concurrently. It accepts resources which it transforms into a list of urls (one for each resource).

This does not dispatch the task directly, but instead promises to dispatch it later by encapsulating the desired task through a call to a _queue_reserved_task() task. See the docblock on _queue_reserved_task() for more information on this.

This method creates a pulpcore.app.models.Task object. Pulp expects to poll on a task just after calling this method, so a Task entry needs to exist for it before it returns.

Parameters
  • func (callable) – The function to be run by RQ when the necessary locks are acquired.

  • resources (list) – A list of resources to reserve guaranteeing that only one task reserves these resources. Each resource can be either a (str) resource URL or a (django.models.Model) resource instance.

  • args (tuple) – The positional arguments to pass on to the task.

  • kwargs (dict) – The keyword arguments to pass on to the task.

  • options (dict) – The options to be passed on to the task.

Returns (rq.job.job): An RQ Job instance as returned by RQ’s enqueue function

Raises

ValueError – When resources is an unsupported type.

pulp.tasking.util

pulpcore.tasking.util.cancel(task_id)

Cancel the task that is represented by the given task_id.

This method cancels only the task with given task_id, not the spawned tasks. This also updates task’s state to ‘canceled’.

Parameters

task_id (basestring) – The ID of the task you wish to cancel

Raises

MissingResource – if a task with given task_id does not exist

pulpcore.tasking.util.get_current_worker()

Get the rq worker assigned to the current job

Returns

rq.worker.Worker: The worker assigned to the current job

Return type

class

pulpcore.tasking.util.get_url(model)

Get a resource url for the specified model object. This returns the path component of the resource URI. This is used in our resource locking/reservation code to identify resources.

Parameters

model (django.models.Model) – A model object.

Returns

The path component of the resource url

Return type

str

pulp.tasking.worker

class pulpcore.tasking.worker.PulpWorker(queues, **kwargs)

A Pulp worker for both the resource manager and generic workers

This worker is customized in the following ways:

  • Replaces the string ‘%h’ in the worker name with the fqdn

  • If the name starts with ‘reserved-resource-worker’ the worker ignores any other Queue configuration and only subscribes to a queue of the same name as the worker name

  • If the name starts with ‘resource-manager’ the worker ignores any other Queue configuration and only subscribes to the ‘resource-manager’ queue

  • Sets the worker TTL

  • Supports the killing of a job that is already running

  • Closes the database connection before forking so it is not process shared

execute_job(*args, **kwargs)

Close the database connection before forking, so that it is not shared

handle_job_failure(job, **kwargs)

Set the pulpcore.app.models.Task to failed and record the exception.

This method is called by rq to handle a job failure.

Parameters
  • job (rq.job.Job) – The job that experienced the failure

  • kwargs (dict) – Unused parameters

handle_job_success(job, queue, started_job_registry)

Set the pulpcore.app.models.Task to completed.

This method is called by rq to handle a job success.

Parameters
  • job (rq.job.Job) – The job that experienced the success

  • queue (rq.queue.Queue) – The Queue associated with the job

  • started_job_registry (rq.registry.StartedJobRegistry) – The RQ registry of started jobs

handle_warm_shutdown_request(*args, **kwargs)

Handle the warm shutdown of a RQ worker.

This cleans up any leftover records and marks the pulpcore.app.models.Worker record as being a clean shutdown.

Parameters
  • args (tuple) – unused positional arguments

  • kwargs (dict) – unused keyword arguments

heartbeat(*args, **kwargs)

Handle the heartbeat of a RQ worker.

This writes the heartbeat records to the pulpcore.app.models.Worker records.

Parameters
  • args (tuple) – unused positional arguments

  • kwargs (dict) – unused keyword arguments

perform_job(job, queue)

Set the pulpcore.app.models.Task to running and install a kill monitor Thread

This method is called by the worker’s work horse thread (the forked child) just before the task begins executing. It creates a Thread which monitors a special Redis key which if created should kill the task with SIGKILL.

Parameters
  • job (rq.job.Job) – The job to perform

  • queue (rq.queue.Queue) – The Queue associated with the job

register_birth(*args, **kwargs)

Handle the birth of a RQ worker.

This creates the working directory and removes any vestige records from a previous worker with the same name.

Parameters
  • args (tuple) – unused positional arguments

  • kwargs (dict) – unused keyword arguments

log_result_lifespan = False