Skip to content

Commit

Permalink
feat: race local builds against remotes
Browse files Browse the repository at this point in the history
- resolves #434
  • Loading branch information
m-hilgendorf committed Jan 14, 2025
1 parent 9e450cf commit 0e730ee
Showing 1 changed file with 134 additions and 86 deletions.
220 changes: 134 additions & 86 deletions packages/server/src/target/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,72 +116,127 @@ impl Server {
return Ok(Some(output));
}

// Get a remote build if one exists that satisfies the retry constraint.
'a: {
// Find a build.
let futures = self
.get_remote_clients()
.await?
.into_values()
.map(|client| {
let arg = arg.clone();
Box::pin(async move {
let arg = tg::target::build::Arg {
create: false,
// Create a build id for the local build, in order to avoid borrow checking errors when canceling in the case that a remote returns first.
let build_id = tg::build::Id::new();

// Create futures.
let local = self.try_create_local_build(build_id.clone(), id.clone(), arg.clone());
let remote = self.try_get_remote_build(id.clone(), arg.clone());

// Race the local/remote builds.
let build = match future::select(std::pin::pin!(local), std::pin::pin!(remote)).await {
future::Either::Left((local, remote)) => {
if let Ok(Some(local)) = local {
Some(local)
} else {
remote.await?
}
},
future::Either::Right((remote, local)) => {
if let Ok(Some(build)) = remote {
// Cancel the local build in the case that the remote won the race.
let server = self.clone();
tokio::spawn(async move {
let arg = tg::build::finish::Arg {
status: tg::build::Status::Canceled,
error: None,
output: None,
remote: None,
..arg.clone()
};
let tg::target::build::Output { build } =
client.build_target(id, arg).await?;
let build = tg::Build::with_id(build);
Ok::<_, tg::Error>(Some((build, client)))
})
})
.collect_vec();
server.try_finish_build(&build_id, arg).await.ok();
});
Some(build)
} else {
local.await?
}
},
};

// Wait for the first build.
if futures.is_empty() {
break 'a;
}
let Ok((Some((build, _remote)), _)) = future::select_ok(futures).await else {
break 'a;
};
// Bail if no build was found/spawned.
let Some(build) = build else {
return Ok(None);
};

// Add the build as a child of the parent.
if let Some(parent) = arg.parent.as_ref() {
self.try_add_build_child(parent, build.id()).await.map_err(
|source| tg::error!(!source, %parent, %child = build.id(), "failed to add build as a child"),
)?;
}
// Add the build as a child of the parent.
if let Some(parent) = arg.parent.as_ref() {
self.try_add_build_child(parent, build.id()).await.map_err(
|source| tg::error!(!source, %parent, %child = build.id(), "failed to add build as a child"),
)?;
}

// Touch the build.
tokio::spawn({
let server = self.clone();
let build = build.clone();
async move {
let arg = tg::build::touch::Arg { remote: None };
server.touch_build(build.id(), arg).await.ok();
}
});
// Create the output.
let output = tg::target::build::Output {
build: build.id().clone(),
};
Ok(Some(output))
}

// Create the output.
let output = tg::target::build::Output {
build: build.id().clone(),
};
async fn try_get_remote_build(
&self,
id: tg::target::Id,
arg: tg::target::build::Arg,
) -> tg::Result<Option<tg::Build>> {
// Find a build.
let futures = self
.get_remote_clients()
.await?
.into_values()
.map(|client| {
let arg = arg.clone();
let id = id.clone();
Box::pin(async move {
let arg = tg::target::build::Arg {
create: false,
remote: None,
..arg.clone()
};
let tg::target::build::Output { build } = client.build_target(&id, arg).await?;
let build = tg::Build::with_id(build);
Ok::<_, tg::Error>(Some((build, client)))
})
})
.collect_vec();

return Ok(Some(output));
// Wait for the first build.
if futures.is_empty() {
return Ok(None);
}
let Ok((Some((build, _remote)), _)) = future::select_ok(futures).await else {
return Ok(None);
};

// If the create arg is false, then return `None`.
// Add the build as a child of the parent.
if let Some(parent) = arg.parent.as_ref() {
self.try_add_build_child(parent, build.id()).await.map_err(
|source| tg::error!(!source, %parent, %child = build.id(), "failed to add build as a child"),
)?;
}

// Touch the build.
tokio::spawn({
let server = self.clone();
let build = build.clone();
async move {
let arg = tg::build::touch::Arg { remote: None };
server.touch_build(build.id(), arg).await.ok();
}
});

Ok(Some(build))
}

async fn try_create_local_build(
&self,
build_id: tg::build::Id,
target_id: tg::target::Id,
arg: tg::target::build::Arg,
) -> tg::Result<Option<tg::Build>> {
if !arg.create {
return Ok(None);
}

// Otherwise, create a new build.
let build_id = tg::build::Id::new();

// Get the host.
let target = tg::Target::with_id(id.clone());
let target = tg::Target::with_id(target_id.clone());
let host = target.host(self).await?;

// Put the build.
Expand All @@ -195,7 +250,7 @@ impl Server {
output: None,
retry: arg.retry,
status: tg::build::Status::Enqueued,
target: id.clone(),
target: target_id.clone(),
created_at: time::OffsetDateTime::now_utc(),
enqueued_at: Some(time::OffsetDateTime::now_utc()),
dequeued_at: None,
Expand All @@ -215,13 +270,6 @@ impl Server {
)?;
}

// Add the build to the parent.
if let Some(parent) = arg.parent.as_ref() {
self.try_add_build_child(parent, build.id()).await.map_err(
|source| tg::error!(!source, %parent, %child = build.id(), "failed to add build as a child"),
)?;
}

// Publish the message.
tokio::spawn({
let server = self.clone();
Expand All @@ -236,35 +284,35 @@ impl Server {
});

// Spawn a task to spawn the build when the parent's permit is available.
let server = self.clone();
let parent = arg.parent.clone();
let build = build.clone();
tokio::spawn(async move {
// Acquire the parent's permit.
let Some(permit) = parent.as_ref().and_then(|parent| {
tokio::spawn({
let server = self.clone();
let parent = arg.parent.clone();
let build = build.clone();
async move {
// Acquire the parent's permit.
let Some(permit) = parent.as_ref().and_then(|parent| {
server
.build_permits
.get(parent)
.map(|permit| permit.clone())
}) else {
return;
};
let permit = permit
.lock_owned()
.map(|guard| BuildPermit(Either::Right(guard)))
.await;

// Attempt to spawn the build.
server
.build_permits
.get(parent)
.map(|permit| permit.clone())
}) else {
return;
};
let permit = permit
.lock_owned()
.map(|guard| BuildPermit(Either::Right(guard)))
.await;

// Attempt to spawn the build.
server
.spawn_build(build, permit, None)
.await
.inspect_err(|error| tracing::error!(?error, "failed to spawn the build"))
.ok();
.spawn_build(build, permit, None)
.await
.inspect_err(|error| tracing::error!(?error, "failed to spawn the build"))
.ok();
}
});

let output = tg::target::build::Output { build: build_id };

Ok(Some(output))
Ok(Some(build))
}

async fn detect_build_cycle(
Expand Down

0 comments on commit 0e730ee

Please sign in to comment.