--- name: pyqt-threading description: "PyQt/PySide6 threading and concurrency - QThread, QThreadPool, QTimer, thread safety, concurrent patterns" metadata: author: mte90 version: 1.0.0 tags: - python - qt - pyqt - pyside - threading - concurrency - async - qthread --- # PyQt Threading - Concurrency and Thread Safety Comprehensive guide to threading in PyQt applications. ## Thread Safety Rules **CRITICAL**: Qt/PyQt is NOT thread-safe for UI operations. You MUST follow these rules: 1. **Never access widgets from worker threads** - Only the main thread can modify UI 2. **Use signals for cross-thread communication** - Emit signals from worker, connect to slots in main thread 3. **Use Qt.QueuedConnection for thread-safe signal delivery** - AutoConnection handles this automatically 4. **Never block the main thread** - Long operations will freeze the UI ```python # ❌ WRONG: Direct UI access from thread class BadWorker(QThread): def run(self): # This will crash or cause undefined behavior! self.label.setText("Done") # ✅ CORRECT: Use signals class GoodWorker(QThread): finished = Signal(str) def run(self): result = self.process_data() self.finished.emit(result) # Signal emitted, UI updated in main thread ``` ## QThread with Worker Object (Recommended Pattern) The most flexible pattern separates the worker logic from thread lifecycle: ```python from PySide6.QtCore import QThread, Signal, QObject, Slot class Worker(QObject): """Worker object that does the actual work.""" finished = Signal(object) progress = Signal(int) error = Signal(str) def __init__(self, data): super().__init__() self.data = data self._is_cancelled = False @Slot() def process(self): """Main processing method called from thread.""" try: for i, item in enumerate(self.data): if self._is_cancelled: return # Simulate heavy work result = self.process_item(item) self.progress.emit(int((i + 1) / len(self.data) * 100)) self.finished.emit({"status": "success", "count": len(self.data)}) except Exception as e: self.error.emit(str(e)) def cancel(self): self._is_cancelled = True def process_item(self, item): import time time.sleep(0.1) # Simulate work return item * 2 class ThreadController(QObject): """Manages worker thread lifecycle.""" def __init__(self): super().__init__() self.thread = None self.worker = None def start_work(self, data): # Create thread and worker self.thread = QThread() self.worker = Worker(data) # Move worker to thread self.worker.moveToThread(self.thread) # Connect signals self.worker.finished.connect(self.on_finished) self.worker.progress.connect(self.on_progress) self.worker.error.connect(self.on_error) # Thread lifecycle self.thread.started.connect(self.worker.process) self.thread.finished.connect(self.thread.deleteLater) # Start thread self.thread.start() def cancel_work(self): if self.worker: self.worker.cancel() if self.thread: self.thread.quit() self.thread.wait() @Slot() def on_finished(self, result): print(f"Work completed: {result}") self.cleanup() @Slot() def on_progress(self, percent): print(f"Progress: {percent}%") @Slot() def on_error(self, error): print(f"Error: {error}") self.cleanup() def cleanup(self): self.thread = None self.worker = None ``` ## QThread Subclass (Simpler Pattern) For simpler cases, subclass QThread directly: ```python from PySide6.QtCore import QThread, Signal class DataProcessor(QThread): """Thread that processes data and emits progress.""" progress = Signal(int) result_ready = Signal(list) error_occurred = Signal(str) finished = Signal() def __init__(self, input_data, parent=None): super().__init__(parent) self.input_data = input_data self._cancelled = False def run(self): """Thread entry point - called by start().""" try: results = [] total = len(self.input_data) for i, item in enumerate(self.input_data): if self._cancelled: self.error_occurred.emit("Cancelled") return # Process item (heavy work here) processed = self.process_item(item) results.append(processed) # Emit progress progress_percent = int((i + 1) / total * 100) self.progress.emit(progress_percent) self.result_ready.emit(results) except Exception as e: self.error_occurred.emit(str(e)) finally: self.finished.emit() def process_item(self, item): import time time.sleep(0.05) # Simulate work return str(item).upper() def cancel(self): self._cancelled = True # Usage class MainWindow(QMainWindow): def __init__(self): super().__init__() self.processor = None self.progress = QProgressBar() self.start_btn = QPushButton("Start") self.cancel_btn = QPushButton("Cancel") self.start_btn.clicked.connect(self.start_processing) self.cancel_btn.clicked.connect(self.cancel_processing) def start_processing(self): data = ["item1", "item2", "item3", "item4", "item5"] self.processor = DataProcessor(data) self.processor.progress.connect(self.progress.setValue) self.processor.result_ready.connect(self.on_results) self.processor.error_occurred.connect(self.on_error) self.processor.finished.connect(self.on_finished) self.processor.start() self.start_btn.setEnabled(False) def cancel_processing(self): if self.processor: self.processor.cancel() def on_results(self, results): print(f"Got {len(results)} results") def on_error(self, error): QMessageBox.warning(self, "Error", error) def on_finished(self): self.start_btn.setEnabled(True) self.progress.setValue(0) self.processor = None ``` ## QThreadPool with QRunnable For parallel execution of independent tasks: ```python from PySide6.QtCore import QThreadPool, QRunnable, Signal, QObject, QThread import time class TaskSignals(QObject): """Signals for QRunnable (QRunnable cannot have signals directly).""" finished = Signal(object) error = Signal(str) progress = Signal(int) class ParallelTask(QRunnable): """Runnable task for thread pool.""" def __init__(self, task_id, data): super().__init__() self.task_id = task_id self.data = data self.signals = TaskSignals() self._cancelled = False def run(self): """Executed by thread pool.""" try: time.sleep(0.5) # Simulate work if self._cancelled: return result = { "id": self.task_id, "processed": str(self.data).upper(), "thread": int(QThread.currentThreadId()) } self.signals.finished.emit(result) except Exception as e: self.signals.error.emit(str(e)) def cancel(self): self._cancelled = True class ThreadPoolManager(QObject): """Manages parallel task execution.""" all_finished = Signal(int) def __init__(self, max_threads=4): super().__init__() self.pool = QThreadPool() self.pool.setMaxThreadCount(max_threads) self.active_tasks = {} self.completed_count = 0 self.total_tasks = 0 def run_parallel(self, tasks): """Run multiple tasks in parallel.""" self.completed_count = 0 self.total_tasks = len(tasks) self.active_tasks.clear() for task_id, data in enumerate(tasks): task = ParallelTask(task_id, data) task.signals.finished.connect( lambda result, tid=task_id: self.on_task_finished(result) ) task.signals.error.connect(self.on_task_error) self.active_tasks[task_id] = task self.pool.start(task) def on_task_finished(self, result): self.completed_count += 1 task_id = result["id"] del self.active_tasks[task_id] if self.completed_count >= self.total_tasks: self.all_finished.emit(self.completed_count) def on_task_error(self, error): print(f"Task error: {error}") def cancel_all(self): for task in self.active_tasks.values(): task.cancel() self.active_tasks.clear() ``` ## QTimer for Periodic Updates ```python from PySide6.QtCore import QTimer, Slot class PollingWidget(QWidget): def __init__(self): super().__init__() # Create timer self.timer = QTimer(self) self.timer.timeout.connect(self.on_timeout) # UI self.status_label = QLabel("Last update: Never") self.poll_btn = QPushButton("Start Polling") self.poll_btn.setCheckable(True) layout = QVBoxLayout(self) layout.addWidget(self.status_label) layout.addWidget(self.poll_btn) self.poll_btn.toggled.connect(self.toggle_polling) @Slot() def toggle_polling(self, checked): if checked: self.timer.start(1000) # Poll every second self.poll_btn.setText("Stop Polling") else: self.timer.stop() self.poll_btn.setText("Start Polling") @Slot() def on_timeout(self): from datetime import datetime self.status_label.setText(f"Last update: {datetime.now().strftime('%H:%M:%S')}") ``` ## Thread-Safe Data Sharing ```python from PySide6.QtCore import QMutex, QMutexLocker, QReadWriteLock class SharedData: """Thread-safe data container.""" def __init__(self): self._data = {} self._mutex = QMutex() def set_value(self, key, value): """Thread-safe write.""" locker = QMutexLocker(self._mutex) self._data[key] = value def get_value(self, key, default=None): """Thread-safe read.""" locker = QMutexLocker(self._mutex) return self._data.get(key, default) def get_all(self): """Thread-safe copy of all data.""" locker = QMutexLocker(self._mutex) return dict(self._data) class ReadWriteData: """Read-write lock for read-heavy workloads.""" def __init__(self): self._data = {} self._lock = QReadWriteLock() def read_value(self, key): """Multiple readers can hold the lock.""" self._lock.lockForRead() try: return self._data.get(key) finally: self._lock.unlock() def write_value(self, key, value): """Only one writer at a time.""" self._lock.lockForWrite() try: self._data[key] = value finally: self._lock.unlock() ``` ## Best Practices 1. **Always use signals for cross-thread communication** 2. **Keep worker objects thread-affinity aware** - Don't assume they're in main thread 3. **Clean up threads properly** - Use deleteLater() and quit() + wait() 4. **Handle cancellation** - Check flags periodically in long operations 5. **Use QThreadPool for parallel independent tasks** 6. **Use QThread.moveToThread() for single long operations** 7. **Never use time.sleep() in main thread** - Use timers or workers instead ## Common Issues | Issue | Cause | Solution | |-------|-------|----------| | UI freezes | Blocking operation in main thread | Move to worker thread | | Crashes on widget access | Accessing UI from worker thread | Use signals instead | | Memory leaks | Thread not cleaned up | Use deleteLater() and proper lifecycle | | Deadlocks | Multiple mutexes acquired in different order | Always acquire in same order, use timeout | | Race conditions | Shared data without locks | Use QMutex or atomic operations | ## References - **Qt Threads**: https://doc.qt.io/qtforpython-6/PySide6/QtCore/QThread.html - **Thread Basics**: https://doc.qt.io/qtforpython-6/threads_and_qobjects.html - **QThreadPool**: https://doc.qt.io/qtforpython-6/PySide6/QtCore/QThreadPool.html - **QTimer**: https://doc.qt.io/qtforpython-6/PySide6/QtCore/QTimer.html