diff --git a/src/consumer/kafka.rs b/src/consumer/kafka.rs index c63d5a59..5f7a310b 100644 --- a/src/consumer/kafka.rs +++ b/src/consumer/kafka.rs @@ -359,32 +359,16 @@ pub async fn handle_events( unreachable!("Got partition revocation before the consumer has started") } (ConsumerState::Ready, Event::Shutdown) => ConsumerState::Stopped, - (ConsumerState::Consuming(handles, mut tpl), Event::Assign(mut assigned)) => { - assert!( - tpl.is_disjoint(&assigned), - "Newly assigned TPL should be disjoint from TPL we're consuming from" - ); - tpl.append(&mut assigned); - debug!( - "{} additional topic partitions added after assignment", - assigned.len() - ); - handles.shutdown(CALLBACK_DURATION).await; - ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl) + (ConsumerState::Consuming(_, _), Event::Assign(_)) => { + unreachable!("Got partition assignment after the consumer has started") } - (ConsumerState::Consuming(handles, mut tpl), Event::Revoke(revoked)) => { + (ConsumerState::Consuming(handles, tpl), Event::Revoke(revoked)) => { assert!( - tpl.is_subset(&revoked), - "Revoked TPL should be a subset of TPL we're consuming from" + tpl == revoked, + "Revoked TPL should be equal to the subset of TPL we're consuming from" ); - tpl.retain(|e| !revoked.contains(e)); - debug!("{} topic partitions remaining after revocation", tpl.len()); handles.shutdown(CALLBACK_DURATION).await; - if tpl.is_empty() { - ConsumerState::Ready - } else { - ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl) - } + ConsumerState::Ready } (ConsumerState::Consuming(handles, _), Event::Shutdown) => { handles.shutdown(CALLBACK_DURATION).await;