---
name: batch-processing-jobs
description: Implement robust batch processing systems with job queues, schedulers, background tasks, and distributed workers. Use when processing large datasets, scheduled tasks, async operations, or resource-intensive computations.
---

# Batch Processing Jobs

## Overview

Implement scalable batch processing systems for handling large-scale data processing, scheduled tasks, and async operations efficiently.

## When to Use

- Processing large datasets
- Scheduled report generation
- Email/notification campaigns
- Data imports and exports
- Image/video processing
- ETL pipelines
- Cleanup and maintenance tasks
- Long-running computations
- Bulk data updates

## Architecture Patterns

```
┌─────────────┐      ┌─────────────┐      ┌──────────┐
│  Producer   │─────▶│    Queue    │─────▶│  Worker  │
└─────────────┘      └─────────────┘      └──────────┘
                            │                   │
                            ▼                   │
                     ┌──────────┐               │
                     │  Result  │◀──────────────┘
                     │  Storage │
                     └──────────┘
```

## Implementation Examples

### 1. **Bull Queue (Node.js)**

```typescript
import Queue from 'bull';
import { v4 as uuidv4 } from 'uuid';

interface JobData {
  id: string;
  type: string;
  payload: any;
  userId?: string;
  metadata?: Record<string, any>;
}

interface JobResult {
  success: boolean;
  data?: any;
  error?: string;
  processedAt: number;
  duration: number;
}

class BatchProcessor {
  private queue: Queue.Queue<JobData>;
  private resultQueue: Queue.Queue;

  constructor(redisUrl: string) {
    // Main processing queue
    this.queue = new Queue('batch-jobs', redisUrl, {
      defaultJobOptions: {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 2000
        },
        removeOnComplete: 1000,
        removeOnFail: 5000,
        timeout: 300000 // 5 minutes
      },
      settings: {
        maxStalledCount: 2,
        stalledInterval: 30000
      }
    });

    // Results queue
    this.resultQueue = new Queue('batch-results', redisUrl);

    this.setupProcessors();
    this.setupEvents();
  }

  private setupProcessors(): void {
    // Data processing job
    this.queue.process('process-data', 10, async (job) => {
      const startTime = Date.now();
      const { payload } = job.data;

      job.log(`Processing ${payload.items?.length || 0} items`);

      try {
        // Update progress
        await job.progress(0);

        const results = await this.processDataBatch(
          payload.items,
          (progress) => job.progress(progress)
        );

        const duration = Date.now() - startTime;

        return {
          success: true,
          data: results,
          processedAt: Date.now(),
          duration
        };
      } catch (error: any) {
        const duration = Date.now() - startTime;
        throw new Error(`Processing failed: ${error.message}`);
      }
    });

    // Report generation job
    this.queue.process('generate-report', 2, async (job) => {
      const { payload } = job.data;

      const report = await this.generateReport(
        payload.type,
        payload.filters,
        payload.format
      );

      return {
        success: true,
        data: {
          reportId: uuidv4(),
          url: report.url,
          size: report.size
        },
        processedAt: Date.now(),
        duration: 0
      };
    });

    // Email batch job
    this.queue.process('send-emails', 5, async (job) => {
      const { payload } = job.data;
      const { recipients, template, data } = payload;

      const results = await this.sendEmailBatch(
        recipients,
        template,
        data
      );

      return {
        success: true,
        data: {
          sent: results.successful,
          failed: results.failed
        },
        processedAt: Date.now(),
        duration: 0
      };
    });
  }

  private setupEvents(): void {
    this.queue.on('completed', (job, result) => {
      console.log(`Job ${job.id} completed:`, result);

      // Store result
      this.resultQueue.add({
        jobId: job.id,
        ...result
      });
    });

    this.queue.on('failed', (job, error) => {
      console.error(`Job ${job?.id} failed:`, error.message);

      // Store failure
      this.resultQueue.add({
        jobId: job?.id,
        success: false,
        error: error.message,
        processedAt: Date.now(),
        duration: 0
      });
    });

    this.queue.on('progress', (job, progress) => {
      console.log(`Job ${job.id} progress: ${progress}%`);
    });

    this.queue.on('stalled', (job) => {
      console.warn(`Job ${job.id} stalled`);
    });
  }

  async addJob(
    type: string,
    payload: any,
    options?: Queue.JobOptions
  ): Promise<Queue.Job<JobData>> {
    const jobData: JobData = {
      id: uuidv4(),
      type,
      payload,
      metadata: {
        createdAt: Date.now()
      }
    };

    return this.queue.add(type, jobData, options);
  }

  async addBulkJobs(
    jobs: Array<{ type: string; payload: any; options?: Queue.JobOptions }>
  ): Promise<Queue.Job<JobData>[]> {
    const bulkData = jobs.map(({ type, payload, options }) => ({
      name: type,
      data: {
        id: uuidv4(),
        type,
        payload,
        metadata: {
          createdAt: Date.now()
        }
      },
      opts: options || {}
    }));

    return this.queue.addBulk(bulkData);
  }

  async scheduleJob(
    type: string,
    payload: any,
    cronExpression: string
  ): Promise<Queue.Job<JobData>> {
    return this.addJob(type, payload, {
      repeat: {
        cron: cronExpression
      }
    });
  }

  private async processDataBatch(
    items: any[],
    onProgress: (progress: number) => Promise<void>
  ): Promise<any[]> {
    const results = [];
    const total = items.length;

    for (let i = 0; i < total; i++) {
      const result = await this.processItem(items[i]);
      results.push(result);

      // Update progress
      const progress = Math.round(((i + 1) / total) * 100);
      await onProgress(progress);
    }

    return results;
  }

  private async processItem(item: any): Promise<any> {
    // Simulate processing
    await new Promise(resolve => setTimeout(resolve, 100));
    return { ...item, processed: true };
  }

  private async generateReport(
    type: string,
    filters: any,
    format: string
  ): Promise<any> {
    // Simulate report generation
    return {
      url: `https://cdn.example.com/reports/${uuidv4()}.${format}`,
      size: 1024 * 1024
    };
  }

  private async sendEmailBatch(
    recipients: string[],
    template: string,
    data: any
  ): Promise<{ successful: number; failed: number }> {
    // Simulate email sending
    return {
      successful: recipients.length,
      failed: 0
    };
  }

  async getJobStatus(jobId: string): Promise<any> {
    const job = await this.queue.getJob(jobId);
    if (!job) return null;

    const state = await job.getState();
    const logs = await this.queue.getJobLogs(jobId);

    return {
      id: job.id,
      name: job.name,
      data: job.data,
      state,
      progress: job.progress(),
      attempts: job.attemptsMade,
      failedReason: job.failedReason,
      finishedOn: job.finishedOn,
      processedOn: job.processedOn,
      logs: logs.logs
    };
  }

  async getQueueStats(): Promise<any> {
    const [
      waiting,
      active,
      completed,
      failed,
      delayed,
      paused
    ] = await Promise.all([
      this.queue.getWaitingCount(),
      this.queue.getActiveCount(),
      this.queue.getCompletedCount(),
      this.queue.getFailedCount(),
      this.queue.getDelayedCount(),
      this.queue.getPausedCount()
    ]);

    return { waiting, active, completed, failed, delayed, paused };
  }

  async pause(): Promise<void> {
    await this.queue.pause();
  }

  async resume(): Promise<void> {
    await this.queue.resume();
  }

  async clean(grace: number = 0): Promise<void> {
    await this.queue.clean(grace, 'completed');
    await this.queue.clean(grace, 'failed');
  }

  async close(): Promise<void> {
    await this.queue.close();
    await this.resultQueue.close();
  }
}

// Usage
const processor = new BatchProcessor('redis://localhost:6379');

// Add single job
const job = await processor.addJob('process-data', {
  items: [{ id: 1 }, { id: 2 }, { id: 3 }]
});

// Add bulk jobs
await processor.addBulkJobs([
  { type: 'process-data', payload: { items: [/* ... */] } },
  { type: 'generate-report', payload: { type: 'sales', format: 'pdf' } }
]);

// Schedule recurring job
await processor.scheduleJob(
  'generate-report',
  { type: 'daily-summary' },
  '0 0 * * *' // Daily at midnight
);

// Check status
const status = await processor.getJobStatus(job.id!);
console.log('Job status:', status);

// Get queue stats
const stats = await processor.getQueueStats();
console.log('Queue stats:', stats);
```

### 2. **Celery-Style Worker (Python)**

```python
from celery import Celery, Task
from celery.schedules import crontab
from typing import List, Any, Dict
import time
import logging

# Initialize Celery
app = Celery(
    'batch_processor',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

# Configure Celery
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,  # 5 minutes
    task_soft_time_limit=270,  # 4.5 minutes
    worker_prefetch_multiplier=4,
    worker_max_tasks_per_child=1000,
)

# Periodic tasks
app.conf.beat_schedule = {
    'daily-report': {
        'task': 'tasks.generate_daily_report',
        'schedule': crontab(hour=0, minute=0),
    },
    'cleanup-old-data': {
        'task': 'tasks.cleanup_old_data',
        'schedule': crontab(hour=2, minute=0),
    },
}

logger = logging.getLogger(__name__)


class CallbackTask(Task):
    """Base task with callback support."""

    def on_success(self, retval, task_id, args, kwargs):
        logger.info(f"Task {task_id} succeeded: {retval}")

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error(f"Task {task_id} failed: {exc}")

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.warning(f"Task {task_id} retrying: {exc}")


@app.task(base=CallbackTask, bind=True, max_retries=3)
def process_batch_data(self, items: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Process batch of data items."""
    try:
        results = []
        total = len(items)

        for i, item in enumerate(items):
            # Process item
            result = process_single_item(item)
            results.append(result)

            # Update progress
            progress = int((i + 1) / total * 100)
            self.update_state(
                state='PROGRESS',
                meta={'current': i + 1, 'total': total, 'percent': progress}
            )

        return {
            'processed': len(results),
            'success': True,
            'results': results
        }
    except Exception as exc:
        logger.error(f"Batch processing failed: {exc}")
        raise self.retry(exc=exc, countdown=60)  # Retry after 1 minute


@app.task
def process_single_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """Process single item."""
    # Simulate processing
    time.sleep(0.1)
    return {
        'id': item.get('id'),
        'processed': True,
        'timestamp': time.time()
    }


@app.task(bind=True)
def generate_report(
    self,
    report_type: str,
    filters: Dict[str, Any],
    format: str = 'pdf'
) -> Dict[str, str]:
    """Generate report."""
    logger.info(f"Generating {report_type} report in {format} format")

    self.update_state(state='PROGRESS', meta={'step': 'gathering_data'})
    # Gather data
    time.sleep(2)

    self.update_state(state='PROGRESS', meta={'step': 'processing'})
    # Process data
    time.sleep(2)

    self.update_state(state='PROGRESS', meta={'step': 'generating'})
    # Generate report
    time.sleep(2)

    return {
        'report_id': f"report-{int(time.time())}",
        'url': f"https://cdn.example.com/reports/report.{format}",
        'format': format
    }


@app.task
def send_email_batch(
    recipients: List[str],
    template: str,
    context: Dict[str, Any]
) -> Dict[str, int]:
    """Send batch of emails."""
    successful = 0
    failed = 0

    for recipient in recipients:
        try:
            send_email(recipient, template, context)
            successful += 1
        except Exception as e:
            logger.error(f"Failed to send email to {recipient}: {e}")
            failed += 1

    return {
        'successful': successful,
        'failed': failed,
        'total': len(recipients)
    }


@app.task
def generate_daily_report():
    """Scheduled task: Generate daily report."""
    logger.info("Generating daily report")
    generate_report.delay('daily', {}, 'pdf')


@app.task
def cleanup_old_data():
    """Scheduled task: Clean up old data."""
    logger.info("Cleaning up old data")
    # Cleanup logic here


def send_email(recipient: str, template: str, context: Dict[str, Any]):
    """Send single email."""
    logger.info(f"Sending email to {recipient}")
    # Email sending logic


# Task chaining and grouping
from celery import chain, group, chord


def process_in_chunks(items: List[Any], chunk_size: int = 100):
    """Process items in parallel chunks."""
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

    # Process chunks in parallel
    job = group(process_batch_data.s(chunk) for chunk in chunks)
    result = job.apply_async()

    return result


def process_with_callback(items: List[Any]):
    """Process items and call callback when done."""
    callback = send_notification.s()
    header = group(process_batch_data.s(chunk) for chunk in [items])

    # Use chord to call callback after all tasks complete
    job = chord(header)(callback)
    return job


@app.task
def send_notification(results):
    """Callback task after batch processing."""
    logger.info(f"All tasks completed: {len(results)} results")


# Usage examples
if __name__ == '__main__':
    # Enqueue task
    result = process_batch_data.delay([
        {'id': 1, 'value': 'a'},
        {'id': 2, 'value': 'b'}
    ])

    # Check task status
    print(f"Task ID: {result.id}")
    print(f"Status: {result.status}")

    # Wait for result (blocking)
    final_result = result.get(timeout=10)
    print(f"Result: {final_result}")

    # Process in chunks
    items = [{'id': i} for i in range(1000)]
    chunk_result = process_in_chunks(items, chunk_size=100)

    # Check group result
    print(f"Chunks: {len(chunk_result)}")
```

### 3. **Cron Job Scheduler**

```typescript
import cron from 'node-cron';

interface ScheduledJob {
  name: string;
  schedule: string;
  handler: () => Promise<void>;
  enabled: boolean;
  lastRun?: Date;
  nextRun?: Date;
}

class JobScheduler {
  private jobs: Map<string, cron.ScheduledTask> = new Map();
  private jobConfigs: Map<string, ScheduledJob> = new Map();

  register(job: ScheduledJob): void {
    if (this.jobs.has(job.name)) {
      throw new Error(`Job ${job.name} already registered`);
    }

    // Validate cron expression
    if (!cron.validate(job.schedule)) {
      throw new Error(`Invalid cron expression: ${job.schedule}`);
    }

    const task = cron.schedule(job.schedule, async () => {
      if (!job.enabled) return;

      console.log(`Running job: ${job.name}`);
      const startTime = Date.now();

      try {
        await job.handler();
        const duration = Date.now() - startTime;
        console.log(`Job ${job.name} completed in ${duration}ms`);
        job.lastRun = new Date();
      } catch (error) {
        console.error(`Job ${job.name} failed:`, error);
      }
    });

    this.jobs.set(job.name, task);
    this.jobConfigs.set(job.name, job);

    if (job.enabled) {
      task.start();
    }
  }

  start(name: string): void {
    const task = this.jobs.get(name);
    if (!task) {
      throw new Error(`Job ${name} not found`);
    }
    task.start();
    const config = this.jobConfigs.get(name)!;
    config.enabled = true;
  }

  stop(name: string): void {
    const task = this.jobs.get(name);
    if (!task) {
      throw new Error(`Job ${name} not found`);
    }
    task.stop();
    const config = this.jobConfigs.get(name)!;
    config.enabled = false;
  }

  remove(name: string): void {
    const task = this.jobs.get(name);
    if (task) {
      task.destroy();
      this.jobs.delete(name);
      this.jobConfigs.delete(name);
    }
  }

  getJobs(): ScheduledJob[] {
    return Array.from(this.jobConfigs.values());
  }
}

// Usage
const scheduler = new JobScheduler();

// Register jobs
scheduler.register({
  name: 'daily-backup',
  schedule: '0 2 * * *', // 2 AM daily
  enabled: true,
  handler: async () => {
    console.log('Running daily backup...');
    // Backup logic
  }
});

scheduler.register({
  name: 'hourly-cleanup',
  schedule: '0 * * * *', // Every hour
  enabled: true,
  handler: async () => {
    console.log('Running cleanup...');
    // Cleanup logic
  }
});

scheduler.register({
  name: 'weekly-report',
  schedule: '0 9 * * 1', // Monday 9 AM
  enabled: true,
  handler: async () => {
    console.log('Generating weekly report...');
    // Report generation
  }
});
```

## Best Practices

### ✅ DO

- Implement idempotency for all jobs
- Use job queues for distributed processing
- Monitor job success/failure rates
- Implement retry logic with exponential backoff
- Set appropriate timeouts
- Log job execution details
- Use dead letter queues for failed jobs
- Implement job priority levels
- Batch similar operations together
- Use connection pooling
- Implement graceful shutdown
- Monitor queue depth and processing time

### ❌ DON'T

- Process jobs synchronously in request handlers
- Ignore failed jobs
- Set unlimited retries
- Skip monitoring and alerting
- Process jobs without timeouts
- Store large payloads in queue
- Forget to clean up completed jobs

## Resources

- [Bull Queue Documentation](https://github.com/OptimalBits/bull)
- [Celery Documentation](https://docs.celeryq.dev/)
- [Cron Expression Guide](https://crontab.guru/)