Skip to content

bpo-31234: Add support.join_thread() helper #3587

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import weakref
import test.support
import test.support.script_helper
from test import support


# Skip tests if _multiprocessing wasn't built.
Expand Down Expand Up @@ -72,6 +73,12 @@ def close_queue(queue):
queue.join_thread()


def join_process(process, timeout):
# Since multiprocessing.Process has the same API than threading.Thread
# (join() and is_alive(), the support function can be reused
support.join_thread(process, timeout)


#
# Constants
#
Expand Down Expand Up @@ -477,7 +484,7 @@ def test_many_processes(self):
for p in procs:
p.start()
for p in procs:
p.join(timeout=10)
join_process(p, timeout=10)
for p in procs:
self.assertEqual(p.exitcode, 0)

Expand All @@ -489,7 +496,7 @@ def test_many_processes(self):
for p in procs:
p.terminate()
for p in procs:
p.join(timeout=10)
join_process(p, timeout=10)
if os.name != 'nt':
for p in procs:
self.assertEqual(p.exitcode, -signal.SIGTERM)
Expand Down Expand Up @@ -652,7 +659,7 @@ def test_sys_exit(self):
p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
p.daemon = True
p.start()
p.join(5)
join_process(p, timeout=5)
self.assertEqual(p.exitcode, 1)

with open(testfn, 'r') as f:
Expand All @@ -665,7 +672,7 @@ def test_sys_exit(self):
p = self.Process(target=sys.exit, args=(reason,))
p.daemon = True
p.start()
p.join(5)
join_process(p, timeout=5)
self.assertEqual(p.exitcode, reason)

#
Expand Down Expand Up @@ -1254,8 +1261,7 @@ def test_waitfor(self):
state.value += 1
cond.notify()

p.join(5)
self.assertFalse(p.is_alive())
join_process(p, timeout=5)
self.assertEqual(p.exitcode, 0)

@classmethod
Expand Down Expand Up @@ -1291,7 +1297,7 @@ def test_waitfor_timeout(self):
state.value += 1
cond.notify()

p.join(5)
join_process(p, timeout=5)
self.assertTrue(success.value)

@classmethod
Expand Down Expand Up @@ -4005,7 +4011,7 @@ def test_timeout(self):
self.assertEqual(conn.recv(), 456)
conn.close()
l.close()
p.join(10)
join_process(p, timeout=10)
finally:
socket.setdefaulttimeout(old_timeout)

Expand Down Expand Up @@ -4041,7 +4047,7 @@ def child(cls, n, conn):
p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
p.start()
conn.close()
p.join(timeout=5)
join_process(p, timeout=5)
else:
conn.send(len(util._afterfork_registry))
conn.close()
Expand All @@ -4054,7 +4060,7 @@ def test_lock(self):
p.start()
w.close()
new_size = r.recv()
p.join(timeout=5)
join_process(p, timeout=5)
self.assertLessEqual(new_size, old_size)

#
Expand Down Expand Up @@ -4109,7 +4115,7 @@ def test_closefd(self):
p.start()
writer.close()
e = reader.recv()
p.join(timeout=5)
join_process(p, timeout=5)
finally:
self.close(fd)
writer.close()
Expand Down
10 changes: 10 additions & 0 deletions Lib/test/support/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2107,6 +2107,16 @@ def wait_threads_exit(timeout=60.0):
gc_collect()


def join_thread(thread, timeout=30.0):
"""Join a thread. Raise an AssertionError if the thread is still alive
after timeout seconds.
"""
thread.join(timeout)
if thread.is_alive():
msg = f"failed to join the thread in {timeout:.1f} seconds"
raise AssertionError(msg)


def reap_children():
"""Use this function at the end of test_main() whenever sub-processes
are started. This will help ensure that no extra children (zombies)
Expand Down
30 changes: 8 additions & 22 deletions Lib/test/test_asynchat.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,7 @@ def line_terminator_check(self, term, server_chunk):
c.push(b"I'm not dead yet!" + term)
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join(timeout=TIMEOUT)
if s.is_alive():
self.fail("join() timed out")
support.join_thread(s, timeout=TIMEOUT)

self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

Expand Down Expand Up @@ -156,9 +154,7 @@ def numeric_terminator_check(self, termlen):
c.push(data)
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join(timeout=TIMEOUT)
if s.is_alive():
self.fail("join() timed out")
support.join_thread(s, timeout=TIMEOUT)

self.assertEqual(c.contents, [data[:termlen]])

Expand All @@ -178,9 +174,7 @@ def test_none_terminator(self):
c.push(data)
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join(timeout=TIMEOUT)
if s.is_alive():
self.fail("join() timed out")
support.join_thread(s, timeout=TIMEOUT)

self.assertEqual(c.contents, [])
self.assertEqual(c.buffer, data)
Expand All @@ -192,9 +186,7 @@ def test_simple_producer(self):
p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
c.push_with_producer(p)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join(timeout=TIMEOUT)
if s.is_alive():
self.fail("join() timed out")
support.join_thread(s, timeout=TIMEOUT)

self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

Expand All @@ -204,9 +196,7 @@ def test_string_producer(self):
data = b"hello world\nI'm not dead yet!\n"
c.push_with_producer(data+SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join(timeout=TIMEOUT)
if s.is_alive():
self.fail("join() timed out")
support.join_thread(s, timeout=TIMEOUT)

self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

Expand All @@ -217,9 +207,7 @@ def test_empty_line(self):
c.push(b"hello world\n\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join(timeout=TIMEOUT)
if s.is_alive():
self.fail("join() timed out")
support.join_thread(s, timeout=TIMEOUT)

self.assertEqual(c.contents,
[b"hello world", b"", b"I'm not dead yet!"])
Expand All @@ -238,9 +226,7 @@ def test_close_when_done(self):
# where the server echoes all of its data before we can check that it
# got any down below.
s.start_resend_event.set()
s.join(timeout=TIMEOUT)
if s.is_alive():
self.fail("join() timed out")
support.join_thread(s, timeout=TIMEOUT)

self.assertEqual(c.contents, [])
# the server might have been able to send a byte or two back, but this
Expand All @@ -261,7 +247,7 @@ def test_push(self):
self.assertRaises(TypeError, c.push, 'unicode')
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join(timeout=TIMEOUT)
support.join_thread(s, timeout=TIMEOUT)
self.assertEqual(c.contents, [b'bytes', b'bytes', b'bytes'])


Expand Down
2 changes: 1 addition & 1 deletion Lib/test/test_asyncio/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ def client():
proto.transport.close()
lsock.close()

thread.join(1)
support.join_thread(thread, timeout=1)
self.assertFalse(thread.is_alive())
self.assertEqual(proto.state, 'CLOSED')
self.assertEqual(proto.nbytes, len(message))
Expand Down
8 changes: 2 additions & 6 deletions Lib/test/test_asyncore.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,7 @@ def test_send(self):

self.assertEqual(cap.getvalue(), data*2)
finally:
t.join(timeout=TIMEOUT)
if t.is_alive():
self.fail("join() timed out")
support.join_thread(t, timeout=TIMEOUT)


@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
Expand Down Expand Up @@ -794,9 +792,7 @@ def test_quick_connect(self):
except OSError:
pass
finally:
t.join(timeout=TIMEOUT)
if t.is_alive():
self.fail("join() timed out")
support.join_thread(t, timeout=TIMEOUT)

class TestAPI_UseIPv4Sockets(BaseTestAPI):
family = socket.AF_INET
Expand Down
4 changes: 3 additions & 1 deletion Lib/test/test_imaplib.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@ def _cleanup(self):
# cleanup the server
self.server.shutdown()
self.server.server_close()
self.thread.join(3.0)
support.join_thread(self.thread, 3.0)
# Explicitly clear the attribute to prevent dangling thread
self.thread = None

def test_EOF_without_complete_welcome_message(self):
# http://bugs.python.org/issue5949
Expand Down
16 changes: 4 additions & 12 deletions Lib/test/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -791,13 +791,10 @@ def stop(self, timeout=None):
to terminate.
"""
self.close()
self._thread.join(timeout)
support.join_thread(self._thread, timeout)
self._thread = None
asyncore.close_all(map=self._map, ignore_all=True)

alive = self._thread.is_alive()
self._thread = None
if alive:
self.fail("join() timed out")

class ControlMixin(object):
"""
Expand Down Expand Up @@ -847,11 +844,8 @@ def stop(self, timeout=None):
"""
self.shutdown()
if self._thread is not None:
self._thread.join(timeout)
alive = self._thread.is_alive()
support.join_thread(self._thread, timeout)
self._thread = None
if alive:
self.fail("join() timed out")
self.server_close()
self.ready.clear()

Expand Down Expand Up @@ -2892,9 +2886,7 @@ def setup_via_listener(self, text, verify=None):
finally:
t.ready.wait(2.0)
logging.config.stopListening()
t.join(2.0)
if t.is_alive():
self.fail("join() timed out")
support.join_thread(t, 2.0)

def test_listen_config_10_ok(self):
with support.captured_stdout() as output:
Expand Down
10 changes: 2 additions & 8 deletions Lib/test/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,7 @@ def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
block_func)
return self.result
finally:
thread.join(10) # make sure the thread terminates
if thread.is_alive():
self.fail("trigger function '%r' appeared to not return" %
trigger_func)
support.join_thread(thread, 10) # make sure the thread terminates

# Call this instead if block_func is supposed to raise an exception.
def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
Expand All @@ -77,10 +74,7 @@ def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
self.fail("expected exception of kind %r" %
expected_exception_class)
finally:
thread.join(10) # make sure the thread terminates
if thread.is_alive():
self.fail("trigger function '%r' appeared to not return" %
trigger_func)
support.join_thread(thread, 10) # make sure the thread terminates
if not thread.startedEvent.is_set():
self.fail("trigger thread ended but event never set")

Expand Down
7 changes: 3 additions & 4 deletions Lib/test/test_sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import threading
import time
import unittest
from test import support


TIMEOUT = 10
Expand Down Expand Up @@ -81,8 +82,7 @@ def test_enter_concurrent(self):
self.assertEqual(q.get(timeout=TIMEOUT), 5)
self.assertTrue(q.empty())
timer.advance(1000)
t.join(timeout=TIMEOUT)
self.assertFalse(t.is_alive())
support.join_thread(t, timeout=TIMEOUT)
self.assertTrue(q.empty())
self.assertEqual(timer.time(), 5)

Expand Down Expand Up @@ -137,8 +137,7 @@ def test_cancel_concurrent(self):
self.assertEqual(q.get(timeout=TIMEOUT), 4)
self.assertTrue(q.empty())
timer.advance(1000)
t.join(timeout=TIMEOUT)
self.assertFalse(t.is_alive())
support.join_thread(t, timeout=TIMEOUT)
self.assertTrue(q.empty())
self.assertEqual(timer.time(), 4)

Expand Down