Removed old code, added new files in src/ using classes for each moving part.
This commit is contained in:
parent
fbc6a09e60
commit
b58748e671
BIN
configuration/hello_world.png
Normal file
BIN
configuration/hello_world.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 62 KiB |
135
littleprynter.py
135
littleprynter.py
@ -1,135 +0,0 @@
|
|||||||
#!/usr/bin/python
|
|
||||||
|
|
||||||
import os, random, sys, json
|
|
||||||
from datetime import datetime
|
|
||||||
from subprocess import call
|
|
||||||
|
|
||||||
import serial
|
|
||||||
from flask import Flask, render_template, jsonify, make_response, request, redirect, url_for, abort, session, flash
|
|
||||||
from flask_limiter import Limiter
|
|
||||||
from pprint import pprint
|
|
||||||
from flask_limiter.util import get_remote_address
|
|
||||||
|
|
||||||
# Update to using the new Python 3 lib
|
|
||||||
import board
|
|
||||||
import busio
|
|
||||||
|
|
||||||
import adafruit_thermal_printer
|
|
||||||
|
|
||||||
ThermalPrinter = adafruit_thermal_printer.get_printer_class(2.69)
|
|
||||||
|
|
||||||
RX = board.RX
|
|
||||||
TX = board.TX
|
|
||||||
|
|
||||||
uart = serial.Serial("/dev/ttyAMA0", baudrate=19200, timeout=3000)
|
|
||||||
printer = ThermalPrinter(uart, auto_warm_up=False)
|
|
||||||
|
|
||||||
# Initialize the printer. Note this will take a few seconds for the printer
|
|
||||||
# to warm up and be ready to accept commands (hence calling it explicitly vs.
|
|
||||||
# automatically in the initializer with the default auto_warm_up=True).
|
|
||||||
printer.warm_up()
|
|
||||||
|
|
||||||
if printer.has_paper():
|
|
||||||
print('Printer has paper!')
|
|
||||||
else:
|
|
||||||
print('Printer might be out of paper, or RX is disconnected!')
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
app = Flask(__name__)
|
|
||||||
app.secret_key = b'\x98>3nW[D\xa4\xd4\xd0K\xab?oM.`\x98'
|
|
||||||
limiter = Limiter(
|
|
||||||
app,
|
|
||||||
key_func=get_remote_address,
|
|
||||||
default_limits=["200 per day", "50 per hour"]
|
|
||||||
)
|
|
||||||
|
|
||||||
SERIAL_PORT = '/dev/ttyAMA0'
|
|
||||||
BAUDRATE = 19200
|
|
||||||
|
|
||||||
# session['logged_in'] = False
|
|
||||||
|
|
||||||
def error_handler_limiter():
|
|
||||||
flash("Trop de requêtes !!! CANNOT PRINT !!! HAAAAAAAAAA",'dark')
|
|
||||||
return redirect(url_for('display_index_page'))
|
|
||||||
|
|
||||||
@app.errorhandler(418)
|
|
||||||
def i_m_a_tea_pot(error):
|
|
||||||
return make_response('☕\n', 418)
|
|
||||||
|
|
||||||
@app.route('/tea')
|
|
||||||
def tea():
|
|
||||||
abort(418)
|
|
||||||
|
|
||||||
@app.route('/')
|
|
||||||
@limiter.exempt
|
|
||||||
def display_index_page():
|
|
||||||
if session.get('logged_in'):
|
|
||||||
return render_template('index.html')
|
|
||||||
else:
|
|
||||||
return redirect(url_for('login'))
|
|
||||||
|
|
||||||
@app.route('/login', methods=['POST','GET'])
|
|
||||||
@limiter.limit("100 per minute", error_message=error_handler_limiter)
|
|
||||||
def login():
|
|
||||||
if request.method == 'POST':
|
|
||||||
if not session.get('logged_in'):
|
|
||||||
if request.form['username'] and request.form['password']:
|
|
||||||
# Get the json
|
|
||||||
with open('users.json') as f:
|
|
||||||
users_file = json.load(f)
|
|
||||||
for user in users_file["users"]:
|
|
||||||
if users_file["users"][user] == request.form['password']:
|
|
||||||
session['logged_in'] = True
|
|
||||||
session['user'] = request.form['username']
|
|
||||||
|
|
||||||
if not session.get('logged_in'):
|
|
||||||
flash('Mot de passe ou pseudo invalide.','danger')
|
|
||||||
return redirect(url_for('login'))
|
|
||||||
else:
|
|
||||||
return redirect(url_for('display_index_page'))
|
|
||||||
else:
|
|
||||||
flash('Incorrect logins')
|
|
||||||
return render_template('password.html')
|
|
||||||
else:
|
|
||||||
return render_template('password.html')
|
|
||||||
else:
|
|
||||||
return render_template('password.html')
|
|
||||||
|
|
||||||
@app.route("/logout")
|
|
||||||
def logout():
|
|
||||||
session['logged_in'] = False
|
|
||||||
flash('Tu est déconnecté', 'info')
|
|
||||||
return redirect(url_for('login'))
|
|
||||||
|
|
||||||
# @app.route('/print/image')
|
|
||||||
# @limiter.limit("5 per minute", error_message=error_handler_limiter)
|
|
||||||
# def print_image():
|
|
||||||
# if session.get('logged_in'):
|
|
||||||
# img = random.choice(os.listdir("static/images/")) #change dir name to whatever
|
|
||||||
# # call(["lp", "-o fit-to-page", "static/images/" + img])
|
|
||||||
# printer = Adafruit_Thermal(SERIAL_PORT, BAUDRATE, timeout=5)
|
|
||||||
# printer.begin()
|
|
||||||
# printer.feed(1)
|
|
||||||
# printer.printImage("static/images/" + img, True)
|
|
||||||
# printer.feed(2)
|
|
||||||
# return redirect(url_for('display_index_page'))
|
|
||||||
# else:
|
|
||||||
# return redirect(url_for('login'))
|
|
||||||
|
|
||||||
@app.route('/print/text', methods=['POST'])
|
|
||||||
@limiter.limit("3: per minute", error_message=error_handler_limiter)
|
|
||||||
def print_text():
|
|
||||||
if session.get('logged_in'):
|
|
||||||
if len(request.form['message']) < 200:
|
|
||||||
printer.set_defaults()
|
|
||||||
printer.print((request.form['message']))
|
|
||||||
printer.print("From" + session['user'])
|
|
||||||
printer.feed(2)
|
|
||||||
|
|
||||||
return redirect(url_for('display_index_page'))
|
|
||||||
else:
|
|
||||||
flash('Le text est trop long, 200 caractères au maximum stp !')
|
|
||||||
return redirect(url_for('display_index_page'))
|
|
||||||
else:
|
|
||||||
return redirect(url_for('login'))
|
|
51
src/main.py
Normal file
51
src/main.py
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
# Welcome to the LittlePrynter's source code.
|
||||||
|
# This program expose a web interface, with user authentification, that makes it possible to print messages from the web.
|
||||||
|
# It also exposes a API, making it possible to print and interface with much of the printer's abilities.
|
||||||
|
|
||||||
|
# We first define the connection to the printer itself,
|
||||||
|
# Then we build the API around Flask,
|
||||||
|
# Then we build the web interface, using the simple Jinja2 templating.
|
||||||
|
|
||||||
|
# Following are the librairies we import,
|
||||||
|
from flask import Flask # Used for the web framework
|
||||||
|
from printer import Printer
|
||||||
|
|
||||||
|
# We define the app module used by Flask
|
||||||
|
app = Flask(__name__)
|
||||||
|
app.secret_key = b'\x98>3nM[D\xa4\xd4\xd0K\xab?oM.`\x98'
|
||||||
|
# TODO: Get this secret key from a config file
|
||||||
|
|
||||||
|
|
||||||
|
# Printer connection
|
||||||
|
# Uses the class defined in the printer.py file
|
||||||
|
|
||||||
|
# Define the USB connections here.
|
||||||
|
# TODO: move this to a config file
|
||||||
|
vendor_id = 0x04b8
|
||||||
|
device_id = 0x0e28
|
||||||
|
|
||||||
|
|
||||||
|
printer = Printer(app)
|
||||||
|
|
||||||
|
if not printer.init_printer(vendor_id, device_id):
|
||||||
|
app.logger.error('Could not init printer, aborting.')
|
||||||
|
exit(-1)
|
||||||
|
|
||||||
|
# API routes
|
||||||
|
# The api has the following methods
|
||||||
|
# api/print/{sms,img,letter,qr,barcode}
|
||||||
|
# api/auth/{login,logout}
|
||||||
|
# api/status/{paper,ping,stats}
|
||||||
|
|
||||||
|
# If you just call the api route, you get a help back.
|
||||||
|
@app.route('/api')
|
||||||
|
def api_help():
|
||||||
|
return "Welcome to the API's help page"
|
||||||
|
|
||||||
|
@app.route('/api/print')
|
||||||
|
def index():
|
||||||
|
return "Printing..."
|
||||||
|
|
||||||
|
@app.route('/api/print/sms')
|
||||||
|
def api_print_sms():
|
||||||
|
return "Printing short message."
|
95
src/printer.py
Normal file
95
src/printer.py
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
# Importing the module to mage the connection to the printer.
|
||||||
|
from escpos.printer import Usb, USBNotFoundError
|
||||||
|
from time import sleep, gmtime, strftime
|
||||||
|
import os.path
|
||||||
|
|
||||||
|
|
||||||
|
class Printer(object):
|
||||||
|
"""
|
||||||
|
# The connection is based on the ESC/POS library
|
||||||
|
|
||||||
|
## Connection to the USB printer
|
||||||
|
|
||||||
|
## Making sure the printer is alive
|
||||||
|
|
||||||
|
## Making sure it has paper
|
||||||
|
|
||||||
|
## Define default print settings
|
||||||
|
|
||||||
|
## Print starting message, log time of first print, cut.
|
||||||
|
|
||||||
|
## Annonce readyness : return a positive pong message.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Is the printer ready to accept a new print ?
|
||||||
|
ready = False
|
||||||
|
|
||||||
|
def __init__(self, app):
|
||||||
|
super(Printer, self).__init__()
|
||||||
|
self.app = app
|
||||||
|
self.ready = False
|
||||||
|
self.printer = None
|
||||||
|
|
||||||
|
def init_printer(self, device_id,vendor_id):
|
||||||
|
try:
|
||||||
|
p = Usb(vendor_id, device_id)
|
||||||
|
except USBNotFoundError as e:
|
||||||
|
self.app.logger.error("The USB device is not plugged in, aborting...")
|
||||||
|
exit(-1)
|
||||||
|
waiting_elapsed = 10
|
||||||
|
|
||||||
|
# Is the printer online ? Is the communication with the printer successfull ?
|
||||||
|
while not p.is_online():
|
||||||
|
self.app.logger.debug('Waiting for printer to get online...')
|
||||||
|
sleep(1)
|
||||||
|
waiting_elapsed -= 1
|
||||||
|
if waiting_elapsed < 1:
|
||||||
|
self.app.logger.error('Printer took more than 10 seconds to get online, aborting...')
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Let's check paper status.
|
||||||
|
ps = p.paper_status()
|
||||||
|
if ps == 0:
|
||||||
|
self.app.logger.error('Printer has no more paper, aborting...')
|
||||||
|
return False
|
||||||
|
elif ps == 1:
|
||||||
|
self.app.logger.warning('Printer needs paper to be changed very soon ! ')
|
||||||
|
sleep(5)
|
||||||
|
elif ps == 0:
|
||||||
|
self.app.logger.debug('Printer has paper, good to go')
|
||||||
|
|
||||||
|
# We're going to print a little text to let know that we're good, as a final check.
|
||||||
|
p.textln("Hello world ! \nStarting up at " + strftime("%Y-%m-%d %H:%M:%S", gmtime()))
|
||||||
|
p.set(align='center', font='a', bold=False, underline=0, width=1, height=1, density=9, invert=False, smooth=False, flip=False, double_width=False, double_height=False, custom_size=False)
|
||||||
|
p.textln("------------------")
|
||||||
|
p.qr("Hello World ! :)")
|
||||||
|
p.image("../configuration/hello_world.png",center=True)
|
||||||
|
p.cut()
|
||||||
|
|
||||||
|
self.printer = p;
|
||||||
|
self.ready = True;
|
||||||
|
return True
|
||||||
|
|
||||||
|
def print_sms(self, msg):
|
||||||
|
clean_msg = str(msg)
|
||||||
|
|
||||||
|
if len(clean_msg) > 256 or len(clean_msg) < 10 :
|
||||||
|
self.app.logger.warning("Could not print message of this length :" + len(clean_msg))
|
||||||
|
return False
|
||||||
|
else :
|
||||||
|
self.printer.textln(strftime("%Y-%m-%d %H:%M:%S", gmtime()))
|
||||||
|
self.printer.textln(clean_msg)
|
||||||
|
|
||||||
|
def print_img(self, path):
|
||||||
|
|
||||||
|
if os.path.isfile(str(path)):
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
self.app.logger.warning("File does not exist : " + str(path))
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.printer.image(path)
|
||||||
|
except Exception as e:
|
||||||
|
return e
|
||||||
|
else:
|
||||||
|
self.app.logging.debug("Printed an image : " + str(path))
|
Loading…
Reference in New Issue
Block a user