pulpcore.plugin.stages

Plugin writers can use the Stages API to create a high-performance download-and-save pipeline, which makes sync code easier to write. The API has three parts:

  1. DeclarativeVersion is a generic pipeline useful for most synchronization use cases.
  2. The built-in stages, including Artifact Related Stages, Content Related Stages, and Content Association and Unassociation Stages.
  3. The Stages API, which allows you to build custom stages and pipelines.

DeclarativeVersion

class pulpcore.plugin.stages.DeclarativeVersion(first_stage, repository, mirror=False, remove_duplicates=None)

A pipeline that creates a new RepositoryVersion from a stream of DeclarativeContent objects.

The plugin writer needs to specify a first_stage that will create a DeclarativeContent object for each Content unit that should exist in the new RepositoryVersion.

The pipeline stages perform the following steps by default:

  1. Create the new RepositoryVersion
  2. Use the provided first_stage to construct DeclarativeContent
  3. Query existing artifacts to determine which are already local to Pulp with QueryExistingArtifacts
  4. Download any undownloaded Artifact objects with ArtifactDownloader
  5. Save the newly downloaded Artifact objects with ArtifactSaver
  6. Query for Content units already present in Pulp with QueryExistingContents
  7. Save new Content units not yet present in Pulp with ContentSaver
  8. Attach RemoteArtifact to the Content via RemoteArtifactSaver
  9. Resolve the attached Future of DeclarativeContent with ResolveContentFutures
  10. Remove duplicate content from the repository version (only when remove_duplicates is given) with RemoveDuplicates
  11. Associate all content units with the new RepositoryVersion with ContentAssociation
  12. Unassociate any content units not declared in the stream (only when mirror=True) with ContentUnassociation

To provide first_stage, the plugin writer should subclass the Stage class and implement its run() coroutine. This coroutine should download metadata, create the corresponding DeclarativeContent objects, and send them down the pipeline by awaiting put(), which places them on the outgoing asyncio.Queue. For example:

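>>> from pulpcore.plugin.models import Artifact
>>> from pulpcore.plugin.stages import DeclarativeArtifact, DeclarativeContent, Stage
>>>
>>> class MyFirstStage(Stage):
>>>
>>>     def __init__(self, remote):
>>>         super().__init__()
>>>         self.remote = remote
>>>
>>>     async def run(self):
>>>         # Download the metadata file that the remote points at.
>>>         downloader = self.remote.get_downloader(url=self.remote.url)
>>>         result = await downloader.run()
>>>         for entry in read_my_metadata_file_somehow(result.path):
>>>             unit = MyContent(entry)  # make the content unit in memory-only
>>>             artifact = Artifact(entry)  # make Artifact in memory-only
>>>             # entry.url is assumed to hold the download URL of this artifact
>>>             da = DeclarativeArtifact(artifact, entry.url, entry.relative_path, self.remote)
>>>             dc = DeclarativeContent(content=unit, d_artifacts=[da])
>>>             await self.put(dc)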

To use your first stage with the pipeline you have to instantiate the subclass and pass it to DeclarativeVersion.

  1. Create the instance of the subclassed Stage object.
  2. Create the DeclarativeVersion instance, passing the Stage subclass instance to it
  3. Call the create() method on your DeclarativeVersion instance

Here is an example:

>>> first_stage = MyFirstStage(remote)
>>> DeclarativeVersion(first_stage, repository).create()

Example using remove_duplicates:

>>> # This will enforce that within a repository version, FileContent.relative_path is
>>> # unique.
>>> remove_dupes = [{'model': FileContent, 'field_names': ['relative_path']}]
>>> DeclarativeVersion(first_stage, repository, remove_duplicates=remove_dupes).create()

Parameters:
  • first_stage (Stage) – The first stage to receive DeclarativeContent from.
  • repository (Repository) – The repository receiving the new version.
  • mirror (bool) – If True, content units that are not declared in the DeclarativeVersion stream are removed from the RepositoryVersion. If False (additive), content units observed in the stream are added and pre-existing units in the RepositoryVersion are left in place. False is the default.
  • remove_duplicates (list) – A list of dictionaries that indicate objects which are considered duplicates within a single repository version. These objects will be removed from the new version, making room for the new objects passing through the pipeline. Each dict should have two keys: model, which is a subclass of pulpcore.plugin.models.Content, and field_names, which is a list of strings corresponding to fields on the provided model.
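
To picture what a remove_duplicates entry asks for, here is a minimal sketch in plain Python rather than pulpcore's implementation. Units are modelled as dicts with a hypothetical pk key, find_duplicates is a made-up helper, and the rule that a unit never counts as a duplicate of itself is an assumption made for illustration.

```python
def find_duplicates(existing, incoming, field_names):
    """Return the existing units that clash with an incoming unit.

    Two units clash when they agree on every name in field_names but are
    not the same unit (different pk). This rule is an assumption.
    """

    def key(unit):
        # The tuple of field values that must be unique within a version.
        return tuple(unit[name] for name in field_names)

    clashes = []
    for old in existing:
        if any(key(old) == key(new) and old["pk"] != new["pk"] for new in incoming):
            clashes.append(old)
    return clashes


existing_units = [
    {"pk": 1, "relative_path": "a.txt"},
    {"pk": 2, "relative_path": "b.txt"},
]
incoming_units = [
    {"pk": 3, "relative_path": "a.txt"},  # new unit reusing an existing path
    {"pk": 2, "relative_path": "b.txt"},  # the unit already in the repository
]
removed = find_duplicates(existing_units, incoming_units, ["relative_path"])
```

In this sketch only the unit with pk 1 ends up in removed, which makes room for the incoming unit that uses the same relative_path.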
create()

Perform the work. This is the long-blocking call where all syncing occurs.

pipeline_stages(new_version)

Build the list of pipeline stages feeding into the ContentAssociation stage.

Plugin-writers may override this method to build a custom pipeline. This can be achieved by returning a list with different stages or by extending the list returned by this method.

Parameters: new_version (RepositoryVersion) – The new repository version that is going to be built.
Returns: List of Stage instances
Return type: list
class pulpcore.plugin.stages.DeclarativeArtifact(artifact=None, url=None, relative_path=None, remote=None, extra_data=None, deferred_download=False)

Relates an Artifact, how to download it, and its relative_path used later during publishing.

This is used by the Stages API stages to determine if an Artifact is already present and ensure Pulp can download it in the future. The artifact can be either saved or unsaved. If unsaved, the artifact attributes may be incomplete because not all digest information can be computed until the Artifact is downloaded.

Variables:
  • artifact (Artifact) – An Artifact either saved or unsaved. If unsaved, it may have partial digest information attached to it.
  • url (str) – the url to fetch the Artifact from.
  • relative_path (str) – the relative_path this Artifact should be published at for any Publication.
  • remote (Remote) – The remote used to fetch this Artifact.
  • extra_data (dict) – A dictionary available for additional data to be stored in.
  • deferred_download (bool) – If True, downloading this artifact is deferred, so the artifact stages do not download and save it. Defaults to False. See On-Demand Support.
Raises:

ValueError – If artifact, url, relative_path, or remote are not specified.

class pulpcore.plugin.stages.DeclarativeContent(content=None, d_artifacts=None, extra_data=None, does_batch=True)

Relates a Content unit and zero or more DeclarativeArtifact objects.

This is used by the Stages API stages to determine if a Content unit is already present and ensure all of its associated DeclarativeArtifact objects are related correctly. The content can be either saved or unsaved depending on where in the Stages API pipeline this is used.

Variables:
  • content (subclass of Content) – A Content unit, possibly unsaved
  • d_artifacts (list) – A list of zero or more DeclarativeArtifact objects associated with content.
  • extra_data (dict) – A dictionary available for additional data to be stored in.
  • does_batch (bool) – If False, the batching mechanism will not hold this item back while waiting for a batch to fill. Defaults to True.
  • future (Future) – A future that gets resolved to the Content in the ResolveContentFutures stage. See the ResolveContentFutures stage for example usage.
Raises:

ValueError – If content is not specified.

get_or_create_future()

Return the existing or a new future.

If you rely on this future in the course of the pipeline, consider clearing the does_batch attribute to prevent deadlocks. See the ResolveContentFutures stage for example usage.

Returns: An existing asyncio.Future or a newly created one.
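
The get-or-create pattern described above can be sketched with asyncio alone. This is an illustration under stated assumptions, not the pulpcore source: Declared is a hypothetical stand-in for DeclarativeContent, and the call_soon line stands in for a later stage resolving the future.

```python
import asyncio


class Declared:
    """Hypothetical stand-in for an object that carries an optional future."""

    def __init__(self, content):
        self.content = content
        self.future = None

    def get_or_create_future(self):
        # Create the future lazily; later calls return the same object.
        if self.future is None:
            self.future = asyncio.get_running_loop().create_future()
        return self.future


async def demo():
    declared = Declared("unit-1")
    future = declared.get_or_create_future()
    same = declared.get_or_create_future() is future
    # Stand-in for a later stage resolving the future with the content.
    asyncio.get_running_loop().call_soon(future.set_result, declared.content)
    return same, await future


same_future, resolved = asyncio.run(demo())
```

Code that awaits such a future only continues once something resolves it, which is why an item that is held back for batching can stall the coroutine waiting on it.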

Stages API

pulpcore.plugin.stages.create_pipeline(stages, maxsize=100)

A coroutine that builds a Stages API linear pipeline from the list stages and runs it.

Each stage is an instance of a class derived from pulpcore.plugin.stages.Stage that implements the run() coroutine. This coroutine reads asynchronously either from the items() iterator or the batches() iterator and outputs the items with put(). Here is an example of the simplest stage that only passes data.

>>> class MyStage(Stage):
>>>     async def run(self):
>>>         async for d_content in self.items():  # Fetch items from the previous stage
>>>             await self.put(d_content)  # Hand them over to the next stage
Parameters:
  • stages (list of coroutines) – A list of Stages API compatible coroutines.
  • maxsize (int) – The maximum amount of items a queue between two stages should hold. Optional and defaults to 100.
Returns:

A single coroutine that can be used to run, wait on, or cancel the entire pipeline.

Raises:

ValueError – When a stage instance is specified more than once.
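
How a linear pipeline of this kind hangs together can be sketched with asyncio queues alone. The code below is not pulpcore's implementation: MiniStage, Source, Doubler, Collector, and run_pipeline are hypothetical stand-ins, and using None as the end-of-stream signal is an assumption of the sketch.

```python
import asyncio


class MiniStage:
    """Hypothetical stand-in for a stage; not pulpcore.plugin.stages.Stage."""

    def __init__(self):
        self._in_q = None
        self._out_q = None

    async def items(self):
        # Yield items one by one until the previous stage signals the end.
        while True:
            item = await self._in_q.get()
            if item is None:
                break
            yield item

    async def put(self, item):
        if item is None:
            raise ValueError("None is reserved as the end-of-stream signal.")
        await self._out_q.put(item)

    async def __call__(self):
        await self.run()
        await self._out_q.put(None)  # tell the next stage we are finished


class Source(MiniStage):
    def __init__(self, values):
        super().__init__()
        self.values = values

    async def run(self):
        for value in self.values:
            await self.put(value)


class Doubler(MiniStage):
    async def run(self):
        async for item in self.items():
            await self.put(item * 2)


class Collector(MiniStage):
    """Final stage: consumes items and never writes to a next queue."""

    def __init__(self):
        super().__init__()
        self.seen = []

    async def __call__(self):
        async for item in self.items():
            self.seen.append(item)


async def run_pipeline(stages, maxsize=100):
    if len(set(stages)) != len(stages):
        raise ValueError("Each stage instance may be used only once.")
    in_q = None
    for stage in stages:
        # Each stage reads from the queue that the previous stage writes to.
        stage._in_q = in_q
        stage._out_q = asyncio.Queue(maxsize=maxsize)
        in_q = stage._out_q
    await asyncio.gather(*(stage() for stage in stages))


collector = Collector()
asyncio.run(run_pipeline([Source([1, 2, 3]), Doubler(), collector], maxsize=2))
```

With maxsize=2 the source cannot run more than two items ahead of the doubler, which is the back-pressure that the bounded queues between stages provide.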

class pulpcore.plugin.stages.Stage

The base class for all Stages API stages.

To make a stage, inherit from this class and implement run() on the subclass.

__call__()

This coroutine makes the stage callable.

It calls run() and signals the next stage that its work is finished.

batches(minsize=50)

Asynchronous iterator yielding batches of DeclarativeContent from self._in_q.

The iterator will try to get as many instances of DeclarativeContent as possible without blocking, but at least minsize instances.

Parameters: minsize (int) – The minimum batch size to yield (unless it is the final batch)
Yields: A list of DeclarativeContent instances

Examples

Used in stages to get large chunks of d_content instances from self._in_q:

class MyStage(Stage):
    async def run(self):
        async for batch in self.batches():
            for d_content in batch:
                # process declarative content
                await self.put(d_content)
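
The batching rule (wait until at least minsize items have arrived, then take whatever else is available without blocking) can be sketched as follows. This is an illustration only: read_batches is a hypothetical helper rather than the Stage.batches() source, and the None end-of-stream signal is an assumption.

```python
import asyncio


async def read_batches(queue, minsize=50):
    """Yield lists of items from queue; None (assumed) marks the end."""
    batch = []
    finished = False
    while not finished:
        if len(batch) < minsize:
            # Too few items so far: wait for the next one.
            item = await queue.get()
        else:
            try:
                # Enough items: take more only if they are already waiting.
                item = queue.get_nowait()
            except asyncio.QueueEmpty:
                yield batch
                batch = []
                continue
        if item is None:
            finished = True
        else:
            batch.append(item)
    if batch:
        yield batch  # the final batch may be smaller than minsize


async def demo():
    queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)
    batches = read_batches(queue, minsize=3)
    first = await batches.__anext__()
    queue.put_nowait(5)
    queue.put_nowait(None)
    rest = [batch async for batch in batches]
    return first, rest


first_batch, remaining = asyncio.run(demo())
```

Five items are waiting when the first batch is requested, so it holds all five even though minsize is 3. The last batch holds the single remaining item because the stream ends before minsize is reached.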
items()

Asynchronous iterator yielding items of DeclarativeContent from self._in_q.

The iterator will get instances of DeclarativeContent one by one as they become available.

Yields: An instance of DeclarativeContent

Examples

Used in stages to get d_content instances one by one from self._in_q:

class MyStage(Stage):
    async def run(self):
        async for d_content in self.items():
            # process declarative content
            await self.put(d_content)
put(item)

Coroutine to pass items to the next stage.

Parameters: item – A handled instance of pulpcore.plugin.stages.DeclarativeContent
Raises: ValueError – When item is None.
run()

The coroutine that is run as part of this stage.

Returns: The coroutine that runs this stage.
class pulpcore.plugin.stages.EndStage

A Stages API stage that drains incoming items and does nothing with the items. This is required at the end of all pipelines.

Without this stage, the last stage’s _out_q could fill up to its maxsize and block the entire pipeline.

__call__()

This method drains items from the last queue and drops them.

Importantly it does not try to put items into the nonexistent next queue.
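
Why a draining stage is needed can be shown with a bounded asyncio.Queue. The sketch below is not the EndStage source: producer and drain are hypothetical coroutines, and the None end-of-stream signal is an assumption.

```python
import asyncio


async def producer(out_q, count):
    for i in range(count):
        await out_q.put(i)  # blocks while the queue already holds maxsize items
    await out_q.put(None)  # assumed end-of-stream signal


async def drain(in_q):
    # Take items off the queue and drop them; nothing is put anywhere else.
    dropped = 0
    while await in_q.get() is not None:
        dropped += 1
    return dropped


async def demo():
    # Without a consumer the producer stalls once the queue is full.
    queue = asyncio.Queue(maxsize=2)
    try:
        await asyncio.wait_for(producer(queue, 5), timeout=0.1)
        blocked = False
    except asyncio.TimeoutError:
        blocked = True

    # With a draining consumer the same producer runs to completion.
    queue = asyncio.Queue(maxsize=2)
    _, dropped = await asyncio.gather(producer(queue, 5), drain(queue))
    return blocked, dropped


blocked_without_drain, dropped_count = asyncio.run(demo())
```

The first run times out because nothing empties the queue, while the second run completes and discards all five items.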

Content Association and Unassociation Stages

class pulpcore.plugin.stages.RemoveDuplicates(new_version, model, field_names)

A stage that allows plugins to remove content that would break repository uniqueness constraints.

This stage is expected to be added by the DeclarativeVersion. See that class for example usage.

Parameters:
  • new_version (RepositoryVersion) – The repo version this stage unassociates content from.
  • model (pulpcore.plugin.models.Content) – Subclass of a Content model to indicate which content type to operate on.
  • field_names (list) – List of field names to ensure uniqueness within a repository version.
run()

The coroutine for this stage.

Returns: The coroutine for this stage.
class pulpcore.plugin.stages.ContentAssociation(new_version, *args, **kwargs)

A Stages API stage that associates content units with new_version.

This stage stores all content unit primary keys in memory before running. This is done to compute the units already associated but not received from self._in_q. These units are passed via self._out_q to the next stage as a django.db.models.query.QuerySet.

This stage creates a ProgressBar named ‘Associating Content’ that counts the number of units associated. Because the content arrives as a stream, the total count isn’t known until the stage has finished.

Parameters:
  • new_version (RepositoryVersion) – The repo version this stage associates content with.
  • args – unused positional arguments passed along to Stage.
  • kwargs – unused keyword arguments passed along to Stage.
run()

The coroutine for this stage.

Returns: The coroutine for this stage.
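
The bookkeeping that ContentAssociation is described as doing, and the difference that mirror makes in DeclarativeVersion, can be pictured with plain sets of primary keys. This is a minimal sketch under stated assumptions rather than the stage's code: plan_association and resulting_version are hypothetical helpers and no database is involved.

```python
def plan_association(existing_pks, incoming_pks):
    """Split the work into units to add and units that were not received."""
    existing = set(existing_pks)
    not_received = set(existing)  # shrinks as declared units stream in
    to_add = []
    for pk in incoming_pks:
        not_received.discard(pk)
        if pk not in existing and pk not in to_add:
            to_add.append(pk)
    return to_add, not_received


def resulting_version(existing_pks, incoming_pks, mirror=False):
    to_add, not_received = plan_association(existing_pks, incoming_pks)
    result = set(existing_pks) | set(to_add)
    if mirror:
        # Mirror mode also drops units that the stream did not declare.
        result -= not_received
    return result


to_add, not_received = plan_association([1, 2, 3], [2, 3, 4, 4])
additive = resulting_version([1, 2, 3], [2, 3, 4, 4])
mirrored = resulting_version([1, 2, 3], [2, 3, 4, 4], mirror=True)
```

Unit 4 is the only one to associate and unit 1 is the only one not received. Unit 1 therefore survives in the additive result and is dropped in the mirrored one.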
class pulpcore.plugin.stages.ContentUnassociation(new_version, *args, **kwargs)

A Stages API stage that unassociates content units from new_version.

This stage creates a ProgressBar named ‘Un-Associating Content’ that counts the number of units un-associated. Because the content arrives as a stream, the total count isn’t known until the stage has finished.

Parameters:
  • new_version (RepositoryVersion) – The repo version this stage unassociates content from.
  • args – unused positional arguments passed along to Stage.
  • kwargs – unused keyword arguments passed along to Stage.
run()

The coroutine for this stage.

Returns: The coroutine for this stage.