Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to run a function in main thread for safe workers #42

Draft
wants to merge 16 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ jobs:
run: pip install -e ".[dev]"

- name: Run tests
env:
# is it really necessary?
SUBPROCESS_SAME_ENV_TEST: true
BATCH_JOB_TEST: true
run: python -m pytest -v --cov src

- name: Upload coverage reports to Codecov
Expand Down
5 changes: 3 additions & 2 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
include LICENSE
include tests/resources/dummy
include tests/resources/empty_runner.py
recursive-include tests *.py
recursive-include tests/resources/ *.yaml
include tests/resources/dummy
include LICENSE
163 changes: 111 additions & 52 deletions src/dakara_base/safe_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@


import logging
import platform
import signal
import sys
from functools import wraps
from queue import Empty, Queue
Expand Down Expand Up @@ -429,6 +431,9 @@
initialization assigns its own thread to the instance and makes it target a
dummy method.

If a method `run_main` is defined, it will be called in the main thread. It
takes the stop event and the errors queue as arguments.

Attributes:
stop (threading.Event): Stop event that, when set, signals the entire
program to stop.
Expand Down Expand Up @@ -494,22 +499,26 @@
"""Runner class.

The runner creates the stop event and errors queue. It is designed to
execute the thread of a `WorkerSafeThread` instance until an error occurs
or an user interruption pops out (Ctrl+C).
execute the thread of a `WorkerSafeThread` instance until the end of the
program. This execution is done by the `run_safe` method, that will use a
blocking function. By default, the blocking function waits for an error to
occur or a user interruption to pop out (Ctrl+C). It can be replaced if a
`run_main` function is passed to `run_safe`, or if the `run_main` method of
the used `WorkerSafeThread` class is defined. The blocking function must
accept a stop event and an error queue, and be blocking as long as the stop
event is not set.

The initialization creates the stop event and the errors queue and calls
the custom init method.

Attributes:
POLLING_INTERVAL (float): For Windows only, interval between two
attempts to wait for the stop event.
stop (threading.Event): Stop event that, when set, signals the thread to
stop its execution.
errors (queue.Queue): Error queue to communicate the exception of the
thread.
"""

POLLING_INTERVAL = 0.5
ERROR_TIMEOUT = 5

def __init__(self, *args, **kwargs):
# create stop event
Expand All @@ -525,65 +534,115 @@
"""Custom initialization stub."""
pass

def run_safe(self, WorkerClass, *args, **kwargs):
def run_safe(self, worker_class, args=None, kwargs=None, run_main=None):
"""Execute a WorkerSafeThread instance thread.

The thread is executed and the method waits for the stop event to be
set or a user interruption to be triggered (Ctrl+C).
set or a user interruption to be triggered (Ctrl+C). An alternative
waiting function can be provided by `run_main`, or by the method
`run_main`. The choice of the function to run is:

1. Provided `run_main` function;
2. Worker class `run_main` method; or
3. Default module `wait` function.

Args:
WorkerClass (WorkerSafeThread): Worker class with safe thread.
worker_class (WorkerSafeThread): Worker class with safe thread.
Note you have to pass a custom class based on
`WorkerSafeThread`.
Other arguments are passed to the thread of WorkerClass.
args (list): Positional arguments passed to the worker class constructor.
kwargs (dict): Named arguments passed to the worker class constructor.
run_main (function): Function to execute in the main thread. It
must accept the stop event and the errors queue, and must be
blocking as long as the stop event is not set.
"""
if args is None:
args = ()

if kwargs is None:
kwargs = {}

# register user interruption
def on_sigint(signal_number, frame):
logger.debug("Receiving signal %i to close", signal_number)
self.errors.put(None)
self.stop.set()

signal.signal(signal.SIGINT, on_sigint)

# create worker thread
with worker_class(self.stop, self.errors, *args, **kwargs) as worker:
logger.debug("Create worker thread")
worker.thread.start()

# select blocking function
if run_main is not None:
# if a function is provided, run it
# it must return when the stop event is set
block = run_main

elif hasattr(worker, "run_main"):
# if the worker has a function to run, run it
# it must return when the stop event is set
block = getattr(worker, "run_main")

else:
block = wait

# wait
block(self.stop, self.errors)

# get the error from the error queue
# a delay of `ERROR_TIMEOUT` seconds is allowed for the error to be retrieved
try:
# create worker thread
with WorkerClass(self.stop, self.errors, *args, **kwargs) as worker:

logger.debug("Create worker thread")
worker.thread.start()

# wait for stop event
logger.debug("Waiting for stop event")

# We have to use a different code for Windows because the
# Ctrl+C event will not be handled during `self.stop.wait()`.
# This method is blocking for Windows, not for Linux, which is
# due to the way Ctrl+C is differently handled by the two OSs.
# For Windows, a quick and dirty solution consists in polling
# the `self.stop.wait()` with a timeout argument, so the call
# is non-permanently blocking.
# More resources on this:
# https://mail.python.org/pipermail/python-dev/2017-August/148800.html
# https://stackoverflow.com/a/51954792/4584444
if sys.platform.startswith("win"):
while not self.stop.is_set():
self.stop.wait(self.POLLING_INTERVAL)

else:
self.stop.wait()

# stop on Ctrl+C
except KeyboardInterrupt:
reason = self.errors.get(timeout=self.ERROR_TIMEOUT)

# if there is no error in the error queue, raise a general error
# this case is very unlikely to happen and is not tested
except Empty as empty_error:
raise EmptyErrorsQueueError("Unknown error happened") from empty_error

# if there is no error, this is a normal interruption
if reason is None:
logger.debug("User stop caught")
self.stop.set()
return

_, error, traceback = reason
logger.debug("Internal error caught")
error.with_traceback(traceback)
raise error


def wait(stop, errors, interval=0.5):
"""Wait for stop event to be set.

We have to use specific code for Windows because the Ctrl+C event will
not be handled during `stop.wait()`. This call is blocking on
Windows, not on Linux, which is due to the way Ctrl+C is handled
differently by the two OSs. For Windows, a quick and dirty solution consists
in polling `stop.wait()` with a timeout argument, so the call is
not permanently blocking.

See also:
https://mail.python.org/pipermail/python-dev/2017-August/148800.html
https://stackoverflow.com/a/51954792/4584444

Args:
stop (threading.Event): Stop event.
errors (queue.Queue): Errors queue.
interval (float): For Windows only, interval between two attempts to
wait for the stop event.
"""
logger.debug("Waiting for stop event")

# stop on error
else:
logger.debug("Internal error caught")
if platform.system() == "Windows":
while not stop.is_set():
stop.wait(interval)

Check warning on line 640 in src/dakara_base/safe_workers.py

View check run for this annotation

Codecov / codecov/patch

src/dakara_base/safe_workers.py#L639-L640

Added lines #L639 - L640 were not covered by tests

# get the error from the error queue and re-raise it
# a delay of 5 seconds is accorded for the error to be retrieved
try:
_, error, traceback = self.errors.get(5)
error.with_traceback(traceback)
raise error
return

Check warning on line 642 in src/dakara_base/safe_workers.py

View check run for this annotation

Codecov / codecov/patch

src/dakara_base/safe_workers.py#L642

Added line #L642 was not covered by tests

# if there is no error in the error queue, raise a general error
# this case is very unlikely to happen and is not tested
except Empty as empty_error:
raise NoErrorCaughtError("Unknown error happened") from empty_error
# otherwise just wait for the program to stop
stop.wait()


class UnredefinedTimerError(DakaraError):
Expand All @@ -602,7 +661,7 @@
"""


class NoErrorCaughtError(RuntimeError):
class EmptyErrorsQueueError(RuntimeError):
"""Empty errors queue error.

Error raised if the safe workers mechanism stops for an error, but there is
Expand Down
Empty file added tests/integration/__init__.py
Empty file.
79 changes: 79 additions & 0 deletions tests/integration/test_safe_workers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import os
import platform
import signal
import subprocess
from importlib.resources import path
from time import sleep
from unittest import TestCase, skipIf, skipUnless

from path import Path, TempDir

import dakara_base # noqa F401


class RunnerIntegrationTestCase(TestCase):
    """Integration test of a runner executed in a separate Python process."""

    # True when not on Windows, or when the `SUBPROCESS_SAME_ENV_TEST`
    # environment variable is defined
    IS_SUBPROCESS_SAME_ENV = (
        platform.system() != "Windows" or "SUBPROCESS_SAME_ENV_TEST" in os.environ
    )
    # True on Windows when the `BATCH_JOB_TEST` environment variable is defined
    IS_BATCH_JOB = platform.system() == "Windows" and "BATCH_JOB_TEST" in os.environ

    @staticmethod
    def wait_output(process, line, interval=0.1):
        """Wait for a process to output a line.

        Beware this method consumes the lines from the process, so they will
        not be included in `process.communicate()`.

        Args:
            process (subprocess.Popen): Process to evaluate.
            line (str): Line of text expected in the process output.
            interval (float): Interval in seconds between two evaluations
                when no output is available.

        Returns:
            list of str: Lines output by the process up to and including the
            expected one. If the process terminates without printing the
            expected line, all the lines it printed are returned, so the
            caller always gets a list.
        """
        lines = []
        while True:
            out = process.stdout.readline()

            if not out:
                # nothing left to read: give up once the process has
                # terminated, otherwise retry after a short pause
                if process.poll() is not None:
                    return lines

                sleep(interval)
                continue

            lines.append(out.strip())

            if line in lines:
                return lines

    @skipUnless(
        IS_SUBPROCESS_SAME_ENV,
        "Can only be tested if subprocess environment is same as current environment",
    )
    @skipIf(IS_BATCH_JOB, "Can only be tested if script is not launched in batch job")
    def test_run_safe_signal(self):
        """Test to send an interruption signal to a runner."""
        with TempDir() as tempdir:
            # copy the runner script out of the package resources, so it can
            # be executed as a regular file
            with path("tests.resources", "empty_runner.py") as resource_path:
                file_path = Path(resource_path).copy(tempdir)

            # `-u` makes the output unbuffered, so lines are readable as soon
            # as they are printed
            process = subprocess.Popen(
                ["python", "-u", str(file_path)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            start_lines = self.wait_output(process, "starting worker")

            # the interruption signal differs between the OSs
            system = platform.system()
            if system == "Windows":
                process.send_signal(signal.CTRL_C_EVENT)

            else:
                process.send_signal(signal.SIGINT)

            out, _ = process.communicate()
            end_lines = out.splitlines()
            self.assertListEqual(start_lines, ["starting runner", "starting worker"])
            self.assertListEqual(end_lines, ["ending worker", "ending runner"])

            self.assertEqual(process.returncode, 0)
23 changes: 23 additions & 0 deletions tests/resources/empty_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from dakara_base.safe_workers import Runner, WorkerSafeThread


class MyWorker(WorkerSafeThread):
    """Minimal worker whose thread idles until the stop event is set."""

    def init_worker(self):
        """Create the worker thread, targeting `run_thread`."""
        self.thread = self.create_thread(target=self.run_thread)

    def run_thread(self):
        """Print a start message, wait for the stop event, print an end message."""
        print("starting worker")
        self.stop.wait()
        print("ending worker")


class MyRunner(Runner):
    """Minimal runner executing `MyWorker` through `run_safe`."""

    def run(self):
        """Print a start message, run the worker safely, print an end message."""
        print("starting runner")
        self.run_safe(MyWorker)
        print("ending runner")


# run the runner only when the file is executed as a script
if __name__ == "__main__":
    MyRunner().run()
Empty file added tests/unit/__init__.py
Empty file.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading
Loading