diff --git a/configure.py b/configure.py index 365a3910..53b85845 100755 --- a/configure.py +++ b/configure.py @@ -66,11 +66,12 @@ SIZE_3_5_INCH = "3.5\"" SIZE_5_INCH = "5\"" SIZE_8_8_INCH = "8.8\"" +SIZE_8_8_INCH_USB = "8.8\" (V1.1)" SIZE_2_1_INCH = "2.1\"" # Only for retro compatibility SIZE_2_x_INCH = "2.1\" / 2.8\"" SIZE_0_96_INCH = "0.96\"" -size_list = (SIZE_0_96_INCH, SIZE_2_x_INCH, SIZE_3_5_INCH, SIZE_5_INCH, SIZE_8_8_INCH) +size_list = (SIZE_0_96_INCH, SIZE_2_x_INCH, SIZE_3_5_INCH, SIZE_5_INCH, SIZE_8_8_INCH, SIZE_8_8_INCH_USB) # Maps between config.yaml values and GUI description revision_and_size_to_model_map = { @@ -80,6 +81,7 @@ ('C', SIZE_2_x_INCH): TURING_MODEL, ('C', SIZE_5_INCH): TURING_MODEL, ('C', SIZE_8_8_INCH): TURING_MODEL, + ('C_USB', SIZE_8_8_INCH_USB): TURING_MODEL, ('D', SIZE_3_5_INCH): KIPYE_MODEL, ('WEACT_A', SIZE_3_5_INCH): WEACT_MODEL, ('WEACT_B', SIZE_0_96_INCH): WEACT_MODEL, @@ -97,6 +99,7 @@ (TURING_MODEL, SIZE_2_x_INCH): 'C', (TURING_MODEL, SIZE_5_INCH): 'C', (TURING_MODEL, SIZE_8_8_INCH): 'C', + (TURING_MODEL, SIZE_8_8_INCH_USB): 'C_USB', (KIPYE_MODEL, SIZE_3_5_INCH): 'D', (WEACT_MODEL, SIZE_3_5_INCH): 'WEACT_A', (WEACT_MODEL, SIZE_0_96_INCH): 'WEACT_B', @@ -383,6 +386,8 @@ def load_config_values(self): size = get_theme_size(self.config['config']['THEME']) size = size.replace(SIZE_2_1_INCH, SIZE_2_x_INCH) # If a theme is for 2.1" then it also is for 2.8" try: + if size == SIZE_8_8_INCH and self.config['display']['REVISION'] == 'C_USB': + size = SIZE_8_8_INCH_USB self.size_cb.set(size) except: self.size_cb.current(0) @@ -508,6 +513,7 @@ def on_model_change(self, e=None): def on_size_change(self, e=None): size = self.size_cb.get() size = size.replace(SIZE_2_x_INCH, SIZE_2_1_INCH) # For '2.1" / 2.8"' size, keep '2.1"' as size to get themes for + size = size.replace(SIZE_8_8_INCH_USB, SIZE_8_8_INCH) themes = get_themes(size) self.theme_cb.config(values=themes) diff --git a/library/display.py b/library/display.py index d4f21009..0befeb6d 100644 --- a/library/display.py +++ b/library/display.py @@ -23,6 +23,7 @@ from library.lcd.lcd_comm_rev_a import LcdCommRevA from library.lcd.lcd_comm_rev_b import LcdCommRevB from library.lcd.lcd_comm_rev_c import LcdCommRevC +from library.lcd.lcd_comm_turing_usb import LcdCommTuringUSB from library.lcd.lcd_comm_rev_d import LcdCommRevD from library.lcd.lcd_comm_weact_a import LcdCommWeActA from library.lcd.lcd_comm_weact_b import LcdCommWeActB @@ -85,6 +86,9 @@ def __init__(self): # Because of issue with Turing rev. C size auto-detection, manually configure screen width/height from theme self.lcd = LcdCommRevC(com_port=config.CONFIG_DATA['config']['COM_PORT'], update_queue=config.update_queue, display_width=width, display_height=height) + elif config.CONFIG_DATA["display"]["REVISION"] == "C_USB": + # On all USB models, manually configure screen width/height from theme + self.lcd = LcdCommTuringUSB(display_width=width, display_height=height) elif config.CONFIG_DATA["display"]["REVISION"] == "D": self.lcd = LcdCommRevD(com_port=config.CONFIG_DATA['config']['COM_PORT'], update_queue=config.update_queue) diff --git a/library/lcd/lcd_comm.py b/library/lcd/lcd_comm.py index 4e2714fe..fd77dbcf 100644 --- a/library/lcd/lcd_comm.py +++ b/library/lcd/lcd_comm.py @@ -50,6 +50,7 @@ def __init__(self, com_port: str = "AUTO", display_width: int = 320, display_hei self.lcd_serial = None # String containing absolute path to serial port e.g. "COM3", "/dev/ttyACM1" or "AUTO" for auto-discovery + # Ignored for USB HID screens self.com_port = com_port # Display always start in portrait orientation by default @@ -180,8 +181,8 @@ def ReadData(self, readSize: int): return self.serial_read(readSize) @staticmethod - @abstractmethod def auto_detect_com_port() -> Optional[str]: + # To implement only for screens that use serial commands pass @abstractmethod diff --git a/library/lcd/lcd_comm_turing_usb.py b/library/lcd/lcd_comm_turing_usb.py new file mode 100644 index 00000000..6fee436f --- /dev/null +++ b/library/lcd/lcd_comm_turing_usb.py @@ -0,0 +1,571 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +# +# turing-smart-screen-python - a Python system monitor and library for USB-C displays like Turing Smart Screen or XuanFang +# https://github.com/mathoudebine/turing-smart-screen-python/ +# +# Copyright (C) 2021 Matthieu Houdebine (mathoudebine) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import math +import platform +import queue +import struct +import subprocess +import time +from io import BytesIO +from pathlib import Path +from typing import Optional + +import usb.core +import usb.util +from Crypto.Cipher import DES +from PIL import Image + +from library.log import logger +from library.lcd.lcd_comm import Orientation, LcdComm + +VENDOR_ID = 0x1cbe +PRODUCT_ID = 0x0088 + + +MAX_CHUNK_BYTES = 1024*1024 # Data sent to screen cannot exceed 1024MB or there will be a timeout + + +def build_command_packet_header(a0: int) -> bytearray: + packet = bytearray(500) + packet[0] = a0 + packet[2] = 0x1A + packet[3] = 0x6D + timestamp = int((time.time() - time.mktime(time.localtime()[:3] + (0, 0, 0, 0, 0, -1))) * 1000) + packet[4:8] = struct.pack(' bytes: + cipher = DES.new(key, DES.MODE_CBC, key) + padded_len = (len(data) + 7) // 8 * 8 + padded_data = data.ljust(padded_len, b'\x00') + return cipher.encrypt(padded_data) + + +def encrypt_command_packet(data: bytearray) -> bytearray: + des_key = b'slv3tuzx' + encrypted = encrypt_with_des(des_key, data) + final_packet = bytearray(512) + final_packet[:len(encrypted)] = encrypted + final_packet[510] = 161 + final_packet[511] = 26 + return final_packet + + +def find_usb_device(): + dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID) + if dev is None: + raise ValueError('USB device not found') + + try: + dev.set_configuration() + except usb.core.USBError as e: + print("Warning: set_configuration() failed:", e) + + if platform.system() == "Linux": + try: + if dev.is_kernel_driver_active(0): + dev.detach_kernel_driver(0) + except usb.core.USBError as e: + print("Warning: detach_kernel_driver failed:", e) + + return dev + + +def read_flush(ep_in, max_attempts=5): + """ + Flush the USB IN endpoint by reading available data until timeout or max attempts reached. + """ + for _ in range(max_attempts): + try: + ep_in.read(512, timeout=100) + except usb.core.USBError as e: + if e.errno == 110 or e.args[0] == 'Operation timed out': + break + else: + # print("Flush read error:", e) + break + + +def write_to_device(dev, data, timeout=2000): + cfg = dev.get_active_configuration() + intf = usb.util.find_descriptor(cfg, bInterfaceNumber=0) + if intf is None: + raise RuntimeError("USB interface 0 not found") + ep_out = usb.util.find_descriptor(intf, custom_match=lambda e: usb.util.endpoint_direction( + e.bEndpointAddress) == usb.util.ENDPOINT_OUT) + ep_in = usb.util.find_descriptor(intf, custom_match=lambda e: usb.util.endpoint_direction( + e.bEndpointAddress) == usb.util.ENDPOINT_IN) + assert ep_out is not None and ep_in is not None, "Could not find USB endpoints" + + try: + ep_out.write(data, timeout) + except usb.core.USBError as e: + print("USB write error:", e) + return None + + try: + response = ep_in.read(512, timeout) + read_flush(ep_in) + return bytes(response) + except usb.core.USBError as e: + print("USB read error:", e) + return None + + +def delay_sync(dev): + send_sync_command(dev) + time.sleep(0.2) + + +def send_sync_command(dev): + print("Sending Sync Command (ID 10)...") + cmd_packet = build_command_packet_header(10) + return write_to_device(dev, encrypt_command_packet(cmd_packet)) + + +def send_restart_device_command(dev): + print("Sending Restart Command (ID 11)...") + return write_to_device(dev, encrypt_command_packet(build_command_packet_header(11))) + + +def send_brightness_command(dev, brightness: int): + print(f"Sending Brightness Command (ID 14)...") + print(f" Brightness = {brightness}") + cmd_packet = build_command_packet_header(14) + cmd_packet[8] = brightness + return write_to_device(dev, encrypt_command_packet(cmd_packet)) + + +def send_frame_rate_command(dev, frame_rate: int): + print(f"Sending Frame Rate Command (ID 15)...") + print(f" Frame Rate = {frame_rate}") + cmd_packet = build_command_packet_header(15) + cmd_packet[8] = frame_rate + return write_to_device(dev, encrypt_command_packet(cmd_packet)) + + +def format_bytes(val): + if val > 1024 * 1024: + return f"{val / (1024 * 1024):.2f} GB" + else: + return f"{val / 1024:.2f} MB" + + +def send_refresh_storage_command(dev): + print("Sending Refresh Storage Command (ID 100)...") + response = write_to_device(dev, encrypt_command_packet(build_command_packet_header(100))) + + total = format_bytes(int.from_bytes(response[8:12], byteorder='little')) + used = format_bytes(int.from_bytes(response[12:16], byteorder='little')) + valid = format_bytes(int.from_bytes(response[16:20], byteorder='little')) + + print(f" Card Total = {total}") + print(f" Card Used = {used}") + print(f" Card Valid = {valid}") + + +def send_save_settings_command(dev, brightness=0, startup=0, reserved=0, rotation=0, sleep=0, offline=0): + print("Sending Save Settings Command (ID 125)...") + print(f" Brightness: {brightness}") + print(f" Startup Mode: {startup}") + print(f" Reserved: {reserved}") + print(f" Rotation: {rotation}") + print(f" Sleep Timeout: {sleep}") + print(f" Offline Mode: {offline}") + cmd_packet = build_command_packet_header(125) + cmd_packet[8] = brightness + cmd_packet[9] = startup + cmd_packet[10] = reserved + cmd_packet[11] = rotation + cmd_packet[12] = sleep + cmd_packet[13] = offline + return write_to_device(dev, encrypt_command_packet(cmd_packet)) + + +def send_image(dev, png_data: bytes): + img_size = len(png_data) + + cmd_packet = build_command_packet_header(102) + cmd_packet[8] = (img_size >> 24) & 0xFF + cmd_packet[9] = (img_size >> 16) & 0xFF + cmd_packet[10] = (img_size >> 8) & 0xFF + cmd_packet[11] = img_size & 0xFF + + full_payload = encrypt_command_packet(cmd_packet) + png_data + return write_to_device(dev, full_payload) + + +def clear_image(dev): + img_data = bytearray( + [0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, 0x00, 0x00, 0x00, 0x0d, 0x49, 0x48, 0x44, 0x52, 0x00, 0x00, + 0x01, 0xe0, 0x00, 0x00, 0x07, 0x80, 0x08, 0x06, 0x00, 0x00, 0x00, 0x16, 0xf0, 0x84, 0xf5, 0x00, 0x00, 0x00, + 0x01, 0x73, 0x52, 0x47, 0x42, 0x00, 0xae, 0xce, 0x1c, 0xe9, 0x00, 0x00, 0x00, 0x04, 0x67, 0x41, 0x4d, 0x41, + 0x00, 0x00, 0xb1, 0x8f, 0x0b, 0xfc, 0x61, 0x05, 0x00, 0x00, 0x00, 0x09, 0x70, 0x48, 0x59, 0x73, 0x00, 0x00, + 0x0e, 0xc3, 0x00, 0x00, 0x0e, 0xc3, 0x01, 0xc7, 0x6f, 0xa8, 0x64, 0x00, 0x00, 0x0e, 0x0c, 0x49, 0x44, 0x41, + 0x54, 0x78, 0x5e, 0xed, 0xc1, 0x01, 0x0d, 0x00, 0x00, 0x00, 0xc2, 0xa0, 0xf7, 0x4f, 0x6d, 0x0f, 0x07, 0x14, + 0x00, 0x00, 0x00, 0x00, ] + [0x00] * 3568 + [0x00, 0xf0, 0x66, 0x4a, 0xc8, 0x00, 0x01, 0x11, 0x9d, 0x82, + 0x0a, 0x00, 0x00, 0x00, 0x00, 0x49, 0x45, 0x4e, 0x44, 0xae, 0x42, 0x60, 0x82]) + img_size = len(img_data) + print(f" Chunk Size: {img_size} bytes") + + cmd_packet = build_command_packet_header(102) + cmd_packet[8] = (img_size >> 24) & 0xFF + cmd_packet[9] = (img_size >> 16) & 0xFF + cmd_packet[10] = (img_size >> 8) & 0xFF + cmd_packet[11] = img_size & 0xFF + + full_payload = encrypt_command_packet(cmd_packet) + img_data + return write_to_device(dev, full_payload) + + +def delay(dev, rst): + time.sleep(0.05) + print("Sending Delay Command (ID 122)...") + cmd_packet = build_command_packet_header(122) + response = write_to_device(dev, encrypt_command_packet(cmd_packet)) + if response and response[8] > rst: + delay(dev, rst) + + +def extract_h264_from_mp4(mp4_path: str): + input_path = Path(mp4_path) + if not input_path.exists(): + raise FileNotFoundError(f"Input file not found: {input_path}") + + output_path = input_path.with_suffix(".h264") + + if output_path.exists(): + print(f"{output_path.name} already exists. Skipping extraction.") + return output_path + + cmd = ["ffmpeg", "-y", # overwrite without asking + "-i", str(input_path), # input file + "-c:v", "copy", # copy video stream + "-bsf:v", "h264_mp4toannexb", # convert to Annex-B + "-an", # remove audio + "-f", "h264", # set output format + str(output_path) # output file + ] + + print(f"Extracting H.264 from {input_path.name}...") + subprocess.run(cmd, check=True) + print(f"Done. Saved as {output_path.name}") + return output_path + + +def send_video(dev, video_path, loop=False): + output_path = extract_h264_from_mp4(video_path) + write_to_device(dev, encrypt_command_packet(build_command_packet_header(111))) + write_to_device(dev, encrypt_command_packet(build_command_packet_header(112))) + write_to_device(dev, encrypt_command_packet(build_command_packet_header(13))) + send_brightness_command(dev, 32) # 14 + write_to_device(dev, encrypt_command_packet(build_command_packet_header(41))) + clear_image(dev) # 102, 3703 + send_frame_rate_command(dev, 25) # 15 + # send_image(dev, './102_25011_payload.png') #102, 25011 + print("Sending Send Video Command (ID 121)...") + try: + while (True): + with open(output_path, 'rb') as f: + while True: + data = f.read(202752) + chunksize = len(data) + if not data: + break + print(f" Chunk Size: {chunksize} bytes") + + cmd_packet = build_command_packet_header(121) + cmd_packet[8] = (chunksize >> 24) & 0xFF + cmd_packet[9] = (chunksize >> 16) & 0xFF + cmd_packet[10] = (chunksize >> 8) & 0xFF + cmd_packet[11] = chunksize & 0xFF + + full_payload = encrypt_command_packet(cmd_packet) + data + response = write_to_device(dev, full_payload) + time.sleep(0.03) + if response is None or len(response) < 9 or response[8] <= 3: + delay(dev, 2) + print("Video sent successfully.") + if not loop: + break + except KeyboardInterrupt: + print("\nLoop interrupted by user. Sending reset...") + finally: + write_to_device(dev, encrypt_command_packet(build_command_packet_header(123))) + + +def _encode_png(image: Image.Image) -> bytes: + buffer = BytesIO() + image.save(buffer, format="PNG", compress_level=9) + return buffer.getvalue() + + +def compress_image(image: Image.Image, ratio: float) -> Image.Image: + width, height = image.size + image = image.resize((int(width * ratio*0.5), int(height * ratio*0.5)), + resample=Image.Resampling.LANCZOS) + image = image.resize((width, height)) + return image + + + +def upload_file(dev, file_path: str) -> bool: + local_path = Path(file_path) + if not local_path.exists(): + logger.error("Error: File does not exist: %s", file_path) + return False + + ext = local_path.suffix.lower() + if ext == ".png": + device_path = f"/tmp/sdcard/mmcblk0p1/img/{local_path.name}" + logger.info("Uploading PNG: %s → %s", file_path, device_path) + elif ext == ".mp4": + h264_path = extract_h264_from_mp4(file_path) + device_path = f"/tmp/sdcard/mmcblk0p1/video/{h264_path.name}" + local_path = h264_path # Update local path to .h264 + logger.info("Uploading MP4 as H264: %s → %s", local_path, device_path) + else: + logger.error("Error: Unsupported file type. Only .png and .mp4 are allowed.") + return False + + if not _open_file_command(dev, device_path): + logger.error("Failed to open remote file for writing.") + return False + + if not _write_file_command(dev, str(local_path)): + logger.error("Failed to write file data.") + return False + + logger.info("Upload completed successfully.") + return True + + +def _open_file_command(dev, path: str): + logger.info("Opening remote file: %s", path) + + path_bytes = path.encode("ascii") + length = len(path_bytes) + + packet = build_command_packet_header(38) + + packet[8] = (length >> 24) & 0xFF + packet[9] = (length >> 16) & 0xFF + packet[10] = (length >> 8) & 0xFF + packet[11] = length & 0xFF + packet[12:16] = b"\x00\x00\x00\x00" + packet[16 : 16 + length] = path_bytes + + return write_to_device(dev, encrypt_command_packet(packet)) + + +def _delete_command(dev, file_path: str): + logger.info("Deleting remote file: %s", file_path) + + path_bytes = file_path.encode("ascii") + length = len(path_bytes) + + packet = build_command_packet_header(40) + packet[8] = (length >> 24) & 0xFF + packet[9] = (length >> 16) & 0xFF + packet[10] = (length >> 8) & 0xFF + packet[11] = length & 0xFF + packet[12:16] = b"\x00\x00\x00\x00" + packet[16 : 16 + length] = path_bytes + + return write_to_device(dev, encrypt_command_packet(packet)) + + +def _play_command(dev, file_path: str): + logger.info("Requesting playback for: %s", file_path) + + path_bytes = file_path.encode("ascii") + length = len(path_bytes) + + packet = build_command_packet_header(98) + + packet[8] = (length >> 24) & 0xFF + packet[9] = (length >> 16) & 0xFF + packet[10] = (length >> 8) & 0xFF + packet[11] = length & 0xFF + packet[12:16] = b"\x00\x00\x00\x00" + packet[16 : 16 + length] = path_bytes + + return write_to_device(dev, encrypt_command_packet(packet)) + + +def _play2_command(dev, file_path: str): + logger.info("Requesting alternate playback for: %s", file_path) + + path_bytes = file_path.encode("ascii") + length = len(path_bytes) + + packet = build_command_packet_header(110) + + packet[8] = (length >> 24) & 0xFF + packet[9] = (length >> 16) & 0xFF + packet[10] = (length >> 8) & 0xFF + packet[11] = length & 0xFF + packet[12:16] = b"\x00\x00\x00\x00" + packet[16 : 16 + length] = path_bytes + + return write_to_device(dev, encrypt_command_packet(packet)) + + +def _play3_command(dev, file_path: str): + logger.info("Requesting image playback for: %s", file_path) + + path_bytes = file_path.encode("ascii") + length = len(path_bytes) + + packet = build_command_packet_header(113) + + packet[8] = (length >> 24) & 0xFF + packet[9] = (length >> 16) & 0xFF + packet[10] = (length >> 8) & 0xFF + packet[11] = length & 0xFF + packet[12:16] = b"\x00\x00\x00\x00" + packet[16 : 16 + length] = path_bytes + + return write_to_device(dev, encrypt_command_packet(packet)) + + +def _write_file_command(dev, file_path: str) -> bool: + logger.info("Writing remote file from: %s", file_path) + + try: + with open(file_path, "rb") as fh: + chunk_index = 0 + while True: + data_chunk = fh.read(202752) + if not data_chunk: + break + + chunk_size = len(data_chunk) + chunk_index += 1 + logger.debug("Chunk %d size: %d bytes", chunk_index, chunk_size) + + cmd_packet = build_command_packet_header(39) + cmd_packet[8] = (chunk_size >> 24) & 0xFF + cmd_packet[9] = (chunk_size >> 16) & 0xFF + cmd_packet[10] = (chunk_size >> 8) & 0xFF + cmd_packet[11] = chunk_size & 0xFF + + response = write_to_device(dev, encrypt_command_packet(cmd_packet) + data_chunk) + if response is None: + logger.error("Write command failed at chunk %d", chunk_index) + return False + + logger.info("File write completed successfully (%d chunks).", chunk_index) + return True + except FileNotFoundError: + logger.error("File not found: %s", file_path) + return False + except Exception as exc: + logger.error("Error writing file: %s", exc) + return False + +# This class is for Turing Smart Screen newer models (5.2" / 8" / 8.8" HW rev 1.x / 9.2") +# These models are not detected as serial ports but as (Win)USB devices +class LcdCommTuringUSB(LcdComm): + def __init__(self, com_port: str = "AUTO", display_width: int = 480, display_height: int = 1920, + update_queue: Optional[queue.Queue] = None): + super().__init__(com_port, display_width, display_height, update_queue) + self.dev = find_usb_device() + # Store the current screen state as an image that will be continuously updated and sent + self.current_state = Image.new("RGBA", (self.get_width(), self.get_height()), (0, 0, 0, 0)) + + def InitializeComm(self): + send_sync_command(self.dev) + + def Reset(self): + # Do not enable the reset command for now on Turing USB models + # send_restart_device_command(self.dev) + pass + + def Clear(self): + clear_image(self.dev) + + def ScreenOff(self): + # Turing USB models do not implement a "screen off" command (that we know of): use SetBrightness(0) instead + self.Clear() + self.SetBrightness(0) + + def ScreenOn(self): + # Turing USB models do not implement a "screen off" command (that we know of): using SetBrightness() instead + self.SetBrightness() + + def SetBrightness(self, level: int = 25): + assert 0 <= level <= 100, 'Brightness level must be [0-100]' + converted = int(level / 100 * 102) + send_brightness_command(self.dev, converted) + + def SetOrientation(self, orientation: Orientation): + self.orientation = orientation + # Recreate new state with correct width/height now that screen orientation has changed + self.current_state = Image.new("RGBA", (self.get_width(), self.get_height()), (0, 0, 0, 0)) + + def DisplayPILImage(self, image: Image.Image, x: int = 0, y: int = 0, image_width: int = 0, image_height: int = 0): + if not image_height: + image_height = image.size[1] + if not image_width: + image_width = image.size[0] + + if image.size[1] > self.get_height(): + image_height = self.get_height() + if image.size[0] > self.get_width(): + image_width = self.get_width() + + if image_width != image.size[0] or image_height != image.size[1]: + image = image.crop((0, 0, image_width, image_height)) + + # Paste new image over existing screen state + self.current_state.paste(image, (x, y)) + + # Rotate image before sending to screen: all images sent to the screen are in portrait mode + if self.orientation == Orientation.LANDSCAPE: + base_image = self.current_state.transpose(Image.Transpose.ROTATE_270) + elif self.orientation == Orientation.REVERSE_LANDSCAPE: + base_image = self.current_state.transpose(Image.Transpose.ROTATE_90) + elif self.orientation == Orientation.PORTRAIT: + base_image = self.current_state.transpose(Image.Transpose.ROTATE_180) + else: # Orientation.REVERSE_PORTRAIT is initial screen orientation + base_image = self.current_state + + # total_size = len(_encode_png(base_image)) + # print("total size =", total_size/1024) + # + # if total_size > 1024*1024: + # + # # If bitmap is > 1024MB operation will timeout: compress it + # size_overflow = total_size - 1024*1024 + # ratio = 1- (size_overflow / total_size) + # print("ratio = ", ratio) + # + # base_image = compress_image(base_image, ratio) + # + # new_size = len(_encode_png(base_image)) + # print("new_size =", new_size/1024) + + + # Send PNG data + encoded = _encode_png(base_image) + send_image(self.dev, encoded) diff --git a/requirements.txt b/requirements.txt index 02dfedf2..5f78ada3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,8 @@ uptime~=3.0.1 # For System Uptime requests~=2.32.5 # HTTP library ping3~=5.1.5 # ICMP ping implementation using raw socket pyinstaller~=6.16.0 # bundles a Python application and all its dependencies into a single package +pyusb~=1.3.1 +pycryptodome~=3.23.0 # Image generation Pillow~=11.3.0; python_version < "3.10" # For Python 3.9, only Pillow 11.x is supported