from qtpy import QtCore, QtWidgets
from functools import partial
import time
[docs]
class CallLater:
"""
A **callable** object that will run a specified function when the next Qt event loop iteration
occurs, but only once, irrespective of how many times the ``CallLater`` was called since the
last iteration of the event loop.
This is useful if you are stuck in a long running function and want to schedule something for
when it finishes or when :func:`puzzlepiece.puzzle.Puzzle.process_events` is called.
Let's say a :class:`puzzlepiece.piece.Piece` has an image view defined in
:func:`~puzzlepiece.piece.Piece.custom_layout` and we'd like to update the image when the
Piece's ``image`` :class:`~puzzlepiece.param.ParamArray` changes. We can then do::
def update_image():
image.setImage(self.params['image'].value)
update_later = pzp.threads.CallLater(update_image)
self.params['image'].changed.connect(update_later)
instead of directly connecting ``update_image``. This way the image data will be set only once, when
the next GUI update occurs. So the code is more efficient if somewhere else we do::
for i in range(1000):
self.params['image'].get_value()
# The image view won't update here as the Qt event loop is stuck inside this function
puzzle.process_events()
# update_image will be called once as part of this event loop iteration,
# instead of a 1000 times.
# You can also directly call the CallLater object:
update_later()
This behaviour is implemented through a single shot QTimer stored internally.
:param function: The function to be called.
:param args and kwargs: further arguments and keyword arguments can be provided,
they will be passed to the function when it is called.
"""
def __init__(self, function, *args, **kwargs):
self._timer = QtCore.QTimer()
self._timer.setSingleShot(True)
self._timer.setTimerType(QtCore.Qt.TimerType.PreciseTimer)
self._timer.timeout.connect(partial(function, *args, **kwargs))
def __call__(self, *args, **kwargs):
self._timer.start()
class _Emitter(QtCore.QObject):
# The Emitter is needed as a QRunnable is not a QObject, and cannot emit it's own signals.
# So we set up the Signal here, and let a Worker instance an Emitter for its use
signal = QtCore.Signal(object)
[docs]
class Worker(QtCore.QRunnable):
"""
Generic worker (QRunnable) that calls a function in a thread.
Note that :func:`puzzlepiece.param.BaseParam.get_value` and
:func:`puzzlepiece.param.BaseParam.set_value` are thread-safe by default, so you can use
these within Workers, but be *very* careful about interacting with other UI components from
inside threads - in general you should not do it, as it can cause complete crashes. Generally
your getters and setters should not interact with the GUI either to make them thread-safe-ish -
connect to the :attr:`puzzlepiece.param.BaseParam.changed` Signal for plot updates instead,
especially if you foresee the params being used in Workers.
Even if you do not interact with GUI components from the threads, be careful of how you
schedule things to run - for example trying to set an exposure time on a camera *while*
a frame acquisition is in progress could result in an error or crash.
To run a function in a thread::
worker = puzzlepiece.threads.Worker(long_function, args=(a,), kwargs={"b": "c"})
worker.returned.connect(lambda value: print(value))
puzzle.run_worker(worker)
# or, for short
puzzle.run_worker(puzzlepiece.threads.Worker(long_function))
:param function: The function for the Worker to run.
:param args: list of arguments to be forwarded to the function when run.
:param kwargs: dictionary of keyword arguments to be forwarded to the function when run.
"""
def __init__(self, function, args=None, kwargs=None):
self.function = function
self.args = args if args is not None else []
self.kwargs = kwargs if kwargs is not None else {}
#: Bool flag, True when task finished.
self.done = False
self._emitter = _Emitter()
#: A Qt signal emitted when the function returns, passes the returned value to the connected Slot.
self.returned = self._emitter.signal
super().__init__()
[docs]
@QtCore.Slot()
def run(self):
"""
Run the Worker. Shouldn't be excecuted directly, instead
use :func:`puzzlepiece.puzzle.Puzzle.run_worker`.
"""
try:
r = self.function(*self.args, **self.kwargs)
self.returned.emit(r)
finally:
self.done = True
class _Done_Emitter(QtCore.QObject):
signal = QtCore.Signal()
[docs]
class LiveWorker(Worker):
"""
A Worker that calls a function repeatedly in a thread,
separated by specified time intervals.
:param function: The function for the Worker to run.
:param sleep: Time to sleep between function calls in seconds.
:param args: list of arguments to be forwarded to the function when run.
:param kwargs: dictionary of keyword arguments to be forwarded to the function when run.
"""
def __init__(self, function, sleep=0.1, args=None, kwargs=None):
self.stopping = False
#: Amount of time to sleep between function executions. Can be changed while running.
self.sleep = sleep
super().__init__(function, args, kwargs)
#: A Qt signal emitted each time the function returns, passes the returned value to the connected Slot.
self.returned = self.returned
# The above line is there for documentation to compile correctly
self._done_emitter = _Done_Emitter()
#: A Qt signal emitted when the LiveWorker is stopped.
self.done_signal = self._done_emitter.signal
[docs]
def stop(self):
"""
Ask the Worker to stop. This will only take effect once the current
execution of the function is over.
"""
self.stopping = True
[docs]
@QtCore.Slot()
def run(self):
"""
Run the Worker. Shouldn't be excecuted directly, instead
use :func:`puzzlepiece.puzzle.Puzzle.run_worker`.
"""
try:
while not self.stopping:
r = self.function(*self.args, **self.kwargs)
self.returned.emit(r)
if not self.stopping:
time.sleep(self.sleep)
finally:
self.done = True
self.done_signal.emit()
[docs]
class PuzzleTimer(QtWidgets.QWidget):
"""
A Widget that displays a checkbox. When the checkbox is checked, an associated
function is called repeatedly in a thread, separated by specified time intervals.
This way if the function takes a long time (like a camera exposure) it doesn't
lock up the main GUI thread.
Uses a :class:`~puzzlepiece.threads.LiveWorker` to implement this.
Can be added in :func:`puzzlepiece.piece.Piece.custom_layout` to enable live previews
for cameras etc::
def custom_layout(self):
layout = QtWidgets.QVBoxLayout()
timer = puzzlepiece.threads.PuzzleTimer('Live', self.puzzle, self["image"].get_value, 0.1)
layout.addWidget(self.timer)
# Add a pyqtgraph plot that updates when the `changed` Signal of the "image" param is emitted
# ...
return layout
:param name: The display name for this PuzzleTimer.
:param puzzle: The parent :class:`~puzzlepiece.puzzle.Puzzle` (needed as the Puzzle
manages the QThreadPool that runs the tasks).
:param function: The function for the PuzzleTimer to run.
:param sleep: Time to sleep between function calls in seconds.
:param args: list of arguments to be forwarded to the function when run.
:param kwargs: dictionary of keyword arguments to be forwarded to the function when run.
"""
#: A Qt signal emitted each time the associated LiveWorker returns, passes the returned value to the connected Slot.
returned = QtCore.Signal(object)
def __init__(self, name, puzzle, function, sleep=0.1, args=None, kwargs=None):
self.name = name
self.puzzle = puzzle
self.function = function
self._sleep = sleep
self.args = args
self.kwargs = kwargs
self.worker = None
super().__init__()
layout = QtWidgets.QGridLayout()
layout.setContentsMargins(0, 0, 0, 0)
self.setLayout(layout)
self.input = QtWidgets.QCheckBox(self.name)
layout.addWidget(self.input)
self.input.stateChanged.connect(self._state_handler)
def _state_handler(self, state):
if state and (self.worker is None or self.worker.done):
self.worker = LiveWorker(self.function, self._sleep, self.args, self.kwargs)
self.worker.returned.connect(self._return_handler)
self.worker.done_signal.connect(self.stop)
self.puzzle.run_worker(self.worker)
elif self.worker is not None:
self.worker.stop()
def _return_handler(self, value):
self.returned.emit(value)
[docs]
def stop(self):
"""
Ask the PuzzleTimer to stop.
This will only take effect once the current execution of the function is over,
see :func:`puzzlepiece.threads.LiveWorker.stop`.
"""
self.input.setChecked(False)
@property
def sleep(self):
"""
Interval in seconds between executions of the associated function.
The property can be modified to change the interval for the associated Worker,
even if it's already running.
"""
return self._sleep
@sleep.setter
def sleep(self, value):
if self.worker is not None:
self.worker.sleep = float(value)
self._sleep = float(value)