Skip to content

Commit fda82ff

Browse files
authored
Merge branch 'main' into interop/py-rust
2 parents 12d141d + d020bbc commit fda82ff

File tree

22 files changed

+827
-30
lines changed

22 files changed

+827
-30
lines changed

examples/ping/ping.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ async def send_ping(stream: INetStream) -> None:
5757
async def run(port: int, destination: str) -> None:
5858
localhost_ip = "127.0.0.1"
5959
listen_addr = multiaddr.Multiaddr(f"/ip4/0.0.0.0/tcp/{port}")
60-
host = new_host()
60+
host = new_host(listen_addrs=[listen_addr])
6161

6262
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
6363
if not destination:

libp2p/__init__.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from collections.abc import (
22
Mapping,
3+
Sequence,
34
)
45
from importlib.metadata import version as __version
56
from typing import (
@@ -9,6 +10,8 @@
910
cast,
1011
)
1112

13+
import multiaddr
14+
1215
from libp2p.abc import (
1316
IHost,
1417
IMuxedConn,
@@ -154,6 +157,7 @@ def new_swarm(
154157
sec_opt: Optional[TSecurityOptions] = None,
155158
peerstore_opt: Optional[IPeerStore] = None,
156159
muxer_preference: Optional[Literal["YAMUX", "MPLEX"]] = None,
160+
listen_addrs: Optional[Sequence[multiaddr.Multiaddr]] = None,
157161
) -> INetworkService:
158162
"""
159163
Create a swarm instance based on the parameters.
@@ -163,6 +167,7 @@ def new_swarm(
163167
:param sec_opt: optional choice of security upgrade
164168
:param peerstore_opt: optional peerstore
165169
:param muxer_preference: optional explicit muxer preference
170+
:param listen_addrs: optional list of multiaddrs to listen on
166171
:return: return a default swarm instance
167172
168173
Note: Yamux (/yamux/1.0.0) is the preferred stream multiplexer
@@ -175,8 +180,16 @@ def new_swarm(
175180

176181
id_opt = generate_peer_id_from(key_pair)
177182

178-
# TODO: Parse `listen_addrs` to determine transport
179-
transport = TCP()
183+
if listen_addrs is None:
184+
transport = TCP()
185+
else:
186+
addr = listen_addrs[0]
187+
if addr.__contains__("tcp"):
188+
transport = TCP()
189+
elif addr.__contains__("quic"):
190+
raise ValueError("QUIC not yet supported")
191+
else:
192+
raise ValueError(f"Unknown transport in listen_addrs: {listen_addrs}")
180193

181194
# Generate X25519 keypair for Noise
182195
noise_key_pair = create_new_x25519_key_pair()
@@ -229,6 +242,7 @@ def new_host(
229242
peerstore_opt: Optional[IPeerStore] = None,
230243
disc_opt: Optional[IPeerRouting] = None,
231244
muxer_preference: Optional[Literal["YAMUX", "MPLEX"]] = None,
245+
listen_addrs: Sequence[multiaddr.Multiaddr] = None,
232246
) -> IHost:
233247
"""
234248
Create a new libp2p host based on the given parameters.
@@ -239,6 +253,7 @@ def new_host(
239253
:param peerstore_opt: optional peerstore
240254
:param disc_opt: optional discovery
241255
:param muxer_preference: optional explicit muxer preference
256+
:param listen_addrs: optional list of multiaddrs to listen on
242257
:return: return a host instance
243258
"""
244259
swarm = new_swarm(
@@ -247,6 +262,7 @@ def new_host(
247262
sec_opt=sec_opt,
248263
peerstore_opt=peerstore_opt,
249264
muxer_preference=muxer_preference,
265+
listen_addrs=listen_addrs,
250266
)
251267

252268
if disc_opt is not None:

libp2p/abc.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,14 @@
88
KeysView,
99
Sequence,
1010
)
11+
from types import (
12+
TracebackType,
13+
)
1114
from typing import (
1215
TYPE_CHECKING,
1316
Any,
1417
AsyncContextManager,
18+
Optional,
1519
)
1620

1721
from multiaddr import (
@@ -215,7 +219,7 @@ async def accept_stream(self) -> "IMuxedStream":
215219
"""
216220

217221

218-
class IMuxedStream(ReadWriteCloser):
222+
class IMuxedStream(ReadWriteCloser, AsyncContextManager["IMuxedStream"]):
219223
"""
220224
Interface for a multiplexed stream.
221225
@@ -249,6 +253,20 @@ def set_deadline(self, ttl: int) -> bool:
249253
otherwise False.
250254
"""
251255

256+
@abstractmethod
257+
async def __aenter__(self) -> "IMuxedStream":
258+
"""Enter the async context manager."""
259+
return self
260+
261+
async def __aexit__(
262+
self,
263+
exc_type: Optional[type[BaseException]],
264+
exc_val: Optional[BaseException],
265+
exc_tb: Optional[TracebackType],
266+
) -> None:
267+
"""Exit the async context manager and close the stream."""
268+
await self.close()
269+
252270

253271
# -------------------------- net_stream interface.py --------------------------
254272

libp2p/host/basic_host.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,29 @@ async def new_stream(
200200
net_stream.set_protocol(selected_protocol)
201201
return net_stream
202202

203+
async def send_command(self, peer_id: ID, command: str) -> list[str]:
204+
"""
205+
Send a multistream-select command to the specified peer and return
206+
the response.
207+
208+
:param peer_id: peer_id that host is connecting
209+
:param command: supported multistream-select command (e.g., "ls)
210+
:raise StreamFailure: If the stream cannot be opened or negotiation fails
211+
:return: list of strings representing the response from peer.
212+
"""
213+
new_stream = await self._network.new_stream(peer_id)
214+
215+
try:
216+
response = await self.multiselect_client.query_multistream_command(
217+
MultiselectCommunicator(new_stream), command
218+
)
219+
except MultiselectClientError as error:
220+
logger.debug("fail to open a stream to peer %s, error=%s", peer_id, error)
221+
await new_stream.reset()
222+
raise StreamFailure(f"failed to open a stream to peer {peer_id}") from error
223+
224+
return response
225+
203226
async def connect(self, peer_info: PeerInfo) -> None:
204227
"""
205228
Ensure there is a connection between this host and the peer

libp2p/protocol_muxer/multiselect.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,14 @@ async def negotiate(
6060
raise MultiselectError() from error
6161

6262
if command == "ls":
63-
# TODO: handle ls command
64-
pass
63+
supported_protocols = list(self.handlers.keys())
64+
response = "\n".join(supported_protocols) + "\n"
65+
66+
try:
67+
await communicator.write(response)
68+
except MultiselectCommunicatorError as error:
69+
raise MultiselectError() from error
70+
6571
else:
6672
protocol = TProtocol(command)
6773
if protocol in self.handlers:

libp2p/protocol_muxer/multiselect_client.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,36 @@ async def select_one_of(
8181

8282
raise MultiselectClientError("protocols not supported")
8383

84+
async def query_multistream_command(
85+
self, communicator: IMultiselectCommunicator, command: str
86+
) -> list[str]:
87+
"""
88+
Send a multistream-select command over the given communicator and return
89+
parsed response.
90+
91+
:param communicator: communicator to use to communicate with counterparty
92+
:param command: supported multistream-select command(e.g., ls)
93+
:raise MultiselectClientError: If the communicator fails to process data.
94+
:return: list of strings representing the response from peer.
95+
"""
96+
await self.handshake(communicator)
97+
98+
if command == "ls":
99+
try:
100+
await communicator.write("ls")
101+
except MultiselectCommunicatorError as error:
102+
raise MultiselectClientError() from error
103+
else:
104+
raise ValueError("Command not supported")
105+
106+
try:
107+
response = await communicator.read()
108+
response_list = response.strip().splitlines()
109+
except MultiselectCommunicatorError as error:
110+
raise MultiselectClientError() from error
111+
112+
return response_list
113+
84114
async def try_select(
85115
self, communicator: IMultiselectCommunicator, protocol: TProtocol
86116
) -> TProtocol:

libp2p/pubsub/gossipsub.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
)
1111
import logging
1212
import random
13+
import time
1314
from typing import (
1415
Any,
1516
DefaultDict,
@@ -80,8 +81,7 @@ class GossipSub(IPubsubRouter, Service):
8081
# The protocol peer supports
8182
peer_protocol: dict[ID, TProtocol]
8283

83-
# TODO: Add `time_since_last_publish`
84-
# Create topic --> time since last publish map.
84+
time_since_last_publish: dict[str, int]
8585

8686
mcache: MessageCache
8787

@@ -138,6 +138,7 @@ def __init__(
138138
self.direct_peers[direct_peer.peer_id] = direct_peer
139139
self.direct_connect_interval = direct_connect_interval
140140
self.direct_connect_initial_delay = direct_connect_initial_delay
141+
self.time_since_last_publish = {}
141142

142143
async def run(self) -> None:
143144
if self.pubsub is None:
@@ -253,6 +254,8 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
253254
except StreamClosed:
254255
logger.debug("Fail to publish message to %s: stream closed", peer_id)
255256
self.pubsub._handle_dead_peer(peer_id)
257+
for topic in pubsub_msg.topicIDs:
258+
self.time_since_last_publish[topic] = int(time.time())
256259

257260
def _get_peers_to_send(
258261
self, topic_ids: Iterable[str], msg_forwarder: ID, origin: ID
@@ -342,6 +345,7 @@ async def join(self, topic: str) -> None:
342345
await self.emit_graft(topic, peer)
343346

344347
self.fanout.pop(topic, None)
348+
self.time_since_last_publish.pop(topic, None)
345349

346350
async def leave(self, topic: str) -> None:
347351
# Note: the comments here are the near-exact algorithm description from the spec
@@ -514,10 +518,12 @@ def mesh_heartbeat(
514518

515519
def fanout_heartbeat(self) -> None:
516520
# Note: the comments here are the exact pseudocode from the spec
517-
for topic in self.fanout:
518-
# Delete topic entry if it's not in `pubsub.peer_topics`
519-
# or (TODO) if it's time-since-last-published > ttl
520-
if topic not in self.pubsub.peer_topics:
521+
for topic in list(self.fanout):
522+
if (
523+
topic not in self.pubsub.peer_topics
524+
and self.time_since_last_publish.get(topic, 0) + self.time_to_live
525+
< int(time.time())
526+
):
521527
# Remove topic from fanout
522528
del self.fanout[topic]
523529
else:

0 commit comments

Comments
 (0)