---
name: equipment-telematics
description: "Integrate and analyze telematics data from heavy construction equipment. Track location, utilization, fuel consumption, maintenance needs, and operator behavior."
---

# Equipment Telematics

## Overview

Integrate telematics data from heavy construction equipment (excavators, cranes, loaders, trucks) to monitor utilization, track location, analyze fuel efficiency, predict maintenance needs, and ensure safe operation.

## Telematics Data Flow

```
┌─────────────────────────────────────────────────────────────────┐
│                      EQUIPMENT TELEMATICS                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  EQUIPMENT               TELEMATICS              ANALYTICS      │
│  ─────────               ──────────              ─────────      │
│                                                                 │
│  🚜 Excavator ────┐      📍 Location             📊 Utilization │
│  🏗️ Crane     ────┼────→ 🔧 Engine Hours ──────→ ⛽ Fuel        │
│  🚛 Truck     ────┤      ⛽ Fuel Level           🔧 Maintenance │
│  🚧 Loader    ────┘      ⚡ Performance          👷 Operator    │
│                                                                 │
│  METRICS TRACKED:                                               │
│  • GPS location and geofencing                                  │
│  • Engine hours and idle time                                   │
│  • Fuel consumption rate                                        │
│  • Load cycles and productivity                                 │
│  • Fault codes and diagnostics                                  │
│  • Operator behavior and safety                                 │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```

## Technical Implementation

```python
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from datetime import datetime, timedelta
from enum import Enum
import statistics
import math


class EquipmentType(Enum):
    """Categories of heavy equipment that can be registered in a fleet."""

    EXCAVATOR = "excavator"
    CRANE = "crane"
    LOADER = "loader"
    BULLDOZER = "bulldozer"
    DUMP_TRUCK = "dump_truck"
    CONCRETE_MIXER = "concrete_mixer"
    FORKLIFT = "forklift"
    COMPACTOR = "compactor"
    GRADER = "grader"
    TELEHANDLER = "telehandler"


class OperatingStatus(Enum):
    """Operating state attached to a telematics reading.

    ``ingest_reading`` only ever derives OPERATING, IDLE or OFF; MAINTENANCE
    and FAULT are available for callers that build readings themselves.
    """

    OPERATING = "operating"
    IDLE = "idle"
    OFF = "off"
    MAINTENANCE = "maintenance"
    FAULT = "fault"


class FaultSeverity(Enum):
    """Severity levels for fault codes, from informational to shutdown."""

    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
    SHUTDOWN = "shutdown"


@dataclass
class GPSLocation:
    """A GPS fix reported by a piece of equipment."""

    latitude: float   # Decimal degrees (converted with math.radians for distance checks)
    longitude: float  # Decimal degrees
    altitude: float = 0.0
    speed: float = 0.0
    heading: float = 0.0
    # Defaults to the moment the object is created; ingest_reading uses this
    # value as the reading's timestamp.
    timestamp: datetime = field(default_factory=datetime.now)


@dataclass
class TelematicsReading:
    """One snapshot of sensor values for a single piece of equipment."""

    equipment_id: str
    timestamp: datetime
    location: GPSLocation
    engine_hours: float  # Cumulative engine-hour meter value
    fuel_level: float  # Percentage
    fuel_rate: float  # L/hr
    engine_rpm: int
    hydraulic_temp: float
    coolant_temp: float
    operating_status: OperatingStatus
    load_percentage: float = 0.0
    operator_id: str = ""


@dataclass
class FaultCode:
    """A diagnostic fault raised against a piece of equipment."""

    code: str
    description: str
    severity: FaultSeverity
    timestamp: datetime
    equipment_id: str
    resolved: bool = False  # Unresolved faults suppress duplicates of the same code


@dataclass
class Equipment:
    """Static details and running counters for one registered machine."""

    id: str
    name: str
    equipment_type: EquipmentType
    make: str
    model: str
    year: int
    serial_number: str
    hourly_rate: float = 0.0
    fuel_capacity: float = 0.0  # Liters
    current_hours: float = 0.0  # Engine hours from the most recently ingested reading
    # Engine-hour mark of the next service; 0 means "not scheduled yet".
    next_service_hours: float = 0.0
    assigned_site: str = ""
    assigned_operator: str = ""


@dataclass
class Geofence:
    """A circular boundary that equipment is expected to stay inside."""

    id: str
    name: str
    center_lat: float
    center_lon: float
    radius_meters: float
    # Equipment ids the fence applies to; an empty list applies it to all.
    allowed_equipment: List[str] = field(default_factory=list)


@dataclass
class UtilizationReport:
    """Utilization and fuel metrics for one machine over a reporting period."""

    equipment_id: str
    period_start: datetime
    period_end: datetime
    total_hours: float
    operating_hours: float
    idle_hours: float
    off_hours: float
    utilization_pct: float
    idle_pct: float
    fuel_consumed: float
    fuel_efficiency: float  # L/operating hour
    cycles: int


class EquipmentTelematics:
    """Integrate and analyze equipment telematics data.

    Holds an in-memory registry of equipment, the readings ingested for it,
    the faults raised by diagnostic checks, and the geofences readings are
    checked against.
    """

    # Maintenance intervals by type (hours)
    SERVICE_INTERVALS = {
        EquipmentType.EXCAVATOR: 250,
        EquipmentType.CRANE: 200,
        EquipmentType.LOADER: 250,
        EquipmentType.BULLDOZER: 250,
        EquipmentType.DUMP_TRUCK: 300,
    }
    # Interval used for equipment types missing from SERVICE_INTERVALS (hours)
    DEFAULT_SERVICE_INTERVAL = 250
    # A SERVICE_DUE notice is raised when fewer hours than this remain
    SERVICE_DUE_THRESHOLD_HOURS = 50

    # Typical fuel rates (L/hr)
    TYPICAL_FUEL_RATES = {
        EquipmentType.EXCAVATOR: 15,
        EquipmentType.CRANE: 12,
        EquipmentType.LOADER: 18,
        EquipmentType.BULLDOZER: 25,
        EquipmentType.DUMP_TRUCK: 20,
    }

    # Diagnostic thresholds (temperatures presumably in °C — confirm against
    # the telematics feed).
    HYDRAULIC_TEMP_WARNING = 90
    COOLANT_TEMP_CRITICAL = 100
    LOW_FUEL_PCT = 15

    # A running engine below either threshold is classified as idle.
    IDLE_RPM_THRESHOLD = 800
    IDLE_LOAD_PCT_THRESHOLD = 10
    # Share of the reported fuel rate attributed to idle intervals.
    IDLE_FUEL_FACTOR = 0.3

    def __init__(self, fleet_name: str):
        """Create an empty telematics store for the fleet named *fleet_name*."""
        self.fleet_name = fleet_name
        self.equipment: Dict[str, Equipment] = {}
        self.readings: List[TelematicsReading] = []
        self.faults: List[FaultCode] = []
        self.geofences: Dict[str, Geofence] = {}

    def register_equipment(self, id: str, name: str, equipment_type: EquipmentType,
                           make: str, model: str, year: int, serial_number: str,
                           hourly_rate: float = 0, fuel_capacity: float = 0,
                           next_service_hours: float = 0) -> Equipment:
        """Register equipment in fleet.

        Registering an id that already exists replaces the earlier entry.

        Args:
            next_service_hours: Engine-hour mark of the next service. Leave at
                0 to have it scheduled automatically from SERVICE_INTERVALS
                when the first reading is ingested.

        Returns:
            The stored ``Equipment`` record.
        """
        equipment = Equipment(
            id=id,
            name=name,
            equipment_type=equipment_type,
            make=make,
            model=model,
            year=year,
            serial_number=serial_number,
            hourly_rate=hourly_rate,
            fuel_capacity=fuel_capacity,
            next_service_hours=next_service_hours,
        )
        self.equipment[id] = equipment
        return equipment

    def add_geofence(self, id: str, name: str, center_lat: float, center_lon: float,
                     radius_meters: float,
                     allowed_equipment: Optional[List[str]] = None) -> Geofence:
        """Add geofence boundary.

        Args:
            allowed_equipment: Equipment ids the fence applies to. ``None`` or
                an empty list applies the fence to every machine.

        Returns:
            The stored ``Geofence``.
        """
        geofence = Geofence(
            id=id,
            name=name,
            center_lat=center_lat,
            center_lon=center_lon,
            radius_meters=radius_meters,
            allowed_equipment=allowed_equipment or [],
        )
        self.geofences[id] = geofence
        return geofence

    def ingest_reading(self, equipment_id: str, location: GPSLocation,
                       engine_hours: float, fuel_level: float, fuel_rate: float,
                       engine_rpm: int, hydraulic_temp: float, coolant_temp: float,
                       load_percentage: float = 0,
                       operator_id: str = "") -> TelematicsReading:
        """Ingest telematics reading from equipment.

        The reading is timestamped with ``location.timestamp``, stored, and
        then run through the diagnostic and geofence checks, which may append
        faults to ``self.faults``.

        Raises:
            ValueError: If *equipment_id* has not been registered.
        """
        equip = self.equipment.get(equipment_id)
        if equip is None:
            raise ValueError(f"Unknown equipment: {equipment_id}")

        reading = TelematicsReading(
            equipment_id=equipment_id,
            timestamp=location.timestamp,
            location=location,
            engine_hours=engine_hours,
            fuel_level=fuel_level,
            fuel_rate=fuel_rate,
            engine_rpm=engine_rpm,
            hydraulic_temp=hydraulic_temp,
            coolant_temp=coolant_temp,
            operating_status=self._classify_status(engine_rpm, load_percentage),
            load_percentage=load_percentage,
            operator_id=operator_id,
        )
        self.readings.append(reading)

        # Update equipment status
        equip.current_hours = engine_hours
        # Without a schedule every reading would look overdue (0 - hours < 0).
        self._schedule_first_service(equip, engine_hours)

        # Check for issues
        self._check_diagnostics(equipment_id, reading)
        self._check_geofence(equipment_id, location)

        return reading

    def _classify_status(self, engine_rpm: int, load_percentage: float) -> OperatingStatus:
        """Derive OFF / IDLE / OPERATING from engine RPM and load."""
        if engine_rpm == 0:
            return OperatingStatus.OFF
        if (engine_rpm < self.IDLE_RPM_THRESHOLD
                or load_percentage < self.IDLE_LOAD_PCT_THRESHOLD):
            return OperatingStatus.IDLE
        return OperatingStatus.OPERATING

    def _service_interval(self, equip: Equipment) -> float:
        """Return the service interval (hours) for the equipment's type."""
        return self.SERVICE_INTERVALS.get(equip.equipment_type,
                                          self.DEFAULT_SERVICE_INTERVAL)

    def _schedule_first_service(self, equip: Equipment, engine_hours: float) -> None:
        """Set ``next_service_hours`` if the equipment has no schedule yet.

        The service is placed at the next multiple of the type's service
        interval strictly above the current engine hours.
        """
        if equip.next_service_hours > 0:
            return
        interval = self._service_interval(equip)
        completed_intervals = math.floor(max(engine_hours, 0) / interval)
        equip.next_service_hours = float((completed_intervals + 1) * interval)

    def _hours_to_service(self, equip: Equipment,
                          engine_hours: Optional[float] = None) -> Optional[float]:
        """Return engine hours left until the next service.

        Uses *engine_hours* when given, otherwise ``equip.current_hours``.
        Returns ``None`` when no service has been scheduled, so callers do
        not report unscheduled equipment as due or overdue.
        """
        if equip.next_service_hours <= 0:
            return None
        hours = equip.current_hours if engine_hours is None else engine_hours
        return equip.next_service_hours - hours

    def _check_diagnostics(self, equipment_id: str, reading: TelematicsReading):
        """Check for diagnostic issues and raise the matching faults.

        Faults are stamped with the reading's timestamp.
        """
        equip = self.equipment[equipment_id]
        when = reading.timestamp

        # High temperature warning
        if reading.hydraulic_temp > self.HYDRAULIC_TEMP_WARNING:
            self._add_fault(equipment_id, "HYD_TEMP_HIGH",
                            "Hydraulic temperature high",
                            FaultSeverity.WARNING, when)

        if reading.coolant_temp > self.COOLANT_TEMP_CRITICAL:
            self._add_fault(equipment_id, "COOLANT_TEMP_HIGH",
                            "Coolant temperature critical",
                            FaultSeverity.CRITICAL, when)

        # Low fuel warning
        if reading.fuel_level < self.LOW_FUEL_PCT:
            self._add_fault(equipment_id, "FUEL_LOW",
                            "Fuel level below 15%",
                            FaultSeverity.WARNING, when)

        # Service due
        hours_to_service = self._hours_to_service(equip, reading.engine_hours)
        if hours_to_service is None:
            return
        if hours_to_service < 0:
            self._add_fault(equipment_id, "SERVICE_OVERDUE",
                            "Maintenance service overdue",
                            FaultSeverity.WARNING, when)
        elif hours_to_service < self.SERVICE_DUE_THRESHOLD_HOURS:
            self._add_fault(equipment_id, "SERVICE_DUE",
                            f"Service due in {hours_to_service:.0f} hours",
                            FaultSeverity.INFO, when)

    def _check_geofence(self, equipment_id: str, location: GPSLocation):
        """Check geofence violations.

        A fence applies to a machine when its ``allowed_equipment`` list is
        empty or contains the machine's id. Because active faults are
        de-duplicated by code, leaving several fences at once records a
        single GEOFENCE_EXIT fault (for the first fence checked).
        """
        for geofence in self.geofences.values():
            applies = (not geofence.allowed_equipment
                       or equipment_id in geofence.allowed_equipment)
            if not applies:
                continue

            # Calculate distance from center
            distance = self._haversine_distance(
                location.latitude, location.longitude,
                geofence.center_lat, geofence.center_lon,
            )
            if distance > geofence.radius_meters:
                self._add_fault(equipment_id, "GEOFENCE_EXIT",
                                f"Equipment left {geofence.name} boundary",
                                FaultSeverity.WARNING, location.timestamp)

    def _haversine_distance(self, lat1: float, lon1: float,
                            lat2: float, lon2: float) -> float:
        """Calculate distance between two coordinates in meters."""
        R = 6371000  # Earth radius in meters

        phi1 = math.radians(lat1)
        phi2 = math.radians(lat2)
        delta_phi = math.radians(lat2 - lat1)
        delta_lambda = math.radians(lon2 - lon1)

        a = (math.sin(delta_phi / 2) ** 2
             + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2) ** 2)
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

        return R * c

    def _add_fault(self, equipment_id: str, code: str, description: str,
                   severity: FaultSeverity,
                   timestamp: Optional[datetime] = None):
        """Add fault code unless the same fault is already active.

        Args:
            timestamp: When the fault occurred; defaults to the current time.
        """
        # Check if same fault already active
        already_active = any(
            f.equipment_id == equipment_id and f.code == code and not f.resolved
            for f in self.faults
        )
        if already_active:
            return

        self.faults.append(FaultCode(
            code=code,
            description=description,
            severity=severity,
            timestamp=timestamp if timestamp is not None else datetime.now(),
            equipment_id=equipment_id,
        ))

    def resolve_fault(self, equipment_id: str, code: str) -> int:
        """Mark active faults with *code* on *equipment_id* as resolved.

        Once resolved, the same code can be raised again by a later reading.

        Returns:
            The number of faults that were marked resolved.
        """
        resolved_count = 0
        for fault in self.faults:
            if (fault.equipment_id == equipment_id and fault.code == code
                    and not fault.resolved):
                fault.resolved = True
                resolved_count += 1
        return resolved_count

    def get_current_status(self, equipment_id: str) -> Dict:
        """Get current status of equipment.

        Returns:
            A status dict built from the reading with the latest timestamp,
            or ``{"equipment": <Equipment>, "status": "no_data"}`` when no
            reading has been ingested for the equipment.

        Raises:
            ValueError: If *equipment_id* has not been registered.
        """
        equip = self.equipment.get(equipment_id)
        if equip is None:
            raise ValueError(f"Unknown equipment: {equipment_id}")

        # Get latest reading
        latest = max(
            (r for r in self.readings if r.equipment_id == equipment_id),
            key=lambda r: r.timestamp,
            default=None,
        )
        if latest is None:
            return {"equipment": equip, "status": "no_data"}

        # Active faults
        active_faults = sum(
            1 for f in self.faults
            if f.equipment_id == equipment_id and not f.resolved
        )

        return {
            "equipment_id": equip.id,
            "name": equip.name,
            "type": equip.equipment_type.value,
            "status": latest.operating_status.value,
            "location": {
                "lat": latest.location.latitude,
                "lon": latest.location.longitude,
                "speed": latest.location.speed,
            },
            "engine_hours": latest.engine_hours,
            "fuel_level": latest.fuel_level,
            "fuel_rate": latest.fuel_rate,
            "temps": {
                "hydraulic": latest.hydraulic_temp,
                "coolant": latest.coolant_temp,
            },
            "operator": latest.operator_id,
            "active_faults": active_faults,
            "last_update": latest.timestamp,
        }

    def calculate_utilization(self, equipment_id: str,
                              start_date: datetime,
                              end_date: datetime) -> Optional[UtilizationReport]:
        """Calculate utilization metrics for equipment.

        Each interval between two consecutive readings is attributed to the
        status of the earlier reading. Time before the first reading and
        after the last one in the window is counted as off.

        Returns:
            A ``UtilizationReport``, or ``None`` when the equipment has no
            readings between *start_date* and *end_date* (inclusive).
        """
        readings = sorted(
            (r for r in self.readings
             if r.equipment_id == equipment_id
             and start_date <= r.timestamp <= end_date),
            key=lambda r: r.timestamp,
        )
        if not readings:
            return None

        total_hours = (end_date - start_date).total_seconds() / 3600
        operating_hours = 0
        idle_hours = 0
        fuel_consumed = 0

        # Calculate from readings
        for prev, curr in zip(readings, readings[1:]):
            interval_hours = (curr.timestamp - prev.timestamp).total_seconds() / 3600

            if prev.operating_status == OperatingStatus.OPERATING:
                operating_hours += interval_hours
                fuel_consumed += prev.fuel_rate * interval_hours
            elif prev.operating_status == OperatingStatus.IDLE:
                idle_hours += interval_hours
                # Idle uses ~30% fuel
                fuel_consumed += prev.fuel_rate * interval_hours * self.IDLE_FUEL_FACTOR

        off_hours = total_hours - operating_hours - idle_hours
        engine_on_hours = operating_hours + idle_hours

        utilization_pct = (operating_hours / total_hours * 100) if total_hours > 0 else 0
        idle_pct = (idle_hours / engine_on_hours * 100) if engine_on_hours > 0 else 0
        fuel_efficiency = (fuel_consumed / operating_hours) if operating_hours > 0 else 0

        return UtilizationReport(
            equipment_id=equipment_id,
            period_start=start_date,
            period_end=end_date,
            total_hours=total_hours,
            operating_hours=operating_hours,
            idle_hours=idle_hours,
            off_hours=off_hours,
            utilization_pct=utilization_pct,
            idle_pct=idle_pct,
            fuel_consumed=fuel_consumed,
            fuel_efficiency=fuel_efficiency,
            cycles=0,  # Would need load cycle detection
        )

    def get_fleet_summary(self) -> Dict:
        """Get summary of entire fleet.

        Returns:
            A dict with ``total_equipment``, counts ``by_status`` and
            ``by_type``, the number of ``active_faults``, and a
            ``service_due`` list of machines with a scheduled service fewer
            than SERVICE_DUE_THRESHOLD_HOURS away (or overdue).
        """
        summary = {
            "total_equipment": len(self.equipment),
            "by_status": {},
            "by_type": {},
            "active_faults": 0,
            "service_due": [],
        }
        by_status = summary["by_status"]
        by_type = summary["by_type"]

        for equip in self.equipment.values():
            # Count by type
            eq_type = equip.equipment_type.value
            by_type[eq_type] = by_type.get(eq_type, 0) + 1

            # Get current status ("no_data" when nothing has been ingested)
            op_status = self.get_current_status(equip.id).get("status", "unknown")
            by_status[op_status] = by_status.get(op_status, 0) + 1

            # Check service due (machines without a schedule are skipped)
            hours_to_service = self._hours_to_service(equip)
            if (hours_to_service is not None
                    and hours_to_service < self.SERVICE_DUE_THRESHOLD_HOURS):
                summary["service_due"].append({
                    "equipment": equip.name,
                    "hours_remaining": hours_to_service,
                })

        # Count active faults
        summary["active_faults"] = sum(1 for f in self.faults if not f.resolved)

        return summary

    def predict_maintenance(self, equipment_id: str) -> Dict:
        """Predict maintenance needs based on usage patterns.

        Average daily engine hours are estimated from readings of the last
        seven days and projected onto the hours left until the next service.

        Returns:
            ``{"prediction": "insufficient_data"}`` when fewer than two
            readings exist in the last seven days; otherwise a dict whose
            ``predicted_service_date`` is ``None`` if no engine hours
            accumulated, and the current time if service is already overdue.

        Raises:
            ValueError: If *equipment_id* has not been registered.
        """
        equip = self.equipment.get(equipment_id)
        if equip is None:
            raise ValueError(f"Unknown equipment: {equipment_id}")

        # Calculate average daily hours
        now = datetime.now()
        week_ago = now - timedelta(days=7)
        recent_readings = [r for r in self.readings
                           if r.equipment_id == equipment_id
                           and r.timestamp > week_ago]

        if len(recent_readings) < 2:
            return {"prediction": "insufficient_data"}

        engine_hours = [r.engine_hours for r in recent_readings]
        timestamps = [r.timestamp for r in recent_readings]

        # Fractional days: timedelta.days truncates, which inflates the rate.
        # Spans shorter than a day are treated as one full day.
        span_days = (max(timestamps) - min(timestamps)).total_seconds() / 86400
        days = max(span_days, 1.0)
        daily_hours = (max(engine_hours) - min(engine_hours)) / days

        # Predict service date
        hours_to_service = equip.next_service_hours - equip.current_hours

        if daily_hours > 0:
            # Overdue service is due now rather than on a date in the past.
            days_to_service = max(hours_to_service, 0) / daily_hours
            service_date = now + timedelta(days=days_to_service)
        else:
            service_date = None

        return {
            "equipment_id": equipment_id,
            "current_hours": equip.current_hours,
            "next_service_hours": equip.next_service_hours,
            "hours_to_service": hours_to_service,
            "avg_daily_hours": daily_hours,
            "predicted_service_date": service_date,
            "service_type": "Routine maintenance",
            "estimated_downtime_hours": 8,
        }

    def generate_report(self) -> str:
        """Generate fleet telematics report.

        Returns:
            A Markdown document with the fleet summary, per-machine status,
            upcoming services, and up to ten active faults.
        """
        summary = self.get_fleet_summary()

        lines = [
            "# Equipment Telematics Report",
            "",
            f"**Fleet:** {self.fleet_name}",
            f"**Report Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}",
            "",
            "## Fleet Summary",
            "",
            "| Metric | Value |",
            "|--------|-------|",
            f"| Total Equipment | {summary['total_equipment']} |",
            f"| Active Faults | {summary['active_faults']} |",
            f"| Service Due | {len(summary['service_due'])} |",
            "",
            "## Status Distribution",
            "",
        ]

        for status, count in summary["by_status"].items():
            lines.append(f"- {status}: {count}")

        # Equipment details
        lines.extend([
            "",
            "## Equipment Status",
            "",
            "| Equipment | Type | Status | Hours | Fuel | Faults |",
            "|-----------|------|--------|-------|------|--------|",
        ])

        status_icons = {"operating": "✅", "idle": "⏸️"}
        for equip in self.equipment.values():
            status = self.get_current_status(equip.id)
            if status["status"] == "no_data":
                lines.append(
                    f"| {equip.name} | {equip.equipment_type.value} | ⚠️ No data | - | - | - |"
                )
                continue

            status_icon = status_icons.get(status["status"], "⏹️")
            lines.append(
                f"| {equip.name} | {equip.equipment_type.value} | "
                f"{status_icon} {status['status']} | {status['engine_hours']:.0f} | "
                f"{status['fuel_level']:.0f}% | {status['active_faults']} |"
            )

        # Service due
        if summary["service_due"]:
            lines.extend([
                "",
                "## Service Due Soon",
                "",
                "| Equipment | Hours Remaining |",
                "|-----------|-----------------|",
            ])
            for svc in summary["service_due"]:
                lines.append(f"| {svc['equipment']} | {svc['hours_remaining']:.0f} |")

        # Active faults
        active_faults = [f for f in self.faults if not f.resolved]
        if active_faults:
            lines.extend([
                "",
                "## Active Faults",
                "",
                "| Equipment | Code | Description | Severity |",
                "|-----------|------|-------------|----------|",
            ])
            severe = (FaultSeverity.CRITICAL, FaultSeverity.SHUTDOWN)
            # Only the first ten active faults are listed.
            for fault in active_faults[:10]:
                sev_icon = "🔴" if fault.severity in severe else "🟡"
                equip = self.equipment.get(fault.equipment_id)
                lines.append(
                    f"| {equip.name if equip else fault.equipment_id} | "
                    f"{fault.code} | {fault.description} | {sev_icon} {fault.severity.value} |"
                )

        return "\n".join(lines)
```

## Quick Start

```python
from datetime import datetime, timedelta

# Initialize telematics system
telematics = EquipmentTelematics("Site A Fleet")

# Register equipment
telematics.register_equipment(
    "EX-001", "Excavator #1",
    EquipmentType.EXCAVATOR,
    make="Caterpillar",
    model="320",
    year=2022,
    serial_number="CAT320X12345",
    hourly_rate=150,
    fuel_capacity=400
)

telematics.register_equipment(
    "CR-001", "Tower Crane #1",
    EquipmentType.CRANE,
    make="Liebherr",
    model="200EC-H",
    year=2021,
    serial_number="LH200EC54321",
    hourly_rate=200,
    fuel_capacity=300
)

# Add geofence for site boundary
telematics.add_geofence(
    "SITE-A", "Site A Boundary",
    center_lat=40.7128,
    center_lon=-74.0060,
    radius_meters=500
)

# Ingest telematics reading
location = GPSLocation(
    latitude=40.7128,
    longitude=-74.0059,
    speed=5.0,
    timestamp=datetime.now()
)

telematics.ingest_reading(
    "EX-001",
    location,
    engine_hours=1250.5,
    fuel_level=65.0,
    fuel_rate=18.5,
    engine_rpm=1800,
    hydraulic_temp=75.0,
    coolant_temp=85.0,
    load_percentage=75,
    operator_id="OP-101"
)

# Get current status
status = telematics.get_current_status("EX-001")
print(f"Excavator status: {status['status']}")
print(f"Location: {status['location']}")
print(f"Fuel: {status['fuel_level']}%")

# Calculate utilization
util = telematics.calculate_utilization(
    "EX-001",
    datetime.now() - timedelta(days=7),
    datetime.now()
)
if util:
    print(f"Utilization: {util.utilization_pct:.1f}%")
    print(f"Fuel efficiency: {util.fuel_efficiency:.1f} L/hr")

# Predict maintenance (needs at least two readings from the last 7 days;
# otherwise the result is {"prediction": "insufficient_data"})
maintenance = telematics.predict_maintenance("EX-001")
print(f"Predicted service date: {maintenance.get('predicted_service_date')}")

# Fleet summary
summary = telematics.get_fleet_summary()
print(f"Fleet: {summary['total_equipment']} units")

# Generate report
print(telematics.generate_report())
```

## Requirements

```bash
# No packages to install: the module uses only the Python standard library
# (Python 3.7+ for dataclasses).
```