1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
import json
import config
import cgi
import os
import sqlite3
import mimetypes
import urllib.parse
import Admin
import utils
def playout(filename, content = "text/html"):
return ["", [('Content-Type', content if content else "application/octet-stream"), ("X-Accel-Redirect", filename)], None]
def resolve_alias(alias):
resolved = None
db = sqlite3.connect(config.database, check_same_thread = False)
cursor = db.cursor()
cursor.execute("SELECT user, real, access, storage, display FROM aliases WHERE alias=:alias", {"alias": alias})
data = cursor.fetchone()
if data:
resolved = {"user": data[0], "path": data[1], "access": data[2], "limit": data[3], "display": data[4]}
else:
print("Unknown alias " + alias)
db.close()
return resolved
def listing(alias):
listing = {"total": 0, "files": [], "access": alias["access"], "display": alias["display"]}
if 'r' in alias["access"]:
directory = utils.target_filename(alias, None)
listing["files"] = utils.dirlisting(directory, True, False)
# Calculate total size
for file in listing["files"]:
listing["total"] += file["size"]
return [json.dumps(listing), [('Content-Type','application/json')], "200 OK"]
def upload(alias, post):
if 'c' not in alias["access"]:
return ["", [('Content-Type','text/html')], "403 No"]
if post["file"].filename:
target = utils.target_filename(alias, utils.sanitize_filename(post["file"].filename))
while os.path.isfile(target):
target += "_dup"
try:
open(target, 'wb').write(post["file"].file.read())
print("Uploaded " + target)
return ["", [('Content-Type','text/html')], "200 OK"]
except PermissionError as e:
print("Failed to store " + target + ": " + e.msg)
return ["", [('Content-Type','text/html')], "500 Error"]
else:
print("Upload failed")
return ["", [('Content-Type','text/html')], "500 Error"]
def route(path, env, post):
if len(path) == 1 and not path[0]:
return config.Auth.login(env, post)
if path[0] == "admin":
return Admin.route(path, env, post)
if path[0] == "favicon.ico":
return playout("/assets/favicon.ico")
# Get mapped user/path/limits
alias = resolve_alias(path[0])
if not alias:
return utils.redirect(config.homepage)
# Redirect if no slash after alias
if len(path) == 1:
return utils.redirect(path[0] + "/");
if len(path) > 1 and path[1] == "upload":
return upload(alias, post)
if len(path) > 1 and path[1] == "listing":
return listing(alias)
if len(path) > 1 and path[1] == "file":
filename = utils.sanitize_filename(path[2])
return playout("/data/" + utils.target_filename_internal(alias, filename), mimetypes.guess_type(filename)[0])
if len(path) > 1 and path[1] == "preview" and alias["display"] == "gallery":
filename = utils.sanitize_filename(path[2])
return playout("/data/" + utils.target_filename_internal(alias, "preview/" + filename), mimetypes.guess_type(filename)[0])
return playout("/interface/listing.htm")
def handle_request(env, response):
path = env.get('PATH_INFO', '').lstrip('/').split('/')
post = None
headers = []
# Read POST data
try:
if env.get('CONTENT_TYPE', '').startswith("multipart/form-data"):
post = cgi.FieldStorage(environ=env, fp=env['wsgi.input'], keep_blank_values=True)
else:
content_length = int(env.get('CONTENT_LENGTH', '0'))
post_raw = env["wsgi.input"].read(content_length).decode('utf-8')
post = urllib.parse.parse_qs(post_raw)
except ValueError as e:
post = None
# Route request
data, addtl_headers, code = route(path, env, post)
headers.extend(addtl_headers)
response(code if code else "200 OK", headers)
return [bytes(data, "utf-8")]
|