pulpcore.plugin.stages
Plugin writers can use the Stages API to create a high-performance download-and-save pipeline that makes writing sync code easier. There are several parts to the API:
- DeclarativeVersion, a generic pipeline useful for most synchronization use cases.
- The builtin Stages, including Artifact Related Stages and Content Related Stages.
- The Stages API, which allows you to build custom stages and pipelines.
DeclarativeVersion
- class pulpcore.plugin.stages.DeclarativeVersion(first_stage, repository, mirror=False, acs=False)
A pipeline that creates a new RepositoryVersion from a stream of DeclarativeContent objects.
The plugin writer needs to specify a first_stage that will create a DeclarativeContent object for each Content unit that should exist in the RepositoryVersion.
The pipeline stages perform the following steps by default:
1. Create the new RepositoryVersion
2. Use the provided first_stage to construct DeclarativeContent
3. Query existing artifacts to determine which are already local to Pulp with QueryExistingArtifacts
4. Download any undownloaded Artifact objects with ArtifactDownloader
5. Save the newly downloaded Artifact objects with ArtifactSaver
6. Query for Content units already present in Pulp with QueryExistingContents
7. Save new Content units not yet present in Pulp with ContentSaver
8. Attach RemoteArtifact to the Content via RemoteArtifactSaver
9. Resolve the attached Future of DeclarativeContent with ResolveContentFutures
10. Associate all content units with the new RepositoryVersion with ContentAssociation
11. Unassociate any content units not declared in the stream (only when mirror=True) with ContentUnassociation
To provide the first_stage, the plugin writer should subclass the Stage class and define its run() interface, which returns a coroutine. This coroutine should download metadata, create the corresponding DeclarativeContent objects, and put them into the asyncio.Queue via put() to send them down the pipeline. For example:

    from pulpcore.plugin.models import Artifact
    from pulpcore.plugin.stages import DeclarativeArtifact, DeclarativeContent, Stage

    class MyFirstStage(Stage):

        def __init__(self, remote):
            super().__init__()
            self.remote = remote

        async def run(self):
            downloader = self.remote.get_downloader(url=self.remote.url)
            result = await downloader.run()
            for entry in read_my_metadata_file_somehow(result.path):
                unit = MyContent(entry)  # make the content unit in memory-only
                artifact = Artifact(entry)  # make Artifact in memory-only
                # entry.url is a placeholder for wherever the metadata stores the download URL
                da = DeclarativeArtifact(artifact, entry.url, entry.relative_path, self.remote)
                dc = DeclarativeContent(content=unit, d_artifacts=[da])
                await self.put(dc)
To use your first stage with the pipeline you have to instantiate the subclass and pass it to DeclarativeVersion.
1. Create the instance of the subclassed Stage object.
2. Create the DeclarativeVersion instance, passing the Stage subclass instance to it.
3. Call the create() method on your DeclarativeVersion instance.
Here is an example:

    first_stage = MyFirstStage(remote)
    DeclarativeVersion(first_stage, repository).create()
- Parameters
first_stage (Stage) – The first stage to receive DeclarativeContent from.
repository (Repository) – The repository receiving the new version.
mirror (bool) – ‘True’ removes content units from the RepositoryVersion that are not requested in the DeclarativeVersion stream. ‘False’ (additive) only adds content units observed in the DeclarativeVersion stream, and does not remove any pre-existing units in the RepositoryVersion. ‘False’ is the default.
acs (bool) – When set to ‘True’ a new stage is added to look for Alternate Content Sources.
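The difference between the two mirror settings can be pictured with plain sets. The following is a conceptual sketch only, not pulpcore code: the function name `resulting_content` and the string identifiers are hypothetical, and real content units are model instances rather than strings.

```python
def resulting_content(existing, declared, mirror=False):
    """Model which units end up in the new version (conceptual sketch only)."""
    existing, declared = set(existing), set(declared)
    if mirror:
        # Mirror mode: only units seen in the stream survive.
        return declared
    # Additive mode: keep what was already there and add what was declared.
    return existing | declared


previous = {"a", "b", "c"}   # units in the previous version (hypothetical)
stream = {"b", "c", "d"}     # units declared by the first stage (hypothetical)

additive = resulting_content(previous, stream)               # mirror=False
mirrored = resulting_content(previous, stream, mirror=True)  # mirror=True
```

In this model, the additive result still contains "a", while the mirrored result drops it because the stream never declared it.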
- class pulpcore.plugin.stages.DeclarativeArtifact(artifact=None, url=None, relative_path=None, remote=None, extra_data=None, deferred_download=False)
Relates an Artifact, how to download it, and its relative_path used later during publishing.
This is used by the Stages API stages to determine if an Artifact is already present and ensure Pulp can download it in the future. The artifact can be either saved or unsaved. If unsaved, the artifact attributes may be incomplete because not all digest information can be computed until the Artifact is downloaded.
- Variables
artifact (Artifact) – An Artifact either saved or unsaved. If unsaved, it may have partial digest information attached to it.
url (str) – The URL to fetch the Artifact from.
relative_path (str) – The relative_path this Artifact should be published at for any Publication.
remote (Remote) – The remote used to fetch this Artifact.
extra_data (dict) – A dictionary available for additional data to be stored in.
deferred_download (bool) – Whether downloading and saving this artifact should be deferred, that is, skipped in the artifact stages. Defaults to False. See On-Demand Support.
- Raises
ValueError – If artifact, url, or relative_path are not specified, or if remote is not specified and artifact doesn't have a file.
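The two ValueError conditions above can be read as a small validation routine. The sketch below is a stand-in written with dataclasses and encodes only the rules as documented here; `FakeArtifact` and `SketchDeclarativeArtifact` are hypothetical names, and the actual checks in pulpcore may differ in detail.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FakeArtifact:
    """Hypothetical stand-in for an Artifact; `file` stays None until downloaded."""
    file: Optional[str] = None


@dataclass
class SketchDeclarativeArtifact:
    """Hypothetical stand-in that encodes only the documented validation rules."""
    artifact: Optional[FakeArtifact] = None
    url: Optional[str] = None
    relative_path: Optional[str] = None
    remote: Optional[object] = None
    extra_data: dict = field(default_factory=dict)
    deferred_download: bool = False

    def __post_init__(self):
        # Rule 1: artifact, url, and relative_path must all be specified.
        if self.artifact is None or not self.url or not self.relative_path:
            raise ValueError("artifact, url, and relative_path are required")
        # Rule 2: without a remote, the artifact must already have a file.
        if self.remote is None and not self.artifact.file:
            raise ValueError("remote is required when the artifact has no file")
```

Under these assumptions, an artifact that already has a file can be declared without a remote, while an unsaved, file-less artifact needs one so that it can be fetched later.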
- class pulpcore.plugin.stages.DeclarativeContent(content=None, d_artifacts=None, extra_data=None)
Relates a Content unit and zero or more DeclarativeArtifact objects.
This is used by the Stages API stages to determine if a Content unit is already present and ensure all of its associated DeclarativeArtifact objects are related correctly. The content can be either saved or unsaved depending on where in the Stages API pipeline this is used.
- Variables
content (subclass of Content) – A Content unit, possibly unsaved.
d_artifacts (list) – A list of zero or more DeclarativeArtifact objects associated with content.
extra_data (dict) – A dictionary available for additional data to be stored in.
- Raises
ValueError – If content is not specified.
- async resolution()
Coroutine that waits for the content to be saved to the database. Returns the content unit.
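One way to picture resolution() is as a coroutine that waits on an asyncio Future which a later stage completes once the content is saved. The following is a minimal stdlib-only sketch of that idea, not pulpcore's implementation: `SketchDeclarativeContent`, its `resolve()` method, and the `_resolved` flag are assumptions made for illustration.

```python
import asyncio


class SketchDeclarativeContent:
    """Hypothetical stand-in showing a Future-backed resolution() coroutine."""

    def __init__(self, content):
        self.content = content
        self._future = None
        self._resolved = False

    async def resolution(self):
        """Wait until resolve() has been called, then return the content."""
        if self._resolved:
            return self.content
        if self._future is None:
            self._future = asyncio.get_running_loop().create_future()
        return await self._future

    def resolve(self):
        """Mark the content as saved and wake up any waiter."""
        self._resolved = True
        if self._future is not None and not self._future.done():
            self._future.set_result(self.content)


async def demo():
    dc = SketchDeclarativeContent("my-unit")
    waiter = asyncio.create_task(dc.resolution())
    await asyncio.sleep(0)  # let the waiter start and block on the future
    dc.resolve()            # in a pipeline, a later stage would do this
    return await waiter
```

Running `asyncio.run(demo())` returns the content object once `resolve()` has been called, which is the behaviour a first stage would rely on when it needs a saved unit before declaring dependent content.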
Stages API
- async pulpcore.plugin.stages.create_pipeline(stages, maxsize=1000)
A coroutine that builds a Stages API linear pipeline from the list stages and runs it.
Each stage is an instance of a class derived from pulpcore.plugin.stages.Stage that implements the run() coroutine. This coroutine reads asynchronously either from the items() iterator or the batches() iterator and outputs the items with put(). Here is an example of the simplest stage that only passes data:

    from pulpcore.plugin.stages import Stage

    class MyStage(Stage):
        async def run(self):
            async for d_content in self.items():  # Fetch items from the previous stage
                await self.put(d_content)  # Hand them over to the next stage
- Parameters
stages (list of coroutines) – A list of Stages API compatible coroutines.
maxsize (int) – The maximum number of items a queue between two stages should hold. Optional; the default is the value shown in the signature above.
- Returns
A single coroutine that can be used to run, wait, or cancel the entire pipeline with.
- Raises
ValueError – When a stage instance is specified more than once.
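To make the queue wiring concrete, here is a stdlib-only sketch of a linear pipeline in the spirit of create_pipeline. It is not pulpcore's implementation: `SketchStage`, `sketch_create_pipeline`, the example stages, and the use of `None` as an end-of-stream marker are all assumptions made for illustration, and maxsize is passed explicitly rather than defaulted.

```python
import asyncio


class SketchStage:
    """Hypothetical stand-in for a stage: reads from _in_q, writes to _out_q."""

    def __init__(self):
        self._in_q = None
        self._out_q = None

    async def items(self):
        # Yield items until the previous stage sends the None end marker.
        while True:
            item = await self._in_q.get()
            if item is None:
                return
            yield item

    async def put(self, item):
        await self._out_q.put(item)

    async def run(self):
        raise NotImplementedError

    async def __call__(self):
        await self.run()
        if self._out_q is not None:
            await self._out_q.put(None)  # tell the next stage we are done


class Numbers(SketchStage):
    """First stage: produces items and reads nothing."""

    async def run(self):
        for n in range(5):
            await self.put(n)


class Double(SketchStage):
    async def run(self):
        async for n in self.items():
            await self.put(n * 2)


class Collect(SketchStage):
    """Last stage: consumes items and never calls put()."""

    def __init__(self):
        super().__init__()
        self.seen = []

    async def run(self):
        async for n in self.items():
            self.seen.append(n)


async def sketch_create_pipeline(stages, maxsize):
    if len(set(stages)) != len(stages):
        raise ValueError("Each stage instance may appear only once.")
    in_q = None
    for i, stage in enumerate(stages):
        is_last = i == len(stages) - 1
        stage._in_q = in_q
        # The last stage gets no output queue.
        stage._out_q = None if is_last else asyncio.Queue(maxsize=maxsize)
        in_q = stage._out_q
    await asyncio.gather(*(stage() for stage in stages))
```

A run such as `asyncio.run(sketch_create_pipeline([Numbers(), Double(), collector], maxsize=2))` with `collector = Collect()` pushes each number through the bounded queues in order; a small maxsize simply makes upstream stages wait until downstream stages catch up.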
- class pulpcore.plugin.stages.Stage
The base class for all Stages API stages.
To make a stage, inherit from this class and implement run() on the subclass.
- async __call__()
This coroutine makes the stage callable.
It calls
run()
and signals the next stage that its work is finished.
- class pulpcore.plugin.stages.EndStage
A Stages API stage that drains incoming items and does nothing with the items. This is required at the end of all pipelines.
Without this stage, the maxsize of the last stage’s _out_q could fill up and block the entire pipeline.
- async __call__()
This method drains items from the last queue and drops them.
Importantly it does not try to put items into the nonexistent next queue.
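The blocking behaviour described for EndStage can be reproduced with a bare asyncio.Queue. This is a sketch under simplified assumptions, not pulpcore code: `produce`, `drain`, and `demo` are hypothetical helpers, and the timeouts exist only so the demonstration terminates.

```python
import asyncio


async def produce(out_q, count):
    """Put `count` items into a bounded queue, like a stage calling put()."""
    for n in range(count):
        await out_q.put(n)


async def drain(in_q, count):
    """Consume and drop `count` items, which is what a draining last stage does."""
    for _ in range(count):
        await in_q.get()


async def demo():
    # Without a consumer, the third put() blocks because maxsize is 2.
    q = asyncio.Queue(maxsize=2)
    try:
        await asyncio.wait_for(produce(q, 3), timeout=0.1)
        blocked_without_drain = False
    except asyncio.TimeoutError:
        blocked_without_drain = True

    # With a drain running concurrently, the same producer finishes.
    q = asyncio.Queue(maxsize=2)
    await asyncio.wait_for(asyncio.gather(produce(q, 3), drain(q, 3)), timeout=1)
    return blocked_without_drain
```

The first half shows a producer stalling on a full queue that nobody reads; the second half shows the same producer completing once something drains the queue.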