diff --git a/pytest.ini b/pytest.ini
index a96b6bb9..54ac7299 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -1,2 +1,4 @@
 [pytest]
 addopts = --benchmark-disable
+# rewrite asserts in tests/backends/mixins.py as well
+python_files = tests/*.py
diff --git a/tests/__init__.py b/tests/__init__.py
index e69de29b..a5461d0f 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -0,0 +1,8 @@
+from typing import ContextManager, Protocol, TypeVar
+
+T = TypeVar("T")
+
+
+class WithStmt(Protocol):
+    def __call__(self, value: ContextManager[T]) -> T:
+        ...
diff --git a/tests/backends/mixins.py b/tests/backends/mixins.py
index 7fb9f92e..662afb97 100644
--- a/tests/backends/mixins.py
+++ b/tests/backends/mixins.py
@@ -10,6 +10,7 @@
 from arroyo.backends.abstract import Consumer, Producer
 from arroyo.errors import ConsumerError, EndOfPartition, OffsetOutOfRange
 from arroyo.types import BrokerValue, Partition, Topic, TStrategyPayload
+from tests import WithStmt
 from tests.assertions import assert_changes, assert_does_not_change
@@ -34,172 +35,171 @@ def get_producer(self) -> Producer[TStrategyPayload]:
     def get_payloads(self) -> Iterator[TStrategyPayload]:
         raise NotImplementedError
 
-    def test_consumer(self) -> None:
+    def test_consumer(self, with_stmt: WithStmt) -> None:
         group = uuid.uuid1().hex
         payloads = self.get_payloads()
 
-        with self.get_topic() as topic:
-            with closing(self.get_producer()) as producer:
-                messages = [
-                    future.result(timeout=5.0)
-                    for future in [
-                        producer.produce(topic, next(payloads)) for i in range(2)
-                    ]
-                ]
-
-            consumer = self.get_consumer(group)
-
-            def _assignment_callback(partitions: Mapping[Partition, int]) -> None:
-                assert partitions == {Partition(topic, 0): messages[0].offset}
-
-                consumer.seek({Partition(topic, 0): messages[1].offset})
+        topic = with_stmt(self.get_topic())
+        producer = with_stmt(closing(self.get_producer()))
-                with pytest.raises(ConsumerError):
-                    consumer.seek({Partition(topic, 1): 0})
+        messages = [
+            future.result(timeout=5.0)
+            for future in [producer.produce(topic, next(payloads)) for i in range(2)]
+        ]
-            assignment_callback = mock.Mock(side_effect=_assignment_callback)
+        consumer = self.get_consumer(group)
-            def _revocation_callback(partitions: Sequence[Partition]) -> None:
-                assert partitions == [Partition(topic, 0)]
-                assert consumer.tell() == {Partition(topic, 0): messages[1].offset}
+        def _assignment_callback(partitions: Mapping[Partition, int]) -> None:
+            assert partitions == {Partition(topic, 0): messages[0].offset}
-                # Not sure why you'd want to do this, but it shouldn't error.
-                consumer.seek({Partition(topic, 0): messages[0].offset})
+            consumer.seek({Partition(topic, 0): messages[1].offset})
-            revocation_callback = mock.Mock(side_effect=_revocation_callback)
+            with pytest.raises(ConsumerError):
+                consumer.seek({Partition(topic, 1): 0})
-            # TODO: It'd be much nicer if ``subscribe`` returned a future that we could
-            # use to wait for assignment, but we'd need to be very careful to avoid
-            # edge cases here. It's probably not worth the complexity for now.
-            consumer.subscribe(
-                [topic], on_assign=assignment_callback, on_revoke=revocation_callback
-            )
+        assignment_callback = mock.Mock(side_effect=_assignment_callback)
-            with assert_changes(
-                lambda: assignment_callback.called, False, True
-            ), assert_changes(
-                consumer.tell,
-                {},
-                {Partition(topic, 0): messages[1].next_offset},
-            ):
-                value = consumer.poll(10.0)  # XXX: getting the subcription is slow
-
-                assert isinstance(value, BrokerValue)
-                assert value.committable == messages[1].committable
-                assert value.partition == Partition(topic, 0)
-                assert value.offset == messages[1].offset
-                assert value == messages[1]
+        def _revocation_callback(partitions: Sequence[Partition]) -> None:
+            assert partitions == [Partition(topic, 0)]
+            assert consumer.tell() == {Partition(topic, 0): messages[1].offset}
+
+            # Not sure why you'd want to do this, but it shouldn't error.
+            consumer.seek({Partition(topic, 0): messages[0].offset})
-            assert consumer.tell() == {Partition(topic, 0): messages[0].offset}
-            with pytest.raises(ConsumerError):
-                consumer.seek({Partition(topic, 1): 0})
+        revocation_callback = mock.Mock(side_effect=_revocation_callback)
+
+        # TODO: It'd be much nicer if ``subscribe`` returned a future that we could
+        # use to wait for assignment, but we'd need to be very careful to avoid
+        # edge cases here. It's probably not worth the complexity for now.
+        consumer.subscribe(
+            [topic], on_assign=assignment_callback, on_revoke=revocation_callback
+        )
+
+        with assert_changes(
+            lambda: assignment_callback.called, False, True
+        ), assert_changes(
+            consumer.tell,
+            {},
+            {Partition(topic, 0): messages[1].next_offset},
+        ):
+            value = consumer.poll(10.0)  # XXX: getting the subscription is slow
+
+        assert isinstance(value, BrokerValue)
+        assert value.committable == messages[1].committable
+        assert value.partition == Partition(topic, 0)
+        assert value.offset == messages[1].offset
+        assert value == messages[1]
+
+        consumer.seek({Partition(topic, 0): messages[0].offset})
+        assert consumer.tell() == {Partition(topic, 0): messages[0].offset}
+
+        with pytest.raises(ConsumerError):
+            consumer.seek({Partition(topic, 1): 0})
+
+        with assert_changes(consumer.paused, [], [Partition(topic, 0)]):
+            consumer.pause([Partition(topic, 0)])
-            with assert_changes(consumer.paused, [], [Partition(topic, 0)]):
-                consumer.pause([Partition(topic, 0)])
+        # Even if there is another message available, ``poll`` should
+        # return ``None`` if the consumer is paused.
+        assert consumer.poll(1.0) is None
-            # Even if there is another message available, ``poll`` should
-            # return ``None`` if the consumer is paused.
-            assert consumer.poll(1.0) is None
+        with assert_changes(consumer.paused, [Partition(topic, 0)], []):
+            consumer.resume([Partition(topic, 0)])
-            with assert_changes(consumer.paused, [Partition(topic, 0)], []):
-                consumer.resume([Partition(topic, 0)])
+        value = consumer.poll(5.0)
+        assert isinstance(value, BrokerValue)
+        assert value.partition == Partition(topic, 0)
+        assert value.offset == messages[0].offset
+        assert value.payload == messages[0].payload
-            value = consumer.poll(5.0)
-            assert isinstance(value, BrokerValue)
-            assert value.partition == Partition(topic, 0)
-            assert value.offset == messages[0].offset
-            assert value.payload == messages[0].payload
+        assert consumer.commit_offsets() == {}
-            assert consumer.commit_offsets() == {}
+        consumer.stage_offsets(value.committable)
-            consumer.stage_offsets(value.committable)
+        assert consumer.commit_offsets() == {Partition(topic, 0): value.next_offset}
-            assert consumer.commit_offsets() == {Partition(topic, 0): value.next_offset}
+        consumer.stage_offsets({Partition(Topic("invalid"), 0): 0})
-            consumer.stage_offsets({Partition(Topic("invalid"), 0): 0})
+        with pytest.raises(ConsumerError):
+            consumer.commit_offsets()
-            with pytest.raises(ConsumerError):
-                consumer.commit_offsets()
+        assert consumer.tell() == {Partition(topic, 0): messages[1].offset}
-            assert consumer.tell() == {Partition(topic, 0): messages[1].offset}
+        consumer.unsubscribe()
-            consumer.unsubscribe()
-
-            with assert_changes(lambda: revocation_callback.called, False, True):
-                assert consumer.poll(1.0) is None
+        with assert_changes(lambda: revocation_callback.called, False, True):
+            assert consumer.poll(1.0) is None
-            assert consumer.tell() == {}
+        assert consumer.tell() == {}
-            with pytest.raises(ConsumerError):
-                consumer.seek({Partition(topic, 0): messages[0].offset})
+        with pytest.raises(ConsumerError):
+            consumer.seek({Partition(topic, 0): messages[0].offset})
-            revocation_callback.reset_mock()
+        revocation_callback.reset_mock()
-            with assert_changes(
-                lambda: consumer.closed, False, True
-            ), assert_does_not_change(lambda: revocation_callback.called, False):
-                consumer.close()
+        with assert_changes(
+            lambda: consumer.closed, False, True
+        ), assert_does_not_change(lambda: revocation_callback.called, False):
+            consumer.close()
-            # Make sure all public methods (except ``close```) error if called
-            # after the consumer has been closed.
+        # Make sure all public methods (except ``close``) error if called
+        # after the consumer has been closed.
-            with pytest.raises(RuntimeError):
-                consumer.subscribe([topic])
+        with pytest.raises(RuntimeError):
+            consumer.subscribe([topic])
-            with pytest.raises(RuntimeError):
-                consumer.unsubscribe()
+        with pytest.raises(RuntimeError):
+            consumer.unsubscribe()
-            with pytest.raises(RuntimeError):
-                consumer.poll()
+        with pytest.raises(RuntimeError):
+            consumer.poll()
-            with pytest.raises(RuntimeError):
-                consumer.tell()
+        with pytest.raises(RuntimeError):
+            consumer.tell()
-            with pytest.raises(RuntimeError):
-                consumer.seek({Partition(topic, 0): messages[0].offset})
+        with pytest.raises(RuntimeError):
+            consumer.seek({Partition(topic, 0): messages[0].offset})
-            with pytest.raises(RuntimeError):
-                consumer.pause([Partition(topic, 0)])
+        with pytest.raises(RuntimeError):
+            consumer.pause([Partition(topic, 0)])
-            with pytest.raises(RuntimeError):
-                consumer.resume([Partition(topic, 0)])
+        with pytest.raises(RuntimeError):
+            consumer.resume([Partition(topic, 0)])
-            with pytest.raises(RuntimeError):
-                consumer.paused()
+        with pytest.raises(RuntimeError):
+            consumer.paused()
-            # stage_positions does not validate anything
-            consumer.stage_offsets({})
+        # stage_offsets does not validate anything
+        consumer.stage_offsets({})
-            with pytest.raises(RuntimeError):
-                consumer.commit_offsets()
+        with pytest.raises(RuntimeError):
+            consumer.commit_offsets()
-            consumer.close()  # should be safe, even if the consumer is already closed
+        consumer.close()  # should be safe, even if the consumer is already closed
-            consumer = self.get_consumer(group)
+        consumer = self.get_consumer(group)
-            revocation_callback = mock.MagicMock()
+        revocation_callback = mock.MagicMock()
-            consumer.subscribe([topic], on_revoke=revocation_callback)
+        consumer.subscribe([topic], on_revoke=revocation_callback)
-            value = consumer.poll(10.0)  # XXX: getting the subscription is slow
-            assert isinstance(value, BrokerValue)
-            assert value.partition == Partition(topic, 0)
-            assert value.offset == messages[1].offset
-            assert value.payload == messages[1].payload
-            assert value == messages[1]
+        value = consumer.poll(10.0)  # XXX: getting the subscription is slow
+        assert isinstance(value, BrokerValue)
+        assert value.partition == Partition(topic, 0)
+        assert value.offset == messages[1].offset
+        assert value.payload == messages[1].payload
+        assert value == messages[1]
-            try:
-                assert consumer.poll(1.0) is None
-            except EndOfPartition as error:
-                assert error.partition == Partition(topic, 0)
-                assert error.offset == value.next_offset
-            else:
-                raise AssertionError("expected EndOfPartition error")
+        try:
+            assert consumer.poll(1.0) is None
+        except EndOfPartition as error:
+            assert error.partition == Partition(topic, 0)
+            assert error.offset == value.next_offset
+        else:
+            raise AssertionError("expected EndOfPartition error")
-            with assert_changes(lambda: revocation_callback.called, False, True):
-                consumer.close()
+        with assert_changes(lambda: revocation_callback.called, False, True):
+            consumer.close()
 
     def test_consumer_offset_out_of_range(self) -> None:
         payloads = self.get_payloads()
@@ -231,172 +231,172 @@ def test_consumer_offset_out_of_range(self) -> None:
         with pytest.raises(OffsetOutOfRange):
             consumer.poll()
 
-    def test_working_offsets(self) -> None:
+    def test_working_offsets(self, with_stmt: WithStmt) -> None:
         payloads = self.get_payloads()
 
-        with self.get_topic() as topic:
-            with closing(self.get_producer()) as producer:
-                messages = [producer.produce(topic, next(payloads)).result(5.0)]
+        topic = with_stmt(self.get_topic())
-            def on_assign(partitions: Mapping[Partition, int]) -> None:
-                # NOTE: This will eventually need to be controlled by a generalized
-                # consumer auto offset reset setting.
+        with closing(self.get_producer()) as producer:
+            messages = [producer.produce(topic, next(payloads)).result(5.0)]
-                assert (
-                    partitions
-                    == consumer.tell()
-                    == {messages[0].partition: messages[0].offset}
-                )
+        def on_assign(partitions: Mapping[Partition, int]) -> None:
+            # NOTE: This will eventually need to be controlled by a generalized
+            # consumer auto offset reset setting.
-            consumer = self.get_consumer()
-            consumer.subscribe([topic], on_assign=on_assign)
+            assert (
+                partitions
+                == consumer.tell()
+                == {messages[0].partition: messages[0].offset}
+            )
-            for i in range(5):
-                value = consumer.poll(1.0)
-                if value is not None:
-                    break
-                else:
-                    time.sleep(1.0)
+        consumer = self.get_consumer()
+        consumer.subscribe([topic], on_assign=on_assign)
+
+        for i in range(5):
+            value = consumer.poll(1.0)
+            if value is not None:
+                break
             else:
-                raise Exception("assignment never received")
+                time.sleep(1.0)
+        else:
+            raise Exception("assignment never received")
+
+        assert isinstance(value, BrokerValue)
+        assert value == messages[0]
+
+        # The first call to ``poll`` should raise ``EndOfPartition``. It
+        # should otherwise be safe to try to read the first missing
+        # offset (index) in the partition.
+        with assert_does_not_change(
+            consumer.tell, {value.partition: value.next_offset}
+        ), pytest.raises(EndOfPartition):
+            assert consumer.poll(1.0) is None
-            assert isinstance(value, BrokerValue)
+        # It should otherwise be safe to try to read the first missing
+        # offset (index) in the partition.
+        with assert_does_not_change(
+            consumer.tell, {value.partition: value.next_offset}
+        ):
+            assert consumer.poll(1.0) is None
+
+        with assert_changes(
+            consumer.tell,
+            {value.partition: value.next_offset},
+            {value.partition: value.offset},
+        ):
+            consumer.seek({value.partition: value.offset})
+
+        with assert_changes(
+            consumer.tell,
+            {value.partition: value.offset},
+            {value.partition: value.next_offset},
+        ):
+            value = consumer.poll()
             assert value == messages[0]
-            # The first call to ``poll`` should raise ``EndOfPartition``. It
-            # should be otherwise be safe to try to read the first missing
-            # offset (index) in the partition.
-            with assert_does_not_change(
-                consumer.tell, {value.partition: value.next_offset}
-            ), pytest.raises(EndOfPartition):
-                assert consumer.poll(1.0) is None
-
-            # It should be otherwise be safe to try to read the first missing
-            # offset (index) in the partition.
-            with assert_does_not_change(
-                consumer.tell, {value.partition: value.next_offset}
-            ):
-                assert consumer.poll(1.0) is None
-
-            with assert_changes(
-                consumer.tell,
-                {value.partition: value.next_offset},
-                {value.partition: value.offset},
-            ):
-                consumer.seek({value.partition: value.offset})
-
-            with assert_changes(
-                consumer.tell,
-                {value.partition: value.offset},
-                {value.partition: value.next_offset},
-            ):
-                value = consumer.poll()
-                assert value == messages[0]
-
-            # Seeking beyond the first missing index should work but subsequent
-            # reads should error. (We don't know if this offset is valid or not
-            # until we try to fetch a message.)
-            assert isinstance(value, BrokerValue)
-            with assert_changes(
-                consumer.tell,
-                {value.partition: value.next_offset},
-                {value.partition: value.next_offset + 1},
-            ):
-                consumer.seek({value.partition: value.next_offset + 1})
-
-            # Offsets should not be advanced after a failed poll.
-            with assert_does_not_change(
-                consumer.tell,
-                {value.partition: value.next_offset + 1},
-            ), pytest.raises(ConsumerError):
-                consumer.poll(1.0)
-
-            # Trying to seek on an unassigned partition should error.
-            with assert_does_not_change(
-                consumer.tell,
-                {value.partition: value.next_offset + 1},
-            ), pytest.raises(ConsumerError):
-                consumer.seek({value.partition: 0, Partition(topic, -1): 0})
-
-            # Trying to seek to a negative offset should error.
-            with assert_does_not_change(
-                consumer.tell,
-                {value.partition: value.next_offset + 1},
-            ), pytest.raises(ConsumerError):
-                consumer.seek({value.partition: -1})
-    def test_pause_resume(self) -> None:
+        # Seeking beyond the first missing index should work but subsequent
+        # reads should error. (We don't know if this offset is valid or not
+        # until we try to fetch a message.)
+        assert isinstance(value, BrokerValue)
+        with assert_changes(
+            consumer.tell,
+            {value.partition: value.next_offset},
+            {value.partition: value.next_offset + 1},
+        ):
+            consumer.seek({value.partition: value.next_offset + 1})
+
+        # Offsets should not be advanced after a failed poll.
+        with assert_does_not_change(
+            consumer.tell,
+            {value.partition: value.next_offset + 1},
+        ), pytest.raises(ConsumerError):
+            consumer.poll(1.0)
+
+        # Trying to seek on an unassigned partition should error.
+        with assert_does_not_change(
+            consumer.tell,
+            {value.partition: value.next_offset + 1},
+        ), pytest.raises(ConsumerError):
+            consumer.seek({value.partition: 0, Partition(topic, -1): 0})
+
+        # Trying to seek to a negative offset should error.
+        with assert_does_not_change(
+            consumer.tell,
+            {value.partition: value.next_offset + 1},
+        ), pytest.raises(ConsumerError):
+            consumer.seek({value.partition: -1})
+
+    def test_pause_resume(self, with_stmt: WithStmt) -> None:
         payloads = self.get_payloads()
 
-        with self.get_topic() as topic, closing(
-            self.get_consumer()
-        ) as consumer, closing(self.get_producer()) as producer:
-            messages = [
-                producer.produce(topic, next(payloads)).result(timeout=5.0)
-                for i in range(5)
-            ]
+        topic = with_stmt(self.get_topic())
+        consumer = with_stmt(closing(self.get_consumer()))
+        producer = with_stmt(closing(self.get_producer()))
-            consumer.subscribe([topic])
+        messages = [
+            producer.produce(topic, next(payloads)).result(timeout=5.0)
+            for i in range(5)
+        ]
+
+        consumer.subscribe([topic])
+
+        assert consumer.poll(10.0) == messages[0]
+        assert consumer.paused() == []
+
+        # XXX: Unfortunately, there is really no way to prove that this
+        # consumer would return the message other than by waiting a while.
+        with assert_changes(consumer.paused, [], [Partition(topic, 0)]):
+            consumer.pause([Partition(topic, 0)])
-            assert consumer.poll(10.0) == messages[0]
-            assert consumer.paused() == []
+        assert consumer.poll(1.0) is None
-            # XXX: Unfortunately, there is really no way to prove that this
-            # consumer would return the message other than by waiting a while.
-            with assert_changes(consumer.paused, [], [Partition(topic, 0)]):
-                consumer.pause([Partition(topic, 0)])
+        # We should pick up where we left off when we resume the partition.
+        with assert_changes(consumer.paused, [Partition(topic, 0)], []):
+            consumer.resume([Partition(topic, 0)])
+        assert consumer.poll(5.0) == messages[1]
+
+        # Calling ``seek`` should have a side effect, even if no messages
+        # are consumed before calling ``pause``.
+        with assert_changes(
+            consumer.tell,
+            {Partition(topic, 0): messages[1].next_offset},
+            {Partition(topic, 0): messages[3].offset},
+        ):
+            consumer.seek({Partition(topic, 0): messages[3].offset})
+            consumer.pause([Partition(topic, 0)])
             assert consumer.poll(1.0) is None
+            consumer.resume([Partition(topic, 0)])
+
+        assert consumer.poll(5.0) == messages[3]
+
+        # It is still allowable to call ``seek`` on a paused partition.
+        # When consumption resumes, we would expect to see the side effect
+        # of that seek.
+        consumer.pause([Partition(topic, 0)])
+        with assert_changes(
+            consumer.tell,
+            {Partition(topic, 0): messages[3].next_offset},
+            {Partition(topic, 0): messages[0].offset},
+        ):
+            consumer.seek({Partition(topic, 0): messages[0].offset})
+            assert consumer.poll(1.0) is None
+            consumer.resume([Partition(topic, 0)])
+
+        assert consumer.poll(5.0) == messages[0]
-            # We should pick up where we left off when we resume the partition.
-            with assert_changes(consumer.paused, [Partition(topic, 0)], []):
-                consumer.resume([Partition(topic, 0)])
-
-            assert consumer.poll(5.0) == messages[1]
-
-            # Calling ``seek`` should have a side effect, even if no messages
-            # are consumed before calling ``pause``.
-            with assert_changes(
-                consumer.tell,
-                {Partition(topic, 0): messages[1].next_offset},
-                {Partition(topic, 0): messages[3].offset},
-            ):
-                consumer.seek({Partition(topic, 0): messages[3].offset})
-                consumer.pause([Partition(topic, 0)])
-                assert consumer.poll(1.0) is None
-                consumer.resume([Partition(topic, 0)])
-
-            assert consumer.poll(5.0) == messages[3]
-
-            # It is still allowable to call ``seek`` on a paused partition.
-            # When consumption resumes, we would expect to see the side effect
-            # of that seek.
+        with assert_does_not_change(consumer.paused, []), pytest.raises(ConsumerError):
+            consumer.pause([Partition(topic, 0), Partition(topic, 1)])
+
+        with assert_changes(consumer.paused, [], [Partition(topic, 0)]):
             consumer.pause([Partition(topic, 0)])
-            with assert_changes(
-                consumer.tell,
-                {Partition(topic, 0): messages[3].next_offset},
-                {Partition(topic, 0): messages[0].offset},
-            ):
-                consumer.seek({Partition(topic, 0): messages[0].offset})
-                assert consumer.poll(1.0) is None
-                consumer.resume([Partition(topic, 0)])
-
-            assert consumer.poll(5.0) == messages[0]
-
-            with assert_does_not_change(consumer.paused, []), pytest.raises(
-                ConsumerError
-            ):
-                consumer.pause([Partition(topic, 0), Partition(topic, 1)])
-
-            with assert_changes(consumer.paused, [], [Partition(topic, 0)]):
-                consumer.pause([Partition(topic, 0)])
-
-            with assert_does_not_change(
-                consumer.paused, [Partition(topic, 0)]
-            ), pytest.raises(ConsumerError):
-                consumer.resume([Partition(topic, 0), Partition(topic, 1)])
-    def test_pause_resume_rebalancing(self) -> None:
+
+        with assert_does_not_change(
+            consumer.paused, [Partition(topic, 0)]
+        ), pytest.raises(ConsumerError):
+            consumer.resume([Partition(topic, 0), Partition(topic, 1)])
+
+    def test_pause_resume_rebalancing(self, with_stmt: WithStmt) -> None:
         payloads = self.get_payloads()
 
         consumer_a_on_assign = mock.Mock()
@@ -404,112 +404,110 @@ def test_pause_resume_rebalancing(self) -> None:
         consumer_b_on_assign = mock.Mock()
         consumer_b_on_revoke = mock.Mock()
 
-        with self.get_topic(2) as topic, closing(
-            self.get_producer()
-        ) as producer, closing(
-            self.get_consumer("group", enable_end_of_partition=False)
-        ) as consumer_a, closing(
-            self.get_consumer("group", enable_end_of_partition=False)
-        ) as consumer_b:
-            messages = [
-                producer.produce(Partition(topic, i), next(payloads)).result(
-                    timeout=5.0
-                )
-                for i in [0, 1]
+        topic = with_stmt(self.get_topic(2))
+        producer = with_stmt(closing(self.get_producer()))
+        consumer_a = with_stmt(
+            closing(self.get_consumer("group", enable_end_of_partition=False))
+        )
+        consumer_b = with_stmt(
+            closing(self.get_consumer("group", enable_end_of_partition=False))
+        )
+
+        messages = [
+            producer.produce(Partition(topic, i), next(payloads)).result(timeout=5.0)
+            for i in [0, 1]
+        ]
+
+        def wait_until_rebalancing(
+            from_consumer: Consumer[Any], to_consumer: Consumer[Any]
+        ) -> None:
+            for _ in range(10):
+                assert from_consumer.poll(0) is None
+                if to_consumer.poll(1.0) is not None:
+                    return
+
+            raise RuntimeError("no rebalancing happened")
+
+        consumer_a.subscribe(
+            [topic], on_assign=consumer_a_on_assign, on_revoke=consumer_a_on_revoke
+        )
+
+        # It doesn't really matter which message is fetched first -- we
+        # just want to know the assignment occurred.
+        assert consumer_a.poll(10.0) in messages  # XXX: getting the subscription is slow
+
+        assert len(consumer_a.tell()) == 2
+        assert len(consumer_b.tell()) == 0
+
+        # Pause all partitions.
+        consumer_a.pause([Partition(topic, 0), Partition(topic, 1)])
+        assert set(consumer_a.paused()) == set(
+            [Partition(topic, 0), Partition(topic, 1)]
+        )
+
+        consumer_b.subscribe(
+            [topic], on_assign=consumer_b_on_assign, on_revoke=consumer_b_on_revoke
+        )
+
+        wait_until_rebalancing(consumer_a, consumer_b)
+
+        if self.cooperative_sticky:
+            # With incremental rebalancing, only one partition should have been reassigned to consumer_b, and consumer_a should remain paused.
+            assert consumer_a.paused() == [Partition(topic, 1)]
+            assert consumer_a.poll(10.0) is None
+        else:
+            # The first consumer should have had its offsets rolled back
+            # and its partition resumed during rebalancing.
+            assert consumer_a.paused() == []
+            assert consumer_a.poll(10.0) is not None
+
+        assert len(consumer_a.tell()) == 1
+        assert len(consumer_b.tell()) == 1
+
+        (consumer_a_partition,) = consumer_a.tell()
+        (consumer_b_partition,) = consumer_b.tell()
+
+        # Pause consumer_a again.
+        consumer_a.pause(list(consumer_a.tell()))
+        # If we close consumer_a, consumer_b should get all partitions.
+        producer.produce(next(iter(consumer_a.tell())), next(payloads)).result(
+            timeout=5.0
+        )
+        consumer_a.unsubscribe()
+        wait_until_rebalancing(consumer_a, consumer_b)
+
+        assert len(consumer_b.tell()) == 2
+
+        if self.cooperative_sticky:
+            assert consumer_a_on_assign.mock_calls == [
+                mock.call({Partition(topic, 0): 0, Partition(topic, 1): 0}),
+            ]
+            assert consumer_a_on_revoke.mock_calls == [
+                mock.call([Partition(topic, 0)]),
+                mock.call([Partition(topic, 1)]),
             ]
-            def wait_until_rebalancing(
-                from_consumer: Consumer[Any], to_consumer: Consumer[Any]
-            ) -> None:
-                for _ in range(10):
-                    assert from_consumer.poll(0) is None
-                    if to_consumer.poll(1.0) is not None:
-                        return
-
-                raise RuntimeError("no rebalancing happened")
-
-            consumer_a.subscribe(
-                [topic], on_assign=consumer_a_on_assign, on_revoke=consumer_a_on_revoke
-            )
-
-            # It doesn't really matter which message is fetched first -- we
-            # just want to know the assignment occurred.
-            assert (
-                consumer_a.poll(10.0) in messages
-            )  # XXX: getting the subcription is slow
-
-            assert len(consumer_a.tell()) == 2
-            assert len(consumer_b.tell()) == 0
-
-            # Pause all partitions.
-            consumer_a.pause([Partition(topic, 0), Partition(topic, 1)])
-            assert set(consumer_a.paused()) == set(
-                [Partition(topic, 0), Partition(topic, 1)]
-            )
-
-            consumer_b.subscribe(
-                [topic], on_assign=consumer_b_on_assign, on_revoke=consumer_b_on_revoke
-            )
-
-            wait_until_rebalancing(consumer_a, consumer_b)
+            assert consumer_b_on_assign.mock_calls == [
+                mock.call({Partition(topic, 0): 0}),
+                mock.call({Partition(topic, 1): 0}),
+            ]
+            assert consumer_b_on_revoke.mock_calls == []
+        else:
+            assert consumer_a_on_assign.mock_calls == [
+                mock.call({Partition(topic, 0): 0, Partition(topic, 1): 0}),
+                mock.call({consumer_a_partition: 0}),
+            ]
+            assert consumer_a_on_revoke.mock_calls == [
+                mock.call([Partition(topic, 0), Partition(topic, 1)]),
+                mock.call([consumer_a_partition]),
+            ]
-            if self.cooperative_sticky:
-                # within incremental rebalancing, only one partition should have been reassigned to the consumer_b, and consumer_a should remain paused
-                assert consumer_a.paused() == [Partition(topic, 1)]
-                assert consumer_a.poll(10.0) is None
-            else:
-                # The first consumer should have had its offsets rolled back, as
-                # well as should have had it's partition resumed during
-                # rebalancing.
-                assert consumer_a.paused() == []
-                assert consumer_a.poll(10.0) is not None
-
-            assert len(consumer_a.tell()) == 1
-            assert len(consumer_b.tell()) == 1
-
-            (consumer_a_partition,) = consumer_a.tell()
-            (consumer_b_partition,) = consumer_b.tell()
-
-            # Pause consumer_a again.
-            consumer_a.pause(list(consumer_a.tell()))
-            # if we close consumer_a, consumer_b should get all partitions
-            producer.produce(next(iter(consumer_a.tell())), next(payloads)).result(
-                timeout=5.0
-            )
-            consumer_a.unsubscribe()
-            wait_until_rebalancing(consumer_a, consumer_b)
-
-            assert len(consumer_b.tell()) == 2
-
-            if self.cooperative_sticky:
-
-                assert consumer_a_on_assign.mock_calls == [
-                    mock.call({Partition(topic, 0): 0, Partition(topic, 1): 0}),
-                ]
-                assert consumer_a_on_revoke.mock_calls == [
-                    mock.call([Partition(topic, 0)]),
-                    mock.call([Partition(topic, 1)]),
-                ]
-
-                assert consumer_b_on_assign.mock_calls == [
-                    mock.call({Partition(topic, 0): 0}),
-                    mock.call({Partition(topic, 1): 0}),
-                ]
-                assert consumer_b_on_revoke.mock_calls == []
-            else:
-                assert consumer_a_on_assign.mock_calls == [
-                    mock.call({Partition(topic, 0): 0, Partition(topic, 1): 0}),
-                    mock.call({consumer_a_partition: 0}),
-                ]
-                assert consumer_a_on_revoke.mock_calls == [
-                    mock.call([Partition(topic, 0), Partition(topic, 1)]),
-                    mock.call([consumer_a_partition]),
-                ]
-
-                assert consumer_b_on_assign.mock_calls == [
-                    mock.call({consumer_b_partition: 0}),
-                    mock.call({Partition(topic, 0): 0, Partition(topic, 1): 0}),
-                ]
-                assert consumer_b_on_revoke.mock_calls == [
-                    mock.call([consumer_b_partition])
-                ]
+
+            assert consumer_b_on_assign.mock_calls == [
+                mock.call({consumer_b_partition: 0}),
+                mock.call({Partition(topic, 0): 0, Partition(topic, 1): 0}),
+            ]
+            assert consumer_b_on_revoke.mock_calls == [
+                mock.call([consumer_b_partition])
+            ]
diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py
index 17364f86..f22f6fcd 100644
--- a/tests/backends/test_kafka.py
+++ b/tests/backends/test_kafka.py
@@ -2,11 +2,13 @@
 import itertools
 import os
 import pickle
+import random
 import time
 import uuid
+from concurrent.futures import ThreadPoolExecutor
 from contextlib import closing
 from pickle import PickleBuffer
-from typing import Any, Iterator, Mapping, MutableSequence, Optional
+from typing import Any, Iterator, List, Mapping, MutableSequence, Optional
 from unittest import mock
 
 import pytest
@@ -24,6 +26,7 @@ from arroyo.processing.processor import StreamProcessor
 from arroyo.processing.strategies.abstract import MessageRejected
 from arroyo.types import BrokerValue, Partition, Topic
+from tests import WithStmt
 from tests.backends.mixins import StreamsTestMixin
 
 commit_codec = CommitCodec()
@@ -205,65 +208,120 @@ def test_auto_offset_reset_error(self) -> None:
         with pytest.raises(ConsumerError):
             consumer.poll(10.0)  # XXX: getting the subcription is slow
 
-    def test_consumer_stream_processor_shutdown(self) -> None:
+    def test_consumer_stream_processor_shutdown(self, with_stmt: WithStmt) -> None:
         strategy = mock.Mock()
         strategy.poll.side_effect = RuntimeError("goodbye")
         factory = mock.Mock()
         factory.create_with_partitions.return_value = strategy
 
-        with self.get_topic() as topic:
-            with closing(self.get_producer()) as producer:
-                producer.produce(topic, next(self.get_payloads())).result(5.0)
+        topic = with_stmt(self.get_topic())
+
+        producer = with_stmt(closing(self.get_producer()))
+        producer.produce(topic, next(self.get_payloads())).result(5.0)
-            with closing(self.get_consumer()) as consumer:
-                processor = StreamProcessor(consumer, topic, factory, IMMEDIATE)
+        consumer = with_stmt(closing(self.get_consumer()))
+        processor = StreamProcessor(consumer, topic, factory, IMMEDIATE)
-                with pytest.raises(RuntimeError):
-                    processor.run()
+        with pytest.raises(RuntimeError):
+            processor.run()
 
-    def test_consumer_polls_when_paused(self) -> None:
+    def test_consumer_polls_when_paused(self, with_stmt: WithStmt) -> None:
         strategy = mock.Mock()
         factory = mock.Mock()
         factory.create_with_partitions.return_value = strategy
         poll_interval = 6000
 
-        with self.get_topic() as topic:
-            with closing(self.get_producer()) as producer, closing(
-                self.get_consumer(max_poll_interval_ms=poll_interval)
-            ) as consumer:
-                producer.produce(topic, next(self.get_payloads())).result(5.0)
+        topic = with_stmt(self.get_topic())
+        producer = with_stmt(closing(self.get_producer()))
+        consumer = with_stmt(
+            closing(self.get_consumer(max_poll_interval_ms=poll_interval))
+        )
-                processor = StreamProcessor(consumer, topic, factory, IMMEDIATE)
+        producer.produce(topic, next(self.get_payloads())).result(5.0)
-                # Wait for the consumer to subscribe and first message to be processed
-                for _ in range(1000):
-                    processor._run_once()
-                    if strategy.submit.call_count > 0:
-                        break
-                    time.sleep(0.1)
+        processor = StreamProcessor(consumer, topic, factory, IMMEDIATE)
-                assert strategy.submit.call_count == 1
+        # Wait for the consumer to subscribe and the first message to be processed
+        for _ in range(1000):
+            processor._run_once()
+            if strategy.submit.call_count > 0:
+                break
+            time.sleep(0.1)
-                # Now we start raising message rejected. the next produced message doesn't get processed
-                strategy.submit.side_effect = MessageRejected()
+        assert strategy.submit.call_count == 1
-                producer.produce(topic, next(self.get_payloads())).result(5.0)
-                processor._run_once()
-                assert consumer.paused() == []
-                # After ~5 seconds the consumer should be paused. On the next two calls to run_once it
-                # will pause itself, then poll the consumer.
-                time.sleep(5.0)
-                processor._run_once()
-                processor._run_once()
-                assert len(consumer.paused()) == 1
-
-                # Now we exceed the poll interval. After that we stop raising MessageRejected and
-                # the consumer unpauses itself.
-                time.sleep(2.0)
-                strategy.submit.side_effect = None
-                processor._run_once()
-                assert consumer.paused() == []
+        # Now we start raising MessageRejected. The next produced message doesn't get processed.
+        strategy.submit.side_effect = MessageRejected()
+
+        producer.produce(topic, next(self.get_payloads())).result(5.0)
+        processor._run_once()
+        assert consumer.paused() == []
+        # After ~5 seconds the consumer should be paused. On the next two calls to run_once it
+        # will pause itself, then poll the consumer.
+        time.sleep(5.0)
+        processor._run_once()
+        processor._run_once()
+        assert len(consumer.paused()) == 1
+
+        # Now we exceed the poll interval. After that we stop raising MessageRejected and
+        # the consumer unpauses itself.
+        time.sleep(2.0)
+        strategy.submit.side_effect = None
+        processor._run_once()
+        assert consumer.paused() == []
+
+    def test_rebalancing_fuzz(self, with_stmt: WithStmt) -> None:
+        num_consumers = 8
+        msgs_per_consumer = 100
+        total_timeout = 60 * 6
+
+        consumers = [
+            self.get_consumer("group", enable_end_of_partition=False)
+            for _ in range(num_consumers)
+        ]
+
+        topic = with_stmt(self.get_topic(num_consumers))
+
+        payloads = self.get_payloads()
+
+        produced = []
+        with closing(self.get_producer()) as producer:
+            for i in range(num_consumers * msgs_per_consumer):
+                msg = next(payloads)
+                producer.produce(Partition(topic, i % num_consumers), msg)
+                produced.append(msg)
+
+        messages: List[BrokerValue[Any]] = []
+
+        def poll(consumer: KafkaConsumer) -> None:
+            # Keep resubscribing at random intervals (to force rebalancing) until
+            # every produced message has been seen or the deadline passes.
+            die_after = time.time() + total_timeout
+            while len(messages) < len(produced) and time.time() < die_after:
+
+                def on_revoke(partitions: Any) -> None:
+                    consumer.commit_offsets()
+
+                consumer.subscribe([topic], on_revoke=on_revoke)
+
+                restart_after = time.time() + random.randrange(1, 30)
+                while time.time() < restart_after:
+                    message = consumer.poll(5.0)
+                    if message:
+                        messages.append(message)
+                        consumer.stage_offsets(message.committable)
+
+                consumer.commit_offsets()
+                consumer.unsubscribe()
+
+        with ThreadPoolExecutor(max_workers=num_consumers) as e:
+            for _ in e.map(poll, consumers, timeout=total_timeout * 2):
+                pass
+
+        assert None not in messages
+        assert set(message.payload.value for message in messages) == {
+            str(i).encode() for i in range(num_consumers * msgs_per_consumer)
+        }
+        assert len(messages) == len(produced)
 
 
 class TestKafkaStreamsIncrementalRebalancing(TestKafkaStreams):
diff --git a/tests/backends/test_local.py b/tests/backends/test_local.py
index de31bb0f..68fc9d5b 100644
--- a/tests/backends/test_local.py
+++ b/tests/backends/test_local.py
@@ -4,7 +4,6 @@
 from abc import abstractmethod
 from datetime import datetime
 from typing import Iterator, Optional
-from unittest import TestCase
 
 import pytest
@@ -19,11 +18,12 @@
 from arroyo.backends.local.storages.memory import MemoryMessageStorage
 from arroyo.types import Partition, Topic
 from arroyo.utils.clock import MockedClock
+from tests import WithStmt
 from tests.backends.mixins import StreamsTestMixin
 
 
 class LocalStreamsTestMixin(StreamsTestMixin[int]):
-    def setUp(self) -> None:
+    def setup(self) -> None:
         self.storage = self.get_message_storage()
         self.broker = LocalBroker(self.storage, MockedClock())
@@ -52,8 +52,8 @@ def get_payloads(self) -> Iterator[int]:
         return itertools.count()
 
     @pytest.mark.xfail(strict=True, reason="rebalancing not implemented")
-    def test_pause_resume_rebalancing(self) -> None:
-        return super().test_pause_resume_rebalancing()
+    def test_pause_resume_rebalancing(self, with_stmt: WithStmt) -> None:
+        return super().test_pause_resume_rebalancing(with_stmt)
 
     def test_storage(self) -> None:
         topic = Topic(uuid.uuid1().hex)
@@ -95,6 +95,6 @@ def test_storage(self) -> None:
         self.storage.delete_topic(topic)
 
 
-class LocalStreamsMemoryStorageTestCase(LocalStreamsTestMixin, TestCase):
+class TestLocalStreamsMemoryStorage(LocalStreamsTestMixin):
     def get_message_storage(self) -> MessageStorage[int]:
         return MemoryMessageStorage()
diff --git a/tests/conftest.py b/tests/conftest.py
index 5905fa13..5e826f27 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,4 +1,5 @@
-from typing import Generator, Iterator, List
+from concurrent.futures import ThreadPoolExecutor
+from typing import Any, Callable, Generator, Iterator, List
 
 import pytest
@@ -9,6 +10,7 @@
 from arroyo.types import TStrategyPayload
 from arroyo.utils.clock import MockedClock
 from arroyo.utils.metrics import configure_metrics
+from tests import WithStmt
 from tests.metrics import TestingMetricsBackend
@@ -40,3 +42,20 @@ def assert_no_internal_errors(
     for e in errors:
         raise e
+
+
+@pytest.fixture
+def with_stmt(request: pytest.FixtureRequest) -> WithStmt:
+    finalizers: List[Callable[[], Any]] = []
+
+    @request.addfinalizer
+    def _() -> None:
+        with ThreadPoolExecutor(10) as e:
+            list(e.map(lambda x: x(), finalizers, timeout=20))  # type: ignore  # drain so the timeout applies
+
+    def inner(value: Any) -> Any:
+        rv = value.__enter__()
+        finalizers.append(lambda: value.__exit__(None, None, None))
+        return rv
+
+    return inner
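
Usage note: the new with_stmt fixture replaces nested with blocks by entering each context manager immediately and deferring its __exit__ to a pytest finalizer, which runs all exits at teardown. A minimal, self-contained sketch of the pattern follows; fake_resource is a hypothetical stand-in for get_topic() or closing(get_producer()), used only for illustration:

from contextlib import contextmanager
from typing import Iterator

from tests import WithStmt  # the Protocol added in tests/__init__.py


@contextmanager
def fake_resource(name: str) -> Iterator[str]:
    # Hypothetical resource; stands in for get_topic() / closing(get_producer()).
    print(f"enter {name}")
    try:
        yield name
    finally:
        print(f"exit {name}")


def test_example(with_stmt: WithStmt) -> None:
    # Equivalent to nesting:
    #     with fake_resource("topic") as topic:
    #         with fake_resource("producer") as producer:
    #             ...
    # but flat, so the test body no longer reindents each time a resource is added.
    topic = with_stmt(fake_resource("topic"))
    producer = with_stmt(fake_resource("producer"))
    assert (topic, producer) == ("topic", "producer")
    # Both __exit__ calls run after the test returns, via the finalizer in
    # tests/conftest.py -- concurrently on a ThreadPoolExecutor, not in LIFO order.

Teardown is concurrent rather than LIFO, presumably because a Kafka consumer's close() can block until other members of the group release their assignments; exiting the managers in parallel (with a timeout) keeps one stuck close() from hanging the whole suite.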