#!/usr/bin/env python
#
# Install and run:
#   curl -O https://raw.githubusercontent.com/digitalinteraction/openmovement/master/Software/AX3/cwa-convert/python/cwa_metadata.py
#   python cwa_metadata.py CWA-DATA.CWA
#
# CWA Metadata Reader
# Dan Jackson, Open Movement, 2017-2021
#
# Output metadata from a .CWA file.
#
# Usage: python cwa_metadata.py [-mode:json | -mode:ldjson] CWA-DATA.CWA [...]
#
# Where:
#   -mode:json    - Output human-readable formatted JSON (default)
#   -mode:ldjson  - Output metadata on a single line per file (line-delimited JSON)
#   CWA-DATA.CWA  - Name of one or more .CWA files to read metadata from
#
# NOTE(review): the copy of this file that was reviewed had been collapsed onto
# a few lines and had lost every span between a '<' and the following '>'.
# Code marked "reconstructed" below was rebuilt from the published CWA sector
# layout and must be checked against the upstream file before merging.
#

import json
import os
import sys
from struct import *
import time
from datetime import datetime, timedelta


def read_timestamp(data):
    """Decode a packed 32-bit CWA timestamp into seconds since 1970-01-01.

    `data` is the 4 little-endian bytes of the packed value.  Returns 0 for
    the all-zero value, -1 for the all-ones value or for a field combination
    that is not a valid calendar date, otherwise the number of seconds
    (float) between 1970-01-01 00:00:00 and the decoded naive date/time.
    """
    # NOTE(review): reconstructed - the unpack format and the two sentinel
    # checks were lost from the reviewed copy.
    value = unpack('<I', data)[0]
    if value == 0x00000000:  # Infinitely in past = 'always before now'
        return 0
    if value == 0xffffffff:  # Infinitely in future = 'always after now'
        return -1
    # bit pattern:  YYYYYYMM MMDDDDDh hhhhmmmm mmssssss
    year = ((value >> 26) & 0x3f) + 2000
    month = (value >> 22) & 0x0f
    day = (value >> 17) & 0x1f
    hours = (value >> 12) & 0x1f
    mins = (value >> 6) & 0x3f
    secs = (value >> 0) & 0x3f
    try:
        dt = datetime(year, month, day, hours, mins, secs)
    except ValueError:
        print("WARNING: Invalid date:", value, year, month, day, hours, mins, secs)
        return -1
    return (dt - datetime(1970, 1, 1)).total_seconds()


# 16-bit checksum (should sum to zero)
def checksum(data):
    """Return the 16-bit sum of `data` read as little-endian 16-bit words.

    `data` must have an even length.  A valid sector sums to zero.
    """
    # NOTE(review): reconstructed - the loop tail was lost from the reviewed copy.
    total = 0
    for i in range(0, len(data), 2):
        total = (total + unpack('<H', data[i:i + 2])[0]) & 0xffff
    return total


def short_sign_extend(value):
    """Interpret the low 16 bits of `value` as a signed 16-bit integer."""
    # NOTE(review): reconstructed - this helper is called by cwa_data() but its
    # definition was lost from the reviewed copy.
    return ((value + 0x8000) & 0xffff) - 0x8000


def timestamp_string(timestamp):
    """Format a read_timestamp() result as 'YYYY-MM-DD HH:MM:SS.mmm'.

    Returns "0" for a zero timestamp and "-1" for a negative one.  The
    conversion adds the seconds to 1970-01-01 directly, so it is the exact
    inverse of read_timestamp() and does not depend on the local timezone.
    """
    # NOTE(review): reconstructed - this helper is called by cwa_header() and
    # cwa_data() but its definition was lost from the reviewed copy.
    if timestamp == 0:
        return "0"
    if timestamp < 0:
        return "-1"
    moment = datetime(1970, 1, 1) + timedelta(seconds=timestamp)
    # Keep millisecond precision only
    return moment.strftime("%Y-%m-%d %H:%M:%S.%f")[:23]


# Local "URL-decode as UTF-8 string" function
def urldecode(input):
    """Decode a percent-encoded (form-urlencoded) string as UTF-8 text.

    '%XX' becomes the byte 0xXX, '+' becomes a space, and any other character
    is kept as its own byte value; the collected bytes are decoded as UTF-8.
    """
    # NOTE(review): the set-up and the '%' branch are reconstructed - only the
    # code from the hex-digit branch onwards survived in the reviewed copy.
    output = bytearray()
    nibbles = 0
    value = 0
    # Each input character
    for char in input:
        if char == '%':
            # Begin a percent-encoded hex pair
            nibbles = 2
            value = 0
        elif nibbles > 0:
            # Parse the percent-encoded hex digits
            value *= 16
            if char >= 'a' and char <= 'f':
                value += ord(char) + 10 - ord('a')
            elif char >= 'A' and char <= 'F':
                value += ord(char) + 10 - ord('A')
            elif char >= '0' and char <= '9':
                value += ord(char) - ord('0')
            nibbles -= 1
            if nibbles == 0:
                output.append(value)
        elif char == '+':
            # Treat plus as space (application/x-www-form-urlencoded)
            output.append(ord(' '))
        else:
            # Preserve character
            output.append(ord(char))
    return output.decode('utf-8')


def cwa_parse_metadata(data):
    """Parse the url-encoded annotation area of a CWA header into a dict.

    `data` is the raw metadata bytes (448 bytes at offset 64 of the header).
    Returns a dictionary mapping each name (shorthand names expanded) to its
    decoded value, or to None when the pair has no '='.
    """
    # Shorthand name expansions
    shorthand = {
        "_c": "Study Centre",
        "_s": "Study Code",
        "_i": "Investigator",
        "_x": "Exercise Code",
        "_v": "Volunteer Num",
        "_p": "Body Location",
        "_so": "Setup Operator",
        "_n": "Notes",
        "_b": "Start time",
        "_e": "End time",
        "_ro": "Recovery Operator",
        "_r": "Retrieval Time",
        "_co": "Comments",
        "_sc": "Subject Code",
        "_se": "Sex",
        "_h": "Height",
        "_w": "Weight",
        "_ha": "Handedness",
        "_sn": "Subject Notes",
    }

    # Remove any trailing spaces, null, or 0xFF bytes.  This is done on the
    # raw bytes, before decoding: 0xFF padding is not ASCII, so decoding first
    # raised UnicodeDecodeError on Python 3.
    raw = bytes(data).rstrip(b'\x20\xff\x00')
    if sys.version_info[0] < 3:
        encString = str(raw)
    else:
        # latin-1 maps each byte to one character, so urldecode() receives the
        # original byte values unchanged
        encString = raw.decode('latin-1')

    # Metadata represented as a dictionary
    metadata = {}

    # Name-value pairs separated with ampersand
    for nameValue in encString.split('&'):
        # Each name-value pair separated with an equals (first one only, so a
        # stray '=' stays part of the value)
        parts = nameValue.split('=', 1)
        # Name is URL-encoded UTF-8
        name = urldecode(parts[0])
        if len(name) > 0:
            value = None
            if len(parts) > 1:
                # Value is URL-encoded UTF-8
                value = urldecode(parts[1])
            # Expand shorthand names and store the pair
            metadata[shorthand.get(name, name)] = value

    return metadata


def cwa_header(block):
    """Parse the CWA file header sector.

    `block` is at least the first 512 bytes of the file.  Returns a dict of
    header fields; the dict is empty when the block is too short or does not
    start with an "MD" packet of length >= 508.
    """
    header = {}
    if len(block) < 512:
        return header

    packetHeader = unpack('BB', block[0:2])         # @ 0  +2   ASCII "MD", little-endian (0x444D)
    packetLength = unpack('<H', block[2:4])[0]      # @ 2  +2   Packet length (1020 bytes, with header (4) = 1024 bytes total)
    if not (packetHeader[0] == ord('M') and packetHeader[1] == ord('D') and packetLength >= 508):
        return header

    header['packetLength'] = packetLength
    # unpack() <=little-endian, bB=s/u 8-bit, hH=s/u 16-bit, iI=s/u 32-bit
    hardwareType = unpack('B', block[4:5])[0]       # @ 4  +1   Hardware type (0x00/0xff/0x17 = AX3, 0x64 = AX6)
    header['hardwareType'] = hardwareType
    if hardwareType == 0x00 or hardwareType == 0xff:
        hardwareType = 0x17
    if hardwareType == 0x17:
        header['deviceType'] = 'AX3'
    elif hardwareType == 0x64:
        header['deviceType'] = 'AX6'
    else:
        header['deviceType'] = hex(hardwareType)[2:]  # BCD

    # NOTE(review): the reads from offset 5 to offset 36 are reconstructed from
    # the published CWA header layout; confirm the key names against upstream.
    header['deviceId'] = unpack('<H', block[5:7])[0]            # @ 5  +2   Device identifier
    header['sessionId'] = unpack('<I', block[7:11])[0]          # @ 7  +4   Unique session identifier
    deviceIdUpper = unpack('<H', block[11:13])[0]               # @11  +2   Upper word of device id (if 0xffff is read, treat as 0x0000)
    if deviceIdUpper != 0xffff:
        header['deviceId'] |= deviceIdUpper << 16
    header['loggingStart'] = read_timestamp(block[13:17])       # @13  +4   Start time for delayed logging
    header['loggingEnd'] = read_timestamp(block[17:21])         # @17  +4   Stop time for delayed logging
    header['loggingCapacity'] = unpack('<I', block[21:25])[0]   # @21  +4   (Deprecated: preset maximum number of samples to collect, 0 = unlimited)
    #                                                           # @25  +1   (1 byte reserved)
    header['flashLed'] = unpack('B', block[26:27])[0]           # @26  +1   Flash LED during recording
    if header['flashLed'] == 0xff:
        header['flashLed'] = 0
    #                                                           # @27  +8   (8 bytes reserved)
    sensorConfig = unpack('B', block[35:36])[0]                 # @35  +1   Fixed rate sensor configuration, 0x00 or 0xff means accel only, otherwise bottom nibble is gyro range (8000/2^n dps)
    if sensorConfig != 0x00 and sensorConfig != 0xff:
        header['gyroRange'] = 8000 / 2 ** (sensorConfig & 0x0f)
    else:
        header['gyroRange'] = 0
    rateCode = unpack('B', block[36:37])[0]                     # @36  +1   Sampling rate code, frequency (3200/(1<<(15-(rate & 0x0f)))) Hz, range (+/-g) (16 >> (rate >> 6)).
    header['lastChange'] = read_timestamp(block[37:41])         # @37  +4   Last change metadata time
    header['firmwareRevision'] = unpack('B', block[41:42])[0]   # @41  +1   Firmware revision number
    # header['timeZone'] = unpack('<H', block[42:44])[0]        # @42  +2   (Unused)
    #                                                           # @44  +20  (20 bytes reserved)
    header['metadata'] = cwa_parse_metadata(block[64:512])      # @64  +448 Annotation metadata (url-encoded name-value pairs)

    # Timestamps
    header['loggingStartTime'] = timestamp_string(header['loggingStart'])
    header['loggingEndTime'] = timestamp_string(header['loggingEnd'])
    header['lastChangeTime'] = timestamp_string(header['lastChange'])

    # Parse rateCode
    header['sampleRate'] = (3200 / (1 << (15 - (rateCode & 0x0f))))
    header['accelRange'] = (16 >> (rateCode >> 6))

    return header


def _unpack_packed_accel(block, count, unit):
    """Decode `count` packed 32-bit samples (3x 10-bit + 2-bit exponent) into [x, y, z] rows."""
    divisor = float(unit)
    samples = []
    for i in range(count):
        val = unpack_from('<I', block, 30 + i * 4)[0]
        ex = 6 - ((val >> 30) & 3)
        samples.append([
            (short_sign_extend(0xffc0 & (val << 6)) >> ex) / divisor,
            (short_sign_extend(0xffc0 & (val >> 4)) >> ex) / divisor,
            (short_sign_extend(0xffc0 & (val >> 14)) >> ex) / divisor,
        ])
    return samples


def _unpack_triaxial(block, count, channels, axis, unit):
    """Decode `count` signed 16-bit (x, y, z) triplets that start at channel index `axis`."""
    divisor = float(unit)
    stride = 2 * channels
    samples = []
    for i in range(count):
        x, y, z = unpack_from('<hhh', block, 30 + i * stride + 2 * axis)
        samples.append([x / divisor, y / divisor, z / divisor])
    return samples


def cwa_data(block, extractData=False):
    """Parse one 512-byte CWA data sector.

    Returns a dict describing the sector (timestamp, sample layout, scaling).
    When `extractData` is true the dict also carries the scaled sample rows
    under 'samplesAccel' / 'samplesGyro' / 'samplesMag'.  The dict is empty
    when the block is too short or is not a valid "AX" data packet.
    """
    data = {}
    if len(block) < 512:
        return data

    packetHeader = unpack('BB', block[0:2])         # @ 0  +2   ASCII "AX", little-endian (0x5841)
    # NOTE(review): the reads from offset 2 to offset 24, and the validity
    # test, are reconstructed from the published CWA data-packet layout;
    # confirm the key names and conversions against upstream.
    packetLength = unpack('<H', block[2:4])[0]      # @ 2  +2   Packet length (508 bytes, with header (4) = 512 bytes total)
    if not (packetHeader[0] == ord('A') and packetHeader[1] == ord('X')
            and packetLength == 508 and checksum(block[0:512]) == 0):
        return data

    deviceFractional = unpack('<H', block[4:6])[0]          # @ 4  +2   Top bit set: 15-bit fraction of a second for the time stamp; top bit clear: 15-bit device identifier
    data['sessionId'] = unpack('<I', block[6:10])[0]        # @ 6  +4   Unique session identifier, 0 = unknown
    data['sequenceId'] = unpack('<I', block[10:14])[0]      # @10  +4   Sequence counter (0-indexed)
    timestamp = read_timestamp(block[14:18])                # @14  +4   Last reported RTC value, 0 = unknown
    light = unpack('<H', block[18:20])[0]                   # @18  +2   AAAGGGLLLLLLLLLL: light in low 10 bits, accel scale top 3 bits, gyro scale next 3 bits
    data['light'] = light & 0x3ff
    temperature = unpack('<H', block[20:22])[0]             # @20  +2   Last recorded temperature sensor value in raw units
    data['temperature'] = temperature * 75.0 / 256 - 50
    data['events'] = unpack('B', block[22:23])[0]           # @22  +1   Event flags since last packet
    battery = unpack('B', block[23:24])[0]                  # @23  +1   Last recorded battery level in raw units
    data['battery'] = (battery + 512.0) * 6000 / 1024 / 1000.0
    rateCode = unpack('B', block[24:25])[0]                 # @24  +1   Sample rate code, frequency (3200/(1<<(15-(rate & 0x0f)))) Hz, range (+/-g) (16 >> (rate >> 6)).
    numAxesBPS = unpack('B', block[25:26])[0]               # @25  +1   0x32 (top nibble: number of axes = 3; bottom nibble: packing format - 2 = 3x 16-bit signed, 0 = 3x 10-bit signed + 2-bit exponent)
    timestampOffset = unpack('<h', block[26:28])[0]         # @26  +2   Relative sample index from the start of the buffer where the whole-second timestamp is valid
    data['sampleCount'] = unpack('<H', block[28:30])[0]     # @28  +2   Number of samples in this sector
    #                                                       # @30  +480 Raw sample data

    # range = 16 >> (rateCode >> 6)
    frequency = 3200 / (1 << (15 - (rateCode & 0x0f)))
    data['frequency'] = frequency

    timeFractional = 0
    # if top-bit set, we have a fractional date
    if deviceFractional & 0x8000:
        # Need to undo backwards-compatible shim by calculating how many whole samples the fractional part of timestamp accounts for.
        timeFractional = (deviceFractional & 0x7fff) << 1  # use original deviceId field bottom 15-bits as 16-bit fractional time
        timestampOffset += (timeFractional * int(frequency)) >> 16  # undo the backwards-compatible shift (as we have a true fractional)

    # Add fractional time to timestamp (float divisor so Python 2 does not truncate to 0)
    timestamp += timeFractional / 65536.0

    data['timestamp'] = timestamp
    data['timestampOffset'] = timestampOffset
    data['timestampTime'] = timestamp_string(data['timestamp'])

    # Maximum samples per sector
    channels = (numAxesBPS >> 4) & 0x0f
    bytesPerAxis = numAxesBPS & 0x0f
    bytesPerSample = 4
    if bytesPerAxis == 0 and channels == 3:
        bytesPerSample = 4
    elif bytesPerAxis > 0 and channels > 0:
        bytesPerSample = bytesPerAxis * channels
    samplesPerSector = 480 // bytesPerSample
    data['channels'] = channels
    data['bytesPerAxis'] = bytesPerAxis  # 0 for DWORD packing
    data['bytesPerSample'] = bytesPerSample
    data['samplesPerSector'] = samplesPerSector

    # Axes: index of the first channel of each sensor, -1 when absent
    accelAxis = -1
    gyroAxis = -1
    magAxis = -1
    if channels >= 6:
        gyroAxis = 0
        accelAxis = 3
        if channels >= 9:
            magAxis = 6
    elif channels >= 3:
        accelAxis = 0

    # Default units/scaling/range
    gyroRange = 2000  # 32768 = 2000dps
    magUnit = 16      # 1uT = 16
    # light is least significant 10 bits, accel scale 3-MSB, gyro scale next 3 bits: AAAGGGLLLLLLLLLL
    accelUnit = 1 << (8 + ((light >> 13) & 0x07))  # 1g = 256 when the scale bits are zero
    if ((light >> 10) & 0x07) != 0:
        gyroRange = 8000 // (1 << ((light >> 10) & 0x07))

    # Range
    accelRange = 16
    if rateCode != 0:
        accelRange = 16 >> (rateCode >> 6)
    # Was commented out while still being stored below, which raised NameError
    # for 9-channel data
    magRange = 32768 // magUnit

    # Unit
    gyroUnit = 32768.0 / gyroRange

    if accelAxis >= 0:
        data['accelAxis'] = accelAxis
        data['accelRange'] = accelRange
        data['accelUnit'] = accelUnit
    if gyroAxis >= 0:
        data['gyroAxis'] = gyroAxis
        data['gyroRange'] = gyroRange
        data['gyroUnit'] = gyroUnit
    if magAxis >= 0:
        data['magAxis'] = magAxis
        data['magRange'] = magRange
        data['magUnit'] = magUnit

    # Read sample values
    if extractData:
        # Never index past the 480-byte sample area, whatever the count field claims
        count = min(data['sampleCount'], samplesPerSector)
        if accelAxis >= 0:
            if bytesPerAxis == 0 and channels == 3:
                accelSamples = _unpack_packed_accel(block, count, accelUnit)
            elif bytesPerAxis == 2:
                accelSamples = _unpack_triaxial(block, count, channels, accelAxis, accelUnit)
            else:
                # Unsupported packing: independent zero rows (not one shared row)
                accelSamples = [[0, 0, 0] for _ in range(count)]
            data['samplesAccel'] = accelSamples
        if gyroAxis >= 0 and bytesPerAxis == 2:
            data['samplesGyro'] = _unpack_triaxial(block, count, channels, gyroAxis, gyroUnit)
        if magAxis >= 0 and bytesPerAxis == 2:
            data['samplesMag'] = _unpack_triaxial(block, count, channels, magAxis, magUnit)

    return data


def cwa_info(filename):
    """Read the header, first and last data sectors of a .CWA file.

    Returns a dict with the keys 'file' (size, sector counts, estimated
    sample count, duration and mean rate), 'header', 'first' and 'last'.
    Raises Exception('Header invalid') when the header sector cannot be parsed.
    """
    file = {}
    header = {}
    first = {}
    last = {}

    file['name'] = os.path.basename(filename)

    # Header
    with open(filename, "rb") as f:
        sectorSize = 512

        # File length
        f.seek(0, 2)
        fileSize = f.tell()

        # Read header
        headerSize = 1024
        f.seek(0)
        headerBytes = f.read(headerSize)
        header = cwa_header(headerBytes)
        if 'packetLength' not in header:
            raise Exception('Header invalid')
        headerSize = header['packetLength'] + 4

        # Read first data sector
        f.seek(headerSize)
        firstBytes = f.read(sectorSize)
        first = cwa_data(firstBytes)

        # Read last data sector
        f.seek(fileSize - sectorSize)
        lastBytes = f.read(sectorSize)
        last = cwa_data(lastBytes)

        # Update file metadata
        file['size'] = fileSize
        file['sectorSize'] = sectorSize
        file['headerSize'] = headerSize
        if fileSize >= headerSize:
            file['numSectors'] = (fileSize - headerSize) // 512
        else:
            file['numSectors'] = 0

        # Samples per sector (the last sector's value wins when both are known)
        samplesPerSector = 0
        if 'samplesPerSector' in first:
            samplesPerSector = first['samplesPerSector']
        if 'samplesPerSector' in last:
            samplesPerSector = last['samplesPerSector']
        file['samplesPerSector'] = samplesPerSector

        # Estimate total number of samples
        file['numSamples'] = file['numSectors'] * samplesPerSector

        duration = 0
        if 'timestamp' in first and 'timestamp' in last:
            duration = last['timestamp'] - first['timestamp']
        file['duration'] = duration

        # Mean rate (assuming no breaks)
        meanRate = 0
        if duration != 0:
            meanRate = file['numSamples'] / duration
        file['meanRate'] = meanRate

    # Collected information
    info = {}
    info['file'] = file
    info['header'] = header
    info['first'] = first
    info['last'] = last
    return info


def main(argv):
    """Print metadata for each file named in `argv`; '-mode:NAME' switches the output mode."""
    mode = 'json'  # '-mode:json', '-mode:ldjson', '-mode:size_rate'
    for filename in argv:
        if filename[0:6] == '-mode:':
            mode = filename[6:]
            continue

        info = cwa_info(filename)

        if mode == 'json':
            print(json.dumps(info, indent=4, sort_keys=True))
        elif mode == 'ldjson':
            print(json.dumps(info))
        elif mode == 'size_rate':
            print('%s,%s,%s,%s,%s' % (info['file']['name'],info['file']['size']/1024/1024,info['file']['duration']/60/60/24,info['header']['sampleRate'],info['file']['meanRate']))
        else:
            print('ERROR: Unknown output mode: %s' % mode)


# Test function
if __name__ == "__main__":
    main(sys.argv[1:])