Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 66 additions & 30 deletions populate_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@


logger = logging.getLogger()
QUAY_REGISTRY_TAG_PAGE_SIZE = 100
QUAY_TAG_FETCH_CONCURRENCY = 20
QUAY_TAG_FETCH_PER_SECOND = 20


class RepositoryKind(str, Enum):
Expand Down Expand Up @@ -53,16 +56,11 @@ class RepositoryListResponse(pydantic.BaseModel):
next_page: Optional[str] = None


class TagListResponse(pydantic.BaseModel):
    """Schema for the registry tag-list endpoint response.

    Only the ``tags`` field is of interest; any other keys in the payload
    are not modelled here.
    """

    # Flat list of tag names as returned by GET .../tags/list.
    tags: List[str]


class ContainerImageParser(HTMLParser):
Expand Down Expand Up @@ -111,14 +109,33 @@ async def fetch_all(
params["namespace"] = repository
async with httpx.AsyncClient(
base_url=api_url, headers=headers, timeout=httpx.Timeout(12)
) as client:
names = await cls._fetch_names(client=client, params=params)
) as api_client, httpx.AsyncClient(
base_url=cls._registry_url(api_url),
headers=headers,
timeout=httpx.Timeout(12),
) as registry_client:
names = await cls._fetch_names(client=api_client, params=params)
images = await cls._fetch_tags(
client=client, repository=repository, names=names
client=registry_client, repository=repository, names=names
)
log_images(log_file=log_file, images=images)
return images

@staticmethod
def _registry_url(api_url: str) -> str:
    """Derive the registry API root (``.../v2/``) from the Quay API root.

    A trailing ``/api/v1`` (with or without final slash) is replaced by
    ``/v2/``; any other path falls back to ``/v2/`` at the host root.
    Query string and fragment are dropped.
    """
    parsed = httpx.URL(api_url)
    registry_path = "/v2/"
    for api_suffix in ("/api/v1/", "/api/v1"):
        if parsed.path.endswith(api_suffix):
            registry_path = f"{parsed.path[: -len(api_suffix)]}/v2/"
            break
    return str(parsed.copy_with(path=registry_path, query=None, fragment=None))

@classmethod
async def _fetch_names(
cls, client: httpx.AsyncClient, params: Dict[str, str]
Expand Down Expand Up @@ -171,8 +188,8 @@ async def _fetch_tags(
client: httpx.AsyncClient,
repository: str,
names: List[str],
max_concurrency: int = 10,
max_per_second: int = 10,
max_concurrency: int = QUAY_TAG_FETCH_CONCURRENCY,
max_per_second: int = QUAY_TAG_FETCH_PER_SECOND,
) -> List[str]:
"""
Fetch the image tags for each given container image.
Expand All @@ -181,21 +198,17 @@ async def _fetch_tags(
limits.

"""
requests = [
client.build_request(method="GET", url=f"repository/{repository}/{name}")
for name in names
]
images = []
with cls._progress_bar() as pbar:
task = pbar.add_task(description="Image Tags", total=len(requests))
task = pbar.add_task(description="Image Tags", total=len(names))
async with aiometer.amap(
partial(cls._fetch_single_repository, client),
requests,
partial(cls._fetch_single_repository_tags, client, repository),
names,
max_at_once=max_concurrency,
max_per_second=max_per_second,
) as results:
async for repo in results: # type: SingleRepositoryResponse
images.extend((f"{repo.name}:{tag}" for tag in repo.tags))
async for repo_images in results:
images.extend(repo_images)
pbar.update(task, advance=1)
return images

Expand All @@ -220,13 +233,30 @@ def _progress_bar(cls) -> rprog.Progress:
retry=tenacity.retry_if_exception_type(httpx.HTTPError),
before=tenacity.before_log(logger, logging.DEBUG),
)
async def _fetch_single_repository(
client: httpx.AsyncClient, request: httpx.Request
) -> SingleRepositoryResponse:
"""Fetch a single repository resource and parse the response."""
response = await client.send(request=request)
response.raise_for_status()
return SingleRepositoryResponse.parse_obj(response.json())
async def _fetch_single_repository_tags(
    client: httpx.AsyncClient, repository: str, name: str
) -> List[str]:
    """Fetch all tags for a single repository from the registry tag endpoint.

    Follows RFC 5988 ``Link: ...; rel="next"`` pagination headers until no
    further page is advertised. Returns ``name:tag`` image strings.
    """
    tagged_images: List[str] = []
    url = f"{repository}/{name}/tags/list"
    # The page size is sent only on the first request; subsequent "next"
    # links already carry their own query parameters.
    query: Optional[Dict[str, int]] = {"n": QUAY_REGISTRY_TAG_PAGE_SIZE}
    while True:
        response = await client.get(url, params=query)
        response.raise_for_status()
        page = TagListResponse.parse_obj(response.json())
        for tag in page.tags:
            tagged_images.append(f"{name}:{tag}")
        next_link = response.links.get("next")
        if not next_link:
            break
        url = QuayImageFetcher._normalize_next_link(client, next_link["url"])
        query = None
        if not url:
            break
    return tagged_images

@staticmethod
def _normalize_next_link(client: httpx.AsyncClient, next_url: str) -> str:
    """Normalize a Quay pagination link for use against the registry client.

    Links that repeat the client's base path have that prefix removed so
    httpx does not join it twice; other links only lose their leading
    slash(es) so they resolve relative to the client's base URL.
    """
    base_path = client.base_url.path
    if not next_url.startswith(base_path):
        return next_url.lstrip("/")
    return next_url[len(base_path):]


class SingularityImageFetcher:
Expand Down Expand Up @@ -309,8 +339,14 @@ def get_new_images(

def parse_denylist(filename: Path) -> List[str]:
    """Parse the list of images to skip.

    Blank lines and comment lines (a ``#`` after optional leading
    whitespace) are ignored; every other line is returned stripped.

    :param filename: path to the denylist file, one entry per line.
    :return: stripped, non-comment entries in file order.
    """
    denylist: List[str] = []
    with filename.open() as handle:
        # Iterate the file lazily rather than materializing readlines().
        for line in handle:
            entry = line.strip()
            # Check the stripped entry, not the raw line, so that indented
            # comments ("  # foo") are skipped too — the raw-line check
            # let them leak into the denylist.
            if entry and not entry.startswith('#'):
                denylist.append(entry)
    # The module logger is the root logger (logging.getLogger() at file top),
    # so fetching it inline is equivalent and keeps this block self-contained.
    logging.getLogger().info(f"{len(denylist):,} entries found on the skip-list")
    return denylist


def generate_build_script(filename: Path, images: List[str]) -> None:
Expand Down