/* Copyright 2022 - tomw Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ------------------------------------------- Change history: 0.9.9 - tomw - Make device keep-alive ping a configurable option, and disable by default for RM4 devices. 0.9.8 - tomw - Fix for IllegalBlockSizeException. 0.9.7 - tomw - Support for sensors (temperature and humidity). 0.9.6 - tomw - Bugfixes and improved error reporting. 0.9.5 - tomw - Added ping for RM3 devices as keep-alive. 0.9.4 - tomw - Added 'sequence' scripting feature. 0.9.3 - tomw - Rework app UI to use buttons. 0.9.1 - tomw - Added push command, which uses the button name as a saved code to send. 0.9.0 - tomw - Initial release. */ metadata { definition(name: "Broadlink Remote", namespace: "tomw", author: "tomw", importUrl: "https://raw.githubusercontent.com/tomwpublic/hubitat_broadlink/main/broadlinkRemoteDriver") { capability "Actuator" capability "Initialize" capability "RelativeHumidityMeasurement" capability "Refresh" capability "TemperatureMeasurement" command "learnIR" command "cancelRF" command "learnRF" command "sweepRF" command "sendCodeData", [[name: "code*", type: "STRING"]] command "push", ["name"] command "clearSavedCodes" command "deleteSavedCode", [[name: "name*", type: "STRING"]] command "saveLearnedCode", [[name: "name*", type: "STRING"]] command "testLearnedCode" command "sendSavedCode", [[name: "name*", type: "STRING"], [name: "reps", type: "NUMBER"]] command "cacheCodesForApp", ["retain"] // compatibility for previous integration command "importCodes", ["codeStore"] command "SendStoredCode", [[name: "name*", type: "STRING"], [name: "reps", type: "NUMBER"]] command "SendCode", ["code"] command "generateIR", ["offBurst", "onBurst", "leadIn", "bitmap", "leadOut"] // compatibility for previous integration attribute "learnedCode", "string" attribute "savedCode", "string" attribute "activity", "string" // enable to decode pcap trace //command "prepResponse", ["resp"] } } preferences { section { input name: "ipAddress", type: "text", title: "IP address", required: true input name: "deviceHasRF", type: "bool", title: "Does your device support RF?", defaultValue: false input name: "doDevicePing", type: "bool", title: "Send keep-alive ping to RM3 and RM Pro devices?", defaultValue: true } section { input name: "logEnable", type: "bool", title: "Enable debug logging", defaultValue: true } } import groovy.transform.Field @Field INIT_KEY = "097628343fe99e23765c1513accf8b02" @Field INIT_VECT = "562e17996d093d28ddb3ba695a2e6f58" @Field cmdAuthReq = 0x65 @Field cmdControl = 0x6A // common payloads that don't change @Field byte[] getFwVers = [0x68] @Field byte[] checkSensors = [0x01] @Field byte[] checkSensors4 = [0x24] import hubitat.device.HubAction import hubitat.device.Protocol import hubitat.helper.HexUtils def logDebug(msg) { if (logEnable) { log.debug(msg) } } def initialize() { unschedule() // clear these code values, in case the long strings cause browser problems device.deleteCurrentState("learnedCode") device.deleteCurrentState("savedCode") scan() } def updated() { initialize() } def cancelAll() { causeWaitCancel() } def scan() { // always cancel and try to re-init on scan() cancelAll() updateDevStatus("initializing") // https://github.com/mjg59/python-broadlink/blob/master/protocol.md#network-discovery byte[] packet = [0] * 48 packet[0x26] = 6 def cs = calcChecksum(packet) packet = replaceSubArr(packet, cs, 0x20) def str = HexUtils.byteArrayToHexString(packet) def params = [type: HubAction.Type.LAN_TYPE_UDPCLIENT, destinationAddress: ipAddress, encoding: HubAction.Encoding.HEX_STRING, callback: "scanResponse"] def hubAction = new HubAction(str, Protocol.LAN, params) sendHubCommand(hubAction) } def scanResponse(message) { def lanMsg = parseLanMessage(message) //logDebug("parse: ${lanMsg}") def plb = HexUtils.hexStringToByteArray(lanMsg.payload) logDebug("scanResp = ${HexUtils.byteArrayToHexString(plb)}") def devType = bytesToInt(subBytes(plb, 0x34, 2), "little") def mac = HexUtils.byteArrayToHexString(reverseBytes(subBytes(plb, 0x3A, 6))) def name = new String(subBytes(plb, 0x40, plb.size() - 0x40)) def isLocked = plb[0x7F] def devDetails = [devType: devType, mac: mac, name: name, isLocked: isLocked, id: 0, authKey: INIT_KEY] setDevDetails(devDetails) if(isLocked) { updateDevStatus("device is locked in Broadlink app") return } pauseExecution(500) // need to auth this device auth() updateDevStatus("waiting for auth", false) } def setDevDetails(devDetails) { state.devDetails = devDetails } def getDevDetails() { return state.devDetails ?: [:]} def updateDevStatus(activity, readyForAction = true) { sendEvent(name: "activity", value: activity) setVolatileState("readyForAction", readyForAction) } def checkReady() { def rfA = getVolatileState("readyForAction") if(null == rfA) { // not initialized, so assume "ready" return true } return rfA } def updateSeqStatus(inSeq = false) { logDebug("updateSeqStatus = ${inSeq}") setVolatileState("seqExecuting", inSeq) } def checkSeqStatus() { def sS = getVolatileState("seqExecuting") if(null == sS) { // not initialized, so assume "ready" return false } return sS } def auth() { if(!checkReady()) { return } byte[] authPacket = [0] * 80 // https://github.com/mjg59/python-broadlink/blob/master/broadlink/device.py#L173 authPacket[0x1E] = 1 authPacket[0x2D] = 1 // clear message count since we are re-auth'ing clrCount() send_packet(getDevDetails() << [callback: "authResponse"], cmdAuthReq, authPacket) } def authResponse(message) { def resp = prepResponse(message, false, true) if(null == resp) { return } logDebug("authResp = ${HexUtils.byteArrayToHexString(resp)}") // update device id and authKey def id = bytesToInt(subBytes(resp, 0, 4), "little") def authKey = HexUtils.byteArrayToHexString(subBytes(resp, 4, 16)) getDevDetails() << [id: id, authKey: authKey] updateDevStatus("idle") // kick off ping() loop as keep-alive if(doDevicePing || (null == doDevicePing)) { if(null == doDevicePing) { // no setting was specified, so this must be someone who upgraded source code // so, make sure the default value is used at least once device.updateSetting("doDevicePing", true) } runIn(60, ping) } // try to check basic info interrogateDevice() } def senseAPI() { def knownOldDevs = [0x2737,0x278F,0x27C2,0x27C7,0x27CC,0x27CD,0x27D0,0x27D1,0x27D3,0x27DC,0x27DE, 0x2712,0x272A,0x273D,0x277C,0x2783,0x2787,0x278B,0x2797,0x279D,0x27A1,0x27A6, 0x27A9,0x27C3] if(knownOldDevs.contains(getDevDetails()?.devType)) { // check against known list first setNewApi(false) return } // otherwise, try new API by default setNewApi(true) pauseExecution(500) send_packet(getDevDetails() << [callback: "senseResponse"], cmdControl, [0x01] as byte[]) } def senseResponse(message) { def resp = prepResponse(message, false, true) if([] == resp) { if(isNewApi() == true) { log.debug "unmatched old API device: ${getDevDetails()?.devType}" setNewApi(false) setErrors(0) } return } // if we got this far, new API is confirmed // so, disable ping() device.updateSetting("doDevicePing", false) // second way of getting device name //logDebug(new String(subBytes(resp, 0x48, resp.size() - 0x48))) //logDebug(resp.size()) // second way of getting lock status //logDebug(resp[0x87]) } def interrogateDevice() { if(!checkReady()) { return } // get_fwversion send_packet(getDevDetails() << [callback: "interrogateResp"], cmdControl, getFwVers) } def refresh() { if(!checkReady()) { return } // checkSensors send_packet(getDevDetails() << [callback: "sensorResponse"], cmdControl, isNewApi() ? checkSensors4 : checkSensors) } def sensorResponse(message) { def resp = prepResponse(message) if(resp?.size() == 0) { return } logDebug("sensorResp = ${HexUtils.byteArrayToHexString(resp)}") def temperature = resp[0] + (resp[1] / 100) if(temperature) { temperature = convertTemperatureIfNeeded(temperature, "C", 2) sendEvent(name: "temperature", value: temperature) } def humidity = resp[2] + (resp[3] / 100) if(humidity) { sendEvent(name: "humidity", value: humidity) } } def interrogateResp(message) { def resp = prepResponse(message, false, true) if(resp?.size() == 0) { return } //logDebug("interrogateResp = ${HexUtils.byteArrayToHexString(resp)}") switch(resp[0]) { case getFwVers[0]: logDebug("get_fwversion: ${bytesToInt(subBytes(resp, 4, 2), "little")}") break default: logDebug resp } // try to determine which API version the device speaks senseAPI() } def ping() { // send heartbeat ping that would normally come from cloud // so that offline devices don't periodically reboot byte[] packet = [0] * 0x30 packet[0x26] = 1 def params = [type: HubAction.Type.LAN_TYPE_UDPCLIENT, destinationAddress: ipAddress, encoding: HubAction.Encoding.HEX_STRING] sendHubCommand(new HubAction(HexUtils.byteArrayToHexString(packet), Protocol.LAN, params)) if(doDevicePing) { runIn(60, ping) } } def learnIR() { if(!checkReady()) { return } // https://github.com/mjg59/python-broadlink/blob/master/protocol.md#entering-learning-mode updateDevStatus("learning IR", false) send_packet(getDevDetails() << [callback: "learnResponse"], cmdControl, [0x03] as byte[]) } def learnResponse(message) { prepResponse(message) if(!checkError()) { // initialize a counter for the readback setVolatileState("iter", 0) readback() } } def readback() { // https://github.com/mjg59/python-broadlink/blob/master/protocol.md#entering-learning-mode send_packet(getDevDetails() << [callback: "readbackResponse"], cmdControl, [0x04] as byte[]) } def readbackResponse(message) { waitForResponse(message, "readback", "processCodeResp", 40) } def processCodeResp(resp) { def code = HexUtils.byteArrayToHexString(resp) //logDebug ("raw code = ${code}") sendEvent(name: "learnedCode", value: code) updateDevStatus("code captured") } def testLearnedCode() { sendCodeData(device.currentValue("learnedCode")) } ////////////////////////////////////// // RF state machine ////////////////////////////////////// @Field byte[] startSweep = [0x19] @Field byte[] checkSweep = [0x1A] @Field byte[] cancelSweep = [0x1E] @Field byte[] rfLearn = [0x1B] def hasRF() { if(!deviceHasRF) { log.info "RF command ignored. Check device preference setting." } return deviceHasRF } def cancelRF() { if(!hasRF()) { return } cancel_sweep() } def learnRF() { if(!checkReady()) { return } if(!hasRF()) { return } rf_learn() } def sweepRF() { if(!checkReady()) { return } if(!hasRF()) { return } start_sweep() } def setTargetRF(freq) { if(!hasRF()) { return } freq = freq?.toFloat() if(!freq) { return } if(freq < 300 || freq > 450) { return } // user input expected in terms of MHz // but stored in sweepFreq in terms of kHz freq = (freq * 1000)?.toInteger() if(freq) { getDevDetails() << [sweepFreq: freq] } } def start_sweep() { setVolatileState("iter", 0) send_packet(getDevDetails() << [callback: "parseRF"], cmdControl, startSweep) updateDevStatus("sweeping frequency - press and HOLD remote", false) } def check_sweep() { send_packet(getDevDetails() << [callback: "parseRF"], cmdControl, checkSweep) } def cancel_sweep(resp = null) { def msg = "trying to cancel RF sweep..." causeWaitCancel() updateDevStatus(msg, false) send_packet(getDevDetails() << [callback: "parseRF"], cmdControl, cancelSweep) updateDevStatus(msg, true) } def rf_learn(resp = null) { if(null == getDevDetails()?.sweepFreq) { updateDevStatus("missing sweep frequency - be sure to run sweepRF first") return } updateDevStatus("learning RF - press and release button multiple times", false) byte[] learnCmd = appendByteArr(rfLearn, HexUtils.hexStringToByteArray("00000000000000000000000000")) byte[] freqTerm = subBytes(intToBytes(getDevDetails()?.sweepFreq, 4, "little"), 0, 3) learnCmd = replaceSubArr(learnCmd, freqTerm, 4) send_packet(getDevDetails() << [callback: "parseRF"], cmdControl, learnCmd) } def parseRF(message) { def resp = prepResponse(message, false) if(null == resp) { return } logDebug("parseRF resp = ${HexUtils.byteArrayToHexString(resp)}") switch(resp[0]) { case startSweep[0]: if(checkError()) { return } // we started sweeping, so start checking for a success response check_sweep() break case checkSweep[0]: // sweeping RF; wait for flag to clear in checkSweepResponse waitForResponse(message, "check_sweep", "sweepSuccess", 40, "checkSweepResponse") break case cancelSweep[0]: if(checkError()) { return } updateDevStatus("cancelled RF sweep") break case rfLearn[0]: setVolatileState("iter", 0) readback() break } } def sweepSuccess(resp) { def sweepFreq = bytesToInt(subBytes(resp, 1, 3), "little") def msg = "sweep successful: freq = ${sweepFreq}" logDebug(msg) getDevDetails() << [sweepFreq: sweepFreq] updateDevStatus(msg) } def checkSweepResponse(resp) { // flag is set when sweep was successful return (resp?.getAt(0) == 1) } ////////////////////////////////////// // RF state machine ////////////////////////////////////// def causeWaitCancel() { setVolatileState("iter", 0xFFFF) } def waitForResponse(message, retryHandler, successHandler, timeout = 10, addlCheckLogic = null) { def resp = prepResponse(message) def iter = getVolatileState("iter") // if we timed out, we're done done if(iter >= timeout) { updateDevStatus("timed out") return } try { // if the transfer had an error if(checkError()) { throw new Exception("errored") } // if our additional check was specified AND failed if(null != addlCheckLogic) { if(!"$addlCheckLogic"(resp)) { throw new Exception("check failed") } } // otherwise, success! "$successHandler"(resp) return } catch(e) { //logDebug e.message // try again in 1/2 second setVolatileState("iter", iter + 1) pauseExecution(500) "$retryHandler"() } } def sendCodeData(code, reps = 1) { if(!checkReady()) { return } if(!code) { logDebug("sendCodeData() error: no code supplied") return } byte[] sendPacket = [0x02] + [0] * 3 def cb = HexUtils.hexStringToByteArray(code) sendPacket = appendByteArr(sendPacket, cb) if(sendPacket[4] == 0x26) { // FOR IR ONLY (0x26 first byte of command), // code will be sent (n+1) times this value reps = reps.toInteger() reps-- sendPacket[5] = (reps < 0) ? 0 : reps } else { // if this isn't an IR command AND we don't have RF, bail if(!hasRF()) { return } } logDebug("sendCodeData() packet: ${HexUtils.byteArrayToHexString(sendPacket)}") send_packet(getDevDetails() << [callback: "sendCodeResponse"], cmdControl, sendPacket) updateDevStatus("code sent (ts: ${new Date().getTime()})") } def sendCodeResponse(message) { prepResponse(message) } def generateIR(offBurst, onBurst, leadIn, bitmap, leadOut) { if([offBurst, onBurst, leadIn, bitmap, leadOut].contains(null)) { logDebug("generateIR: invalid code specs") return null } def code = "" // build the code sequence code += leadIn bitmap.each { code += ((it == "1") ? onBurst : offBurst) } code += leadOut // code sequence complete // apply length and build the actual packet def len = HexUtils.hexStringToByteArray(code).size() len = HexUtils.byteArrayToHexString([0xFF & len, 0xFF & (len >> 8)] as byte[]) code = "2600" + len + code + "0D05" logDebug("generateIR: offBurst=${offBurst}, onBurst=${onBurst}, leadIn=${leadIn}, bitmap=${bitmap}, leadOut=${leadOut}") logDebug("generateIR: code = ${code}") sendCodeData(code) } ////////////////////////////////////// // saved code operations ////////////////////////////////////// #include tomw.broadlinkHelpers def sendSavedCode(name, reps = 1) { sendCodeData(getSavedCode(name), reps) } def saveLearnedCode(name) { def entry = addSavedCode(name, device.currentValue("learnedCode")) sendEvent(name: "savedCode", value: entry) } def executeSequence(sequence) { if(!sequence) { return } if(checkSeqStatus()) { log.error "sequence already executing; try again later" return } try { updateSeqStatus(true) sequence = sequence?.split("sequence:")?.getAt(1) sequence = sequence.tokenize(";") logDebug("sending sequence: ${sequence}") sequence.each { it = it.trim() switch(it) { case ~/.*delay.*/: def delay = it.split("=")?.getAt(1)?.trim()?.toInteger() ?: 0 // limit to 5s delay delay = (delay > 5000) ? 5000 : delay logDebug("seq delay: ${delay} ms") pauseExecution(delay ?: 0) break case ~/.*code.*/: def codeName = it.split("=")?.getAt(1)?.trim() logDebug("seq code: ${codeName}") sendSavedCode(codeName) // throttle before next code pauseExecution(100) break case ~/.*repeat.*/: def codeName = it.split("repeat")?.getAt(1)?.trim() def params = codeName?.tokenize("=") if(2 == params.size()) { logDebug("seq code: ${params[1]?.trim()} reps: ${params[0]?.trim()?.toString()}") sendSavedCode(params[1]?.trim(), params[0]?.trim()?.toInteger()) // throttle before next code pauseExecution(100) } break default: logDebug("sequence bad entry: ${it}") } } } catch(Exception e) { logDebug("executeSequence: ${e.message}") } finally { updateSeqStatus(false) } } def push(name) { if(name instanceof String) { logDebug("push: ${name}") if(name.contains("sequence:")) { executeSequence(name) return } // if button number was a String, try to send a saved code with that name sendSavedCode(name) } } def SendStoredCode(name, reps = 1) { sendSavedCode(name, reps) } def SendCode(code) { sendCodeData(code) } def cacheCodesForApp(retain) { if(!retain) { // delete codes from data area once they are no longer needed removeDataValue("codes") return } // store copy of codes in data space, so app can get at them def codesStr = new groovy.json.JsonOutput().toJson(state.codes) updateDataValue("codes", codesStr) } ////////////////////////////////////// // saved code operations ////////////////////////////////////// def clrCount() { state.count = 0 return 0 } def incCount() { if(null == state.count) { return clrCount() } def count = (state.count + 1) & 0xffff state.count = count return count } def calcChecksum(byte[] packet) { def cs = 0xBEAF packet.each { cs = (cs + i8Tou8(it)) & 0xFFFF } return intToBytes(cs, 2, "little") } def isNewApi() { return newApi }//state.newApi } def setNewApi(val) { device.updateSetting("newApi", val) }//state.newApi = true } def prepPayload(payload, command) { if([cmdAuthReq].contains(command)) { // special commands return payload } if([getFwVers[0]].contains(payload[0])) { // special payloads return payload } if(isNewApi()) { // additional bytes at head of payload for "RMMINIB" and derivatives like "RM4MINI" and "RM4PRO" def temp = intToBytes(4, 2, "little") payload = appendByteArr(temp, payload) } return payload } def prepResponse(message, trim = true, useRaw = false) { // allow putting a pcap "hex stream" in as 'message' for decoding def lanMsg = parseLanMessage(message)?.payload ?: message //logDebug("parse: ${lanMsg}") def plb = HexUtils.hexStringToByteArray(lanMsg) // header payload //logDebug("command = ${HexUtils.byteArrayToHexString(subBytes(plb, 0, 0x38))}") updateError(plb) byte[] resp = [] if(plb.size() > 0x38) { // decode payload, starts at 0x38 plb = subBytes(plb, 0x38, plb.size() - 0x38) // pad for size, if necessary def padding = 16 - (plb.size() % 16) plb = appendByteArr(plb, [0] * padding) resp = aes_cbc(plb, "dec", getDevDetails().authKey, INIT_VECT) if(!resp) { return } if(isNewApi() && !useRaw) { resp = subBytes(resp, 2, resp.size() - 2) } if(trim) { // trim unused content resp = subBytes(resp, 4, resp.size() - 4) } } // decoded payload //logDebug(HexUtils.byteArrayToHexString(resp)) return resp } def send_packet(devDetails, command, payload) { try { // https://github.com/mjg59/python-broadlink/blob/master/protocol.md#command-packet-format byte[] packet = [0] * 56 def magic = HexUtils.hexStringToByteArray("5aa5aa555aa5aa55") packet = replaceSubArr(packet, magic, 0) // apply devType packet = replaceSubArr(packet, intToBytes(devDetails.devType, 2, "little"), 0x24) // apply command packet = replaceSubArr(packet, intToBytes(command, 1, "little"), 0x26) // update and apply count packet = replaceSubArr(packet, intToBytes(incCount(), 2, "little"), 0x28) // apply device MAC def mac = reverseBytes(HexUtils.hexStringToByteArray(devDetails.mac)) packet = replaceSubArr(packet, mac, 0x2A) // apply device ID packet = replaceSubArr(packet, intToBytes(devDetails.id, 4, "little"), 0x30) // prep payload with device- and command-specific details payload = prepPayload(payload, command) // apply checksum to header packet = replaceSubArr(packet, calcChecksum(payload), 0x34) // encrypt payload def padding = 16 - (payload.size() % 16) byte[] paddedPayload = appendByteArr(payload, [0] * padding) paddedPayload = aes_cbc(paddedPayload, "enc", devDetails.authKey, INIT_VECT) // compose whole packet (header + encrypted payload) byte[] totalPacket = appendByteArr(packet, paddedPayload) // apply checksum to totalPacket totalPacket = replaceSubArr(totalPacket, calcChecksum(totalPacket), 0x20) def str = HexUtils.byteArrayToHexString(totalPacket) //logDebug("${str}") def params = [type: HubAction.Type.LAN_TYPE_UDPCLIENT, destinationAddress: ipAddress, encoding: HubAction.Encoding.HEX_STRING, callback: devDetails.callback] def hubAction = new HubAction(str, Protocol.LAN, params) sendHubCommand(hubAction) return true } catch(Exception e) { updateDevStatus("send failed") logDebug("send_packet failed: check device details") //logDebug("message: ${e.message}") return false } } def updateError(packet) { def error = bytesToInt(subBytes(packet, 0x22, 2), "little") setErrors(error) return error } def setErrors(error) { setVolatileState("lastError", error) } def checkError() { def lE = getVolatileState("lastError") if(null == lE) { // if not initialized, assume no error return false } return (lE != 0) } def parse(message) { def lanMsg = parseLanMessage(message) logDebug("parse: ${lanMsg}") def resp = prepResponse(lanMsg.payload, false) if(null == resp) { return } logDebug("parseResp = ${HexUtils.byteArrayToHexString(resp)}") } import javax.crypto.spec.SecretKeySpec import javax.crypto.spec.IvParameterSpec import javax.crypto.Cipher def aes_cbc(data, op, key, iv) { // thanks: https://community.hubitat.com/t/groovy-aes-encryption-driver/31556 try { def cipher = Cipher.getInstance("AES/CBC/NoPadding", "SunJCE") byte[] keyBytes = HexUtils.hexStringToByteArray(key) SecretKeySpec aKey = new SecretKeySpec(keyBytes, "AES") byte[] ivBytes = HexUtils.hexStringToByteArray(iv) IvParameterSpec aIv = new IvParameterSpec(ivBytes) cipher.init(op == "enc" ? Cipher.ENCRYPT_MODE : Cipher.DECRYPT_MODE, aKey, aIv) return cipher.doFinal(data) as byte[] } catch (Exception e) { log.debug("aes_cbc() exception: ${e}") } return null } def appendByteArr(a, b) { byte[] c = new byte[a.size() + b.size()] a.eachWithIndex() { it, i -> c[i] = it } def aSz = a.size() b.eachWithIndex() { it, i -> c[i + aSz] = it } return c } def replaceSubArr(orig_arr, new_arr, start) { def tmp_arr = orig_arr.collect() new_arr.eachWithIndex { it, i -> tmp_arr[i + start] = it } return tmp_arr } private subBytes(arr, start, length) { byte[] sub = new byte[length] for(int i = 0; i < length; i++) { sub[i] = arr[i + start] } return sub } private reverseBytes(arr) { byte[] sub = new byte[arr.size()] byte end = arr.size() - 1 for(int i = 0; i < arr.size(); i++) { sub[i] = arr[end] end-- } return sub } def swapEndiannessU16(input) { return [i8Tou8(input[1]), i8Tou8(input[0])] } def swapEndiannessU32(input) { return [input[3], input[2], input[1], input[0]] } def swapEndiannessU64(input) { return [input[7], input[6], input[5], input[4], input[3], input[2], input[1], input[0]] } def intToBytes(input, width, endian = "little") { def output = new BigInteger(input).toByteArray() if(output.size() > width) { // if we got too many bytes, lop off the MSB(s) output = subBytes(output, output.size() - width, width) output = output.collect{it & 0xFF} } byte[] pad if(output.size() < width) { def padding = width - output.size() pad = [0] * padding output = appendByteArr(pad, output) } if("little" == endian) { switch(width) { case 1: break case 2: output = swapEndiannessU16(output) break case 4: output = swapEndiannessU32(output) break case 8: output = swapEndiannessU64(output) break } } return output.collect{it & 0xFF} } def bytesToInt(input, endian = "little") { def output = subBytes(input, 0, input.size()) long retVal = 0 output.eachWithIndex { it, i -> switch(endian) { case "little": retVal += ((it & 0xFF).toLong() << (i * 8)) break case "big": default: retVal += (it & 0xFF).toLong() << ((output.size() - 1 - i) * 8) break } } if(input.size() == 8) { // 8 bytes is too big for integer return retVal } return retVal as Integer } def i8Tou8(input) { return input & 0xFF } def i16Tou16(input) { return input & 0xFFFF }