@@ -172,6 +172,7 @@ def exception_handler(exc: BaseException) -> None:
172172 exception_handler = exception_handler )
173173 streams : Streams = {}
174174
175+ loop = asyncio .get_running_loop ()
175176 try :
176177 # Either use the existing object's queue, or create a new one together with the per-object job.
177178 # "Fire-and-forget": we do not wait for the result; the job destroys itself when it is fully done.
@@ -192,12 +193,21 @@ def exception_handler(exc: BaseException) -> None:
192193 if isinstance (raw_event , watching .Bookmark ):
193194 continue
194195
196+ # TODO: REMOVE: only for debugging!
197+ rv = raw_event .get ('object' , {}).get ('metadata' , {}).get ('resourceVersion' )
198+ fld = raw_event .get ('object' , {}).get ('spec' , {}).get ('field' )
199+ knd = raw_event .get ('object' , {}).get ('kind' )
200+ nam = raw_event .get ('object' , {}).get ('metadata' , {}).get ('name' )
201+ logger .debug (f'STREAM GOT { knd = } { nam = } { rv = } // { fld = } ' )
202+
195203 # Multiplex the raw events to per-resource workers/queues. Start the new ones if needed.
196204 key : ObjectRef = (resource , get_uid (raw_event ))
197205 try :
198206 # Feed the worker, as fast as possible, no extra activities.
199- streams [key ].pressure .set () # interrupt current sleeps, if any.
200- await streams [key ].backlog .put (raw_event )
207+ loop .call_later (3.0 , streams [key ].pressure .set )
208+ loop .call_later (3.0 , streams [key ].backlog .put_nowait , raw_event )
209+ # streams[key].pressure.set() # interrupt current sleeps, if any.
210+ # await streams[key].backlog.put(raw_event)
201211 except KeyError :
202212
203213 # Block the operator's readiness for individual resource's index handlers.
@@ -211,8 +221,10 @@ def exception_handler(exc: BaseException) -> None:
211221
212222 # Start the worker, and feed it initially. Starting can be moderately slow.
213223 streams [key ] = Stream (backlog = asyncio .Queue (), pressure = asyncio .Event ())
214- streams [key ].pressure .set () # interrupt current sleeps, if any.
215- await streams [key ].backlog .put (raw_event )
224+ # streams[key].pressure.set() # interrupt current sleeps, if any.
225+ # await streams[key].backlog.put(raw_event)
226+ loop .call_later (3.0 , streams [key ].pressure .set )
227+ loop .call_later (3.0 , streams [key ].backlog .put_nowait , raw_event )
216228 await scheduler .spawn (
217229 name = f'worker for { key } ' ,
218230 coro = worker (
@@ -316,6 +328,13 @@ async def worker(
316328 if isinstance (raw_event , EOS ):
317329 break # out of the worker.
318330
331+ # TODO: REMOVE: only for debugging!
332+ rv = raw_event .get ('object' , {}).get ('metadata' , {}).get ('resourceVersion' )
333+ fld = raw_event .get ('object' , {}).get ('spec' , {}).get ('field' )
334+ knd = raw_event .get ('object' , {}).get ('kind' )
335+ nam = raw_event .get ('object' , {}).get ('metadata' , {}).get ('name' )
336+ logger .debug (f'QUEUED GOT { knd = } { nam = } { rv = } exp={ expected_version !r} // { fld = } ' )
337+
319338 # Keep track of the resource's consistency for high-level (state-dependent) handlers.
320339 # See `settings.persistence.consistency_timeout` for the explanation of consistency.
321340 if expected_version is not None and expected_version == get_version (raw_event ):
0 commit comments