1 from __future__ import division
2 from future import standard_library
3 standard_library.install_aliases()
4 from builtins import str
class Server(socketserver.ThreadingMixIn, http.server.HTTPServer, object):
    """Threaded HTTP server stub whose request handling can be slowed down.

    Attributes:
      store: dict holding the data accepted so far; the request handler
        keys it by the MD5 hex digest of the data.
      delays: dict mapping checkpoint name to the number of seconds to
        sleep when a request reaches that checkpoint.
      bandwidth: maximum transfer rate in bytes per second as a float, or
        None (the default) for no limit.
    """

    allow_reuse_address = 1

    def __init__(self, *args, **kwargs):
        """Set up an empty store with no delays and no bandwidth limit.

        All arguments are passed through to the HTTPServer constructor.
        The attributes are assigned before calling it so that they exist
        by the time the base class finishes initializing.
        """
        self.store = {}
        self.delays = {
            # before reading request headers
            'request': 0,
            # before reading request body
            'request_body': 0,
            # before setting response status and headers
            'response': 0,
            # before sending response body
            'response_body': 0,
            # before returning from handler (thus setting response EOF)
            'response_close': 0,
            # after writing over 1s worth of data at self.bandwidth
            'mid_write': 0,
            # after reading over 1s worth of data at self.bandwidth
            'mid_read': 0,
        }
        self.bandwidth = None
        super(Server, self).__init__(*args, **kwargs)

    def setdelays(self, **kwargs):
        """In future requests, induce delays at the given checkpoints.

        Each keyword is a checkpoint name (a key of self.delays) and each
        value is the delay in seconds.

        Raises KeyError if any name is not a known checkpoint; in that
        case none of the delays are changed.
        """
        # Validate everything first so a typo cannot silently create a
        # checkpoint that nothing ever consults, or apply half an update.
        for k in kwargs:
            if k not in self.delays:
                raise KeyError(k)
        self.delays.update(kwargs)

    def setbandwidth(self, bandwidth):
        """For future requests, set the maximum bandwidth (number of bytes per
        second) to operate at. If setbandwidth is never called, function at
        maximum bandwidth possible"""
        self.bandwidth = float(bandwidth)

    def _sleep_at_least(self, seconds):
        """Sleep for given time, even if signals are received.

        A zero or negative duration returns immediately.
        """
        wake = time.time() + seconds
        todo = seconds
        # time.sleep() may return early; keep sleeping until the deadline.
        while todo > 0:
            time.sleep(todo)
            todo = wake - time.time()

    def _do_delay(self, k):
        """Sleep for the delay configured for checkpoint k.

        Raises KeyError if k is not a key of self.delays.
        """
        self._sleep_at_least(self.delays[k])
class Handler(http.server.BaseHTTPRequestHandler, object):
    """Store and serve data by MD5 digest, honoring the server's delays.

    self.server must provide ``store``, ``delays``, ``bandwidth``,
    ``_do_delay()`` and ``_sleep_at_least()``.
    """

    def _throttle_plan(self, bandwidth):
        """Return (chunk_size, outage_threshold) for a throttled transfer.

        With a bandwidth limit, chunks are a quarter second's worth of
        data and the mid-transfer outage happens once more than one
        second's worth has been transferred. Without a limit, "one
        second's worth" is undefined, so 32768-byte chunks are used and
        the outage happens after the first chunk.
        """
        if bandwidth is None:
            return 32768, 0
        return int(bandwidth / 4) or 32768, bandwidth

    def _send_status_only(self, code):
        """Send a bodiless response consisting of just the status and headers."""
        self.send_response(code)
        # Without end_headers() the header block is never terminated (and
        # on Python 3 the buffered status line is never flushed).
        self.end_headers()

    def _lookup_block(self):
        """Return the stored data named by the request path, or None.

        The path must contain a 32-digit lowercase hex string (an MD5 hex
        digest). If it does not, a 422 response is sent; if the digest is
        not in the server's store, a 404 response is sent. In both cases
        None is returned and the caller must not send anything further.
        """
        r = re.search(r'[0-9a-f]{32}', self.path)
        if not r:
            self._send_status_only(422)
            return None
        # Stored values are byte strings (possibly empty), never None.
        data = self.server.store.get(r.group(0))
        if data is None:
            self._send_status_only(404)
        return data

    def wfile_bandwidth_write(self, data_to_write):
        """Write data_to_write to the client, throttled as configured.

        If the server has neither a bandwidth limit nor a 'mid_write'
        delay, the data is written in one call. Otherwise it is written
        in chunks, sleeping after each chunk to stay at or below the
        bandwidth limit (if any), and the 'mid_write' delay is applied
        once, part-way through the transfer.

        Returns None.
        """
        bandwidth = self.server.bandwidth
        if bandwidth is None and self.server.delays['mid_write'] == 0:
            self.wfile.write(data_to_write)
            return None
        chunk_size, outage_after = self._throttle_plan(bandwidth)
        outage_happened = False
        num_bytes = len(data_to_write)
        num_sent_bytes = 0
        target_time = time.time()
        while num_sent_bytes < num_bytes:
            if num_sent_bytes > outage_after and not outage_happened:
                self.server._do_delay('mid_write')
                # Push the schedule back so the outage is not "caught up"
                # by skipping later bandwidth sleeps.
                target_time += self.server.delays['mid_write']
                outage_happened = True
            num_write_bytes = min(chunk_size, num_bytes - num_sent_bytes)
            self.wfile.write(data_to_write[
                num_sent_bytes:num_sent_bytes + num_write_bytes])
            num_sent_bytes += num_write_bytes
            if bandwidth is not None:
                target_time += num_write_bytes / bandwidth
                self.server._sleep_at_least(target_time - time.time())
        return None

    def rfile_bandwidth_read(self, bytes_to_read):
        """Read up to bytes_to_read bytes from the client, throttled.

        If the server has neither a bandwidth limit nor a 'mid_read'
        delay, this is a single read. Otherwise the data is read in
        chunks, sleeping after each chunk to stay at or below the
        bandwidth limit (if any), and the 'mid_read' delay is applied
        once, part-way through the transfer.

        Returns the bytes read, which is shorter than requested only if
        the input ends early.
        """
        bandwidth = self.server.bandwidth
        if bandwidth is None and self.server.delays['mid_read'] == 0:
            return self.rfile.read(bytes_to_read)
        chunk_size, outage_after = self._throttle_plan(bandwidth)
        chunks = []
        outage_happened = False
        bytes_read = 0
        target_time = time.time()
        while bytes_to_read > bytes_read:
            if bytes_read > outage_after and not outage_happened:
                self.server._do_delay('mid_read')
                target_time += self.server.delays['mid_read']
                outage_happened = True
            chunk = self.rfile.read(min(chunk_size, bytes_to_read - bytes_read))
            if not chunk:
                # End of input: stop instead of counting bytes never received.
                break
            chunks.append(chunk)
            bytes_read += len(chunk)
            if bandwidth is not None:
                target_time += len(chunk) / bandwidth
                self.server._sleep_at_least(target_time - time.time())
        return b''.join(chunks)

    def handle(self, *args, **kwargs):
        """Apply the 'request' delay, then handle the connection as usual."""
        self.server._do_delay('request')
        return super(Handler, self).handle(*args, **kwargs)

    def do_GET(self):
        """Send the stored data whose MD5 digest appears in the request path.

        Responds 422 if the path has no digest and 404 if the digest is
        unknown. Applies the 'response', 'response_body' and
        'response_close' delays at the corresponding points.
        """
        self.server._do_delay('response')
        data = self._lookup_block()
        if data is None:
            return
        self.send_response(200)
        self.send_header('Content-type', 'application/octet-stream')
        self.end_headers()
        self.server._do_delay('response_body')
        self.wfile_bandwidth_write(data)
        self.server._do_delay('response_close')

    def do_HEAD(self):
        """Send the headers, including Content-length, for stored data.

        Responds 422 if the path has no digest and 404 if the digest is
        unknown. Applies the 'response' and 'response_close' delays.
        """
        self.server._do_delay('response')
        data = self._lookup_block()
        if data is None:
            return
        self.send_response(200)
        self.send_header('Content-type', 'application/octet-stream')
        self.send_header('Content-length', str(len(data)))
        self.end_headers()
        self.server._do_delay('response_close')

    def handle_expect_100(self):
        """Apply the 'request_body' delay when the client expects 100 Continue."""
        self.server._do_delay('request_body')

    def do_PUT(self):
        """Store the request body and reply with "<md5 hex digest>+<size>".

        The body length is taken from the Content-length request header.
        Applies the 'response', 'response_body' and 'response_close'
        delays at the corresponding points.
        """
        if sys.version_info < (3, 0):
            # The comments at https://bugs.python.org/issue1491
            # imply that Python 2.7 BaseHTTPRequestHandler was
            # patched to support 100 Continue, but reading the actual
            # code that ships in Debian it clearly is not, so we need
            # to send the response on the socket directly.
            self.server._do_delay('request_body')
            self.wfile.write("{} {} {}\r\n\r\n".format(
                self.protocol_version, 100, "Continue"))
        data = self.rfile_bandwidth_read(
            int(self.headers.get('content-length')))
        datahash = hashlib.md5(data).hexdigest()
        self.server.store[datahash] = data
        self.server._do_delay('response')
        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.end_headers()
        self.server._do_delay('response_body')
        # wfile is a binary stream, so the text reply must be encoded.
        reply = datahash + '+' + str(len(data))
        self.wfile_bandwidth_write(reply.encode('ascii'))
        self.server._do_delay('response_close')

    def log_request(self, *args, **kwargs):
        """Log the request only if the ARVADOS_DEBUG environment variable is set."""
        if os.environ.get('ARVADOS_DEBUG', None):
            super(Handler, self).log_request(*args, **kwargs)

    def finish(self, *args, **kwargs):
        """Ignore exceptions, notably "Broken pipe" when client times out."""
        try:
            return super(Handler, self).finish(*args, **kwargs)
        except Exception:
            # Deliberate best effort: the client may already be gone.
            pass

    def handle_one_request(self, *args, **kwargs):
        """Ignore exceptions, notably "Broken pipe" when client times out."""
        try:
            return super(Handler, self).handle_one_request(*args, **kwargs)
        except Exception:
            # Deliberate best effort: the client may already be gone.
            pass