WIP: Identifying & resolving race-around conditions in MplexStream & YamuxStream #640
Conversation
@kaneki003: Awesome work. All CI/CD checks are passing. Please do add a test file and a news fragment. Please see if we need a similar feature in Yamux too. We can add it in this PR itself.
Thanks @seetadev, I have already added the test file as well as the news fragment file for this PR.
@paschal533, this is the PR; I would like to know whether I'm going in the right direction or not.
Yes, you are going in the right direction. Well done and keep up the good work 👏. Let me know when you are done with Yamux as well so that I can review everything at once.
@paschal533 and @acul71, I'm done with Yamux as well. Kindly review the PR.
Maybe it's better that @paschal533 has a look at this. If not, re-ping me.
Sure, Luca. I will look at it today and give my review by evening.
Hi @kaneki003, Thanks for submitting this PR and tackling the concurrency issues in the stream multiplexers. I really appreciate the effort you’ve put into making py-libp2p more robust. Looking at this PR, I can see the motivation for adding locks to prevent concurrent access issues, but I have several concerns about the implementation that I'd like to discuss. I get why you’re adding locks to prevent data interleaving, it’s a legit concern in concurrent systems. But I’m a bit worried that this approach might be overkill and could slow things down, especially for Yamux, which is built for speed. Let’s dive into the details and figure out if we’re solving the right problem.
Yamux Changes - Probably Not Needed
The Yamux implementation already has solid synchronization in place:
Adding read/write locks on top of this creates unnecessary serialization. Yamux is specifically designed for high-performance concurrent I/O, and serializing all reads/writes per stream would eliminate one of its key benefits.
Mplex Changes - Makes More Sense, But Needs Backup
Mplex has simpler synchronization, so the locks here seem more reasonable.
The questions I'd love answers to:
Alternative suggestions:
My Take
Thanks again for the PR; I’m excited to work with you on this. Let me know what you think or if you’ve got more context on the issues you’re seeing. Happy to brainstorm other solutions if needed.
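For context on the concern being weighed here: the interleaving risk is that two tasks writing to the same stream can have their frames mixed together on the shared connection unless something serializes them. Below is a minimal sketch of the per-stream locking idea under discussion; the class and attribute names are illustrative stand-ins, not the actual MplexStream code.

import trio


class LockedStream:
    """Illustrative wrapper: serialize concurrent reads/writes on one stream."""

    def __init__(self, raw: trio.abc.Stream) -> None:
        self._raw = raw
        self.read_lock = trio.Lock()   # guards concurrent read() calls
        self.write_lock = trio.Lock()  # guards concurrent write() calls

    async def write(self, data: bytes) -> None:
        # Without this lock, two tasks awaiting send_all() at the same time
        # could interleave their payloads on the underlying transport.
        async with self.write_lock:
            await self._raw.send_all(data)

    async def read(self, max_bytes: int) -> bytes:
        async with self.read_lock:
            return await self._raw.receive_some(max_bytes)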
Thanks a lot for your detailed feedback, @paschal533! You’ve covered almost all the critical points. To clarify my approach:
To move forward, I’m planning to test both Mplex and Yamux without locks to observe their behavior regarding data integrity, performance, and possible race conditions, and will share my findings here. This way, we can make a more informed decision about where locks are necessary and where they might be avoidable. I’ll update this thread with my findings soon!
I’m glad we’re on the same page about digging deeper into this. Your plan to test Mplex and Yamux without locks to check data integrity, performance, and race conditions sounds perfect; that’ll give us solid data to work with. I appreciate you taking the time to clarify your approach. For Mplex, I agree it’s more likely to need some synchronization since its mechanisms are simpler. For Yamux, I’m curious to see what your tests show, given its existing flow control and stream management. Looking forward to your findings. Those should help us nail down whether locks are the right call or if we can lean on other solutions like buffer tweaks or app-level coordination.
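As a rough illustration of the kind of stress test being discussed (the real test later lands in tests/core/stream_muxer/test_yamux_read_write_lock.py), here is a hedged sketch: several writer tasks push distinct payloads over one stream while a reader checks that each message arrives unmixed. The fixture wiring, sizes, and names are made up for the sketch.

import trio

MSG_SIZE = 1024
NUM_WRITERS = 4
MSGS_PER_WRITER = 10


async def stress_stream(stream_a, stream_b) -> None:
    # stream_a and stream_b stand for the two ends of one muxed stream; in a
    # real test they would come from a connected host/muxer fixture (not shown).
    received: list[bytes] = []

    async def writer(tag: bytes) -> None:
        for _ in range(MSGS_PER_WRITER):
            await stream_a.write(tag * MSG_SIZE)

    async def reader() -> None:
        total = 0
        while total < NUM_WRITERS * MSGS_PER_WRITER * MSG_SIZE:
            chunk = await stream_b.read(MSG_SIZE)
            received.append(chunk)
            total += len(chunk)

    async with trio.open_nursery() as nursery:
        for i in range(NUM_WRITERS):
            nursery.start_soon(writer, bytes([65 + i]))  # payloads of b"A", b"B", ...
        nursery.start_soon(reader)

    # If writes never interleave, every chunk is a run of a single byte value.
    assert all(len(set(chunk)) == 1 for chunk in received)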
Hey @paschal533, I have been testing this; here is what I get when running the test:

pytest tests/core/stream_muxer/test_yamux_read_write_lock.py -v
================================ test session starts ================================
platform linux -- Python 3.12.7, pytest-8.3.5, pluggy-1.5.0 -- /home/kaneki003/Open-Source/libp2p/py-libp2p/venv/bin/python3
cachedir: .pytest_cache
rootdir: /home/kaneki003/Open-Source/libp2p/py-libp2p
configfile: pyproject.toml
plugins: trio-0.8.0, anyio-1.4.0, xdist-3.6.1, Faker-37.1.0
collected 1 item
tests/core/stream_muxer/test_yamux_read_write_lock.py::test_yamux_race_condition_without_locks FAILED [100%]
===================================== FAILURES ======================================
______________________ test_yamux_race_condition_without_locks ______________________
+ Exception Group Traceback (most recent call last):
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/runner.py", line 341, in from_call
| result: TResult | None = func()
| ^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/runner.py", line 242, in <lambda>
| lambda: runtest_hook(item=item, **kwds), when=when, reraise=reraise
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_hooks.py", line 513, in __call__
| return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_manager.py", line 120, in _hookexec
| return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 182, in _multicall
| return outcome.get_result()
| ^^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_result.py", line 100, in get_result
| raise exc.with_traceback(exc.__traceback__)
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 167, in _multicall
| teardown.throw(outcome._exception)
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/threadexception.py", line 92, in pytest_runtest_call
| yield from thread_exception_runtest_hook()
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/threadexception.py", line 68, in thread_exception_runtest_hook
| yield
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 167, in _multicall
| teardown.throw(outcome._exception)
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/unraisableexception.py", line 95, in pytest_runtest_call
| yield from unraisable_exception_runtest_hook()
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/unraisableexception.py", line 70, in unraisable_exception_runtest_hook
| yield
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 167, in _multicall
| teardown.throw(outcome._exception)
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/logging.py", line 846, in pytest_runtest_call
| yield from self._runtest_for(item, "call")
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/logging.py", line 829, in _runtest_for
| yield
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 167, in _multicall
| teardown.throw(outcome._exception)
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/capture.py", line 898, in pytest_runtest_call
| return (yield)
| ^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 167, in _multicall
| teardown.throw(outcome._exception)
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/skipping.py", line 257, in pytest_runtest_call
| return (yield)
| ^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 103, in _multicall
| res = hook_impl.function(*args)
| ^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/runner.py", line 174, in pytest_runtest_call
| item.runtest()
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/python.py", line 1627, in runtest
| self.ihook.pytest_pyfunc_call(pyfuncitem=self)
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_hooks.py", line 513, in __call__
| return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_manager.py", line 120, in _hookexec
| return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 139, in _multicall
| raise exception.with_traceback(exception.__traceback__)
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 103, in _multicall
| res = hook_impl.function(*args)
| ^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/python.py", line 159, in pytest_pyfunc_call
| result = testfunction(**testargs)
| ^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pytest_trio/plugin.py", line 348, in wrapper
| return run(partial(fn, **kwargs), clock=clock, instruments=instruments)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_core/_run.py", line 2435, in run
| raise runner.main_task_outcome.error
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pytest_trio/plugin.py", line 426, in _bootstrap_fixtures_and_run_test
| raise test_ctx.error_list[0]
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pytest_trio/plugin.py", line 193, in _fixture_manager
| async with trio.open_nursery() as nursery_fixture:
| ^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_core/_run.py", line 1070, in __aexit__
| raise combined_error_from_nursery
| ExceptionGroup: Exceptions from Trio nursery (1 sub-exception)
+-+---------------- 1 ----------------
| Exception Group Traceback (most recent call last):
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pytest_trio/plugin.py", line 195, in _fixture_manager
| yield nursery_fixture
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pytest_trio/plugin.py", line 250, in run
| await self._func(**resolved_kwargs)
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/tests/core/stream_muxer/test_yamux_read_write_lock.py", line 148, in test_yamux_race_condition_without_locks
| async with trio.open_nursery() as nursery:
| ^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_core/_run.py", line 1070, in __aexit__
| raise combined_error_from_nursery
| ExceptionGroup: Exceptions from Trio nursery (4 sub-exceptions)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/tests/core/stream_muxer/test_yamux_read_write_lock.py", line 139, in reader
| data = await stream.read(MSG_SIZE)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/libp2p/stream_muxer/yamux/yamux.py", line 218, in read
| return await self.conn.read_stream(self.stream_id, n)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/libp2p/stream_muxer/yamux/yamux.py", line 413, in read_stream
| raise MuxedStreamEOF("Connection shut down")
| libp2p.stream_muxer.exceptions.MuxedStreamEOF: Connection shut down
+---------------- 2 ----------------
| Traceback (most recent call last):
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/tests/core/stream_muxer/test_yamux_read_write_lock.py", line 139, in reader
| data = await stream.read(MSG_SIZE)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/libp2p/stream_muxer/yamux/yamux.py", line 218, in read
| return await self.conn.read_stream(self.stream_id, n)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/libp2p/stream_muxer/yamux/yamux.py", line 413, in read_stream
| raise MuxedStreamEOF("Connection shut down")
| libp2p.stream_muxer.exceptions.MuxedStreamEOF: Connection shut down
+---------------- 3 ----------------
| Traceback (most recent call last):
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/libp2p/stream_muxer/yamux/yamux.py", line 110, in write
| await trio.sleep(0.01)
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_timeouts.py", line 111, in sleep
| await sleep_until(trio.current_time() + seconds)
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_timeouts.py", line 92, in sleep_until
| await sleep_forever()
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_timeouts.py", line 71, in sleep_forever
| await trio.lowlevel.wait_task_rescheduled(lambda _: trio.lowlevel.Abort.SUCCEEDED)
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_core/_traps.py", line 208, in wait_task_rescheduled
| return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/outcome/_impl.py", line 213, in unwrap
| raise captured_error
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_core/_run.py", line 1579, in raise_cancel
| raise Cancelled._create()
| trio.Cancelled: Cancelled
|
| During handling of the above exception, another exception occurred:
|
| Traceback (most recent call last):
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/tests/core/stream_muxer/test_yamux_read_write_lock.py", line 131, in writer
| await stream.write(msg)
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/libp2p/stream_muxer/yamux/yamux.py", line 105, in write
| async with self.window_lock:
| ^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_sync.py", line 136, in __aexit__
| self.release()
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_sync.py", line 623, in release
| raise RuntimeError("can't release a Lock you don't own")
| RuntimeError: can't release a Lock you don't own
+---------------- 4 ----------------
| Traceback (most recent call last):
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/libp2p/stream_muxer/yamux/yamux.py", line 110, in write
| await trio.sleep(0.01)
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_timeouts.py", line 111, in sleep
| await sleep_until(trio.current_time() + seconds)
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_timeouts.py", line 92, in sleep_until
| await sleep_forever()
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_timeouts.py", line 71, in sleep_forever
| await trio.lowlevel.wait_task_rescheduled(lambda _: trio.lowlevel.Abort.SUCCEEDED)
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_core/_traps.py", line 208, in wait_task_rescheduled
| return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/outcome/_impl.py", line 213, in unwrap
| raise captured_error
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_core/_run.py", line 1579, in raise_cancel
| raise Cancelled._create()
| trio.Cancelled: Cancelled
|
| During handling of the above exception, another exception occurred:
|
| Traceback (most recent call last):
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/tests/core/stream_muxer/test_yamux_read_write_lock.py", line 131, in writer
| await stream.write(msg)
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/libp2p/stream_muxer/yamux/yamux.py", line 105, in write
| async with self.window_lock:
| ^^^^^^^^^^^^^^^^
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_sync.py", line 136, in __aexit__
| self.release()
| File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_sync.py", line 623, in release
| raise RuntimeError("can't release a Lock you don't own")
| RuntimeError: can't release a Lock you don't own
+------------------------------------
=============================== slowest 50 durations ================================
2.54s call tests/core/stream_muxer/test_yamux_read_write_lock.py::test_yamux_race_condition_without_locks
0.02s setup tests/core/stream_muxer/test_yamux_read_write_lock.py::test_yamux_race_condition_without_locks
0.00s teardown tests/core/stream_muxer/test_yamux_read_write_lock.py::test_yamux_race_condition_without_locks
============================== short test summary info ==============================
FAILED tests/core/stream_muxer/test_yamux_read_write_lock.py::test_yamux_race_condition_without_locks - ExceptionGroup: Exceptions from Trio nursery (1 sub-exception)
================================= 1 failed in 2.58s =================================

On digging into where the break actually happened: the problem shows up when the send_window size drops to 0. The writer then waits for a window-size update, which the reader is required to send via send_window_update:

async def send_window_update(self, increment: Optional[int] = None) -> None:
    """Send a window update to peer."""
    if increment is None:
        increment = DEFAULT_WINDOW_SIZE - self.recv_window
    if increment <= 0:
        return
    async with self.window_lock:
        self.recv_window += increment
        header = struct.pack(
            YAMUX_HEADER_FORMAT, 0, TYPE_WINDOW_UPDATE, 0, self.stream_id, increment
        )
        await self.conn.secured_conn.write(header)
await self.conn.secured_conn.write(header) The problem is inside the I have committed the test I prepared and you can look into it. |
Hi @kaneki003, thanks for the update and the awesome test case. The new test in test_yamux_read_write_lock.py really helps pin this down.
Proposed Fixes
async def read(self, n: Optional[int] = -1) -> bytes:
    # Handle None value for n by converting it to -1
    if n is None:
        n = -1
    # If the stream is closed for receiving and the buffer is empty, raise EOF
    if self.recv_closed and not self.conn.stream_buffers.get(self.stream_id):
        logging.debug(
            f"Stream {self.stream_id}: Stream closed for receiving and buffer empty"
        )
        raise MuxedStreamEOF("Stream is closed for receiving")
    # If reading until EOF (n == -1), block until stream is closed
    if n == -1:
        while not self.recv_closed and not self.conn.event_shutting_down.is_set():
            # Check if there's data in the buffer
            buffer = self.conn.stream_buffers.get(self.stream_id)
            if buffer and len(buffer) > 0:
                # Wait for closure even if data is available
                logging.debug(
                    f"Stream {self.stream_id}: Waiting for FIN before returning data"
                )
                await self.conn.stream_events[self.stream_id].wait()
                self.conn.stream_events[self.stream_id] = trio.Event()
            else:
                # No data, wait for data or closure
                logging.debug(f"Stream {self.stream_id}: Waiting for data or FIN")
                await self.conn.stream_events[self.stream_id].wait()
                self.conn.stream_events[self.stream_id] = trio.Event()
        # After loop, check if stream is closed or shutting down
        async with self.conn.streams_lock:
            if self.conn.event_shutting_down.is_set():
                logging.debug(f"Stream {self.stream_id}: Connection shutting down")
                raise MuxedStreamEOF("Connection shut down")
            if self.closed:
                if self.reset_received:
                    logging.debug(f"Stream {self.stream_id}: Stream was reset")
                    raise MuxedStreamReset("Stream was reset")
                logging.debug(
                    f"Stream {self.stream_id}: Stream closed cleanly (EOF)"
                )
                raise MuxedStreamEOF("Stream closed cleanly (EOF)")
            buffer = self.conn.stream_buffers.get(self.stream_id)
            if buffer is None:
                logging.debug(
                    f"Stream {self.stream_id}: Buffer gone, assuming closed"
                )
                raise MuxedStreamEOF("Stream buffer closed")
            if self.recv_closed and len(buffer) == 0:
                logging.debug(f"Stream {self.stream_id}: EOF reached")
                raise MuxedStreamEOF("Stream is closed for receiving")
            # Return all buffered data
            data = bytes(buffer)
            buffer.clear()
            async with self.window_lock:
                self.recv_window -= len(data)
            if self.recv_window < DEFAULT_WINDOW_SIZE // 2:
                await self.send_window_update()
            logging.debug(f"Stream {self.stream_id}: Returning {len(data)} bytes")
            return data
    data = await self.conn.read_stream(self.stream_id, n)
    async with self.window_lock:
        self.recv_window -= len(data)
    if self.recv_window < DEFAULT_WINDOW_SIZE // 2:
        await self.send_window_update()
    return data

async def send_window_update(self, increment: Optional[int] = None) -> None:
    """Send a window update to peer."""
    async with self.window_lock:
        if increment is None:
            increment = DEFAULT_WINDOW_SIZE
            self.recv_window = DEFAULT_WINDOW_SIZE
        if increment <= 0:
            logging.debug(f"Stream {self.stream_id}: Skipping window update (increment={increment})")
            return
        logging.debug(f"Stream {self.stream_id}: Sending window update with increment={increment}")
        header = struct.pack(
            YAMUX_HEADER_FORMAT, 0, TYPE_WINDOW_UPDATE, 0, self.stream_id, increment
        )
        await self.conn.secured_conn.write(header)

async def write(self, data: bytes) -> None:
    if self.send_closed:
        raise MuxedStreamError("Stream is closed for sending")
    # Flow control: Check if we have enough send window
    total_len = len(data)
    sent = 0
    while sent < total_len:
        async with self.window_lock:
            # Use a timeout to avoid lock release errors
            with trio.move_on_after(1.0) as cancel_scope:
                while self.send_window == 0 and not self.closed:
                    await trio.sleep(0.01)
            if cancel_scope.cancelled_caught:
                raise MuxedStreamError("Timed out waiting for window update")
            # Wait for available window
            while self.send_window == 0 and not self.closed:
                # Release lock while waiting
                self.window_lock.release()
                await trio.sleep(0.01)
                await self.window_lock.acquire()
            if self.closed:
                raise MuxedStreamError("Stream is closed")
            # Calculate how much we can send now
            to_send = min(self.send_window, total_len - sent)
            chunk = data[sent : sent + to_send]
            self.send_window -= to_send
            # Send the data
            header = struct.pack(
                YAMUX_HEADER_FORMAT, 0, TYPE_DATA, 0, self.stream_id, len(chunk)
            )
            await self.conn.secured_conn.write(header + chunk)
            sent += to_send
        # If window is getting low, consider updating
        if self.send_window < DEFAULT_WINDOW_SIZE // 2:
            await self.send_window_update()

Can you implement the above changes and re-run the test? The logging should help verify that window updates are happening. If the test passes without the extra read/write locks, we'll know they aren't needed here. Thanks for the great test; it’s really helping us nail this down. Let me know if you need help with the changes or want to debug together.
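For reference, the timeout in the sketch above boils down to trio.move_on_after plus a cancelled_caught check. A tiny self-contained version of that pattern, where poll is just a stand-in for checking the send window:

import trio


async def wait_until(poll, timeout: float = 1.0, interval: float = 0.01) -> bool:
    """Return True if poll() became truthy before the deadline, else False."""
    with trio.move_on_after(timeout) as scope:
        while not poll():
            await trio.sleep(interval)
    # cancelled_caught is True only if the deadline expired and cancelled the loop.
    return not scope.cancelled_caught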
Sure @paschal533, I'll look into the changes you proposed and make a commit. If the tests pass, we can then concentrate on the Mplex stream.
Hey @paschal533, can you correct me if I'm wrong in my understanding of how the receive window and the stream buffer interact?
Hi @kaneki003, you're right that when the sender sends data, it lands in the receiver's buffer. When the receiver reads data via the read method, it clears out the buffer, freeing up space, just like you said. But the receive window doesn't grow back on its own just because the buffer was drained. So, the main tweak to your understanding is that the window only opens up again once the receiver explicitly sends a window update to the peer.
Thanks @paschal533, that clears it up. However, isn't it incorrect that in the write method we are sending a window update?
That's a great catch, @kaneki003. The window update in the write method is indeed misplaced. In Yamux flow control, the receiver should send window updates to tell the sender "hey, I've read some data from my buffer, you can send me more", and the sender should only update its send window when it receives such an update from the peer. So the sender has no business issuing window updates of its own while writing. The current logic in write mixes up the two directions. Thanks for pointing this out. The window update logic should be moved to the read path.
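To make the direction of that bookkeeping concrete, here is a small sketch of the two sides under the corrected model. The attribute names mirror the discussion above, the default window size is Yamux's usual 256 KiB, and queue_window_update is a hypothetical helper standing in for actually sending the frame.

DEFAULT_WINDOW_SIZE = 256 * 1024  # Yamux's usual initial window; assumed for illustration


def on_window_update(stream, increment: int) -> None:
    # Sender side: only a WINDOW_UPDATE frame from the peer grows our budget.
    stream.send_window += increment


def after_read(stream, nbytes: int) -> None:
    # Receiver side: account for what the application consumed, and once the
    # window falls below half, advertise the freed space back to the sender.
    stream.recv_window -= nbytes
    if stream.recv_window < DEFAULT_WINDOW_SIZE // 2:
        increment = DEFAULT_WINDOW_SIZE - stream.recv_window
        stream.recv_window += increment
        stream.queue_window_update(increment)  # hypothetical frame-sending helper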
Hey @paschal533, I have committed the changes. Kindly review them.
Alright, well done @kaneki003. I will review them right away.
Hi @kaneki003, thanks for making these changes and tackling the window update issue. You’re on the right track, and I appreciate the effort to align with the Yamux spec. The timeout in the write method…
Thanks @paschal533, for the review. My earlier workflow was:
1. Peer1 ------> Peer2 (12 KB data sent)
This workflow was designed keeping in mind that the buffer would not be overwhelmed during high-volume message sending. However, I changed it after your comment clarified how it's supposed to work:
So I shifted to the above workflow, where the window is updated during read instead of on buffer fill-up.
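A small worked example of that accounting, using an illustrative 16 KiB window so the 12 KB message from the workflow above actually matters (the real default window is much larger):

WINDOW = 16 * 1024

send_window = WINDOW
send_window -= 12 * 1024              # sender ships 12 KiB; 4 KiB of budget left

recv_window = WINDOW - 12 * 1024      # receiver has buffered 12 KiB
if recv_window < WINDOW // 2:         # 4 KiB < 8 KiB, so after read()...
    increment = WINDOW - recv_window  # ...send WINDOW_UPDATE(12 KiB)
    recv_window += increment
    send_window += increment          # sender applies it on receipt
assert send_window == WINDOW and recv_window == WINDOW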
@kaneki003 and @paschal533: Nice effort and a great set of initiatives indeed. Appreciate it. I wish to share some feedback: it would be great if you could create a discussion thread for this PR and have these detailed discussions over there. This will help a number of initiatives around Yamux and Mplex.
Okay @seetadev, I will create a discussion thread for this PR. @kaneki003, I will notify you once I do so. Let's continue the discussion there.
Summary of changes made after last review:
Work to be done next:
CCing @paschal533 and @seetadev for possible suggestions and feedback.
What was wrong?
Issue #639
How was it fixed:
Added read_lock and write_lock attributes to the MplexStream class to prevent interleaving issues during concurrent reads and writes. The locks are used in the read and write methods to ensure serialized access.
Documentation updates: updated the MplexStream class documentation, emphasizing thread-safe access and data integrity.
Testing enhancements:
To-Do
Cute Animal Picture