Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: race local builds against remotes #439

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 134 additions & 86 deletions packages/server/src/target/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,72 +116,127 @@ impl Server {
return Ok(Some(output));
}

// Get a remote build if one exists that satisfies the retry constraint.
'a: {
// Find a build.
let futures = self
.get_remote_clients()
.await?
.into_values()
.map(|client| {
let arg = arg.clone();
Box::pin(async move {
let arg = tg::target::build::Arg {
create: false,
// Create a build id for the local build, in order to avoid borrow checking errors when canceling in the case that a remote returns first.
let build_id = tg::build::Id::new();

// Create futures.
let local = self.try_create_local_build(build_id.clone(), id.clone(), arg.clone());
let remote = self.try_get_remote_build(id.clone(), arg.clone());

// Race the local/remote builds.
let build = match future::select(std::pin::pin!(local), std::pin::pin!(remote)).await {
future::Either::Left((local, remote)) => {
if let Ok(Some(local)) = local {
Some(local)
} else {
remote.await?
}
},
future::Either::Right((remote, local)) => {
if let Ok(Some(build)) = remote {
// Cancel the local build in the case that the remote won the race.
let server = self.clone();
tokio::spawn(async move {
let arg = tg::build::finish::Arg {
status: tg::build::Status::Canceled,
error: None,
output: None,
remote: None,
..arg.clone()
};
let tg::target::build::Output { build } =
client.build_target(id, arg).await?;
let build = tg::Build::with_id(build);
Ok::<_, tg::Error>(Some((build, client)))
})
})
.collect_vec();
server.try_finish_build(&build_id, arg).await.ok();
});
Some(build)
} else {
local.await?
}
},
};

// Wait for the first build.
if futures.is_empty() {
break 'a;
}
let Ok((Some((build, _remote)), _)) = future::select_ok(futures).await else {
break 'a;
};
// Bail if no build was found/spawned.
let Some(build) = build else {
return Ok(None);
};

// Add the build as a child of the parent.
if let Some(parent) = arg.parent.as_ref() {
self.try_add_build_child(parent, build.id()).await.map_err(
|source| tg::error!(!source, %parent, %child = build.id(), "failed to add build as a child"),
)?;
}
// Add the build as a child of the parent.
if let Some(parent) = arg.parent.as_ref() {
self.try_add_build_child(parent, build.id()).await.map_err(
|source| tg::error!(!source, %parent, %child = build.id(), "failed to add build as a child"),
)?;
}

// Touch the build.
tokio::spawn({
let server = self.clone();
let build = build.clone();
async move {
let arg = tg::build::touch::Arg { remote: None };
server.touch_build(build.id(), arg).await.ok();
}
});
// Create the output.
let output = tg::target::build::Output {
build: build.id().clone(),
};
Ok(Some(output))
}

// Create the output.
let output = tg::target::build::Output {
build: build.id().clone(),
};
async fn try_get_remote_build(
&self,
id: tg::target::Id,
arg: tg::target::build::Arg,
) -> tg::Result<Option<tg::Build>> {
// Find a build.
let futures = self
.get_remote_clients()
.await?
.into_values()
.map(|client| {
let arg = arg.clone();
let id = id.clone();
Box::pin(async move {
let arg = tg::target::build::Arg {
create: false,
remote: None,
..arg.clone()
};
let tg::target::build::Output { build } = client.build_target(&id, arg).await?;
let build = tg::Build::with_id(build);
Ok::<_, tg::Error>(Some((build, client)))
})
})
.collect_vec();

return Ok(Some(output));
// Wait for the first build.
if futures.is_empty() {
return Ok(None);
}
let Ok((Some((build, _remote)), _)) = future::select_ok(futures).await else {
return Ok(None);
};

// If the create arg is false, then return `None`.
// Add the build as a child of the parent.
if let Some(parent) = arg.parent.as_ref() {
self.try_add_build_child(parent, build.id()).await.map_err(
|source| tg::error!(!source, %parent, %child = build.id(), "failed to add build as a child"),
)?;
}

// Touch the build.
tokio::spawn({
let server = self.clone();
let build = build.clone();
async move {
let arg = tg::build::touch::Arg { remote: None };
server.touch_build(build.id(), arg).await.ok();
}
});

Ok(Some(build))
}

async fn try_create_local_build(
&self,
build_id: tg::build::Id,
target_id: tg::target::Id,
arg: tg::target::build::Arg,
) -> tg::Result<Option<tg::Build>> {
if !arg.create {
return Ok(None);
}

// Otherwise, create a new build.
let build_id = tg::build::Id::new();

// Get the host.
let target = tg::Target::with_id(id.clone());
let target = tg::Target::with_id(target_id.clone());
let host = target.host(self).await?;

// Put the build.
Expand All @@ -195,7 +250,7 @@ impl Server {
output: None,
retry: arg.retry,
status: tg::build::Status::Enqueued,
target: id.clone(),
target: target_id.clone(),
created_at: time::OffsetDateTime::now_utc(),
enqueued_at: Some(time::OffsetDateTime::now_utc()),
dequeued_at: None,
Expand All @@ -215,13 +270,6 @@ impl Server {
)?;
}

// Add the build to the parent.
if let Some(parent) = arg.parent.as_ref() {
self.try_add_build_child(parent, build.id()).await.map_err(
|source| tg::error!(!source, %parent, %child = build.id(), "failed to add build as a child"),
)?;
}

// Publish the message.
tokio::spawn({
let server = self.clone();
Expand All @@ -236,35 +284,35 @@ impl Server {
});

// Spawn a task to spawn the build when the parent's permit is available.
let server = self.clone();
let parent = arg.parent.clone();
let build = build.clone();
tokio::spawn(async move {
// Acquire the parent's permit.
let Some(permit) = parent.as_ref().and_then(|parent| {
tokio::spawn({
let server = self.clone();
let parent = arg.parent.clone();
let build = build.clone();
async move {
// Acquire the parent's permit.
let Some(permit) = parent.as_ref().and_then(|parent| {
server
.build_permits
.get(parent)
.map(|permit| permit.clone())
}) else {
return;
};
let permit = permit
.lock_owned()
.map(|guard| BuildPermit(Either::Right(guard)))
.await;

// Attempt to spawn the build.
server
.build_permits
.get(parent)
.map(|permit| permit.clone())
}) else {
return;
};
let permit = permit
.lock_owned()
.map(|guard| BuildPermit(Either::Right(guard)))
.await;

// Attempt to spawn the build.
server
.spawn_build(build, permit, None)
.await
.inspect_err(|error| tracing::error!(?error, "failed to spawn the build"))
.ok();
.spawn_build(build, permit, None)
.await
.inspect_err(|error| tracing::error!(?error, "failed to spawn the build"))
.ok();
}
});

let output = tg::target::build::Output { build: build_id };

Ok(Some(output))
Ok(Some(build))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to wait for the build to be spawned here, or just let it win?

}

async fn detect_build_cycle(
Expand Down