ThreadPool
Qt Base Class: QThreadPool
Signature: QThreadPool(self, parent: Optional[PySide6.QtCore.QObject] = None) -> None
Base classes
Name |
Children |
Inherits |
ObjectMixin prettyqt.core.object
|
|
|
QThreadPool PySide6.QtCore QThreadPool(self, parent: Optional[PySide6.QtCore.QObject] \= None) -> None |
|
|
⋔ Inheritance diagram
graph TD
1473299850160["core.ThreadPool"]
1473299815024["core.ObjectMixin"]
140713234304496["builtins.object"]
1473288854928["QtCore.QThreadPool"]
1473288842240["QtCore.QObject"]
1473291690208["Shiboken.Object"]
1473299815024 --> 1473299850160
140713234304496 --> 1473299815024
1473288854928 --> 1473299850160
1473288842240 --> 1473288854928
1473291690208 --> 1473288842240
140713234304496 --> 1473291690208
🛈 DocStrings
Bases: ObjectMixin
, QThreadPool
Manages a collection of QThreads.
Note: signals only work correctly when exclusively using start_worker method.
Source code in prettyqt\core\threadpool.py
| class ThreadPool(core.ObjectMixin, core.QThreadPool):
"""Manages a collection of QThreads.
Note: signals only work correctly when exclusively using start_worker method.
"""
__instance: Self | None = None # a global instance
job_num_updated = core.Signal(int)
error_occured = core.Signal(Exception)
busy_state_changed = core.Signal(bool)
def __contains__(self, other: core.QThread):
return self.contains(other)
def get_thread_priority(self) -> core.thread.PriorityStr:
return core.thread.PRIORITY.inverse[self.threadPriority()]
def set_thread_priority(self, priority: core.thread.PriorityStr):
prio = core.thread.PRIORITY[priority]
self.setThreadPriority(prio)
@classmethod
def instance(cls) -> Self:
"""Return global ThreadPool singleton. (globalInstance always returns Qt type)."""
if cls.__instance is None:
cls.__instance = cls()
return cls.__instance
def start_worker(
self,
fn_or_worker: Callable | Worker,
args: tuple | None = None,
kwargs: dict | None = None,
priority: int = 0,
result_fn: Callable | None = None,
finished_fn: Callable | None = None,
progress_fn: Callable | None = None,
error_fn: Callable | None = None,
):
if isinstance(fn_or_worker, Callable):
if args is None:
args = ()
if kwargs is None:
kwargs = {}
runnable = Worker(fn_or_worker, *args, **kwargs)
else:
runnable = fn_or_worker
runnable.signals.finished.connect(self._on_job_ended)
runnable.signals.error.connect(self._on_job_ended)
runnable.signals.error.connect(self._on_exception)
if result_fn:
runnable.signals.result.connect(result_fn)
if finished_fn:
runnable.signals.finished.connect(finished_fn)
if progress_fn:
runnable.signals.progress.connect(progress_fn)
if error_fn:
runnable.signals.error.connect(error_fn)
thread_count = self.activeThreadCount()
if thread_count == 0:
self.busy_state_changed.emit(True)
self.job_num_updated.emit(thread_count + 1) # + 1 because we didnt start yet.
super().start(runnable, priority)
def _on_job_ended(self):
thread_count = self.activeThreadCount()
if thread_count == 1: # this is the last job
self.busy_state_changed.emit(False)
self.job_num_updated.emit(thread_count - 1) # -1 because we didnt really end yet.
def _on_exception(self, exception):
self.error_occured.emit(exception)
|
instance() -> Self
classmethod
Return global ThreadPool singleton. (globalInstance always returns Qt type).
Source code in prettyqt\core\threadpool.py
| @classmethod
def instance(cls) -> Self:
"""Return global ThreadPool singleton. (globalInstance always returns Qt type)."""
if cls.__instance is None:
cls.__instance = cls()
return cls.__instance
|
⌗ Property table
Qt Property |
Type |
Doc |
objectName |
QString |
|
expiryTimeout |
int |
|
maxThreadCount |
int |
|
activeThreadCount |
int |
|
stackSize |
uint |
|
threadPriority |
QThread::Priority |
|