feat(pulsar): support distributed indexing#6561
Conversation
Allow `num_pipelines > 1` for the Pulsar source so its partitions can be
indexed in parallel across several pipelines and indexers, on par with
Kafka.
The control plane already spawns N pipelines for "NonSharded" sources
(Pulsar included), delegating partition distribution to the broker. The
only blockers were a config validation gate and the consumer setup:
- Lift the `num_pipelines > 1` validation gate for Pulsar.
- Derive a per-pipeline consumer name (`{prefix}-{pipeline_uid}`) so each
pipeline connects as a distinct consumer under the shared subscription.
- Keep the `Failover` subscription: on a partitioned topic it assigns each
partition to exactly one active consumer, matching Quickwit's
per-partition checkpoint model. `Shared` would spread a partition's
messages across pipelines and corrupt the metastore checkpoint; this is
now documented at the call site.
Tests: flip the config unit test to expect success, add a `pipeline_uid`
test-builder setter, and add a broker integration test proving N pipelines
split the partitions of a 4-partition topic with disjoint ownership, no
loss, no duplicates, and a consistent merged checkpoint.
Docs: add a "Distributed indexing" section to the Pulsar guide and note the
partitioned-topic requirement in the source config reference.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 1026d0d675
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| .consumer() | ||
| .with_topics(¶ms.topics) | ||
| .with_consumer_name(¶ms.consumer_name) | ||
| .with_consumer_name(&consumer_name) |
There was a problem hiding this comment.
Refresh positions on Pulsar failover handoff
With num_pipelines > 1, these distinct consumers can sit idle and later become active for a partition after another pipeline has already published checkpoints. That standby still has the current_positions snapshot loaded in try_new, and Pulsar suggest_truncate only ACKs the local source without advancing positions for partitions it did not own, so the first batch after a normal Failover reassignment records a delta from the stale startup position. Once the metastore checkpoint is ahead of that from position, publishing the split is rejected and the reassigned partition wedges; refresh/update per-partition positions before emitting after handoff.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed in 97b03e0. The pipeline now lazily re-reads the committed position from the metastore the first time it consumes a partition in a session (tracked via synced_partitions), so the first delta after a Failover handoff is contiguous with the metastore checkpoint instead of starting from the stale try_new snapshot — mirroring how the Kafka source refreshes offsets on assignment. Added unit tests for the align_position helper.
| match &self.source_params { | ||
| SourceParams::PubSub(_) | ||
| | SourceParams::Kafka(_) | ||
| | SourceParams::Pulsar(_) |
There was a problem hiding this comment.
Update the REST bad-config expectation
Allowing Pulsar in this multi-pipeline allow-list changes the /indexes/{index}/sources path so the existing test_create_source_with_bad_config case in quickwit-serve/src/index_api/rest_handler.rs no longer fails config validation for num_pipelines: 2; it proceeds to the metastore lookup for the missing my-index instead, so that test still asserting a 400 with the old Kafka/PubSub-only error will fail. Please update or remove that bad-config case with this feature enabled.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed in 97b03e0. Updated test_create_source_with_bad_config to assert the multi-pipeline rejection using a kinesis source (still gated) plus the new error-message wording.
… test Addresses review feedback on distributed Pulsar indexing. P1 — Failover position refresh: with num_pipelines > 1, a pipeline can become the active Failover consumer for a partition another pipeline has already advanced. The `current_positions` snapshot taken in `try_new` is then stale, so the first delta would start from a position behind the committed metastore checkpoint and the publish would be rejected, wedging the partition. We now lazily re-read the committed position from the metastore the first time a partition is consumed in a pipeline session (tracked via `synced_partitions`), so the first recorded delta is contiguous. Mirrors the Kafka source refreshing offsets on assignment. Added unit tests for the `align_position` helper. P2 — Updated `test_create_source_with_bad_config` in quickwit-serve: it asserted the multi-pipeline rejection using a Pulsar source, which is now allowed. Switched it to a kinesis source and the updated error message.
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 97b03e02e5
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| if !self.synced_partitions.insert(partition.clone()) { | ||
| return Ok(()); |
There was a problem hiding this comment.
Resync after repeated Pulsar failovers
With a partitioned Failover subscription, a live pipeline can process partition P, lose it when another pipeline joins, and later regain P when that pipeline leaves. Since synced_partitions is never cleared, the second handoff returns here without re-reading the metastore checkpoint; if the other pipeline advanced P, current_positions is behind and the next batch records a stale (from..to] delta, which SourceCheckpoint::check_compatibility rejects. Distributed Pulsar indexing can therefore stall after normal rebalances; resync on each ownership change or before recording after a gap, not only once per source lifetime.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Good catch — fixed in c326bdd. The previous version tracked synced partitions in a set that was never cleared, so it only resynced once per lifetime. It now records the last time a message was received per partition and re-reads the committed position whenever a partition is consumed for the first time or reappears after a gap (> 30s, a proxy for a Failover re-gain since the Pulsar client does not surface partition reassignments). The merge never moves a position backwards, so continuously-owned partitions never resync and in-flight progress is preserved. Repeated handoffs are now handled; extended the unit tests accordingly.
Follow-up to the failover position-refresh fix. The previous version tracked synced partitions in a set that was never cleared, so it only re-read the committed position once per pipeline lifetime. After a second handoff (a pipeline loses a partition to another pipeline, then regains it once that pipeline leaves), it would reuse the stale local position and record a non-contiguous delta, which the metastore rejects — stalling the partition after a normal rebalance. We now track the last time a message was received per partition and re-read the committed position whenever a partition is consumed for the first time or reappears after a gap longer than POSITION_RESYNC_GAP (30s), a proxy for a Failover (re)gain since the Pulsar client does not surface partition reassignments. The merge never moves a position backwards, so a continuously-owned partition never resyncs and in-flight progress is preserved. Renamed the helper to `advance_position` and extended the unit tests (adopt-when-ahead, keep-local-when-ahead, no-op-when-uncommitted).
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c326bddd48
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| async fn sync_partition_position(&mut self, partition: &PartitionId) -> anyhow::Result<()> { | ||
| let now = Instant::now(); | ||
| let resync = match self.last_message_at.get(partition) { | ||
| Some(last_message_at) => now.duration_since(*last_message_at) > POSITION_RESYNC_GAP, |
There was a problem hiding this comment.
Resync on every reassigned Pulsar partition
When a Failover rebalance hands a partition from this pipeline to another pipeline and back in under 30 seconds, this branch treats the next message as continuous ownership and skips fetch_checkpoint(). If the other pipeline published while it owned the partition, current_positions is still behind the metastore checkpoint, so the next delta is recorded from a stale from position and the publish is rejected for that partition; this can happen during quick indexer restarts or scale changes, so the resync cannot be gated only on an elapsed-time heuristic.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
You are right that the time-gap heuristic cannot detect every reassignment — the Pulsar client surfaces no partition-assignment events, so no source-level heuristic can be exhaustive. The resync is therefore a best-effort churn optimization, not the correctness guarantee. Correctness is enforced by the metastore: SourceCheckpoint::try_apply_delta rejects any non-contiguous/overlapping delta, so a stale from (including a re-handoff within the gap) results in a rejected publish and a pipeline restart that resumes from the committed checkpoint — never lost or duplicated data. This is the same model the Kafka source relies on across consumer-group rebalances. I documented this on sync_partition_position (d765a6c) and gated the resync to num_pipelines > 1 so single-pipeline sources keep their exact previous behavior.
| let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config) | ||
| .with_pipeline_uid(pipeline_uid) | ||
| .build(); |
There was a problem hiding this comment.
Wire Pulsar broker sources to the test metastore
Because this path still calls build() without injecting the metastore passed to the helper, the new distributed-indexing broker test runs each source against SourceRuntimeBuilder's mock empty checkpoint rather than the real test metastore. That means the test cannot exercise sync_partition_position() against checkpoint updates from another pipeline, so the rebalance/metastore-alignment behavior this change is trying to protect remains untested.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed in d765a6c. The broker-test source builder now injects the real test metastore (with_metastore, enabled under the pulsar-broker-tests feature) instead of the empty mock checkpoint. Added test_distributed_source_resumes_from_committed_checkpoint, which seeds a committed checkpoint in the metastore (simulating another pipeline that already indexed the first half) and asserts a distributed source resumes from it and indexes only the uncommitted remainder. The position-merge logic itself is also covered by the advance_position unit tests.
| // checkpoint deltas always come from a single pipeline. `Shared` would | ||
| // spread a partition's messages across pipelines, producing conflicting, | ||
| // non-monotonic deltas that the metastore rejects. | ||
| .with_subscription_type(SubType::Failover) |
There was a problem hiding this comment.
Guard against overlapping Failover consumers
Pulsar Failover can briefly have the old active consumer still processing a partition while the newly elected active consumer starts receiving from it; with num_pipelines > 1, those two Quickwit pipelines can then emit overlapping checkpoint deltas for the same partition. In that rebalance window, one publish is rejected as incompatible (or duplicate docs are staged before the rejection), so relying on Failover alone here is not enough to guarantee the per-partition single-writer invariant this source requires.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Agreed that Failover alone does not prevent a brief overlap between the outgoing and incoming active consumer during election. The per-partition single-writer invariant is not guaranteed at the source layer here — it is guaranteed at the metastore: overlapping deltas for the same partition fail try_apply_delta (IncompatibleCheckpointDelta), so one publish is rejected and that pipeline restarts and resumes from the committed checkpoint. No duplicate docs are committed and no data is lost; the staged-but-unpublished split is simply discarded on the rejected publish. This is the same guarantee the existing Kafka source provides across rebalances. Documented at the SubType::Failover call site and on sync_partition_position in d765a6c.
…tastore backstop; wire test metastore
Addresses the third review round on distributed Pulsar indexing.
- Single-writer correctness is ultimately enforced by the metastore, not by
the source: SourceCheckpoint::try_apply_delta rejects any overlapping or
non-contiguous delta, so a stale position or a brief overlap between the old
and new active Failover consumers during a rebalance yields a rejected
publish and a pipeline restart (resuming from the committed checkpoint),
never lost or duplicated data — the same model the Kafka source relies on
across consumer-group rebalances. Documented this on sync_partition_position.
The time-gap resync stays a best-effort churn optimization; cases it misses
(e.g. a re-handoff within the gap, which the Pulsar client cannot signal) are
caught by that backstop.
- Gate the resync to sources with num_pipelines > 1 (new `distributed` flag):
a single-pipeline source can never hand a partition over, so it never needs
to re-read the checkpoint — restoring the exact previous single-pipeline
behavior and avoiding any extra metastore load for the common case.
- Wire the real test metastore into the Pulsar broker-test source builder
(it previously ran against an empty mock checkpoint) and add
test_distributed_source_resumes_from_committed_checkpoint, which seeds a
committed checkpoint ("another pipeline") and asserts a distributed source
resumes from it instead of re-indexing committed messages. Enables
with_metastore under the pulsar-broker-tests feature.
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d765a6cf09
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| let consumer_name = | ||
| consumer_name(&source_params.consumer_name, source_runtime.pipeline_uid()); |
There was a problem hiding this comment.
Preserve configured Pulsar consumer names for one pipeline
When a Pulsar source keeps the default num_pipelines: 1, this still changes the broker-visible name from the configured consumer_name (for example quickwit) to quickwit-<pipeline_uid> before passing it to with_consumer_name. That is a compatibility regression for existing non-distributed sources and any Pulsar monitoring/automation keyed by the documented consumer name, even though uniqueness is only needed when multiple pipelines connect under the same subscription. Please keep the configured name for num_pipelines == 1 and only suffix it for distributed sources.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed in ab1cb8a. The pipeline UID suffix is now applied only when num_pipelines > 1; a single-pipeline source keeps its configured consumer_name unchanged, preserving the previous broker-visible name and any monitoring keyed on it. Reverted the single-pipeline test assertions to the plain name accordingly.
A single-pipeline source kept its configured `consumer_name` before this feature; suffixing it with the pipeline UID unconditionally was a needless broker-visible change that breaks monitoring/automation keyed on the name. Only distributed sources (num_pipelines > 1) need per-pipeline uniqueness under the shared subscription, so single-pipeline sources now keep the configured name unchanged. Reverted the single-pipeline test assertions to the plain name and made the distributed-indexing test actually set num_pipelines = 2.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ab1cb8a6f4
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| return Ok(()); | ||
| } | ||
| let committed = self.source_runtime.fetch_checkpoint().await?; | ||
| advance_position(&mut self.current_positions, partition, &committed); |
There was a problem hiding this comment.
Ack committed Pulsar positions after resync
On a distributed failover where the previous active pipeline published a checkpoint but died before processing SuggestTruncate, this resync can move current_positions past messages the broker is still replaying. Those messages are then skipped in add_doc_to_batch and, if the partition is quiet, no RawDocBatch is published, so publisher::suggest_truncate never sends an ack for the committed position; the Pulsar subscription cursor/backlog stays behind until a later message arrives. When adopting a metastore position here, also seek or cumulative-ack that partition to the adopted position.
Useful? React with 👍 / 👎.
|
@codex review |
What
Enable distributed indexing for the Pulsar source by allowing
num_pipelines > 1, on par with Kafka. This lifts a long-standing limitation (there was a// TODO: uncomment asserts once distributed indexing is activated for pulsar).Why
The control plane already schedules
num_pipelinespipelines for "NonSharded" sources, and Pulsar is already in that arm (get_sources_to_schedule). It spawns N identicalIndexingTasks and delegates partition distribution to the broker — exactly like Kafka relies on consumer-group rebalancing. The only blockers were a config validation gate and the consumer setup.How
quickwit-config/src/source_config/serialize.rs(addSourceParams::Pulsar(_)to the allow-list alongside Kafka / PubSub / S3-notifications).{consumer_name}-{pipeline_uid}) inpulsar_source.rs, so each pipeline connects as a distinct consumer under the shared subscription.consumer_namein the config is now treated as a prefix.SubType::Failover(notShared) — this is a correctness requirement, locked in with a comment. On a partitioned topic, Failover assigns each partition to exactly one active consumer, which matches Quickwit's per-partition checkpoint model (PartitionId = topic-partition, Position = message id).Sharedwould spread a single partition's messages across pipelines, producing conflicting, non-monotonic checkpoint deltas that the metastore rejects.Distribution semantics
Distribution happens per partition, so the topic(s) must be partitioned — same constraint as Kafka. A non-partitioned topic is served by a single active consumer; extra pipelines stay idle as standbys. For balanced load, make the partition count a multiple of
num_pipelines.Tests
num_pipelines > 1fails now asserts success; added a0.8case mirroring Kafka.SourceRuntimeBuilder::with_pipeline_uidtest helper.test_partitioned_topic_distributed_indexing(featurepulsar-broker-tests): two pipelines with distinct pipeline UIDs split a 4-partition topic and the test asserts disjoint partition ownership, no loss, no duplicates, and a consistent merged checkpoint (try_apply_deltaof both deltas succeeds). All 8pulsar-broker-testspass against a real Pulsar broker.Docs
docs/ingest-data/pulsar.md.docs/configuration/source-config.md(partition-multiple guidance + partitioned-topic requirement for Pulsar).