Skip to content

WIP: Identifying & resolving race-around conditions in MplexStream & YamuxStream #640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

kaneki003
Copy link
Contributor

@kaneki003 kaneki003 commented May 31, 2025

What was wrong?

Issue #639

How was it fixed:

  • Added read_lock and write_lock attributes to the MplexStream class to prevent interleaving issues during concurrent reads and writes. Locks are used in the read and write methods to ensure serialized access.

Documentation updates:

  • Added a feature note describing the addition of read and write locks to the MplexStream class, emphasizing thread-safe access and data integrity.

Testing enhancements:

  • Introduced new tests to verify that concurrent reads and writes are properly serialized, ensuring the locks behave as expected.

To-Do

  • Clean up commit history
  • Add or update documentation related to these changes
  • Add entry to the release notes

Cute Animal Picture

cute animal image

@kaneki003
Copy link
Contributor Author

Kindly review @seetadev and @acul71.

@seetadev
Copy link
Contributor

@kaneki003 : Awesome work. All CI/CD checks are passing. Please do add a test file and a news fragment.

Please see if we need a similar feature in yamux too. We can add it in this PR itself.

@kaneki003
Copy link
Contributor Author

Thanks @seetadev ,i have already added test file as well as the newsfragment file for this PR.
Regarding Yamux ,i'll look into it as well and we can cover that up in this PR only.

@kaneki003
Copy link
Contributor Author

@paschal533 ,this is the PR ,would like to know if I'm going in right direction or not.

@paschal533
Copy link
Contributor

@paschal533 ,this is the PR ,would like to know if I'm going in right direction or not.

Yes, you are going in the right direction. Well done and keep up with the good work 👏. Let's me know when you are done with Yamux as well so that I can review everything at once.

@kaneki003
Copy link
Contributor Author

@paschal533 and @acul71 , i'm done with Yamux as well.Kindly review the PR.

@acul71
Copy link
Contributor

acul71 commented Jun 1, 2025

Maybe is better that @paschal533 have a look at this. If not re-ping me

@paschal533
Copy link
Contributor

Maybe is better that @paschal533 have a look at this. If not re-ping me

Sure, Luca. I will look at it today and give my reviews by evening.

@paschal533
Copy link
Contributor

Hi @kaneki003, Thanks for submitting this PR and tackling the concurrency issues in the stream multiplexers. I really appreciate the effort you’ve put into making py-libp2p more robust. Looking at this PR, I can see the motivation for adding locks to prevent concurrent access issues, but I have several concerns about the implementation that I'd like to discuss. I get why you’re adding locks to prevent data interleaving, it’s a legit concern in concurrent systems. But I’m a bit worried that this approach might be overkill and could slow things down, especially for Yamux, which is built for speed. Let’s dive into the details and figure out if we’re solving the right problem.

Yamux Changes - Probably Not Needed

The Yamux implementation already has solid synchronization in place:

  • window_lock handles flow control nicely.
  • streams_lock in the parent Yamux class keeps stream management safe.
  • The protocol itself is designed to handle concurrent I/O without issues.

Adding read/write locks on top of this creates unnecessary serialization. Yamux is specifically designed for high-performance concurrent I/O, and serializing all reads/writes per stream would eliminate one of its key benefits.

Mplex Changes - Makes More Sense, But Needs Backup

Mplex has simpler synchronization, so the locks here seem more reasonable. The trio.MemoryReceiveChannel could definitely benefit from explicit locking in some cases. That said, I’d love to see more details on why this is necessary to make sure it’s worth the performance trade-off.

Questions I'd love answers to:

  • What specific interleaving scenarios are you seeing? Can you provide a concrete example or test case that demonstrates data corruption without these locks?
  • Have you benchmarked the performance impact? These locks will serialize all I/O on each stream, which could significantly hurt performance under load.
  • Could this be solved at the application level? If specific applications need serialized access, they could coordinate that themselves rather than forcing it on all users.

Alternative suggestions:

  • Improve buffer management and event coordination instead of blanket locking
  • Provide optional wrapper classes for applications that need serialized access

My Take

  • Yamux: I’d lean toward skipping these changes. The existing synchronization seems solid, and the performance hit from extra locks could be a dealbreaker. Let’s confirm if there’s a real issue first.
  • Mplex: I’m more open to these changes, but I’d need to see a clear use case, a failing test, or benchmarks showing minimal performance impact.
  • Next Steps: Could you share some benchmark data? That’d help a ton in figuring out the best fix.

Thanks again for the PR I’m excited to work with you on this. Let me know what you think or if you’ve got more context on the issues you’re seeing. Happy to brainstorm other solutions if needed

@kaneki003
Copy link
Contributor Author

kaneki003 commented Jun 3, 2025

Thanks a lot for your detailed feedback, @paschal533! You’ve covered almost all the critical points.

To clarify my approach:

  • For Mplex, I agree that adding locks is important since there’s no other synchronization mechanism in place.
  • For Yamux, I think we should weigh the trade-offs between increased complexity and ensuring data integrity.

To move forward, I’m planning to test both Mplex and Yamux without Locks() to observe their behavior regarding data integrity, performance, and possible race conditions, and will share my findings here. This way, we can make a more informed decision about where locks are necessary and where they might be avoidable.

I’ll update this thread with my findings soon!

@kaneki003 kaneki003 marked this pull request as draft June 3, 2025 09:03
@paschal533
Copy link
Contributor

Thanks a lot for your detailed feedback, @paschal533! You’ve covered almost all the critical points.

To clarify my approach:

  • For Mplex, I agree that adding locks is important since there’s no other synchronization mechanism in place.
  • For Yamux, I think we should weigh the trade-offs between increased complexity and ensuring data integrity.

To move forward, I’m planning to test both Mplex and Yamux without Locks() to observe their behavior regarding data integrity, performance, and possible race conditions, and will share my findings here. This way, we can make a more informed decision about where locks are necessary and where they might be avoidable.

I’ll update this thread with my findings soon!

I’m glad we’re on the same page about digging deeper into this. Your plan to test Mplex and Yamux without locks to check data integrity, performance, and race conditions sounds perfect, that’ll give us solid data to work with. I appreciate you taking the time to clarify your approach.

For Mplex, I agree it’s more likely to need some synchronization since its mechanisms are simpler. For Yamux, I’m curious to see what your tests show, given its existing flow control and stream management. Looking forward to your findings. Those should help us nail down whether locks are the right call or if we can lean on other solutions like buffer tweaks or app-level coordination.

@kaneki003
Copy link
Contributor Author

Hey @paschal533 , i have been testing the Yamux implementation and i found a logical error in the stream.recv_window and stream.send_window.
The test was working fine till now ,but when I tweaked the amount of bytes being sent,once over the DEFAULT_WINDOW_SIZE,the things begin to break.

 pytest tests/core/stream_muxer/test_yamux_read_write_lock.py -v
================================ test session starts ================================
platform linux -- Python 3.12.7, pytest-8.3.5, pluggy-1.5.0 -- /home/kaneki003/Open-Source/libp2p/py-libp2p/venv/bin/python3
cachedir: .pytest_cache
rootdir: /home/kaneki003/Open-Source/libp2p/py-libp2p
configfile: pyproject.toml
plugins: trio-0.8.0, anyio-1.4.0, xdist-3.6.1, Faker-37.1.0
collected 1 item                                                                    

tests/core/stream_muxer/test_yamux_read_write_lock.py::test_yamux_race_condition_without_locks FAILED [100%]

===================================== FAILURES ======================================
______________________ test_yamux_race_condition_without_locks ______________________
  + Exception Group Traceback (most recent call last):
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/runner.py", line 341, in from_call
  |     result: TResult | None = func()
  |                              ^^^^^^
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/runner.py", line 242, in <lambda>
  |     lambda: runtest_hook(item=item, **kwds), when=when, reraise=reraise
  |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_hooks.py", line 513, in __call__
  |     return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_manager.py", line 120, in _hookexec
  |     return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 182, in _multicall
  |     return outcome.get_result()
  |            ^^^^^^^^^^^^^^^^^^^^
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_result.py", line 100, in get_result
  |     raise exc.with_traceback(exc.__traceback__)
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 167, in _multicall
  |     teardown.throw(outcome._exception)
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/threadexception.py", line 92, in pytest_runtest_call
  |     yield from thread_exception_runtest_hook()
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/threadexception.py", line 68, in thread_exception_runtest_hook
  |     yield
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 167, in _multicall
  |     teardown.throw(outcome._exception)
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/unraisableexception.py", line 95, in pytest_runtest_call
  |     yield from unraisable_exception_runtest_hook()
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/unraisableexception.py", line 70, in unraisable_exception_runtest_hook
  |     yield
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 167, in _multicall
  |     teardown.throw(outcome._exception)
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/logging.py", line 846, in pytest_runtest_call
  |     yield from self._runtest_for(item, "call")
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/logging.py", line 829, in _runtest_for
  |     yield
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 167, in _multicall
  |     teardown.throw(outcome._exception)
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/capture.py", line 898, in pytest_runtest_call
  |     return (yield)
  |             ^^^^^
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 167, in _multicall
  |     teardown.throw(outcome._exception)
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/skipping.py", line 257, in pytest_runtest_call
  |     return (yield)
  |             ^^^^^
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 103, in _multicall
  |     res = hook_impl.function(*args)
  |           ^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/runner.py", line 174, in pytest_runtest_call
  |     item.runtest()
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/python.py", line 1627, in runtest
  |     self.ihook.pytest_pyfunc_call(pyfuncitem=self)
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_hooks.py", line 513, in __call__
  |     return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_manager.py", line 120, in _hookexec
  |     return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 139, in _multicall
  |     raise exception.with_traceback(exception.__traceback__)
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pluggy/_callers.py", line 103, in _multicall
  |     res = hook_impl.function(*args)
  |           ^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/_pytest/python.py", line 159, in pytest_pyfunc_call
  |     result = testfunction(**testargs)
  |              ^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pytest_trio/plugin.py", line 348, in wrapper
  |     return run(partial(fn, **kwargs), clock=clock, instruments=instruments)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_core/_run.py", line 2435, in run
  |     raise runner.main_task_outcome.error
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pytest_trio/plugin.py", line 426, in _bootstrap_fixtures_and_run_test
  |     raise test_ctx.error_list[0]
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pytest_trio/plugin.py", line 193, in _fixture_manager
  |     async with trio.open_nursery() as nursery_fixture:
  |                ^^^^^^^^^^^^^^^^^^^
  |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_core/_run.py", line 1070, in __aexit__
  |     raise combined_error_from_nursery
  | ExceptionGroup: Exceptions from Trio nursery (1 sub-exception)
  +-+---------------- 1 ----------------
    | Exception Group Traceback (most recent call last):
    |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pytest_trio/plugin.py", line 195, in _fixture_manager
    |     yield nursery_fixture
    |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/pytest_trio/plugin.py", line 250, in run
    |     await self._func(**resolved_kwargs)
    |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/tests/core/stream_muxer/test_yamux_read_write_lock.py", line 148, in test_yamux_race_condition_without_locks
    |     async with trio.open_nursery() as nursery:
    |                ^^^^^^^^^^^^^^^^^^^
    |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_core/_run.py", line 1070, in __aexit__
    |     raise combined_error_from_nursery
    | ExceptionGroup: Exceptions from Trio nursery (4 sub-exceptions)
    +-+---------------- 1 ----------------
      | Traceback (most recent call last):
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/tests/core/stream_muxer/test_yamux_read_write_lock.py", line 139, in reader
      |     data = await stream.read(MSG_SIZE)
      |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/libp2p/stream_muxer/yamux/yamux.py", line 218, in read
      |     return await self.conn.read_stream(self.stream_id, n)
      |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/libp2p/stream_muxer/yamux/yamux.py", line 413, in read_stream
      |     raise MuxedStreamEOF("Connection shut down")
      | libp2p.stream_muxer.exceptions.MuxedStreamEOF: Connection shut down
      +---------------- 2 ----------------
      | Traceback (most recent call last):
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/tests/core/stream_muxer/test_yamux_read_write_lock.py", line 139, in reader
      |     data = await stream.read(MSG_SIZE)
      |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/libp2p/stream_muxer/yamux/yamux.py", line 218, in read
      |     return await self.conn.read_stream(self.stream_id, n)
      |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/libp2p/stream_muxer/yamux/yamux.py", line 413, in read_stream
      |     raise MuxedStreamEOF("Connection shut down")
      | libp2p.stream_muxer.exceptions.MuxedStreamEOF: Connection shut down
      +---------------- 3 ----------------
      | Traceback (most recent call last):
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/libp2p/stream_muxer/yamux/yamux.py", line 110, in write
      |     await trio.sleep(0.01)
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_timeouts.py", line 111, in sleep
      |     await sleep_until(trio.current_time() + seconds)
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_timeouts.py", line 92, in sleep_until
      |     await sleep_forever()
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_timeouts.py", line 71, in sleep_forever
      |     await trio.lowlevel.wait_task_rescheduled(lambda _: trio.lowlevel.Abort.SUCCEEDED)
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_core/_traps.py", line 208, in wait_task_rescheduled
      |     return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
      |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/outcome/_impl.py", line 213, in unwrap
      |     raise captured_error
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_core/_run.py", line 1579, in raise_cancel
      |     raise Cancelled._create()
      | trio.Cancelled: Cancelled
      | 
      | During handling of the above exception, another exception occurred:
      | 
      | Traceback (most recent call last):
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/tests/core/stream_muxer/test_yamux_read_write_lock.py", line 131, in writer
      |     await stream.write(msg)
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/libp2p/stream_muxer/yamux/yamux.py", line 105, in write
      |     async with self.window_lock:
      |                ^^^^^^^^^^^^^^^^
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_sync.py", line 136, in __aexit__
      |     self.release()
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_sync.py", line 623, in release
      |     raise RuntimeError("can't release a Lock you don't own")
      | RuntimeError: can't release a Lock you don't own
      +---------------- 4 ----------------
      | Traceback (most recent call last):
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/libp2p/stream_muxer/yamux/yamux.py", line 110, in write
      |     await trio.sleep(0.01)
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_timeouts.py", line 111, in sleep
      |     await sleep_until(trio.current_time() + seconds)
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_timeouts.py", line 92, in sleep_until
      |     await sleep_forever()
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_timeouts.py", line 71, in sleep_forever
      |     await trio.lowlevel.wait_task_rescheduled(lambda _: trio.lowlevel.Abort.SUCCEEDED)
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_core/_traps.py", line 208, in wait_task_rescheduled
      |     return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
      |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/outcome/_impl.py", line 213, in unwrap
      |     raise captured_error
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_core/_run.py", line 1579, in raise_cancel
      |     raise Cancelled._create()
      | trio.Cancelled: Cancelled
      | 
      | During handling of the above exception, another exception occurred:
      | 
      | Traceback (most recent call last):
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/tests/core/stream_muxer/test_yamux_read_write_lock.py", line 131, in writer
      |     await stream.write(msg)
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/libp2p/stream_muxer/yamux/yamux.py", line 105, in write
      |     async with self.window_lock:
      |                ^^^^^^^^^^^^^^^^
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_sync.py", line 136, in __aexit__
      |     self.release()
      |   File "/home/kaneki003/Open-Source/libp2p/py-libp2p/venv/lib/python3.12/site-packages/trio/_sync.py", line 623, in release
      |     raise RuntimeError("can't release a Lock you don't own")
      | RuntimeError: can't release a Lock you don't own
      +------------------------------------
=============================== slowest 50 durations ================================
2.54s call     tests/core/stream_muxer/test_yamux_read_write_lock.py::test_yamux_race_condition_without_locks
0.02s setup    tests/core/stream_muxer/test_yamux_read_write_lock.py::test_yamux_race_condition_without_locks
0.00s teardown tests/core/stream_muxer/test_yamux_read_write_lock.py::test_yamux_race_condition_without_locks
============================== short test summary info ==============================
FAILED tests/core/stream_muxer/test_yamux_read_write_lock.py::test_yamux_race_condition_without_locks - ExceptionGroup: Exceptions from Trio nursery (1 sub-exception)
================================= 1 failed in 2.58s =================================

On digging where the break actually happened,the problem is when the size of send_window is set to 0,the writer waits for update of window size ,where the reader is required to send the send_window_update function,which will increase the send_window size .

async def send_window_update(self, increment: Optional[int] = None) -> None:
        """Send a window update to peer."""
        if increment is None:
            increment = DEFAULT_WINDOW_SIZE - self.recv_window

        if increment <= 0:
            return

        async with self.window_lock:
            self.recv_window += increment
            header = struct.pack(
                YAMUX_HEADER_FORMAT, 0, TYPE_WINDOW_UPDATE, 0, self.stream_id, increment
            )
            await self.conn.secured_conn.write(header)

The problem is inside the update_window_size() function,where the automatic increment calculation will always result in 0,resulting in no increase in size of send_window() on writer size and it gets stuck inside waiting for window_size updation in between.

I have committed the test I prepared and you can look into it.

@paschal533
Copy link
Contributor

paschal533 commented Jun 8, 2025

Hi @kaneki003, thanks for the update and the awesome test case. The new test in test_yamux_read_write_lock.py is fantastic, it really highlights the flow control bug when sending 256 KiB messages. Removing the read_lock and write_lock was a great move to isolate the issue, and your analysis of the send_window_update problem is spot-on. The test failure shows we’re hitting a deadlock because recv_window isn’t being updated, causing the increment to be zero.

Proposed Fixes

  1. Update recv_window in read: Decrement recv_window when data is read and trigger window updates:
async def read(self, n: Optional[int] = -1) -> bytes:
        # Handle None value for n by converting it to -1
        if n is None:
            n = -1

        # If the stream is closed for receiving and the buffer is empty, raise EOF
        if self.recv_closed and not self.conn.stream_buffers.get(self.stream_id):
            logging.debug(
                f"Stream {self.stream_id}: Stream closed for receiving and buffer empty"
            )
            raise MuxedStreamEOF("Stream is closed for receiving")

        # If reading until EOF (n == -1), block until stream is closed
        if n == -1:
            while not self.recv_closed and not self.conn.event_shutting_down.is_set():
                # Check if there's data in the buffer
                buffer = self.conn.stream_buffers.get(self.stream_id)
                if buffer and len(buffer) > 0:
                    # Wait for closure even if data is available
                    logging.debug(
                        f"Stream {self.stream_id}:Waiting for FIN before returning data"
                    )
                    await self.conn.stream_events[self.stream_id].wait()
                    self.conn.stream_events[self.stream_id] = trio.Event()
                else:
                    # No data, wait for data or closure
                    logging.debug(f"Stream {self.stream_id}: Waiting for data or FIN")
                    await self.conn.stream_events[self.stream_id].wait()
                    self.conn.stream_events[self.stream_id] = trio.Event()

            # After loop, check if stream is closed or shutting down
            async with self.conn.streams_lock:
                if self.conn.event_shutting_down.is_set():
                    logging.debug(f"Stream {self.stream_id}: Connection shutting down")
                    raise MuxedStreamEOF("Connection shut down")
                if self.closed:
                    if self.reset_received:
                        logging.debug(f"Stream {self.stream_id}: Stream was reset")
                        raise MuxedStreamReset("Stream was reset")
                    logging.debug(
                        f"Stream {self.stream_id}: Stream closed cleanly (EOF)"
                    )
                    raise MuxedStreamEOF("Stream closed cleanly (EOF)")
                buffer = self.conn.stream_buffers.get(self.stream_id)
                if buffer is None:
                    logging.debug(
                        f"Stream {self.stream_id}: Buffer gone, assuming closed"
                    )
                    raise MuxedStreamEOF("Stream buffer closed")
                if self.recv_closed and len(buffer) == 0:
                    logging.debug(f"Stream {self.stream_id}: EOF reached")
                    raise MuxedStreamEOF("Stream is closed for receiving")
                # Return all buffered data
                data = bytes(buffer)
                buffer.clear()
                async with self.window_lock:
                    self.recv_window -= len(data)
                    if self.recv_window < DEFAULT_WINDOW_SIZE // 2:
                        await self.send_window_update()
                logging.debug(f"Stream {self.stream_id}: Returning {len(data)} bytes")
                return data
        data = await self.conn.read_stream(self.stream_id, n)
        async with self.window_lock:
            self.recv_window -= len(data)
            if self.recv_window < DEFAULT_WINDOW_SIZE // 2:
                await self.send_window_update()
        return data
  1. Fix send_window_update: Reset recv_window to ensure a positive increment:
async def send_window_update(self, increment: Optional[int] = None) -> None:
    """Send a window update to peer."""
    async with self.window_lock:
        if increment is None:
            increment = DEFAULT_WINDOW_SIZE
            self.recv_window = DEFAULT_WINDOW_SIZE
        if increment <= 0:
            logging.debug(f"Stream {self.stream_id}: Skipping window update (increment={increment})")
            return
        logging.debug(f"Stream {self.stream_id}: Sending window update with increment={increment}")
        header = struct.pack(
            YAMUX_HEADER_FORMAT, 0, TYPE_WINDOW_UPDATE, 0, self.stream_id, increment
        )
        await self.conn.secured_conn.write(header)
  1. Fix write Lock Issue: Use a timeout to avoid lock release errors:
async def write(self, data: bytes) -> None:
        if self.send_closed:
            raise MuxedStreamError("Stream is closed for sending")

        # Flow control: Check if we have enough send window
        total_len = len(data)
        sent = 0

        while sent < total_len:
            async with self.window_lock:
                # Use a timeout to avoid lock release errors
                with trio.move_on_after(1.0) as cancel_scope:
                    while self.send_window == 0 and not self.closed:
                        await trio.sleep(0.01)
                if cancel_scope.cancelled_caught:
                    raise MuxedStreamError("Timed out waiting for window update")

                # Wait for available window
                while self.send_window == 0 and not self.closed:
                    # Release lock while waiting
                    self.window_lock.release()
                    await trio.sleep(0.01)
                    await self.window_lock.acquire()

                if self.closed:
                    raise MuxedStreamError("Stream is closed")

                # Calculate how much we can send now
                to_send = min(self.send_window, total_len - sent)
                chunk = data[sent : sent + to_send]
                self.send_window -= to_send

                # Send the data
                header = struct.pack(
                    YAMUX_HEADER_FORMAT, 0, TYPE_DATA, 0, self.stream_id, len(chunk)
                )
                await self.conn.secured_conn.write(header + chunk)
                sent += to_send

        # If window is getting low, consider updating
        if self.send_window < DEFAULT_WINDOW_SIZE // 2:
            await self.send_window_update()

Can you implement the above changes and re-run the test? The logging should help verify window updates are happening. If the test passes without read_lock and write_lock, we might not need them for Yamux, keeping it fast. also run the test with varying MSG_SIZE (e.g., 1 KiB, 256 KiB, 512 KiB) and measure throughput to ensure no regressions.

Thanks for the great test, it’s really helping us nail this down. Let me know if you need help with the changes or want to debug together.

@kaneki003
Copy link
Contributor Author

Sure @paschal533 ,would be looking into the changes you proposed and make a commit.If the tests passes we can then concentrate then on Mplex stream.

@kaneki003 kaneki003 changed the title TODO: Added locks for read & write operations in MplexStream WIP: Identifying & resolving race-around conditions in MplexStream & YamuxStream Jun 9, 2025
@kaneki003
Copy link
Contributor Author

hey @paschal533 , can you correct me if I'm wrong in my understanding of recv_window. When the receiver peer receives data from sender,then the data is initially sent to it's buffer ,due to which the buffer_size depletes corresponding to depletion in recv_window,and when data is read by receiver from buffer,the recv_window replenishes.

@paschal533
Copy link
Contributor

Hi @kaneki003, You're right that when the sender sends data, it lands in the receiver's buffer (self.conn.stream_buffers[stream_id]). As the buffer fills up, it does take up space, which ties into flow control. However, the recv_window in our code doesn’t directly shrink as the buffer fills. Instead, it’s a value we advertise to the sender to say how much data we’re ready to accept. The sender uses its send_window to track this, reducing it as it sends data within our recv_window.

When the receiver reads data via the read method, it clears out the buffer, freeing up space, just like you said. But the recv_window doesn’t automatically get replenished when we read from the buffer. We have to explicitly send a window update (through send_window_update) to bump up recv_window and let the sender know it can send more data by increasing its send_window. In our implementation, this usually happens in the write method when the sender’s send_window gets low (below half the default size), or if we manually call send_window_update.

So, the main tweak to your understanding is that recv_window replenishment isn’t automatic when we read from the buffer, it’s triggered by sending a window update message. Hope that clears things up?

@kaneki003
Copy link
Contributor Author

Thanks @paschal533 ,now it got it cleared.However, isn't it is incorrect that in write method we are sending window_update?Since in Yamux, the receiver is responsible for sending window_updates and not the sender.
So shouldn't it be in read function only?

@paschal533
Copy link
Contributor

That's a great catch @kaneki003. The window update in the write method is indeed incorrect, that's the sender trying to update its own receive window, which doesn't make sense in the flow control model.

In Yamux flow control, the receiver should send window updates to tell the sender "hey, I've read some data from my buffer, you can send me more" and the sender should only update its send_window when it receives these window updates from the receiver

So the send_window_update() call should be in the read method, not the write method. When we read data from the buffer and free up space, that's when we should notify the peer that we can accept more data.

The current logic in write where we call send_window_update() when send_window gets low is backwards, we're essentially telling the peer to send us more data when we're the ones trying to send data to them.

Thanks for pointing this out. The window update logic should be moved to the read method where it actually makes sense in the context of Yamux flow control. Make the changes and ping me for review. well done 👌👏

@kaneki003
Copy link
Contributor Author

kaneki003 commented Jun 10, 2025

Hey @paschal533 ,i have committed the changes.Kindly review them.
The test is now passing upto way higher then DEFAULT_WINDOW_SIZE.
Regarding CI/CD failures,would be resolving them(possibly due to strict-type check introduced after the merge).
Also,squashed all commits earlier into single one to make the review easier.

@paschal533
Copy link
Contributor

Hey @paschal533 ,i have committed the changes.Kindly review them. The test is now passing upto way higher then DEFAULT_WINDOW_SIZE. Regarding CI/CD failures,would be resolving them(possibly due to strict-type check introduced after the merge). Also,squashed all commits earlier into single one to make the review easier.

Alright, well done @kaneki003. I will review them right away.

@paschal533
Copy link
Contributor

Hi @kaneki003 Thanks for making these changes and tackling the window update issue. You’re on the right track, and I appreciate the effort to align with the Yamux spec. The timeout in the write method and the skip_lock option in send_window_update are great additions, nice work on those. However, there are a couple of issues in the read method that we should fix to make sure everything works as expected. The window update in read is a good step, but there are a couple of problems.
Decrementing self.recv_window when reading data isn’t right. In Yamux, the receiver’s recv_window tracks how much data it’s willing to accept, and it shouldn’t decrease on read, that’s the sender’s send_window job. This could mess up flow control.
The window update only happens for specific-size reads (n > 0), but not when reading until EOF (n == -1). Since n == -1 clears the buffer, it should also trigger a window update. The window update increment relies on DEFAULT_WINDOW_SIZE - self.recv_window, which might not reflect the actual buffer state, especially with the recv_window decrement. Overall, the changes are solid, but the read method needs these changes to get flow control right.

@kaneki003
Copy link
Contributor Author

kaneki003 commented Jun 11, 2025

Thanks @paschal533 ,for the review.
Regarding recv_window,I was thinking like this workflow.
Peer1-->sender
Peer2-->receiver

1.Peer1 ------> Peer2 (12Kb data sent)
2.Peer1 send_window reduces by 12kbs
3.Peer2 stores the received data in it's buffer
4.Peer2 receive_window reduces also by 12kbs
5.When the Peer2 reads the message,the buffer again frees up, and receive_window shall be replenished by the data read.

This workflow was thought keeping in mind that the buffer will not be over-whelmed during high amount of message sending.

However,i changed it after understanding from your comment about how it's supposed to work:

However, the recv_window in our code doesn’t directly shrink as the buffer fills. Instead, it’s a value we advertise to the sender to say how much data we’re ready to accept. The sender uses its send_window to track this, reducing it as it sends data within our recv_window.

When the receiver reads data via the read method, it clears out the buffer, freeing up space, just like you said. But the recv_window doesn’t automatically get replenished when we read from the buffer. We have to explicitly send a window update (through send_window_update) to bump up recv_window and let the sender know it can send more data by increasing its send_window. In our implementation, this usually happens in the write method when the sender’s send_window gets low (below half the default size), or if we manually call send_window_update.

So,i shifted to above workflow,where the it's updated during read instead of buffer fill up.
If we don't decrement the recv_window in read method and buffer receive,then where it would take place?
By decrementing in read,it is just seems to be reducing but actually it's checking the current limit it has already received,and when it's less than half DEFAULT_WINDOW_SIZE,it sends the window_update to sender,thereby replenishing both send_window of sender(by sending packet of window_update) and recv_window of it's own(in the line self.recv_window += increment_value).
As in yamux,the receiver sends window update,when it detects that recv_window size is less than half of DEFAULT_WINDOW_SIZE.The sender cannot update/access receiver recv_window,to track it uses send_window,and waits for window_update from receiver.

@seetadev
Copy link
Contributor

@kaneki003 and @paschal533 : Nice effort and great set of initiatives indeed. Appreciate it.

Wish to share a feedback. It would be great if you could create a discussion thread for this PR and do these detailed discussions over there. This will help a number of initiatives around Yamux and Mplex.

@paschal533
Copy link
Contributor

paschal533 commented Jun 11, 2025

@kaneki003 and @paschal533 : Nice effort and great set of initiatives indeed. Appreciate it.

Wish to share a feedback. It would be great if you could create a discussion thread for this PR and do these detailed discussions over there. This will help a number of initiatives around Yamux and Mplex.

Okay @seetadev, I will create a discussion thread for this PR. @kaneki003 I will notify you once I do so. Let's continue the discussion there.

@kaneki003
Copy link
Contributor Author

Summary of changes made after last review:

  • Replaced decrement of recv_window with increment following Yamux specs.
  • Added support for correct send_window_updates when reading till EOF (n==-1) case.
  • Resolved build fails with proper type casting.

Work to be done next:

  • Look into how we can use DEFAULT_WINDOW_SIZE // 2 while sending updates,since in current fix it is sent each time read is done,causing performance issue.

    • Possible fix for above is using a counter in read() function which will track the amount of data read till now in n==-1 case ,and when the counter is greater than DEFAULT_WINDOW_SIZE // 2,we shall send window_update of size counter,and reducing it back to 0.This would help in reducing overhead in n==-1 case.For n!=-1,we'll send the window_update each time we read n bytes from the stream.
  • Start with MPlex stream stress-testing to point out concurrency-failures and fix those.

CCing: @paschal533 , @seetadev for possible suggestions and feedback

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants