-
Notifications
You must be signed in to change notification settings - Fork 1
Scrap
cavart28 edited this page Nov 11, 2021
·
1 revision
locked_deque = RWLockSortedDeque(((('plc', i), f'data_{i}') for i in range(initial_size)),
locked_deque = RWLockSortedDeque(((('plc', i), f'data_{i}') for i in range(initial_size)),
key=operator.itemgetter(0),
maxlen=maxlen)
maxlen=maxlen)
assert len(locked_deque) == initial_size
new_item = (('plc', initial_size + 1), 'new data')
with locked_deque.writer_lock() as writer:
with locked_deque.writer_lock() as writer:
writer.append(new_item)
writer.append(new_item)
assert len(locked_deque) == initial_size + 1
with locked_deque.reader_lock() as reader:
with locked_deque.reader_lock() as reader:
# looking for the first element greater than all of those in the initial deque
assert reader.find_gt(('plc', initial_size)) == new_item
assert reader.find_gt(('plc', initial_size)) == new_itemthe deque cannot exceed its maxlen
with locked_deque.writer_lock() as writer:
with locked_deque.writer_lock() as writer:
for i in range(maxlen):
new_item = (('plc', initial_size + 2 + i), 'new data')
writer.append(new_item)
assert len(locked_deque) == maxlen
assert len(locked_deque) == maxlenattempting to add another element, not larger than the previous one, will raise an error
with locked_deque.writer_lock() as writer:
with locked_deque.writer_lock() as writer:
try:
writer.append(new_item)
except Exception as e:
assert e == ValueError(f"Item key must be greater than last item key to append: {new_item[0]}")
try:
writer.append(new_item)
except Exception as e:
assert e == ValueError(f"Item key must be greater than last item key to append: {new_item[0]}")
except Exception as e:
assert e == ValueError(f"Item key must be greater than last item key to append: {new_item[0]}")
assert e == ValueError(f"Item key must be greater than last item key to append: {new_item[0]}")