Skip to content

Commit a2d8917

Browse files
authored
fix socket communications (#81)
For the `UdpWriter`, only open the socket once, during initialization. While this has not been an issue in practice, and performs comparably, it leads to port churn on the host that uses this. A single open socket will lead to a single port being used for communication. The `FileWriter` is not sufficient for communicating with Unix Domain Sockets, and we need to add a `UnixWriter` to handle this correctly. Verified in tests.
1 parent 6173588 commit a2d8917

File tree

8 files changed

+96
-10
lines changed

8 files changed

+96
-10
lines changed

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[metadata]
22
name = netflix-spectator-py
3-
version = 1.0.3
3+
version = 1.0.4
44
description = Library for reporting metrics from Python applications to SpectatorD and the Netflix Atlas Timeseries Database.
55
long_description = file: README.md
66
long_description_content_type = text/markdown

spectator/writer/new_writer.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from spectator.writer.memory_writer import MemoryWriter
77
from spectator.writer.noop_writer import NoopWriter
88
from spectator.writer.udp_writer import UdpWriter
9+
from spectator.writer.unix_writer import UnixWriter
910

1011
WriterUnion = Union[FileWriter, MemoryWriter, NoopWriter, UdpWriter]
1112

@@ -15,7 +16,8 @@ def is_valid_output_location(location: str) -> bool:
1516
return False
1617
return location in ["none", "memory", "stdout", "stderr", "udp", "unix"] or \
1718
location.startswith("file://") or \
18-
location.startswith("udp://")
19+
location.startswith("udp://") or \
20+
location.startswith("unix://")
1921

2022

2123
def new_writer(location: str) -> WriterUnion:
@@ -37,16 +39,17 @@ def new_writer(location: str) -> WriterUnion:
3739
writer = UdpWriter(location, address)
3840
elif location == "unix":
3941
# default unix domain socket for spectatord
40-
location = "file:///run/spectatord/spectatord.unix"
41-
file = open(urlparse(location).path, "a", encoding="utf-8")
42-
writer = FileWriter(location, file)
42+
location = "unix:///run/spectatord/spectatord.unix"
43+
writer = UnixWriter(urlparse(location).path)
4344
elif location.startswith("file://"):
4445
file = open(urlparse(location).path, "a", encoding="utf-8")
4546
writer = FileWriter(location, file)
4647
elif location.startswith("udp://"):
4748
parsed = urlparse(location)
4849
address = (parsed.hostname, parsed.port)
4950
writer = UdpWriter(location, address)
51+
elif location.startswith("unix://"):
52+
writer = UnixWriter(urlparse(location).path)
5053
else:
5154
raise ValueError(f"unsupported Writer location: {location}")
5255

spectator/writer/udp_writer.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,15 @@ def __init__(self, location: str, address: Tuple[str, int]) -> None:
2222
# anything that does not appear to be an IPv4 or IPv6 address (i.e. hostnames)
2323
self._family = socket.AF_INET
2424

25+
self._sock = socket.socket(family=self._family, type=socket.SOCK_DGRAM)
26+
2527
def write(self, line: str) -> None:
2628
self._logger.debug("write line=%s", line)
2729

2830
try:
29-
with socket.socket(self._family, socket.SOCK_DGRAM) as s:
30-
s.sendto(bytes(line, encoding="utf-8"), self._address)
31+
self._sock.sendto(bytes(line, encoding="utf-8"), self._address)
3132
except IOError:
3233
self._logger.error("failed to write line=%s", line)
3334

3435
def close(self) -> None:
35-
pass
36+
self._sock.close()

spectator/writer/unix_writer.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import socket
2+
3+
from spectator.writer import Writer
4+
5+
6+
class UnixWriter(Writer):
7+
"""Writer that outputs data to a Unix Domain Socket."""
8+
9+
def __init__(self, location: str) -> None:
10+
super().__init__()
11+
self._logger.debug("initialize UnixWriter to %s", location)
12+
self._sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_DGRAM)
13+
self._location = location
14+
15+
def write(self, line: str) -> None:
16+
self._logger.debug("write line=%s", line)
17+
18+
try:
19+
self._sock.sendto(bytes(line, encoding="utf-8"), self._location)
20+
except IOError:
21+
self._logger.error("failed to write line=%s", line)
22+
23+
def close(self) -> None:
24+
self._sock.close()

tests/udp_server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
class UdpServer:
66

7-
def __init__(self, address: Tuple[str, int] = ("127.0.0.1", 0)):
8-
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
7+
def __init__(self, address: Tuple[str, int] = ("127.0.0.1", 0)) -> None:
8+
self._sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
99
self._sock.bind(address)
1010

1111
def address(self) -> str:

tests/unix_server.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import os
2+
import random
3+
import socket
4+
5+
6+
class UnixServer:
7+
8+
def __init__(self, path: str = f"/tmp/spectatord-test-{random.randint(1, 10000)}") -> None:
9+
if os.path.exists(path):
10+
os.remove(path)
11+
self._path = path
12+
self._sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_DGRAM)
13+
self._sock.bind(path)
14+
15+
def address(self) -> str:
16+
return "unix://{}".format(self._path)
17+
18+
def read(self) -> str:
19+
data, _ = self._sock.recvfrom(1024)
20+
return data.decode(encoding="utf-8")
21+
22+
def close(self) -> None:
23+
self._sock.close()
24+
os.remove(self._path)

tests/writer/test_new_writer.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import unittest
2+
3+
from spectator.writer.new_writer import is_valid_output_location
4+
5+
6+
class IsValidOutputLocationTest(unittest.TestCase):
7+
8+
def test_is_valid_output_location(self):
9+
self.assertTrue(is_valid_output_location("none"))
10+
self.assertTrue(is_valid_output_location("memory"))
11+
self.assertTrue(is_valid_output_location("stdout"))
12+
self.assertTrue(is_valid_output_location("stderr"))
13+
self.assertTrue(is_valid_output_location("udp"))
14+
self.assertTrue(is_valid_output_location("unix"))
15+
self.assertTrue(is_valid_output_location("file://testfile.txt"))
16+
self.assertTrue(is_valid_output_location("udp://localhost:1234"))
17+
self.assertTrue(is_valid_output_location("unix:///tmp/socket.sock"))
18+
self.assertFalse(is_valid_output_location("invalid"))

tests/writer/test_unix_writer.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import unittest
2+
from contextlib import closing
3+
4+
from spectator import new_writer
5+
from ..unix_server import UnixServer
6+
7+
8+
class UnixWriterTest(unittest.TestCase):
9+
10+
def test_unix(self) -> None:
11+
with closing(UnixServer()) as server:
12+
with closing(new_writer(server.address())) as w:
13+
w.write("foo")
14+
self.assertEqual("foo", server.read())
15+
w.write("bar")
16+
self.assertEqual("bar", server.read())

0 commit comments

Comments
 (0)