feat: Adding stream state to the Network Stream #637

Open
wants to merge 4 commits into base: main

Conversation

@Sahilgill24 Sahilgill24 commented May 30, 2025

What was wrong?

Issue #632

The current network stream implementation lacked a mechanism to track a stream's lifecycle. Adding one gives better control over the stream and enables better error handling.

See the discussion in #635 for more details.

How was it fixed?

The implementation is as follows:

1: Define a StreamState enum:

from enum import Enum, auto

class StreamState(Enum):
    INIT = auto()
    OPEN = auto()
    RESET = auto()
    CLOSED = auto()
    ERROR = auto()

2: Define a state property and a set_state method to read and change the stream's current state, respectively.

    @property
    def state(self) -> StreamState:
        return self._state

    def set_state(self, state: StreamState) -> None:
        self._state = state

3: On each operation (read, write, close, reset, and start), change the state with set_state and guard the operation by checking the state property for better error handling. More examples are in the files changed section.

    async def reset(self) -> None:
        self.set_state(StreamState.RESET)
        await self.muxed_stream.reset()
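
A minimal, self-contained sketch of how the three steps above fit together. FakeMuxedStream and SketchNetStream are hypothetical stand-ins rather than the library's classes, and StreamClosed is redefined locally so the example runs on its own; the actual NetStream in this PR may differ.

```python
import asyncio
from enum import Enum, auto


class StreamState(Enum):
    INIT = auto()
    OPEN = auto()
    RESET = auto()
    CLOSED = auto()
    ERROR = auto()


class StreamClosed(Exception):
    """Local stand-in for the library exception of the same name."""


class FakeMuxedStream:
    """Hypothetical in-memory replacement for a real muxed stream."""

    def __init__(self) -> None:
        self.buffer = bytearray()

    async def write(self, data: bytes) -> None:
        self.buffer.extend(data)

    async def reset(self) -> None:
        self.buffer.clear()


class SketchNetStream:
    """Hypothetical, trimmed-down stream that tracks its own state."""

    def __init__(self, muxed_stream: FakeMuxedStream) -> None:
        self.muxed_stream = muxed_stream
        self._state = StreamState.INIT

    @property
    def state(self) -> StreamState:
        return self._state

    def set_state(self, state: StreamState) -> None:
        self._state = state

    async def write(self, data: bytes) -> None:
        # Guard: only an OPEN stream accepts writes.
        if self.state != StreamState.OPEN:
            raise StreamClosed("Cannot write to stream; not open")
        await self.muxed_stream.write(data)

    async def reset(self) -> None:
        self.set_state(StreamState.RESET)
        await self.muxed_stream.reset()


async def demo() -> list[str]:
    """Walk one stream through OPEN -> RESET and record what happens."""
    events = []
    stream = SketchNetStream(FakeMuxedStream())
    stream.set_state(StreamState.OPEN)
    await stream.write(b"ping")
    events.append(stream.state.name)
    await stream.reset()
    events.append(stream.state.name)
    try:
        await stream.write(b"late")
    except StreamClosed:
        events.append("write rejected")
    return events
```

Running asyncio.run(demo()) exercises a write on an open stream, a reset, and a rejected write afterwards.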

TO-DO

  • Clean up commit history

@Sahilgill24
Author

Hi @seetadev, could you please review the draft once? I want to know whether I am going in the right direction.

@seetadev
Contributor

seetadev commented Jun 1, 2025

@Sahilgill24 : Hi Sahil. Appreciate the effort and learning progress.

However, considerable CI/CD build issues need to be addressed.

Please collaborate with @paschal533 and @sumanjeet0012 to arrive at a good conclusion on the PR.

@sumanjeet0012
Contributor

sumanjeet0012 commented Jun 1, 2025

@Sahilgill24 Great to see your PR!

I have reviewed your submission and would like to highlight a few issues:

  1. Linting issues:

black....................................................................Failed

  • hook id: black
  • files were modified by this hook

reformatted libp2p/network/connection/swarm_connection.py

All done! ✨ 🍰 ✨
1 file reformatted, 218 files left unchanged.

flake8...................................................................Passed

These minor linting issues can be resolved by running: pre-commit run --all-files.
This command will automatically reformat the code and fix the relevant issues.
Please ensure the updated files are staged and committed before pushing the changes.

  2. Core Issues:

+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/home/runner/work/py-libp2p/py-libp2p/libp2p/protocol_muxer/multiselect_communicator.py", line 35, in write
| await self.read_writer.write(msg_bytes)
| File "/home/runner/work/py-libp2p/py-libp2p/libp2p/network/stream/net_stream.py", line 103, in write
| raise StreamClosed("Cannot write to stream; not open")
| libp2p.network.stream.exceptions.StreamClosed: Cannot write to stream; not open
|
| The above exception was the direct cause of the following exception:

The primary error here is: raise StreamClosed("Cannot write to stream; not open").

Source of error:

async def read(self, n: int = None) -> bytes:
    """
    Read from stream.

    :param n: number of bytes to read
    :return: bytes of input
    """
    try:
        if self.state != StreamState.OPEN:
            raise StreamClosed("Cannot read from stream; not open")
        else:
            return await self.muxed_stream.read(n)

OR

async def write(self, data: bytes) -> None:
    """
    Write to stream.

    :return: number of bytes written
    """
    try:
        if self.state != StreamState.OPEN:
            raise StreamClosed("Cannot write to stream; not open")
        else:
            await self.muxed_stream.write(data)

Reason for the error:

When new streams are created, their state is not always set to OPEN before the protocol muxer attempts to write to them. As a result, writing fails with "Cannot write to stream; not open", which bubbles up as failures in various protocol and host tests, such as test_ping_once, test_ping_several, and multiple protocol example tests.

streams_open: tuple[NetStream, ...] = self.get_streams()
for stream in streams_open:
    # Set the state of the stream to OPEN
    stream.set_state(StreamState.OPEN)

This will set the state to OPEN for all streams currently returned by self.get_streams() at the time start() is called.

However, this only affects streams that already exist when start() runs. It does NOT affect:

  1. New streams created after start() runs (such as those created via new_stream or when handling new incoming streams).

  2. Streams created in code paths that do not call start() or do not use get_streams().

That means any new NetStream instance created after this point will still have the default state (INIT), and unless something else sets the state to OPEN before any read/write, your state check will block I/O with "Cannot write to stream; not open".

For debugging purposes, you may configure the logger to output the stream’s state at the point where the error occurs.

You should now identify all the different code paths where streams are created and ensure that the appropriate state is explicitly set for each newly created stream.
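
The debugging suggestion above could look roughly like the following sketch. The logger name, the ensure_open helper, and the extended error message are all assumptions made for illustration; StreamState and StreamClosed are redefined locally so the snippet runs on its own.

```python
import logging
from enum import Enum, auto

# Hypothetical logger name; the project's own logger may be named differently.
logger = logging.getLogger("sketch.net_stream")


class StreamState(Enum):
    INIT = auto()
    OPEN = auto()


class StreamClosed(Exception):
    """Local stand-in for the library exception of the same name."""


def ensure_open(state: StreamState, operation: str) -> None:
    """Raise StreamClosed unless the stream is OPEN, logging the actual state first."""
    if state != StreamState.OPEN:
        # Record which state blocked the operation before raising.
        logger.debug("%s stream rejected: state is %s", operation, state.name)
        raise StreamClosed(
            f"Cannot {operation} stream; not open (state={state.name})"
        )
```

Calling logging.basicConfig(level=logging.DEBUG) before the failing test makes the debug line visible, which shows at a glance whether the stream was still in INIT.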

@Sahilgill24
Author

Sahilgill24 commented Jun 1, 2025

Hi @sumanjeet0012, thanks a lot for the review.

  1. Linting issues:

On my local machine, both pytest and pre-commit run --all-files were failing with a Crypto module not found error, which I am trying to figure out. So this time I ran tox instead, and it passes all the tests. It was also my mistake not to test this thoroughly beforehand.

  2. Core Issues:

Regarding the StreamClosed error: as you mentioned, I had missed a step. I set the state to OPEN only in the start() function, so only the streams that already existed had their state set to OPEN. It should also be set in the _add_stream() function, which is called by new_stream. That should resolve the issue, since it sets the state to OPEN for every newly created stream. With tox on my local machine, all the tests pass. I am still trying to find the cause of the module not found error, because the tox checks take about 40 minutes to complete.
Update: The Crypto module error is gone. It was caused by another crypto library interfering with pycryptodome.
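
A rough sketch of the fix described above, assuming the state is set at the single point where streams are registered. SketchStream and SketchConn are hypothetical stand-ins; the real _add_stream in the library takes different arguments and does more work.

```python
from enum import Enum, auto


class StreamState(Enum):
    INIT = auto()
    OPEN = auto()


class SketchStream:
    """Hypothetical stream that starts in INIT."""

    def __init__(self) -> None:
        self._state = StreamState.INIT

    @property
    def state(self) -> StreamState:
        return self._state

    def set_state(self, state: StreamState) -> None:
        self._state = state


class SketchConn:
    """Hypothetical connection that owns a set of streams."""

    def __init__(self) -> None:
        self.streams: set[SketchStream] = set()

    def _add_stream(self, stream: SketchStream) -> SketchStream:
        # Every creation path funnels through here, so the state is
        # OPEN before any caller can read from or write to the stream.
        stream.set_state(StreamState.OPEN)
        self.streams.add(stream)
        return stream

    def new_stream(self) -> SketchStream:
        return self._add_stream(SketchStream())
```

Setting the state in one registration function, rather than in start(), means streams created later are covered without a second code path.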

@Sahilgill24 Sahilgill24 marked this pull request as ready for review June 1, 2025 19:18
@Sahilgill24 Sahilgill24 marked this pull request as draft June 1, 2025 19:19
@Sahilgill24 Sahilgill24 marked this pull request as ready for review June 2, 2025 17:57
@@ -68,16 +101,23 @@ async def write(self, data: bytes) -> None:
:return: number of bytes written
"""
try:
await self.muxed_stream.write(data)
if self.state != StreamState.OPEN:
raise StreamClosed("Cannot write to stream; not open")
Contributor

I believe you could consider using:
elif self.state != StreamState.OPEN: raise StreamClosed("Cannot read from stream; not open")
in the write method as well, to maintain consistency with the read method.

Or is there a specific reason for not using it?

Author

@Sahilgill24 Sahilgill24 Jun 2, 2025

Hey @sumanjeet0012, there was a reason for this.
In test_net_stream.py, the local and remote "reset then read" tests both expect a StreamReset exception to be raised, whereas the local and remote "reset then write" tests expect a StreamClosed exception.

However, I am not sure whether there was some other reason for the tests expecting StreamClosed.
I think for the local test we could raise StreamReset, but for the remote case I am a bit unsure myself.

Contributor

@mystical-prog mystical-prog Jun 6, 2025

I believe @sumanjeet0012 is correct that read and write should be consistent. The root cause lies in the yamux implementation. I am trying some things out and will get back with a detailed suggestion soon.
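
One way to make read and write consistent, sketched under the assumption that both go through a shared guard. The check_usable helper is hypothetical, and the exceptions are redefined locally. Note that, per the thread above, the existing "reset then write" tests expect StreamClosed, so adopting this guard would require updating those tests.

```python
from enum import Enum, auto


class StreamState(Enum):
    INIT = auto()
    OPEN = auto()
    RESET = auto()
    CLOSED = auto()
    ERROR = auto()


class StreamReset(Exception):
    """Local stand-in for the library exception of the same name."""


class StreamClosed(Exception):
    """Local stand-in for the library exception of the same name."""


def check_usable(state: StreamState, operation: str) -> None:
    """Shared guard: same exception mapping for read and write."""
    if state is StreamState.RESET:
        raise StreamReset(f"Cannot {operation} stream; reset")
    if state is not StreamState.OPEN:
        raise StreamClosed(f"Cannot {operation} stream; not open")
```

Both read and write would call check_usable before touching the muxed stream, so a reset stream reports StreamReset regardless of direction.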

Contributor

@sumanjeet0012 sumanjeet0012 left a comment

@Sahilgill24 Everything looks good. Just an optional suggestion regarding the write method —
you might consider using the StreamReset error there as well.

@Sahilgill24
Author

@sumanjeet0012, I have given a reason for it in the review thread. Please take a look. Thanks!

@sumanjeet0012
Contributor

sumanjeet0012 commented Jun 5, 2025

@mystical-prog, can you take a look at our discussion and provide some suggestions?

@mystical-prog
Contributor

Hey @Sahilgill24 @sumanjeet0012,

I am currently reviewing the entire conversation and the changes. I will ping you as soon as I have any suggestions, hoping to unblock you ASAP.

@Sahilgill24
Author

Sahilgill24 commented Jun 6, 2025

Hey @seetadev @pacrob, could you review it once? The work is finished.
Thanks to @sumanjeet0012 and @mystical-prog for helping identify the errors and correct the tests.

@seetadev
Contributor

seetadev commented Jun 9, 2025

@Sahilgill24 : Thank you for sharing feedback.

We are reviewing the PR and will share pointers soon. Appreciate the great contribution by you, @sumanjeet0012 and @mystical-prog.

@Sahilgill24 Sahilgill24 force-pushed the feat/issue-632-adding-stream-state branch from 801ceec to 286752c Compare June 10, 2025 13:19
@Sahilgill24 Sahilgill24 reopened this Jun 11, 2025
@Sahilgill24
Author

Sahilgill24 commented Jun 12, 2025

@mystical-prog @sumanjeet0012 @seetadev I ran into a problem while implementing the remove function mentioned in #663.

The flow for the remove function should be simple:

NetStream.reset() -> NetStream.remove() -> SwarmConn.remove_stream() -> Swarm.notify_closed_stream()

The main issue is that net_stream.py would need a SwarmConn reference in order to call remove_stream() and, from there, SwarmConn.swarm.notify_closed_stream(). But importing the SwarmConn class into net_stream.py would cause a circular import between the two files.

Any suggestions on how to solve or work around this?

@seetadev
Contributor

@Sahilgill24 : Thanks for laying out the flow so clearly — that definitely looks like the right sequence for cleanly removing a stream and notifying the swarm.

You're right about the circular import issue between net_stream.py and swarm_conn.py — this is a classic dependency problem when two components need to talk to each other but also depend on each other’s internals.

A couple of suggestions that could help us resolve or bypass the circular import:

Use Dependency Injection: Instead of importing SwarmConn directly inside net_stream.py, we could pass a reference to the SwarmConn instance into the NetStream when it’s initialized. This way, NetStream doesn’t need to import SwarmConn; it just holds a reference and calls remove_stream() directly when needed. This is clean and avoids tight coupling.

Event Callback Pattern: Alternatively, NetStream could accept a generic on_remove callback that abstracts the logic for what should happen when a stream is removed. That callback can be defined in SwarmConn, and NetStream doesn’t need to know the specifics of SwarmConn at all — just that it needs to call on_remove().

Late Import: If the dependency is minimal and the usage is only in one method (e.g., during remove()), a lazy or inline import inside the function body can sometimes work. It’s not the most elegant solution, but it can break the cycle without refactoring the whole structure.
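
The event callback pattern described above could be sketched as follows. All class names are hypothetical stand-ins, and the closed_notifications list merely stands in for the call to Swarm.notify_closed_stream(); this is an illustration of the pattern, not the library's code.

```python
from typing import Callable, Optional


class SketchNetStream:
    """Hypothetical stream that knows only about a generic callback."""

    def __init__(
        self,
        on_remove: Optional[Callable[["SketchNetStream"], None]] = None,
    ) -> None:
        self._on_remove = on_remove

    def remove(self) -> None:
        # The stream does not know who is listening; it just fires the hook.
        if self._on_remove is not None:
            self._on_remove(self)


class SketchSwarmConn:
    """Hypothetical connection that supplies the callback."""

    def __init__(self) -> None:
        self.streams: set[SketchNetStream] = set()
        # Stand-in for notifying the swarm that a stream was closed.
        self.closed_notifications: list[SketchNetStream] = []

    def new_stream(self) -> SketchNetStream:
        stream = SketchNetStream(on_remove=self.remove_stream)
        self.streams.add(stream)
        return stream

    def remove_stream(self, stream: SketchNetStream) -> None:
        self.streams.discard(stream)
        self.closed_notifications.append(stream)
```

Because the stream module only needs a Callable type, it never imports the connection module, which is what breaks the import cycle.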

My vote would go to the first approach — passing the SwarmConn reference directly — since it's simple, avoids circular imports, and keeps the data flow clear and testable.
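
A minimal sketch of the first approach, assuming the stream receives its owning connection at construction time and types it against a small Protocol instead of importing SwarmConn. Every name here is a hypothetical stand-in, and notify_closed_stream is shown as a plain synchronous method purely for illustration; the library's version may be asynchronous.

```python
import asyncio
from typing import Protocol


class StreamOwner(Protocol):
    """The only thing the stream needs to know about its connection."""

    def remove_stream(self, stream: "SketchNetStream") -> None: ...


class SketchNetStream:
    def __init__(self, owner: StreamOwner) -> None:
        self._owner = owner
        self.was_reset = False

    async def reset(self) -> None:
        self.was_reset = True
        self.remove()

    def remove(self) -> None:
        # Delegates to the injected owner; no import of the connection class.
        self._owner.remove_stream(self)


class SketchSwarm:
    def __init__(self) -> None:
        self.closed: list[SketchNetStream] = []

    def notify_closed_stream(self, stream: SketchNetStream) -> None:
        self.closed.append(stream)


class SketchSwarmConn:
    def __init__(self, swarm: SketchSwarm) -> None:
        self.swarm = swarm
        self.streams: set[SketchNetStream] = set()

    def new_stream(self) -> SketchNetStream:
        stream = SketchNetStream(owner=self)
        self.streams.add(stream)
        return stream

    def remove_stream(self, stream: SketchNetStream) -> None:
        self.streams.discard(stream)
        self.swarm.notify_closed_stream(stream)
```

This follows the reset -> remove -> remove_stream -> notify_closed_stream flow from the question. When the two classes live in separate modules, guarding the import with typing.TYPE_CHECKING is another common way to keep the type annotation without a runtime import.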

CCing @pacrob , @mystical-prog and @sumanjeet0012 for their feedback, pointers and thoughts too.

@Sahilgill24
Author

@seetadev, Thanks for the detailed explanation.
I was thinking that we could add swarm_conn as an attribute, write an explicit set_swarm_conn function, and call it when the streams are started, although it would not look neat.
But passing a SwarmConn instance directly would achieve the same thing, I believe, and would be neater and a better approach in general.

@seetadev
Contributor

@Sahilgill24 : Thank you for the thoughtful suggestion and for engaging deeply with the design considerations here.

While adding swarm_conn as an attribute and setting it later via a set_swarm_conn() function would technically work, it could make the lifecycle handling more complex and less intuitive. Passing the SwarmConn instance directly during initialization is a cleaner and more maintainable approach. It keeps the dependencies explicit, aligns well with the overall design principles we're following in the codebase, and makes it easier to reason about the object's state.

Also looping in @pacrob, @mystical-prog, @sumanjeet0012 and @AkMo3 here: this PR brings notable changes on many fronts related to the network stream. @Sahilgill24 has marked it ready for review.

It would be great if you could share feedback, thoughts, or pointers. This will impact a number of other key projects in Py-libp2p that are currently in the pipeline.

@seetadev
Contributor

@Sahilgill24 : There is one CI/CD test failing too. Please check.

@Sahilgill24
Author

Sahilgill24 commented Jun 17, 2025

@pacrob @sumanjeet0012 @mystical-prog

I am not sure why this specific build is failing while the windows (3.12, wheel) build works. Based on the summary from Copilot, I think it is an Actions-specific or build-specific issue.

All the tests pass on my local machine without any issue.

@pacrob
Member

pacrob commented Jun 17, 2025

I believe these are failing due to lru-dict not having official support for python 3.13. I've noticed this failing in other places too.

It appears to be tracked in amitdev/lru-dict#72, but there has been no recent activity from the maintainer.

As long as that is the only failing CI, we don't need to hold a merge up because of it.

Collaborator

@AkMo3 AkMo3 left a comment

Hi @Sahilgill24,
In my opinion, the changes to the NetStream states are unwarranted, for these reasons:

  • We are moving from finer state control, designed for bidirectional stream states, to unidirectional states.
  • The INIT and ERROR states seem redundant. When the NetStream is created, the transport is already functional, so the stream's state can be OPEN directly. As for the ERROR state, when an error occurs we already raise it and set the proper state for that error. RESET already covers a stream that is broken or unusable.
  • Thread safety was removed. The thread-safe function calls, which prevented race conditions, are no longer there.
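
To illustrate the thread-safety point, here is a hedged sketch of state transitions guarded by a lock, so that two concurrent callers cannot both win a transition. GuardedState and the ALLOWED table are hypothetical, and asyncio.Lock is used only to keep the example standard-library-only; the project's own concurrency primitives may differ. Following the review comment, the sketch starts in OPEN and omits INIT and ERROR.

```python
import asyncio
from enum import Enum, auto


class StreamState(Enum):
    OPEN = auto()
    RESET = auto()
    CLOSED = auto()


# Hypothetical transition table: RESET and CLOSED are terminal.
ALLOWED = {
    StreamState.OPEN: {StreamState.RESET, StreamState.CLOSED},
    StreamState.RESET: set(),
    StreamState.CLOSED: set(),
}


class GuardedState:
    """Holds a stream state and serializes every read and transition."""

    def __init__(self) -> None:
        self._state = StreamState.OPEN
        self._lock = asyncio.Lock()

    async def get(self) -> StreamState:
        async with self._lock:
            return self._state

    async def transition(self, new_state: StreamState) -> bool:
        """Apply the transition if it is allowed; report whether it happened."""
        async with self._lock:
            if new_state not in ALLOWED[self._state]:
                return False
            self._state = new_state
            return True


async def demo() -> tuple[list[bool], StreamState]:
    """Race a reset against a close; exactly one of them should succeed."""
    guarded = GuardedState()
    results = await asyncio.gather(
        guarded.transition(StreamState.RESET),
        guarded.transition(StreamState.CLOSED),
    )
    return list(results), await guarded.get()
```

The check and the assignment happen under the same lock, so a competing reset and close cannot interleave between them.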

Adding the swarm connection is a good idea and will remove the hasattr and getattr calls. This should make the flow a lot cleaner.
