-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgemini.py
105 lines (95 loc) · 4.22 KB
/
gemini.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import getpass
import urllib.request
import urllib.parse
import posixpath
import ssl
import socket
import warnings
import urllib.error
from urllib.response import addinfourl
import email
# urllib.parse.urljoin() returns the reference untouched for schemes it does
# not list in urllib.parse.uses_relative (gemini is not listed), so relative
# references are resolved by hand here, following RFC 3986 section 5.2.
def _remove_dot_segments(path):
    """Collapse '.' and '..' segments of *path* (RFC 3986 section 5.2.4)."""
    segments = path.split('/')
    if '.' not in segments and '..' not in segments:
        return path
    output = []
    for segment in segments:
        if segment == '..':
            # Pop the previous segment, but never the leading '' that marks
            # an absolute path: '..' cannot climb above the root.
            if len(output) > 1 or (output and output[0] != ''):
                output.pop()
        elif segment != '.':
            output.append(segment)
    if segments[-1] in ('.', '..'):
        # A trailing dot segment names a directory; keep the final slash.
        output.append('')
    return '/'.join(output)


def gemini_urljoin(old, new):
    """Resolve the reference *new* against the absolute URL *old*.

    Args:
        old: The base URL (normally the URL of the current document).
        new: An absolute URL or a relative reference found in that document.

    Returns:
        The resolved URL as a string.

    Notes:
        * An empty *new* yields *old* without its fragment.
        * A network-path reference (``//host/path``) is always given the
          ``gemini`` scheme, regardless of the scheme of *old*.
        * A reference that repeats the base scheme but carries no authority
          (``gemini:/path``) is treated as relative, so it inherits the base
          host.
    """
    if not new:
        # An empty reference means "this same document".
        return old.partition('#')[0]
    if new.startswith('//'):
        return 'gemini:' + new

    base = urllib.parse.urlsplit(old)
    ref = urllib.parse.urlsplit(new)

    if ref.scheme and (ref.netloc or ref.scheme != base.scheme):
        # Fully absolute reference: only tidy the path of hierarchical URLs;
        # opaque ones such as mailto: are passed through unchanged.
        path = _remove_dot_segments(ref.path) if ref.netloc else ref.path
        return urllib.parse.urlunsplit(
            (ref.scheme, ref.netloc, path, ref.query, ref.fragment))

    query = ref.query
    if not ref.path:
        # Query-only or fragment-only reference: keep the base path.
        path = base.path
        if not query:
            query = base.query
    elif ref.path.startswith('/'):
        path = ref.path
    else:
        # Merge with the directory of the base path; an empty base path
        # counts as the root directory.
        path = posixpath.join(posixpath.dirname(base.path) or '/', ref.path)

    return urllib.parse.urlunsplit(
        (base.scheme, base.netloc, _remove_dot_segments(path), query,
         ref.fragment))
class GeminiWarning(Warning):
    """Warning category for Gemini responses that break the protocol format."""
class GeminiHandler(urllib.request.BaseHandler):
    """urllib handler that fetches ``gemini://`` URLs.

    Args:
        ctx: TLS context used for every connection. When omitted (or falsy) a
            client context with certificate verification disabled is created.
        input: Callable taking a prompt and returning the user's answer; used
            for 1x (input) responses.
        sensitive_input: Like *input*, used for status 11 (sensitive input).
    """

    # Only ``sslcontext`` is slotted; the other attributes are stored in the
    # regular instance dict that the base handler class provides.
    __slots__ = ('sslcontext',)

    def __init__(self, ctx=None, input=input, sensitive_input=getpass.getpass):
        if not ctx:
            # ssl.SSLContext() without a protocol is deprecated. Build the
            # equivalent client context explicitly.
            # NOTE(review): certificates are NOT verified, matching the
            # previous default; pass a stricter ``ctx`` to change that.
            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            ctx.check_hostname = False  # must be cleared before verify_mode
            ctx.verify_mode = ssl.CERT_NONE
        self.sslcontext = ctx
        self.input = input
        self.sensitive_input = sensitive_input

    @staticmethod
    def _read_header(f):
        """Read ``<status><meta>`` from *f*; return ``(status, stripped meta)``.

        Raises:
            urllib.error.URLError: if the status is not exactly two digits
                (this includes an empty response).
        """
        status = f.read(2).decode('ascii', 'replace')
        meta = f.readline().decode()
        if len(status) != 2 or any(c not in '0123456789' for c in status):
            raise urllib.error.URLError(
                'malformed Gemini response header: %r' % (status + meta))
        # Some servers separate status and meta with a tab instead of a
        # space; that only triggers the warning, strip() below copes with it.
        if not meta.startswith(' ') or not meta.endswith('\r\n'):
            warnings.warn(GeminiWarning(
                "Gemini protocol violation re: status line format."))
        return status, meta.strip()

    @staticmethod
    def _make_headers(meta):
        """Build a message object whose Content-Type is the 2x *meta* value."""
        # An empty meta falls back to the Gemini default media type.
        content_type = meta or 'text/gemini; charset=utf-8'
        headers = email.message_from_string('Content-Type: %s\n' % content_type)
        if headers.get_content_charset() is None:
            headers.set_charset('utf8')
        return headers

    def gemini_open(self, req: urllib.request.Request):
        """Perform one Gemini request.

        Returns:
            An ``addinfourl`` response for 2x statuses; otherwise whatever
            the matching ``gemini_error_<digit>x`` handler returns.

        Raises:
            urllib.error.URLError: no host in the URL, malformed response
                header, or a status class with no handler (the server's meta
                text is the reason).
        """
        target = urllib.parse.urlsplit(req.full_url)
        host = target.hostname  # also unwraps bracketed IPv6 literals
        if not host:
            raise urllib.error.URLError('no host given')
        port = target.port or 1965

        raw = socket.create_connection((host, port))
        try:
            s = self.sslcontext.wrap_socket(raw, server_hostname=host)
        except Exception:
            raw.close()  # do not leak the TCP socket on a failed handshake
            raise
        try:
            s.sendall(req.full_url.encode('utf-8') + b'\r\n')
            f = s.makefile('rb')
        finally:
            s.close()  # the file object's reference keeps the socket open

        try:
            status, meta = self._read_header(f)
            if status[0] == '2':
                headers = self._make_headers(meta)
                return addinfourl(f, headers, req.full_url, int(status))
        except Exception:
            f.close()
            raise
        f.close()  # non-2x status will never return a body

        handler_name = 'gemini_error_%sx' % status[0]
        error_handler = (getattr(self.parent, handler_name, None)
                         or getattr(self, handler_name, None))
        if not error_handler:
            raise urllib.error.URLError(meta)
        return error_handler(status, meta, req)

    def gemini_error_3x(self, error, dest_url, req):
        """Follow a redirect to *dest_url*, guarding against loops.

        Raises:
            urllib.error.URLError: empty target, a target already visited in
                this redirect chain, or more than 25 redirects.
        """
        if not dest_url:
            raise urllib.error.URLError("redirect without a target URL")
        if dest_url.startswith('//'):
            dest_url = 'gemini:' + dest_url
        # Track the resolved URL: the same relative target served from two
        # different pages is not a loop.
        target = gemini_urljoin(req.full_url, dest_url)
        redirs = getattr(req, 'redirs', set())
        if target in redirs:
            raise urllib.error.URLError("infinite redirect loop")
        if len(redirs) >= 25:
            raise urllib.error.URLError("Too many redirects (25)")
        redirs.add(target)
        new_req = urllib.request.Request(target)
        new_req.redirs = redirs  # carry the history along the chain
        return self.parent.open(new_req)

    def gemini_error_1x(self, error, prompt, req):
        """Ask the user for input and retry with it as the query string.

        Status ``'11'`` uses ``sensitive_input``; every other 1x status uses
        ``input``. The answer is percent-encoded and replaces any existing
        query component of the request URL.
        """
        ask = self.sensitive_input if error == '11' else self.input
        user_input = ask(prompt)
        parts = urllib.parse.urlsplit(req.full_url)
        answered = parts._replace(query=urllib.parse.quote(user_input))
        return self.parent.open(urllib.parse.urlunsplit(answered))