From 7921fdf72f1f52bcf7e9e42e3b2f3fc920c1f1f4 Mon Sep 17 00:00:00 2001 From: aanufriev-celonis Date: Tue, 30 Jun 2026 12:00:28 +0200 Subject: [PATCH] test: reproduce #6531 --- .../src/indexing_scheduler/mod.rs | 209 ++++++++++++++++++ 1 file changed, 209 insertions(+) diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index f67a5462264..9dd8e8fea76 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -1156,6 +1156,215 @@ mod tests { } } + // Test-reproducer of the mass-outage+stuck-ingest bug. + // Link: https://github.com/quickwit-oss/quickwit/issues/6531. + // It asserts that ingestion recovers after the outage. + // + // Scenario: an ingester crashed and restarted having lost its WAL, but the metastore + // still lists its shard as `Open`. The ingester rejoins (`Ready` state) and, on startup, + // reports the shards it still holds via `AdviseResetShardsRequest` - an empty inventory, + // since its WAL is gone. The control plane plans an indexing task for that `Open` shard, + // which the restarted ingester can never serve. + // Ingester fail was it does not have access to lost shard anymore, but control plane + // cannot recover properly and just retries. The system ends up in unrecoverable state. + // + // TODO: the fix adds mandatory `shards_snapshot_timestamp` field to `AdviseResetShardsRequest`, + // set it to `2_000` on the test request below (must exceed the shard's `update_timestamp` + // of 1_000 for the shard to be treated as lost). + #[tokio::test] + async fn test_ingest_recovers_after_mass_outage() { + use quickwit_actors::{Observe, Universe}; + use quickwit_config::{ClusterConfig, INGEST_V2_SOURCE_ID}; + use quickwit_ingest::IngesterPool; + use quickwit_metastore::ListIndexesMetadataResponseExt; + use quickwit_proto::control_plane::AdviseResetShardsRequest; + use quickwit_proto::metastore::{ + ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, + ListShardsResponse, ListShardsSubresponse, MetastoreServiceClient, + MockMetastoreService, + }; + use quickwit_proto::types::Position; + + use crate::control_plane::{ControlPlane, ControlPlaneObservableState}; + + // run actors on a mock clock so `universe.sleep` fast-forwards the control loop timers + let universe = Universe::with_accelerated_time(); + + // control plane node + let node_id = NodeId::from_str("test-control-plane"); + + let mut mock_indexer = MockIndexingService::new(); + mock_indexer + .expect_apply_indexing_plan() + .returning(|_| Ok(ApplyIndexingPlanResponse {})); + + // test indexer + let indexer = IndexerNodeInfo { + node_id: NodeId::from_str("test-indexer"), + generation_id: 0, + client: IndexingServiceClient::from_mock(mock_indexer), + indexing_tasks: Vec::new(), + indexing_capacity: CpuCapacity::from_cpu_millis(4_000), + ingester_status: IngesterStatus::Ready, + }; + + // register the indexer so the scheduler has a node to assign indexing tasks to + let indexer_pool = IndexerPool::default(); + indexer_pool.insert(indexer.node_id.clone(), indexer); + + // no ingesters needed, test starts already at indexing phase + let ingester_pool = IngesterPool::default(); + + let mut mock_metastore = MockMetastoreService::new(); + + // one index with a single, enabled ingest-v2 source + let mut index = IndexMetadata::for_test("test-index-0", "ram:///test-index-0"); + let mut source = SourceConfig::ingest_v2(); + source.enabled = true; + index.add_source(source).unwrap(); + + // on startup the control plane lists all indexes + let index_clone = index.clone(); + mock_metastore.expect_list_indexes_metadata().return_once( + move |request: ListIndexesMetadataRequest| { + assert_eq!(request, ListIndexesMetadataRequest::all()); + Ok(ListIndexesMetadataResponse::for_test(vec![index_clone])) + }, + ); + + // phantom shard with `Open` state is in the metastore but its data is gone, + // led by the restarted ingester + let mut shard = Shard { + index_uid: Some(index.index_uid.clone()), + source_id: INGEST_V2_SOURCE_ID.to_string(), + shard_id: Some(ShardId::from(17)), // just random ID + leader_id: "test-ingester".to_string(), + publish_position_inclusive: Some(Position::offset(1_000u64)), + update_timestamp: 1_000, + ..Default::default() + }; + shard.set_shard_state(ShardState::Open); + let index_uid = index.index_uid.clone(); + + // register shard in metastore + mock_metastore + .expect_list_shards() + .return_once(move |_request: ListShardsRequest| { + Ok(ListShardsResponse { + subresponses: vec![ListShardsSubresponse { + index_uid: Some(index_uid), + source_id: INGEST_V2_SOURCE_ID.to_string(), + shards: vec![shard], + }], + }) + }); + + // spawn real control plane wired to the mocks above and empty cluster-change stream + let (control_plane_mailbox, _handle, mut readiness_rx) = ControlPlane::spawn( + &universe, + ClusterConfig::for_test(), + node_id, + quickwit_cluster::ClusterChangeStreamFactoryForTest::default(), + indexer_pool, + ingester_pool, + MetastoreServiceClient::from_mock(mock_metastore), + ); + + // wait until control plane loads metastore and is ready, first plan is now exists + tokio::time::timeout( + Duration::from_secs(5), + readiness_rx.wait_for(|ready| *ready), + ) + .await + .unwrap() + .unwrap(); + + // check if any task is still indexing shard 17 + let plan_targets_phantom_shard = |mailbox: &quickwit_actors::Mailbox| { + let mailbox = mailbox.clone(); + async move { + // `Observe` returns snapshot of the control plane's internal state + let obs: ControlPlaneObservableState = mailbox.ask(Observe).await.unwrap(); + obs.indexing_scheduler + .last_applied_physical_plan + .unwrap() + .indexing_tasks_per_indexer() + .get("test-indexer") + .cloned() + .unwrap_or_default() + .iter() + .any(|task| task.shard_ids.contains(&ShardId::from(17))) + } + }; + + // precondition: right after startup control plane already planned an indexing task + // for open phantom shard (this is the setup of the broken situation) + assert!( + plan_targets_phantom_shard(&control_plane_mailbox).await, + "control plane should plan a task for the open phantom shard" + ); + + // simulate the restarted ingester's startup handshake: it tells the control plane + // which shards it still has in its WAL; having lost its WAL, it reports none. + // Currently this is a no-op for `Open` shards, so the phantom shard is left untouched. + // TODO: add `shards_snapshot_timestamp: 2_000` here once fix applied + control_plane_mailbox + .ask(AdviseResetShardsRequest { + ingester_id: "test-ingester".to_string(), + shard_ids: Vec::new(), + }) + .await + .unwrap() + .unwrap(); + + // wait for ingestion to recover, the plan to stop targeting the lost phantom shard; + // give the control loop several ticks to rebuild + let phantom_shard_id = ShardId::from(17); + let max_ticks = 10; + let mut recovered = false; + + let mut last_planned_shard_ids: Vec = Vec::new(); + + for _ in 0..max_ticks { + // fast-forward one second of control-loop time + universe.sleep(Duration::from_secs(1)).await; + + // snapshot the control plane and extract the shard IDs the plan assigns to our indexer + let obs: ControlPlaneObservableState = + control_plane_mailbox.ask(Observe).await.unwrap(); + last_planned_shard_ids = obs + .indexing_scheduler + .last_applied_physical_plan + .unwrap() + .indexing_tasks_per_indexer() + .get("test-indexer") + .cloned() + .unwrap_or_default() + .iter() + .flat_map(|task| task.shard_ids.clone()) + .collect(); + + // (!) recovery: the plan no longer references the lost shard + if !last_planned_shard_ids.contains(&phantom_shard_id) { + recovered = true; + break; + } + } + + assert!( + recovered, + "Ingestion never recovers after a mass outage, restarted ingester \ + `test-ingester`\nreported an empty WAL inventory in AdviseResetShardsRequest, but \ + after {max_ticks} ticks\nindexing plan still targets the lost shard \ + {phantom_shard_id:?}.\nPlanned shard ids for `test-indexer`: \ + {last_planned_shard_ids:?}.\nLeader no longer holds lost shard, so every ingest \ + attempt will fails and ingestion is stuck.\nExpected: the plan should drop the shard \ + once the ingester reports it lost." + ); + + universe.assert_quit().await; + } + #[test] fn test_select_available_indexers_returns_only_ready_when_available() { let indexer_pool = IndexerPool::default();