48 Commits

Author SHA1 Message Date
n07070
09e588c3ff Change Error to warning when not on a Raspberry 2026-06-12 17:20:53 +02:00
n07070
0699775d35 Add welcome message 2026-06-12 17:20:44 +02:00
n07070
7d19098b61 Change management of state : we assume the printer is online. Otherwise,
because this is a real-time command, we might not get the good answer
and fail to print fast enough. See https://download4.epson.biz/sec_pubs/pos/reference_en/escpos/realtime_commands.html
2026-06-12 17:19:53 +02:00
n07070
65e4a2ad9c Manage error when no Printers are found 2026-06-12 16:35:38 +02:00
n07070
af15ed8754 Manage file too big exceptions 2026-06-04 19:45:37 +02:00
n07070
53010987f4 Update Error raising in uploads and image processing 2026-06-04 19:26:21 +02:00
n07070
175dd3385a Add content in print queue method 2026-06-04 02:32:47 +02:00
n07070
3a1d9b20fb Add skip line and catch printing errors 2026-06-04 02:32:23 +02:00
n07070
c57e2f91a2 Update getting debug env 2026-06-04 02:32:08 +02:00
n07070
9ccd2b8bdf Use textarea instead of input, easier for ASCII art 2026-06-04 02:31:53 +02:00
n07070
54678175ba Remove socketio from Worker for the moment 2026-06-04 01:27:23 +02:00
n07070
2262840f75 Apply linting to worker 2026-06-04 01:26:47 +02:00
n07070
ad3cb6231a Apply linting to web 2026-06-04 01:11:37 +02:00
n07070
adcc744e7a Apply linting to the Raspberry Pi 2026-06-04 01:04:46 +02:00
n07070
6d9db2d2aa Apply linting to printers 2026-06-04 00:39:37 +02:00
n07070
651235a610 Lint printer file 2026-06-04 00:34:49 +02:00
n07070
3c490e10b4 Apply black formatter 2026-06-04 00:31:04 +02:00
n07070
a2d1779e2b Update worker to get differents printer types 2026-06-03 23:55:04 +02:00
n07070
f841cd5628 Add queue completion method, add Printer discovery and types 2026-06-03 23:54:15 +02:00
n07070
db7e030a1f Remove printer import, remove vendor&device configuration, update
print_queue
2026-06-03 23:53:02 +02:00
n07070
1218d3fbee Remove unused import, add get_queue_completed method 2026-06-03 23:52:23 +02:00
n07070
ef613b3c10 Remove print, add task Typing 2026-06-03 23:51:38 +02:00
n07070
e549cdc64b Remove vendor and device ID from configuration because it's not needed
anymore
2026-06-03 23:51:13 +02:00
n07070
9e4ec6c1a5 Add support for QR code task 2026-06-01 21:40:28 +02:00
n07070
9e77e0980b Change the Printer class to implement multiple printer types 2026-06-01 21:40:13 +02:00
0e3cc46a41 Merge pull request 'Restructure the code and implement a printing queue' (#29) from restructure-printing-queue into master
Reviewed-on: #29
2026-05-27 00:00:56 +02:00
n07070
bbfe1936da Remove unused import 2026-05-26 23:58:47 +02:00
n07070
8134c5e892 Improve linting of printer class 2026-05-26 23:56:55 +02:00
n07070
934f766cf3 Update waiting time, update Exceptions 2026-05-26 23:53:26 +02:00
n07070
eb9e1ec200 Remove code meant for another branch ( brother-ql code ) 2026-05-26 23:50:00 +02:00
n07070
bc035508cd Update line lenght of docstring 2026-05-22 11:01:06 +02:00
n07070
cba34744f6 Update raspberry pi class to print via the print queue 2026-05-22 11:01:06 +02:00
n07070
0c8c40098c Add docstring & comments, remove dead code 2026-05-22 11:01:06 +02:00
n07070
3b640dc549 Add comments about the code structure 2026-05-22 11:01:06 +02:00
n07070
2daafe28f2 Apply linting 2026-05-22 11:01:06 +02:00
n07070
c50922790d Restructure main class to activate worker and use tasks, print queue,
update Printer
2026-05-22 11:01:06 +02:00
n07070
e8ec9b74c0 Restructure web class to use print queue and tasks 2026-05-22 11:01:06 +02:00
n07070
9dee67c333 Add worker class 2026-05-22 11:01:06 +02:00
n07070
42bf6d6496 Add printing queue objects 2026-05-22 11:01:06 +02:00
n07070
a38088bd05 Add task objects 2026-05-22 11:01:06 +02:00
n07070
cb3e0d900f Update numpy 2026-05-22 11:01:06 +02:00
n07070
c5a8019fbe Add an alert if the webcam print fails 2026-05-22 11:01:06 +02:00
n07070
e926ee9163 Update printing routes for the form 2026-05-22 11:01:06 +02:00
n07070
3f915a1b25 Fix error flashing and transmission 2026-05-22 11:01:06 +02:00
n07070
a06086521a Add new web route, restructure API route 2026-05-22 11:01:06 +02:00
n07070
ee27c62d0f Add new functions for discovery and parsing of printers, WIP 2026-05-22 11:01:06 +02:00
n07070
2a11239c1e Add new dependencies for brother ql printers 2026-05-22 11:00:43 +02:00
n07070
bd9888caf7 Downgrade python supported version for 3.13 2026-05-20 12:01:51 +02:00
11 changed files with 786 additions and 369 deletions

View File

@@ -5,8 +5,6 @@ signature = "Anonymous"
# Printer settings
[printer]
vendor_id = 0x04b8
device_id = 0x0e28
upload_folder = "src/static/uploads"
# Raspberry Pi Configuration

View File

@@ -7,7 +7,7 @@ authors = [
]
license = "AGPLv3"
readme = "README.md"
requires-python = ">=3.14"
requires-python = ">=3.13"
dependencies = [
"flask (>=3.1.3,<4.0.0)",
"numpy (>=2.3.4)",

View File

@@ -36,7 +36,6 @@ import werkzeug.exceptions
from flask_socketio import SocketIO
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from printer import Printer # The wrapper for the printer class
from raspberry import Raspberry # The Raspberry pi control Class
from web import Web # Wrapper for the web routes and API
from print_queue import PrintQueue
@@ -73,11 +72,7 @@ except OSError as e:
app.logger.debug("Config file loaded !")
# Define the USB connections here.
vendor_id = configuration_file["printer"]["vendor_id"]
device_id = configuration_file["printer"]["device_id"]
UPLOAD_FOLDER = str(configuration_file["printer"]["upload_folder"])
try:
os.mkdir(UPLOAD_FOLDER)
app.logger.debug("Directory %s created successfully.", UPLOAD_FOLDER)
@@ -88,49 +83,48 @@ except PermissionError:
sys.exit(77)
# Output the config file
if os.getenv("FLASK_DEBUG"):
if not os.getenv("FLASK_DEBUG") is None and os.getenv("FLASK_DEBUG") is True:
pprint.pprint(configuration_file)
# We define the app module used by Flask
app.secret_key = configuration_file["secrets"]["flask_secret_key"]
app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER
app.config["ALLOWED_EXTENSIONS"] = ALLOWED_EXTENSIONS
app.config["MAX_CONTENT_LENGTH"] = 10 * 1000 * 1000 # Maximum 3Mb for a file upload
app.config["MAX_CONTENT_LENGTH"] = 10 * 1000 * 1000 # Maximum 10Mb for a file upload
app.config["TEMPLATES_AUTO_RELOAD"] = True
# Printer connection
# Uses the class defined in the printer.py file
printer = Printer(app, 0x04B8, 0x0E28)
printer.init_printer()
# Find out if we are running on a Raspberry Pi
rpi = Raspberry(
printer,
app,
socketio,
configuration_file["rpi"]["button_gpio_port_number"],
configuration_file["rpi"]["indicator_gpio_port_number"],
configuration_file["rpi"]["flash_gpio_port_number"],
configuration_file["rpi"]["flash"],
)
RASPBERRY_PI_CONNECTED = rpi.is_raspberry_pi()
# Queue creation
print_queue = PrintQueue(app)
# Find out if we are running on a Raspberry Pi
rpi = Raspberry(
print_queue,
app,
configuration_file
)
RASPBERRY_PI_CONNECTED = rpi.is_raspberry_pi()
# Web & API management
web = Web(app, print_queue)
# Start worker thread
worker = PrintWorker(app, print_queue, printer, socketio)
worker.start()
# When created, the worker will try to find printers connected to the system
try:
worker = PrintWorker(app, print_queue)
worker.start()
except Exception as e:
app.logger.error("Could not start the worker because %s ", str(e))
sys.exit(-1)
# The rate limit
limiter = Limiter(
get_remote_address, app=app, default_limits=["1500 per day", "500 per hour"]
)
app.logger.info("🖶 Welcome to LittlePrynter !")
# General routes
@app.route("/")
@limiter.limit("1/second", override_defaults=False)
@@ -197,6 +191,9 @@ def web_print_img():
"No signature found for this print, using default signature : %s", str(e)
)
sign = configuration_file["defaults"]["signature"]
except werkzeug.exceptions.RequestEntityTooLarge as e:
flash("Whoops, image is too big: " + str(e), "error")
return redirect(url_for("index"))
# check if the post request has the file part
if "img" not in request.files:
@@ -304,6 +301,7 @@ def api_print_image():
return "OK", 200
# TODO: This might not depend on the Raspberry Pi
@app.route("/api/camera/picture", methods=["GET"])
def camera_picture():
"""Returns a picture taken by the camera on a raspberry pi"""
@@ -322,6 +320,12 @@ def api_queue_status():
return jsonify(web.get_queue_state())
@app.route("/api/queue/completed", methods=["GET"])
def api_queue_completed():
"""API endpoint that returns the finished tasks"""
return jsonify(web.get_queue_completed())
@app.route("/api/worker", methods=["GET"])
def api_worker_state():
"""API endpoint to get the worker state"""
@@ -403,4 +407,4 @@ def camera_status():
if __name__ == "__main__":
app.run(debug=True, use_reloader=False, host="0.0.0.0", ssl_context="adhoc")
app.run(use_reloader=False, host="0.0.0.0", ssl_context="adhoc")

View File

@@ -76,7 +76,16 @@ class PrintQueue:
"""Return current queue state"""
with self._lock:
self.app.logger.debug("Return current queue state")
return [{"task_id": t.task_id, "status": t.status} for t in self._queue]
return [
{"task_id": t.task_id, "status": t.status, "type": str(t.task_type), "content": str(t.get_print_data())}
for t in self._queue
]
def get_queue_completed(self):
"""Return completed queue elements"""
with self._lock:
self.app.logger.debug("Return completed queue elements")
return self._completed_tasks
def get_status(self, task_id):
"""Get full status info for a task"""

View File

@@ -1,115 +1,129 @@
"""
This class manages connexion to a Printer
"""
# import brother_ql
from time import sleep
import os.path
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
import time
from enum import Enum
import uuid
import threading
import usb.core
from PIL import Image, ImageEnhance
import numpy as np
# Importing the module to manage the connection to the printer.
# Importing the modules needed for each supported printer Type
import escpos.printer
from brother_ql.models import ModelsManager
from brother_ql.backends import backend_factory
from brother_ql.raster import BrotherQLRaster
from brother_ql.conversion import convert
from brother_ql.backends.helpers import send
class Printer(object):
class PrinterType(Enum):
"""
# The connection is based on the ESC/POS library
## Connection to the USB printer
## Making sure the printer is alive
## Making sure it has paper
## Define default print settings
## Print starting message, log time of first print, cut.
## Annonce readyness : return a positive pong message.
What are the capacities of a Printer ?
"""
# Is the printer ready to accept a new print ?
ready = False
EPSON = "epson"
BROTHER = "brother"
def __init__(self, app, device_id, vendor_id):
super(Printer, self).__init__()
# For Brother-QL Printers
@dataclass
class PrinterInfo:
"""
Brother-QL printer information
"""
identifier: str
backend: str
protocol: str
vendor_id: str
product_id: str
serial_number: str
name: str = "Brother QL Printer"
model: str = "QL-570"
status: str = "unknown"
label_type: str = "unknown"
label_size: str = "unknown"
label_width: int = 0
label_height: int = 0
def __getitem__(self, item):
return getattr(self, item)
def __setitem__(self, key, value):
setattr(self, key, value)
class Printer(ABC):
"""
If it outputs printed paper and speaks like a printer, then it must be a printer.
"""
def __init__(self, app, vendor_id, device_id, printer_type: PrinterType):
"""
We initialize a Printer via it's USB connexion, and generate a unique ID
"""
self.id = uuid.uuid4()
self.app = app
self.ready = False
self.printer = None
self.device_id = device_id
self.vendor_id = vendor_id
self.device_id = device_id
self.ready = False
self.printer_type = printer_type
self._lock = threading.Lock()
@abstractmethod
def _has_paper(self) -> bool:
"""Check if the printer has papier"""
@abstractmethod
def _state(self) -> bool:
"""Reports the state of the Printer"""
@abstractmethod
def print_task(self, task_type, data) -> None:
"""Takes a PrintTask and executes it"""
class EscPosPrinter(Printer):
"""
Create a new ESC/POS based printer.
"""
def __init__(self, app, vendor_id, device_id):
"""
Create a connexion to a ESC/POS Printer via USB,
Making sure the printer is alive,
Making sure it has paper,
Define default print settings
"""
super().__init__(app, vendor_id, device_id, printer_type=PrinterType.EPSON)
self.printer = None
self.usb_args = {}
self.usb_args["idVendor"] = self.device_id
self.usb_args["idProduct"] = self.vendor_id
self.usb_args["idVendor"] = self.vendor_id
self.usb_args["idProduct"] = self.device_id
def check_paper(self) -> bool:
"""
On printers that support it, we check that the printer has paper
"""
self.app.logger.debug("Checking paper status...")
self.printer.open(self.usb_args)
status = self.printer.paper_status()
match status:
case 0:
self.app.logger.error("Printer has no more paper, aborting...")
self.printer.close()
raise RuntimeError("No more paper in the printer")
case 1:
self.app.logger.warning(
"Printer needs paper to be changed very soon ! "
)
self.printer.close()
case 2:
self.app.logger.debug("Printer has paper, good to go")
self.printer.close()
try:
# This also calls open(), which we need to close()
# or else the device will appear as busy.
p = escpos.printer.Usb(self.vendor_id, self.device_id, 0, profile="TM-P80")
except escpos.exceptions.DeviceNotFoundError as e:
self.app.logger.error(
"The USB device is not plugged in : %s",
str(e),
)
except Exception as e:
self.app.logger.error("Printer could not be connected : %s ", str(e))
def init_printer(self):
"""
Check if the printer online ? Is the communication with the printer successfull ?
"""
# TODO: This could happen directly when creating a new Printer class
if os.getenv("FLASK_DEBUG"):
waiting_elapsed = 15
else:
waiting_elapsed = 10
self.app.logger.debug("Waiting for printer to get online...")
while not self.ready:
try:
# This also calls open(), which we need to close()
# or else the device will appear as busy.
p = escpos.printer.Usb(
self.device_id, self.vendor_id, 0, profile="TM-P80"
)
except Exception as e:
self.app.logger.error(
"The USB device is not plugged in, trying again %s : %s",
waiting_elapsed,
str(e),
)
pass
try:
if p.is_online():
self.ready = True
self.app.logger.debug("Printer online !")
except Exception as e:
self.app.logger.error(
"Error while getting the printer online %s : %s",
waiting_elapsed,
str(e),
)
pass
sleep(1)
waiting_elapsed -= 1
if waiting_elapsed < 1:
self.app.logger.error(
"Printer took more than 30 seconds to get online, aborting..."
)
waiting_elapsed = 1 # Reset the waiting time for the next print.
return False
try:
if p.is_online():
self.app.logger.debug("Printer online !")
except Exception as e:
raise e
# Setting up the printing options.
p.set(
@@ -130,15 +144,38 @@ class Printer(object):
# Beware : if we print every time the printer becomes ready, it means
# we are printing before and after every print !
self.printer = p
self.printer.close() # We close the connexion to the Printer
try:
self._has_paper()
except Exception as e:
raise e
self.ready = True
self.printer.close()
self.check_paper()
return True
def _print_sms(self, msg, signature="", bold=False):
def _has_paper(self):
"""Check if the printer has paper left"""
self.app.logger.debug("Checking paper status...")
self.printer.open(self.usb_args)
status = self.printer.paper_status()
match status:
case 0:
self.app.logger.error("Printer has no more paper, aborting...")
self.printer.close()
raise RuntimeError("No more paper in the printer")
case 1:
self.app.logger.warning(
"Printer needs paper to be changed very soon ! "
)
self.printer.close()
return True
case 2:
self.app.logger.debug("Printer has paper, good to go")
self.printer.close()
return True
def _print_txt(self, msg, signature="", bold=False):
self.ready = False
if not isinstance(msg, str):
self.app.logger.error(
"It is not possible to print a " + str(type(msg)) + ", only strings."
@@ -154,7 +191,7 @@ class Printer(object):
self.app.logger.warning(
"Could not print message of this length: " + str(len(clean_msg))
)
raise Exception(
raise RuntimeError(
"Could not print message of this length :"
+ str(len(clean_msg))
+ ", needs to be below 4096 caracters long."
@@ -164,7 +201,7 @@ class Printer(object):
self.app.logger.warning(
"Could not print signature of this length: " + str(len(clean_signature))
)
raise Exception(
raise RuntimeError(
"Could not print signature of this length :"
+ str(len(clean_signature))
+ ", needs to be below 256 caracters long."
@@ -178,6 +215,7 @@ class Printer(object):
self.printer.textln(clean_msg)
if clean_signature:
self.printer.textln(clean_signature)
self.printer.textln()
self.printer.close()
except Exception as e:
self.app.logger.error("Unable to print because : " + str(e))
@@ -186,9 +224,10 @@ class Printer(object):
) from e
self.app.logger.info("Printed text")
return True
self.ready = True
def _print_img(self, path, signature="", center=True, process=False):
self.ready = False
clean_signature = str(signature)
if len(signature) > 256:
@@ -208,36 +247,39 @@ class Printer(object):
+ str(path)
+ " wasn't found. Please try again."
)
else:
self.app.logger.debug("Printing file from " + str(path))
self.app.logger.debug("Printing file from " + str(path))
if process:
try:
self.app.logger.debug("Proccessing the image")
path = _process_image(self, path)
processed_path = _process_image(self, path)
except RuntimeError as e:
self.app.logger.error(
"Error while processing the image, aborting print : %s", str(e)
)
raise e
else:
processed_path = path
self.app.logger.warning("Not proccessing the image")
try:
self.printer.open(self.usb_args)
self.printer.image(path, center=center)
self.printer.image(processed_path, center=center)
self.printer.textln(signature)
self.printer.close()
self.app.logger.debug("Printed an image : " + str(path))
self.app.logger.debug("Printed an image : " + str(processed_path))
except Exception as e:
self.app.logger.error(str(e))
raise RuntimeError("Could not print the picture") from e
finally:
try:
os.remove(path)
os.remove(processed_path)
except OSError as e:
raise e
self.app.logger.debug("Removed image : " + str(processed_path))
self.app.logger.debug("Removed image : " + str(path))
try:
@@ -249,22 +291,25 @@ class Printer(object):
raise RuntimeError("Could not close the printer connexion. ") from e
self.app.logger.info("Printed a picture")
return True
self.ready = True
def _qr(self, content):
self.ready = False
try:
self.printer.open(self.usb_args)
self.printer.qr(content, center=True)
self.printer.textln(content)
self.printer.close()
except Exception as e:
except RuntimeError as e:
self.printer.close()
self.app.logger.error(str(e))
return False
raise e
self.app.logger.info("Printed a QR")
return True
self.ready = True
def _cut(self):
self.ready = False
try:
self.printer.open(self.usb_args)
self.printer.cut()
@@ -275,34 +320,236 @@ class Printer(object):
raise e
self.app.logger.info("Did a cut")
return True
self.ready = True
def _state(self) -> bool:
has_paper = self._has_paper()
is_ready = self.ready
self.app.logger.debug("Has paper : %s " , has_paper )
self.app.logger.debug("Ready : %s " , is_ready )
return is_ready and has_paper # and is_online
def print_task(self, task_type, data):
"""Execute actual print based on task type"""
match (task_type.value):
case "text":
self._print_sms(data["txt"], signature=data["sign"])
case "image":
self._print_img(
data["img"], signature=data["sign"], process=data["process"]
with self._lock:
self.app.logger.debug("Acquired lock to start print")
i_m_ready = self._state()
while not i_m_ready:
self.app.logger.debug("Waiting for the printer to become ready, current state %s ", str(i_m_ready))
i_m_ready = self._state()
time.sleep(0.3)
self.app.logger.debug("Checked state to start printing : %s", self._state())
self.ready = False
try:
self.app.logger.debug("Checking task type")
match (task_type.value):
case "text":
self._print_txt(data["txt"], signature=data["sign"])
self.ready = True
case "image":
self._print_img(
data["img"], signature=data["sign"], process=data["process"]
)
self.ready = True
case "cut":
self._cut()
self.ready = True
case "qr":
self._qr(data["txt"])
self.ready = True
case _:
raise RuntimeError("This task type is not supported")
except Exception as e:
self.app.logger.debug("Exception occured while printing %s", str(e))
self.ready = True
raise RuntimeError from e
class BrotherPrinter(Printer):
"""
Manages connexion and capabilities of a BrotherQL Printer
"""
def __init__(self, app, vendor_id, device_id):
super().__init__(
app, vendor_id="", device_id="", printer_type=PrinterType.BROTHER
)
self.printer = None
self.usb_args = {}
self.usb_args["idVendor"] = self.device_id
self.usb_args["idProduct"] = self.vendor_id
self.model_manager = ModelsManager()
# Code taken from https://github.com/5shekel/printit/blob/master/printer_utils.py
backend = backend_factory("pyusb")
available_devices = backend["list_available_devices"]()
for printer in available_devices:
self.app.logger.debug(f"Found device: {printer}")
identifier = printer["identifier"]
parts = identifier.split("/")
if len(parts) < 4:
self.app.logger.warning(
f"Skipping device with invalid identifier format: {identifier}"
)
case "cut":
self._cut()
case _:
raise RuntimeError("This task type is not supported")
continue
protocol = parts[0]
# device_info = parts[2]
serial_number = parts[3]
try:
product_id_int = int(self.device_id, 16)
for m in self.model_manager.iter_elements():
if m.product_id == product_id_int:
model = m.identifier
break
self.app.logger.debug(f"Matched printer model: {model}")
except ValueError:
self.app.logger.warning(f"Invalid product ID format: {m.product_id}")
self.printer_info = PrinterInfo(
identifier=identifier,
backend="pyusb",
model=model,
protocol=protocol,
vendor_id=vendor_id,
product_id=self.device_id,
serial_number=serial_number,
)
self.ready = True
def _has_paper(self):
raise NotImplementedError("This printer model does not support this.")
def _state(self):
return self.ready
def _print_img(self, data):
"""
Print a raster image via a Brother QL printer
"""
self.ready = False
label_type = "102"
rotate = 0
dither = False
try:
# Prepare the image for printing
qlr = BrotherQLRaster(self.printer_info["model"])
instructions = convert(
qlr=qlr,
images=[data["img"]],
label=label_type,
rotate=rotate,
threshold=70,
dither=dither,
compress=True,
red=False,
dpi_600=False,
hq=False,
cut=True,
)
# Debug logging
if os.getenv("FLASK_DEBUG"):
self.app.logger.debug(f"""
Print parameters:
- Label type: {label_type}
- Rotate: {rotate}
- Dither: {dither}
- Model: {self.printer_info['model']}
- Backend: {self.printer_info['backend']}
- Identifier: {self.printer_info['identifier']}
""")
# Try to print using Python API
# send() = status = {
# 'instructions_sent': True, # The instructions were sent to the printer.
# 'outcome': 'unknown', # String description of the outcome of the sending operation like: 'unknown', 'sent', 'printed', 'error'
# 'printer_state': None, # If the selected backend supports reading back the printer state, this key will contain it.
# 'did_print': False, # If True, a print was produced. It defaults to False if the outcome is uncertain (due to a backend without read-back capability).
# 'ready_for_next_job': False, # If True, the printer is ready to receive the next instructions. It defaults to False if the state is unknown.
# }
status = send(
instructions=instructions,
printer_identifier=self.printer_info["identifier"],
backend_identifier="pyusb",
)
if (
not status["did_print"]
or status["outcome"] == "error"
or status["outcome"] == "unknown"
):
raise RuntimeError("Failed to print using Python API")
if status["printer_state"]:
self.ready = bool(status["printer_state"])
else:
self.ready = True
except usb.core.USBError as e:
# Treat timeout errors as successful since they often occur after print completion
if e.errno == 110: # Operation timed out
self.app.logger.debug(
"USB timeout occurred - this is normal and the print likely completed"
)
self.app.logger.debug("Print completed (timeout is normal)")
self.ready = True
error_msg = f"USBError encountered: {e}"
self.app.logger.debug(error_msg)
raise RuntimeError from e
except Exception as e:
error_msg = f"Unexpected error during printing: {str(e)}"
self.app.logger.debug(error_msg)
raise RuntimeError from e
def print_task(self, task_type, data):
"""Execute actual print based on task type"""
with self._lock:
if self._state:
self._state = False
match (task_type.value):
case "image":
self._print_img(data["img"])
self._state = True
case "cut":
# The cut happens by default on Brother QL printers.
self._state = True
case _:
raise RuntimeError("This task type is not supported")
else:
raise RuntimeError("The printer is not ready to print yet !")
# These values are by default for now
# raise NotImplementedError("This printer type is not implemented yet")
def _process_image(self, path):
brightness_factor = 1.5 # Used only if image is too dark
brightness_threshold = 100 # Brightness threshold (0255)
contrast_factor = 0.6 # Less than 1.0 = lower contrast
contrast_factor = 2 # Less than 1.0 = lower contrast
max_width = 575
max_height = 1000
with Image.open(path) as original_img:
# Convert to RGB if needed (JPEG doesn't support alpha)
if original_img.mode in ("RGBA", "P"):
self.app.logger.debug("Converting the image to RGB from RGBA")
self.app.logger.debug("Converting the image from RGBA to RGBA")
original_img = original_img.convert("RGB")
# Resize while maintaining aspect ratio
@@ -310,7 +557,8 @@ def _process_image(self, path):
self.app.logger.debug("Resized the image")
# # Convert to grayscale for dithering
# dithered_img = original_img.convert("L").convert("1") # Dithering using default method (FloydSteinberg)
# dithered_img = original_img.convert("L").convert("1")
# Dithering using default method (FloydSteinberg)
# self.app.logger.debug("Dithered the image")
# Compute brightness of original image (grayscale average)
@@ -331,107 +579,19 @@ def _process_image(self, path):
self.app.logger.debug(
f"Image too dark, increasing brightness by a factor of {brightness_factor:.2f}"
)
enhancer = ImageEnhance.Brightness(original_img)
original_img = enhancer.enhance(brightness_factor)
enhancer = ImageEnhance.Brightness(grayscale)
grayscale = enhancer.enhance(brightness_factor)
# # Reduce contrast
# contrast_enhancer = ImageEnhance.Contrast(original_img)
# original_img = contrast_enhancer.enhance(contrast_factor)
# Computer current contrast of grayscale image
contrast = np.clip(np.std(np.array(grayscale)), 0, 255)
self.app.logger.debug("Standard deviation of the contrast : %s", contrast)
# # Enhance contrast
contrast_enhancer = ImageEnhance.Contrast(grayscale)
original_img = contrast_enhancer.enhance(contrast_factor)
# Convert to JPEG and save
jpeg_path = os.path.splitext(path)[0] + "_processed.jpg"
original_img.save(jpeg_path, format="JPEG", quality=95, optimize=True)
grayscale.save(jpeg_path, format="JPEG", quality=95, optimize=True)
self.app.logger.debug("Processed and saved image.")
return jpeg_path
def discover_printers():
"""
We try to find all the connected printers ( 0 or n ) to this system.
For every type of supported printer, we try to autodiscover them.
http://www.linux-usb.org/usb.ids A list of USB vendor IDs
04b8 Seiko Epson Corp.
04f9 Brother Industries, Ltd
"""
def find_and_parse_borther_ql_printer():
## We might be able to no use this because there is a `discover` command in https://github.com/pklaus/brother_ql#usage
## Code stolen from https://framagit.org/stickoeur/diagnostickoeur/-/blob/no-masters/printit.py?ref_type=heads
"""Find and parse Brother QL printer information."""
model_manager = ModelsManager()
# Debug print to show we're searching
# print("Searching for Brother QL printer...")
for backend_name in ["pyusb", "linux_kernel"]:
try:
# print(f"Trying backend: {backend_name}")
backend = backend_factory(backend_name)
available_devices = backend["list_available_devices"]()
# print(f"Found {len(available_devices)} devices with {backend_name} backend")
for printer in available_devices:
# print(f"Found device: {printer}")
identifier = printer["identifier"]
parts = identifier.split("/")
if len(parts) < 4:
# print(f"Skipping device with invalid identifier format: {identifier}")
continue
protocol = parts[0]
device_info = parts[2]
serial_number = parts[3]
try:
vendor_id, product_id = device_info.split(":")
except ValueError:
# print(f"Invalid device info format: {device_info}")
continue
# Default model
model = "QL-570"
# Try to match product ID to determine actual model
try:
product_id_int = int(product_id, 16)
for m in model_manager.iter_elements():
if m.product_id == product_id_int:
model = m.identifier
break
# print(f"Matched printer model: {model}")
except ValueError:
# print(f"Invalid product ID format: {product_id}")
continue
printer_info = {
"identifier": identifier,
"backend": backend_name,
"model": model,
"protocol": protocol,
"vendor_id": vendor_id,
"product_id": product_id,
"serial_number": serial_number,
}
# print(f"Found printer: {printer_info}")
return printer_info
except Exception as e:
# print(f"Error with backend {backend_name}: {str(e)}")
continue
print("No Brother QL printer found")
return None
def fint_and_parse_epson_printer():
pass

160
src/printers.py Normal file
View File

@@ -0,0 +1,160 @@
"""
A collection of Printers.
It has methods to discover printers, and provides an interface for
the methods expected from printers.
"""
from collections.abc import Set
import usb.core
import usb.util
from printer import Printer, EscPosPrinter, BrotherPrinter
class Printers:
"""
Finds and creates a set of Printer that can be used by the Workers to print.
"""
def __init__(self, app):
"""
Discover printers connected to the computer and return a Collection of Printer()
"""
self.app = app
self.printers = self._discover_printers()
def _discover_printers(self) -> Set[Printer]:
"""
Gets connected USB printer devices using the pyusb library.
We analyse the USB devices, get the ones that match
the printer class ( 7 ) and for Brother and EPSON printers,
try to create a Printer object that can be used by the Worker class
to execute prints.
Returns a set of Printer
"""
self.app.logger.debug("Discovering USB Devices connected to this system")
printers = set()
# Find all connected USB devices
devices = usb.core.find(find_all=True, custom_match=_FindClass(7))
if not devices:
self.app.logger.warning(
"No USB devices of class 7 ( printers ) found or pyusb could not access the bus."
)
raise RuntimeError(
"No USB devices of class 7 ( printers ) found or pyusb could not access the bus."
)
for dev in devices:
# Attempt to get the manufacturer and product strings
try:
manufacturer = usb.util.get_string(dev, dev.iManufacturer)
except Exception:
manufacturer = "Unknown"
try:
product = usb.util.get_string(dev, dev.iProduct)
except Exception:
product = "Unknown"
self.app.logger.debug(
"Looking at %s %s (%s:%s)",
manufacturer,
product,
hex(dev.idVendor),
hex(dev.idProduct),
)
if manufacturer == "EPSON":
try:
# We create a new EscPosPrinter()
self.app.logger.debug("Trying to creat a new EPSON printer")
prid = dev.idProduct
vendir = dev.idVendor
escpos_printer = EscPosPrinter(
self.app, vendor_id=vendir, device_id=prid
)
except Exception as e:
raise e
# If the object creation is successfull, we add it to the list of Printers
printers.add(escpos_printer)
self.app.logger.debug("Found a %s printer", manufacturer)
# We already found the type of printer,
# we don't need an extra comparaison.
continue
# or a Brother Printer
if manufacturer == "Brother":
try:
# We create a new BrotherPrinter()
self.app.logger.debug("Trying to creat a new BROTHER printer")
prid = dev.idProduct
vendir = dev.idVendor
brother_printer = BrotherPrinter(
self.app, vendor_id=vendir, device_id=prid
)
except Exception as e:
self.app.logger.error(
"Could not create a %s printer class with %s:%s" , product,
dev.idVendor,
dev.idProduct,
)
raise e
# If the object creation is successfull, we add it to the list of Printers
printers.add(brother_printer)
self.app.logger.debug("Found a %s printer" , manufacturer)
self.app.logger.debug("Found %s printers" , len(printers))
if len(printers) < 1:
self.app.logger.warning("Not printers found ! Please plug in a Printer and restart the program.")
raise RuntimeError("No printers found")
return printers
def any(self) -> Printer:
"""
Return a dict key: UUID, value: Printer, with any connected printer.
"""
if len(self.printers) > 0:
for i in self.printers:
return i
else:
raise RuntimeError("No printers available")
# def get_printer(self, printer_type):
# """
# Return a specific printer
# printer_type -- a printer type
# """
# return NotImplementedError()
class _FindClass:
"""
Use by usb.core to modify the way USB devices are found
Taken from pyUSB documentation on Github
"""
def __init__(self, class_):
self._class = class_
def __call__(self, device):
# first, let's check the device
if device.bDeviceClass == self._class:
return True
# ok, transverse all devices to find an
# interface that matches our class
for cfg in device:
# find_descriptor: what's it?
intf = usb.util.find_descriptor(cfg, bInterfaceClass=self._class)
if intf is not None:
return True
return False

View File

@@ -10,12 +10,12 @@ import io # To check if we are on a Raspberry Pi
import subprocess
import os
from time import sleep, gmtime, strftime
from flask_socketio import SocketIO
from gpiozero import Button, LED, DigitalOutputDevice
from PIL import Image
from task import TextTask, ImageTask, CutTask
class Raspberry():
class Raspberry:
"""
This class will manage three things :
- Connecting to a USB webcam
@@ -23,29 +23,22 @@ class Raspberry():
- Activating a flash ( or light )
- Flash an indicator light
# pylint: disable=too-many-instance-attributes
# dede
"""
def __init__(
self,
print_queue,
app,
socketio,
button_gpio_port_number,
indicator_gpio_port_number,
flash_gpio_port_number,
is_flash_present,
configuration_file
):
self.print_queue = print_queue
self.socketio = socketio
self.app = app
self.flash_gpio = flash_gpio_port_number
self.is_flash_present = is_flash_present
self.button_gpio = button_gpio_port_number
self.led_gpio = indicator_gpio_port_number
self.configuration_file = configuration_file
self.image_path = self.app.config["UPLOAD_FOLDER"] + "/image.jpg"
def is_raspberry_pi(self, raise_on_errors=False):
def is_raspberry_pi(self):
"""
Checking if we are on a Raspberry Pi by checking
information on the /proc/cpuinfo file
@@ -72,9 +65,10 @@ class Raspberry():
return False
if not found:
self.app.logger.error(
"Couldn't get sufficient hardware information from /proc/cpuinfo, Unable to determine if we are on a Raspberry Pi."
self.app.logger.warning(
"Couldn't get sufficient hardware information from /proc/cpuinfo"
)
self.app.logger.warning("Unable to determine if we are on a Raspberry Pi.")
return False
except IOError:
self.app.logger.error("Unable to open `/proc/cpuinfo`.")
@@ -83,28 +77,38 @@ class Raspberry():
self.app.logger.debug("It seems we are on a Raspberry Pi")
try:
self.initialise_gpio()
self._initialise_gpio()
except Exception as e:
self.app.logger.debug("Could not init GPIO : " + str(e))
raise e
return True
def initialise_gpio(self):
def _initialise_gpio(self):
"""
Set GPIO ports from configuration and activate them
to show the user it's working.
"""
self.app.logger.debug("Initializing GPIO")
self.led = LED(self.led_gpio)
self.led = LED(self.configuration_file["rpi"]["indicator_gpio_port_number"])
self.app.logger.debug("Activated indicator LED")
self.indicator_countdown(iters=3)
self.button = Button(self.button_gpio, pull_up=True, bounce_time=0.1)
self.button = Button(
self.configuration_file["rpi"]["button_gpio_port_number"],
pull_up=True, bounce_time=0.1
)
self.button.when_pressed = self.on_button_pressed
self.app.logger.debug("Activated button")
# The "flash" is a relay-controlled device ( light bulb for example )
self.flash = DigitalOutputDevice(self.flash_gpio)
self.flash = DigitalOutputDevice(self.configuration_file["rpi"]["flash_gpio_port_number"])
self.flash_toggle()
self.app.logger.debug("Activated flash")
def indicator_countdown(self, iters=10, multi=10):
"""
Activates the LED faster and faster to show a countdown
"""
for i in range(iters, 0, -1):
self.led.on()
sleep(i / multi)
@@ -112,7 +116,10 @@ class Raspberry():
sleep(i / multi)
def indicator_led(self, timing=0.2, l=5):
for i in range(l):
"""
Turns on the indicator LED for a certain period of time
"""
for _ in range(l):
self.app.logger.debug("LED turned on")
self.led.on()
sleep(timing)
@@ -121,6 +128,9 @@ class Raspberry():
sleep(timing)
def flash_toggle(self):
"""
Flashes the flash
"""
self.app.logger.debug("Flash turned on")
self.flash.on()
sleep(0.3)
@@ -128,6 +138,9 @@ class Raspberry():
self.app.logger.debug("Flash turned off")
def take_picture(self):
"""
Takes a picture via the USB webcam
"""
# Validate if the image path is valid
if not os.path.isdir(os.path.dirname(self.image_path)):
self.app.logger.error(
@@ -173,6 +186,9 @@ class Raspberry():
position="bottom_right",
margin=10,
):
"""
Takes an image and overlays it with a another picture.
"""
try:
image = Image.open(image_path).convert("RGBA")
logo = Image.open(logo_path).convert("RGBA")
@@ -199,7 +215,9 @@ class Raspberry():
y = image.height - logo.height - margin
else:
raise ValueError(
"Invalid position. Choose from 'bottom_right', 'top_left', 'top_right', or 'bottom_left'."
"Invalid position." +
"Choose from 'bottom_right', 'top_left', " +
" 'top_right', or 'bottom_left'."
)
# Composite the logo onto the image
@@ -217,6 +235,9 @@ class Raspberry():
return True
def crop_to_square(self, image_path, output_path=None):
"""
Crop an image so that it becomes a square
"""
try:
image = Image.open(image_path)
width, height = image.size
@@ -244,6 +265,10 @@ class Raspberry():
return True
def on_button_pressed(self):
"""
When a button press is detected, a picture is taken from the webcam
and added to the print queue.
"""
self.app.logger.debug("Button has been pressed")
self.led.on()
self.app.logger.debug("Counting down")
@@ -263,8 +288,10 @@ class Raspberry():
self.app.logger.debug("Printing picture")
self.led.on()
self.crop_to_square(self.image_path)
self.print_queue.enqueue(ImageTask(self.image_path,signature="",process=True))
self.print_queue.enqueue(TextTask(content="Imprimé par LittlePrynter", signature=""))
self.print_queue.enqueue(ImageTask(self.image_path, signature="", process=True))
self.print_queue.enqueue(
TextTask(content="Imprimé par LittlePrynter", signature="")
)
time = strftime("%Y-%m-%d %H:%M", gmtime())
self.print_queue.enqueue(TextTask(content=time, signature=""))
self.print_queue.enqueue(CutTask())

View File

@@ -14,11 +14,11 @@ to print at the same time.
We can also delay and store printing tasks until a printer becomes
available if none is online.
"""
from abc import ABC, abstractmethod
## See https://docs.python.org/3/library/abc.html to learn more about this
# from dataclasses import dataclass
from enum import Enum
import uuid
@@ -28,9 +28,11 @@ class TaskType(Enum):
"""
The different tasks supported by the printers
"""
TEXT = "text"
IMAGE = "image"
CUT = "cut"
QR = "qr"
class PrintTask(ABC):
@@ -38,18 +40,15 @@ class PrintTask(ABC):
A print task holds information about what we are looking to print.
"""
def __init__(self, task_type):
def __init__(self, task_type: TaskType):
self.task_id = self._generate_id()
self.task_type = task_type
self.status = "pending" # pending, processing, completed, failed
print("Created a new " + str(self.task_type) + " with ID " + self.task_id)
@abstractmethod
def get_print_data(self):
"""Return data formatted for printer"""
def _generate_id(self):
# Generate unique task ID
return str(uuid.uuid4())
@@ -69,6 +68,18 @@ class TextTask(PrintTask):
return {"txt": self.content, "sign": self.signature}
class QRTask(TextTask):
"""This task prints a QR-Code, the signature is ignore and is always the content itself"""
def __init__(self, content):
super().__init__(content, signature="")
self.content = content
self.signature = content
def get_print_data(self):
return {"txt": self.content, "sign": self.signature}
class ImageTask(PrintTask):
"""
This tasks represents a image content ( in the form of it's path ), and it's signature.

View File

@@ -6,7 +6,8 @@
<h3 class="card-header">Print a short message</h3>
<div class="card-body">
<form class="form-group" action="/web/print/sms" method="post">
<input class="form-control" type="text" name="txt" placeholder="200 chars or less " maxlength="200" required><br>
<textarea class="form-control" type="text" name="txt" placeholder="4096 chars or less " maxlength="4096"></textarea>
<br>
<input class="form-control" type="text" name="signature" placeholder="Signature or pseudo" maxlength="200"><br>
<input class="btn btn-primary float-right" type="submit" value="Imprimer" name="imprimer">
</form>

View File

@@ -1,13 +1,15 @@
from flask import flash
from werkzeug.utils import secure_filename
import time
"""
Manage all of the inputs from a web source
"""
import os
from werkzeug.utils import secure_filename
from task import TextTask, ImageTask, CutTask
class Web(object):
class Web:
"""Web is the class that gets all of the information from web calls
( API and Web page ) and provides checks before sending stuff to printing"""
( API and Web page ) and provides checks before sending stuff to printing"""
def __init__(self, app, print_queue):
super(Web).__init__()
@@ -42,7 +44,7 @@ class Web(object):
file_uploaded = self.upload_file(image)
except Exception as e:
self.app.logger.error(e)
raise RuntimeError("Could not upload file") from e
raise RuntimeError("Could not upload file : " + str(e)) from e
if file_uploaded:
self.app.logger.debug("File has been uploaded, printing...")
@@ -66,16 +68,19 @@ class Web(object):
return True
def login(self, username: str, password: str) -> bool:
"""Not implemented"""
return
# def login(self, username: str, password: str) -> bool:
# """Not implemented"""
# return
def logout(self, username: str, password: str) -> bool:
"""Not implemented"""
return
# def logout(self, username: str, password: str) -> bool:
# """Not implemented"""
# return
def allowed_file(self, filename) -> bool:
self.app.logger.debug("Is the filename allowed ?")
"""
Check if the file extension is allowed
"""
self.app.logger.debug("Checking if the file extension is allowed")
return (
"." in filename
and filename.rsplit(".", 1)[1].lower()
@@ -83,8 +88,11 @@ class Web(object):
)
def upload_file(self, image) -> bool:
"""
Save the file after executing checks on it
"""
self.app.logger.debug("Validating file")
if image:
if not image is None or not image == "":
if self.allowed_file(image.filename):
filename = secure_filename(image.filename)
self.app.logger.debug("File valid")
@@ -92,24 +100,23 @@ class Web(object):
image.save(os.path.join(self.app.config["UPLOAD_FOLDER"], filename))
except OSError as e:
self.app.logger.error("Could not save file %s", e)
return False
raise RuntimeError("An OS error occured while uploading this file : " + str(e)) from e
self.app.logger.debug(
"File saved to "
+ str(os.path.join(self.app.config["UPLOAD_FOLDER"], filename))
)
return True
else:
self.app.logger.error(
"Could not save file because the filename is forbidden"
)
return False
else:
self.app.logger.error(
"Could not save file, it seems to be null ? : " + str(filename)
)
return False
self.app.logger.error(
"Could not save file because the filename is forbidden"
)
raise RuntimeError("This file type is forbidden.")
def get_queue_state(self):
"""Return current queue state"""
return self.print_queue.get_queue_state()
def get_queue_completed(self):
"""Return completed queue elements"""
return self.print_queue.get_queue_completed()

View File

@@ -1,84 +1,123 @@
# This is the main printing thread
# As explained in the task file, this is where we command
# printing to happen.
"""
This is the main printing thread. A worker thread consums Tasks from
a PrintQueue, while trying to find available printers.
"""
import threading
import time
from printers import Printers
class PrintWorker(threading.Thread):
def __init__(self, app, print_queue, printer, socketio=None):
"""
A thread used to consume Tasks added to a Print Queue.
On initialisation, the worker will try to find Printers,
and on each print, choose an available Printer from a list of Printers.
If a print fails, it's not retried, but will be added to a list of completed
tasks.
"""
def __init__(self, app, print_queue):
super().__init__(daemon=True)
self.app = app
self.print_queue = print_queue
self.printer = printer
self.socketio = socketio # Optional
self.printer = None
self._lock = threading.Lock()
self.running = True
self.state = "idle" # idle, printing, dead, drinking-a-beer
self.app.logger.debug("Ho great, I'm alive... I'm ready to work another day...")
try:
self.printers = Printers(self.app)
self.printers_obj = self.printers.printers
self.printers = iter(self.printers.printers)
except RuntimeError as e:
self.app.logger.warning("Could not get any Printers")
raise e
def run(self):
"""Background thread that processes queue items"""
self.app.logger.info("Worker started working.")
self.app.logger.debug("Worker %s started working.", threading.get_ident())
self.app.logger.debug("Current threads : %s" , threading.active_count())
self.app.logger.debug("Threads actives : %s " , threading.enumerate())
while True:
if not self.running or not self.printer.ready:
# If the printer is dead or asleep, it can't work.
if not self.running:
time.sleep(0.2)
continue
try:
task = self.print_queue.dequeue()
except Exception as e:
self.app.logger.error("Could not get a new task ! %s ", str(e))
raise RuntimeError(
"We could not get a new task because " + str(e)
) from e
if task:
# If we have no available printer, we look at the list printers
# we know about, and try to find one that is available.
# When we find a printer, we acquire it
# When we are finished with a printer, we release it to the world.
while not self.printer or not self.printer.ready:
time.sleep(1)
try:
self.app.logger.info("Got a new task")
self.app.logger.debug("Got task %s", task.task_id)
self.state = "printing"
task.status = "processing"
self._emit_status(task.task_id, "processing")
self.app.logger.debug("Changing printers")
self.printer = next(self.printers)
self.app.logger.debug(
"The worker got a %s printer and it's %s",
self.printer.printer_type,
"Ready" if self.printer.ready else "Not ready",
)
except Exception as e:
self.app.logger.error("No printer detected" + str(e))
self.printer = None
print_data = task.get_print_data()
if self.state != "idle":
self.app.logger("We are not idle, waiting...")
time.sleep(1)
continue
self.state = "printing"
with self._lock:
try:
task = self.print_queue.dequeue()
except Exception as e:
self.app.logger.error("Could not get a new task ! %s ", str(e))
self.state = "idle"
raise RuntimeError(
"We could not get a new task because " + str(e)
) from e
if task:
try:
self.printer.print_task(task.task_type, print_data)
self.app.logger.info("Got a new task")
self.app.logger.debug("Got task %s", task.task_id)
task.status = "processing"
print_data = task.get_print_data()
try:
self.printer.print_task(task.task_type, print_data)
except RuntimeError as e:
self.state = "idle"
self.app.logger.error("Could not print : %s", str(e))
raise e
task.status = "completed"
self.print_queue.mark_completed(task.task_id, "completed")
self.app.logger.debug(
"Finished printing task %s " , task.task_id
)
self.state = "idle"
except RuntimeError as e:
self.app.logger.error("Could not print : %s", str(e))
raise e
task.status = "failed"
self.state = "idle"
self.print_queue.mark_completed(task.task_id, "failed")
self.app.logger.error(
"Could not print task %s because %s " , task.task_id, str(e)
)
task.status = "completed"
self.print_queue.mark_completed(task.task_id, "completed")
self._emit_status(task.task_id, "completed")
except RuntimeError as e:
task.status = "failed"
self.print_queue.mark_completed(task.task_id, "failed")
self._emit_status(task.task_id, "failed", error=str(e))
print(f"Print task {task.task_id} failed: {e}")
else:
# When they are no new tasks to handle, we put the thread to sleep.
self.state = "idle"
time.sleep(0.1)
def _emit_status(self, task_id, status, error=None):
"""Emit status update via Socket.IO if available"""
if not self.socketio:
return
room = f"task_{task_id}"
data = {
"task_id": task_id,
"status": status,
"position": None, # Task no longer in queue
}
if error:
data["error"] = error
self.socketio.emit("task_status", data, room=room)
else:
# When they are no new tasks to handle, we put the thread to sleep.
self.state = "idle"
time.sleep(0.1)
def stop_worker(self):
"""
@@ -104,4 +143,5 @@ class PrintWorker(threading.Thread):
"is_running": self.running,
"queue_size": len(self.print_queue),
"state": self.state,
"printers": len(self.printers_obj),
}