Skip to content

Commit e9d4285

Browse files
authored
Instruct runners to always pull from online queue after getting their server-side retry manifest (#81)
1 parent a1942ce commit e9d4285

File tree

11 files changed

+455
-82
lines changed

11 files changed

+455
-82
lines changed

CHANGELOG.md

+15
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
## 1.7.0
2+
3+
ABQ 1.7.0 is a minor release.
4+
5+
This release improves ABQ's behavior when an ABQ runner is terminated before
6+
being assigned all applicable tests in a run manifest. In previous versions of
7+
ABQ, retrying such a runner would only retry the tests the runner was assigned
8+
before it terminated. Starting with ABQ 1.7.0, a runner that connects for a run
9+
ID after it was terminated will run all tests it ran on its first connection,
10+
and then pull tests from the run queue.
11+
12+
ABQ continues to cancel runs when a runner is terminated with SIGTERM, SIGINT,
13+
or SIGQUIT. The changes in 1.7.0 apply to runners terminated in other ways, for
14+
example via SIGKILL.
15+
116
## 1.6.4
217

318
ABQ 1.6.4 is a patch release.

Cargo.lock

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/abq_cli/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "abq"
3-
version = "1.6.4"
3+
version = "1.7.0"
44
edition = "2021"
55

66
[dependencies]

crates/abq_cli/src/report.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ async fn wait_for_results_help(
293293
}
294294
RunInProgress { active_runners } => {
295295
if active_runners.is_empty() {
296-
bail!("this ABQ run has not assigned all tests in your test suite, but there are no active runners to assign them to. Please either add more runners, or launch a new run.")
296+
bail!("this ABQ run has not assigned all tests in your test suite, but there are no active runners to assign them to. Please retry a runner, add more runners, or launch a new run.")
297297
}
298298

299299
let active_runners = active_runners

crates/abq_cli/tests/cli.rs

+158-4
Original file line numberDiff line numberDiff line change
@@ -417,12 +417,15 @@ fn wait_for_live_worker(worker_stderr: &mut ChildStderr) {
417417

418418
// Waits for the debug line "starting execution of all tests" in the worker.
419419
fn wait_for_worker_executing(worker_stderr: &mut ChildStderr) {
420-
let mut worker_reader = BufReader::new(worker_stderr).lines();
421-
// Spin until we know the worker0 is UP
420+
wait_for_line(worker_stderr, "starting execution of all tests");
421+
}
422+
423+
fn wait_for_line<R: std::io::Read>(reader: R, output: &str) {
424+
let mut reader = BufReader::new(reader).lines();
422425
loop {
423-
if let Some(line) = worker_reader.next() {
426+
if let Some(line) = reader.next() {
424427
let line = line.expect("line is not a string");
425-
if line.contains("starting execution of all tests") {
428+
if line.contains(output) {
426429
break;
427430
}
428431
}
@@ -5017,3 +5020,154 @@ fn write_partial_rwx_v1_json_results_on_early_runner_termination() {
50175020
}
50185021
"###);
50195022
}
5023+
5024+
#[test]
5025+
#[with_protocol_version]
5026+
#[serial]
5027+
fn retry_continued_manifest_read_on_worker_death() {
5028+
let name = "retry_continued_manifest_read_on_worker_death";
5029+
let conf = CSConfigOptions {
5030+
use_auth_token: true,
5031+
tls: true,
5032+
};
5033+
5034+
let (_queue_proc, queue_addr) = setup_queue!(name, conf);
5035+
5036+
let proto = AbqProtocolVersion::V0_2.get_supported_witness().unwrap();
5037+
5038+
let manifest = (0..4)
5039+
.map(|i| {
5040+
TestOrGroup::test(Test::new(
5041+
proto,
5042+
format!("test{}", i),
5043+
[],
5044+
Default::default(),
5045+
))
5046+
})
5047+
.collect::<Vec<_>>();
5048+
5049+
let manifest = ManifestMessage::new(Manifest::new(manifest, Default::default()));
5050+
5051+
let make_simulation = |i| {
5052+
[
5053+
Connect,
5054+
//
5055+
// Write spawn message
5056+
OpaqueWrite(pack(legal_spawned_message(proto))),
5057+
//
5058+
// Write the manifest if we need to.
5059+
// Otherwise handle the one test.
5060+
IfGenerateManifest {
5061+
then_do: vec![OpaqueWrite(pack(&manifest))],
5062+
else_do: {
5063+
let mut actions = vec![
5064+
//
5065+
// Read init context message + write ACK
5066+
OpaqueRead,
5067+
OpaqueWrite(pack(InitSuccessMessage::new(proto))),
5068+
// Read first test, write okay
5069+
IfAliveReadAndWriteFake(Status::Success),
5070+
Stdout("finished running first test\n".into()),
5071+
];
5072+
if i == 1 {
5073+
// First run: sleep forever, we will kill the worker.
5074+
actions.push(Sleep(Duration::from_secs(600)));
5075+
} else {
5076+
for _ in 0..3 {
5077+
actions.push(IfAliveReadAndWriteFake(Status::Success));
5078+
}
5079+
}
5080+
actions
5081+
},
5082+
},
5083+
//
5084+
// Finish
5085+
Exit(0),
5086+
]
5087+
};
5088+
5089+
let simulation1 = make_simulation(1);
5090+
let simulation2 = make_simulation(2);
5091+
5092+
let packed = pack_msgs_to_disk(simulation1);
5093+
5094+
let run_id = "test-run-id";
5095+
5096+
let test_args = {
5097+
let simulator = native_runner_simulation_bin();
5098+
let simfile_path = packed.path.display().to_string();
5099+
let args = vec![
5100+
format!("test"),
5101+
format!("--worker=0"),
5102+
format!("--queue-addr={queue_addr}"),
5103+
format!("--run-id={run_id}"),
5104+
format!("--batch-size=1"),
5105+
];
5106+
let mut args = conf.extend_args_for_client(args);
5107+
args.extend([s!("--"), simulator, simfile_path]);
5108+
args
5109+
};
5110+
5111+
let report_args = {
5112+
let args = vec![
5113+
format!("report"),
5114+
format!("--reporter=dot"),
5115+
format!("--queue-addr={queue_addr}"),
5116+
format!("--run-id={run_id}"),
5117+
format!("--color=never"),
5118+
];
5119+
conf.extend_args_for_client(args)
5120+
};
5121+
5122+
let mut worker0_attempt1 = Abq::new(format!("{name}_worker0_attempt1"))
5123+
.args(&test_args)
5124+
.spawn();
5125+
5126+
let attempt1_stdout = worker0_attempt1.stdout.as_mut().unwrap();
5127+
wait_for_line(attempt1_stdout, "finished running first test");
5128+
5129+
// Kill the worker.
5130+
worker0_attempt1.kill().unwrap();
5131+
5132+
{
5133+
let CmdOutput { exit_status, .. } = Abq::new(name.to_string() + "_report1")
5134+
.args(&report_args)
5135+
.run();
5136+
assert!(!exit_status.success());
5137+
}
5138+
5139+
std::fs::write(packed.path, pack_msgs(simulation2)).unwrap();
5140+
5141+
{
5142+
let CmdOutput {
5143+
stdout,
5144+
stderr,
5145+
exit_status,
5146+
} = Abq::new(format!("{name}_worker0_attempt1"))
5147+
.args(test_args)
5148+
.run();
5149+
assert!(
5150+
exit_status.success(),
5151+
"STDOUT:\n{stdout}\nSTDERR:\n{stderr}"
5152+
);
5153+
assert!(
5154+
stdout.contains("4 tests, 0 failures"),
5155+
"STDOUT:\n{stdout}\nSTDERR:\n{stderr}"
5156+
);
5157+
}
5158+
5159+
{
5160+
let CmdOutput {
5161+
stdout,
5162+
stderr,
5163+
exit_status,
5164+
} = Abq::new(name.to_string() + "_report2")
5165+
.args(report_args)
5166+
.run();
5167+
assert!(exit_status.success());
5168+
assert!(
5169+
stdout.contains("4 tests, 0 failures"),
5170+
"STDOUT:\n{stdout}\nSTDERR:\n{stderr}"
5171+
);
5172+
}
5173+
}

crates/abq_queue/src/queue.rs

+73-18
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ impl AllRuns {
422422
runner_test_command_differs,
423423
})
424424
}
425-
Some((old_entity, old_finished_state)) => {
425+
Some((old_entity, old_finished_time)) => {
426426
if old_entity.id == entity.id {
427427
// The same worker entity is connecting twice - this implies that the
428428
// same worker process is asking to find a run more than once, which
@@ -453,13 +453,13 @@ impl AllRuns {
453453
// itself, and if this is done we could hand out the manifest right here,
454454
// rather than asking the runner to reconnect to retrieve the manifest.
455455
tracing::info!(
456-
?old_finished_state,
456+
?old_finished_time,
457457
?entity,
458458
"worker reconnecting for out-of-process retry manifest during active run"
459459
);
460460

461461
AssignedRunStatus::Run(AssignedRun {
462-
kind: AssignedRunKind::Retry,
462+
kind: AssignedRunKind::RetryAndContinue,
463463
runner_test_command_differs,
464464
})
465465
}
@@ -488,7 +488,7 @@ impl AllRuns {
488488
}
489489

490490
AssignedRunStatus::Run(AssignedRun {
491-
kind: AssignedRunKind::Retry,
491+
kind: AssignedRunKind::RetryAndContinue,
492492
runner_test_command_differs,
493493
})
494494
} else {
@@ -1305,21 +1305,11 @@ impl AllRuns {
13051305
// legal cancellation states
13061306
}
13071307
RunState::InitialManifestDone { seen_workers, .. } => {
1308-
// Since we already have issued the full manifest out, don't mark this run as
1309-
// cancelled; this might be a stragling worker or a worker that cancelled an
1310-
// out-of-process retry.
13111308
tracing::info!(
13121309
?run_id,
13131310
"refusing to cancel run whose manifest has already been exhausted"
13141311
);
1315-
// Mark the worker as now-inactive.
1316-
let old_tag = seen_workers.write().insert_by_tag(entity, false);
1317-
log_assert!(
1318-
old_tag.is_some(),
1319-
?entity,
1320-
?run_id,
1321-
"entity was not seen before it marked cancellation"
1322-
);
1312+
seen_workers.write().insert_by_tag(entity, false);
13231313
return;
13241314
}
13251315
}
@@ -3995,7 +3985,7 @@ mod test {
39953985
assert_eq!(
39963986
assigned,
39973987
AssignedRunStatus::Run(AssignedRun {
3998-
kind: AssignedRunKind::Retry,
3988+
kind: AssignedRunKind::RetryAndContinue,
39993989
runner_test_command_differs: false
40003990
})
40013991
);
@@ -4331,7 +4321,7 @@ mod persistence_on_end_of_manifest {
43314321

43324322
#[tokio::test]
43334323
#[with_protocol_version]
4334-
async fn worker_told_to_pull_retry_manifest() {
4324+
async fn worker_told_to_pull_retry_manifest_and_continue() {
43354325
let queues = SharedRuns::default();
43364326
let remote = remote::NoopPersister::new().into();
43374327

@@ -4388,10 +4378,75 @@ mod persistence_on_end_of_manifest {
43884378
.build(),
43894379
)
43904380
.await;
4381+
4382+
assert_eq!(
4383+
assigned,
4384+
AssignedRunStatus::Run(AssignedRun {
4385+
kind: AssignedRunKind::RetryAndContinue,
4386+
runner_test_command_differs: false
4387+
})
4388+
);
4389+
}
4390+
4391+
#[tokio::test]
4392+
#[with_protocol_version]
4393+
async fn worker_told_to_pull_retry_manifest_no_continue() {
4394+
let queues = SharedRuns::default();
4395+
let remote = remote::NoopPersister::new().into();
4396+
4397+
let run_id = RunId::unique();
4398+
4399+
let worker0 = Entity::runner(1, 1);
4400+
let worker0_shadow = Entity::runner(1, 1);
4401+
assert_ne!(worker0.id, worker0_shadow.id);
4402+
assert_eq!(worker0.tag, worker0_shadow.tag);
4403+
4404+
let test1 = fake_test_spec(proto);
4405+
let test2 = fake_test_spec(proto);
4406+
let test3 = fake_test_spec(proto);
4407+
4408+
let test_command_hash = TestCommandHash::random();
4409+
4410+
// Create run, add manifest by worker0
4411+
{
4412+
let run_params = RunParamsBuilder::new(&run_id, &remote)
4413+
.entity(worker0)
4414+
.runner_test_command_hash(test_command_hash)
4415+
.build();
4416+
let manifest = vec![
4417+
(test1.clone(), GroupId::new()),
4418+
(test2, GroupId::new()),
4419+
(test3, GroupId::new()),
4420+
];
4421+
let _ = queues.find_or_create_run(run_params).await;
4422+
let _ = queues.add_manifest(&run_id, manifest, Default::default());
4423+
}
4424+
4425+
// worker0 pulls tests
4426+
{
4427+
let NextWorkResult { bundle, .. } = queues.next_work(worker0, &run_id);
4428+
assert_eq!(
4429+
bundle.work,
4430+
vec![WorkerTest::new(test1.clone(), INIT_RUN_NUMBER)]
4431+
);
4432+
}
4433+
4434+
queues.mark_worker_complete(&run_id, worker0, std::time::Instant::now());
4435+
4436+
// Suppose worker0 re-runs.
4437+
let assigned = queues
4438+
.find_or_create_run(
4439+
RunParamsBuilder::new(&run_id, &remote)
4440+
.entity(worker0_shadow)
4441+
.runner_test_command_hash(test_command_hash)
4442+
.build(),
4443+
)
4444+
.await;
4445+
43914446
assert_eq!(
43924447
assigned,
43934448
AssignedRunStatus::Run(AssignedRun {
4394-
kind: AssignedRunKind::Retry,
4449+
kind: AssignedRunKind::RetryAndContinue,
43954450
runner_test_command_differs: false
43964451
})
43974452
);

crates/abq_workers/src/assigned_run.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use serde_derive::{Deserialize, Serialize};
77
pub enum AssignedRunKind {
88
/// This worker is connecting for a fresh run, and should fetch tests online.
99
Fresh { should_generate_manifest: bool },
10-
/// This worker is connecting for a retry, and should fetch its manifest from the queue once.
11-
Retry,
10+
/// This worker should pull its retry manifest, and then continue to fetch tests online.
11+
RetryAndContinue,
1212
}
1313

1414
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]

crates/abq_workers/src/negotiate.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ impl WorkersNegotiator {
248248
AssignedRunKind::Fresh {
249249
should_generate_manifest,
250250
} => should_generate_manifest,
251-
AssignedRunKind::Retry => false,
251+
AssignedRunKind::RetryAndContinue => false,
252252
};
253253

254254
let runner_strategy_generator = RunnerStrategyGenerator::new(

0 commit comments

Comments
 (0)