#!/usr/bin/env python3
"""Flask front end that queues ESC/POS print jobs for a USB printer.

Jobs are stored as ``.epml`` files under ``JOB_DIR`` and handed to a single
background worker thread. For each job the worker runs ``ESCPOS_BIN`` with
the job file on stdin and the printer device on stdout.
"""
import datetime
import logging
import os
import queue
import re
import threading
from subprocess import PIPE, Popen

from flask import Flask, Response, request

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)

PRINTER = "/dev/usb/lp0"
JOB_DIR = "./data/jobs"
ESCPOS_BIN = "./build/escpos"

# Upper bound on copies per submission so one request cannot flood the queue.
MAX_COPIES = 100

# Job ids become file names, so restrict them to a conservative character set.
_PRINT_ID_RE = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]{0,127}")

STYLE = """
* {
    font-family: monospace;
    background-color: #000;
    color: #fff;
}
a {
    color: #0f0;
    cursor: pointer;
    margin-right: 8px;
}
input, textarea {
    background-color: #222;
    color: #fff;
    border: 1px solid #555;
}
input[type="submit"] {
    background-color: #444;
    border: 1px solid #888;
    padding: 5px 10px;
}
"""

# --- sanity checks ---------------------------------------------------------

if not os.path.exists(PRINTER):
    raise RuntimeError(f"Printer device {PRINTER} does not exist")

os.makedirs(JOB_DIR, exist_ok=True)

# --- job queue -------------------------------------------------------------

print_queue = queue.Queue()


def _run_print_job(job_path):
    """Pipe one job file through ``ESCPOS_BIN`` into the printer device.

    A non-zero exit status is logged together with the tool's stderr.

    Raises:
        OSError: if the job file, the printer device or the executable
            cannot be opened or started.
    """
    logging.info("printing %s", job_path)
    with open(job_path, "rb") as infile, open(PRINTER, "wb") as outfile:
        proc = Popen(
            [ESCPOS_BIN],
            stdin=infile,
            stdout=outfile,
            stderr=PIPE,
            close_fds=True,
        )
        _, stderr = proc.communicate()
    if proc.returncode != 0:
        # errors="replace": the tool's stderr is not guaranteed to be UTF-8,
        # and a decode failure must not mask the real error.
        logging.error("print failed: %s", stderr.decode(errors="replace"))


def printer_worker():
    """Consume job paths from ``print_queue`` forever, printing one at a time.

    Any exception raised while printing a job is logged and swallowed so that
    a single bad job (or a temporarily missing printer) does not terminate
    the only worker thread and stall every later job.
    """
    while True:
        job_path = print_queue.get()
        try:
            _run_print_job(job_path)
        except Exception:
            # Top-level boundary of the worker thread: log and keep going.
            logging.exception("print job %s could not be processed", job_path)
        finally:
            print_queue.task_done()


threading.Thread(target=printer_worker, daemon=True).start()

# --- routes ----------------------------------------------------------------


@app.route("/print/<print_id>", methods=["POST"])
def print_escpos(print_id):
    """Store the raw request body as job ``print_id`` and queue it.

    Returns 202 once the job is queued, or 400 if ``print_id`` is not a safe
    file name.
    """
    if not _PRINT_ID_RE.fullmatch(print_id):
        return Response("invalid print id\n", status=400)

    job_path = f"{JOB_DIR}/{print_id}.epml"
    with open(job_path, "wb") as f:
        # get_data() returns the raw body even when the client sent a
        # form-encoded content type, where request.data would be empty.
        f.write(request.get_data())

    print_queue.put(job_path)
    return Response("queued\n", status=202)


@app.route("/print/submit_job", methods=["POST"])
def submit_print_job():
    """Queue the ``escpos_data`` form field ``copies`` times.

    Returns 202 once the job is queued, or 400 if ``copies`` is not an
    integer between 1 and ``MAX_COPIES``.
    """
    escpos_data = request.form.get("escpos_data", "")

    try:
        copies = int(request.form.get("copies", "1"))
    except ValueError:
        return Response("copies must be an integer\n", status=400)
    if not 1 <= copies <= MAX_COPIES:
        return Response(
            f"copies must be between 1 and {MAX_COPIES}\n", status=400
        )

    # Microsecond resolution keeps two submissions made within the same
    # second from overwriting each other's job file.
    job_id = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")
    job_path = f"{JOB_DIR}/{job_id}.epml"

    with open(job_path, "w", encoding="utf-8") as f:
        f.write(escpos_data)

    for _ in range(copies):
        print_queue.put(job_path)

    return Response("queued\n", status=202)


@app.route("/", methods=["GET"])
def index():
    """Return the landing page text."""
    # NOTE(review): this page contains no markup or form and STYLE is never
    # referenced -- presumably the HTML was lost at some point; confirm
    # against the intended template before relying on this page.
    return """

ESC/POS Print Server

Insert ESC/POS formatting tokens:

[NORMAL] [CENTER] [BOLD] [CUT] [INVERT ON] [INVERT OFF] [WIDE] [HR] [NL]



Copies:

"""


# --- main ------------------------------------------------------------------

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8001, debug=False, use_reloader=False)