--- name: environmental-monitoring description: "Monitor environmental conditions on construction sites. Track air quality, noise levels, vibration, dust, and weather to ensure compliance and worker safety." --- # Environmental Monitoring ## Overview Monitor and analyze environmental conditions on construction sites including air quality, noise, vibration, dust, and weather. Support regulatory compliance, worker safety, and community relations through real-time environmental tracking. ## Environmental Monitoring System ``` ┌─────────────────────────────────────────────────────────────────┐ │ ENVIRONMENTAL MONITORING │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ SENSORS MONITORING COMPLIANCE │ │ ─────── ────────── ────────── │ │ │ │ 💨 Air Quality ───┐ ✅ OSHA limits │ │ 🔊 Noise Level ───┼─────→ Real-time ────────→ ✅ EPA limits │ │ 📊 Vibration ───┤ Dashboard ✅ Local codes │ │ 🌫️ Dust/PM ───┤ Alerts ✅ Permits │ │ 🌡️ Weather ───┘ Reports ✅ Neighbors │ │ │ │ THRESHOLDS: │ │ • Noise: 85 dB (OSHA 8hr TWA) │ │ • PM2.5: 35 µg/m³ (EPA 24hr) │ │ • Vibration: 25 mm/s (structural) │ │ • CO: 50 ppm (OSHA ceiling) │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` ## Technical Implementation ```python from dataclasses import dataclass, field from typing import List, Dict, Optional, Tuple from datetime import datetime, timedelta from enum import Enum import statistics import math class ParameterType(Enum): NOISE = "noise" PM25 = "pm25" PM10 = "pm10" CO = "co" CO2 = "co2" VOC = "voc" VIBRATION = "vibration" TEMPERATURE = "temperature" HUMIDITY = "humidity" WIND_SPEED = "wind_speed" WIND_DIRECTION = "wind_direction" RAINFALL = "rainfall" class ComplianceStatus(Enum): COMPLIANT = "compliant" WARNING = "warning" EXCEEDANCE = "exceedance" CRITICAL = "critical" class AlertType(Enum): THRESHOLD_WARNING = "threshold_warning" THRESHOLD_EXCEEDANCE = "threshold_exceedance" EQUIPMENT_MALFUNCTION = "equipment_malfunction" WEATHER_ALERT = "weather_alert" COMMUNITY_COMPLAINT = "community_complaint" @dataclass class RegulatoryLimit: parameter: ParameterType limit_value: float unit: str averaging_period_hours: float # e.g., 8 for 8-hour TWA regulation: str # e.g., "OSHA", "EPA" description: str @dataclass class EnvironmentalReading: station_id: str parameter: ParameterType timestamp: datetime value: float unit: str quality_flag: str = "valid" @dataclass class MonitoringStation: id: str name: str location: Dict # {lat, lon, description} parameters: List[ParameterType] installation_date: datetime last_calibration: datetime status: str = "active" @dataclass class ComplianceRecord: parameter: ParameterType regulation: str limit_value: float measured_value: float averaging_period: str status: ComplianceStatus timestamp: datetime location: str @dataclass class EnvironmentalAlert: id: str alert_type: AlertType parameter: ParameterType station_id: str timestamp: datetime value: float threshold: float message: str acknowledged: bool = False resolved: bool = False resolution_notes: str = "" @dataclass class DailyReport: date: datetime site_name: str parameters_monitored: int readings_collected: int exceedances: int alerts_triggered: int compliance_status: ComplianceStatus summary: Dict[str, Dict] class EnvironmentalMonitor: """Monitor environmental conditions on construction sites.""" # Default regulatory limits REGULATORY_LIMITS = { ParameterType.NOISE: [ RegulatoryLimit(ParameterType.NOISE, 85, "dBA", 8.0, "OSHA", "8-hour TWA"), RegulatoryLimit(ParameterType.NOISE, 90, "dBA", 8.0, "OSHA", "Action level"), RegulatoryLimit(ParameterType.NOISE, 115, "dBA", 0.25, "OSHA", "15-min max"), ], ParameterType.PM25: [ RegulatoryLimit(ParameterType.PM25, 35, "µg/m³", 24.0, "EPA", "24-hour standard"), RegulatoryLimit(ParameterType.PM25, 12, "µg/m³", 8760.0, "EPA", "Annual standard"), ], ParameterType.PM10: [ RegulatoryLimit(ParameterType.PM10, 150, "µg/m³", 24.0, "EPA", "24-hour standard"), ], ParameterType.CO: [ RegulatoryLimit(ParameterType.CO, 50, "ppm", 0.0, "OSHA", "Ceiling limit"), RegulatoryLimit(ParameterType.CO, 35, "ppm", 8.0, "OSHA", "8-hour TWA"), ], ParameterType.VIBRATION: [ RegulatoryLimit(ParameterType.VIBRATION, 25, "mm/s", 0.0, "ISO 4866", "Structural damage threshold"), RegulatoryLimit(ParameterType.VIBRATION, 5, "mm/s", 0.0, "DIN 4150", "Sensitive structures"), ], } def __init__(self, site_name: str): self.site_name = site_name self.stations: Dict[str, MonitoringStation] = {} self.readings: List[EnvironmentalReading] = [] self.alerts: List[EnvironmentalAlert] = [] self.custom_limits: Dict[ParameterType, List[RegulatoryLimit]] = {} def add_station(self, id: str, name: str, location: Dict, parameters: List[ParameterType]) -> MonitoringStation: """Add monitoring station.""" station = MonitoringStation( id=id, name=name, location=location, parameters=parameters, installation_date=datetime.now(), last_calibration=datetime.now() ) self.stations[id] = station return station def add_custom_limit(self, parameter: ParameterType, limit_value: float, unit: str, averaging_hours: float, regulation: str, description: str): """Add custom regulatory limit.""" limit = RegulatoryLimit( parameter=parameter, limit_value=limit_value, unit=unit, averaging_period_hours=averaging_hours, regulation=regulation, description=description ) if parameter not in self.custom_limits: self.custom_limits[parameter] = [] self.custom_limits[parameter].append(limit) def record_reading(self, station_id: str, parameter: ParameterType, value: float, unit: str, timestamp: datetime = None) -> EnvironmentalReading: """Record environmental reading.""" if station_id not in self.stations: raise ValueError(f"Unknown station: {station_id}") reading = EnvironmentalReading( station_id=station_id, parameter=parameter, timestamp=timestamp or datetime.now(), value=value, unit=unit ) self.readings.append(reading) # Check against limits self._check_limits(station_id, parameter, value) return reading def record_batch(self, readings: List[Dict]) -> int: """Record multiple readings.""" count = 0 for r in readings: try: self.record_reading( station_id=r['station_id'], parameter=ParameterType(r['parameter']), value=r['value'], unit=r['unit'], timestamp=r.get('timestamp') ) count += 1 except Exception: pass return count def _check_limits(self, station_id: str, parameter: ParameterType, value: float): """Check value against regulatory limits.""" # Get applicable limits limits = self.REGULATORY_LIMITS.get(parameter, []) limits.extend(self.custom_limits.get(parameter, [])) for limit in limits: if limit.averaging_period_hours == 0: # Instantaneous limit check_value = value else: # Time-weighted average check_value = self._calculate_twa( station_id, parameter, limit.averaging_period_hours ) if check_value is None: continue # Check against limit if check_value >= limit.limit_value: self._create_alert( station_id, parameter, check_value, limit ) elif check_value >= limit.limit_value * 0.8: # Warning at 80% of limit self._create_alert( station_id, parameter, check_value, limit, is_warning=True ) def _calculate_twa(self, station_id: str, parameter: ParameterType, hours: float) -> Optional[float]: """Calculate time-weighted average.""" cutoff = datetime.now() - timedelta(hours=hours) readings = [r for r in self.readings if r.station_id == station_id and r.parameter == parameter and r.timestamp > cutoff] if not readings: return None return statistics.mean([r.value for r in readings]) def _create_alert(self, station_id: str, parameter: ParameterType, value: float, limit: RegulatoryLimit, is_warning: bool = False): """Create environmental alert.""" # Avoid duplicate alerts recent_alerts = [a for a in self.alerts if a.station_id == station_id and a.parameter == parameter and not a.resolved and (datetime.now() - a.timestamp).total_seconds() < 3600] if recent_alerts: return alert_type = (AlertType.THRESHOLD_WARNING if is_warning else AlertType.THRESHOLD_EXCEEDANCE) station = self.stations.get(station_id) alert = EnvironmentalAlert( id=f"ENV-{len(self.alerts)+1:05d}", alert_type=alert_type, parameter=parameter, station_id=station_id, timestamp=datetime.now(), value=value, threshold=limit.limit_value, message=f"{parameter.value} {'approaching' if is_warning else 'exceeds'} " f"{limit.regulation} limit ({limit.limit_value} {limit.unit}) " f"at {station.name if station else station_id}" ) self.alerts.append(alert) def get_current_conditions(self, station_id: str = None) -> Dict: """Get current environmental conditions.""" conditions = {} stations = ([self.stations[station_id]] if station_id else self.stations.values()) for station in stations: station_conditions = {} for param in station.parameters: # Get latest reading readings = [r for r in self.readings if r.station_id == station.id and r.parameter == param] if readings: latest = max(readings, key=lambda r: r.timestamp) station_conditions[param.value] = { "value": latest.value, "unit": latest.unit, "timestamp": latest.timestamp, "status": self._get_compliance_status(param, latest.value) } conditions[station.id] = { "name": station.name, "location": station.location, "parameters": station_conditions } return conditions def _get_compliance_status(self, parameter: ParameterType, value: float) -> ComplianceStatus: """Determine compliance status for value.""" limits = self.REGULATORY_LIMITS.get(parameter, []) limits.extend(self.custom_limits.get(parameter, [])) # Check instantaneous limits instant_limits = [l for l in limits if l.averaging_period_hours == 0] for limit in instant_limits: if value >= limit.limit_value: return ComplianceStatus.EXCEEDANCE elif value >= limit.limit_value * 0.9: return ComplianceStatus.WARNING return ComplianceStatus.COMPLIANT def check_compliance(self, start_date: datetime, end_date: datetime) -> List[ComplianceRecord]: """Check compliance for period.""" records = [] for station in self.stations.values(): for param in station.parameters: limits = self.REGULATORY_LIMITS.get(param, []) limits.extend(self.custom_limits.get(param, [])) for limit in limits: # Calculate average for period readings = [r for r in self.readings if r.station_id == station.id and r.parameter == param and start_date <= r.timestamp <= end_date] if not readings: continue avg_value = statistics.mean([r.value for r in readings]) max_value = max(r.value for r in readings) # Check appropriate value if limit.averaging_period_hours == 0: check_value = max_value period_str = "Instantaneous" else: check_value = avg_value period_str = f"{limit.averaging_period_hours:.0f}-hour avg" # Determine status if check_value >= limit.limit_value: status = ComplianceStatus.EXCEEDANCE elif check_value >= limit.limit_value * 0.9: status = ComplianceStatus.WARNING else: status = ComplianceStatus.COMPLIANT records.append(ComplianceRecord( parameter=param, regulation=limit.regulation, limit_value=limit.limit_value, measured_value=check_value, averaging_period=period_str, status=status, timestamp=end_date, location=station.name )) return records def get_exceedance_summary(self, days: int = 30) -> Dict: """Get summary of exceedances.""" cutoff = datetime.now() - timedelta(days=days) recent_alerts = [a for a in self.alerts if a.timestamp > cutoff and a.alert_type == AlertType.THRESHOLD_EXCEEDANCE] summary = { "period_days": days, "total_exceedances": len(recent_alerts), "by_parameter": {}, "by_station": {}, "recent_events": [] } for alert in recent_alerts: # By parameter param = alert.parameter.value summary["by_parameter"][param] = summary["by_parameter"].get(param, 0) + 1 # By station station = alert.station_id summary["by_station"][station] = summary["by_station"].get(station, 0) + 1 # Recent events summary["recent_events"] = sorted( recent_alerts, key=lambda a: a.timestamp, reverse=True )[:10] return summary def generate_daily_report(self, date: datetime = None) -> DailyReport: """Generate daily environmental report.""" if date is None: date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) next_day = date + timedelta(days=1) # Filter readings day_readings = [r for r in self.readings if date <= r.timestamp < next_day] # Filter alerts day_alerts = [a for a in self.alerts if date <= a.timestamp < next_day] # Check compliance compliance_records = self.check_compliance(date, next_day) exceedances = [r for r in compliance_records if r.status == ComplianceStatus.EXCEEDANCE] # Overall status if exceedances: overall_status = ComplianceStatus.EXCEEDANCE elif any(r.status == ComplianceStatus.WARNING for r in compliance_records): overall_status = ComplianceStatus.WARNING else: overall_status = ComplianceStatus.COMPLIANT # Summary by parameter param_summary = {} for param in ParameterType: param_readings = [r for r in day_readings if r.parameter == param] if param_readings: values = [r.value for r in param_readings] param_summary[param.value] = { "count": len(values), "min": min(values), "max": max(values), "avg": statistics.mean(values), "exceedances": len([r for r in compliance_records if r.parameter == param and r.status == ComplianceStatus.EXCEEDANCE]) } return DailyReport( date=date, site_name=self.site_name, parameters_monitored=len(set(r.parameter for r in day_readings)), readings_collected=len(day_readings), exceedances=len(exceedances), alerts_triggered=len(day_alerts), compliance_status=overall_status, summary=param_summary ) def generate_report(self) -> str: """Generate environmental monitoring report.""" lines = [ "# Environmental Monitoring Report", "", f"**Site:** {self.site_name}", f"**Report Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}", "", "## Monitoring Stations", "", "| Station | Location | Parameters | Status |", "|---------|----------|------------|--------|" ] for station in self.stations.values(): params = ", ".join([p.value for p in station.parameters]) lines.append( f"| {station.name} | {station.location.get('description', '-')} | " f"{params} | {station.status} |" ) # Current conditions conditions = self.get_current_conditions() lines.extend([ "", "## Current Conditions", "" ]) for station_id, data in conditions.items(): lines.append(f"### {data['name']}") lines.append("") lines.append("| Parameter | Value | Status |") lines.append("|-----------|-------|--------|") for param, values in data['parameters'].items(): status_icon = ("✅" if values['status'] == ComplianceStatus.COMPLIANT else "⚠️" if values['status'] == ComplianceStatus.WARNING else "🔴") lines.append( f"| {param} | {values['value']:.1f} {values['unit']} | " f"{status_icon} {values['status'].value} |" ) lines.append("") # Exceedance summary exceedance_summary = self.get_exceedance_summary(30) lines.extend([ "## 30-Day Exceedance Summary", "", f"**Total Exceedances:** {exceedance_summary['total_exceedances']}", "" ]) if exceedance_summary['by_parameter']: lines.append("By Parameter:") for param, count in exceedance_summary['by_parameter'].items(): lines.append(f"- {param}: {count}") # Active alerts active_alerts = [a for a in self.alerts if not a.resolved] if active_alerts: lines.extend([ "", f"## Active Alerts ({len(active_alerts)})", "", "| Time | Parameter | Station | Value | Threshold |", "|------|-----------|---------|-------|-----------|" ]) for alert in sorted(active_alerts, key=lambda a: a.timestamp, reverse=True)[:10]: lines.append( f"| {alert.timestamp.strftime('%Y-%m-%d %H:%M')} | " f"{alert.parameter.value} | {alert.station_id} | " f"{alert.value:.1f} | {alert.threshold} |" ) return "\n".join(lines) ``` ## Quick Start ```python from datetime import datetime, timedelta # Initialize monitor monitor = EnvironmentalMonitor("Downtown Construction Site") # Add monitoring stations monitor.add_station( "STA-001", "North Perimeter", location={"lat": 40.7128, "lon": -74.0060, "description": "North fence line"}, parameters=[ParameterType.NOISE, ParameterType.PM25, ParameterType.PM10] ) monitor.add_station( "STA-002", "Equipment Area", location={"lat": 40.7125, "lon": -74.0055, "description": "Near excavation"}, parameters=[ParameterType.NOISE, ParameterType.VIBRATION, ParameterType.CO] ) # Add custom limit for local ordinance monitor.add_custom_limit( ParameterType.NOISE, 65, "dBA", 0, "Local Ordinance", "Residential boundary limit" ) # Record readings monitor.record_reading("STA-001", ParameterType.NOISE, 78.5, "dBA") monitor.record_reading("STA-001", ParameterType.PM25, 28.3, "µg/m³") monitor.record_reading("STA-002", ParameterType.VIBRATION, 8.2, "mm/s") # Batch record readings = [ {"station_id": "STA-001", "parameter": "noise", "value": 82.0, "unit": "dBA"}, {"station_id": "STA-001", "parameter": "pm25", "value": 31.5, "unit": "µg/m³"}, {"station_id": "STA-002", "parameter": "noise", "value": 88.0, "unit": "dBA"} ] monitor.record_batch(readings) # Get current conditions conditions = monitor.get_current_conditions() for station, data in conditions.items(): print(f"\n{data['name']}:") for param, values in data['parameters'].items(): print(f" {param}: {values['value']} {values['unit']} - {values['status'].value}") # Check compliance compliance = monitor.check_compliance( datetime.now() - timedelta(days=1), datetime.now() ) for record in compliance: if record.status != ComplianceStatus.COMPLIANT: print(f"⚠️ {record.parameter.value}: {record.measured_value} vs limit {record.limit_value}") # Generate daily report report = monitor.generate_daily_report() print(f"\nDaily Status: {report.compliance_status.value}") print(f"Exceedances: {report.exceedances}") # Full report print(monitor.generate_report()) ``` ## Requirements ```bash pip install (no external dependencies) ```