1 from __future__ import division
2 from future import standard_library
3 standard_library.install_aliases()
4 from builtins import str
15 from . import arvados_testutil as tutil
17 _debug = os.environ.get('ARVADOS_DEBUG', None)
20 class StubKeepServers(tutil.ApiClientMock):
23 super(StubKeepServers, self).setUp()
24 sock = socket.socket()
25 sock.bind(('0.0.0.0', 0))
26 self.port = sock.getsockname()[1]
28 self.server = Server(('0.0.0.0', self.port), Handler)
29 self.thread = threading.Thread(target=self.server.serve_forever)
30 self.thread.daemon = True # Exit thread if main proc exits
32 self.api_client = self.mock_keep_services(
34 service_host='localhost',
35 service_port=self.port,
39 self.server.shutdown()
40 super(StubKeepServers, self).tearDown()
43 class Server(socketserver.ThreadingMixIn, http.server.HTTPServer, object):
45 allow_reuse_address = 1
47 def __init__(self, *args, **kwargs):
50 # before reading request headers
52 # before reading request body
54 # before setting response status and headers
56 # before sending response body
58 # before returning from handler (thus setting response EOF)
60 # after writing over 1s worth of data at self.bandwidth
62 # after reading over 1s worth of data at self.bandwidth
66 super(Server, self).__init__(*args, **kwargs)
68 def setdelays(self, **kwargs):
69 """In future requests, induce delays at the given checkpoints."""
70 for (k, v) in kwargs.items():
71 self.delays.get(k) # NameError if unknown key
74 def setbandwidth(self, bandwidth):
75 """For future requests, set the maximum bandwidth (number of bytes per
76 second) to operate at. If setbandwidth is never called, function at
77 maximum bandwidth possible"""
78 self.bandwidth = float(bandwidth)
80 def _sleep_at_least(self, seconds):
81 """Sleep for given time, even if signals are received."""
82 wake = time.time() + seconds
86 todo = wake - time.time()
88 def _do_delay(self, k):
89 self._sleep_at_least(self.delays[k])
92 class Handler(http.server.BaseHTTPRequestHandler, object):
94 protocol_version = 'HTTP/1.1'
96 def wfile_bandwidth_write(self, data_to_write):
97 if self.server.bandwidth is None and self.server.delays['mid_write'] == 0:
98 self.wfile.write(data_to_write)
100 BYTES_PER_WRITE = int(self.server.bandwidth/4) or 32768
101 outage_happened = False
102 num_bytes = len(data_to_write)
104 target_time = time.time()
105 while num_sent_bytes < num_bytes:
106 if num_sent_bytes > self.server.bandwidth and not outage_happened:
107 self.server._do_delay('mid_write')
108 target_time += self.server.delays['mid_write']
109 outage_happened = True
110 num_write_bytes = min(BYTES_PER_WRITE,
111 num_bytes - num_sent_bytes)
112 self.wfile.write(data_to_write[
113 num_sent_bytes:num_sent_bytes+num_write_bytes])
114 num_sent_bytes += num_write_bytes
115 if self.server.bandwidth is not None:
116 target_time += num_write_bytes / self.server.bandwidth
117 self.server._sleep_at_least(target_time - time.time())
120 def rfile_bandwidth_read(self, bytes_to_read):
121 if self.server.bandwidth is None and self.server.delays['mid_read'] == 0:
122 return self.rfile.read(bytes_to_read)
124 BYTES_PER_READ = int(self.server.bandwidth/4) or 32768
126 outage_happened = False
128 target_time = time.time()
129 while bytes_to_read > bytes_read:
130 if bytes_read > self.server.bandwidth and not outage_happened:
131 self.server._do_delay('mid_read')
132 target_time += self.server.delays['mid_read']
133 outage_happened = True
134 next_bytes_to_read = min(BYTES_PER_READ,
135 bytes_to_read - bytes_read)
136 data += self.rfile.read(next_bytes_to_read)
137 bytes_read += next_bytes_to_read
138 if self.server.bandwidth is not None:
139 target_time += next_bytes_to_read / self.server.bandwidth
140 self.server._sleep_at_least(target_time - time.time())
143 def finish(self, *args, **kwargs):
145 return super(Handler, self).finish(*args, **kwargs)
146 except Exception as err:
150 def handle(self, *args, **kwargs):
152 return super(Handler, self).handle(*args, **kwargs)
157 def handle_one_request(self, *args, **kwargs):
158 self._sent_continue = False
159 self.server._do_delay('request')
160 return super(Handler, self).handle_one_request(*args, **kwargs)
162 def handle_expect_100(self):
163 self.server._do_delay('request_body')
164 self._sent_continue = True
165 return super(Handler, self).handle_expect_100()
168 self.server._do_delay('response')
169 r = re.search(r'[0-9a-f]{32}', self.path)
171 return self.send_response(422)
172 datahash = r.group(0)
173 if datahash not in self.server.store:
174 return self.send_response(404)
175 self.send_response(200)
176 self.send_header('Connection', 'close')
177 self.send_header('Content-type', 'application/octet-stream')
179 self.server._do_delay('response_body')
180 self.wfile_bandwidth_write(self.server.store[datahash])
181 self.server._do_delay('response_close')
184 self.server._do_delay('response')
185 r = re.search(r'[0-9a-f]{32}', self.path)
187 return self.send_response(422)
188 datahash = r.group(0)
189 if datahash not in self.server.store:
190 return self.send_response(404)
191 self.send_response(200)
192 self.send_header('Connection', 'close')
193 self.send_header('Content-type', 'application/octet-stream')
194 self.send_header('Content-length', str(len(self.server.store[datahash])))
196 self.server._do_delay('response_close')
197 self.close_connection = True
200 if not self._sent_continue and self.headers.get('expect') == '100-continue':
201 # The comments at https://bugs.python.org/issue1491
202 # implies that Python 2.7 BaseHTTPRequestHandler was
203 # patched to support 100 Continue, but reading the actual
204 # code that ships in Debian it clearly is not, so we need
205 # to send the response on the socket directly.
206 self.server._do_delay('request_body')
207 self.wfile.write("{} {} {}\r\n\r\n".format(
208 self.protocol_version, 100, "Continue").encode())
209 data = self.rfile_bandwidth_read(
210 int(self.headers.get('content-length')))
211 datahash = hashlib.md5(data).hexdigest()
212 self.server.store[datahash] = data
213 resp = '{}+{}\n'.format(datahash, len(data)).encode()
214 self.server._do_delay('response')
215 self.send_response(200)
216 self.send_header('Connection', 'close')
217 self.send_header('Content-type', 'text/plain')
218 self.send_header('Content-length', len(resp))
220 self.server._do_delay('response_body')
221 self.wfile_bandwidth_write(resp)
222 self.server._do_delay('response_close')
223 self.close_connection = True
225 def log_request(self, *args, **kwargs):
227 super(Handler, self).log_request(*args, **kwargs)