#!/usr/bin/env python3 import argparse, os, time, io, sys, shutil from http.server import BaseHTTPRequestHandler, HTTPServer from urllib.parse import urlparse, unquote from datetime import datetime import cgi def safe_join(base, *paths): # Evita path traversal final = os.path.abspath(os.path.join(base, *paths)) base = os.path.abspath(base) if not final.startswith(base + os.sep) and final != base: raise ValueError("Path traversal detectado") return final def uniquify(path): if not os.path.exists(path): return path root, ext = os.path.splitext(path) ts = datetime.now().strftime("%Y%m%d-%H%M%S") return f"{root}_{ts}{ext}" class UploadHandler(BaseHTTPRequestHandler): server_version = "MiniUpload/1.0" def logx(self, msg): sys.stdout.write(f"[{datetime.now().isoformat(timespec='seconds')}] {self.client_address[0]} - {msg}\n") sys.stdout.flush() def _common_headers(self, code=200, ct="text/plain; charset=utf-8"): self.send_response(code) self.send_header("Content-Type", ct) self.end_headers() def do_GET(self): # Pequeña página de estado y lista de archivos parsed = urlparse(self.path) if parsed.path not in ("/", "/upload"): self._common_headers(404) self.wfile.write(b"Not found") return files = sorted(os.listdir(self.server.dest), key=lambda x: os.path.getmtime(os.path.join(self.server.dest, x)), reverse=True) out = io.StringIO() out.write(f"File upload available at /upload\nSaves in: {self.server.dest}\n\n") out.write("Recent files:\n") for f in files[:200]: p = os.path.join(self.server.dest, f) sz = os.path.getsize(p) mtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(os.path.getmtime(p))) out.write(f"- {f} ({sz} bytes) [{mtime}]\n") data = out.getvalue().encode("utf-8", "ignore") self._common_headers(200, "text/plain; charset=utf-8") self.wfile.write(data) def do_POST(self): parsed = urlparse(self.path) if parsed.path != "/upload": self._common_headers(404) self.wfile.write(b"Can only POST to /upload") return ctype, pdict = cgi.parse_header(self.headers.get('Content-Type', '')) clen = int(self.headers.get('Content-Length', '0')) ua = self.headers.get('User-Agent', '-') if ctype != 'multipart/form-data': self._common_headers(400) self.wfile.write(b"Expected multipart/form-data") return boundary = pdict.get('boundary') if not boundary: self._common_headers(400) self.wfile.write(b"Missing multipart boundary") return fs = cgi.FieldStorage( fp=self.rfile, headers=self.headers, environ={'REQUEST_METHOD': 'POST', 'CONTENT_TYPE': self.headers.get('Content-Type'), } ) saved_any = False if isinstance(fs.list, list): for field in fs.list: if field.filename and field.file: original_name = os.path.basename(field.filename) try: dest_path = safe_join(self.server.dest, original_name) except ValueError: self._common_headers(400) self.wfile.write(b"Invalid filename") return dest_path = uniquify(dest_path) with open(dest_path, "wb") as f: shutil.copyfileobj(field.file, f) sz = os.path.getsize(dest_path) self.logx(f'POST /upload -> saved "{os.path.basename(dest_path)}" ({sz} bytes), UA="{ua}"') saved_any = True if not saved_any: self._common_headers(400) self.wfile.write(b"No file parts found") return self._common_headers(200) self.wfile.write(b"OK") def do_PUT(self): # PUT /upload/filename.ext (cuerpo = datos) parsed = urlparse(self.path) if not parsed.path.startswith("/upload/") or parsed.path == "/upload/": self._common_headers(404) self.wfile.write(b"PUT to /upload/") return target_name = os.path.basename(unquote(parsed.path[len("/upload/"):])) if not target_name: self._common_headers(400) self.wfile.write(b"Missing filename") return try: dest_path = safe_join(self.server.dest, target_name) except ValueError: self._common_headers(400) self.wfile.write(b"Invalid filename") return dest_path = uniquify(dest_path) clen = int(self.headers.get('Content-Length', '0')) ua = self.headers.get('User-Agent', '-') with open(dest_path, "wb") as f: remaining = clen # lee en chunks por si es grande while remaining > 0: chunk = self.rfile.read(min(65536, remaining)) if not chunk: break f.write(chunk) remaining -= len(chunk) sz = os.path.getsize(dest_path) self.logx(f'PUT {self.path} -> saved "{os.path.basename(dest_path)}" ({sz} bytes), UA="{ua}"') self._common_headers(201) self.wfile.write(b"CREATED") def main(): ap = argparse.ArgumentParser(description="Mini servidor para recibir archivos por POST/PUT") ap.add_argument("--host", default="0.0.0.0") ap.add_argument("--port", type=int, default=8000) ap.add_argument("--dir", dest="dest", default=".", help="Carpeta donde guardar") args = ap.parse_args() os.makedirs(args.dest, exist_ok=True) httpd = HTTPServer((args.host, args.port), UploadHandler) httpd.dest = os.path.abspath(args.dest) print(f"File upload available at /upload") print(f"Serving HTTP on {args.host} port {args.port} (saving to {httpd.dest}) ...") try: httpd.serve_forever() except KeyboardInterrupt: print("\nShutting down...") if __name__ == "__main__": main()