From 571f93a237b67ab88ad38f3079c158d46bbe008f Mon Sep 17 00:00:00 2001 From: Abeeujah Date: Sat, 20 Jun 2026 21:04:37 +0100 Subject: [PATCH] Avoid heap-allocating background processor futures Replace Box::pin with core::pin::pin! in process_events_async now that MSRV is 1.75. This eliminates a heap allocation per task on every loop iteration by pinning the futures directly to the stack. To satisfy lifetime and Joiner bounds, the loop logic was refactored to run synchronous timer checks first, using flags to conditionally execute the stack-pinned futures. Existing eager polling and early-break semantics are preserved. --- lightning-background-processor/src/lib.rs | 183 ++++++++++++---------- 1 file changed, 102 insertions(+), 81 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 8ab20d5a1f3..d4f06acf784 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1125,16 +1125,15 @@ where // before the persistence flag is set. Capturing outside would let us // observe pending ops while the flag is still unset, causing us to // flush monitor writes without persisting the ChannelManager. - // Declared before futures so it outlives the Joiner (drop order). - let pending_monitor_writes; - - let mut futures = Joiner::new(); - - if channel_manager.get_cm().get_and_clear_needs_persistence() { - pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); - log_trace!(logger, "Persisting ChannelManager..."); - - let fut = async { + let needs_cm_persist = channel_manager.get_cm().get_and_clear_needs_persistence(); + // Captured by the ChannelManager persistence future below. Only meaningful when + // `needs_cm_persist` is set, Declared before the futures so it outlives them + // (drop order). + let pending_monitor_writes = + if needs_cm_persist { chain_monitor.get_cm().pending_operation_count() } else { 0 }; + + let mut cm_fut = core::pin::pin!(async { + if needs_cm_persist { kv_store .write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1147,22 +1146,26 @@ where // Flush monitor operations that were pending before we persisted. New updates // that arrived after are left for the next iteration. chain_monitor.get_cm().flush(pending_monitor_writes, &logger); - Ok(()) - }; - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - let mut fut = Box::pin(fut); - - // Because persisting the ChannelManager is important to avoid accidental - // force-closures, go ahead and poll the future once before we do slightly more - // CPU-intensive tasks in the form of NetworkGraph pruning or scorer time-stepping - // below. This will get it moving but won't block us for too long if the underlying - // future is actually async. + } + Ok(()) + }); + + // Because persisting the ChannelManager is important to avoid accidental force-closures, + // go ahead and poll the future once before we do slightly more CPU-intensive tasks in the + // form of NetworkGraph pruning or scorer time-stepping below. This will get it moving but + // won't block us for too long if the underlying future is actually async. We stash the + // outcome and feed it into the `Joiner` once it is constructed. + let mut cm_persist_res = None; + let mut cm_persist_pending = false; + if needs_cm_persist { + log_trace!(logger, "Persisting ChannelManager..."); + use core::future::Future; let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); - match core::pin::Pin::new(&mut fut).poll(&mut ctx) { - task::Poll::Ready(res) => futures.set_a_res(res), - task::Poll::Pending => futures.set_a(fut), + match cm_fut.as_mut().poll(&mut ctx) { + task::Poll::Ready(res) => cm_persist_res = Some(res), + task::Poll::Pending => cm_persist_pending = true, } log_trace!(logger, "Done persisting ChannelManager."); @@ -1210,7 +1213,8 @@ where GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed, _ => prune_timer_elapsed, }; - if should_prune { + + let network_graph_to_persist = if should_prune { // The network graph must not be pruned while rapid sync completion is pending if let Some(network_graph) = gossip_sync.prunable_network_graph() { if let Some(duration_since_epoch) = fetch_time() { @@ -1222,28 +1226,15 @@ where log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually."); log_trace!(logger, "Persisting network graph."); } - let fut = async { - if let Err(e) = kv_store - .write( - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - network_graph.encode(), - ) - .await - { - log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e); - } - - Ok(()) - }; - - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - futures.set_b(Box::pin(fut)); have_pruned = true; + Some(network_graph) + } else { + None } - } + } else { + None + }; if !have_decayed_scorer { if let Some(ref scorer) = scorer { if let Some(duration_since_epoch) = fetch_time() { @@ -1253,7 +1244,9 @@ where } have_decayed_scorer = true; } - match check_and_reset_sleeper(&mut last_scorer_persist_call, || { + // Step the scorer forward synchronously here, deferring the actual write to the + // future built below. + let persist_scorer = match check_and_reset_sleeper(&mut last_scorer_persist_call, || { sleeper(SCORER_PERSIST_TIMER) }) { Some(false) => { @@ -1264,7 +1257,44 @@ where } else { log_trace!(logger, "Persisting scorer"); } - let fut = async { + true + } else { + false + } + }, + Some(true) => break, + None => false, + }; + let persist_sweeper = + match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) { + Some(false) => { + log_trace!(logger, "Regenerating sweeper spends if necessary"); + true + }, + Some(true) => break, + None => false, + }; + + let network_graph_fut = core::pin::pin!(async { + if let Some(network_graph) = network_graph_to_persist { + if let Err(e) = kv_store + .write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + network_graph.encode(), + ) + .await + { + log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e); + } + } + Ok(()) + }); + let scorer_fut = + core::pin::pin!(async { + if persist_scorer { + if let Some(ref scorer) = scorer { if let Err(e) = kv_store .write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1274,43 +1304,22 @@ where ) .await { - log_error!( - logger, - "Error: Failed to persist scorer, check your disk and permissions {}", - e - ); + log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e); } - - Ok(()) - }; - - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - futures.set_c(Box::pin(fut)); + } } - }, - Some(true) => break, - None => {}, - } - match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) { - Some(false) => { - log_trace!(logger, "Regenerating sweeper spends if necessary"); + Ok(()) + }); + let sweeper_fut = core::pin::pin!(async { + if persist_sweeper { if let Some(ref sweeper) = sweeper { - let fut = async { - let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; - - Ok(()) - }; - - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - futures.set_d(Box::pin(fut)); + let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; } - }, - Some(true) => break, - None => {}, - } - - if let Some(liquidity_manager) = liquidity_manager.as_ref() { - let fut = async { + } + Ok(()) + }); + let lm_fut = core::pin::pin!(async { + if let Some(liquidity_manager) = liquidity_manager.as_ref() { liquidity_manager .get_lm() .persist() @@ -1324,9 +1333,21 @@ where log_error!(logger, "Persisting LiquidityManager failed: {}", e); e }) - }; - futures.set_e(Box::pin(fut)); - } + } else { + Ok(()) + } + }); + + let mut futures = Joiner::new(); + if let Some(res) = cm_persist_res { + futures.set_a_res(res); + } else if cm_persist_pending { + futures.set_a(cm_fut); + } + futures.set_b(network_graph_fut); + futures.set_c(scorer_fut); + futures.set_d(sweeper_fut); + futures.set_e(lm_fut); // Run persistence tasks in parallel and exit if any of them returns an error. for res in futures.await {