Skip to content

Commit

Permalink
Don't pass TaskId by reference
Browse files Browse the repository at this point in the history
  • Loading branch information
mendess committed Sep 24, 2024
1 parent 15df4a8 commit 20af072
Show file tree
Hide file tree
Showing 24 changed files with 294 additions and 319 deletions.
4 changes: 2 additions & 2 deletions crates/dapf/src/acceptance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ impl Test {
.test_produce_agg_job_req(
fake_leader_hpke_receiver_config,
self,
task_id,
*task_id,
part_batch_sel,
&DapAggregationParam::Empty,
reports_for_agg_job,
Expand Down Expand Up @@ -566,7 +566,7 @@ impl Test {
)
.with_context(|| "failed to parse response to AggregateInitReq from Helper")?;
let agg_share_span = task_config.consume_agg_job_resp(
task_id,
*task_id,
agg_job_state,
agg_job_resp,
self.metrics(),
Expand Down
4 changes: 2 additions & 2 deletions crates/dapf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ async fn handle_leader_actions(
.produce_report(
&[leader_hpke_config, helper_hpke_config],
now,
&task_id,
task_id,
measurement,
version,
)
Expand Down Expand Up @@ -592,7 +592,7 @@ async fn handle_leader_actions(
.into_vdaf()
.consume_encrypted_agg_shares(
receiver,
&task_id,
task_id,
&batch_selector,
collect_resp.report_count,
&DapAggregationParam::Empty,
Expand Down
48 changes: 19 additions & 29 deletions crates/daphne-server/src/roles/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl DapAggregator for crate::App {
#[tracing::instrument(skip(self, task_config, agg_share_span))]
async fn try_put_agg_share_span(
&self,
task_id: &TaskId,
task_id: TaskId,
task_config: &DapTaskConfig,
agg_share_span: DapAggregateSpan<DapAggregateShare>,
) -> DapAggregateSpan<Result<(), MergeAggShareError>> {
Expand Down Expand Up @@ -78,15 +78,13 @@ impl DapAggregator for crate::App {
#[tracing::instrument(skip(self))]
async fn get_agg_share(
&self,
task_id: &TaskId,
task_id: TaskId,
batch_sel: &BatchSelector,
) -> Result<DapAggregateShare, DapError> {
let task_config = self
.get_task_config_for(task_id)
.await?
.ok_or(DapError::Abort(DapAbort::UnrecognizedTask {
task_id: *task_id,
}))?;
.ok_or(DapError::Abort(DapAbort::UnrecognizedTask { task_id }))?;

let durable = self.durable();
let mut requests = Vec::new();
Expand Down Expand Up @@ -114,15 +112,13 @@ impl DapAggregator for crate::App {
#[tracing::instrument(skip(self))]
async fn mark_collected(
&self,
task_id: &TaskId,
task_id: TaskId,
batch_sel: &BatchSelector,
) -> Result<(), DapError> {
let task_config = self
.get_task_config_for(task_id)
.await?
.ok_or(DapError::Abort(DapAbort::UnrecognizedTask {
task_id: *task_id,
}))?;
.ok_or(DapError::Abort(DapAbort::UnrecognizedTask { task_id }))?;

let durable = self.durable();
let mut requests = Vec::new();
Expand Down Expand Up @@ -188,13 +184,13 @@ impl DapAggregator for crate::App {

async fn taskprov_opt_in(
&self,
task_id: &TaskId,
task_id: TaskId,
task_config: taskprov::DapTaskConfigNeedsOptIn,
global_config: &DapGlobalConfig,
) -> Result<DapTaskConfig, DapError> {
if let Some(param) = self
.kv()
.get_cloned::<kv::prefix::TaskprovOptInParam>(task_id, &KvGetOptions::default())
.get_cloned::<kv::prefix::TaskprovOptInParam>(&task_id, &KvGetOptions::default())
.await
.map_err(|e| fatal_error!(err = ?e, "failed to get TaskprovOptInParam from kv"))?
{
Expand All @@ -211,7 +207,7 @@ impl DapAggregator for crate::App {
if let Err(e) = self
.kv()
.put_with_expiration::<kv::prefix::TaskprovOptInParam>(
task_id,
&task_id,
param,
expiration_time,
)
Expand Down Expand Up @@ -251,10 +247,10 @@ impl DapAggregator for crate::App {

async fn get_task_config_for<'req>(
&'req self,
task_id: &'req TaskId,
task_id: TaskId,
) -> Result<Option<Self::WrappedDapTaskConfig<'req>>, DapError> {
self.kv()
.get_cloned::<kv::prefix::TaskConfig>(task_id, &KvGetOptions::default())
.get_cloned::<kv::prefix::TaskConfig>(&task_id, &KvGetOptions::default())
.await
.map_err(|e| fatal_error!(err = ?e, "failed to get a task config from kv: {task_id}"))
}
Expand All @@ -268,15 +264,13 @@ impl DapAggregator for crate::App {

async fn is_batch_overlapping(
&self,
task_id: &TaskId,
task_id: TaskId,
batch_sel: &BatchSelector,
) -> Result<bool, DapError> {
let task_config = self
.get_task_config_for(task_id)
.await?
.ok_or(DapError::Abort(DapAbort::UnrecognizedTask {
task_id: *task_id,
}))?;
.ok_or(DapError::Abort(DapAbort::UnrecognizedTask { task_id }))?;

// Check whether the request overlaps with previous requests. This is done by
// checking the AggregateStore and seeing whether it requests for aggregate
Expand All @@ -301,13 +295,11 @@ impl DapAggregator for crate::App {
)
}

async fn batch_exists(&self, task_id: &TaskId, batch_id: &BatchId) -> Result<bool, DapError> {
async fn batch_exists(&self, task_id: TaskId, batch_id: &BatchId) -> Result<bool, DapError> {
let task_config = self
.get_task_config_for(task_id)
.await?
.ok_or(DapError::Abort(DapAbort::UnrecognizedTask {
task_id: *task_id,
}))?;
.ok_or(DapError::Abort(DapAbort::UnrecognizedTask { task_id }))?;
let version = task_config.as_ref().version;

let agg_span = task_config.batch_span_for_sel(&BatchSelector::FixedSizeByBatchId {
Expand Down Expand Up @@ -399,7 +391,7 @@ impl HpkeProvider for crate::App {
async fn get_hpke_config_for<'s>(
&'s self,
version: DapVersion,
_task_id: Option<&TaskId>,
_task_id: Option<TaskId>,
) -> Result<Self::WrappedHpkeConfig<'static>, DapError> {
self.kv()
.get_mapped::<kv::prefix::HpkeReceiverConfigSet, _, _>(
Expand All @@ -418,13 +410,11 @@ impl HpkeProvider for crate::App {
.ok_or_else(|| fatal_error!(err = "there are no hpke configs in kv!!", %version))
}

async fn can_hpke_decrypt(&self, task_id: &TaskId, config_id: u8) -> Result<bool, DapError> {
async fn can_hpke_decrypt(&self, task_id: TaskId, config_id: u8) -> Result<bool, DapError> {
let version = self
.get_task_config_for(task_id)
.await?
.ok_or(DapError::Abort(DapAbort::UnrecognizedTask {
task_id: *task_id,
}))?
.ok_or(DapError::Abort(DapAbort::UnrecognizedTask { task_id }))?
.version;

Ok(self
Expand All @@ -444,7 +434,7 @@ impl HpkeProvider for crate::App {
impl HpkeDecrypter for crate::App {
async fn hpke_decrypt(
&self,
task_id: &TaskId,
task_id: TaskId,
info: &[u8],
aad: &[u8],
ciphertext: &HpkeCiphertext,
Expand All @@ -453,7 +443,7 @@ impl HpkeDecrypter for crate::App {
.get_task_config_for(task_id)
.await?
.as_ref()
.ok_or(DapAbort::UnrecognizedTask { task_id: *task_id })?
.ok_or(DapAbort::UnrecognizedTask { task_id })?
.version;
self.kv()
.peek::<kv::prefix::HpkeReceiverConfigSet, _, _>(
Expand Down
18 changes: 8 additions & 10 deletions crates/daphne-server/src/roles/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,23 @@ use url::Url;

#[async_trait]
impl DapLeader for crate::App {
async fn put_report(&self, report: &Report, task_id: &TaskId) -> Result<(), DapError> {
async fn put_report(&self, report: &Report, task_id: TaskId) -> Result<(), DapError> {
let task_config = self
.get_task_config_for(task_id)
.await?
.ok_or(DapAbort::UnrecognizedTask { task_id: *task_id })?;
.ok_or(DapAbort::UnrecognizedTask { task_id })?;

self.test_leader_state
.lock()
.await
.put_report(task_id, &task_config, report.clone())
}

async fn current_batch(&self, task_id: &TaskId) -> Result<BatchId, DapError> {
async fn current_batch(&self, task_id: TaskId) -> Result<BatchId, DapError> {
let task_config = self
.get_task_config_for(task_id)
.await?
.ok_or(DapError::Abort(DapAbort::UnrecognizedTask {
task_id: *task_id,
}))?;
.ok_or(DapError::Abort(DapAbort::UnrecognizedTask { task_id }))?;

self.test_leader_state
.lock()
Expand All @@ -49,15 +47,15 @@ impl DapLeader for crate::App {

async fn init_collect_job(
&self,
task_id: &TaskId,
task_id: TaskId,
coll_job_id: &CollectionJobId,
batch_sel: BatchSelector,
agg_param: DapAggregationParam,
) -> Result<url::Url, DapError> {
let task_config = self
.get_task_config_for(task_id)
.await?
.ok_or(DapAbort::UnrecognizedTask { task_id: *task_id })?;
.ok_or(DapAbort::UnrecognizedTask { task_id })?;

self.test_leader_state.lock().await.init_collect_job(
task_id,
Expand All @@ -70,7 +68,7 @@ impl DapLeader for crate::App {

async fn poll_collect_job(
&self,
task_id: &TaskId,
task_id: TaskId,
coll_job_id: &CollectionJobId,
) -> Result<DapCollectionJob, DapError> {
self.test_leader_state
Expand All @@ -81,7 +79,7 @@ impl DapLeader for crate::App {

async fn finish_collect_job(
&self,
task_id: &TaskId,
task_id: TaskId,
coll_job_id: &CollectionJobId,
collection: &Collection,
) -> Result<(), DapError> {
Expand Down
2 changes: 1 addition & 1 deletion crates/daphne-server/src/router/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ where
Ok(id) => id,
Err(e) => return AxumDapResponse::new_error(e, app.server_metrics()).into_response(),
};
match app.poll_collect_job(&req.task_id, collect_id).await {
match app.poll_collect_job(req.task_id, collect_id).await {
Ok(daphne::DapCollectionJob::Done(collect_resp)) => AxumDapResponse::new_success(
daphne::DapResponse {
version: req.version,
Expand Down
2 changes: 1 addition & 1 deletion crates/daphne-server/src/router/test_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async fn leader_current_batch(
State(app): State<Arc<App>>,
Path(PathTaskId { task_id }): Path<PathTaskId>,
) -> impl IntoResponse {
match app.current_batch(&task_id).await {
match app.current_batch(task_id).await {
Ok(batch_id) => (StatusCode::OK, batch_id.to_base64url().into_bytes()).into_response(),
Err(e) => AxumDapResponse::new_error(e, &*app.metrics).into_response(),
}
Expand Down
Loading

0 comments on commit 20af072

Please sign in to comment.