--- name: drone-site-survey description: "Process drone survey data for construction sites. Generate orthomosaics, DEMs, point clouds, calculate volumes, track progress, and integrate with BIM models for comparison." --- # Drone Site Survey Processing ## Overview This skill implements drone data processing for construction site monitoring. Process aerial imagery to generate maps, measure volumes, track progress, and compare with design models. **Capabilities:** - Orthomosaic generation - Digital Elevation Model (DEM) creation - Point cloud processing - Volume calculations - Progress monitoring - BIM comparison - Stockpile measurement ## Quick Start ```python from dataclasses import dataclass from typing import List, Dict, Tuple, Optional from datetime import datetime import numpy as np @dataclass class DroneImage: filename: str timestamp: datetime latitude: float longitude: float altitude: float heading: float pitch: float roll: float camera_model: str @dataclass class PointCloud: points: np.ndarray # Nx3 array colors: Optional[np.ndarray] = None # Nx3 RGB normals: Optional[np.ndarray] = None # Nx3 @dataclass class VolumeResult: volume_m3: float area_m2: float method: str reference_plane: str confidence: float def calculate_volume_simple(point_cloud: PointCloud, reference_z: float = None) -> VolumeResult: """Simple volume calculation from point cloud""" points = point_cloud.points if reference_z is None: reference_z = np.min(points[:, 2]) # Grid-based volume calculation x_min, x_max = np.min(points[:, 0]), np.max(points[:, 0]) y_min, y_max = np.min(points[:, 1]), np.max(points[:, 1]) grid_size = 0.5 # 50cm grid x_bins = np.arange(x_min, x_max + grid_size, grid_size) y_bins = np.arange(y_min, y_max + grid_size, grid_size) volume = 0 cell_area = grid_size ** 2 for i in range(len(x_bins) - 1): for j in range(len(y_bins) - 1): mask = ( (points[:, 0] >= x_bins[i]) & (points[:, 0] < x_bins[i + 1]) & (points[:, 1] >= y_bins[j]) & (points[:, 1] < y_bins[j + 1]) ) cell_points = points[mask] if len(cell_points) > 0: max_z = np.max(cell_points[:, 2]) height = max_z - reference_z if height > 0: volume += height * cell_area area = (x_max - x_min) * (y_max - y_min) return VolumeResult( volume_m3=volume, area_m2=area, method='grid_based', reference_plane=f'z={reference_z:.2f}', confidence=0.9 ) # Example usage sample_points = np.random.rand(10000, 3) * [100, 100, 10] # 100x100m, 10m height point_cloud = PointCloud(points=sample_points) result = calculate_volume_simple(point_cloud) print(f"Volume: {result.volume_m3:.2f} m³, Area: {result.area_m2:.2f} m²") ``` ## Comprehensive Drone Survey System ### Image Processing Pipeline ```python from dataclasses import dataclass, field from typing import List, Dict, Tuple, Optional from datetime import datetime import numpy as np from pathlib import Path import json @dataclass class CameraParameters: focal_length_mm: float sensor_width_mm: float sensor_height_mm: float image_width_px: int image_height_px: int @dataclass class GeoReference: crs: str # Coordinate Reference System (e.g., "EPSG:4326") origin: Tuple[float, float, float] # lat, lon, alt rotation: Tuple[float, float, float] # heading, pitch, roll @dataclass class SurveyFlight: flight_id: str date: datetime site_name: str images: List[DroneImage] camera: CameraParameters geo_reference: GeoReference flight_altitude: float overlap_forward: float = 0.8 overlap_side: float = 0.7 gsd: float = 0 # Ground Sample Distance (cm/pixel) def __post_init__(self): if self.gsd == 0 and self.camera: # Calculate GSD sensor_width = self.camera.sensor_width_mm focal_length = self.camera.focal_length_mm image_width = self.camera.image_width_px altitude = self.flight_altitude self.gsd = (altitude * sensor_width) / (focal_length * image_width) * 100 # cm @dataclass class ProcessingResult: orthomosaic_path: Optional[str] = None dem_path: Optional[str] = None dsm_path: Optional[str] = None point_cloud_path: Optional[str] = None report_path: Optional[str] = None statistics: Dict = field(default_factory=dict) class DroneDataProcessor: """Process drone survey data""" def __init__(self, output_dir: str): self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True) def process_survey(self, flight: SurveyFlight, generate_ortho: bool = True, generate_dem: bool = True, generate_pointcloud: bool = True) -> ProcessingResult: """Process drone survey data""" result = ProcessingResult() result.statistics['flight_id'] = flight.flight_id result.statistics['image_count'] = len(flight.images) result.statistics['gsd_cm'] = flight.gsd result.statistics['flight_date'] = flight.date.isoformat() # In production, these would call actual photogrammetry libraries # like OpenDroneMap, Pix4D API, or custom SfM pipeline if generate_ortho: result.orthomosaic_path = str(self.output_dir / f"{flight.flight_id}_ortho.tif") result.statistics['ortho_resolution'] = flight.gsd if generate_dem: result.dem_path = str(self.output_dir / f"{flight.flight_id}_dem.tif") result.dsm_path = str(self.output_dir / f"{flight.flight_id}_dsm.tif") if generate_pointcloud: result.point_cloud_path = str(self.output_dir / f"{flight.flight_id}_pointcloud.las") # Generate report result.report_path = str(self.output_dir / f"{flight.flight_id}_report.json") with open(result.report_path, 'w') as f: json.dump(result.statistics, f, indent=2) return result def extract_point_cloud(self, las_path: str) -> PointCloud: """Extract point cloud from LAS file""" # In production, use laspy or similar # Simulated point cloud for demonstration n_points = 100000 points = np.random.rand(n_points, 3) * [100, 100, 20] colors = np.random.randint(0, 255, (n_points, 3), dtype=np.uint8) return PointCloud(points=points, colors=colors) def compare_surveys(self, survey1: ProcessingResult, survey2: ProcessingResult) -> Dict: """Compare two surveys for change detection""" # Load point clouds pc1 = self.extract_point_cloud(survey1.point_cloud_path) pc2 = self.extract_point_cloud(survey2.point_cloud_path) # Calculate elevation differences # In production, use proper point cloud registration and comparison comparison = { 'survey1_date': survey1.statistics.get('flight_date'), 'survey2_date': survey2.statistics.get('flight_date'), 'point_count_diff': len(pc2.points) - len(pc1.points), 'changes_detected': [] } return comparison ``` ### Volume Calculation Engine ```python from scipy.spatial import Delaunay from scipy.interpolate import griddata import numpy as np class VolumeCalculator: """Advanced volume calculations from drone data""" def __init__(self, point_cloud: PointCloud): self.points = point_cloud.points self.colors = point_cloud.colors def calculate_cut_fill(self, design_surface: np.ndarray, grid_size: float = 0.5) -> Dict: """Calculate cut and fill volumes compared to design surface""" # Create grid x_min, x_max = np.min(self.points[:, 0]), np.max(self.points[:, 0]) y_min, y_max = np.min(self.points[:, 1]), np.max(self.points[:, 1]) x_grid = np.arange(x_min, x_max, grid_size) y_grid = np.arange(y_min, y_max, grid_size) xx, yy = np.meshgrid(x_grid, y_grid) # Interpolate actual surface actual_z = griddata( self.points[:, :2], self.points[:, 2], (xx, yy), method='linear' ) # Interpolate design surface design_z = griddata( design_surface[:, :2], design_surface[:, 2], (xx, yy), method='linear' ) # Calculate differences diff = actual_z - design_z cell_area = grid_size ** 2 cut_volume = np.nansum(diff[diff > 0]) * cell_area fill_volume = np.nansum(np.abs(diff[diff < 0])) * cell_area net_volume = cut_volume - fill_volume return { 'cut_volume_m3': float(cut_volume), 'fill_volume_m3': float(fill_volume), 'net_volume_m3': float(net_volume), 'balance': 'cut' if net_volume > 0 else 'fill', 'grid_size_m': grid_size, 'area_m2': float((x_max - x_min) * (y_max - y_min)) } def calculate_stockpile_volume(self, base_method: str = 'lowest_perimeter') -> VolumeResult: """Calculate stockpile volume""" # Find boundary points from scipy.spatial import ConvexHull hull = ConvexHull(self.points[:, :2]) boundary_points = self.points[hull.vertices] if base_method == 'lowest_perimeter': reference_z = np.min(boundary_points[:, 2]) elif base_method == 'average_perimeter': reference_z = np.mean(boundary_points[:, 2]) elif base_method == 'triangulated': # Create base triangulation from boundary reference_z = np.min(self.points[:, 2]) else: reference_z = np.min(self.points[:, 2]) # Calculate volume using triangulated surface volume = self._triangulated_volume(reference_z) hull_area = self._calculate_hull_area(boundary_points[:, :2]) return VolumeResult( volume_m3=volume, area_m2=hull_area, method='triangulated_' + base_method, reference_plane=f'z={reference_z:.2f}', confidence=0.95 ) def _triangulated_volume(self, reference_z: float) -> float: """Calculate volume using Delaunay triangulation""" # Create 2D triangulation points_2d = self.points[:, :2] tri = Delaunay(points_2d) volume = 0 for simplex in tri.simplices: # Get triangle vertices p1 = self.points[simplex[0]] p2 = self.points[simplex[1]] p3 = self.points[simplex[2]] # Calculate prism volume avg_height = (p1[2] + p2[2] + p3[2]) / 3 - reference_z if avg_height > 0: # Triangle area area = 0.5 * abs( (p2[0] - p1[0]) * (p3[1] - p1[1]) - (p3[0] - p1[0]) * (p2[1] - p1[1]) ) volume += area * avg_height return volume def _calculate_hull_area(self, points_2d: np.ndarray) -> float: """Calculate area of convex hull""" hull = ConvexHull(points_2d) return hull.volume # In 2D, volume is actually area def generate_contours(self, interval: float = 1.0) -> List[Dict]: """Generate elevation contours""" z_min = np.min(self.points[:, 2]) z_max = np.max(self.points[:, 2]) contour_levels = np.arange( np.floor(z_min / interval) * interval, np.ceil(z_max / interval) * interval + interval, interval ) contours = [] for level in contour_levels: # In production, use proper contouring algorithm contours.append({ 'elevation': float(level), 'type': 'major' if level % 5 == 0 else 'minor' }) return contours ``` ### Progress Monitoring ```python from datetime import date from typing import List, Dict @dataclass class ProgressPoint: date: date point_cloud: PointCloud orthomosaic_path: str annotations: Dict = field(default_factory=dict) class ConstructionProgressMonitor: """Monitor construction progress from drone surveys""" def __init__(self, project_name: str): self.project_name = project_name self.surveys: List[ProgressPoint] = [] self.volume_calculator = None def add_survey(self, survey_date: date, point_cloud: PointCloud, orthomosaic_path: str): """Add survey for progress tracking""" self.surveys.append(ProgressPoint( date=survey_date, point_cloud=point_cloud, orthomosaic_path=orthomosaic_path )) self.surveys.sort(key=lambda x: x.date) def calculate_earthwork_progress(self, design_surface: np.ndarray) -> List[Dict]: """Calculate earthwork progress over time""" progress = [] for i, survey in enumerate(self.surveys): calc = VolumeCalculator(survey.point_cloud) cut_fill = calc.calculate_cut_fill(design_surface) progress.append({ 'date': survey.date.isoformat(), 'survey_index': i + 1, 'cut_volume_m3': cut_fill['cut_volume_m3'], 'fill_volume_m3': cut_fill['fill_volume_m3'], 'net_volume_m3': cut_fill['net_volume_m3'] }) # Calculate progress percentages if len(progress) > 1: initial = progress[0] for p in progress[1:]: if initial['net_volume_m3'] != 0: p['progress_pct'] = ( (initial['net_volume_m3'] - p['net_volume_m3']) / abs(initial['net_volume_m3']) * 100 ) else: p['progress_pct'] = 0 return progress def detect_changes(self, survey1_idx: int, survey2_idx: int, threshold_m: float = 0.5) -> Dict: """Detect changes between two surveys""" pc1 = self.surveys[survey1_idx].point_cloud pc2 = self.surveys[survey2_idx].point_cloud # Create grids for comparison grid_size = 1.0 # 1m grid x_min = min(np.min(pc1.points[:, 0]), np.min(pc2.points[:, 0])) x_max = max(np.max(pc1.points[:, 0]), np.max(pc2.points[:, 0])) y_min = min(np.min(pc1.points[:, 1]), np.min(pc2.points[:, 1])) y_max = max(np.max(pc1.points[:, 1]), np.max(pc2.points[:, 1])) x_bins = np.arange(x_min, x_max + grid_size, grid_size) y_bins = np.arange(y_min, y_max + grid_size, grid_size) changes = { 'increased': [], # Areas where elevation increased 'decreased': [], # Areas where elevation decreased 'unchanged': 0 } for i in range(len(x_bins) - 1): for j in range(len(y_bins) - 1): # Get points in cell for both surveys cell_x = (x_bins[i] + x_bins[i + 1]) / 2 cell_y = (y_bins[j] + y_bins[j + 1]) / 2 mask1 = ( (pc1.points[:, 0] >= x_bins[i]) & (pc1.points[:, 0] < x_bins[i + 1]) & (pc1.points[:, 1] >= y_bins[j]) & (pc1.points[:, 1] < y_bins[j + 1]) ) mask2 = ( (pc2.points[:, 0] >= x_bins[i]) & (pc2.points[:, 0] < x_bins[i + 1]) & (pc2.points[:, 1] >= y_bins[j]) & (pc2.points[:, 1] < y_bins[j + 1]) ) if np.sum(mask1) > 0 and np.sum(mask2) > 0: z1 = np.mean(pc1.points[mask1, 2]) z2 = np.mean(pc2.points[mask2, 2]) diff = z2 - z1 if diff > threshold_m: changes['increased'].append({ 'x': cell_x, 'y': cell_y, 'change_m': diff }) elif diff < -threshold_m: changes['decreased'].append({ 'x': cell_x, 'y': cell_y, 'change_m': diff }) else: changes['unchanged'] += 1 changes['summary'] = { 'increased_cells': len(changes['increased']), 'decreased_cells': len(changes['decreased']), 'unchanged_cells': changes['unchanged'], 'date_from': self.surveys[survey1_idx].date.isoformat(), 'date_to': self.surveys[survey2_idx].date.isoformat() } return changes def generate_progress_report(self, output_path: str, design_surface: np.ndarray = None) -> str: """Generate progress report""" import pandas as pd report_data = { 'project': self.project_name, 'surveys': len(self.surveys), 'date_range': { 'start': self.surveys[0].date.isoformat() if self.surveys else None, 'end': self.surveys[-1].date.isoformat() if self.surveys else None } } if design_surface is not None: report_data['earthwork_progress'] = self.calculate_earthwork_progress(design_surface) with pd.ExcelWriter(output_path, engine='openpyxl') as writer: # Summary pd.DataFrame([report_data]).to_excel(writer, sheet_name='Summary', index=False) # Survey list survey_data = [{ 'Date': s.date, 'Orthomosaic': s.orthomosaic_path } for s in self.surveys] pd.DataFrame(survey_data).to_excel(writer, sheet_name='Surveys', index=False) # Progress (if available) if 'earthwork_progress' in report_data: pd.DataFrame(report_data['earthwork_progress']).to_excel( writer, sheet_name='Progress', index=False ) return output_path ``` ### BIM Comparison ```python class BIMDroneComparator: """Compare drone survey with BIM model""" def __init__(self, bim_surface: np.ndarray, drone_pointcloud: PointCloud): self.bim = bim_surface self.drone = drone_pointcloud def compare_elevations(self, grid_size: float = 1.0) -> Dict: """Compare drone elevations with BIM design""" # Create comparison grid x_min = max(np.min(self.bim[:, 0]), np.min(self.drone.points[:, 0])) x_max = min(np.max(self.bim[:, 0]), np.max(self.drone.points[:, 0])) y_min = max(np.min(self.bim[:, 1]), np.min(self.drone.points[:, 1])) y_max = min(np.max(self.bim[:, 1]), np.max(self.drone.points[:, 1])) x_grid = np.arange(x_min, x_max, grid_size) y_grid = np.arange(y_min, y_max, grid_size) xx, yy = np.meshgrid(x_grid, y_grid) # Interpolate both surfaces bim_z = griddata(self.bim[:, :2], self.bim[:, 2], (xx, yy), method='linear') drone_z = griddata( self.drone.points[:, :2], self.drone.points[:, 2], (xx, yy), method='linear' ) # Calculate differences diff = drone_z - bim_z valid_mask = ~np.isnan(diff) valid_diff = diff[valid_mask] return { 'mean_diff_m': float(np.mean(valid_diff)), 'std_diff_m': float(np.std(valid_diff)), 'max_diff_m': float(np.max(valid_diff)), 'min_diff_m': float(np.min(valid_diff)), 'rmse_m': float(np.sqrt(np.mean(valid_diff ** 2))), 'within_5cm': float(np.sum(np.abs(valid_diff) < 0.05) / len(valid_diff) * 100), 'within_10cm': float(np.sum(np.abs(valid_diff) < 0.10) / len(valid_diff) * 100), 'comparison_points': int(len(valid_diff)) } def find_deviations(self, tolerance_m: float = 0.1) -> List[Dict]: """Find areas with significant deviations""" deviations = [] grid_size = 1.0 # Grid comparison x_min = max(np.min(self.bim[:, 0]), np.min(self.drone.points[:, 0])) x_max = min(np.max(self.bim[:, 0]), np.max(self.drone.points[:, 0])) y_min = max(np.min(self.bim[:, 1]), np.min(self.drone.points[:, 1])) y_max = min(np.max(self.bim[:, 1]), np.max(self.drone.points[:, 1])) for x in np.arange(x_min, x_max, grid_size): for y in np.arange(y_min, y_max, grid_size): # Get average elevations bim_mask = ( (self.bim[:, 0] >= x) & (self.bim[:, 0] < x + grid_size) & (self.bim[:, 1] >= y) & (self.bim[:, 1] < y + grid_size) ) drone_mask = ( (self.drone.points[:, 0] >= x) & (self.drone.points[:, 0] < x + grid_size) & (self.drone.points[:, 1] >= y) & (self.drone.points[:, 1] < y + grid_size) ) if np.sum(bim_mask) > 0 and np.sum(drone_mask) > 0: bim_z = np.mean(self.bim[bim_mask, 2]) drone_z = np.mean(self.drone.points[drone_mask, 2]) diff = drone_z - bim_z if abs(diff) > tolerance_m: deviations.append({ 'x': x + grid_size / 2, 'y': y + grid_size / 2, 'bim_elevation': bim_z, 'actual_elevation': drone_z, 'deviation_m': diff, 'type': 'high' if diff > 0 else 'low' }) return deviations ``` ## Quick Reference | Measurement | Method | Accuracy | |-------------|--------|----------| | Stockpile Volume | Triangulated | ±2-5% | | Cut/Fill Volume | Grid comparison | ±5% | | Area Measurement | Orthomosaic | <1cm GSD | | Elevation (DEM) | Photogrammetry | ±2-5cm | | Progress Tracking | Multi-temporal | Relative | ## Resources - **OpenDroneMap**: https://www.opendronemap.org - **Pix4D**: https://www.pix4d.com - **LAStools**: https://rapidlasso.com/lastools/ - **DDC Website**: https://datadrivenconstruction.io ## Next Steps - See `progress-monitoring-cv` for image-based progress - See `bim-validation-pipeline` for model comparison - See `data-visualization` for 3D visualization