WIP: Acces-libre : Raspberry Pi integration #8

Draft
n07070 wants to merge 14 commits from acces-libre into master
15 changed files with 824 additions and 245 deletions

1
.gitignore vendored
View File

@ -2,6 +2,7 @@
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
*$py.class *$py.class
CACHEDIR.TAG
# C extensions # C extensions
*.so *.so

View File

@ -20,7 +20,7 @@ To make this project work, you will need :
- Some knowledge of the command line, - Some knowledge of the command line,
- Some knowledge of Python. - Some knowledge of Python.
- 3h of your time, 5h if things need debugging. - 3h of your time, 5h if things need debugging.
- `git`, `virtualenv`,`pip` and `python` >= 3.8.6. - `fswebcam`, `libjpeg-dev` ,`zlib1g-dev`,`libffi-dev`,`git`, `virtualenv`,`pip` and `python` >= 3.8.6.
- A webcam for the webcam page to work. Will work on a smartphone. Not required. - A webcam for the webcam page to work. Will work on a smartphone. Not required.
## Context ## Context

View File

@ -6,6 +6,13 @@ vendor_id = 0x04b8
device_id = 0x0e28 device_id = 0x0e28
upload_folder = "src/static/uploads" upload_folder = "src/static/uploads"
# Raspberry Pi Configuration
[rpi]
button_gpio_port_number = 2
indicator_gpio_port_number = 22
flash_gpio_port_number = 16
flash = true
# Users = Password # Users = Password
[users] [users]
admin = "admin" admin = "admin"

View File

@ -5,6 +5,12 @@
vendor_id = "0x04b8" vendor_id = "0x04b8"
device_id = "0x0e28" device_id = "0x0e28"
# Raspberry Pi Configuration
[rpi]
button_gpio_port_number = 17
indicator_gpio_port_number = 18
flash = true
# Users = Password # Users = Password
[users] [users]
admin = "admin" admin = "admin"

12
littleprynter.service Normal file
View File

@ -0,0 +1,12 @@
[Unit]
Description=LittlePrynter
After=network.target
[Service]
Type=simple
WorkingDirectory=/home/pi/littleprynter/
Environment=FLASK_APP=src/main.py
ExecStart=/home/pi/littleprynter/bin/flask run --host 0.0.0.0 --debug --no-reload
[Install]
WantedBy=multi-user.target

View File

@ -1,36 +1,261 @@
Adafruit_Thermal==1.1.0
appdirs==1.4.4 appdirs==1.4.4
argcomplete==2.0.6 apt-listchanges==4.8
argcomplete==3.6.2
arrow==1.3.0
attrs==25.3.0
babel==2.17.0
bcrypt==4.2.0
bidict==0.23.1
blinker==1.9.0
certifi==2025.1.31
cffi==1.17.1 cffi==1.17.1
click==8.1.8 chardet==5.2.0
commonmark==0.9.1 charset-normalizer==3.4.2
cryptography==45.0.4 click==8.3.0
cloud-init==25.2
colorzero==2.0
configobj==5.0.9
cryptography==43.0.0
Deprecated==1.2.18 Deprecated==1.2.18
distlib==0.3.9
distro==1.9.0
escpos==2.0.0 escpos==2.0.0
Flask==2.1.3 filelock==3.18.0
Flask-Limiter==2.4.5.1 Flask==3.1.2
future==0.18.3 Flask-Limiter==4.0.0
itsdangerous==2.1.2 Flask-SocketIO==5.5.1
fqdn==1.5.1
future==1.0.0
gpiod==2.2.0
gpiozero==2.0.1
h11==0.16.0
idna==3.10
isoduration==20.11.0
itsdangerous==2.2.0
Jinja2==3.1.6 Jinja2==3.1.6
limits==2.6.3 jsonpatch==1.32
jsonpointer==2.4
jsonschema==4.19.2
jsonschema-specifications==2023.12.1
lgpio==0.2.2.0
limits==5.6.0
linkify-it-py==2.0.3
markdown-it-py==3.0.0
MarkupSafe==2.1.5 MarkupSafe==2.1.5
numpy==2.3.0 mdurl==0.1.2
packaging==21.3 numpy==2.3.4
pillow==11.2.1 oauthlib==3.2.2
olefile==0.47
ordered-set==4.1.0
packaging==25.0
pillow==11.1.0
platformdirs==4.3.7
ply==3.11
pycparser==2.22 pycparser==2.22
Pygments==2.12.0 pycryptodomex==3.20.0
pyparsing==3.0.9 Pygments==2.18.0
PyJWT==2.10.1
pyserial==3.5 pyserial==3.5
python-barcode==0.13.1 python-apt==3.0.0
pyusb==1.2.1 python-barcode==0.16.1
python-dateutil==2.9.0
python-engineio==4.12.3
python-socketio==5.14.1
pyusb==1.3.1
PyYAML==6.0.2 PyYAML==6.0.2
qrcode==7.3.1 qrcode==8.2
rich==12.4.4 referencing==0.36.2
requests==2.32.3
rfc3339-validator==0.1.4
rfc3986-validator==0.1.1
rfc3987==1.3.8
rich==13.9.4
rpds-py==0.21.0
rpi-keyboard-config==1.0
rpi-lgpio==0.6
setuptools==80.9.0 setuptools==80.9.0
six==1.16.0 simple-websocket==1.1.0
six==1.17.0
smbus2==0.4.3
spidev==3.6
ssh-import-id==5.10
toml==0.10.2 toml==0.10.2
typing_extensions==4.2.0 types-aiofiles==24.1
Unidecode==1.3.8 types-antlr4-python3-runtime==4.13
types-assertpy==1.1
types-atheris==2.3
types-aws-xray-sdk==2.14
types-beautifulsoup4==4.12
types-bleach==6.2
types-boltons==24.1
types-braintree==4.31
types-cachetools==5.5
types-caldav==1.3
types-capturer==3.0
types-cffi==1.16
types-chevron==0.14
types-click-default-group==1.2
types-click-spinner==0.1
types-colorama==0.4
types-commonmark==0.9
types-console-menu==0.8
types-corus==0.10
types-croniter==5.0.1
types-dateparser==1.2
types-decorator==5.1
types-defusedxml==0.7
types-Deprecated==1.2.15
types-docker==7.1
types-dockerfile-parse==2.0
types-docutils==0.21
types-editdistance==0.8
types-entrypoints==0.4
types-ExifRead==3.0
types-fanstatic==1.4
types-first==2.0
types-flake8==7.1
types-flake8-bugbear==24.12.12
types-flake8-builtins==2.5
types-flake8-docstrings==1.7
types-flake8-rst-docstrings==0.3
types-flake8-simplify==0.21
types-flake8-typing-imports==1.16
types-Flask-Cors==5.0
types-Flask-Migrate==4.0
types-Flask-SocketIO==5.4
types-fpdf2==2.8.2
types-gdb==15.0
types-gevent==24.11
types-google-cloud-ndb==2.3
types-greenlet==3.1
types-hdbcli==2.23
types-html5lib==1.1
types-httplib2==0.22
types-humanfriendly==10.0
types-hvac==2.3
types-ibm-db==3.2.4
types-icalendar==6.1
types-influxdb-client==1.45
types-inifile==0.4
types-JACK-Client==0.5
types-Jetson.GPIO==2.1
types-jmespath==1.0
types-jsonschema==4.23
types-jwcrypto==1.5
types-keyboard==0.13
types-ldap3==2.9
types-libsass==0.23
types-lupa==2.2
types-lzstring==1.0
types-m3u8==6.0
types-Markdown==3.7
types-mock==5.1
types-mypy-extensions==1.0
types-mysqlclient==2.2
types-nanoid==2.0.0
types-netaddr==1.3
types-netifaces==0.11
types-networkx==3.4.2
types-oauthlib==3.2
types-objgraph==3.6
types-olefile==0.47
types-openpyxl==3.1.5
types-opentracing==2.4
types-paramiko==3.5
types-parsimonious==0.10
types-passlib==1.7
types-passpy==1.0
types-peewee==3.17.8
types-pep8-naming==0.14
types-pexpect==4.9
types-pika-ts==1.3
types-polib==1.2
types-portpicker==1.6
types-protobuf==5.29.1
types-psutil==6.1
types-psycopg2==2.9.10
types-pyasn1==0.6
types-pyaudio==0.2
types-PyAutoGUI==0.9
types-pycocotools==2.0
types-pycurl==7.45.4
types-pyfarmhash==0.4
types-pyflakes==3.2
types-pygit2==1.15
types-Pygments==2.18
types-pyinstaller==6.11
types-pyjks==20.0
types-PyMySQL==1.1
types-pynput==1.7.7
types-pyOpenSSL==24.1
types-pyRFC3339==2.0.1
types-PyScreeze==1.0.1
types-pyserial==3.5
types-pysftp==0.2
types-pytest-lazy-fixture==0.6
types-python-crontab==3.2
types-python-datemath==3.0.1
types-python-dateutil==2.9
types-python-http-client==3.3.7
types-python-jenkins==1.8
types-python-jose==3.3
types-python-nmap==0.7
types-python-xlib==0.33
types-pytz==2024.2
types-pywin32==308
types-pyxdg==0.28
types-PyYAML==6.0
types-qrbill==1.1
types-qrcode==8.0
types-regex==2024.11.6
types-reportlab==4.2.5
types-requests==2.32
types-requests-oauthlib==2.0
types-retry==0.9
types-RPi.GPIO==0.7
types-s2clientprotocol==5
types-seaborn==0.13.2
types-Send2Trash==1.8
types-setuptools==75.6
types-shapely==2.0
types-simplejson==3.19
types-singledispatch==4.1
types-six==1.17
types-slumber==0.7
types-str2bool==1.1
types-tabulate==0.9
types-tensorflow==2.18.0
types-TgCrypto==1.2
types-toml==0.10
types-toposort==1.10
types-tqdm==4.67
types-translationstring==1.4
types-tree-sitter-languages==1.10
types-ttkthemes==3.2
types-ujson==5.10
types-unidiff==0.7
types-untangle==1.2
types-usersettings==1.1
types-uWSGI==2.0
types-vobject==0.9.9
types-waitress==3.0.1
types-WebOb==1.8
types-whatthepatch==1.0
types-workalendar==17.0
types-WTForms==3.2.1
types-wurlitzer==3.1
types-xdgenvpy==3.0
types-xmltodict==0.14
types-zstd==1.5
types-zxcvbn==4.4
typing_extensions==4.15.0
uc-micro-py==1.0.3
uritemplate==4.1.1
urllib3==2.3.0
viivakoodi==0.8.0 viivakoodi==0.8.0
Werkzeug==2.1.2 virtualenv==20.31.2
wrapt==1.14.1 webcolors==1.13
Werkzeug==3.1.3
wheel==0.46.1
wrapt==1.17.3
wsproto==1.2.0

View File

@ -6,11 +6,17 @@
# Then we build the API around Flask, # Then we build the API around Flask,
# Then we build the web interface, using the simple Jinja2 templating. # Then we build the web interface, using the simple Jinja2 templating.
# We support two modes :
# The first is a simple mode, where a computer, connected to a thermal printer, runs this program and exposes a web interface that makes use of the client's camera
# The seconde is booth mode, where a Raspberry Pi is connected to a thermal printer, a button and a flash. The web interface exists but may not be used, as the press of the button with take a picture and activate the flash while simply informing the web page.
# Following are the librairies we import, # Following are the librairies we import,
from flask import Flask, request, render_template, flash, abort, redirect, url_for, make_response, jsonify # Used for the web framework from flask import Flask, request, render_template, flash, abort, redirect, url_for, make_response, jsonify # Used for the web framework
from flask_socketio import SocketIO
from flask_limiter import Limiter from flask_limiter import Limiter
from flask_limiter.util import get_remote_address from flask_limiter.util import get_remote_address
from printer import Printer # The wrapper for the printer class from printer import Printer # The wrapper for the printer class
from raspberry import Raspberry # The Raspberry pi control Class
from web import Web # Wrapper for the web routes and API from web import Web # Wrapper for the web routes and API
import toml # Used for the config file parsing import toml # Used for the config file parsing
import pprint # To pretty print JSON import pprint # To pretty print JSON
@ -19,6 +25,8 @@ import os # For VARS from the shell.
# Variables # Variables
app = Flask(__name__) app = Flask(__name__)
socketio = SocketIO(app)
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
# Load the configuration file # Load the configuration file
try: try:
@ -35,6 +43,7 @@ except Exception as e:
exit(-1) exit(-1)
app.logger.debug("Config file loaded !") app.logger.debug("Config file loaded !")
# Define the USB connections here. # Define the USB connections here.
vendor_id = configuration_file["printer"]["vendor_id"] vendor_id = configuration_file["printer"]["vendor_id"]
device_id = configuration_file["printer"]["device_id"] device_id = configuration_file["printer"]["device_id"]
@ -44,13 +53,12 @@ try:
os.mkdir(UPLOAD_FOLDER) os.mkdir(UPLOAD_FOLDER)
app.logger.debug(f"Directory '{UPLOAD_FOLDER}' created successfully.") app.logger.debug(f"Directory '{UPLOAD_FOLDER}' created successfully.")
except FileExistsError: except FileExistsError:
app.logger.error(f"Directory '{UPLOAD_FOLDER}' already exists.") app.logger.debug(f"Directory '{UPLOAD_FOLDER}' already exists.")
except PermissionError: except PermissionError:
app.logger.error(f"Permission denied: Unable to create '{UPLOAD_FOLDER}'.") app.logger.error(f"Permission denied: Unable to create '{UPLOAD_FOLDER}'.")
except Exception as e: except Exception as e:
app.logger.error(f"An error occurred: {e}") app.logger.error(f"An error occurred: {e}")
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
# Output the config file # Output the config file
if os.getenv('LIPY_DEBUG') == True: if os.getenv('LIPY_DEBUG') == True:
@ -68,16 +76,30 @@ app.config['TEMPLATES_AUTO_RELOAD'] = True
printer = Printer(app,0x04b8, 0x0e28) printer = Printer(app,0x04b8, 0x0e28)
printer.init_printer() printer.init_printer()
# Web routes # Find out if we are running on a Raspberry Pi
rpi = Raspberry(printer,
app,
socketio,
configuration_file['rpi']['button_gpio_port_number'], configuration_file['rpi']['indicator_gpio_port_number'],
configuration_file['rpi']['flash_gpio_port_number'],
configuration_file['rpi']['flash'] )
RASPBERRY_PI_CONNECTED = rpi.is_raspberry_pi()
#############################################################
# Web & API routes
#############################################################
web = Web(app, printer) web = Web(app, printer)
if __name__ == "__main__": if __name__ == "__main__":
app.run(ssl_context='adhoc') app.run(debug=True, use_reloader=False, host='0.0.0.0', ssl_context='adhoc')
limiter = Limiter( limiter = Limiter(
app, get_remote_address,
key_func=get_remote_address, app=app,
default_limits=["1500 per day", "500 per hour"] default_limits=["1500 per day", "500 per hour"]
) )
@ -116,16 +138,12 @@ def api_print_sms():
txt = request.form["txt"] txt = request.form["txt"]
sign = request.form["signature"] sign = request.form["signature"]
except Exception as e: except Exception as e:
app.logger.error(str(e) + " - Whoops, no forms submitted or missing signature.") app.logger.error("Whoops, no forms submitted or missing signature :" + str(e))
return jsonify({'message': 'Error getting the information from the form :' + e}), 500 flash("Whoops, no forms submitted or missing signature : " + str(e))
return redirect(url_for("index"))
try:
web.print_sms(txt,sign) web.print_sms(txt,sign)
except Exception as e: return redirect(url_for("index"))
return jsonify({'message': 'Error printing the SMS:' + e}), 500
return jsonify({'message': 'Message printed'}), 200
@app.route('/api/print/img', methods=['POST']) @app.route('/api/print/img', methods=['POST'])
@limiter.limit("6/minute", override_defaults=False) @limiter.limit("6/minute", override_defaults=False)
@ -135,40 +153,61 @@ def api_print_image():
try: try:
sign = request.form["signature"] sign = request.form["signature"]
except Exception as e: except Exception as e:
app.logger.error(str(e) + " - Whoops, no forms submitted or missing signature.") app.logger.error("Whoops, no forms submitted or missing signature :" + str(e))
return jsonify({'message': 'Error getting the information from the form :' + e}), 500 flash("Whoops, no forms submitted or missing signature : " + str(e))
return redirect(url_for("index"))
if request.method == 'POST': if request.method == 'POST':
# check if the post request has the file part # check if the post request has the file part
try: try:
if 'img' not in request.files: if 'img' not in request.files:
app.logger.error("No file found. Did you use the good form ?") app.logger.error("Whoops, no images submitted :" + str(e))
return jsonify({'message': 'No file found. Did you use the good form ?'}), 500 flash("Whoops, no images submitted : " + str(e))
else: else:
file = request.files['img'] file = request.files['img']
except Exception as e: except Exception as e:
app.logger.error('Error getting the files :' + e) app.logger.error('Error getting the files :' + str(e))
return jsonify({'message': 'Error getting the files :' + e}), 500 flash('Error getting the files :' + str(e))
return redirect(url_for("index"))
# If the user does not select a file, the browser submits an # If the user does not select a file, the browser submits an
# empty file without a filename. # empty file without a filename.
if file.filename == '': if file.filename == '':
app.logger.error("Submitted file has no filename !") app.logger.error("Submitted file has no filename !")
return jsonify({'message': "Submitted file has no filename !" + e}), 500 flash("Submitted file has no filename !")
return redirect(url_for("index"))
try: try:
app.logger.debug("Sending the image to the printer.") app.logger.debug("Sending the image to the printer.")
web.print_image(file, sign) web.print_image(file, sign)
except Exception as e: except Exception as e:
app.logger.error("The image could not be printed : " + e ) app.logger.error("The image could not be printed because : " + str(e) )
return jsonify({'message': "The image could not be printed :" + e}), 500 flash("The image could not be printed because : " + str(e))
return redirect(url_for("index"))
else: else:
return jsonify({'message': "Method Not Allowed, please POST"}), 403 app.logger.error("Method not allowed")
app.logger.debug('Bad access type to this API, please POST') flash("Method not allowed")
return redirect(url_for("index"))
flash('Picture printed ! '),
return redirect(url_for("index"))
@app.route('/api/camera/picture')
def camera_picture():
# Returns a picture taken by the camera
if RASPBERRY_PI_CONNECTED:
try:
return rpi.camera_picture()
except Exception as e:
return jsonify({'message': 'Error getting the stream : ' + e}), 500
else:
return jsonify({'message': 'No camera present'}), 500
return jsonify({'message': 'Message printed'}), 200
@app.route('/login') @app.route('/login')
@ -186,7 +225,7 @@ def logout_page():
@app.errorhandler(429) @app.errorhandler(429)
def ratelimit_handler(e): def ratelimit_handler(e):
flash("Rate limit reached, please slow down :) ( Currently at "+ e.description + ")", 'error') flash("Rate limit reached, please slow down :) ( Currently at "+ e.description + ")", 'error')
app.logger.debug('Rate limit reached ' + e) app.logger.debug('Rate limit reached ' + str(e.description))
return redirect(url_for("index")) return redirect(url_for("index"))
@app.route("/ping") @app.route("/ping")
@ -195,3 +234,16 @@ def ping():
flash("🏓 Pong !",'info') flash("🏓 Pong !",'info')
app.logger.debug('🏓 Pong !') app.logger.debug('🏓 Pong !')
return redirect(url_for("index")) return redirect(url_for("index"))
@socketio.on('ping')
def handle_message(data):
app.logger.debug('Received : ' + str(data))
socketio.emit('pong',"Pong !")
@socketio.on('get_camera_status')
def camera_status():
app.logger.debug('Client asked if we had a camera')
if RASPBERRY_PI_CONNECTED:
socketio.emit("camera_status", True)
else:
socketio.emit("camera_status", False)

View File

@ -1,5 +1,4 @@
# Importing the module to mage the connection to the printer. # Importing the module to mage the connection to the printer.
from flask import flash
from escpos.printer import Usb, USBNotFoundError from escpos.printer import Usb, USBNotFoundError
from time import sleep, gmtime, strftime from time import sleep, gmtime, strftime
import os.path import os.path
@ -46,18 +45,14 @@ class Printer(object):
match status: match status:
case 0: case 0:
self.app.logger.error('Printer has no more paper, aborting...') self.app.logger.error('Printer has no more paper, aborting...')
flash("No more paper on the printer. Sorry.",category='error')
self.printer.close() self.printer.close()
return False raise Exception("No more paper in the printer")
case 1: case 1:
self.app.logger.warning('Printer needs paper to be changed very soon ! ') self.app.logger.warning('Printer needs paper to be changed very soon ! ')
flash('Printer needs paper to be changed very soon ! ', category='info')
self.printer.close() self.printer.close()
return True
case 2: case 2:
self.app.logger.debug('Printer has paper, good to go') self.app.logger.debug('Printer has paper, good to go')
self.printer.close() self.printer.close()
return True
def init_printer(self): def init_printer(self):
@ -99,85 +94,107 @@ class Printer(object):
self.ready = True; self.ready = True;
self.printer.close(); self.printer.close();
if not self.check_paper(): self.check_paper()
return False
return True return True
def print_sms(self, msg, signature) -> bool: def print_sms(self, msg, signature="",bold=False):
clean_msg = str(msg) clean_msg = str(msg) + "\n"
clean_signature = str(signature) clean_signature = str(signature)
if not self.check_paper(): if len(clean_msg) > 4096:
return False
if len(clean_msg) > 256 or len(clean_msg) < 3 :
self.app.logger.warning("Could not print message of this length: " + str(len(clean_msg))) self.app.logger.warning("Could not print message of this length: " + str(len(clean_msg)))
flash("Could not print message of this length :" + str(len(clean_msg)) + ", needs to between 3 and 256 caracters long.",category='error') raise Exception("Could not print message of this length :" + str(len(clean_msg)) + ", needs to be below 4096 caracters long.")
return False
if len(signature) > 256 or len(signature) < 3: if len(signature) > 256:
self.app.logger.warning("Could not print message without a signature.") self.app.logger.warning("Could not print signature of this length: " + str(len(clean_signature)))
flash("Could not print message without a signature.",category='error') raise Exception("Could not print signature of this length :" + str(len(clean_signature)) + ", needs to be below 256 caracters long.")
return False
if not os.getenv('LIPY_DEBUG') == True:
try: try:
self.printer.open(self.usb_args); self.printer.open(self.usb_args);
self.printer.set(align='left', font='a', bold=False, underline=0, width=1, height=1, density=8, invert=False, smooth=True, flip=False, double_width=False, double_height=False, custom_size=False) self.printer.set(align='center', font='a', bold=bold)
self.printer.textln(clean_msg ) self.printer.textln(clean_msg )
self.printer.set(align='left', font='b', bold=False, underline=1, width=1, height=1, density=9, invert=False, smooth=True, flip=False, double_width=False, double_height=False, custom_size=False) if clean_signature:
self.printer.textln("> " + clean_signature + " @ " + strftime("%Y-%m-%d %H:%M:%S", gmtime())) self.printer.textln(clean_signature )
self.printer.cut() self.printer.close()
self.printer.close
except Exception as e: except Exception as e:
flash("Unable to print because : " + e) self.app.logger.error("Unable to print because : " + str(e))
flash("Message printed : " + clean_msg ,category='info') self.app.logger.info("Printed text")
return True return True
def print_img(self, path, sign): def print_img(self, path, sign="",center=True,process=False):
clean_signature = str(sign) clean_signature = str(sign)
if len(clean_signature) > 256 or len(clean_signature) < 3: if len(sign) > 256:
self.app.logger.warning("Could not print message without a signature.") self.app.logger.warning("Could not print signature of this length: " + str(len(clean_signature)))
flash("Could not print message without a signature.",category='error') raise Exception("Could not print signature of this length :" + str(len(clean_signature)) + ", needs to be below 256 caracters long.")
return False
if not os.path.isfile(str(path)): if not os.path.isfile(str(path)):
self.app.logger.warning("File does not exist : " + str(path)) self.app.logger.warning("File does not exist : " + str(path))
flash('The file path for this image :' + str(path) + " wasn't found. Please try again.", 'error') raise Exception('The file path for this image :' + str(path) + " wasn't found. Please try again.")
return False
else: else:
self.app.logger.debug("Printing file from " + str(path)) self.app.logger.debug("Printing file from " + str(path))
if process:
try: try:
self.app.logger.debug("Proccessing the image") self.app.logger.debug("Proccessing the image")
path = process_image(self, path) path = process_image(self, path)
except Exception as e: except Exception as e:
flash(str(e))
self.app.logger.error(str(e)) self.app.logger.error(str(e))
return False
else:
self.app.logger.warning("Not proccessing the image")
try: try:
self.printer.open(self.usb_args) self.printer.open(self.usb_args)
self.printer.textln("> " + clean_signature + " @ " + strftime("%Y-%m-%d %H:%M:%S", gmtime())) self.printer.image(path,center=center)
self.printer.image(path)
self.printer.cut()
self.printer.close() self.printer.close()
self.app.logger.debug("Printed an image : " + str(path)) self.app.logger.debug("Printed an image : " + str(path))
os.remove(path) os.remove(path)
self.app.logger.debug("Removed image.") self.app.logger.debug("Removed image : " + str(path))
return True
except Exception as e: except Exception as e:
self.printer.close() self.printer.close()
flash(str(e),'error') self.app.logger.error(str(e))
return False return False
self.app.logger.info("Printed a picture")
return True
def qr(self, content):
try:
self.printer.open(self.usb_args)
self.printer.qr(content, center=True)
self.printer.close()
except Exception as e:
self.printer.close()
self.app.logger.error(str(e))
return False
self.app.logger.info("Printed a QR")
return True
def cut(self):
try:
self.printer.open(self.usb_args)
self.printer.cut()
self.printer.close()
except Exception as e:
self.printer.close()
self.app.logger.error(str(e))
raise e
self.app.logger.info("Did a cut")
return True
def process_image(self, path): def process_image(self, path):
brightness_factor = 1.5 # Used only if image is too dark brightness_factor = 1.5 # Used only if image is too dark
brightness_threshold = 150 # Brightness threshold (0255) brightness_threshold = 100 # Brightness threshold (0255)
contrast_factor = 1.2 # Less than 1.0 = lower contrast contrast_factor = 0.6 # Less than 1.0 = lower contrast
max_width = 575 max_width = 575
max_height = 1000 max_height = 1000
@ -191,9 +208,9 @@ def process_image(self, path):
original_img.thumbnail((max_width, max_height), Image.LANCZOS) original_img.thumbnail((max_width, max_height), Image.LANCZOS)
self.app.logger.debug("Resized the image") self.app.logger.debug("Resized the image")
# Convert to grayscale for dithering # # Convert to grayscale for dithering
dithered_img = original_img.convert("L").convert("1") # Dithering using default method (FloydSteinberg) # dithered_img = original_img.convert("L").convert("1") # Dithering using default method (FloydSteinberg)
self.app.logger.debug("Dithered the image") # self.app.logger.debug("Dithered the image")
# Compute brightness of original image (grayscale average) # Compute brightness of original image (grayscale average)
grayscale = original_img.convert("L") grayscale = original_img.convert("L")
@ -208,13 +225,13 @@ def process_image(self, path):
enhancer = ImageEnhance.Brightness(original_img) enhancer = ImageEnhance.Brightness(original_img)
original_img = enhancer.enhance(brightness_factor) original_img = enhancer.enhance(brightness_factor)
# Reduce contrast # # Reduce contrast
contrast_enhancer = ImageEnhance.Contrast(original_img) # contrast_enhancer = ImageEnhance.Contrast(original_img)
original_img = contrast_enhancer.enhance(contrast_factor) # original_img = contrast_enhancer.enhance(contrast_factor)
# Final resize check # Final resize check
if original_img.height > max_height: if original_img.height > max_height:
flash("Image is too long, sorry! Keep it below 575×1000 pixels.", 'error') raise ValueError("Image is too long, sorry! Keep it below 575×1000 pixels.")
self.app.logger.error("Image is too long, sorry! Keep it below 575×1000 pixels.") self.app.logger.error("Image is too long, sorry! Keep it below 575×1000 pixels.")
return False return False

230
src/raspberry.py Normal file
View File

@ -0,0 +1,230 @@
from flask_socketio import SocketIO
from gpiozero import Button, LED, DigitalOutputDevice
from time import sleep, gmtime, strftime
from PIL import Image
import io # To check if we are on a Raspberry Pi
import subprocess
import os
class Raspberry(object):
"""
This class will manage three things :
- Connecting to a USB webcam
- Managing a push button
- Activating a flash ( or light )
- Flash an indicator light
"""
def __init__(self, printer, app, socketio, button_gpio_port_number, indicator_gpio_port_number, flash_gpio_port_number, is_flash_present,):
self.printer = printer
self.socketio = socketio
self.app = app
self.flash_gpio = flash_gpio_port_number
self.is_flash_present = is_flash_present
self.button_gpio = button_gpio_port_number
self.led_gpio = indicator_gpio_port_number
self.image_path = self.app.config['UPLOAD_FOLDER'] + '/image.jpg'
def is_raspberry_pi(self, raise_on_errors=False):
# Check if we are running on a raspberry pi
try:
with io.open('/proc/cpuinfo', 'r') as cpuinfo:
found = False
for line in cpuinfo:
if line.startswith('Hardware'):
found = True
label, value = line.strip().split(':', 1)
value = value.strip()
if value not in (
'BCM2708',
'BCM2709',
'BCM2711',
'BCM2835',
'BCM2836'
):
self.app.logger.debug('This system does not appear to be a Raspberry Pi.')
return False
if not found:
self.app.logger.error('Couldn\'t get sufficient hardware information from /proc/cpuinfo, Unable to determine if we are on a Raspberry Pi.')
return False
except IOError:
self.app.logger.error('Unable to open `/proc/cpuinfo`.')
return False
self.app.logger.debug('It seems we are on a Raspberry Pi')
try:
self.initialise_gpio()
except Exception as e:
self.app.logger.debug('Could not init GPIO : ' + str(e))
raise e
return True
def initialise_gpio(self):
self.app.logger.debug('Initializing GPIO')
self.led = LED(self.led_gpio)
self.app.logger.debug('Activated indicator LED')
self.indicator_countdown(iters=3)
self.button = Button(self.button_gpio, pull_up=True, bounce_time=0.1)
self.button.when_pressed = self.on_button_pressed
self.app.logger.debug('Activated button')
# The "flash" is a relay-controlled device ( light bulb for example )
self.flash = DigitalOutputDevice(self.flash_gpio)
self.flash_toggle()
self.app.logger.debug('Activated flash')
def indicator_countdown(self,iters=10,multi=10):
for i in range(iters,0,-1):
self.led.on()
sleep(i/multi)
self.led.off()
sleep(i/multi)
def indicator_led(self,timing=0.2,l=5):
for i in range(l):
self.app.logger.debug("LED turned on")
self.led.on()
sleep(timing)
self.led.off()
self.app.logger.debug("LED turned off")
sleep(timing)
def flash_toggle(self):
self.app.logger.debug("Flash turned on")
self.flash.on()
sleep(0.3)
self.flash.off()
self.app.logger.debug("Flash turned off")
def take_picture(self):
# Validate if the image path is valid
if not os.path.isdir(os.path.dirname(self.image_path)):
self.app.logger.error(f"Invalid directory for image path: {self.image_path}")
return False
try:
result = subprocess.run(
['fswebcam', '--no-banner', '-r', '1920x1080', self.image_path],
check=True, # Will raise CalledProcessError if the command fails
stdout=subprocess.PIPE, # Capture standard output
stderr=subprocess.PIPE # Capture error output
)
# Optionally log the command output
self.app.logger.debug(f"Image captured successfully: {result.stdout.decode()}")
except subprocess.CalledProcessError as e:
# Log error output if available
self.app.logger.error(f"Unable to take a picture. Error: {e.stderr.decode()}")
return False
except Exception as e:
# Catch any unexpected errors
self.app.logger.error(f"Unexpected error while taking picture: {str(e)}")
return False
# # Overlay logo
# logo_path = 'src/static/images/requin.png' # Update path as needed
# if not self.overlay_logo(self.image_path, logo_path):
# self.app.logger.warning("Picture taken but failed to overlay logo.")
return True
def overlay_logo(self, image_path, logo_path, output_path=None, position='bottom_right', margin=10):
try:
image = Image.open(image_path).convert("RGBA")
logo = Image.open(logo_path).convert("RGBA")
# Resize logo if it's too big (logo will be 30% the width of the image)
logo_ratio = 0.30 # You can change the ratio if you want the logo bigger or smaller
logo_width = int(image.width * logo_ratio)
logo_height = int(logo.height * (logo_width / logo.width))
logo = logo.resize((logo_width, logo_height), Image.Resampling.LANCZOS)
# Calculate position based on the chosen location
if position == 'bottom_right':
x = image.width - logo.width - margin
y = image.height - logo.height - margin
elif position == 'top_left':
x, y = margin, margin
elif position == 'top_right':
x = image.width - logo.width - margin
y = margin
elif position == 'bottom_left':
x = margin
y = image.height - logo.height - margin
else:
raise ValueError("Invalid position. Choose from 'bottom_right', 'top_left', 'top_right', or 'bottom_left'.")
# Composite the logo onto the image
image.paste(logo, (x, y), logo) # Use logo as its own alpha mask
# Save the result
if not output_path:
output_path = image_path # Overwrite the original image if no output path is given
image.save(output_path)
except Exception as e:
self.app.logger.error(f"Error overlaying logo: {e}")
return False
return True
def crop_to_square(self, image_path, output_path=None):
try:
image = Image.open(image_path)
width, height = image.size
# Determine shorter side
new_edge = min(width, height)
# Calculate cropping box (centered)
left = (width - new_edge) // 2
top = (height - new_edge) // 2
right = left + new_edge
bottom = top + new_edge
image = image.crop((left, top, right, bottom))
if not output_path:
output_path = image_path # Overwrite original
image.save(output_path)
except Exception as e:
self.app.logger.error(f"Error cropping image to square: {e}")
return False
return True
def on_button_pressed(self):
self.app.logger.debug("Button has been pressed")
self.led.on()
self.app.logger.debug("Counting down")
self.indicator_countdown(iters=4,multi=20) # The indicator will flash a countdown LED
self.app.logger.debug("Taking picture")
try:
self.flash.on()
self.take_picture()
except Exception as e:
self.app.logger.error("Could not take a picture after the button press : " + str(e))
finally:
self.flash.off()
self.app.logger.debug("Printing picture")
self.led.on()
self.crop_to_square(self.image_path)
self.printer.print_img('src/static/images/extase-club.png',process=True)
self.printer.print_img(self.image_path,process=True)
self.printer.print_sms("")
self.printer.print_sms("With Love From Société.Vide",signature="",bold=True)
self.printer.print_sms("Printed by LittlePrynter",signature="")
self.printer.print_sms("n07070.xyz",signature="")
self.printer.print_sms(strftime("%Y-%m-%d %H:%M", gmtime()),signature="")
self.printer.qr("https://n07070.xyz/articles/littleprynter")
self.printer.cut()
self.led.off()
self.app.logger.debug("Done printing picture")
return True

Binary file not shown.

After

Width:  |  Height:  |  Size: 61 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

View File

@ -1,39 +1,159 @@
let streaming; let streaming;
var current_stream; var current_stream;
var current_camera_is; var current_camera_is;
var supports_facing_mode;
var camera_options;
var width = document.getElementById("video").parentNode.parentElement.clientWidth; var width = document.getElementById("video").parentNode.parentElement.clientWidth;
var height = width / (4 / 3); var height = width / (4 / 3);
var socket = io();
function startup(){ socket.on('connect', function() {
socket.emit('ping', {data: 'I\'m connected!'});
console.log("Sent a ping to the server :)");
});
socket.on('pong', () => {
console.log('Received pong back ! ');
});
socket.on('new_image', () => {
console.log("Received new image event");
const img = document.getElementById('snapshot');
img.src = '/image?' + new Date().getTime(); // Bust cache
});
async function startup(){
video = document.getElementById('video'); video = document.getElementById('video');
canvas = document.getElementById('canvas'); canvas = document.getElementById('canvas');
photo = document.getElementById('photo'); photo = document.getElementById('photo');
switch_cameras = document.getElementById('flip') switch_cameras = document.getElementById('flip')
printButton = document.getElementById('print_button'); printButton = document.getElementById('print_button');
if (check_webcam() === true ){ console.log("Checking for client webcam capabilities");
// We have a camera_options dictionnary in return, or false if the device does not support webcams.
let client_webcam_capabilities = await check_webcam_capabilies();
if ( client_webcam_capabilities != false ){
get_webcam(client_webcam_capabilities);
setup_events(); setup_events();
clear_canvas(); clear_canvas();
} else { } else {
console.log("Checking for server webcam capabilities");
let server_webcam_capabilities = await check_server_camera();
if (server_webcam_capabilities === true ){
console.log("The server has a camera, using it.");
console.log("TODO");
}
else {
no_webcam_error(); no_webcam_error();
console.log("Seems like it's impossible to get a webcam."); console.log("Seems like it's impossible to get a webcam from the client nor the server.");
}
} }
} }
function check_webcam(){ async function check_webcam_capabilies(){
console.log("Cheking for a camera..."); console.log("Checking for a the capabilities of the client's media devices");
if (get_front_webcam()) {
console.log("Got front camera !");
return true;
}
if (get_any_webcam()) { // We try to start with the front facing camera,
console.log("Got a webcam !"); // if we have no support, we switch back to a normal camera.
return true; try {
} // We first check if the navigator has a media device
console.log("Nope"); if (!navigator.mediaDevices?.enumerateDevices) {
console.log("enumerateDevices() not supported.");
return false; return false;
} else {
// List cameras and microphones.
console.log("The device has the following media devices : ")
await navigator.mediaDevices
.enumerateDevices()
.then((devices) => {
devices.forEach((device) => {
console.log(`${device.kind}: ${device.label} id = ${device.deviceId}`);
});
})
.catch((err) => {
console.error(`${err.name}: ${err.message}`);
return false;
});
}
} catch (e) {
console.log("The device does not seem to support webcams " + e);
return false;
}
console.log("Checking for the supported constraints of the media devices ");
try {
const supports = navigator.mediaDevices.getSupportedConstraints();
if (!supports['facingMode']) {
throw new Error("This browser does not support facingMode!");
} else {
console.log("The device supports facing mode, selecting the front camera by default");
supports_facing_mode = true;
camera_options = {
video: {
facingMode: 'user', // Or 'environment' if we want a camera facing away
},
audio: false
};
return camera_options;
}
} catch (e) {
console.log("Resetting to default camera : " + e);
supports_facing_mode = false;
camera_options = {
video: true,
audio: false
};
return camera_options
}
}
async function get_webcam(options){
stop_video_streams();
try {
navigator.mediaDevices.getUserMedia(options)
.then(function(stream) {
// on success, stream it in video tag
// the video tag is hidden, as is the canvas.
console.log("Got the webcam stream");
printButton.removeAttribute("disabled","");
current_stream = stream;
video.srcObject = stream;
video.setAttribute('autoplay', '');
video.setAttribute('muted', '');
video.setAttribute('playsinline', '')
video.play();
return true;
})
.catch(function(err) {
console.log("Didn't manage to get a camera :" + err);
return false;
});
} catch (err) {
console.log("Didn't manage to get a camera :" + err);
return false;
}
}
async function check_server_camera(){
console.log("Checking for a camera on the server");
await socket.emit('get_camera_status', (response) => {
if (response){
console.log("Server has a camera !");
return true;
} else {
console.log("The server doesn't seem to have a camera");
return false;
}
});
} }
function setup_events(){ function setup_events(){
@ -66,7 +186,7 @@ function setup_events(){
}, false); }, false);
switch_cameras.addEventListener('click', function(ev) { switch_cameras.addEventListener('click', function(ev) {
flip_cameras(); flip_cameras(camera_options);
}, false ); }, false );
printButton.addEventListener('click', function(ev){ printButton.addEventListener('click', function(ev){
@ -124,7 +244,7 @@ function print_picture(data){
let time = currentDate.getHours() + ":" + currentDate.getMinutes() + ":" + currentDate.getSeconds(); let time = currentDate.getHours() + ":" + currentDate.getMinutes() + ":" + currentDate.getSeconds();
formData.set("img", picture, "picture.png"); formData.set("img", picture, "picture.png");
formData.set("signature", "Printed via the webcam @ " + time) formData.set("signature", "Webcam")
fetch(url, { fetch(url, {
method: 'POST', // or 'PUT' method: 'POST', // or 'PUT'
@ -137,26 +257,9 @@ function print_picture(data){
} }
function flip_cameras(){ function flip_cameras(camera_options){
switch (current_camera_is) { if ( supports_facing_mode ) {
case "front": get_webcam((camera_options.video.facingMode === 'user' ? 'environment' : 'user'));
try {
get_any_webcam();
} catch (e) {
console.log("Could not get another camera");
get_front_webcam();
}
break;
case "any":
try {
get_front_webcam();
} catch (e) {
console.log("Could not get another camera");
get_any_webcam();
}
break;
default:
console.log("Impossible to switch cameras : none is selected.");
} }
} }
@ -177,84 +280,6 @@ function stop_video_streams(){
} }
} }
async function get_webcam(options){
stop_video_streams();
try {
await navigator.mediaDevices.getUserMedia(options)
.then(function(stream) {
// on success, stream it in video tag
// the video tag is hidden, as is the canvas.
console.log("Got a camera ( generic )");
printButton.removeAttribute("disabled","");
current_stream = stream;
video.srcObject = stream;
video.setAttribute('autoplay', '');
video.setAttribute('muted', '');
video.setAttribute('playsinline', '')
video.play();
return true;
})
.catch(function(err) {
console.log("Didn't manage to get a camera :" + err);
return false;
});
} catch (err) {
console.log("Didn't manage to get a camera :" + err);
return false;
}
}
function get_any_webcam(){
var camera_options = {
video: {
facingMode: 'environment', // Or 'environment' if we want a camera facing away
},
audio: false
};
if(get_webcam(camera_options)){
console.log("Got any camera, or environment camera.");
current_camera_is = "any";
return true;
} else {
return false;
}
}
function get_front_webcam(){
// We try to start with the front facing camera,
// if we have no support, we switch back to a normal camera.
try {
const supports = navigator.mediaDevices.getSupportedConstraints();
if (!supports['facingMode']) {
throw new Error("This browser does not support facingMode!");
} else {
var camera_options = {
video: {
facingMode: 'user', // Or 'environment' if we want a camera facing away
},
audio: false
};
}
} catch (e) {
console.log("Resetting to default camera : " + e);
var camera_options = {
video: true,
audio: false
};
}
if(get_webcam(camera_options)){
console.log("Got the front camera");
current_camera_is = "front";
return true;
} else {
return false;
}
}
function no_webcam_error(){ function no_webcam_error(){
console.log("Seems like they is no webcam available.") console.log("Seems like they is no webcam available.")

View File

@ -56,4 +56,6 @@
<p class=" text-center">Little Prynter is built by <a href="https://n07070.xyz/about-me/">n07070</a> because it's fun :) | <a href="https://git.n07070.xyz/n07070/littleprynter">Source code - AGPLv3</a></p> <p class=" text-center">Little Prynter is built by <a href="https://n07070.xyz/about-me/">n07070</a> because it's fun :) | <a href="https://git.n07070.xyz/n07070/littleprynter">Source code - AGPLv3</a></p>
</footer> </footer>
</body> </body>
<script src="https://cdn.socket.io/4.0.0/socket.io.min.js"></script>
</html> </html>

View File

@ -99,6 +99,7 @@
</style> </style>
<script src="https://cdn.socket.io/4.0.0/socket.io.min.js"></script>
<script type="text/javascript" src="{{ url_for('static',filename='js/webcam.js') }}"></script> <script type="text/javascript" src="{{ url_for('static',filename='js/webcam.js') }}"></script>
<br> <br>

View File

@ -1,4 +1,4 @@
from flask import Flask, request from flask import Flask, request, flash
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
from printer import Printer from printer import Printer
import time import time
@ -15,28 +15,29 @@ class Web(object):
def print_sms(self, texte, sign: str): def print_sms(self, texte, sign: str):
# TODO: verify the texte before printing it here ? # TODO: verify the texte before printing it here ?
self.app.logger.debug("Printing : " + str(texte) + " from " + str(sign)) self.app.logger.debug("Printing : " + str(texte) + " from " + str(sign))
if not os.getenv('LIPY_DEBUG'): try:
time.sleep(1) self.printer.print_sms(texte, sign)
self.printer.cut()
except Exception as e:
self.app.logger.error(e)
flash("Error while printing the SMS : "+ str(e))
return self.printer.print_sms(texte, sign) flash("You message " + str( texte ) + " has been printed :)")
def print_image(self, image, sign: str) -> bool:
def print_image(self, image, sign):
self.app.logger.debug("Uploading file") self.app.logger.debug("Uploading file")
try: try:
self.app.logger.debug("Uploading file from " + str(sign)) self.app.logger.debug("Uploading file from " + str(sign))
if self.upload_file(image): if self.upload_file(image):
self.app.logger.debug("File has been uploaded, printing...") self.app.logger.debug("File has been uploaded, printing...")
self.printer.print_img(os.path.join(self.app.config['UPLOAD_FOLDER'], secure_filename(image.filename)), sign) self.printer.print_img(os.path.join(self.app.config['UPLOAD_FOLDER'], secure_filename(image.filename)), sign=sign,process=True)
return True self.printer.cut()
else:
return False
except Exception as e: except Exception as e:
self.app.logger.error(e) self.app.logger.error(e)
raise Exception flash("Could not upload file." + str(e))
else: flash("Your image has been printed :)")
flash("Could not upload file.",'error')
return False
def login(username: str,password: str) -> bool: def login(username: str,password: str) -> bool:
pass pass
@ -58,7 +59,7 @@ class Web(object):
self.app.logger.debug("File saved") self.app.logger.debug("File saved")
except Exception as e: except Exception as e:
self.app.logger.error("Could not save file") self.app.logger.error("Could not save file")
flash(e,'error') flash(str(e),'error')
return False return False
self.app.logger.debug("File saved to " + str(os.path.join(self.app.config['UPLOAD_FOLDER'], filename))) self.app.logger.debug("File saved to " + str(os.path.join(self.app.config['UPLOAD_FOLDER'], filename)))