#!/usr/bin/env python
# coding=UTF-8
# CWA Drive Dump
from subprocess import check_output
import time
from datetime import timedelta
import platform
import ctypes
import sys
import os
import atexit
#import io
def driveDump(path, outputFile, mode, type):
    """Copy the raw contents of a device into an image file, printing progress.

    Args:
        path: Path of the source device to read (opened with mode 'rb').
        outputFile: Path of the image file to write.
        mode: open() mode for the output file. When the file already has
            content and is opened for appending, the copy resumes from the
            current end of the output file.
        type: Device type, "ax3" or "ax6", used only to pick a default size
            when the drive size cannot be detected. May be None.

    Returns:
        True if the copy loop finished (the expected size was reached or the
        source returned no more data); False if no usable size was available,
        a block could not be fully written, or an OS-level error occurred.
    """
    blockSize = 128 * 1024
    print("DriveDump:", path)
    try:
        print("Detecting device physical drive size...")
        fileSize = findPhysicalDriveSize(path)
        if fileSize is None or fileSize <= 0:
            if fileSize is None:
                print("WARNING: Detecting drive size is not supported on this platform.")
            else:
                print("WARNING: Problem determining drive size.")
            # Fall back to a per-device default size (sector count * 512 bytes)
            if type == "ax3":
                print("WARNING: Using default drive size for AX3")
                fileSize = 992161 * 512
            elif type == "ax6":
                print("WARNING: Using default drive size for AX6")
                fileSize = 1975995 * 512
            elif type is None:
                print("ERROR: Cannot use a default drive size as device type is unspecified. ")
                return False
            else:
                print("ERROR: Cannot use a default drive size for unknown device type: " + type)
                return False

        unmountDrive(path)

        startTime = time.time()
        with open(path, 'rb') as fi, open(outputFile, mode) as fo:
            writtenSize = 0
            # Resume: continue reading from wherever the output file currently ends
            offset = fo.tell()
            fi.seek(offset)
            # Checking the size before each read means that resuming an
            # already-complete image does not append a further block.
            while offset < fileSize:
                data = fi.read(blockSize)
                size = len(data)
                if size <= 0:
                    break
                written = fo.write(data)
                fo.flush()
                if written != size:
                    print("ERROR: Problem writing all of the data, wrote " + str(written) + " of " + str(size) + "")
                    # An incomplete write must not be reported as success
                    return False
                offset += size
                writtenSize += written
                perc = round(100 * offset / fileSize, 1)
                elapsed = time.time() - startTime
                # Both default to 0 so the progress line is always printable,
                # even when the clock has not advanced since the start.
                rate = 0
                remaining = 0
                if elapsed > 0:
                    rate = writtenSize / elapsed
                    if rate > 0:
                        remaining = (fileSize - offset) / rate
                print("Dumping " + str(written) + " =" + str(writtenSize) + " @" + str(offset) + " /" + str(fileSize) + " (" + str(perc) + "%) in " + str(timedelta(seconds=int(elapsed))) + ", " + str(round(rate / 1024, 3)) + " kB/s, ETA " + str(timedelta(seconds=int(remaining))) + ".")
    except FileExistsError:
        print("ERROR: Output file already exists. Remove, rename, or use options --overwrite or --resume: ", outputFile)
        return False
    except PermissionError:
        if platform.system() == "Windows":
            print("ERROR: Permission error -- you must run this in an Ctrl+Shift+Esc, Alt+F, N, cmd, 'Create this task with administrative privileges.'")
        else:
            print("ERROR: Permission error -- you must run this as root, try running prefixed with: sudo")
        return False
    except OSError as e:
        print("ERROR: Problem accessing the device:", e)
        # errno 16: reported below as "resource busy"
        if e.errno == 16:
            print("ERROR: Resource busy, the device is in use -- check that it is not mounted.")
        return False
    return True
def unmountDrive(path):
    """Unmount the volumes of the device at `path` where the platform needs it.

    Args:
        path: Device path handed to `diskutil unmountDisk` on macOS.

    Returns:
        True on Windows (nothing is done) and on macOS after diskutil has run
        successfully; None on any other platform, where automatic unmounting
        is not attempted.

    Raises:
        On macOS, whatever check_output raises if diskutil cannot be run or
        exits with a non-zero status.
    """
    system = platform.system()

    # Windows
    if system == "Windows":
        return True

    # macOS
    if system == "Darwin":
        # diskutil unmountDisk /dev/$DISK
        # The path is passed as its own argument (no shell), so spaces or shell
        # metacharacters in it cannot alter the command that is executed.
        args = ["diskutil", "unmountDisk", path]
        print("...macOS: Unmounting device:", " ".join(args))
        out = check_output(args)
        response = out.decode("utf-8")
        # Unmount of all volumes on disk4 was successful
        print("...macOS: response:", response)
        return True

    # Unsupported platform
    print("..." + system + ": Automatically unmounted not supported on this platform.")
    return None
def findPhysicalDriveSize(physicalDrive):
    """Detect the size in bytes of the given drive.

    Args:
        physicalDrive: Device identifier. On Windows it is matched
            (case-insensitively) against the columns of
            `wmic diskdrive list brief`; on macOS it is passed to
            `diskutil info -plist`.

    Returns:
        The size in bytes; 0 if the drive was not found or the size could not
        be determined; None if detection is not supported on this platform.
    """
    from subprocess import CalledProcessError

    system = platform.system()

    # Windows
    if system == "Windows":
        print("...Windows: Detecting physical drive size...")
        try:
            out = check_output(["wmic", "diskdrive", "list", "brief"])
        except (OSError, CalledProcessError) as e:
            print("WARNING: Problem running wmic to detect the drive size:", e)
            return 0
        wanted = str(physicalDrive).upper()
        for line in out.split(b"\r\n"):
            parts = line.decode("utf-8", errors="replace").split()
            if any(part.upper() == wanted for part in parts):
                # The size is taken from the last column of the matching row
                try:
                    return int(parts[-1])
                except ValueError:
                    return 0
        return 0

    # macOS
    if system == "Darwin":
        import plistlib
        # Query the drive that was actually requested and read "TotalSize"
        # from the property list instead of scraping it with grep.
        try:
            out = check_output(["diskutil", "info", "-plist", physicalDrive])
        except (OSError, CalledProcessError) as e:
            print("WARNING: Problem running diskutil to detect the drive size:", e)
            return 0
        try:
            return int(plistlib.loads(out)["TotalSize"])
        except Exception as e:
            # Any parse failure just means "size unknown"; the caller falls
            # back to a default size when 0 is returned.
            print("WARNING: Problem reading the drive size from diskutil:", e)
            return 0

    # Unsupported platform
    print("..." + system + ": Automatically detecting physical drive size not supported on this platform.")
    return None
def findPhysicalDrives():
    """List the device paths of attached AX3/AX6 drives.

    Returns:
        A list of device paths (possibly empty): on Windows the
        `\\\\.\\PHYSICALDRIVE...` entries of matching `wmic` rows, on macOS
        `/dev/diskNsM` paths taken from `diskutil list`. None if detection is
        not supported on this platform.
    """
    from subprocess import CalledProcessError

    system = platform.system()

    # Windows
    if system == "Windows":
        print("...Windows: detecting device physical drive...")
        prefixDevice = (b"AX3 AX3 Mass Storage USB Device", b"AX6 AX6 Mass Storage USB Device")
        prefixDrive = b"\\\\.\\PHYSICALDRIVE"
        out = check_output(["wmic", "diskdrive", "list", "brief"])
        paths = []
        for line in out.split(b"\r\n"):
            # Only rows describing one of the known devices are considered
            if line.startswith(prefixDevice):
                paths.extend(part.decode("utf-8") for part in line.split() if part.startswith(prefixDrive))
        return paths

    # macOS
    if system == "Darwin":
        # diskutil list | grep -E "\bAX\d+_\d+\b" | grep -Eo "\bdisk\d+s\d+"
        command = "diskutil list | grep -E \"\\bAX\\d+_\\d+\\b\" | grep -Eo \"\\bdisk\\d+s\\d+\""
        try:
            out = check_output(["bash", "-c", command])
        except (OSError, CalledProcessError):
            # A non-zero exit status from the pipeline (or failure to start it)
            # is reported as "nothing found"; other exceptions, such as
            # KeyboardInterrupt, are no longer swallowed.
            print("ERROR: Problem detecting device physical drive -- please check the device is connected.")
            return []
        return ["/dev/" + name for name in out.decode("utf-8").split("\n") if len(name) > 0]

    # Unsupported platform
    print("..." + system + ": Automatically finding device path not supported on this platform.")
    return None
def findSingleDrive():
    """Return the path of the one detected drive, or None.

    None is returned (after printing a warning) when detection is unavailable,
    when nothing was found, or when more than one candidate was found.
    """
    candidates = findPhysicalDrives()

    if candidates is None:
        print("WARNING: Unable to find drive path.")
        return None

    count = len(candidates)
    if count <= 0:
        print("WARNING: Found no matching drive (expecting one):")
        return None

    if count > 1:
        print("WARNING: Found too many matching drives (expecting at most one):")
        for candidate in candidates:
            print("", candidate)
        return None

    # Exactly one candidate remains
    found = candidates[0]
    print("NOTE: Found path:", found)
    return found
def needsToRunElevated():
    """Report whether the program likely needs to be re-run with more rights.

    Returns:
        True on Windows when the current user is not an administrator;
        False when already running as administrator (Windows) or with uid 0
        (Linux/macOS); None when this cannot be determined, which includes
        a non-root user on Linux/macOS and any other platform.
    """
    system = platform.system()

    # Windows
    if system == "Windows":
        try:
            return not ctypes.windll.shell32.IsUserAnAdmin()
        except Exception:
            # The check itself failed, so the answer is unknown
            return None

    # Linux or macOS
    if system in ("Linux", "Darwin"):
        return False if os.getuid() == 0 else None

    # Unsupported platform
    return None
# Re-run the program with admin rights
def rerunElevated():
    """Try to start a copy of this program with administrative rights.

    Only attempted on Windows, using ShellExecuteW with the "runas" verb and
    the current interpreter and command-line arguments.

    Returns:
        True if the elevated copy was launched; None if it was not (launch
        refused or failed, or the platform is not supported), so the caller
        can carry on in the current process.
    """
    # Windows
    if platform.system() == "Windows":
        params = " ".join(['"%s"' % (x,) for x in sys.argv[0:]])
        print("...Windows: spawning version with admin rights...", sys.executable, params)
        result = ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, params, None, 1)
        # ShellExecuteW signals success with a value greater than 32; anything
        # else (for example the elevation prompt being declined) means that no
        # elevated copy is running.
        if result <= 32:
            print("WARNING: Could not spawn a version with admin rights (result " + str(result) + ") -- continuing without elevation.")
            return None
        return True

    # Unsupported platform
    print("..." + platform.system() + ": Not running with elevated permissions and will not attempt to automatically re-run on this platform -- try re-running the command prefixed with: sudo")
    return None
def pause():
    """Prompt the user and wait for Enter to be pressed.

    Returns immediately if standard input is already at end-of-file, so that
    running with stdin closed or redirected does not end in a traceback.
    """
    print("Press Enter to continue...")
    try:
        input()
    except EOFError:
        # Nothing can be read from stdin, so there is nothing to wait for
        pass
def main():
    """Parse the command line, locate the device and dump it to a file.

    Options: --no-overwrite (default), --overwrite, --resume, --type:ax3,
    --type:ax6, --source <device>, --dest <file>, --no-pause, --no-elevate.
    Positional arguments may be given as `dest` or as `source dest`.

    Returns:
        None if the arguments were rejected or no device could be determined;
        the result of rerunElevated() when that is not None; otherwise the
        result of driveDump().
    """
    #atexit.register(pause)
    print("Running...")

    # Options
    drivePath = None
    outputFile = None
    mode = "xb"
    deviceType = None
    attemptElevate = True
    pauseOnExit = True
    # The first positional argument is remembered separately so that a second
    # positional can promote *it* to the source, never a value that came from
    # an explicit --dest option.
    firstPositional = None

    args = sys.argv[1:]
    index = 0
    while index < len(args):
        option = args[index]
        if option.startswith("-"):
            if option == "--no-overwrite":
                mode = "xb"
            elif option == "--overwrite":
                mode = "wb"
            elif option == "--resume":
                mode = "ab"
            elif option == "--type:ax3":
                deviceType = "ax3"
            elif option == "--type:ax6":
                deviceType = "ax6"
            elif option == "--source":
                index += 1
                if index < len(args):
                    drivePath = args[index]
                else:
                    print("WARNING: No device specified after --source")
            elif option == "--dest":
                index += 1
                if index < len(args):
                    outputFile = args[index]
                else:
                    print("WARNING: No output file specified after --dest")
            elif option == "--no-pause":
                pauseOnExit = False
            elif option == "--no-elevate":
                attemptElevate = False
            else:
                print("ERROR: Unrecognized option: " + option)
                return
        elif firstPositional is None and outputFile is None:  # backwards-compatible: dest-only specified
            firstPositional = option
            outputFile = option
        elif firstPositional is not None and drivePath is None:  # also supports: source dest
            drivePath = firstPositional
            outputFile = option
        else:
            # Includes a lone positional after --dest: treating the explicit
            # destination as the source would read from the output path.
            print("ERROR: Unrecognized positional argument: " + option)
            return
        index += 1

    if outputFile is None:
        outputFile = "cwa-dump.img"
    print("NOTE: Using output file in mode=" + mode + ":", outputFile)

    if drivePath is None:
        print("Determining source...")
        drivePath = findSingleDrive()
        if drivePath is None:
            print("ERROR: No device specified or found -- cannot continue.")
            return

    print("Checking whether likely needs to run with elevated permissions...")
    if needsToRunElevated():
        print("...it is likely to need to run with elevated permissions.")
        if attemptElevate:
            print("...attempting to elevate...")
            ret = rerunElevated()
            if ret is not None:
                return ret

    # Run code needing admin rights
    ret = driveDump(drivePath, outputFile, mode, deviceType)
    if pauseOnExit:
        pause()
    return ret
if __name__ == "__main__":
    # Report the outcome through the process exit status: 0 when main()
    # returns a true value, 1 otherwise (main() returns None or False on failure).
    sys.exit(0 if main() else 1)