Skip to content

Commit

Permalink
[Turbopack] refactor filesystem writes to an effect based system (ver…
Browse files Browse the repository at this point in the history
…cel#72847)

### Why?

Before we just wrote the files in `FileSystem::write()`, but that caused some issues when two `FileSystem::write()` tasks are active (or even temporarily active) at the same time. They invalidate each other and basically caused a loop of writes.

For a long time we wanted to have a better solution for that, since that problem causes compilation to hang with 100% cpu load, which is quite annoying.

Also the eventual consistency can cause some unexpected writes to be done when the content or filename are on a temporary value that has not stabilised yet.

The problem applies to every side effect in tasks.

### What?

To solve that this refactoring changes the way side effects in tasks are handled.

Side effects in tasks should not be directly executed anymore. Instead one should emit an "effect" (`effect(async { ... })` helper method) to declare a side effect that should be executed. Top level execution is usually wrapped in a strongly consistent read of the top level task. Here one need to apply the "effects" (`apply_effects(operation).await?` helper method). Since this happens after the strongly consistent read, only effects that have been stabilised are executed.

It's also possible to call `apply_effects` somewhere nested in the execution (after a strongly consistent read) if side effects are necessary for further execution. e. g. we do that to emit the files for the node.js pool before executing the pool.

Internally effects are collectibles that are propagated through the call graph. Each effect can only be executed once, so updates to the graph will only apply new effects while existing effects are skipped as they were already executed.

The actual change is pretty simple, but `FileSystem::write` no longer returns a `Vc<Completion>` but nothing instead. This makes the change needed a bit bigger.

Good news: To emit a whole (potential circular) graph we could use recursion now instead of needing to walk the graph while avoiding cycles. This might allow some performance improvements for incremental builds.
  • Loading branch information
sokra authored Nov 18, 2024
1 parent 2186f7e commit be9b2dd
Show file tree
Hide file tree
Showing 30 changed files with 879 additions and 532 deletions.
31 changes: 21 additions & 10 deletions crates/napi/src/next_api/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use next_api::{
route::{Endpoint, WrittenEndpoint},
};
use tracing::Instrument;
use turbo_tasks::{Completion, ReadRef, Vc, VcValueType};
use turbo_tasks::{get_effects, Completion, Effects, ReadRef, Vc, VcValueType};
use turbopack_core::{
diagnostics::PlainDiagnostic,
error::PrettyPrintError,
Expand Down Expand Up @@ -104,38 +104,42 @@ async fn strongly_consistent_catch_collectables<R: VcValueType + Send>(
Option<ReadRef<R>>,
Arc<Vec<ReadRef<PlainIssue>>>,
Arc<Vec<ReadRef<PlainDiagnostic>>>,
Arc<Effects>,
)> {
let result = source.strongly_consistent().await;
let issues = get_issues(source).await?;
let diagnostics = get_diagnostics(source).await?;
let effects = Arc::new(get_effects(source).await?);

let result = if result.is_err() && issues.iter().any(|i| i.severity <= IssueSeverity::Error) {
None
} else {
Some(result?)
};

Ok((result, issues, diagnostics))
Ok((result, issues, diagnostics, effects))
}

#[turbo_tasks::value(serialization = "none")]
struct WrittenEndpointWithIssues {
written: Option<ReadRef<WrittenEndpoint>>,
issues: Arc<Vec<ReadRef<PlainIssue>>>,
diagnostics: Arc<Vec<ReadRef<PlainDiagnostic>>>,
effects: Arc<Effects>,
}

#[turbo_tasks::function]
async fn get_written_endpoint_with_issues(
endpoint: Vc<Box<dyn Endpoint>>,
) -> Result<Vc<WrittenEndpointWithIssues>> {
let write_to_disk = endpoint.write_to_disk();
let (written, issues, diagnostics) =
let (written, issues, diagnostics, effects) =
strongly_consistent_catch_collectables(write_to_disk).await?;
Ok(WrittenEndpointWithIssues {
written,
issues,
diagnostics,
effects,
}
.cell())
}
Expand All @@ -149,13 +153,15 @@ pub async fn endpoint_write_to_disk(
let endpoint = ***endpoint;
let (written, issues, diags) = turbo_tasks
.run_once(async move {
let operation = get_written_endpoint_with_issues(endpoint);
let WrittenEndpointWithIssues {
written,
issues,
diagnostics,
} = &*get_written_endpoint_with_issues(endpoint)
.strongly_consistent()
.await?;
effects,
} = &*operation.strongly_consistent().await?;
effects.apply().await?;

Ok((written.clone(), issues.clone(), diagnostics.clone()))
})
.await
Expand All @@ -180,9 +186,10 @@ pub fn endpoint_server_changed_subscribe(
func,
move || {
async move {
subscribe_issues_and_diags(endpoint, issues)
.strongly_consistent()
.await
let operation = subscribe_issues_and_diags(endpoint, issues);
let result = operation.strongly_consistent().await?;
result.effects.apply().await?;
Ok(result)
}
.instrument(tracing::info_span!("server changes subscription"))
},
Expand All @@ -191,6 +198,7 @@ pub fn endpoint_server_changed_subscribe(
changed: _,
issues,
diagnostics,
effects: _,
} = &*ctx.value;

Ok(vec![TurbopackResult {
Expand All @@ -210,6 +218,7 @@ struct EndpointIssuesAndDiags {
changed: Option<ReadRef<Completion>>,
issues: Arc<Vec<ReadRef<PlainIssue>>>,
diagnostics: Arc<Vec<ReadRef<PlainDiagnostic>>>,
effects: Arc<Effects>,
}

impl PartialEq for EndpointIssuesAndDiags {
Expand All @@ -233,12 +242,13 @@ async fn subscribe_issues_and_diags(
let changed = endpoint.server_changed();

if should_include_issues {
let (changed_value, issues, diagnostics) =
let (changed_value, issues, diagnostics, effects) =
strongly_consistent_catch_collectables(changed).await?;
Ok(EndpointIssuesAndDiags {
changed: changed_value,
issues,
diagnostics,
effects,
}
.cell())
} else {
Expand All @@ -247,6 +257,7 @@ async fn subscribe_issues_and_diags(
changed: Some(changed_value),
issues: Arc::new(vec![]),
diagnostics: Arc::new(vec![]),
effects: Arc::new(Effects::default()),
}
.cell())
}
Expand Down
32 changes: 22 additions & 10 deletions crates/napi/src/next_api/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio::{io::AsyncWriteExt, time::Instant};
use tracing::Instrument;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Registry};
use turbo_rcstr::RcStr;
use turbo_tasks::{Completion, ReadRef, TransientInstance, UpdateInfo, Vc};
use turbo_tasks::{get_effects, Completion, Effects, ReadRef, TransientInstance, UpdateInfo, Vc};
use turbo_tasks_fs::{
util::uri_from_file, DiskFileSystem, FileContent, FileSystem, FileSystemPath,
};
Expand Down Expand Up @@ -623,6 +623,7 @@ struct EntrypointsWithIssues {
entrypoints: ReadRef<Entrypoints>,
issues: Arc<Vec<ReadRef<PlainIssue>>>,
diagnostics: Arc<Vec<ReadRef<PlainDiagnostic>>>,
effects: Arc<Effects>,
}

#[turbo_tasks::function]
Expand All @@ -633,10 +634,12 @@ async fn get_entrypoints_with_issues(
let entrypoints = entrypoints_operation.strongly_consistent().await?;
let issues = get_issues(entrypoints_operation).await?;
let diagnostics = get_diagnostics(entrypoints_operation).await?;
let effects = Arc::new(get_effects(entrypoints_operation).await?);
Ok(EntrypointsWithIssues {
entrypoints,
issues,
diagnostics,
effects,
}
.cell())
}
Expand All @@ -653,13 +656,14 @@ pub fn project_entrypoints_subscribe(
func,
move || {
async move {
let operation = get_entrypoints_with_issues(container);
let EntrypointsWithIssues {
entrypoints,
issues,
diagnostics,
} = &*get_entrypoints_with_issues(container)
.strongly_consistent()
.await?;
effects,
} = &*operation.strongly_consistent().await?;
effects.apply().await?;
Ok((entrypoints.clone(), issues.clone(), diagnostics.clone()))
}
.instrument(tracing::info_span!("entrypoints subscription"))
Expand Down Expand Up @@ -718,6 +722,7 @@ struct HmrUpdateWithIssues {
update: ReadRef<Update>,
issues: Arc<Vec<ReadRef<PlainIssue>>>,
diagnostics: Arc<Vec<ReadRef<PlainDiagnostic>>>,
effects: Arc<Effects>,
}

#[turbo_tasks::function]
Expand All @@ -730,10 +735,12 @@ async fn hmr_update(
let update = update_operation.strongly_consistent().await?;
let issues = get_issues(update_operation).await?;
let diagnostics = get_diagnostics(update_operation).await?;
let effects = Arc::new(get_effects(update_operation).await?);
Ok(HmrUpdateWithIssues {
update,
issues,
diagnostics,
effects,
}
.cell())
}
Expand All @@ -760,14 +767,15 @@ pub fn project_hmr_events(
let project = project.project().resolve().await?;
let state = project.hmr_version_state(identifier.clone(), session);

let update = hmr_update(project, identifier.clone(), state)
.strongly_consistent()
.await?;
let operation = hmr_update(project, identifier.clone(), state);
let update = operation.strongly_consistent().await?;
let HmrUpdateWithIssues {
update,
issues,
diagnostics,
effects,
} = &*update;
effects.apply().await?;
match &**update {
Update::Missing | Update::None => {}
Update::Total(TotalUpdate { to }) => {
Expand Down Expand Up @@ -832,6 +840,7 @@ struct HmrIdentifiersWithIssues {
identifiers: ReadRef<Vec<RcStr>>,
issues: Arc<Vec<ReadRef<PlainIssue>>>,
diagnostics: Arc<Vec<ReadRef<PlainDiagnostic>>>,
effects: Arc<Effects>,
}

#[turbo_tasks::function]
Expand All @@ -842,10 +851,12 @@ async fn get_hmr_identifiers_with_issues(
let hmr_identifiers = hmr_identifiers_operation.strongly_consistent().await?;
let issues = get_issues(hmr_identifiers_operation).await?;
let diagnostics = get_diagnostics(hmr_identifiers_operation).await?;
let effects = Arc::new(get_effects(hmr_identifiers_operation).await?);
Ok(HmrIdentifiersWithIssues {
identifiers: hmr_identifiers,
issues,
diagnostics,
effects,
}
.cell())
}
Expand All @@ -861,13 +872,14 @@ pub fn project_hmr_identifiers_subscribe(
turbo_tasks.clone(),
func,
move || async move {
let operation = get_hmr_identifiers_with_issues(container);
let HmrIdentifiersWithIssues {
identifiers,
issues,
diagnostics,
} = &*get_hmr_identifiers_with_issues(container)
.strongly_consistent()
.await?;
effects,
} = &*operation.strongly_consistent().await?;
effects.apply().await?;

Ok((identifiers.clone(), issues.clone(), diagnostics.clone()))
},
Expand Down
6 changes: 3 additions & 3 deletions crates/next-api/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1647,10 +1647,10 @@ impl Endpoint for AppEndpoint {

let node_root_ref = &node_root.await?;

this.app_project
let _ = this
.app_project
.project()
.emit_all_output_assets(Vc::cell(output_assets))
.await?;
.emit_all_output_assets(Vc::cell(output_assets));

let node_root = this.app_project.project().node_root();
let server_paths = all_server_paths(output_assets, node_root)
Expand Down
4 changes: 1 addition & 3 deletions crates/next-api/src/instrumentation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,7 @@ impl Endpoint for InstrumentationEndpoint {
let this = self.await?;
let output_assets = self.output_assets();
let _ = output_assets.resolve().await?;
this.project
.emit_all_output_assets(Vc::cell(output_assets))
.await?;
let _ = this.project.emit_all_output_assets(Vc::cell(output_assets));

let node_root = this.project.node_root();
let server_paths = all_server_paths(output_assets, node_root)
Expand Down
4 changes: 1 addition & 3 deletions crates/next-api/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,7 @@ impl Endpoint for MiddlewareEndpoint {
let this = self.await?;
let output_assets = self.output_assets();
let _ = output_assets.resolve().await?;
this.project
.emit_all_output_assets(Vc::cell(output_assets))
.await?;
let _ = this.project.emit_all_output_assets(Vc::cell(output_assets));

let node_root = this.project.node_root();
let server_paths = all_server_paths(output_assets, node_root)
Expand Down
2 changes: 1 addition & 1 deletion crates/next-api/src/nft_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl NftJsonAsset {
let output_fs = this.output_fs.await?;
let nft_folder = self.ident().path().parent().await?;

if let Some(subdir) = output_fs.root.strip_prefix(&*project_fs.root) {
if let Some(subdir) = output_fs.root().strip_prefix(&**project_fs.root()) {
Ok(this
.project_fs
.root()
Expand Down
6 changes: 3 additions & 3 deletions crates/next-api/src/pages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1290,10 +1290,10 @@ impl Endpoint for PageEndpoint {
// single operation
let output_assets = self.output_assets();

this.pages_project
let _ = this
.pages_project
.project()
.emit_all_output_assets(Vc::cell(output_assets))
.await?;
.emit_all_output_assets(Vc::cell(output_assets));

let node_root = this.pages_project.project().node_root();
let server_paths = all_server_paths(output_assets, node_root)
Expand Down
26 changes: 11 additions & 15 deletions crates/next-api/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,7 @@ impl Project {
pub async fn emit_all_output_assets(
self: Vc<Self>,
output_assets: Vc<OutputAssetsOperation>,
) -> Result<Vc<()>> {
) -> Result<()> {
let span = tracing::info_span!("emitting");
async move {
let all_output_assets = all_assets_from_entries_operation(output_assets);
Expand All @@ -1173,27 +1173,23 @@ impl Project {
let node_root = self.node_root();

if let Some(map) = self.await?.versioned_content_map {
let _ = map
.insert_output_assets(
all_output_assets,
node_root,
client_relative_path,
node_root,
)
.resolve()
.await?;
let _ = map.insert_output_assets(
all_output_assets,
node_root,
client_relative_path,
node_root,
);

Ok(Vc::cell(()))
Ok(())
} else {
let _ = emit_assets(
*all_output_assets.await?,
node_root,
client_relative_path,
node_root,
)
.resolve()
.await?;
Ok(Vc::cell(()))
);

Ok(())
}
}
.instrument(span)
Expand Down
Loading

0 comments on commit be9b2dd

Please sign in to comment.