diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 8ab20d5a1f3..d4f06acf784 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1125,16 +1125,15 @@ where // before the persistence flag is set. Capturing outside would let us // observe pending ops while the flag is still unset, causing us to // flush monitor writes without persisting the ChannelManager. - // Declared before futures so it outlives the Joiner (drop order). - let pending_monitor_writes; - - let mut futures = Joiner::new(); - - if channel_manager.get_cm().get_and_clear_needs_persistence() { - pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); - log_trace!(logger, "Persisting ChannelManager..."); - - let fut = async { + let needs_cm_persist = channel_manager.get_cm().get_and_clear_needs_persistence(); + // Captured by the ChannelManager persistence future below. Only meaningful when + // `needs_cm_persist` is set, Declared before the futures so it outlives them + // (drop order). + let pending_monitor_writes = + if needs_cm_persist { chain_monitor.get_cm().pending_operation_count() } else { 0 }; + + let mut cm_fut = core::pin::pin!(async { + if needs_cm_persist { kv_store .write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1147,22 +1146,26 @@ where // Flush monitor operations that were pending before we persisted. New updates // that arrived after are left for the next iteration. chain_monitor.get_cm().flush(pending_monitor_writes, &logger); - Ok(()) - }; - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - let mut fut = Box::pin(fut); - - // Because persisting the ChannelManager is important to avoid accidental - // force-closures, go ahead and poll the future once before we do slightly more - // CPU-intensive tasks in the form of NetworkGraph pruning or scorer time-stepping - // below. This will get it moving but won't block us for too long if the underlying - // future is actually async. + } + Ok(()) + }); + + // Because persisting the ChannelManager is important to avoid accidental force-closures, + // go ahead and poll the future once before we do slightly more CPU-intensive tasks in the + // form of NetworkGraph pruning or scorer time-stepping below. This will get it moving but + // won't block us for too long if the underlying future is actually async. We stash the + // outcome and feed it into the `Joiner` once it is constructed. + let mut cm_persist_res = None; + let mut cm_persist_pending = false; + if needs_cm_persist { + log_trace!(logger, "Persisting ChannelManager..."); + use core::future::Future; let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); - match core::pin::Pin::new(&mut fut).poll(&mut ctx) { - task::Poll::Ready(res) => futures.set_a_res(res), - task::Poll::Pending => futures.set_a(fut), + match cm_fut.as_mut().poll(&mut ctx) { + task::Poll::Ready(res) => cm_persist_res = Some(res), + task::Poll::Pending => cm_persist_pending = true, } log_trace!(logger, "Done persisting ChannelManager."); @@ -1210,7 +1213,8 @@ where GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed, _ => prune_timer_elapsed, }; - if should_prune { + + let network_graph_to_persist = if should_prune { // The network graph must not be pruned while rapid sync completion is pending if let Some(network_graph) = gossip_sync.prunable_network_graph() { if let Some(duration_since_epoch) = fetch_time() { @@ -1222,28 +1226,15 @@ where log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually."); log_trace!(logger, "Persisting network graph."); } - let fut = async { - if let Err(e) = kv_store - .write( - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - network_graph.encode(), - ) - .await - { - log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e); - } - - Ok(()) - }; - - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - futures.set_b(Box::pin(fut)); have_pruned = true; + Some(network_graph) + } else { + None } - } + } else { + None + }; if !have_decayed_scorer { if let Some(ref scorer) = scorer { if let Some(duration_since_epoch) = fetch_time() { @@ -1253,7 +1244,9 @@ where } have_decayed_scorer = true; } - match check_and_reset_sleeper(&mut last_scorer_persist_call, || { + // Step the scorer forward synchronously here, deferring the actual write to the + // future built below. + let persist_scorer = match check_and_reset_sleeper(&mut last_scorer_persist_call, || { sleeper(SCORER_PERSIST_TIMER) }) { Some(false) => { @@ -1264,7 +1257,44 @@ where } else { log_trace!(logger, "Persisting scorer"); } - let fut = async { + true + } else { + false + } + }, + Some(true) => break, + None => false, + }; + let persist_sweeper = + match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) { + Some(false) => { + log_trace!(logger, "Regenerating sweeper spends if necessary"); + true + }, + Some(true) => break, + None => false, + }; + + let network_graph_fut = core::pin::pin!(async { + if let Some(network_graph) = network_graph_to_persist { + if let Err(e) = kv_store + .write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + network_graph.encode(), + ) + .await + { + log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e); + } + } + Ok(()) + }); + let scorer_fut = + core::pin::pin!(async { + if persist_scorer { + if let Some(ref scorer) = scorer { if let Err(e) = kv_store .write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1274,43 +1304,22 @@ where ) .await { - log_error!( - logger, - "Error: Failed to persist scorer, check your disk and permissions {}", - e - ); + log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e); } - - Ok(()) - }; - - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - futures.set_c(Box::pin(fut)); + } } - }, - Some(true) => break, - None => {}, - } - match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) { - Some(false) => { - log_trace!(logger, "Regenerating sweeper spends if necessary"); + Ok(()) + }); + let sweeper_fut = core::pin::pin!(async { + if persist_sweeper { if let Some(ref sweeper) = sweeper { - let fut = async { - let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; - - Ok(()) - }; - - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - futures.set_d(Box::pin(fut)); + let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; } - }, - Some(true) => break, - None => {}, - } - - if let Some(liquidity_manager) = liquidity_manager.as_ref() { - let fut = async { + } + Ok(()) + }); + let lm_fut = core::pin::pin!(async { + if let Some(liquidity_manager) = liquidity_manager.as_ref() { liquidity_manager .get_lm() .persist() @@ -1324,9 +1333,21 @@ where log_error!(logger, "Persisting LiquidityManager failed: {}", e); e }) - }; - futures.set_e(Box::pin(fut)); - } + } else { + Ok(()) + } + }); + + let mut futures = Joiner::new(); + if let Some(res) = cm_persist_res { + futures.set_a_res(res); + } else if cm_persist_pending { + futures.set_a(cm_fut); + } + futures.set_b(network_graph_fut); + futures.set_c(scorer_fut); + futures.set_d(sweeper_fut); + futures.set_e(lm_fut); // Run persistence tasks in parallel and exit if any of them returns an error. for res in futures.await {