#!/usr/bin/env python
import sys
import os
import struct
import getpass
from hashlib import sha256
from random import randint
import argparse
import subprocess

## py_sg is the third-party module used to send raw SCSI commands to the device.
try:
    import py_sg
except ImportError:
    py_sg = None
    ## Abort only when executed as a script: merely importing this file
    ## (e.g. to reuse the pure helpers below) must not kill the importer.
    if __name__ == "__main__":
        print("You need to install the \"py_sg\" module.")
        sys.exit(1)

BLOCK_SIZE = 512
HANDSTORESECURITYBLOCK = 1
## Open file object of the device; assigned by main().
dev = None

## Human-readable names of the security status byte.
_SECURITY_STATUS_NAMES = {
    0x00: "No lock",
    0x01: "Locked",
    0x02: "Unlocked",
    0x06: "Locked, unlock blocked",
    0x07: "No keys",
}

## Human-readable names of the cipher identifier byte.
_CIPHER_NAMES = {
    0x10: "AES_128_ECB",
    0x12: "AES_128_CBC",
    0x18: "AES_128_XTS",
    0x20: "AES_256_ECB",
    0x22: "AES_256_CBC",
    0x28: "AES_256_XTS",
    0x30: "Full Disk Encryption",
}


## NOTE: the colour helpers keep their historical parameter name "str"
## (which shadows the builtin inside each function) so that existing
## keyword callers keep working.

def fail(str):
    """Return the message prefixed with a red "[!]" marker."""
    return "\033[91m" + "[!]" + "\033[0m" + " " + str


def success(str):
    """Return the message prefixed with a green "[*]" marker."""
    return "\033[92m" + "[*]" + "\033[0m" + " " + str


def question(str):
    """Return the message prefixed with a blue "[+]" marker."""
    return "\033[94m" + "[+]" + "\033[0m" + " " + str


def title(str):
    """Return the message wrapped in the yellow ANSI colour sequence."""
    return "\033[93m" + str + "\033[0m"


def is_root_user():
    """Return True when the effective user id of the process is 0 (root)."""
    return os.geteuid() == 0


def sec_status_to_str(security_status):
    """Convert a security status code to its human-readable name.

    Returns "unknown" for codes that are not in the table.
    """
    return _SECURITY_STATUS_NAMES.get(security_status, "unknown")


def cipher_id_to_str(cipher_id):
    """Convert a cipher identifier to its human-readable algorithm name.

    Returns "unknown" for identifiers that are not in the table.
    """
    return _CIPHER_NAMES.get(cipher_id, "unknown")


def _scsi_pack_cdb(cdb):
    """Pack a sequence of byte values (0-255) into a binary string."""
    return struct.pack('%dB' % len(cdb), *cdb)


def htonl(num):
    """Pack a 32-bit unsigned integer into 4 bytes, network byte order.

    Unlike socket.htonl this returns the packed bytes, not an integer.
    """
    return struct.pack('!I', num)


def htons(num):
    """Pack a 16-bit unsigned integer into 2 bytes, network byte order.

    Unlike socket.htons this returns the packed bytes, not an integer.
    """
    return struct.pack('!H', num)


def read_handy_store(page):
    """Call the device and return the selected block of Handy Store.

    page is the block number; it is stored big-endian in bytes 2..5 of the
    command. Returns the BLOCK_SIZE bytes read through py_sg.
    """
    cdb = [0xD8, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x01, 0x00]
    ## bytearray yields integers for both str and bytes input.
    cdb[2:6] = bytearray(htonl(page))
    return py_sg.read(dev, _scsi_pack_cdb(cdb), BLOCK_SIZE)


def hsb_checksum(data):
    """Calculate the checksum of a Handy Store block.

    The first 510 bytes are summed, data[0] is added a second time, and the
    low byte of the negated total is returned formatted with hex().
    Raises IndexError when data holds fewer than 510 bytes.
    """
    block = bytearray(data)
    if len(block) < 510:
        raise IndexError("Handy Store block too short: %d bytes" % len(block))
    ## Some WD Utils count data[0] twice, some other not ...
    total = sum(block[:510]) + block[0]
    return hex(-total & 0xFF)


def get_encryption_status():
    """Call the device and get the encryption status.

    Exits the program with status 1 when the first byte of the answer is
    not the expected 0x45 signature. Otherwise returns a tuple of three
    values:

    SecurityStatus:
        0x00 => No lock
        0x01 => Locked
        0x02 => Unlocked
        0x06 => Locked, unlock blocked
        0x07 => No keys
    CurrentCipherID:
        0x10 => AES_128_ECB
        0x12 => AES_128_CBC
        0x18 => AES_128_XTS
        0x20 => AES_256_ECB
        0x22 => AES_256_CBC
        0x28 => AES_256_XTS
        0x30 => Full Disk Encryption
    KeyResetEnabler (4 bytes that change every time), returned as the raw
    slice data[8:12].
    """
    cdb = [0xC0, 0x45, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x30, 0x00]
    data = py_sg.read(dev, _scsi_pack_cdb(cdb), BLOCK_SIZE)
    header = bytearray(data[:5])
    if header[0] != 0x45:
        print(fail("Wrong encryption status signature %s" % hex(header[0])))
        sys.exit(1)
    ## SecurityStatus, CurrentCipherID, KeyResetEnabler
    return (header[3], header[4], data[8:12])


## Call the device and get the first block of Handy Store.
## The function returns three values:
##
## Iteration - number of iterations (hashing) in password generation
## Salt - salt used in password generation
## Hint - hint of the password if used. TODO.
def read_handy_store_block1(): signature = [0x00, 0x01, 0x44, 0x57] sector_data = read_handy_store(1) ## Check if retrieved Checksum is correct if hsb_checksum(sector_data) != hex(ord(sector_data[511])): print fail("Wrong HSB1 checksum") sys.exit(1) ## Check if retrieved Signature is correct for i in range(0,4): if signature[i] != ord(sector_data[i]): print fail("Wrong HSB1 signature.") sys.exit(1); iteration = struct.unpack_from(" 0): old_passwd_hashed = mk_password_block(old_passwd, iteration, salt) pw_block[3] = pw_block[3] | 0x10 else: old_passwd_hashed = "" for i in range(32): old_passwd_hashed = old_passwd_hashed + chr(0x00) if (len(new_passwd) > 0): new_passwd_hashed = mk_password_block(new_passwd, iteration, salt) pw_block[3] = pw_block[3] | 0x01 else: new_passwd_hashed = "" for i in range(32): new_passwd_hashed = new_passwd_hashed + chr(0x00) if pw_block[3] & 0x11 == 0x11: pw_block[3] = pw_block[3] & 0xEE pwblen = 8 + 2 * pwblen cdb[8] = pwblen try: ## If exception isn't raised the unlock operation gone ok. py_sg.write(dev, _scsi_pack_cdb(cdb), _scsi_pack_cdb(pw_block) + old_passwd_hashed + new_passwd_hashed) print success("Password changed.") except: ## Wrong password or something bad is happened. print fail("Error changing password") pass ## Change the internal key used for encryption, every data on the device would be permanently unaccessible. ## Device forgets even the partition table so you have to make a new one. 
def secure_erase(cipher_id = 0):
    """Ask the device to replace its internal encryption key.

    cipher_id selects the cipher of the new key; 0 (the default) keeps the
    cipher currently reported by the device. Exits the program with status
    1 when the cipher is not supported. The outcome of the request is
    printed; nothing is returned.
    """
    cdb = [0xC1, 0xE3, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x00]
    current_cipher_id = get_encryption_status()[1]
    if cipher_id == 0:
        cipher_id = current_cipher_id

    pw_block = [0x45, 0x00, 0x00, 0x00, 0x30, 0x00, 0x00, 0x00]
    if cipher_id in (0x10, 0x12, 0x18):
        pwblen = 16
        pw_block[3] = 0x01
    elif cipher_id in (0x20, 0x22, 0x28):
        pwblen = 32
        pw_block[3] = 0x01
    elif cipher_id == 0x30:
        ## pw_block[3] stays 0x00 for this cipher.
        pwblen = 32
    else:
        print(fail("Unsupported cipher %s" % cipher_id))
        sys.exit(1)

    ## Set the actual length of pw_block (8 bytes + pwblen random bytes)
    cdb[8] = pwblen + 8
    ## Fill pw_block with random data (bytearray yields integers).
    pw_block.extend(bytearray(os.urandom(pwblen)))

    ## key_reset needs to be retrieved immediately before the reset request
    key_reset = bytearray(get_encryption_status()[2])
    cdb[2:2 + len(key_reset)] = key_reset

    try:
        py_sg.write(dev, _scsi_pack_cdb(cdb), _scsi_pack_cdb(pw_block))
    except Exception as err:
        ## Something bad happened; report it instead of hiding the cause.
        print(fail("Something wrong: %s" % err))
    else:
        print(success("Device erased. You need to create a new partition on the device (Hint: fdisk and mkfs)"))


def _list_scsi_devices():
    """Return the output lines of "lsscsi" (empty list if it cannot be run)."""
    try:
        proc = subprocess.Popen(["lsscsi"], stdout=subprocess.PIPE,
                                universal_newlines=True)
    except OSError as err:
        sys.stderr.write("Unable to run lsscsi: %s\n" % err)
        return []
    return proc.communicate()[0].splitlines()


def get_device_info(device = None):
    """Get device info through the "lsscsi" command.

    device is a text that must appear in the lsscsi line of the wanted
    device (usually its path); when None, the first line containing
    "Passport" is used.

    Ex. from the following line
        "[23:0:0:0] disk WD My Passport 0820 1012 /dev/sdb"
    the result is ("/dev/sdb", "sdb", "23"): complete path, relative path
    and SCSI host number. Parts that cannot be found are returned as empty
    strings, so a missing device gives ("", "", "").
    """
    ## Local import: "re" is not among the imports at the top of the file.
    import re

    needle = "Passport" if device is None else device
    for line in _list_scsi_devices():
        ## Plain substring test; no shell or regex ever sees user input.
        if needle not in line:
            continue
        node = re.search(r"/([a-zA-Z]+)/([a-zA-Z0-9]+)", line)
        host = re.match(r"\s*\[(\d+):", line)
        complete_path = node.group(0) if node else ""
        relative_path = node.group(2) if node else ""
        host_number = host.group(1) if host else ""
        return (complete_path, relative_path, host_number)
    return ("", "", "")


def enable_mount(device):
    """Enable mount operations.

    Tells the system to scan the "new" (unlocked) device: the block device
    is deleted through sysfs and its SCSI host is then rescanned. The two
    writes are done one after the other, so the rescan can never start
    before the delete has been issued.
    """
    sec_status = get_encryption_status()[0]
    ## Device should be in the correct state
    if sec_status not in (0x00, 0x02):
        print(fail("Device needs to be unlocked in order to mount it."))
        return

    rp, hn = get_device_info(device)[1:]
    if not rp or not hn:
        print(fail("Unable to find device \"%s\" in lsscsi output." % device))
        return

    try:
        with open("/sys/block/" + rp + "/device/delete", "w") as node:
            node.write("1\n")
        with open("/sys/class/scsi_host/host" + hn + "/scan", "w") as node:
            node.write("- - -\n")
    except (IOError, OSError) as err:
        print(fail("Unable to rescan the device: %s" % err))
        return
    print(success("Now depending on your system you can mount your device or it will be automagically mounted."))


def main(argv):
    """Main function, get parameters and manage operations.

    argv is the list of command-line arguments without the program name;
    None means "use sys.argv[1:]". With no arguments at all the device
    status is shown. Exits with status 1 when not run as root, when several
    "Passport" devices are found without -d, or when the device cannot be
    opened.
    """
    global dev
    if argv is None:
        argv = sys.argv[1:]

    print(title("WD Passport Ultra linux utility v0.1 by duke"))
    parser = argparse.ArgumentParser()
    parser.add_argument("-s", "--status", action="store_true", help="Check device status and encryption type")
    parser.add_argument("-u", "--unlock", action="store_true", help="Unlock")
    parser.add_argument("-m", "--mount", action="store_true", help="Enable mount point for an unlocked device")
    parser.add_argument("-c", "--change_passwd", action="store_true", help="Change (or disable) password")
    parser.add_argument("-e", "--erase", action="store_true", help="Secure erase device")
    parser.add_argument("-d", "--device", dest="device", help="Force device path (ex. /dev/sdb). Usually you don't need this option.")
    args = parser.parse_args(argv)

    if not is_root_user():
        print(fail("You need to have root privileges to run this script."))
        sys.exit(1)

    ## No option given: default to showing the status.
    if not argv:
        args.status = True

    if args.device:
        device_path = args.device
    else:
        ## Get occurrences of "Passport" devices
        passports = [line for line in _list_scsi_devices() if "Passport" in line]
        if len(passports) > 1:
            print(fail("Multiple occurences of \"My Passport\" detected. You should specify a device manually (with -d option)."))
            sys.exit(1)
        device_path = get_device_info()[0]

    try:
        dev = open(device_path, "r+b")
    except (IOError, OSError):
        print(fail("Something wrong opening device \"%s\"" % (device_path)))
        sys.exit(1)

    ## The device handle is closed on every way out, sys.exit() included.
    try:
        if args.status:
            status, cipher_id = get_encryption_status()[:2]
            print(success("Device state"))
            print("\tSecurity status: %s" % sec_status_to_str(status))
            print("\tEncryption type: %s" % cipher_id_to_str(cipher_id))

        if args.unlock:
            unlock()

        if args.change_passwd:
            change_password()

        if args.erase:
            print(question("Any data on the device will be lost. Are you sure you want to continue? [y/N]"))
            answer = sys.stdin.read(1)
            if answer.lower() == 'y':
                secure_erase(0)
            else:
                print(success("Ok. Bye."))

        if args.mount:
            enable_mount(device_path)
    finally:
        dev.close()


if __name__ == "__main__":
    main(sys.argv[1:])