import json import config import cgi import os import sqlite3 import mimetypes import urllib.parse import Admin import utils def playout(filename, content = "text/html"): return ["", [('Content-Type', content if content else "application/octet-stream"), ("X-Accel-Redirect", filename)], None] def resolve_alias(alias): resolved = None db = sqlite3.connect(config.database, check_same_thread = False) cursor = db.cursor() cursor.execute("SELECT user, real, access, storage, display FROM aliases WHERE alias=:alias", {"alias": alias}) data = cursor.fetchone() if data: resolved = {"user": data[0], "path": data[1], "access": data[2], "limit": data[3], "display": data[4]} else: print("Unknown alias " + alias) db.close() return resolved def listing(alias): listing = {"total": 0, "files": [], "access": alias["access"], "display": alias["display"]} if 'r' in alias["access"]: directory = utils.target_filename(alias, None) listing["files"] = utils.dirlisting(directory, True, False) # Calculate total size for file in listing["files"]: listing["total"] += file["size"] return [json.dumps(listing), [('Content-Type','application/json')], "200 OK"] def upload(alias, post): if 'c' not in alias["access"]: return ["", [('Content-Type','text/html')], "403 No"] if post["file"].filename: target = utils.target_filename(alias, utils.sanitize_filename(post["file"].filename)) while os.path.isfile(target): target += "_dup" try: open(target, 'wb').write(post["file"].file.read()) print("Uploaded " + target) return ["", [('Content-Type','text/html')], "200 OK"] except PermissionError as e: print("Failed to store " + target + ": " + e.msg) return ["", [('Content-Type','text/html')], "500 Error"] else: print("Upload failed") return ["", [('Content-Type','text/html')], "500 Error"] def route(path, env, post): if len(path) == 1 and not path[0]: return config.Auth.login(env, post) if path[0] == "admin": return Admin.route(path, env, post) if path[0] == "favicon.ico": return playout("/assets/favicon.ico") # Get mapped user/path/limits alias = resolve_alias(path[0]) if not alias: return utils.redirect(config.homepage) # Redirect if no slash after alias if len(path) == 1: return utils.redirect(path[0] + "/"); if len(path) > 1 and path[1] == "upload": return upload(alias, post) if len(path) > 1 and path[1] == "listing": return listing(alias) if len(path) > 1 and path[1] == "file": filename = utils.sanitize_filename(path[2]) return playout("/data/" + utils.target_filename_internal(alias, filename), mimetypes.guess_type(filename)[0]) if len(path) > 1 and path[1] == "preview" and alias["display"] == "gallery": filename = utils.sanitize_filename(path[2]) return playout("/data/" + utils.target_filename_internal(alias, "preview/" + filename), mimetypes.guess_type(filename)[0]) return playout("/interface/listing.htm") def handle_request(env, response): path = env.get('PATH_INFO', '').lstrip('/').split('/') post = None headers = [] # Read POST data try: if env.get('CONTENT_TYPE', '').startswith("multipart/form-data"): post = cgi.FieldStorage(environ=env, fp=env['wsgi.input'], keep_blank_values=True) else: content_length = int(env.get('CONTENT_LENGTH', '0')) post_raw = env["wsgi.input"].read(content_length).decode('utf-8') post = urllib.parse.parse_qs(post_raw) except ValueError as e: post = None # Route request data, addtl_headers, code = route(path, env, post) headers.extend(addtl_headers) response(code if code else "200 OK", headers) return [bytes(data, "utf-8")]