-
Notifications
You must be signed in to change notification settings - Fork 640
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support AnyIO #2045
Support AnyIO #2045
Conversation
Awesome, thanks! Let me know when you think it's ready for review or if you have any questions. The edge-triggered FD stuff is very fiddly. |
It's still a WIP, but it would be nice to know what you think about it @minrk. |
Yeah, It may be that the coroutine/wait_readable approach is different enough in design that it would be better to start The hardest part, I think, will be correctly handling the edge-triggered FD with multiple awaiters, especially since it is hard to ensure we provoke this situation thoroughly in tests. Any event that consumes socket events must necessarily wake all tasks that are waiting on the FD or they can hang. We'll need to be thorough about the implications of a socket being used in multiple task groups, because all waiting tasks must wake. If that's a problem, we'll need to make sure to have locks to prevent the situations that won't work from happening. Needing to wake tasks includes accessing the Plus we'll need a |
assert recvd == [b"hi", b"there"] | ||
|
||
|
||
@mark.skipif(not hasattr(zmq, "SNDTIMEO"), reason="requires SNDTIMEO") | ||
async def test_send_timeout(socket): | ||
s = socket(zmq.PUSH) | ||
s.sndtimeo = 100 | ||
with pytest.raises(zmq.Again): | ||
with pytest.raises(ExceptionGroup) as excinfo: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hopefully this isn't required when it's wrapped up. A single coroutine should raise a single error, right? This would be a major breaking change and a significant degradation of the API.
Here's a very simple anyio example implementing async recv using only the blocking pyzmq API. It doesn't handle the edge-triggered nuances or timeouts, but it gets the very basics. |
if key == EVENTS: | ||
self._schedule_remaining_events(result) | ||
# if key == EVENTS: | ||
# self._schedule_remaining_events(result) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is one of the things we will need to keep some version of. This is part of handling the edge-triggered FD: all tasks waiting on wait_readable
must wake if EVENTS is accessed from anywhere.
Yes I think you're right that it's safer to have a separate implementation just for AnyIO, for backwards-compatibility reasons. I'm not sure anything I've done will remain then, since I was hoping to replace the asyncio layer with AnyIO, but now we can start from scratch as you did in your example. |
I think either way is fine for the separate package or here. A separate package can also make sense as a prototyping pathway to release often and avoid needing to wait for pyzmq releases. It really shouldn't need changes in pyzmq, though I do occasionally have to do some cooperation to get all the serialization methods to work well with the async subclasses without too much duplication. For example, outside pyzmq it would be totally appropriate to only support low-level send/recv_multipart, not all the serialization extensions. That doesn't fit in the pyzmq socket subclass approach where all methods need to work, and use a "has a" rather than "is a" relationship, which type systems like a lot better since subclass != subtype. We can always merge upstream here if/when it seems like 'the way to go' for async pyzmq, or not. We did that with |
(btw, what you have can still remain, and you can copy the changed files to Maybe it should even be a function-based approach ( |
Sounds good, I'll create a new package then. |
One simple way would be to run pyzmq's blocking APIs in a thread, but I guess that would lead to very bad performances. |
I have started https://github.com/davidbrochart/zmq-anyio so I'm closing this for now. Thanks for the feedback @minrk ! |
Closes #1827.