keyboard_comm: try most commands several times

main
Ilya Zhuravlev 2021-01-07 12:19:57 -05:00
parent 69cd27ad55
commit c3d6329c44
2 changed files with 45 additions and 25 deletions

View File

@ -78,7 +78,7 @@ class Keyboard:
def reload_layers(self): def reload_layers(self):
""" Get how many layers the keyboard has """ """ Get how many layers the keyboard has """
self.layers = self.usb_send(self.dev, struct.pack("B", CMD_VIA_GET_LAYER_COUNT))[1] self.layers = self.usb_send(self.dev, struct.pack("B", CMD_VIA_GET_LAYER_COUNT), retries=20)[1]
def reload_layout(self, sideload_json=None): def reload_layout(self, sideload_json=None):
""" Requests layout data from the current device """ """ Requests layout data from the current device """
@ -87,18 +87,19 @@ class Keyboard:
payload = sideload_json payload = sideload_json
else: else:
# get keyboard identification # get keyboard identification
data = self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_GET_KEYBOARD_ID)) data = self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_GET_KEYBOARD_ID), retries=20)
self.vial_protocol, self.keyboard_id = struct.unpack("<IQ", data[0:12]) self.vial_protocol, self.keyboard_id = struct.unpack("<IQ", data[0:12])
# get the size # get the size
data = self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_GET_SIZE)) data = self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_GET_SIZE), retries=20)
sz = struct.unpack("<I", data[0:4])[0] sz = struct.unpack("<I", data[0:4])[0]
# get the payload # get the payload
payload = b"" payload = b""
block = 0 block = 0
while sz > 0: while sz > 0:
data = self.usb_send(self.dev, struct.pack("<BBI", CMD_VIA_VIAL_PREFIX, CMD_VIAL_GET_DEFINITION, block)) data = self.usb_send(self.dev, struct.pack("<BBI", CMD_VIA_VIAL_PREFIX, CMD_VIAL_GET_DEFINITION, block),
retries=20)
if sz < MSG_LEN: if sz < MSG_LEN:
data = data[:sz] data = data[:sz]
payload += data payload += data
@ -157,7 +158,7 @@ class Keyboard:
for x in range(0, size, BUFFER_FETCH_CHUNK): for x in range(0, size, BUFFER_FETCH_CHUNK):
offset = x offset = x
sz = min(size - offset, BUFFER_FETCH_CHUNK) sz = min(size - offset, BUFFER_FETCH_CHUNK)
data = self.usb_send(self.dev, struct.pack(">BHB", CMD_VIA_KEYMAP_GET_BUFFER, offset, sz)) data = self.usb_send(self.dev, struct.pack(">BHB", CMD_VIA_KEYMAP_GET_BUFFER, offset, sz), retries=20)
keymap += data[4:4+sz] keymap += data[4:4+sz]
for layer in range(self.layers): for layer in range(self.layers):
@ -169,19 +170,21 @@ class Keyboard:
for layer in range(self.layers): for layer in range(self.layers):
for idx in self.encoderpos: for idx in self.encoderpos:
data = self.usb_send(self.dev, struct.pack("BBBB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_GET_ENCODER, layer, idx)) data = self.usb_send(self.dev, struct.pack("BBBB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_GET_ENCODER, layer, idx),
retries=20)
self.encoder_layout[(layer, idx, 0)] = struct.unpack(">H", data[0:2])[0] self.encoder_layout[(layer, idx, 0)] = struct.unpack(">H", data[0:2])[0]
self.encoder_layout[(layer, idx, 1)] = struct.unpack(">H", data[2:4])[0] self.encoder_layout[(layer, idx, 1)] = struct.unpack(">H", data[2:4])[0]
if self.layouts: if self.layouts:
data = self.usb_send(self.dev, struct.pack("BB", CMD_VIA_GET_KEYBOARD_VALUE, VIA_LAYOUT_OPTIONS)) data = self.usb_send(self.dev, struct.pack("BB", CMD_VIA_GET_KEYBOARD_VALUE, VIA_LAYOUT_OPTIONS),
retries=20)
self.layout_options = struct.unpack(">I", data[2:6])[0] self.layout_options = struct.unpack(">I", data[2:6])[0]
def reload_macros(self): def reload_macros(self):
""" Loads macro information from the keyboard """ """ Loads macro information from the keyboard """
data = self.usb_send(self.dev, struct.pack("B", CMD_VIA_MACRO_GET_COUNT)) data = self.usb_send(self.dev, struct.pack("B", CMD_VIA_MACRO_GET_COUNT), retries=20)
self.macro_count = data[1] self.macro_count = data[1]
data = self.usb_send(self.dev, struct.pack("B", CMD_VIA_MACRO_GET_BUFFER_SIZE)) data = self.usb_send(self.dev, struct.pack("B", CMD_VIA_MACRO_GET_BUFFER_SIZE), retries=20)
self.macro_memory = struct.unpack(">H", data[1:3])[0] self.macro_memory = struct.unpack(">H", data[1:3])[0]
self.macro = b"" self.macro = b""
@ -189,7 +192,7 @@ class Keyboard:
# now retrieve the entire buffer, MACRO_CHUNK bytes at a time, as that is what fits into a packet # now retrieve the entire buffer, MACRO_CHUNK bytes at a time, as that is what fits into a packet
for x in range(0, self.macro_memory, BUFFER_FETCH_CHUNK): for x in range(0, self.macro_memory, BUFFER_FETCH_CHUNK):
sz = min(BUFFER_FETCH_CHUNK, self.macro_memory - x) sz = min(BUFFER_FETCH_CHUNK, self.macro_memory - x)
data = self.usb_send(self.dev, struct.pack(">BHB", CMD_VIA_MACRO_GET_BUFFER, x, sz)) data = self.usb_send(self.dev, struct.pack(">BHB", CMD_VIA_MACRO_GET_BUFFER, x, sz), retries=20)
self.macro += data[4:4+sz] self.macro += data[4:4+sz]
# macros are stored as NUL-separated strings, so let's clean up the buffer # macros are stored as NUL-separated strings, so let's clean up the buffer
# ensuring we only get macro_count strings after we split by NUL # ensuring we only get macro_count strings after we split by NUL
@ -202,7 +205,7 @@ class Keyboard:
key = (layer, row, col) key = (layer, row, col)
if self.layout[key] != code: if self.layout[key] != code:
self.usb_send(self.dev, struct.pack(">BBBBH", CMD_VIA_SET_KEYCODE, layer, row, col, code)) self.usb_send(self.dev, struct.pack(">BBBBH", CMD_VIA_SET_KEYCODE, layer, row, col, code), retries=20)
self.layout[key] = code self.layout[key] = code
def set_encoder(self, layer, index, direction, code): def set_encoder(self, layer, index, direction, code):
@ -212,13 +215,14 @@ class Keyboard:
key = (layer, index, direction) key = (layer, index, direction)
if self.encoder_layout[key] != code: if self.encoder_layout[key] != code:
self.usb_send(self.dev, struct.pack(">BBBBBH", CMD_VIA_VIAL_PREFIX, CMD_VIAL_SET_ENCODER, self.usb_send(self.dev, struct.pack(">BBBBBH", CMD_VIA_VIAL_PREFIX, CMD_VIAL_SET_ENCODER,
layer, index, direction, code)) layer, index, direction, code), retries=20)
self.encoder_layout[key] = code self.encoder_layout[key] = code
def set_layout_options(self, options): def set_layout_options(self, options):
if self.layout_options != -1 and self.layout_options != options: if self.layout_options != -1 and self.layout_options != options:
self.layout_options = options self.layout_options = options
self.usb_send(self.dev, struct.pack(">BBI", CMD_VIA_SET_KEYBOARD_VALUE, VIA_LAYOUT_OPTIONS, options)) self.usb_send(self.dev, struct.pack(">BBI", CMD_VIA_SET_KEYBOARD_VALUE, VIA_LAYOUT_OPTIONS, options),
retries=20)
def set_macro(self, data): def set_macro(self, data):
if len(data) > self.macro_memory: if len(data) > self.macro_memory:
@ -226,7 +230,8 @@ class Keyboard:
for x, chunk in enumerate(chunks(data, BUFFER_FETCH_CHUNK)): for x, chunk in enumerate(chunks(data, BUFFER_FETCH_CHUNK)):
off = x * BUFFER_FETCH_CHUNK off = x * BUFFER_FETCH_CHUNK
self.usb_send(self.dev, struct.pack(">BHB", CMD_VIA_MACRO_SET_BUFFER, off, len(chunk)) + chunk) self.usb_send(self.dev, struct.pack(">BHB", CMD_VIA_MACRO_SET_BUFFER, off, len(chunk)) + chunk,
retries=20)
self.macro = data self.macro = data
def save_layout(self): def save_layout(self):
@ -297,7 +302,7 @@ class Keyboard:
def get_uid(self): def get_uid(self):
""" Retrieve UID from the keyboard, explicitly sending a query packet """ """ Retrieve UID from the keyboard, explicitly sending a query packet """
data = self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_GET_KEYBOARD_ID)) data = self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_GET_KEYBOARD_ID), retries=20)
keyboard_id = data[4:12] keyboard_id = data[4:12]
return keyboard_id return keyboard_id
@ -306,7 +311,7 @@ class Keyboard:
if self.vial_protocol < 0: if self.vial_protocol < 0:
return 1 return 1
data = self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_GET_UNLOCK_STATUS)) data = self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_GET_UNLOCK_STATUS), retries=20)
return data[0] return data[0]
def get_unlock_in_progress(self): def get_unlock_in_progress(self):
@ -314,7 +319,7 @@ class Keyboard:
if self.vial_protocol < 0: if self.vial_protocol < 0:
return 0 return 0
data = self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_GET_UNLOCK_STATUS)) data = self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_GET_UNLOCK_STATUS), retries=20)
return data[1] return data[1]
def get_unlock_keys(self): def get_unlock_keys(self):
@ -324,7 +329,7 @@ class Keyboard:
if self.vial_protocol < 0: if self.vial_protocol < 0:
return [] return []
data = self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_GET_UNLOCK_STATUS)) data = self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_GET_UNLOCK_STATUS), retries=20)
rowcol = [] rowcol = []
for x in range(15): for x in range(15):
row = data[2 + x * 2] row = data[2 + x * 2]
@ -337,17 +342,17 @@ class Keyboard:
if self.vial_protocol < 0: if self.vial_protocol < 0:
return return
self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_UNLOCK_START)) self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_UNLOCK_START), retries=20)
def unlock_poll(self): def unlock_poll(self):
if self.vial_protocol < 0: if self.vial_protocol < 0:
return b"" return b""
data = self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_UNLOCK_POLL)) data = self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_UNLOCK_POLL), retries=20)
return data return data
def lock(self): def lock(self):
if self.vial_protocol < 0: if self.vial_protocol < 0:
return return
self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_LOCK)) self.usb_send(self.dev, struct.pack("BB", CMD_VIA_VIAL_PREFIX, CMD_VIAL_LOCK), retries=20)

View File

@ -16,15 +16,30 @@ VIBL_SERIAL_NUMBER_MAGIC = "vibl:d4f8159c"
MSG_LEN = 32 MSG_LEN = 32
def hid_send(dev, msg): def hid_send(dev, msg, retries=1):
if len(msg) > MSG_LEN: if len(msg) > MSG_LEN:
raise RuntimeError("message must be less than 32 bytes") raise RuntimeError("message must be less than 32 bytes")
msg += b"\x00" * (MSG_LEN - len(msg)) msg += b"\x00" * (MSG_LEN - len(msg))
# add 00 at start for hidapi report id data = b""
dev.write(b"\x00" + msg)
return bytes(dev.read(MSG_LEN)) while retries > 0:
retries -= 1
try:
# add 00 at start for hidapi report id
if dev.write(b"\x00" + msg) != MSG_LEN + 1:
continue
data = bytes(dev.read(MSG_LEN, timeout_ms=500))
if not data:
continue
except OSError:
continue
break
if not data:
raise RuntimeError("failed to communicate with the device")
return data
def is_rawhid(dev): def is_rawhid(dev):