Skip to content

Commit

Permalink
chore(core): rust fs4 lock
Browse files Browse the repository at this point in the history
  • Loading branch information
AgentEnder committed Jan 14, 2025
1 parent 61b98f3 commit 0204e02
Show file tree
Hide file tree
Showing 13 changed files with 187 additions and 189 deletions.
22 changes: 2 additions & 20 deletions packages/nx/bin/nx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { performance } from 'perf_hooks';
import { setupWorkspaceContext } from '../src/utils/workspace-context';
import { daemonClient } from '../src/daemon/client/client';
import { removeDbConnections } from '../src/utils/db-connection';
import { signalToCode } from '../src/utils/exit-codes';
import { registerCleanupFn } from '../src/utils/cleanup';

function main() {
if (
Expand Down Expand Up @@ -276,26 +276,8 @@ const getLatestVersionOfNx = ((fn: () => string) => {
return () => cache || (cache = fn());
})(_getLatestVersionOfNx);

function nxCleanup(signal?: NodeJS.Signals) {
registerCleanupFn(() => {
removeDbConnections();
if (signal) {
process.exit(signalToCode(signal));
} else {
process.exit();
}
}

process.on('exit', () => {
nxCleanup();
});
process.on('SIGINT', () => {
nxCleanup('SIGINT');
});
process.on('SIGTERM', () => {
nxCleanup('SIGTERM');
});
process.on('SIGHUP', () => {
nxCleanup('SIGHUP');
});

main();
12 changes: 4 additions & 8 deletions packages/nx/src/command-line/graph/graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import { ConfigurationSourceMaps } from '../../project-graph/utils/project-confi
import { createTaskHasher } from '../../hasher/create-task-hasher';
import { ProjectGraphError } from '../../project-graph/error-types';
import { isNxCloudUsed } from '../../utils/nx-cloud-utils';
import { registerCleanupFn } from '../../utils/cleanup';

export interface GraphError {
message: string;
Expand Down Expand Up @@ -667,14 +668,9 @@ async function startServer(
}
});

const handleTermination = async (exitCode: number) => {
if (unregisterFileWatcher) {
unregisterFileWatcher();
}
process.exit(exitCode);
};
process.on('SIGINT', () => handleTermination(128 + 2));
process.on('SIGTERM', () => handleTermination(128 + 15));
if (unregisterFileWatcher) {
registerCleanupFn(() => unregisterFileWatcher);
}

return new Promise<{ app: Server; url: URL }>((res) => {
app.listen(port, host, () => {
Expand Down
34 changes: 3 additions & 31 deletions packages/nx/src/executors/run-commands/run-commands.impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import {
PseudoTerminal,
PseudoTtyProcess,
} from '../../tasks-runner/pseudo-terminal';
import { signalToCode } from '../../utils/exit-codes';
import {
loadAndExpandDotEnvFile,
unloadDotEnvFile,
} from '../../tasks-runner/task-env';
import { registerCleanupFn } from '../../utils/cleanup';

export const LARGE_BUFFER = 1024 * 1000000;
let pseudoTerminal: PseudoTerminal | null;
Expand Down Expand Up @@ -640,41 +640,13 @@ function registerProcessListener() {
});
});

// Terminate any task processes on exit
process.on('exit', () => {
registerCleanupFn((signal) => {
childProcesses.forEach((p) => {
if ('connected' in p ? p.connected : p.isAlive) {
p.kill();
p.kill(signal);
}
});
});
process.on('SIGINT', () => {
childProcesses.forEach((p) => {
if ('connected' in p ? p.connected : p.isAlive) {
p.kill('SIGTERM');
}
});
// we exit here because we don't need to write anything to cache.
process.exit(signalToCode('SIGINT'));
});
process.on('SIGTERM', () => {
childProcesses.forEach((p) => {
if ('connected' in p ? p.connected : p.isAlive) {
p.kill('SIGTERM');
}
});
// no exit here because we expect child processes to terminate which
// will store results to the cache and will terminate this process
});
process.on('SIGHUP', () => {
childProcesses.forEach((p) => {
if ('connected' in p ? p.connected : p.isAlive) {
p.kill('SIGTERM');
}
});
// no exit here because we expect child processes to terminate which
// will store results to the cache and will terminate this process
});
}

function wrapArgIntoQuotesIfNeeded(arg: string): string {
Expand Down
2 changes: 1 addition & 1 deletion packages/nx/src/native/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ export declare class ChildProcess {
export declare class FileLock {
locked: boolean
constructor(lockFilePath: string)
lock(): void
unlock(): void
wait(): Promise<void>
lock(): void
}

export declare class HashPlanner {
Expand Down
44 changes: 44 additions & 0 deletions packages/nx/src/native/tests/file-lock.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { fork } from 'child_process';
import { join } from 'path';

describe('withLock', () => {
it('should block the second call until the first one is done', async () => {
let combinedOutputs = [];
let a = fork(join(__dirname, './fixtures/file-lock-fixture.spec.js'), {
env: {
LABEL: 'a',
NX_NATIVE_LOGGING: 'trace',
},
stdio: 'pipe',
execArgv: ['--require', 'ts-node/register'],
});

// Gives a bit of time to make the outputs of the tests more predictable...
// if both start at the same time, its hard to guarantee that a will get the lock before b.
await new Promise((r) => setTimeout(r, 500));

let b = fork(join(__dirname, './fixtures/file-lock-fixture.spec.js'), {
env: {
LABEL: 'b',
NX_NATIVE_LOGGING: 'trace',
},
stdio: 'pipe',
execArgv: ['--require', 'ts-node/register'],
});

a.stdout.on('data', (data) => {
combinedOutputs.push('A: ' + data.toString().trim());
});
b.stdout.on('data', (data) => {
combinedOutputs.push('B: ' + data.toString().trim());
});

a.stderr.pipe(process.stderr);
b.stderr.pipe(process.stderr);

await Promise.all([a, b].map((p) => new Promise((r) => p.once('exit', r))));

expect(combinedOutputs).toContain('A: ran with lock');
expect(combinedOutputs).toContain('B: waited for lock');
});
});
20 changes: 20 additions & 0 deletions packages/nx/src/native/tests/fixtures/file-lock-fixture.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
const { FileLock } = require('../../native-bindings.js');
const ora = require('ora');
const tmp = require('os').tmpdir();

(async () => {
const lock = new FileLock(
require('path').join(tmp, 'nx-unit-tests', 'file-lock-fixture')
);
if (lock.locked) {
const s = ora('Waiting for lock').start();
await lock.wait();
s.stop();
console.log('waited for lock');
} else {
await lock.lock();
await new Promise((resolve) => setTimeout(resolve, 5000));
console.log('ran with lock');
await lock.unlock();
}
})();
140 changes: 70 additions & 70 deletions packages/nx/src/native/utils/file_lock.rs
Original file line number Diff line number Diff line change
@@ -1,79 +1,95 @@
use std::fs::{self, File};
use std::io;
use std::path::Path;
use std::time::Duration;
use fs4::fs_std::FileExt;
use napi::bindgen_prelude::*;
use std::{
fs::{self, OpenOptions},
path::Path,
};
use tracing::trace;

#[napi]
#[derive(Clone)]
pub struct FileLock {
#[napi]
pub locked: bool,

file: fs::File,
lock_file_path: String,
}

/// const lock = new FileLock('lockfile.lock');
/// if (lock.locked) {
/// lock.wait()
/// readFromCache()
/// } else {
/// lock.lock()
/// ... do some work
/// writeToCache()
/// lock.unlock()
/// }
#[napi]
impl FileLock {
#[napi(constructor)]
pub fn new(lock_file_path: String) -> Self {
let locked = Path::new(&lock_file_path).exists();
Self {
locked,
lock_file_path,
}
}
pub fn new(lock_file_path: String) -> anyhow::Result<Self> {
// Creates the directory where the lock file will be stored
fs::create_dir_all(Path::new(&lock_file_path).parent().unwrap())?;

#[napi]
pub fn lock(&mut self) -> anyhow::Result<()> {
if self.locked {
anyhow::bail!("File {} is already locked", self.lock_file_path)
// Opens the lock file
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&lock_file_path)?;

trace!("Locking file {}", lock_file_path);

// Check if the file is locked
let file_lock: std::result::Result<(), std::io::Error> = file.try_lock_exclusive();

if file_lock.is_ok() {
// Checking if the file is locked, locks it, so unlock it.
file.unlock()?;
}

let _ = File::create(&self.lock_file_path)?;
self.locked = true;
Ok(())
Ok(Self {
file: file,
locked: file_lock.is_err(),
lock_file_path,
})
}

#[napi]
pub fn unlock(&mut self) -> anyhow::Result<()> {
if !self.locked {
anyhow::bail!("File {} is not locked", self.lock_file_path)
}
fs::remove_file(&self.lock_file_path).or_else(|err| {
if err.kind() == io::ErrorKind::NotFound {
Ok(())
} else {
Err(err)
}
})?;
pub fn unlock(&mut self) {
let _ = self.file.unlock();
self.locked = false;
Ok(())
}

#[napi]
pub async fn wait(&self) -> Result<(), napi::Error> {
if !self.locked {
return Ok(());
}

loop {
if !self.locked || !Path::new(&self.lock_file_path).exists() {
break Ok(());
}
std::thread::sleep(Duration::from_millis(2));
#[napi(ts_return_type = "Promise<void>")]
pub fn wait(&mut self, env: Env) -> napi::Result<napi::JsObject> {
if self.locked {
let lock_file_path = self.lock_file_path.clone();
self.locked = false;
env.spawn_future(async move {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&lock_file_path)?;
file.lock_shared()?;
Ok(())
})
} else {
env.spawn_future(async move { Ok(()) })
}
}
}

// Ensure the lock file is removed when the FileLock is dropped
impl Drop for FileLock {
fn drop(&mut self) {
if self.locked {
let _ = self.unlock();
}
#[napi]
pub fn lock(&mut self) -> napi::Result<()> {
self.file.lock_exclusive()?;
self.locked = true;
Ok(())
}
}

// TODO: Fix the tests
#[cfg(test)]
mod test {
use super::*;
Expand All @@ -87,30 +103,13 @@ mod test {
let lock_file = tmp_dir.child("test_lock_file");
let lock_file_path = lock_file.path().to_path_buf();
let lock_file_path_str = lock_file_path.into_os_string().into_string().unwrap();
let mut file_lock = FileLock::new(lock_file_path_str);
let mut file_lock = FileLock::new(lock_file_path_str).unwrap();
assert_eq!(file_lock.locked, false);
let _ = file_lock.lock();
assert_eq!(file_lock.locked, true);
assert!(lock_file.exists());
let _ = file_lock.unlock();
assert_eq!(lock_file.exists(), false);
}

#[tokio::test]
async fn test_wait() {
let tmp_dir = TempDir::new().unwrap();
let lock_file = tmp_dir.child("test_lock_file");
let lock_file_path = lock_file.path().to_path_buf();
let lock_file_path_str = lock_file_path.into_os_string().into_string().unwrap();
let mut file_lock = FileLock::new(lock_file_path_str);
let _ = file_lock.lock();
let file_lock_clone = file_lock.clone();
let wait_fut = async move {
let _ = file_lock_clone.wait().await;
};
let _ = tokio::runtime::Runtime::new().unwrap().block_on(wait_fut);
assert_eq!(file_lock.locked, false);
assert_eq!(lock_file.exists(), false);
}

#[test]
Expand All @@ -120,9 +119,10 @@ mod test {
let lock_file_path = lock_file.path().to_path_buf();
let lock_file_path_str = lock_file_path.into_os_string().into_string().unwrap();
{
let mut file_lock = FileLock::new(lock_file_path_str.clone());
let mut file_lock = FileLock::new(lock_file_path_str.clone()).unwrap();
let _ = file_lock.lock();
}
assert_eq!(lock_file.exists(), false);
let file_lock = FileLock::new(lock_file_path_str.clone());
assert_eq!(file_lock.unwrap().locked, false);
}
}
Loading

0 comments on commit 0204e02

Please sign in to comment.