Skip to content

Commit

Permalink
Add pull-through caching
Browse files Browse the repository at this point in the history
closes #507
  • Loading branch information
lubosmj committed Aug 1, 2023
1 parent fd749a6 commit e2e6823
Show file tree
Hide file tree
Showing 13 changed files with 736 additions and 100 deletions.
3 changes: 3 additions & 0 deletions CHANGES/507.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added support for pull-through caching. Users can now create a distribution with a remote pointing
to a remote registry without specifying the upstream name and Pulp automatically downloads missing
content and acts as a smart proxy.
38 changes: 38 additions & 0 deletions pulp_container/app/downloaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@
import json
import ssl
import re
import tempfile

from aiohttp.client_exceptions import ClientResponseError
from logging import getLogger
from multidict import MultiDict
from urllib import parse

from django.conf import settings

from pulpcore.plugin.models import Artifact, Task
from pulpcore.plugin.download import DownloaderFactory, HttpDownloader
from pulpcore.plugin import pulp_hashlib

from pulp_container.constants import V2_ACCEPT_HEADERS

Expand Down Expand Up @@ -95,14 +100,47 @@ async def _run(self, handle_401=True, extra_data=None):
return await self._run(handle_401=False, extra_data=extra_data)
else:
raise

to_return = await self._handle_response(response)

await response.release()
self.response_headers = response.headers

if self._close_session_on_finalize:
self.session.close()
return to_return

def _ensure_writer_has_open_file(self):
    """
    Lazily open the temporary file that receives the downloaded data.

    The file is created only on first use, so many downloaders can be instantiated in
    memory without each of them holding an open file.

    The directory of the NamedTemporaryFile depends on the execution context: when no task
    is running (``Task.current()`` is None), ``settings.WORKING_DIRECTORY`` is used;
    otherwise the file is created in the current directory. Without this, permission errors
    might be raised when Pulp downloads a file from the api-app and writes to a user space.
    """
    # A writer already exists: the file, digests and size counter were set up before.
    if self._writer:
        return

    if Task.current() is None:
        target_dir = settings.WORKING_DIRECTORY
    else:
        target_dir = "."

    # delete=False keeps the file on disk after it is closed.
    writer = tempfile.NamedTemporaryFile(dir=target_dir, delete=False)
    self._writer = writer
    self.path = writer.name
    # One hasher per digest field declared on Artifact.
    self._digests = {
        algorithm: pulp_hashlib.new(algorithm) for algorithm in Artifact.DIGEST_FIELDS
    }
    self._size = 0

def fetch(self, extra_data=None):
    """
    Synchronously run the download, forwarding ``extra_data`` to ``run()``.

    Args:
        extra_data: Additional data passed through unchanged to ``self.run``.

    Returns:
        :class:`~pulpcore.plugin.download.DownloadResult`
        or :class:`~aiohttp.client.ClientResponse`

    Raises:
        Exception: Any fatal exception emitted during downloading
    """
    # Obtain the loop first, then create the coroutine, so no coroutine is left
    # un-awaited if the loop lookup fails.
    event_loop = asyncio.get_event_loop()
    pending_download = self.run(extra_data=extra_data)
    return event_loop.run_until_complete(pending_download)

async def update_token(self, response_auth_header, used_token, repo_name):
"""
Update the Bearer token to be used with all requests.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Generated by Django 4.2.4 on 2023-08-01 19:00

from django.db import migrations, models
import django.db.models.deletion
import pulpcore.app.models.access_policy


class Migration(migrations.Migration):
    """
    Schema changes for pull-through caching.

    Creates the ContainerPullThroughDistribution and ContainerPullThroughRemote models,
    adds the pending_*/remaining_blobs many-to-many relations to ContainerRepository, and
    links ContainerDistribution to a pull-through distribution via a nullable foreign key.
    """

    # Migrations that must be applied before this one.
    dependencies = [
        ("core", "0108_task_versions"),
        ("container", "0036_containerpushrepository_pending_blobs_manifests"),
    ]

    operations = [
        # Multi-table child of core.Distribution carrying only the "private" flag.
        migrations.CreateModel(
            name="ContainerPullThroughDistribution",
            fields=[
                (
                    "distribution_ptr",
                    models.OneToOneField(
                        auto_created=True,
                        on_delete=django.db.models.deletion.CASCADE,
                        parent_link=True,
                        primary_key=True,
                        serialize=False,
                        to="core.distribution",
                    ),
                ),
                (
                    "private",
                    models.BooleanField(
                        default=False,
                        help_text="Restrict pull access to explicitly authorized users. Defaults to unrestricted pull access.",
                    ),
                ),
            ],
            options={
                "default_related_name": "%(app_label)s_%(model_name)s",
            },
            bases=("core.distribution", pulpcore.app.models.access_policy.AutoAddObjPermsMixin),
        ),
        # Multi-table child of core.Remote with no additional columns.
        migrations.CreateModel(
            name="ContainerPullThroughRemote",
            fields=[
                (
                    "remote_ptr",
                    models.OneToOneField(
                        auto_created=True,
                        on_delete=django.db.models.deletion.CASCADE,
                        parent_link=True,
                        primary_key=True,
                        serialize=False,
                        to="core.remote",
                    ),
                ),
            ],
            options={
                "default_related_name": "%(app_label)s_%(model_name)s",
            },
            bases=("core.remote", pulpcore.app.models.access_policy.AutoAddObjPermsMixin),
        ),
        # Many-to-many relations added to ContainerRepository. The two relations that
        # target container.blob each set an explicit related_name.
        migrations.AddField(
            model_name="containerrepository",
            name="pending_blobs",
            field=models.ManyToManyField(related_name="pending_blobs", to="container.blob"),
        ),
        migrations.AddField(
            model_name="containerrepository",
            name="pending_manifests",
            field=models.ManyToManyField(to="container.manifest"),
        ),
        migrations.AddField(
            model_name="containerrepository",
            name="pending_tags",
            field=models.ManyToManyField(to="container.tag"),
        ),
        migrations.AddField(
            model_name="containerrepository",
            name="remaining_blobs",
            field=models.ManyToManyField(related_name="remaining_blobs", to="container.blob"),
        ),
        # Nullable link from ContainerDistribution to a pull-through distribution;
        # deleting the pull-through distribution cascades to the linked distributions.
        migrations.AddField(
            model_name="containerdistribution",
            name="pull_through_distribution",
            field=models.ForeignKey(
                null=True,
                on_delete=django.db.models.deletion.CASCADE,
                related_name="distributions",
                to="container.containerpullthroughdistribution",
            ),
        ),
    ]
103 changes: 103 additions & 0 deletions pulp_container/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,66 @@ class Meta:
]


class ContainerPullThroughRemote(Remote, AutoAddObjPermsMixin):
    """
    A remote of type "pull-through" whose downloaders are registry-auth aware.

    TODO: Add permissions.
    """

    TYPE = "pull-through"

    @property
    def download_factory(self):
        """
        Return the DownloaderFactory that maps to downloaders which support registry auth.

        The factory is built on first access and cached on the instance; later accesses
        return the cached object.

        Returns:
            DownloadFactory: The instantiated DownloaderFactory to be used by
                get_downloader()
        """
        if not hasattr(self, "_download_factory"):
            # Both schemes are served by the same registry-auth downloader class.
            registry_downloader = downloaders.RegistryAuthHttpDownloader
            self._download_factory = DownloaderFactory(
                self,
                downloader_overrides={
                    "http": registry_downloader,
                    "https": registry_downloader,
                },
            )
        return self._download_factory

    def get_downloader(self, remote_artifact=None, url=None, **kwargs):
        """
        Build a downloader for a RemoteArtifact or a URL, configured with this Remote.

        Exactly one of `remote_artifact` or `url` is expected. If neither or both are
        passed a ValueError is raised.

        Args:
            remote_artifact (:class:`~pulpcore.app.models.RemoteArtifact`): The RemoteArtifact to
                download.
            url (str): The URL to download.
            kwargs (dict): This accepts the parameters of
                :class:`~pulpcore.plugin.download.BaseDownloader`.

        Raises:
            ValueError: If neither remote_artifact and url are passed, or if both are passed.

        Returns:
            subclass of :class:`~pulpcore.plugin.download.BaseDownloader`: A downloader that
                is configured with the remote settings.
        """
        # Always hand this remote to the downloader, overriding any caller-supplied "remote".
        downloader_options = {**kwargs, "remote": self}
        return super().get_downloader(
            remote_artifact=remote_artifact, url=url, **downloader_options
        )

    class Meta:
        default_related_name = "%(app_label)s_%(model_name)s"


class ManifestSigningService(SigningService):
"""
Signing service used for creating container signatures.
Expand Down Expand Up @@ -485,6 +545,13 @@ class ContainerRepository(
ManifestSigningService, on_delete=models.SET_NULL, null=True
)

# temporary relations used for uncommitted pull-through cache operations
pending_tags = models.ManyToManyField(Tag)
pending_manifests = models.ManyToManyField(Manifest)
pending_blobs = models.ManyToManyField(Blob, related_name="pending_blobs")
# digests of remaining blobs to be attached to pending manifests
remaining_blobs = models.ManyToManyField(Blob, related_name="remaining_blobs")

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"
permissions = [
Expand All @@ -507,6 +574,16 @@ def finalize_new_version(self, new_version):
"""
remove_duplicates(new_version)
validate_repo_version(new_version)
self.remove_pending_content(new_version)

def remove_pending_content(self, repository_version):
    """
    Detach pending tags, manifests and blobs that the given version has committed.

    Args:
        repository_version: The repository version whose added content (relative to its
            base version) is removed from the pending relations.
    """
    committed_pks = repository_version.added(
        base_version=repository_version.base_version
    ).values_list("pk")

    # Each pending relation is paired with the model it stores.
    pending_relations = (
        (self.pending_tags, Tag),
        (self.pending_manifests, Manifest),
        (self.pending_blobs, Blob),
    )
    for relation, model in pending_relations:
        relation.remove(*model.objects.filter(pk__in=committed_pks))


class ContainerPushRepository(Repository, AutoAddObjPermsMixin):
Expand Down Expand Up @@ -563,6 +640,25 @@ def remove_pending_content(self, repository_version):
self.pending_manifests.remove(*Manifest.objects.filter(pk__in=added_content))


class ContainerPullThroughDistribution(Distribution, AutoAddObjPermsMixin):
    """
    A distribution of type "pull-through" with an optional restriction on pull access.

    TODO: Add permissions.
    """

    TYPE = "pull-through"

    # When True, pull access is limited to explicitly authorized users.
    private = models.BooleanField(
        help_text=_(
            "Restrict pull access to explicitly authorized users. "
            "Defaults to unrestricted pull access."
        ),
        default=False,
    )

    class Meta:
        default_related_name = "%(app_label)s_%(model_name)s"


class ContainerDistribution(Distribution, AutoAddObjPermsMixin):
"""
A container distribution defines how a repository version is distributed by Pulp's webserver.
Expand Down Expand Up @@ -593,6 +689,13 @@ class ContainerDistribution(Distribution, AutoAddObjPermsMixin):
)
description = models.TextField(null=True)

pull_through_distribution = models.ForeignKey(
ContainerPullThroughDistribution,
related_name="distributions",
on_delete=models.CASCADE,
null=True,
)

def get_repository_version(self):
"""
Returns the repository version that is supposed to be served by this ContainerDistribution.
Expand Down
34 changes: 18 additions & 16 deletions pulp_container/app/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,16 @@ async def get_tag(self, request):
pk__in=await sync_to_async(repository_version.get_content)(), name=tag_name
)
except ObjectDoesNotExist:
raise PathNotResolved(tag_name)
if distribution.remote:
repository = await repository_version.repository.acast()
try:
tag = await repository.pending_tags.select_related("tagged_manifest").aget(
name=tag_name
)
except ObjectDoesNotExist:
raise PathNotResolved(tag_name)
else:
raise PathNotResolved(tag_name)

# we do not convert OCI to docker
oci_mediatypes = [MEDIA_TYPE.MANIFEST_OCI, MEDIA_TYPE.INDEX_OCI]
Expand Down Expand Up @@ -155,8 +164,7 @@ async def get_tag(self, request):

async def dispatch_tag(self, request, tag, response_headers):
"""
Finds an artifact associated with a Tag and sends it to the client, otherwise tries
to stream it.
Finds an artifact associated with a Tag and sends it to the client.
Args:
request(:class:`~aiohttp.web.Request`): The request to prepare a response for.
Expand All @@ -169,13 +177,8 @@ async def dispatch_tag(self, request, tag, response_headers):
streamed back to the client.
"""
try:
artifact = await tag.tagged_manifest._artifacts.aget()
except ObjectDoesNotExist:
ca = await sync_to_async(lambda x: x[0])(tag.tagged_manifest.contentartifact_set.all())
return await self._stream_content_artifact(request, web.StreamResponse(), ca)
else:
return await Registry._dispatch(artifact, response_headers)
artifact = await sync_to_async(tag.tagged_manifest._artifacts.get)()
return await Registry._dispatch(artifact, response_headers)

@staticmethod
async def dispatch_converted_schema(tag, accepted_media_types, path):
Expand Down Expand Up @@ -219,7 +222,6 @@ async def get_by_digest(self, request):
"""
Return a response to the "GET" action.
"""

path = request.match_info["path"]
digest = "sha256:{digest}".format(digest=request.match_info["digest"])
distribution = await sync_to_async(self._match_distribution)(path)
Expand All @@ -233,15 +235,15 @@ async def get_by_digest(self, request):
content = await sync_to_async(repository_version.get_content)()

repository = await sync_to_async(repository_version.repository.cast)()
if repository.PUSH_ENABLED:
pending_blobs = repository.pending_blobs.values_list("pk")
pending_manifests = repository.pending_manifests.values_list("pk")
pending_content = pending_blobs.union(pending_manifests)
content |= Content.objects.filter(pk__in=pending_content)
pending_blobs = repository.pending_blobs.values_list("pk")
pending_manifests = repository.pending_manifests.values_list("pk")
pending_content = pending_blobs.union(pending_manifests)
content |= Content.objects.filter(pk__in=pending_content)

ca = await ContentArtifact.objects.select_related("artifact", "content").aget(
content__in=content, relative_path=digest
)

ca_content = await sync_to_async(ca.content.cast)()
if isinstance(ca_content, Blob):
media_type = BLOB_CONTENT_TYPE
Expand Down
Loading

0 comments on commit e2e6823

Please sign in to comment.