Plugin writers can use the Stages API to create a high-performance download-and-save pipeline that makes writing sync code easier. There are several parts to the API:

  1. DeclarativeVersion is a generic pipeline useful for most synchronization use cases.

  2. The built-in stages, including Artifact Related Stages and Content Related Stages.

  3. The Stages API, which allows you to build custom stages and pipelines.


class pulpcore.plugin.stages.DeclarativeVersion(first_stage, repository, mirror=False, acs=False)

A pipeline that creates a new RepositoryVersion from a stream of DeclarativeContent objects.

The plugin writer needs to specify a first_stage that will create a DeclarativeContent object for each Content unit that should exist in the RepositoryVersion.

The pipeline stages perform the following steps by default:

  1. Create the new RepositoryVersion

  2. Use the provided first_stage to construct DeclarativeContent

  3. Query existing artifacts to determine which are already local to Pulp with QueryExistingArtifacts

  4. Download any undownloaded Artifact objects with ArtifactDownloader

  5. Save the newly downloaded Artifact objects with ArtifactSaver

  6. Query for Content units already present in Pulp with QueryExistingContents

  7. Save new Content units not yet present in Pulp with ContentSaver

  8. Attach RemoteArtifact to the Content via RemoteArtifactSaver

  9. Resolve the attached Future of DeclarativeContent with ResolveContentFutures

  10. Associate all content units with the new RepositoryVersion with ContentAssociation

  11. Unassociate any content units not declared in the stream (only when mirror=True) with ContentUnassociation

To provide the first_stage, the plugin writer should subclass the Stage class and implement its run() coroutine. This coroutine should download metadata, create the corresponding DeclarativeContent objects, and put them into the asyncio.Queue via put() to send them down the pipeline. For example:

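from pulpcore.plugin.models import Artifact
from pulpcore.plugin.stages import DeclarativeArtifact, DeclarativeContent, Stage


class MyFirstStage(Stage):

    def __init__(self, remote):
        super().__init__()
        self.remote = remote

    async def run(self):
        # Download the metadata file that the remote points at
        downloader = self.remote.get_downloader(url=self.remote.url)
        result = await downloader.run()
        # read_my_metadata_file_somehow and MyContent are plugin-specific placeholders
        for entry in read_my_metadata_file_somehow(result.path):
            unit = MyContent(entry)  # make the content unit in memory-only
            artifact = Artifact(entry)  # make Artifact in memory-only
            # entry.url is assumed here to hold the artifact's download URL
            da = DeclarativeArtifact(artifact, entry.url, entry.relative_path, self.remote)
            dc = DeclarativeContent(content=unit, d_artifacts=[da])
            await self.put(dc)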

To use your first stage with the pipeline you have to instantiate the subclass and pass it to DeclarativeVersion.

  1. Create the instance of the subclassed Stage object.

  2. Create the DeclarativeVersion instance, passing the Stage subclass instance to it

  3. Call the create() method on your DeclarativeVersion instance

Here is an example:

first_stage = MyFirstStage(remote)
DeclarativeVersion(first_stage, repository).create()

  • first_stage (Stage) – The first stage to receive DeclarativeContent from.

  • repository (Repository) – The repository receiving the new version.

  • mirror (bool) – ‘True’ removes content units from the RepositoryVersion that are not requested in the DeclarativeVersion stream. ‘False’ (additive) only adds content units observed in the DeclarativeVersion stream, and does not remove any pre-existing units in the RepositoryVersion. ‘False’ is the default.

  • acs (bool) – When set to ‘True’ a new stage is added to look for Alternate Content Sources.
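The difference between the two mirror settings can be pictured with plain sets. The sketch below is only an illustration of the behavior described above, not pulpcore code; the function name next_version_content and the string unit names are hypothetical.

```python
def next_version_content(existing, declared, mirror=False):
    """Return the units the new version would hold (illustrative only)."""
    if mirror:
        # Mirror mode: anything not declared in the stream is removed.
        return set(declared)
    # Additive mode: declared units are added, nothing is removed.
    return set(existing) | set(declared)


existing = {"a", "b"}   # units already in the repository version
declared = {"b", "c"}   # units emitted by the first stage

additive = next_version_content(existing, declared)
mirrored = next_version_content(existing, declared, mirror=True)
print(sorted(additive))  # ['a', 'b', 'c']
print(sorted(mirrored))  # ['b', 'c']
```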

class pulpcore.plugin.stages.DeclarativeArtifact(artifact=None, url=None, urls=None, relative_path=None, remote=None, extra_data=None, deferred_download=False)

Relates an Artifact, how to download it, and its relative_path used later during publishing.

This is used by the Stages API stages to determine if an Artifact is already present and ensure Pulp can download it in the future. The artifact can be either saved or unsaved. If unsaved, the artifact attributes may be incomplete because not all digest information can be computed until the Artifact is downloaded.

  • artifact (Artifact) – An Artifact either saved or unsaved. If unsaved, it may have partial digest information attached to it.

  • url (str) – the url to fetch the Artifact from.

  • urls (List[str]) – A list of many possible URLs to fetch the Artifact from.

  • relative_path (str) – the relative_path this Artifact should be published at for any Publication.

  • remote (Remote) – The remote used to fetch this Artifact.

  • extra_data (dict) – A dictionary available for additional data to be stored in.

  • deferred_download (bool) – Whether downloading and saving this artifact in the artifact stages should be deferred. Defaults to False, meaning the artifact is downloaded and saved there. See On-Demand Support.

Raises ValueError if artifact, url, or relative_path is not specified, or if remote is not specified and artifact doesn't have a file.
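The ValueError conditions can be restated as a small check. This is a sketch that follows the wording above and runs without pulpcore; FakeArtifact and check_declarative_artifact are hypothetical names, and the real constructor may apply its checks differently (for example with respect to urls).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeArtifact:
    """Hypothetical stand-in for an Artifact; file stays None until one is attached."""
    file: Optional[str] = None


def check_declarative_artifact(artifact=None, url=None, relative_path=None, remote=None):
    """Apply the documented ValueError rules to a set of arguments."""
    if artifact is None or url is None or relative_path is None:
        raise ValueError("artifact, url, and relative_path are all required")
    if remote is None and not artifact.file:
        raise ValueError("a remote is required when the artifact has no file")
    return True


def is_rejected(**kwargs):
    # Helper for the demonstration: True when the arguments raise ValueError.
    try:
        check_declarative_artifact(**kwargs)
    except ValueError:
        return True
    return False


unsaved = FakeArtifact()
on_disk = FakeArtifact(file="/tmp/example.bin")

missing_url = is_rejected(artifact=unsaved, relative_path="a/b", remote="r")
no_remote_no_file = is_rejected(artifact=unsaved, url="http://x/a", relative_path="a/b")
no_remote_with_file = is_rejected(artifact=on_disk, url="http://x/a", relative_path="a/b")
```

In this sketch only the last call is accepted: an artifact that already has a file does not need a remote.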

class pulpcore.plugin.stages.DeclarativeContent(content=None, d_artifacts=None, extra_data=None)

Relates a Content unit and zero or more DeclarativeArtifact objects.

This is used by the Stages API stages to determine if a Content unit is already present and ensure all of its associated DeclarativeArtifact objects are related correctly. The content can be either saved or unsaved depending on where in the Stages API pipeline this is used.

  • content (subclass of Content) – A Content unit, possibly unsaved

  • d_artifacts (list) – A list of zero or more DeclarativeArtifact objects associated with content.

  • extra_data (dict) – A dictionary available for additional data to be stored in.


Raises ValueError if content is not specified.

async resolution()

Coroutine that waits for the content to be saved to the database. Returns the content unit.
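The waiting behavior of resolution() can be pictured with a plain asyncio future. The following is a minimal sketch using only the standard library; SketchContent and its resolve() method are hypothetical stand-ins and do not reflect how pulpcore implements DeclarativeContent.

```python
import asyncio


class SketchContent:
    """Hypothetical stand-in for a declarative unit with a resolution future."""

    def __init__(self, name):
        self.name = name
        self.saved = False
        self._future = None

    async def resolution(self):
        # Already saved: nothing to wait for.
        if self.saved:
            return self.name
        # Otherwise create the future lazily in the running loop and wait on it.
        if self._future is None:
            self._future = asyncio.get_running_loop().create_future()
        return await self._future

    def resolve(self):
        # Called by the (simulated) saving stage once the unit is persisted.
        self.saved = True
        if self._future is not None and not self._future.done():
            self._future.set_result(self.name)


async def main():
    unit = SketchContent("pkg-1.0")
    waiter = asyncio.create_task(unit.resolution())
    await asyncio.sleep(0)          # let the waiter start; it now blocks on the future
    blocked = not waiter.done()
    unit.resolve()                  # simulate the unit being saved
    result = await waiter
    return blocked, result


blocked, result = asyncio.run(main())
print(blocked, result)  # True pkg-1.0
```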

Stages API

async pulpcore.plugin.stages.create_pipeline(stages, maxsize=1)

A coroutine that builds a linear Stages API pipeline from the given list of stages and runs it.

Each stage is an instance of a class derived from pulpcore.plugin.stages.Stage that implements the run() coroutine. This coroutine reads asynchronously either from the items() iterator or the batches() iterator and outputs the items with put(). Here is an example of the simplest stage that only passes data:

class MyStage(Stage):
    async def run(self):
        async for d_content in self.items():  # Fetch items from the previous stage
            await self.put(d_content)  # Hand them over to the next stage

  • stages (list of coroutines) – A list of Stages API compatible coroutines.

  • maxsize (int) – The maximum number of items a queue between two stages should hold. Optional and defaults to 1.


Returns a single coroutine that can be used to run, wait on, or cancel the entire pipeline.


Raises ValueError when a stage instance is specified more than once.
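The idea of a linear pipeline joined by bounded queues can be sketched with the standard library alone. The code below is an assumption-laden illustration, not pulpcore's create_pipeline: the names run_pipeline, produce, double, and make_collector are hypothetical, stages are plain coroutine functions taking (in_q, out_q), and None is used as the end-of-stream marker.

```python
import asyncio


async def produce(in_q, out_q):
    # First stage: has no input queue, emits three items.
    for i in range(3):
        await out_q.put(i)
    await out_q.put(None)  # None marks the end of the stream in this sketch


async def double(in_q, out_q):
    # Middle stage: transform each item and hand it to the next stage.
    while (item := await in_q.get()) is not None:
        await out_q.put(item * 2)
    await out_q.put(None)


def make_collector(results):
    async def collect(in_q, out_q):
        # Last stage: consume items; there is no next queue to write to.
        while (item := await in_q.get()) is not None:
            results.append(item)
    return collect


async def run_pipeline(stages, maxsize=1):
    if len(set(stages)) != len(stages):
        raise ValueError("Each stage must be specified only once")
    in_q = None
    tasks = []
    for index, stage in enumerate(stages):
        # Every stage except the last gets a bounded output queue.
        is_last = index == len(stages) - 1
        out_q = None if is_last else asyncio.Queue(maxsize=maxsize)
        tasks.append(asyncio.create_task(stage(in_q, out_q)))
        in_q = out_q
    await asyncio.gather(*tasks)


results = []
asyncio.run(run_pipeline([produce, double, make_collector(results)]))
print(results)  # [0, 2, 4]
```

With maxsize=1, each stage can run at most one item ahead of the stage after it, which keeps memory use bounded at the cost of some parallelism.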

class pulpcore.plugin.stages.Stage

The base class for all Stages API stages.

To make a stage, inherit from this class and implement run() on the subclass.

async __call__()

This coroutine makes the stage callable.

It calls run() and signals the next stage that its work is finished.

class pulpcore.plugin.stages.EndStage

A Stages API stage that drains incoming items and discards them. This is required at the end of all pipelines.

Without this stage, the last stage’s _out_q could fill up to its maxsize and block the entire pipeline.

async __call__()

This method drains items from the last queue and drops them.

Importantly it does not try to put items into the nonexistent next queue.
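Why a draining stage is needed can be shown with a bounded asyncio.Queue. This is a standard-library sketch, not pulpcore code; producer and drain are hypothetical names, and None is assumed as the end-of-stream marker.

```python
import asyncio


async def producer(out_q, count):
    for i in range(count):
        await out_q.put(i)  # blocks while the queue is full
    await out_q.put(None)   # end-of-stream marker used in this sketch


async def drain(in_q):
    # Read and drop everything, never writing to a next queue.
    dropped = 0
    while await in_q.get() is not None:
        dropped += 1
    return dropped


async def main():
    # Without a consumer, the producer stalls once the queue is full.
    q = asyncio.Queue(maxsize=1)
    try:
        await asyncio.wait_for(producer(q, 3), timeout=0.1)
        stalled = False
    except asyncio.TimeoutError:
        stalled = True

    # With a draining consumer, the same producer runs to completion.
    q = asyncio.Queue(maxsize=1)
    _, dropped = await asyncio.gather(producer(q, 3), drain(q))
    return stalled, dropped


stalled, dropped = asyncio.run(main())
print(stalled, dropped)  # True 3
```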