File: //opt/gsutil/third_party/urllib3/test/test_wait.py
from __future__ import annotations
import signal
import threading
import time
import typing
from socket import socket, socketpair
from types import FrameType
import pytest
from urllib3.util.wait import (
_have_working_poll,
poll_wait_for_socket,
select_wait_for_socket,
wait_for_read,
wait_for_socket,
wait_for_write,
)
# A pair of sockets, used to annotate the value produced by the ``spair`` fixture.
TYPE_SOCKET_PAIR = typing.Tuple[socket, socket]
# Any callable returning ``bool``; used to annotate the parametrized ``wfs``
# argument of the tests below.
TYPE_WAIT_FOR = typing.Callable[..., bool]
@pytest.fixture
def spair() -> typing.Generator[TYPE_SOCKET_PAIR, None, None]:
    """Yield a freshly created socket pair and close both ends afterwards."""
    pair = socketpair()
    yield pair
    # Teardown: close the sockets in the same order they were created.
    for sock in pair:
        sock.close()
# Implementations the tests below are parametrized over. The poll-based one is
# only included when ``_have_working_poll()`` reports that it is usable.
variants: list[TYPE_WAIT_FOR] = [
    wait_for_socket,
    select_wait_for_socket,
    *([poll_wait_for_socket] if _have_working_poll() else []),
]
@pytest.mark.parametrize("wfs", variants)
def test_wait_for_socket(wfs: TYPE_WAIT_FOR, spair: TYPE_SOCKET_PAIR) -> None:
    """Check read/write readiness reporting of ``wfs`` on a socket pair."""
    sock, peer = spair

    # Waiting for neither read nor write must raise.
    with pytest.raises(RuntimeError):
        wfs(sock, read=False, write=False)

    # Nothing has been sent yet: not readable, but writable.
    assert not wfs(sock, read=True, timeout=0)
    assert wfs(sock, write=True, timeout=0)

    # After the peer sends a byte the socket reports readable, whichever
    # timeout form is used.
    peer.send(b"x")
    for timeout in (0, 10, None):
        assert wfs(sock, read=True, timeout=timeout)

    # Fill up the socket with data: keep sending in non-blocking mode until
    # send() fails with OSError.
    sock.setblocking(False)
    payload = b"x" * 999999
    try:
        while True:
            sock.send(payload)
    except OSError:
        pass

    # Now it's not writable anymore
    assert not wfs(sock, write=True, timeout=0)

    # But read-or-write still succeeds, because the byte sent by the peer is
    # still waiting to be read
    assert wfs(sock, read=True, write=True, timeout=0)

    # Once that byte is consumed, neither condition holds
    assert sock.recv(1) == b"x"
    assert not wfs(sock, read=True, write=True, timeout=0)

    # But if the remote peer closes the socket, then it becomes readable
    peer.close()
    assert wfs(sock, read=True, timeout=0)

    # Waiting for a socket that's actually been closed is just a bug, and
    # raises some kind of helpful exception (exact details depend on the
    # platform).
    with pytest.raises(Exception):
        wfs(peer, read=True)
def test_wait_for_read_write(spair: TYPE_SOCKET_PAIR) -> None:
    """Check ``wait_for_read`` / ``wait_for_write`` on a socket pair.

    Every call passes ``0`` as the second positional argument (presumably the
    timeout -- confirm against ``urllib3.util.wait``).
    """
    sock, peer = spair

    # Nothing has been sent yet: not readable, but writable.
    assert not wait_for_read(sock, 0)
    assert wait_for_write(sock, 0)

    # After the peer sends a byte, both readable and writable.
    peer.send(b"x")
    assert wait_for_read(sock, 0)
    assert wait_for_write(sock, 0)

    # Fill up the socket with data: keep sending in non-blocking mode until
    # send() fails with OSError.
    sock.setblocking(False)
    payload = b"x" * 999999
    try:
        while True:
            sock.send(payload)
    except OSError:
        pass

    # Now it's not writable anymore
    assert not wait_for_write(sock, 0)
@pytest.mark.skipif(not hasattr(signal, "setitimer"), reason="need setitimer() support")
@pytest.mark.parametrize("wfs", variants)
def test_eintr(wfs: TYPE_WAIT_FOR, spair: TYPE_SOCKET_PAIR) -> None:
    """A 1-second wait must last about 1 second despite repeated SIGALRM."""
    sock, _ = spair
    alarms_seen = 0

    def on_alarm(sig: int, frame: FrameType | None) -> typing.Any:
        nonlocal alarms_seen
        assert sig == signal.SIGALRM
        alarms_seen += 1

    previous_handler = signal.signal(signal.SIGALRM, on_alarm)
    try:
        assert not wfs(sock, read=True, timeout=0)
        began = time.monotonic()
        try:
            # Start delivering SIGALRM 10 times per second
            signal.setitimer(signal.ITIMER_REAL, 0.1, 0.1)
            # Sleep for 1 second (we hope!)
            wfs(sock, read=True, timeout=1)
        finally:
            # Stop delivering SIGALRM
            signal.setitimer(signal.ITIMER_REAL, 0)
        elapsed = time.monotonic() - began
        assert 0.9 < elapsed < 3
    finally:
        # Always put the original SIGALRM handler back.
        signal.signal(signal.SIGALRM, previous_handler)

    # The handler must have run at least once for the test to mean anything.
    assert alarms_seen > 0
@pytest.mark.skipif(not hasattr(signal, "setitimer"), reason="need setitimer() support")
@pytest.mark.parametrize("wfs", variants)
def test_eintr_zero_timeout(wfs: TYPE_WAIT_FOR, spair: TYPE_SOCKET_PAIR) -> None:
    """Hammer zero-timeout waits while SIGALRM fires at a high rate."""
    sock, _ = spair
    alarms_seen = 0

    def on_alarm(sig: int, frame: FrameType | None) -> typing.Any:
        nonlocal alarms_seen
        assert sig == signal.SIGALRM
        alarms_seen += 1

    previous_handler = signal.signal(signal.SIGALRM, on_alarm)
    try:
        assert not wfs(sock, read=True, timeout=0)
        try:
            # Start delivering SIGALRM 1000 times per second,
            # to trigger race conditions such as
            # https://github.com/urllib3/urllib3/issues/1396.
            signal.setitimer(signal.ITIMER_REAL, 0.001, 0.001)
            # Hammer the system call for a while to trigger the
            # race: at most 100000 calls or 5 seconds, whichever comes first.
            deadline = time.monotonic() + 5
            for _ in range(100000):
                wfs(sock, read=True, timeout=0)
                if time.monotonic() >= deadline:
                    break
        finally:
            # Stop delivering SIGALRM
            signal.setitimer(signal.ITIMER_REAL, 0)
    finally:
        # Always put the original SIGALRM handler back.
        signal.signal(signal.SIGALRM, previous_handler)

    # The handler must have run at least once for the test to mean anything.
    assert alarms_seen > 0
@pytest.mark.skipif(not hasattr(signal, "setitimer"), reason="need setitimer() support")
@pytest.mark.parametrize("wfs", variants)
def test_eintr_infinite_timeout(wfs: TYPE_WAIT_FOR, spair: TYPE_SOCKET_PAIR) -> None:
    """A wait with no timeout must keep waiting despite repeated SIGALRM.

    A helper thread sends one byte from the peer after one second; the wait
    is expected to return only around then.
    """
    a, b = spair
    interrupt_count = [0]

    def handler(sig: int, frame: FrameType | None) -> typing.Any:
        assert sig == signal.SIGALRM
        interrupt_count[0] += 1

    def make_a_readable_after_one_second() -> None:
        time.sleep(1)
        b.send(b"x")

    # Build the thread before entering the try block so the cleanup code
    # below can always refer to it, even if setitimer() fails first.
    thread = threading.Thread(target=make_a_readable_after_one_second)

    old_handler = signal.signal(signal.SIGALRM, handler)
    try:
        assert not wfs(a, read=True, timeout=0)
        start = time.monotonic()
        try:
            # Start delivering SIGALRM 10 times per second
            signal.setitimer(signal.ITIMER_REAL, 0.1, 0.1)
            # Wait with no timeout; the helper thread makes the socket
            # readable after about 1 second (we hope!)
            thread.start()
            wfs(a, read=True)
        finally:
            # Stop delivering SIGALRM
            signal.setitimer(signal.ITIMER_REAL, 0)
            # join() raises RuntimeError on a thread that was never started,
            # which would mask the original failure; ident stays None until
            # the thread has been started.
            if thread.ident is not None:
                thread.join()
        end = time.monotonic()
        dur = end - start
        assert 0.9 < dur < 3
    finally:
        # Always put the original SIGALRM handler back.
        signal.signal(signal.SIGALRM, old_handler)

    # The handler must have run at least once for the test to mean anything.
    assert interrupt_count[0] > 0