from qtpy import QtCore, QtWidgets
from functools import partial
import time
[docs]
class CallLater:
"""
A **callable** object that will run a specified function when the next Qt event loop iteration
occurs, but only once, irrespective of how many times the ``CallLater`` was called since the
last iteration of the event loop.
This is useful if you are stuck in a long running function and want to schedule something for
when it finishes or when :func:`puzzlepiece.puzzle.Puzzle.process_events` is called.
Let's say a :class:`puzzlepiece.piece.Piece` has an image view defined in
:func:`~puzzlepiece.piece.Piece.custom_layout` and we'd like to update the image when the
Piece's ``image`` :class:`~puzzlepiece.param.ParamArray` changes. We can then do::
def update_image():
image.setImage(self.params['image'].value)
update_later = pzp.threads.CallLater(update_image)
self.params['image'].changed.connect(update_later)
instead of directly connecting ``update_image``. This way the image data will be set only once, when
the next GUI update occurs. So the code is more efficient if somewhere else we do::
for i in range(1000):
self.params['image'].get_value()
# The image view won't update here as the Qt event loop is stuck inside this function
puzzle.process_events()
# update_image will be called once as part of this event loop iteration,
# instead of a 1000 times.
# You can also directly call the CallLater object:
update_later()
This behaviour is implemented through a single shot QTimer stored internally.
:param function: The function to be called.
:param args and kwargs: further arguments and keyword arguments can be provided,
they will be passed to the function when it is called.
"""
def __init__(self, function, *args, **kwargs):
self._timer = QtCore.QTimer()
self._timer.setSingleShot(True)
self._timer.setTimerType(QtCore.Qt.TimerType.PreciseTimer)
self._timer.timeout.connect(partial(function, *args, **kwargs))
def __call__(self, *args, **kwargs):
self._timer.start()
class _Emitter(QtCore.QObject):
# The Emitter is needed as a QRunnable is not a QObject, and cannot emit it's own signals.
# So we set up the Signal here, and let a Worker instance an Emitter for its use
signal = QtCore.Signal(object)
[docs]
class Worker(QtCore.QRunnable):
"""
Generic worker (QRunnable) that calls a function in a thread.
:param function: The function for the Worker to run.
:param args: list of arguments to be forwarded to the function when run.
:param kwargs: dictionary of keyword arguments to be forwarded to the function when run.
"""
def __init__(self, function, args=None, kwargs=None):
self.function = function
self.args = args if args is not None else []
self.kwargs = kwargs if kwargs is not None else {}
#: Bool flag, True when task finished.
self.done = False
self._emitter = _Emitter()
#: A Qt signal emitted when the function returns, passes the returned value to the connected Slot.
self.returned = self._emitter.signal
super().__init__()
[docs]
@QtCore.Slot()
def run(self):
"""
Run the Worker. Shouldn't be excecuted directly, instead
use :func:`puzzlepiece.puzzle.Puzzle.run_worker`.
"""
try:
r = self.function(*self.args, **self.kwargs)
self.returned.emit(r)
finally:
self.done = True
[docs]
class LiveWorker(Worker):
"""
A Worker that calls a function repeatedly in a thread,
separated by specified time intervals.
:param function: The function for the Worker to run.
:param sleep: Time to sleep between function calls in seconds.
:param args: list of arguments to be forwarded to the function when run.
:param kwargs: dictionary of keyword arguments to be forwarded to the function when run.
"""
def __init__(self, function, sleep=0.1, args=None, kwargs=None):
self.stopping = False
self.sleep = sleep
super().__init__(function, args, kwargs)
#: A Qt signal emitted each time the function returns, passes the returned value to the connected Slot.
self.returned = self.returned
[docs]
def stop(self):
"""
Ask the Worker to stop. This will only take effect once the current
execution of the function is over.
"""
self.stopping = True
[docs]
@QtCore.Slot()
def run(self):
"""
Run the Worker. Shouldn't be excecuted directly, instead
use :func:`puzzlepiece.puzzle.Puzzle.run_worker`.
"""
try:
while not self.stopping:
r = self.function(*self.args, **self.kwargs)
self.returned.emit(r)
if not self.stopping:
time.sleep(self.sleep)
finally:
self.done = True
[docs]
class PuzzleTimer(QtWidgets.QWidget):
"""
A Widget that displays a checkbox. When the checkbox is checked, an associated
function is called repeatedly in a thread, separated by specified time intervals.
:param name: The display name for this PuzzleTimer.
:param puzzle: The parent :class:`~puzzlepiece.puzzle.Puzzle` (needed as the Puzzle
is charged with the QThreadPool that runs the tasks).
:param function: The function for the PuzzleTimer to run.
:param sleep: Time to sleep between function calls in seconds.
:param args: list of arguments to be forwarded to the function when run.
:param kwargs: dictionary of keyword arguments to be forwarded to the function when run.
"""
#: A Qt signal emitted each time the associated LiveWorker returns, passes the returned value to the connected Slot.
returned = QtCore.Signal(object)
def __init__(self, name, puzzle, function, sleep=0.1, args=None, kwargs=None):
self.name = name
self.puzzle = puzzle
self.function = function
self._sleep = sleep
self.args = args
self.kwargs = kwargs
self.worker = None
super().__init__()
layout = QtWidgets.QGridLayout()
layout.setContentsMargins(0, 0, 0, 0)
self.setLayout(layout)
self.input = QtWidgets.QCheckBox(self.name)
layout.addWidget(self.input)
self.input.stateChanged.connect(self._state_handler)
def _state_handler(self, state):
if state and (self.worker is None or self.worker.done):
self.worker = LiveWorker(self.function, self._sleep, self.args, self.kwargs)
self.worker.returned.connect(self._return_handler)
self.puzzle.run_worker(self.worker)
elif self.worker is not None:
self.worker.stop()
def _return_handler(self, value):
self.returned.emit(value)
[docs]
def stop(self):
"""
Ask the PuzzleTimer to stop.
This will only take effect once the current execution of the function is over,
see :func:`puzzlepiece.threads.LiveWorker.stop`.
"""
self.input.setChecked(False)
@property
def sleep(self):
"""
Interval in seconds between executions of the associated function.
The property can be modified to change the interval for the associated Worker,
even if it's already running.
"""
return self._sleep
@sleep.setter
def sleep(self, value):
if self.worker is not None:
self.worker.sleep = float(value)
self._sleep = float(value)