# -*- coding: utf-8 -*- ############################################################################ # OpenMMS PCAP Check (Livox) # ############################################################################ # Version: 1.3.0 # # Date: September 2020 # # Author: Ryan Brazeal # # Email: ryan.brazeal@ufl.edu # # # # OPEN-SOURCE LICENSE INFO: # # # # This file is part of OpenMMS_OSS. # # # # OpenMMS_OSS is free software: you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation, either version 3 of the License, or # # (at your option) any later version. # # # # OpenMMS_OSS is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # # GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # # along with OpenMMS_OSS. If not, see . # # # ############################################################################ import time import sys import numpy as np import os import struct import matplotlib.pyplot as plt from datetime import datetime, timedelta from tqdm import tqdm from pathlib import Path import multiprocessing as mp def iHeartLidar2(logFile): print("\n XXX XXX ") time.sleep(0.1) print(" IIIII X XX XX X L DDDD AAAA RRRRR") time.sleep(0.1) print(" I X X X L i D D A A R R") time.sleep(0.1) print(" I X X L D D A A R R") time.sleep(0.1) print(" I XX XX L i D D AAAAAA RRRRR") time.sleep(0.1) print(" I XX XX L i D D A A R R") time.sleep(0.1) print(" I XX XX L i D D A A R R") time.sleep(0.1) print(" IIIII XXX LLLLLL i DDDD A A R R") time.sleep(0.1) print(" X \n\n") logFile.write("\n XXX XXX \n") logFile.write(" IIIII X XX XX X L DDDD AAAA RRRRR\n") logFile.write(" I X X X L i D D A A R R\n") logFile.write(" I X X L D D A A R R\n") logFile.write(" I XX XX L i D D AAAAAA RRRRR\n") logFile.write(" I XX XX L i D D A A R R\n") logFile.write(" I XX XX L i D D A A R R\n") logFile.write(" IIIII XXX LLLLLL i DDDD A A R R\n") logFile.write(" X \n\n") def OpenMMS1(): print(" _________ _______ _____ ______ ______ ________") time.sleep(0.1) print("|\\ ___ \\ ________ ________ ________ |\\ __ \\ __ \\|\\ __ \\ __ \\|\\ ____\\") time.sleep(0.1) print("\\|\\ \\_|\\ \\|\\ __ \\|\\ __ \\|\\ ___ \\|\\ \\|\\ \\|\\ \\|\\ \\|\\ \\|\\ \\|\\ \\___|_") time.sleep(0.1) print(" \\|\\ \\\\|\\ \\|\\ \\|\\ \\|\\ \\|\\ \\|\\ \\\\|\\ \\|\\ \\|\\__\\|\\ \\|\\ \\|\\__\\|\\ \\|\\_____ \\") time.sleep(0.1) print(" \\|\\ \\\\|\\ \\|\\ ____\\|\\ ____\\|\\ \\\\|\\ \\|\\ \\|__|\\|\\ \\|\\ \\|__|\\|\\ \\|____|\\ \\") time.sleep(0.1) print(" \\|\\ \\\\_\\ \\|\\ \\___|\\|\\ \\___|\\|\\ \\\\|\\ \\|\\ \\ \\|\\ \\|\\ \\ \\|\\ \\ __\\_\\ \\") time.sleep(0.1) print(" \\|\\________\\|\\__\\ \\|\\______\\\\|\\__\\\\|\\__\\|\\__\\ \\|\\__\\|\\__\\ \\|\\__\\|\\_______\\") time.sleep(0.1) print(" \\|________|\\|___| \\|_______|\\|___|\\|__|\\|___| \\|__|\\|___| \\|__|\\|________|\n\n") def OpenMMS1_log(logFile): logFile.write(" _________ _______ _____ ______ ______ ________\n") logFile.write("|\\ ___ \\ ________ ________ ________ |\\ __ \\ __ \\|\\ __ \\ __ \\|\\ ____\\\n") logFile.write("\\|\\ \\_|\\ \\|\\ __ \\|\\ __ \\|\\ ___ \\|\\ \\|\\ \\|\\ \\|\\ \\|\\ \\|\\ \\|\\ \\___|_\n") logFile.write(" \\|\\ \\\\|\\ \\|\\ \\|\\ \\|\\ \\|\\ \\|\\ \\\\|\\ \\|\\ \\|\\__\\|\\ \\|\\ \\|\\__\\|\\ \\|\\_____ \\\n") logFile.write(" \\|\\ \\\\|\\ \\|\\ ____\\|\\ ____\\|\\ \\\\|\\ \\|\\ \\|__|\\|\\ \\|\\ \\|__|\\|\\ \\|____|\\ \\\n") logFile.write(" \\|\\ \\\\_\\ \\|\\ \\___|\\|\\ \\___|\\|\\ \\\\|\\ \\|\\ \\ \\|\\ \\|\\ \\ \\|\\ \\ __\\_\\ \\\n") logFile.write(" \\|\\________\\|\\__\\ \\|\\______\\\\|\\__\\\\|\\__\\|\\__\\ \\|\\__\\|\\__\\ \\|\\__\\|\\_______\\\n") logFile.write(" \\|________|\\|___| \\|_______|\\|___|\\|__|\\|___| \\|__|\\|___| \\|__|\\|________|\n\n") def parseChunk(statusD, statsD, pointsD, start_time_ref, lock, chunkID, pointPackets, firmware): temp_warnings = 0 volt_warnings = 0 motor_warnings = 0 temp_errors = 0 volt_errors = 0 motor_errors = 0 dirty_warnings = 0 firmware_errors = 0 pps_warnings = 0 device_warnings = 0 self_heating_warnings = 0 time_sync_warnings = 0 numReturn1Pts = 0 numReturn2Pts = 0 numReturn3Pts = 0 numNullPts = 0 points_per_sec = 0 point_times_quant = [] time_ref_prev = start_time_ref descStr = "Core " + str(chunkID+1) if chunkID+1 > 9: descStr += " " else: descStr += " " lock.acquire() pbar = tqdm(total=len(pointPackets),unit=" packets",desc=descStr,position=chunkID) lock.release() for i in range(0,len(pointPackets)): lock.acquire() pbar.update() lock.release() time_ref = pointPackets[i][0] data_pc = pointPackets[i][1] time_since_start = time_ref - time_ref_prev if time_since_start > timedelta(seconds=1): point_times_quant.append([time_ref, points_per_sec]) time_ref_prev = time_ref points_per_sec = 0 # version = int.from_bytes(data_pc[42:43], byteorder='little') # slot_id = int.from_bytes(data_pc[43:44], byteorder='little') # lidar_id = int.from_bytes(data_pc[44:45], byteorder='little') timestamp_type = int.from_bytes(data_pc[50:51], byteorder='little') datatype = int.from_bytes(data_pc[51:52], byteorder='little') status_bits = str(bin(int.from_bytes(data_pc[46:47], byteorder='little')))[2:].zfill(8) status_bits += str(bin(int.from_bytes(data_pc[47:48], byteorder='little')))[2:].zfill(8) status_bits += str(bin(int.from_bytes(data_pc[48:49], byteorder='little')))[2:].zfill(8) status_bits += str(bin(int.from_bytes(data_pc[49:50], byteorder='little')))[2:].zfill(8) temp_status = int(status_bits[0:2], 2) volt_status = int(status_bits[2:4], 2) motor_status = int(status_bits[4:6], 2) dirty_status = int(status_bits[6:8], 2) firmware_status = int(status_bits[8:9], 2) pps_status = int(status_bits[9:10], 2) device_status = int(status_bits[10:11], 2) # fan_status = int(status_bits[11:12], 2) self_heating_status = int(status_bits[12:13], 2) # ptp_status = int(status_bits[13:14], 2) time_sync_status = int(status_bits[14:16], 2) # system_status = int(status_bits[30:], 2) if temp_status == 1: temp_warnings += 1 if volt_status == 1: volt_warnings += 1 if motor_status == 1: motor_warnings += 1 if temp_status == 2: temp_errors += 1 if volt_status == 2: volt_errors += 1 if motor_status == 2: motor_errors += 1 if dirty_status == 1: dirty_warnings += 1 if firmware_status == 1: firmware_errors += 1 if pps_status == 1: pps_warnings += 1 if device_status == 1: device_warnings += 1 if self_heating_status == 0: self_heating_warnings += 1 if time_sync_status == 0 or time_sync_status == 4: time_sync_warnings += 1 if timestamp_type == 4: #mid-40 spherical data if datatype == 1: time_delta = timedelta(microseconds=10) time_divisor = 1 if firmware == "03.03.0006": time_divisor = 2 elif firmware == "03.03.0007": time_divisor = 3 time_delta = timedelta(microseconds=16.667) pt_time = time_ref - time_delta byte_pos = 60 for j in range(0,100): zeroORtwoORthree = j % time_divisor pt_time += float(not (zeroORtwoORthree)) * time_delta returnNum = 1 + zeroORtwoORthree distance = float(struct.unpack(' '" + log_filename + "' ***") log_filename = filePath / log_filename bytesRead = 0 pcapSize = os.path.getsize(pcap_filename) pcap = open(pcap_filename, 'rb') # PCAP file header b = bytearray(pcap.read(24)) # magic_num = struct.unpack(' 0: timestamp_sec = 0 # nanosecond timestamp if timestamp_type == 1 or timestamp_type == 4: timestamp_sec = round(float(struct.unpack(' 0: run = True while run: designNum = len(pointPackets) // numCores if designNum < 10000: numCores -= 1 if numCores == 0: numCores = 1 run = False else: run = False else: print("\n\n****************************************************************") print("************* COULD NOT READ CPU INFORMATION, ODD? *************") print("****************************************************************\n\n") foundStuff = False #for debugging purposes # numCores = 1 if foundStuff: chunkIndex = [0] chunkSize = int(np.floor(float(len(pointPackets)) / float(numCores))) for i in range(1,numCores): chunkIndex.append(i * chunkSize) chunkIndex.append(len(pointPackets)) print("\nParsing observations from binary packet data using " + str(len(chunkIndex)-1) + " CPU core(s), please wait...\n") procs = [] manager = mp.Manager() statusD = manager.dict() statsD = manager.dict() pointsD = manager.dict() lock = mp.Lock() for i in range(0,len(chunkIndex)-1): statusD[i] = 0 for i in range(0,len(chunkIndex)-1): proc1 = mp.Process(target=parseChunk,args=(statusD, statsD, pointsD, pointPackets[0][0], lock, i, pointPackets[chunkIndex[i]:chunkIndex[i+1]], firmware)) procs.append(proc1) for iproc in procs: iproc.start() procs[len(procs)-1].join() temp_warnings = 0 volt_warnings = 0 motor_warnings = 0 temp_errors = 0 volt_errors = 0 motor_errors = 0 dirty_warnings = 0 firmware_errors = 0 pps_warnings = 0 device_warnings = 0 self_heating_warnings = 0 time_sync_warnings = 0 numReturn1Pts = 0 numReturn2Pts = 0 numReturn3Pts = 0 numNullPts = 0 mission_times = [] mission_points = [] for i in range(0,len(chunkIndex)-1): temp_warnings += statsD[i][0] volt_warnings += statsD[i][1] motor_warnings += statsD[i][2] temp_errors += statsD[i][3] volt_errors += statsD[i][4] motor_errors += statsD[i][5] dirty_warnings += statsD[i][6] firmware_errors += statsD[i][7] pps_warnings += statsD[i][8] device_warnings += statsD[i][9] self_heating_warnings += statsD[i][10] time_sync_warnings += statsD[i][11] numReturn1Pts += statsD[i][12] numReturn2Pts += statsD[i][13] numReturn3Pts += statsD[i][14] numNullPts += statsD[i][15] points_chunk = pointsD[i] for j in range(0,len(points_chunk)): mission_times.append(points_chunk[j][0]) mission_points.append(points_chunk[j][1]) total_points = (numReturn1Pts + numReturn2Pts + numReturn3Pts + numNullPts) if total_points > 1: timeEnd = time.time() print("\n\n COMPLETED PCAP CHECK IN " + str(round((timeEnd - timeStart) / 60.,2)) + " mins \n") logFile = open(log_filename, 'w') OpenMMS1_log(logFile) stats = ("\n ********************* PCAP CHECK RESULTS ********************\n\n") stats += (" INPUT: "+ pcapName +" at "+"{:,}".format(pcapSize)+" bytes\n\n") stats += (" Time of Initial PPS Pulse: " + str(time_ref) + "\n") stats += (" Initial PPS before Start Data: " + str(np.round(data_time_offset,3)) + "s\n\n") stats += (" First Data Time: " + str(pointPackets[0][0]) + " UTC\n") stats += (" Last Data Time: " + str(pointPackets[len(pointPackets)-1][0]) + " UTC\n") stats += (" Data Duration: " + str(pointPackets[len(pointPackets)-1][0] - pointPackets[0][0]) + "\n\n") stats += (" Total Packets: " + "{:,}".format(len(pointPackets)) + "\n") stats += (" Total Points: " + "{:,}".format(total_points - numNullPts) + "\n") stats += (" Returns: " + "1st: " + "{:,}".format(numReturn1Pts) + "\n") stats += (" " + "2nd: " + "{:,}".format(numReturn2Pts) + "\n") if firmware == "03.03.0007": stats += (" " + "3rd: " + "{:,}".format(numReturn3Pts) + "\n") stats += ("\n LIVOX LIDAR SENSOR MESSAGES\n\n") stats += (" Temperature Errors: " + str(temp_errors) + " Temperature Warnings: " + str(temp_warnings) + "\n") stats += (" Voltage Errors: " + str(volt_errors) + " Voltage Warnings: " + str(volt_warnings) + "\n") stats += (" Motor Errors: " + str(motor_errors) + " Motor Warnings: " + str(motor_warnings) + "\n") stats += (" Dirty Warnings: " + str(dirty_warnings) + "\n") stats += (" PPS Warnings: " + str(pps_warnings) + "\n") stats += (" Time Sync Warnings: " + str(time_sync_warnings) + "\n") stats += (" End Of Service Life Warnings: " + str(device_warnings) + "\n") stats += ("\n ***************************************************************\n") logFile.write(stats) print(stats) plt.figure(figsize=(10,5)) plt.yscale("log") plt.xlabel('Mission Time (UTC)') plt.ylabel('Points Per Second (logarithmic scale)') plt.title('Collected Points during Mission (' + pcapName + ')') plt.plot(mission_times, mission_points, 'g.', ms=1) #'g,') max_pts = np.max(mission_points) plt.ylim(bottom=1, top=(max_pts*10)) plt.savefig(filePath / ("Collected Points - " + timeStr + ".jpg")) plt.show() print(" *** Note: Collected Points plot saved as image ***\n") iHeartLidar2(logFile) logFile.close() else: print("\n\n****************************************************************") print("*********** NO DATA AND/OR POSITION INFO WAS FOUND *************") print("****************************************************************\n\n") ### command line start if __name__ == "__main__": print("\nLivox PCAP File Check Started...") pcap_filename = sys.argv[1] livox_filename = sys.argv[2] main(pcap_filename, livox_filename)