Skip to content

Commit 91407d6

Browse files
committed
interop utilities for mplex ping
1 parent 4a53fc3 commit 91407d6

File tree

7 files changed

+299
-1
lines changed

7 files changed

+299
-1
lines changed

interop/README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
These commands are to be run in `./interop/exec`
2+
3+
## Redis
4+
5+
```bash
6+
docker run -p 6379:6379 -it redis:latest
7+
```
8+
9+
## Listener
10+
11+
```bash
12+
transport=tcp ip=0.0.0.0 is_dialer=false redis_addr=6379 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py
13+
```
14+
15+
## Dialer
16+
17+
```bash
18+
transport=tcp ip=0.0.0.0 is_dialer=true port=8001 redis_addr=6379 port=8001 test_timeout_seconds=180 security=insecure muxer=mplex python3 native_ping.py
19+
```

interop/__init__.py

Whitespace-only changes.

interop/arch.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
from dataclasses import (
2+
dataclass,
3+
)
4+
5+
import multiaddr
6+
import redis
7+
import trio
8+
9+
from libp2p import (
10+
new_host,
11+
)
12+
from libp2p.crypto.keys import (
13+
KeyPair,
14+
)
15+
from libp2p.crypto.rsa import (
16+
create_new_key_pair,
17+
)
18+
from libp2p.custom_types import (
19+
TProtocol,
20+
)
21+
from libp2p.security.insecure.transport import (
22+
PLAINTEXT_PROTOCOL_ID,
23+
InsecureTransport,
24+
)
25+
import libp2p.security.secio.transport as secio
26+
from libp2p.stream_muxer.mplex.mplex import (
27+
MPLEX_PROTOCOL_ID,
28+
Mplex,
29+
)
30+
31+
32+
def generate_new_rsa_identity() -> KeyPair:
33+
return create_new_key_pair()
34+
35+
36+
async def build_host(transport: str, ip: str, port: str, sec_protocol: str, muxer: str):
37+
match (sec_protocol, muxer):
38+
case ("insecure", "mplex"):
39+
key_pair = create_new_key_pair()
40+
host = new_host(
41+
key_pair,
42+
{MPLEX_PROTOCOL_ID: Mplex},
43+
{
44+
TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair),
45+
TProtocol(secio.ID): secio.Transport(key_pair),
46+
},
47+
)
48+
muladdr = multiaddr.Multiaddr(f"/ip4/{ip}/tcp/{port}")
49+
return (host, muladdr)
50+
case _:
51+
raise ValueError("Protocols not supported")
52+
53+
54+
@dataclass
55+
class RedisClient:
56+
client: redis.Redis
57+
58+
def brpop(self, key: str, timeout: float) -> list[str]:
59+
result = self.client.brpop([key], timeout)
60+
return [result[1]] if result else []
61+
62+
def rpush(self, key: str, value: str) -> None:
63+
self.client.rpush(key, value)
64+
65+
66+
async def main():
67+
client = RedisClient(redis.Redis(host="localhost", port=6379, db=0))
68+
client.rpush("test", "hello")
69+
print(client.blpop("test", timeout=5))
70+
71+
72+
if __name__ == "__main__":
73+
trio.run(main)

interop/exec/config/mod.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
from dataclasses import (
2+
dataclass,
3+
)
4+
import os
5+
from typing import (
6+
Optional,
7+
)
8+
9+
10+
def str_to_bool(val: str) -> bool:
11+
return val.lower() in ("true", "1")
12+
13+
14+
class ConfigError(Exception):
15+
"""Raised when the required environment variables are missing or invalid"""
16+
17+
18+
@dataclass
19+
class Config:
20+
transport: str
21+
sec_protocol: Optional[str]
22+
muxer: Optional[str]
23+
ip: str
24+
is_dialer: bool
25+
test_timeout: int
26+
redis_addr: str
27+
port: str
28+
29+
@classmethod
30+
def from_env(cls) -> "Config":
31+
try:
32+
transport = os.environ["transport"]
33+
ip = os.environ["ip"]
34+
except KeyError as e:
35+
raise ConfigError(f"{e.args[0]} env variable not set") from None
36+
37+
try:
38+
is_dialer = str_to_bool(os.environ.get("is_dialer", "true"))
39+
test_timeout = int(os.environ.get("test_timeout", "180"))
40+
except ValueError as e:
41+
raise ConfigError(f"Invalid value in env: {e}") from None
42+
43+
redis_addr = os.environ.get("redis_addr", 6379)
44+
sec_protocol = os.environ.get("security")
45+
muxer = os.environ.get("muxer")
46+
port = os.environ.get("port", "8000")
47+
48+
return cls(
49+
transport=transport,
50+
sec_protocol=sec_protocol,
51+
muxer=muxer,
52+
ip=ip,
53+
is_dialer=is_dialer,
54+
test_timeout=test_timeout,
55+
redis_addr=redis_addr,
56+
port=port,
57+
)

interop/exec/native_ping.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import trio
2+
3+
from interop.exec.config.mod import (
4+
Config,
5+
ConfigError,
6+
)
7+
from interop.lib import (
8+
run_test,
9+
)
10+
11+
12+
async def main() -> None:
13+
try:
14+
config = Config.from_env()
15+
except ConfigError as e:
16+
print(f"Config error: {e}")
17+
return
18+
19+
# Uncomment and implement when ready
20+
_ = await run_test(
21+
config.transport,
22+
config.ip,
23+
config.port,
24+
config.is_dialer,
25+
config.test_timeout,
26+
config.redis_addr,
27+
config.sec_protocol,
28+
config.muxer,
29+
)
30+
31+
32+
if __name__ == "__main__":
33+
trio.run(main)

interop/lib.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
from dataclasses import (
2+
dataclass,
3+
)
4+
import json
5+
import time
6+
7+
from loguru import (
8+
logger,
9+
)
10+
import multiaddr
11+
import redis
12+
import trio
13+
14+
from interop.arch import (
15+
RedisClient,
16+
build_host,
17+
)
18+
from libp2p.custom_types import (
19+
TProtocol,
20+
)
21+
from libp2p.network.stream.net_stream import (
22+
INetStream,
23+
)
24+
from libp2p.peer.peerinfo import (
25+
info_from_p2p_addr,
26+
)
27+
28+
PING_PROTOCOL_ID = TProtocol("/ipfs/ping/1.0.0")
29+
PING_LENGTH = 32
30+
RESP_TIMEOUT = 60
31+
32+
33+
async def handle_ping(stream: INetStream) -> None:
34+
while True:
35+
try:
36+
payload = await stream.read(PING_LENGTH)
37+
peer_id = stream.muxed_conn.peer_id
38+
if payload is not None:
39+
print(f"received ping from {peer_id}")
40+
41+
await stream.write(payload)
42+
print(f"responded with pong to {peer_id}")
43+
44+
except Exception:
45+
await stream.reset()
46+
break
47+
48+
49+
async def send_ping(stream: INetStream) -> None:
50+
try:
51+
payload = b"\x01" * PING_LENGTH
52+
print(f"sending ping to {stream.muxed_conn.peer_id}")
53+
54+
await stream.write(payload)
55+
56+
with trio.fail_after(RESP_TIMEOUT):
57+
response = await stream.read(PING_LENGTH)
58+
59+
if response == payload:
60+
print(f"received pong from {stream.muxed_conn.peer_id}")
61+
62+
except Exception as e:
63+
print(f"error occurred: {e}")
64+
65+
66+
async def run_test(
67+
transport, ip, port, is_dialer, test_timeout, redis_addr, sec_protocol, muxer
68+
):
69+
logger.info("Starting run_test")
70+
71+
redis_client = RedisClient(
72+
redis.Redis(host="localhost", port=int(redis_addr), db=0)
73+
)
74+
(host, listen_addr) = await build_host(transport, ip, port, sec_protocol, muxer)
75+
logger.info(f"Running ping test local_peer={host.get_id()}")
76+
77+
async with host.run(listen_addrs=[listen_addr]), trio.open_nursery() as nursery:
78+
if not is_dialer:
79+
host.set_stream_handler(PING_PROTOCOL_ID, handle_ping)
80+
ma = f"{listen_addr}/p2p/{host.get_id().pretty()}"
81+
redis_client.rpush("listenerAddr", ma)
82+
83+
logger.info(f"Test instance, listening: {ma}")
84+
else:
85+
redis_addr = redis_client.brpop("listenerAddr", timeout=5)
86+
destination = redis_addr[0].decode()
87+
maddr = multiaddr.Multiaddr(destination)
88+
info = info_from_p2p_addr(maddr)
89+
90+
handshake_start = time.perf_counter()
91+
92+
await host.connect(info)
93+
stream = await host.new_stream(info.peer_id, [PING_PROTOCOL_ID])
94+
95+
logger.info("Remote conection established")
96+
nursery.start_soon(send_ping, stream)
97+
98+
handshake_plus_ping = (time.perf_counter() - handshake_start) * 1000.0
99+
100+
logger.info(f"handshake time: {handshake_plus_ping:.2f}ms")
101+
return
102+
103+
await trio.sleep_forever()
104+
105+
106+
@dataclass
107+
class Report:
108+
handshake_plus_one_rtt_millis: float
109+
ping_rtt_millis: float
110+
111+
def gen_report(self):
112+
return json.dumps(self.__dict__)

setup.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,14 @@
3737
"pytest-trio>=0.5.2",
3838
"factory-boy>=2.12.0,<3.0.0",
3939
],
40+
"interop": ["redis==6.1.0", "logging==0.4.9.6" "loguru==0.7.3"],
4041
}
4142

4243
extras_require["dev"] = (
43-
extras_require["dev"] + extras_require["docs"] + extras_require["test"]
44+
extras_require["dev"]
45+
+ extras_require["docs"]
46+
+ extras_require["test"]
47+
+ extras_require["interop"]
4448
)
4549

4650
try:

0 commit comments

Comments
 (0)