8 class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
10 allow_reuse_address = 1
12 def __init__(self, *args, **kwargs):
15 # before reading request headers
17 # before reading request body
19 # before setting response status and headers
21 # before sending response body
23 # before returning from handler (thus setting response EOF)
25 # after writing over 1s worth of data at self.bandwidth
27 # after reading over 1s worth of data at self.bandwidth
31 super(Server, self).__init__(*args, **kwargs)
33 def setdelays(self, **kwargs):
34 """In future requests, induce delays at the given checkpoints."""
35 for (k, v) in kwargs.iteritems():
36 self.delays.get(k) # NameError if unknown key
39 def setbandwidth(self, bandwidth):
40 """For future requests, set the maximum bandwidth (number of bytes per
41 second) to operate at. If setbandwidth is never called, function at
42 maximum bandwidth possible"""
43 self.bandwidth = float(bandwidth)
45 def _sleep_at_least(self, seconds):
46 """Sleep for given time, even if signals are received."""
47 wake = time.time() + seconds
51 todo = wake - time.time()
53 def _do_delay(self, k):
54 self._sleep_at_least(self.delays[k])
57 class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
58 def wfile_bandwidth_write(self, data_to_write):
59 if self.server.bandwidth == None and self.server.delays['mid_write'] == 0:
60 self.wfile.write(data_to_write)
62 BYTES_PER_WRITE = int(self.server.bandwidth/4.0) or 32768
63 outage_happened = False
64 num_bytes = len(data_to_write)
66 target_time = time.time()
67 while num_sent_bytes < num_bytes:
68 if num_sent_bytes > self.server.bandwidth and not outage_happened:
69 self.server._do_delay('mid_write')
70 target_time += self.delays['mid_write']
71 outage_happened = True
72 num_write_bytes = min(BYTES_PER_WRITE,
73 num_bytes - num_sent_bytes)
74 self.wfile.write(data_to_write[
75 num_sent_bytes:num_sent_bytes+num_write_bytes])
76 num_sent_bytes += num_write_bytes
77 if self.server.bandwidth is not None:
78 target_time += num_write_bytes / self.server.bandwidth
79 self.server._sleep_at_least(target_time - time.time())
82 def rfile_bandwidth_read(self, bytes_to_read):
83 if self.server.bandwidth == None and self.server.delays['mid_read'] == 0:
84 return self.rfile.read(bytes_to_read)
86 BYTES_PER_READ = int(self.server.bandwidth/4.0) or 32768
88 outage_happened = False
90 target_time = time.time()
91 while bytes_to_read > bytes_read:
92 if bytes_read > self.server.bandwidth and not outage_happened:
93 self.server._do_delay('mid_read')
94 target_time += self.delays['mid_read']
95 outage_happened = True
96 next_bytes_to_read = min(BYTES_PER_READ,
97 bytes_to_read - bytes_read)
98 data += self.rfile.read(next_bytes_to_read)
99 bytes_read += next_bytes_to_read
100 if self.server.bandwidth is not None:
101 target_time += next_bytes_to_read / self.server.bandwidth
102 self.server._sleep_at_least(target_time - time.time())
105 def handle(self, *args, **kwargs):
106 self.server._do_delay('request')
107 return super(Handler, self).handle(*args, **kwargs)
110 self.server._do_delay('response')
111 r = re.search(r'[0-9a-f]{32}', self.path)
113 return self.send_response(422)
114 datahash = r.group(0)
115 if datahash not in self.server.store:
116 return self.send_response(404)
117 self.send_response(200)
118 self.send_header('Content-type', 'application/octet-stream')
120 self.server._do_delay('response_body')
121 self.wfile_bandwidth_write(self.server.store[datahash])
122 self.server._do_delay('response_close')
125 self.server._do_delay('request_body')
126 # The comments at https://bugs.python.org/issue1491 implies that Python
127 # 2.7 BaseHTTPRequestHandler was patched to support 100 Continue, but
128 # reading the actual code that ships in Debian it clearly is not, so we
129 # need to send the response on the socket directly.
130 self.wfile_bandwidth_write("%s %d %s\r\n\r\n" %
131 (self.protocol_version, 100, "Continue"))
132 data = self.rfile_bandwidth_read(int(self.headers.getheader('content-length')))
133 datahash = hashlib.md5(data).hexdigest()
134 self.server.store[datahash] = data
135 self.server._do_delay('response')
136 self.send_response(200)
137 self.send_header('Content-type', 'text/plain')
139 self.server._do_delay('response_body')
140 self.wfile_bandwidth_write(datahash + '+' + str(len(data)))
141 self.server._do_delay('response_close')
143 def log_request(self, *args, **kwargs):
144 if os.environ.get('ARVADOS_DEBUG', None):
145 super(Handler, self).log_request(*args, **kwargs)
147 def finish(self, *args, **kwargs):
148 """Ignore exceptions, notably "Broken pipe" when client times out."""
150 return super(Handler, self).finish(*args, **kwargs)
154 def handle_one_request(self, *args, **kwargs):
155 """Ignore exceptions, notably "Broken pipe" when client times out."""
157 return super(Handler, self).handle_one_request(*args, **kwargs)