Skip to content

Commit

Permalink
feat: add package names api for gateway (#819)
Browse files Browse the repository at this point in the history
  • Loading branch information
nichmor authored Aug 16, 2024
1 parent 1e99d7a commit 149f433
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 7 deletions.
5 changes: 4 additions & 1 deletion crates/rattler_repodata_gateway/src/gateway/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::fetch;
use crate::fetch::{FetchRepoDataError, RepoDataNotFoundError};
use crate::gateway::direct_url_query::DirectUrlQueryError;
use rattler_conda_types::{Channel, MatchSpec};
use rattler_conda_types::{Channel, InvalidPackageNameError, MatchSpec};
use rattler_redaction::Redact;
use reqwest_middleware::Error;
use simple_spawn_blocking::Cancelled;
Expand Down Expand Up @@ -44,6 +44,9 @@ pub enum GatewayError {

#[error("the package from url '{0}', doesn't have the same name as the match spec filename intents '{1}'")]
UrlRecordNameMismatch(String, String),

#[error(transparent)]
InvalidPackageName(#[from] InvalidPackageNameError),
}

impl From<Cancelled> for GatewayError {
Expand Down
8 changes: 8 additions & 0 deletions crates/rattler_repodata_gateway/src/gateway/local_subdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,12 @@ impl SubdirClient for LocalSubdirClient {
})
.await
}

fn package_names(&self) -> Vec<String> {
let sparse_repodata: Arc<SparseRepoData> = self.sparse.clone();
sparse_repodata
.package_names()
.map(std::convert::Into::into)
.collect()
}
}
25 changes: 22 additions & 3 deletions crates/rattler_repodata_gateway/src/gateway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use dashmap::{mapref::entry::Entry, DashMap};
pub use error::GatewayError;
use file_url::url_to_path;
use local_subdir::LocalSubdirClient;
pub use query::GatewayQuery;
pub use query::{NamesQuery, RepoDataQuery};
use rattler_cache::package_cache::PackageCache;
use rattler_conda_types::{Channel, MatchSpec, Platform};
pub use repo_data::RepoData;
Expand Down Expand Up @@ -99,7 +99,7 @@ impl Gateway {
channels: ChannelIter,
platforms: PlatformIter,
specs: PackageNameIter,
) -> GatewayQuery
) -> RepoDataQuery
where
AsChannel: Into<Channel>,
ChannelIter: IntoIterator<Item = AsChannel>,
Expand All @@ -108,14 +108,33 @@ impl Gateway {
PackageNameIter: IntoIterator<Item = IntoMatchSpec>,
IntoMatchSpec: Into<MatchSpec>,
{
GatewayQuery::new(
RepoDataQuery::new(
self.inner.clone(),
channels.into_iter().map(Into::into).collect(),
platforms.into_iter().collect(),
specs.into_iter().map(Into::into).collect(),
)
}

/// Return all names from repodata
pub fn names<AsChannel, ChannelIter, PlatformIter>(
&self,
channels: ChannelIter,
platforms: PlatformIter,
) -> NamesQuery
where
AsChannel: Into<Channel>,
ChannelIter: IntoIterator<Item = AsChannel>,
PlatformIter: IntoIterator<Item = Platform>,
<PlatformIter as IntoIterator>::IntoIter: Clone,
{
NamesQuery::new(
self.inner.clone(),
channels.into_iter().map(Into::into).collect(),
platforms.into_iter().collect(),
)
}

/// Clears any in-memory cache for the given channel.
///
/// Any subsequent query will re-fetch any required data from the source.
Expand Down
104 changes: 101 additions & 3 deletions crates/rattler_repodata_gateway/src/gateway/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{gateway::direct_url_query::DirectUrlQuery, Reporter};
/// with the same channels will not result in the repodata being fetched
/// twice.
#[derive(Clone)]
pub struct GatewayQuery {
pub struct RepoDataQuery {
/// The gateway that manages all resources
gateway: Arc<GatewayInner>,

Expand All @@ -43,7 +43,7 @@ pub struct GatewayQuery {
reporter: Option<Arc<dyn Reporter>>,
}

impl GatewayQuery {
impl RepoDataQuery {
/// Constructs a new instance. This should not be called directly, use
/// [`Gateway::query`] instead.
pub(super) fn new(
Expand Down Expand Up @@ -271,11 +271,109 @@ impl GatewayQuery {
}
}

impl IntoFuture for GatewayQuery {
impl IntoFuture for RepoDataQuery {
type Output = Result<Vec<RepoData>, GatewayError>;
type IntoFuture = futures::future::BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
self.execute().boxed()
}
}

/// Represents a query for package names to execute with a [`Gateway`].
///
/// When executed the query will asynchronously load the package names from all
/// subdirectories (combination of channels and platforms).
#[derive(Clone)]
pub struct NamesQuery {
/// The gateway that manages all resources
gateway: Arc<GatewayInner>,

/// The channels to fetch from
channels: Vec<Channel>,

/// The platforms the fetch from
platforms: Vec<Platform>,

/// The reporter to use by the query.
reporter: Option<Arc<dyn Reporter>>,
}

impl NamesQuery {
/// Constructs a new instance. This should not be called directly, use
/// [`Gateway::names`] instead.
pub(super) fn new(
gateway: Arc<GatewayInner>,
channels: Vec<Channel>,
platforms: Vec<Platform>,
) -> Self {
Self {
gateway,
channels,
platforms,

reporter: None,
}
}

/// Sets the reporter to use for this query.
///
/// The reporter is notified of important evens during the execution of the
/// query. This allows reporting progress back to a user.
pub fn with_reporter(self, reporter: impl Reporter + 'static) -> Self {
Self {
reporter: Some(Arc::new(reporter)),
..self
}
}

/// Execute the query and return the package names.
pub async fn execute(self) -> Result<Vec<PackageName>, GatewayError> {
// Collect all the channels and platforms together
let channels_and_platforms = self
.channels
.iter()
.cartesian_product(self.platforms.into_iter())
.collect_vec();

// Create barrier cells for each subdirectory.
// This can be used to wait until the subdir becomes available.
let mut pending_subdirs = FuturesUnordered::new();
for (channel, platform) in channels_and_platforms {
// Create a barrier so work that need this subdir can await it.
// Set the subdir to prepend the direct url queries in the result.

let inner = self.gateway.clone();
let reporter = self.reporter.clone();
pending_subdirs.push(async move {
match inner
.get_or_create_subdir(channel, platform, reporter)
.await
{
Ok(subdir) => Ok(subdir.package_names().unwrap_or_default()),
Err(e) => Err(e),
}
});
}
let mut names: HashSet<String> = HashSet::default();

while let Some(result) = pending_subdirs.next().await {
let subdir_names = result?;
names.extend(subdir_names);
}

Ok(names
.into_iter()
.map(PackageName::try_from)
.collect::<Result<Vec<PackageName>, _>>()?)
}
}

impl IntoFuture for NamesQuery {
type Output = Result<Vec<PackageName>, GatewayError>;
type IntoFuture = futures::future::BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
self.execute().boxed()
}
}
4 changes: 4 additions & 0 deletions crates/rattler_repodata_gateway/src/gateway/remote_subdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,8 @@ impl SubdirClient for RemoteSubdirClient {
) -> Result<Arc<[RepoDataRecord]>, GatewayError> {
self.sparse.fetch_package_records(name, reporter).await
}

fn package_names(&self) -> Vec<String> {
self.sparse.package_names()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ impl SubdirClient for ShardedSubdir {

Ok(records.into())
}

fn package_names(&self) -> Vec<String> {
self.sharded_repodata.shards.keys().cloned().collect()
}
}

/// Atomically writes the shard bytes to the cache.
Expand Down
17 changes: 17 additions & 0 deletions crates/rattler_repodata_gateway/src/gateway/subdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ pub enum Subdir {
Found(SubdirData),
}

impl Subdir {
/// Returns the names of all packages in the subdirectory.
pub fn package_names(&self) -> Option<Vec<String>> {
match self {
Subdir::Found(subdir) => Some(subdir.package_names()),
Subdir::NotFound => None,
}
}
}

/// Fetches and caches repodata records by package name for a specific subdirectory of a channel.
pub struct SubdirData {
/// The client to use to fetch repodata.
Expand Down Expand Up @@ -135,6 +145,10 @@ impl SubdirData {

Ok(records)
}

pub fn package_names(&self) -> Vec<String> {
self.client.package_names()
}
}

/// A client that can be used to fetch repodata for a specific subdirectory.
Expand All @@ -146,4 +160,7 @@ pub trait SubdirClient: Send + Sync {
name: &PackageName,
reporter: Option<&dyn Reporter>,
) -> Result<Arc<[RepoDataRecord]>, GatewayError>;

/// Returns the names of all packages in the subdirectory.
fn package_names(&self) -> Vec<String>;
}

0 comments on commit 149f433

Please sign in to comment.