Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 210 additions & 1 deletion quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,215 @@ mod tests {
}
}

// Reproducer for the mass-outage / stuck-ingest bug.
// Link: https://github.com/quickwit-oss/quickwit/issues/6531.
// It asserts that ingestion recovers after the outage.
//
// Scenario: an ingester crashed and restarted having lost its WAL, but the metastore
// still lists its shard as `Open`. The ingester rejoins (`Ready` state) and, on startup,
// reports the shards it still holds via `AdviseResetShardsRequest` - an empty inventory,
// since its WAL is gone. The control plane plans an indexing task for that `Open` shard,
// which the restarted ingester can never serve.
// The ingester fails because it no longer has access to the lost shard, but the control
// plane cannot recover properly and just retries. The system ends up in an unrecoverable
// state.
//
// TODO: the fix adds a mandatory `shards_snapshot_timestamp` field to
// `AdviseResetShardsRequest`; set it to `2_000` on the test request below (it must exceed
// the shard's `update_timestamp` of 1_000 for the shard to be treated as lost).
#[tokio::test]
async fn test_ingest_recovers_after_mass_outage() {
    use quickwit_actors::{Observe, Universe};
    use quickwit_config::{ClusterConfig, INGEST_V2_SOURCE_ID};
    use quickwit_ingest::IngesterPool;
    use quickwit_metastore::ListIndexesMetadataResponseExt;
    use quickwit_proto::control_plane::AdviseResetShardsRequest;
    use quickwit_proto::metastore::{
        ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest,
        ListShardsResponse, ListShardsSubresponse, MetastoreServiceClient,
        MockMetastoreService,
    };
    use quickwit_proto::types::Position;

    use crate::control_plane::{ControlPlane, ControlPlaneObservableState};

    // node IDs shared by the fixtures, the plan lookup and the requests below
    const INDEXER_ID: &str = "test-indexer";
    const INGESTER_ID: &str = "test-ingester";

    // the shard whose data is gone; the ID itself is arbitrary
    let phantom_shard_id = ShardId::from(17);

    // run actors on a mock clock so `universe.sleep` fast-forwards the control loop timers
    let universe = Universe::with_accelerated_time();

    // control plane node
    let node_id = NodeId::from_str("test-control-plane");

    let mut mock_indexer = MockIndexingService::new();
    mock_indexer
        .expect_apply_indexing_plan()
        .returning(|_| Ok(ApplyIndexingPlanResponse {}));

    // test indexer
    let indexer = IndexerNodeInfo {
        node_id: NodeId::from_str(INDEXER_ID),
        generation_id: 0,
        client: IndexingServiceClient::from_mock(mock_indexer),
        indexing_tasks: Vec::new(),
        indexing_capacity: CpuCapacity::from_cpu_millis(4_000),
        ingester_status: IngesterStatus::Ready,
    };

    // register the indexer so the scheduler has a node to assign indexing tasks to
    let indexer_pool = IndexerPool::default();
    indexer_pool.insert(indexer.node_id.clone(), indexer);

    // no ingesters needed, the test starts directly at the indexing phase
    let ingester_pool = IngesterPool::default();

    let mut mock_metastore = MockMetastoreService::new();

    // one index with a single, enabled ingest-v2 source
    let mut index = IndexMetadata::for_test("test-index-0", "ram:///test-index-0");
    let mut source = SourceConfig::ingest_v2();
    source.enabled = true;
    index.add_source(source).unwrap();

    // on startup the control plane lists all indexes
    let index_clone = index.clone();
    mock_metastore.expect_list_indexes_metadata().return_once(
        move |request: ListIndexesMetadataRequest| {
            assert_eq!(request, ListIndexesMetadataRequest::all());
            Ok(ListIndexesMetadataResponse::for_test(vec![index_clone]))
        },
    );

    // phantom shard: still `Open` in the metastore although its data is gone,
    // and led by the restarted ingester
    let mut shard = Shard {
        index_uid: Some(index.index_uid.clone()),
        source_id: INGEST_V2_SOURCE_ID.to_string(),
        shard_id: Some(phantom_shard_id.clone()),
        leader_id: INGESTER_ID.to_string(),
        publish_position_inclusive: Some(Position::offset(1_000u64)),
        update_timestamp: 1_000,
        ..Default::default()
    };
    shard.set_shard_state(ShardState::Open);
    let index_uid = index.index_uid.clone();

    // register the shard in the metastore
    mock_metastore
        .expect_list_shards()
        .return_once(move |_request: ListShardsRequest| {
            Ok(ListShardsResponse {
                subresponses: vec![ListShardsSubresponse {
                    index_uid: Some(index_uid),
                    source_id: INGEST_V2_SOURCE_ID.to_string(),
                    shards: vec![shard],
                }],
            })
        });

    // spawn a real control plane wired to the mocks above and an empty cluster-change stream
    let (control_plane_mailbox, _handle, mut readiness_rx) = ControlPlane::spawn(
        &universe,
        ClusterConfig::for_test(),
        node_id,
        quickwit_cluster::ClusterChangeStreamFactoryForTest::default(),
        indexer_pool,
        ingester_pool,
        MetastoreServiceClient::from_mock(mock_metastore),
    );

    // wait until the control plane has loaded the metastore and reports ready;
    // the first plan exists from this point on
    tokio::time::timeout(
        Duration::from_secs(5),
        readiness_rx.wait_for(|ready| *ready),
    )
    .await
    .unwrap()
    .unwrap();

    // Snapshots the control plane via `Observe` and returns every shard ID that the last
    // applied physical plan assigns to `INDEXER_ID` (empty when the indexer has no tasks).
    // Used for both the precondition and the recovery polling so they cannot drift apart.
    let planned_shard_ids = |mailbox: &quickwit_actors::Mailbox<ControlPlane>| {
        let mailbox = mailbox.clone();
        async move {
            let obs: ControlPlaneObservableState = mailbox.ask(Observe).await.unwrap();
            let plan = obs
                .indexing_scheduler
                .last_applied_physical_plan
                .expect("control plane should have applied a physical plan");
            let shard_ids: Vec<ShardId> = plan
                .indexing_tasks_per_indexer()
                .get(INDEXER_ID)
                .into_iter()
                .flatten()
                .flat_map(|task| task.shard_ids.iter().cloned())
                .collect();
            shard_ids
        }
    };

    // precondition: right after startup the control plane has already planned an indexing
    // task for the open phantom shard (this is the setup of the broken situation)
    assert!(
        planned_shard_ids(&control_plane_mailbox)
            .await
            .contains(&phantom_shard_id),
        "control plane should plan a task for the open phantom shard"
    );

    // simulate the restarted ingester's startup handshake: it tells the control plane
    // which shards it still has in its WAL; having lost its WAL, it reports none.
    // Currently this is a no-op for `Open` shards, so the phantom shard is left untouched.
    // TODO: add `shards_snapshot_timestamp: 2_000` here once the fix is applied
    control_plane_mailbox
        .ask(AdviseResetShardsRequest {
            ingester_id: INGESTER_ID.to_string(),
            shard_ids: Vec::new(),
        })
        .await
        .unwrap()
        .unwrap();

    // wait for ingestion to recover, i.e. for the plan to stop targeting the lost phantom
    // shard; give the control loop several ticks to rebuild it
    let max_ticks = 10;
    let mut recovered = false;
    let mut last_planned_shard_ids: Vec<ShardId> = Vec::new();

    for _ in 0..max_ticks {
        // fast-forward one second of control-loop time
        universe.sleep(Duration::from_secs(1)).await;

        // kept outside the loop scope so the failure message can report the last snapshot
        last_planned_shard_ids = planned_shard_ids(&control_plane_mailbox).await;

        // (!) recovery: the plan no longer references the lost shard
        if !last_planned_shard_ids.contains(&phantom_shard_id) {
            recovered = true;
            break;
        }
    }

    assert!(
        recovered,
        "Ingestion never recovers after a mass outage, restarted ingester \
         `test-ingester`\nreported an empty WAL inventory in AdviseResetShardsRequest, but \
         after {max_ticks} ticks\nindexing plan still targets the lost shard \
         {phantom_shard_id:?}.\nPlanned shard ids for `test-indexer`: \
         {last_planned_shard_ids:?}.\nLeader no longer holds lost shard, so every ingest \
         attempt will fail and ingestion is stuck.\nExpected: the plan should drop the shard \
         once the ingester reports it lost."
    );

    universe.assert_quit().await;
}

#[test]
fn test_select_available_indexers_returns_only_ready_when_available() {
let indexer_pool = IndexerPool::default();
Expand Down Expand Up @@ -1374,7 +1583,7 @@ mod tests {
}

// Applies a plan to a single indexer whose apply RPC hangs forever, then reports whether the
// apply task finished within `observe` i.e. whether a timeout cancelled it.
// apply task finished within `observe` - i.e. whether a timeout cancelled it.
async fn hanging_apply_is_cancelled_within(status: IngesterStatus, observe: Duration) -> bool {
let indexer_pool = IndexerPool::default();
let indexer = hanging_indexer_node_info(status);
Expand Down
Loading