firmware_flasher: initial working flasher
parent
72aec53cef
commit
46f68a13b8
|
|
@ -1,20 +1,76 @@
|
||||||
# SPDX-License-Identifier: GPL-2.0-or-later
|
# SPDX-License-Identifier: GPL-2.0-or-later
|
||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
|
import struct
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
|
||||||
|
from PyQt5.QtCore import pyqtSignal
|
||||||
from PyQt5.QtGui import QFontDatabase
|
from PyQt5.QtGui import QFontDatabase
|
||||||
from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLineEdit, QToolButton, QPlainTextEdit, QProgressBar,\
|
from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLineEdit, QToolButton, QPlainTextEdit, QProgressBar,\
|
||||||
QFileDialog, QDialog
|
QFileDialog, QDialog
|
||||||
|
|
||||||
from util import tr
|
from util import tr, chunks
|
||||||
from vial_device import VialBootloader
|
from vial_device import VialBootloader
|
||||||
|
|
||||||
|
|
||||||
|
def send_retries(dev, data, retries=20):
|
||||||
|
""" Sends usb packet up to 'retries' times, returns True if success, False if failed """
|
||||||
|
|
||||||
|
for x in range(retries):
|
||||||
|
ret = dev.send(data)
|
||||||
|
if ret == len(data) + 1:
|
||||||
|
return True
|
||||||
|
elif ret < 0:
|
||||||
|
time.sleep(0.1)
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
CHUNK = 64
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_flash(device, firmware, log):
|
||||||
|
# Check bootloader is correct version
|
||||||
|
device.send(b"VC\x00")
|
||||||
|
data = device.recv(8)
|
||||||
|
log("* Bootloader version: {}".format(data[0]))
|
||||||
|
if data[0] != 0:
|
||||||
|
log("Error: Unsupported bootloader version")
|
||||||
|
return
|
||||||
|
|
||||||
|
# TODO: Check vial ID against firmware package
|
||||||
|
device.send(b"VC\x01")
|
||||||
|
data = device.recv(8)
|
||||||
|
log("* Vial ID: {}".format(data.hex()))
|
||||||
|
|
||||||
|
# Flash
|
||||||
|
firmware_size = len(firmware)
|
||||||
|
if firmware_size % CHUNK != 0:
|
||||||
|
firmware_size += CHUNK - firmware_size % CHUNK
|
||||||
|
log(device.send(b"VC\x02" + struct.pack("<H", firmware_size // CHUNK)))
|
||||||
|
for part in chunks(firmware, CHUNK):
|
||||||
|
if len(part) < CHUNK:
|
||||||
|
part += b"\x00" * (CHUNK - len(part))
|
||||||
|
if not send_retries(device, part):
|
||||||
|
log("Error while sending data, firmware is corrupted.")
|
||||||
|
return
|
||||||
|
log(datetime.datetime.now())
|
||||||
|
|
||||||
|
# Reboot
|
||||||
|
log("Rebooting...")
|
||||||
|
device.send(b"VC\x03")
|
||||||
|
|
||||||
|
|
||||||
class FirmwareFlasher(QVBoxLayout):
|
class FirmwareFlasher(QVBoxLayout):
|
||||||
|
log_signal = pyqtSignal(object)
|
||||||
|
|
||||||
def __init__(self, parent=None):
|
def __init__(self, parent=None):
|
||||||
super().__init__(parent)
|
super().__init__(parent)
|
||||||
|
|
||||||
|
self.log_signal.connect(self._on_log)
|
||||||
|
|
||||||
self.selected_firmware_path = ""
|
self.selected_firmware_path = ""
|
||||||
|
|
||||||
file_selector = QHBoxLayout()
|
file_selector = QHBoxLayout()
|
||||||
|
|
@ -62,7 +118,25 @@ class FirmwareFlasher(QVBoxLayout):
|
||||||
self.log("Firmware update package: {}".format(self.selected_firmware_path))
|
self.log("Firmware update package: {}".format(self.selected_firmware_path))
|
||||||
|
|
||||||
def on_click_flash(self):
|
def on_click_flash(self):
|
||||||
pass
|
if not self.selected_firmware_path:
|
||||||
|
self.log("Error: Please select a firmware update package")
|
||||||
|
return
|
||||||
|
|
||||||
|
with open(self.selected_firmware_path, "rb") as inf:
|
||||||
|
firmware = inf.read()
|
||||||
|
|
||||||
|
if len(firmware) > 10 * 1024 * 1024:
|
||||||
|
self.log("Error: Firmware is too large. Check you've selected the correct file")
|
||||||
|
return
|
||||||
|
|
||||||
|
self.log("Preparing to flash...")
|
||||||
|
threading.Thread(target=lambda: cmd_flash(self.device, firmware, self.on_log)).start()
|
||||||
|
|
||||||
|
def on_log(self, line):
|
||||||
|
self.log_signal.emit(line)
|
||||||
|
|
||||||
|
def _on_log(self, line):
|
||||||
|
self.log(line)
|
||||||
|
|
||||||
def log(self, line):
|
def log(self, line):
|
||||||
self.txt_logger.appendPlainText("[{}] {}".format(datetime.datetime.now(), line))
|
self.txt_logger.appendPlainText("[{}] {}".format(datetime.datetime.now(), line))
|
||||||
|
|
|
||||||
|
|
@ -4,18 +4,13 @@ import struct
|
||||||
|
|
||||||
|
|
||||||
from keyboard import Keyboard
|
from keyboard import Keyboard
|
||||||
|
from util import chunks
|
||||||
|
|
||||||
LAYOUT_2x2 = """
|
LAYOUT_2x2 = """
|
||||||
{"name":"test","vendorId":"0x0000","productId":"0x1111","lighting":"none","matrix":{"rows":2,"cols":2},"layouts":{"keymap":[["0,0","0,1"],["1,0","1,1"]]}}
|
{"name":"test","vendorId":"0x0000","productId":"0x1111","lighting":"none","matrix":{"rows":2,"cols":2},"layouts":{"keymap":[["0,0","0,1"],["1,0","1,1"]]}}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
def chunks(data, sz):
|
|
||||||
for i in range(0, len(data), sz):
|
|
||||||
yield data[i:i+sz]
|
|
||||||
|
|
||||||
|
|
||||||
class SimulatedDevice:
|
class SimulatedDevice:
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
|
|
||||||
|
|
@ -44,3 +44,8 @@ def find_vial_devices(sideload_vid, sideload_pid):
|
||||||
elif dev["vendor_id"] == sideload_vid and dev["product_id"] == sideload_pid and is_rawhid(dev):
|
elif dev["vendor_id"] == sideload_vid and dev["product_id"] == sideload_pid and is_rawhid(dev):
|
||||||
filtered.append(VialKeyboard(dev, sideload=True))
|
filtered.append(VialKeyboard(dev, sideload=True))
|
||||||
return filtered
|
return filtered
|
||||||
|
|
||||||
|
|
||||||
|
def chunks(data, sz):
|
||||||
|
for i in range(0, len(data), sz):
|
||||||
|
yield data[i:i+sz]
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,13 @@ class VialDevice:
|
||||||
self.dev = hid.device()
|
self.dev = hid.device()
|
||||||
self.dev.open_path(self.desc["path"])
|
self.dev.open_path(self.desc["path"])
|
||||||
|
|
||||||
|
def send(self, data):
|
||||||
|
# add 00 at start for hidapi report id
|
||||||
|
return self.dev.write(b"\x00" + data)
|
||||||
|
|
||||||
|
def recv(self, length):
|
||||||
|
return bytes(self.dev.read(length))
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.dev.close()
|
self.dev.close()
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue