Open
Description
Bug report
Bug description:
For a multi-dimensional memory view, if the socket sends only part of the memory view, then the view will be truncated to an empty slice. As a result, the write logic finishes by sending only part of the data.
class _SelectorSocketTransport(_SelectorTransport):
def write(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError(f'data argument must be a bytes-like object, '
f'not {type(data).__name__!r}')
if self._eof:
raise RuntimeError('Cannot call write() after write_eof()')
if self._empty_waiter is not None:
raise RuntimeError('unable to write; sendfile is in progress')
if not data:
return
if self._conn_lost:
if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
logger.warning('socket.send() raised exception.')
self._conn_lost += 1
return
if not self._buffer:
# Optimization: try to send now.
try:
print(f"send1 {self._sock} {data.nbytes if isinstance(data, memoryview) else len(data)}")
if isinstance(data, memoryview):
print(data.shape, data.ndim, data.strides, data.suboffsets)
n = self._sock.send(data)
print(f"send2 {self._sock} {n} {len(bytes(data))}")
except (BlockingIOError, InterruptedError):
pass
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self._fatal_error(exc, 'Fatal write error on socket transport')
return
else:
data = memoryview(data)[n:]
print(f"send3 {bool(data)}")
if not data:
return
# Not all was written; register write handler.
self._loop._add_writer(self._sock_fd, self._write_ready)
# Add it to the buffer.
self._buffer.append(data)
self._maybe_pause_protocol()
Sample code to reproduce:
import asyncio
import tempfile
import numpy as np
q = asyncio.Queue()
arr = np.zeros((1, 300, 200), dtype=np.float16)
mv = memoryview(arr)
print(
len(mv),
mv.shape,
mv.format,
mv.ndim,
mv.strides,
mv.contiguous,
mv.c_contiguous,
mv.f_contiguous,
)
async def handle_echo(reader, writer):
print(f"readexactly: {mv.nbytes}")
data = await reader.readexactly(mv.nbytes)
print(f"readexactly done: {len(data)}")
writer.write(b"OK")
await writer.drain()
writer.close()
await writer.wait_closed()
async def test_server():
# Create a temporary file to get a unique path and then delete it
with tempfile.NamedTemporaryFile(delete=True) as tmp:
socket_path = tmp.name
server = await asyncio.start_unix_server(handle_echo, path=socket_path)
addr = socket_path
await q.put(addr)
print(f"Serving on {addr}")
async with server:
await server.serve_forever()
async def _test_send_multi_dimension_memoryview():
svr = asyncio.create_task(test_server())
try:
addr = await q.get()
reader, writer = await asyncio.open_unix_connection(addr)
writer.write(mv)
await writer.drain()
ok = await reader.readexactly(2)
print(ok)
finally:
svr.cancel()
asyncio.run(_test_send_multi_dimension_memoryview())
Output (hang):
1 (1, 300, 200) e 3 (120000, 400, 2) True True False
Serving on /var/folders/r6/h3hc6kj91s9czcyds6fh0yqh0000gn/T/tmp_9d7k3co
send1 <socket.socket fd=7, family=1, type=1, proto=0, raddr=/var/folders/r6/h3hc6kj91s9czcyds6fh0yqh0000gn/T/tmp_9d7k3co> 120000
(1, 300, 200) 3 (120000, 400, 2) ()
send2 <socket.socket fd=7, family=1, type=1, proto=0, raddr=/var/folders/r6/h3hc6kj91s9czcyds6fh0yqh0000gn/T/tmp_9d7k3co> 8192 120000
send3 False
readexactly: 120000
The test case should return without hang.
CPython versions tested on:
3.11 & 3.12 & main(bda1218)
Operating systems tested on:
macOS
Metadata
Metadata
Assignees
Projects
Status
Todo