Skip to content

Commit 2b8ef5a

Browse files
author
Daniel Gellert
authored
fix: set missing loop param for service inits (#436)
1 parent c0fde84 commit 2b8ef5a

File tree

6 files changed

+22
-14
lines changed

6 files changed

+22
-14
lines changed

faust/app/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ def __init__(
466466
self._default_options = (id, options)
467467

468468
# The agent manager manages all agents.
469-
self.agents = AgentManager(self)
469+
self.agents = AgentManager(self, loop=loop)
470470

471471
# Sensors monitor Faust using a standard sensor API.
472472
self.sensors = SensorDelegate(self)
@@ -1791,7 +1791,7 @@ def _update_assignment(self, assigned: Set[TP]) -> Tuple[Set[TP], Set[TP]]:
17911791
return revoked, newly_assigned
17921792

17931793
def _new_producer(self) -> ProducerT:
1794-
return self.transport.create_producer(beacon=self.beacon)
1794+
return self.transport.create_producer(loop=self.loop, beacon=self.beacon)
17951795

17961796
def _new_consumer(self) -> ConsumerT:
17971797
return self.transport.create_consumer(

faust/transport/consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ def __init__(
485485
self.not_waiting_next_records = Event()
486486
self.not_waiting_next_records.set()
487487
self._reset_state()
488-
super().__init__(**kwargs)
488+
super().__init__(loop=loop, **kwargs)
489489
self.transactions = self.transport.create_transaction_manager(
490490
consumer=self,
491491
producer=self.app.producer,

faust/transport/drivers/aiokafka.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1482,7 +1482,7 @@ async def _create_topic(
14821482
topic,
14831483
partitions,
14841484
replication,
1485-
loop=asyncio.get_event_loop_policy().get_event_loop(),
1485+
loop=self.loop,
14861486
**kwargs,
14871487
)
14881488
try:

faust/transport/producer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ def __init__(
144144
self.partitioner = conf.producer_partitioner
145145
api_version = self._api_version = conf.producer_api_version
146146
assert api_version is not None
147-
super().__init__(**kwargs)
147+
super().__init__(loop=loop, **kwargs)
148148
self.buffer = ProducerBuffer(loop=self.loop, beacon=self.beacon)
149149
if conf.producer_threaded:
150150
self.threaded_producer = self.create_threaded_producer()

tests/unit/app/test_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ def test_new_producer(self, *, app):
119119
autospec=Transport,
120120
)
121121
assert app._new_producer() is transport.create_producer.return_value
122-
transport.create_producer.assert_called_with(beacon=ANY)
122+
transport.create_producer.assert_called_with(loop=app.loop, beacon=ANY)
123123
assert app.producer is transport.create_producer.return_value
124124

125125
@pytest.mark.parametrize(

tests/unit/stores/test_aerospike.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@ async def test_get_aerospike_client_error(self, aero):
4545
client_mock.connect = MagicMock(side_effect=Exception)
4646
faust.stores.aerospike.aerospike_client = None
4747
config = {"k": "v"}
48-
with pytest.raises(Exception):
48+
with pytest.raises(Exception) as exc_info:
4949
AeroSpikeStore.get_aerospike_client(config)
50+
assert exc_info.type == Exception
5051

5152
@pytest.mark.asyncio
5253
async def test_get_aerospike_client_instantiated(self, aero):
@@ -90,8 +91,9 @@ def test_get_none_value(self, store):
9091

9192
def test_get_exception(self, store):
9293
store.client.get = MagicMock(side_effect=Exception)
93-
with pytest.raises(Exception):
94+
with pytest.raises(Exception) as exc_info:
9495
store._get(b"test_get")
96+
assert exc_info.type == Exception
9597

9698
def test_set_success(
9799
self,
@@ -119,8 +121,9 @@ def test_set_exception(self, store):
119121
store.client.put = MagicMock(side_effect=Exception)
120122
key = b"key"
121123
value = b"value"
122-
with pytest.raises(Exception):
124+
with pytest.raises(Exception) as exc_info:
123125
store._set(key, value)
126+
assert exc_info.type == Exception
124127

125128
def test_persisted_offset(self, store):
126129
return_value = store.persisted_offset(MagicMock())
@@ -136,15 +139,17 @@ def test_del_success(self, store):
136139
def test_del_exception(self, store):
137140
key = b"key"
138141
store.client.remove = MagicMock(side_effect=Exception)
139-
with pytest.raises(Exception):
142+
with pytest.raises(Exception) as exc_info:
140143
store._del(key)
144+
assert exc_info.type == Exception
141145

142146
def test_iterkeys_error(self, store):
143147
scan = MagicMock()
144148
store.client.scan = MagicMock(side_effect=Exception)
145149
scan.results = MagicMock(side_effect=Exception)
146-
with pytest.raises(Exception):
150+
with pytest.raises(Exception) as exc_info:
147151
list(store._iterkeys())
152+
assert exc_info.type == Exception
148153

149154
def test_iterkeys_success(self, store):
150155
scan = MagicMock()
@@ -181,13 +186,15 @@ def test_itervalues_success(self, store):
181186

182187
def test_itervalues_error(self, store):
183188
store.client.scan = MagicMock(side_effect=Exception)
184-
with pytest.raises(Exception):
189+
with pytest.raises(Exception) as exc_info:
185190
set(store._itervalues())
191+
assert exc_info.type == Exception
186192

187193
def test_iteritems_error(self, store):
188194
store.client.scan = MagicMock(side_effect=Exception)
189-
with pytest.raises(Exception):
195+
with pytest.raises(Exception) as exc_info:
190196
set(store._iteritems())
197+
assert exc_info.type == Exception
191198

192199
def test_iteritems_success(self, store):
193200
with patch("faust.stores.aerospike.aerospike", MagicMock()):
@@ -218,8 +225,9 @@ def test_iteritems_success(self, store):
218225
def test_contains_error(self, store):
219226
store.client.exists = MagicMock(side_effect=Exception)
220227
key = b"key"
221-
with pytest.raises(Exception):
228+
with pytest.raises(Exception) as exc_info:
222229
store._contains(key)
230+
assert exc_info.type == Exception
223231

224232
def test_contains_does_not_exist(self, store):
225233
store.client.exists = MagicMock(return_value=(None, None))

0 commit comments

Comments
 (0)