Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 102 additions & 81 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1125,16 +1125,15 @@ where
// before the persistence flag is set. Capturing outside would let us
// observe pending ops while the flag is still unset, causing us to
// flush monitor writes without persisting the ChannelManager.
// Declared before futures so it outlives the Joiner (drop order).
let pending_monitor_writes;

let mut futures = Joiner::new();

if channel_manager.get_cm().get_and_clear_needs_persistence() {
pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
log_trace!(logger, "Persisting ChannelManager...");

let fut = async {
let needs_cm_persist = channel_manager.get_cm().get_and_clear_needs_persistence();
// Captured by the ChannelManager persistence future below. Only meaningful when
// `needs_cm_persist` is set, Declared before the futures so it outlives them
// (drop order).
let pending_monitor_writes =
if needs_cm_persist { chain_monitor.get_cm().pending_operation_count() } else { 0 };

let mut cm_fut = core::pin::pin!(async {
if needs_cm_persist {
kv_store
.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
Expand All @@ -1147,22 +1146,26 @@ where
// Flush monitor operations that were pending before we persisted. New updates
// that arrived after are left for the next iteration.
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
Ok(())
};
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
let mut fut = Box::pin(fut);

// Because persisting the ChannelManager is important to avoid accidental
// force-closures, go ahead and poll the future once before we do slightly more
// CPU-intensive tasks in the form of NetworkGraph pruning or scorer time-stepping
// below. This will get it moving but won't block us for too long if the underlying
// future is actually async.
}
Ok(())
});

// Because persisting the ChannelManager is important to avoid accidental force-closures,
// go ahead and poll the future once before we do slightly more CPU-intensive tasks in the
// form of NetworkGraph pruning or scorer time-stepping below. This will get it moving but
// won't block us for too long if the underlying future is actually async. We stash the
// outcome and feed it into the `Joiner` once it is constructed.
let mut cm_persist_res = None;
let mut cm_persist_pending = false;
if needs_cm_persist {
log_trace!(logger, "Persisting ChannelManager...");

use core::future::Future;
let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
match core::pin::Pin::new(&mut fut).poll(&mut ctx) {
task::Poll::Ready(res) => futures.set_a_res(res),
task::Poll::Pending => futures.set_a(fut),
match cm_fut.as_mut().poll(&mut ctx) {
task::Poll::Ready(res) => cm_persist_res = Some(res),
task::Poll::Pending => cm_persist_pending = true,
}

log_trace!(logger, "Done persisting ChannelManager.");
Expand Down Expand Up @@ -1210,7 +1213,8 @@ where
GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
_ => prune_timer_elapsed,
};
if should_prune {

let network_graph_to_persist = if should_prune {
// The network graph must not be pruned while rapid sync completion is pending
if let Some(network_graph) = gossip_sync.prunable_network_graph() {
if let Some(duration_since_epoch) = fetch_time() {
Expand All @@ -1222,28 +1226,15 @@ where
log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually.");
log_trace!(logger, "Persisting network graph.");
}
let fut = async {
if let Err(e) = kv_store
.write(
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_KEY,
network_graph.encode(),
)
.await
{
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
}

Ok(())
};

// TODO: Once our MSRV is 1.68 we should be able to drop the Box
futures.set_b(Box::pin(fut));

have_pruned = true;
Some(network_graph)
} else {
None
}
}
} else {
None
};
if !have_decayed_scorer {
if let Some(ref scorer) = scorer {
if let Some(duration_since_epoch) = fetch_time() {
Expand All @@ -1253,7 +1244,9 @@ where
}
have_decayed_scorer = true;
}
match check_and_reset_sleeper(&mut last_scorer_persist_call, || {
// Step the scorer forward synchronously here, deferring the actual write to the
// future built below.
let persist_scorer = match check_and_reset_sleeper(&mut last_scorer_persist_call, || {
sleeper(SCORER_PERSIST_TIMER)
}) {
Some(false) => {
Expand All @@ -1264,7 +1257,44 @@ where
} else {
log_trace!(logger, "Persisting scorer");
}
let fut = async {
true
} else {
false
}
},
Some(true) => break,
None => false,
};
let persist_sweeper =
match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) {
Some(false) => {
log_trace!(logger, "Regenerating sweeper spends if necessary");
true
},
Some(true) => break,
None => false,
};

let network_graph_fut = core::pin::pin!(async {
if let Some(network_graph) = network_graph_to_persist {
if let Err(e) = kv_store
.write(
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_KEY,
network_graph.encode(),
)
.await
{
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
}
}
Ok(())
});
let scorer_fut =
core::pin::pin!(async {
if persist_scorer {
if let Some(ref scorer) = scorer {
if let Err(e) = kv_store
.write(
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
Expand All @@ -1274,43 +1304,22 @@ where
)
.await
{
log_error!(
logger,
"Error: Failed to persist scorer, check your disk and permissions {}",
e
);
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
}

Ok(())
};

// TODO: Once our MSRV is 1.68 we should be able to drop the Box
futures.set_c(Box::pin(fut));
}
}
},
Some(true) => break,
None => {},
}
match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) {
Some(false) => {
log_trace!(logger, "Regenerating sweeper spends if necessary");
Ok(())
});
let sweeper_fut = core::pin::pin!(async {
if persist_sweeper {
if let Some(ref sweeper) = sweeper {
let fut = async {
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;

Ok(())
};

// TODO: Once our MSRV is 1.68 we should be able to drop the Box
futures.set_d(Box::pin(fut));
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
}
},
Some(true) => break,
None => {},
}

if let Some(liquidity_manager) = liquidity_manager.as_ref() {
let fut = async {
}
Ok(())
});
let lm_fut = core::pin::pin!(async {
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
liquidity_manager
.get_lm()
.persist()
Expand All @@ -1324,9 +1333,21 @@ where
log_error!(logger, "Persisting LiquidityManager failed: {}", e);
e
})
};
futures.set_e(Box::pin(fut));
}
} else {
Ok(())
}
});

let mut futures = Joiner::new();
if let Some(res) = cm_persist_res {
futures.set_a_res(res);
} else if cm_persist_pending {
futures.set_a(cm_fut);
}
futures.set_b(network_graph_fut);
futures.set_c(scorer_fut);
futures.set_d(sweeper_fut);
futures.set_e(lm_fut);

// Run persistence tasks in parallel and exit if any of them returns an error.
for res in futures.await {
Expand Down