Skip to content

Commit 051496e

Browse files
committed
WIP a repro with artificial inconsistency
Signed-off-by: Sergey Vasilyev <[email protected]>
1 parent b561141 commit 051496e

File tree

3 files changed

+105
-7
lines changed

3 files changed

+105
-7
lines changed

examples/01-minimal/example.py

Lines changed: 79 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,83 @@
1+
import logging
12
import kopf
3+
import dataclasses
4+
5+
6+
# @kopf.on.login()
7+
# def delayed_k3s(**_):
8+
# conn = kopf.login_via_pykube(logger=logging.getLogger('xxx'))
9+
# if conn:
10+
# return dataclasses.replace(conn, server=conn.server.rsplit(':', 1)[0] + ':11223')
211

312

413
@kopf.on.create('kopfexamples')
5-
def create_fn(spec, **kwargs):
6-
print(f"And here we are! Creating: {spec}")
7-
return {'message': 'hello world'} # will be the new status
14+
@kopf.on.update('kopfexamples')
15+
def create_fn(meta, spec, reason, logger, **kwargs):
16+
rv = meta.get('resourceVersion')
17+
logger.warning(f">>> {rv=} And here we are! {reason=}: {spec}")
18+
19+
20+
# @kopf.on.create('kopfexamples')
21+
# def create_fn2(spec, **kwargs):
22+
# print(f"And here we are! Creating2: {spec}")
23+
24+
"""
25+
=======================================================================================================================
26+
Trigger with (delete the object first!):
27+
$ kubectl apply -f examples/obj.yaml && sleep 1 && kubectl patch -f examples/obj.yaml --type merge -p '{"spec": {"field": 2}}'
28+
=======================================================================================================================
29+
30+
The timeline of a misbehaving operator (with an artificial latency of 3 seconds):
31+
32+
/-- kubectl creates an object (state a=s0)
33+
| ... sleep 1s
34+
| /-- kubectl patches the spec.field with the patch "p1", creates state "b"=s0+p1
35+
| | /-- Kopf patches with annotations (state c=s0+p1+p2)
36+
| | | /-- Kopf patches with annotations (the same state d=s0+p1+p2+p3, d==c)
37+
↓ ↓ | |
38+
----+-//-aaaaabbbbbbbcccccdddddddddddddddddd--> which state is stored in kubernetes
39+
↓ ↓ ↑↓ ↑↓
40+
| | || |\----3s----\
41+
| | |\---+3s----\ |
42+
| \----3s+---\| | |
43+
\----3s----\| || | |
44+
↓↑ ↓↑ ↓ ↓
45+
----+-//------------aaaaabbbbbbbbcccccdddddd--> which state is seen by the operator
46+
↓ ↓↑ ↓↑ ↓ ↓
47+
| || || | \-- Kopf gets the state "d"=s0+p1+p2+p3, sees the annotations, goes idle.
48+
| || || \-- Kopf gets the state "c"=s0+p1+p2, sees the annotations, goes idle.
49+
| || ||
50+
| || |\-- Kopf reacts, executes handlers (2ND TIME), adds annotations with a patch (p3)
51+
| || \-- Kopf gets the state "b"=s0+p1 with NO annotations of "p2" yet.
52+
| || !BUG!: "c"=s0+p1+p2 is not seen yet, though "c"/"p2" exists by now!
53+
| ||
54+
| |\-- Kopf reacts, executes handlers (1ST TIME), adds annotations with a patch (p2)
55+
| \-- Kopf gets a watch-event (state a)
56+
\-- Kopf starts watching the resource
57+
58+
A fix with consistency tracking (with an artificial latency of 3 seconds):
59+
60+
/-- kubectl creates an object (state a=s0)
61+
| ... sleep 1s
62+
| /-- kubectl patches the spec.field with the patch "p1", creates state "b"=s0+p1
63+
| | /-- Kopf patches with annotations (state c=s0+p1+p2)
64+
↓ ↓ |
65+
----+-//-aaaaabbbbbbbccccccccccccccccccc-> which state is stored in kubernetes
66+
↓ ↓ ↑↓
67+
| | |\----3s----\
68+
| \----3s+---\ |
69+
\----3s----\| | |
70+
↓↑ ↓ ↓
71+
----+-//------------aaaaabbbbbbbbcccccc-> which state is seen by the operator
72+
↓ ↓↑⇶⇶⇶⇶⇶⇶⇶⇶⇶⇶⇶⇶↓ Kopf's own patch "p2" enables the consistency expectation for 5s OR version "c"
73+
| || | |
74+
| || | \-- Kopf gets a consistent state "c"=s0+p1+p2 as expected, thus goes idle.
75+
| || |
76+
| || \-- Kopf executes ONLY the low-level handlers over the state "b"=s0+p1.
77+
| || \~~~~~~⨳ inconsistency mode: wait until a new event (then discard it) OR timeout (then process it)
78+
| ||
79+
| |\-- Kopf reacts, executes handlers, adds annotations with a patch (p2)
80+
| \-- Kopf gets a watch-event (state a)
81+
\-- Kopf starts watching the resource
82+
83+
"""

kopf/_core/reactor/processing.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -308,6 +308,7 @@ async def process_resource_causes(
308308
# If the wait exceeds its time and no new consistent events arrive, then fake the consistency.
309309
# However, if a patch is accumulated by now, skip waiting and apply it instantly (by exiting).
310310
# In that case, we are guaranteed to be inconsistent, so also skip the state-dependent handlers.
311+
unslept: Optional[float] = None
311312
consistency_is_required = changing_cause is not None
312313
consistency_is_achieved = consistency_time is None # i.e. preexisting consistency
313314
if consistency_is_required and not consistency_is_achieved and not patch and consistency_time:
@@ -316,6 +317,8 @@ async def process_resource_causes(
316317
consistency_is_achieved = unslept is None # "woke up" vs. "timed out"
317318
if consistency_is_required and not consistency_is_achieved:
318319
changing_cause = None # exit to PATCHing and/or re-iterating over new events.
320+
rv = raw_event.get('object', {}).get('metadata', {}).get('resourceVersion')
321+
local_logger.debug(f'>>> {rv=} {consistency_is_required=} {consistency_is_achieved=} {unslept=} {changing_cause=}')
319322

320323
# Now, the consistency is either pre-proven (by receiving or not expecting any resource version)
321324
# or implied (by exceeding the allowed consistency-waiting timeout while getting no new events).

kopf/_core/reactor/queueing.py

Lines changed: 23 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -172,6 +172,7 @@ def exception_handler(exc: BaseException) -> None:
172172
exception_handler=exception_handler)
173173
streams: Streams = {}
174174

175+
loop = asyncio.get_running_loop()
175176
try:
176177
# Either use the existing object's queue, or create a new one together with the per-object job.
177178
# "Fire-and-forget": we do not wait for the result; the job destroys itself when it is fully done.
@@ -192,12 +193,21 @@ def exception_handler(exc: BaseException) -> None:
192193
if isinstance(raw_event, watching.Bookmark):
193194
continue
194195

196+
# TODO: REMOVE: only for debugging!
197+
rv = raw_event.get('object', {}).get('metadata', {}).get('resourceVersion')
198+
fld = raw_event.get('object', {}).get('spec', {}).get('field')
199+
knd = raw_event.get('object', {}).get('kind')
200+
nam = raw_event.get('object', {}).get('metadata', {}).get('name')
201+
logger.debug(f'STREAM GOT {knd=} {nam=} {rv=} // {fld=} ')
202+
195203
# Multiplex the raw events to per-resource workers/queues. Start the new ones if needed.
196204
key: ObjectRef = (resource, get_uid(raw_event))
197205
try:
198206
# Feed the worker, as fast as possible, no extra activities.
199-
streams[key].pressure.set() # interrupt current sleeps, if any.
200-
await streams[key].backlog.put(raw_event)
207+
loop.call_later(3.0, streams[key].pressure.set)
208+
loop.call_later(3.0, streams[key].backlog.put_nowait, raw_event)
209+
# streams[key].pressure.set() # interrupt current sleeps, if any.
210+
# await streams[key].backlog.put(raw_event)
201211
except KeyError:
202212

203213
# Block the operator's readiness for individual resource's index handlers.
@@ -211,8 +221,10 @@ def exception_handler(exc: BaseException) -> None:
211221

212222
# Start the worker, and feed it initially. Starting can be moderately slow.
213223
streams[key] = Stream(backlog=asyncio.Queue(), pressure=asyncio.Event())
214-
streams[key].pressure.set() # interrupt current sleeps, if any.
215-
await streams[key].backlog.put(raw_event)
224+
# streams[key].pressure.set() # interrupt current sleeps, if any.
225+
# await streams[key].backlog.put(raw_event)
226+
loop.call_later(3.0, streams[key].pressure.set)
227+
loop.call_later(3.0, streams[key].backlog.put_nowait, raw_event)
216228
await scheduler.spawn(
217229
name=f'worker for {key}',
218230
coro=worker(
@@ -316,6 +328,13 @@ async def worker(
316328
if isinstance(raw_event, EOS):
317329
break # out of the worker.
318330

331+
# TODO: REMOVE: only for debugging!
332+
rv = raw_event.get('object', {}).get('metadata', {}).get('resourceVersion')
333+
fld = raw_event.get('object', {}).get('spec', {}).get('field')
334+
knd = raw_event.get('object', {}).get('kind')
335+
nam = raw_event.get('object', {}).get('metadata', {}).get('name')
336+
logger.debug(f'QUEUED GOT {knd=} {nam=} {rv=} exp={expected_version!r} // {fld=} ')
337+
319338
# Keep track of the resource's consistency for high-level (state-dependent) handlers.
320339
# See `settings.persistence.consistency_timeout` for the explanation of consistency.
321340
if expected_version is not None and expected_version == get_version(raw_event):

0 commit comments

Comments
 (0)