Skip to content
1 change: 1 addition & 0 deletions src/network/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl PeerManager {

pub async fn init(&self) -> Result<(), PeerManagerError> {
let mut peers = self.peers.write().await;

for (peer_addr, peer) in peers.iter_mut() {
let mut new_peer = Peer::new(peer_addr, self.network_magic);

Expand Down
9 changes: 8 additions & 1 deletion src/pipeline/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,16 @@ impl gasket::framework::Worker<Stage> for Worker {
&mut self,
stage: &mut Stage,
) -> Result<WorkSchedule<Vec<Transaction>>, WorkerError> {
let queued_len = stage.output.len().unwrap_or(0);
let current_cap = CAP - queued_len as u16;

if current_cap == 0 {
return Ok(WorkSchedule::Idle);
}

let transactions = stage
.priority
.next(TransactionStatus::Pending, CAP)
.next(TransactionStatus::Pending, current_cap)
.await
.or_retry()?;

Expand Down