Add file-based locking to state mutations to prevent concurrent write races
Parallel operations (e.g. stale review cleanup in `list`) could clobber each other via read-modify-write on the shared state file. Also fix spinner lifecycle in `review` and simplify empty-list output. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
69ba73b3c3
commit
bd9d481e81
|
|
@ -30,14 +30,10 @@ export async function action(opts: { json?: boolean }) {
|
||||||
} catch {}
|
} catch {}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sessions.length === 0) {
|
if (sessions.length === 0 && !opts.json) {
|
||||||
if (opts.json) {
|
console.log("◆ No active sessions.")
|
||||||
console.log("[]")
|
if ((await vm.status()) !== "running") {
|
||||||
} else {
|
console.log(`\n${red}VM is not running.${reset}`)
|
||||||
console.log("◆ No active sessions.")
|
|
||||||
if ((await vm.status()) !== "running") {
|
|
||||||
console.log(`\n${red}VM is not running.${reset}`)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -59,13 +55,11 @@ export async function action(opts: { json?: boolean }) {
|
||||||
)
|
)
|
||||||
const statuses = Object.fromEntries(statusEntries)
|
const statuses = Object.fromEntries(statusEntries)
|
||||||
|
|
||||||
// Self-heal stale in_review flags — re-check activity to avoid racing with a concurrent review start
|
// Self-heal stale in_review flags in parallel and update in-memory objects
|
||||||
for (const s of staleReviewSessions) {
|
await Promise.all(staleReviewSessions.map(async (s) => {
|
||||||
const stillActive = await vm.isClaudeActive(s.worktree, s.branch)
|
await state.patchSession(root, s.branch, { in_review: false }).catch(() => {})
|
||||||
if (!stillActive) {
|
s.in_review = false
|
||||||
await state.patchSession(root, s.branch, { in_review: false }).catch(() => {})
|
}))
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (opts.json) {
|
if (opts.json) {
|
||||||
const withStatus = sessions.map(s => ({ ...s, status: statuses[s.branch] }))
|
const withStatus = sessions.map(s => ({ ...s, status: statuses[s.branch] }))
|
||||||
|
|
|
||||||
|
|
@ -74,13 +74,16 @@ Your thoughts, in brief.
|
||||||
if (opts.print) {
|
if (opts.print) {
|
||||||
spin.text = "Running review…"
|
spin.text = "Running review…"
|
||||||
const result = await vm.claude(session.worktree, { print: prompt })
|
const result = await vm.claude(session.worktree, { print: prompt })
|
||||||
|
spin.stop()
|
||||||
if (result.output) process.stdout.write(result.output + "\n")
|
if (result.output) process.stdout.write(result.output + "\n")
|
||||||
} else {
|
} else {
|
||||||
spin.succeed("Session ready")
|
spin.succeed("Session ready")
|
||||||
await vm.claude(session.worktree, { prompt })
|
await vm.claude(session.worktree, { prompt })
|
||||||
}
|
}
|
||||||
} finally {
|
} catch (e) {
|
||||||
spin.stop()
|
spin.stop()
|
||||||
|
throw e
|
||||||
|
} finally {
|
||||||
// Clear review flag before saveChanges to minimize the race window
|
// Clear review flag before saveChanges to minimize the race window
|
||||||
await state.patchSession(root, session.branch, { in_review: false }).catch(() => {})
|
await state.patchSession(root, session.branch, { in_review: false }).catch(() => {})
|
||||||
// Save worktree changes only in interactive mode
|
// Save worktree changes only in interactive mode
|
||||||
|
|
|
||||||
55
src/state.ts
55
src/state.ts
|
|
@ -1,5 +1,5 @@
|
||||||
import { join, dirname } from "path"
|
import { join, dirname } from "path"
|
||||||
import { readdir, rename } from "fs/promises"
|
import { readdir, rename, mkdir, rmdir } from "fs/promises"
|
||||||
import { homedir } from "os"
|
import { homedir } from "os"
|
||||||
|
|
||||||
export interface Session {
|
export interface Session {
|
||||||
|
|
@ -36,29 +36,64 @@ export async function save(repoRoot: string, state: State): Promise<void> {
|
||||||
await rename(tmpPath, path)
|
await rename(tmpPath, path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const LOCK_TIMEOUT = 5000
|
||||||
|
const LOCK_RETRY_MS = 50
|
||||||
|
|
||||||
|
async function withStateLock<T>(repoRoot: string, fn: () => Promise<T>): Promise<T> {
|
||||||
|
const lockPath = statePath(repoRoot) + ".lock"
|
||||||
|
const start = Date.now()
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
await mkdir(lockPath)
|
||||||
|
break
|
||||||
|
} catch (e: any) {
|
||||||
|
if (e.code !== "EEXIST") throw e
|
||||||
|
if (Date.now() - start > LOCK_TIMEOUT) {
|
||||||
|
// Stale lock — force acquire
|
||||||
|
await rmdir(lockPath).catch(() => {})
|
||||||
|
await mkdir(lockPath)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
await Bun.sleep(LOCK_RETRY_MS)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
return await fn()
|
||||||
|
} finally {
|
||||||
|
await rmdir(lockPath).catch(() => {})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export async function getSession(repoRoot: string, branch: string): Promise<Session | undefined> {
|
export async function getSession(repoRoot: string, branch: string): Promise<Session | undefined> {
|
||||||
const state = await load(repoRoot)
|
const state = await load(repoRoot)
|
||||||
return state.sessions[branch]
|
return state.sessions[branch]
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function setSession(repoRoot: string, session: Session): Promise<void> {
|
export async function setSession(repoRoot: string, session: Session): Promise<void> {
|
||||||
const state = await load(repoRoot)
|
await withStateLock(repoRoot, async () => {
|
||||||
state.sessions[session.branch] = session
|
const state = await load(repoRoot)
|
||||||
await save(repoRoot, state)
|
state.sessions[session.branch] = session
|
||||||
|
await save(repoRoot, state)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function patchSession(repoRoot: string, branch: string, patch: Partial<Session>): Promise<void> {
|
export async function patchSession(repoRoot: string, branch: string, patch: Partial<Session>): Promise<void> {
|
||||||
const state = await load(repoRoot)
|
await withStateLock(repoRoot, async () => {
|
||||||
if (state.sessions[branch]) {
|
const state = await load(repoRoot)
|
||||||
|
if (!state.sessions[branch]) throw new Error(`session not found: ${branch}`)
|
||||||
Object.assign(state.sessions[branch], patch)
|
Object.assign(state.sessions[branch], patch)
|
||||||
await save(repoRoot, state)
|
await save(repoRoot, state)
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function removeSession(repoRoot: string, branch: string): Promise<void> {
|
export async function removeSession(repoRoot: string, branch: string): Promise<void> {
|
||||||
const state = await load(repoRoot)
|
await withStateLock(repoRoot, async () => {
|
||||||
delete state.sessions[branch]
|
const state = await load(repoRoot)
|
||||||
await save(repoRoot, state)
|
delete state.sessions[branch]
|
||||||
|
await save(repoRoot, state)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface GlobalSession extends Session {
|
export interface GlobalSession extends Session {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user