11308: Fix modes not tested in test case.
[arvados.git] / sdk / python / tests / keepstub.py
1 from __future__ import division
2 from future import standard_library
3 standard_library.install_aliases()
4 from builtins import str
5 import http.server
6 import hashlib
7 import os
8 import re
9 import socket
10 import socketserver
11 import sys
12 import threading
13 import time
14
15 from . import arvados_testutil as tutil
16
# Value of the ARVADOS_DEBUG environment variable, or None when it is unset.
# When truthy, Handler re-raises exceptions caught in handle()/finish()
# instead of swallowing them, and log_request() output is enabled.
_debug = os.environ.get('ARVADOS_DEBUG', None)
18
19
class StubKeepServers(tutil.ApiClientMock):
    """Test fixture that runs a stub Keep server on an ephemeral port.

    setUp() starts a Server/Handler pair in a background thread and builds
    an API client via self.mock_keep_services() that advertises one keep
    service at localhost:<self.port>. tearDown() stops the server and
    releases its listening socket.

    Attributes set by setUp():
        server: the running Server instance.
        port: TCP port number the server is listening on.
        thread: daemon thread running server.serve_forever().
        api_client: return value of self.mock_keep_services(...).
    """

    def setUp(self):
        super(StubKeepServers, self).setUp()
        # Bind straight to port 0 so the OS assigns a free port atomically.
        # Probing for a free port with a throwaway socket and re-binding
        # afterwards leaves a window in which another process can take it.
        self.server = Server(('0.0.0.0', 0), Handler)
        self.port = self.server.server_address[1]
        self.thread = threading.Thread(target=self.server.serve_forever)
        self.thread.daemon = True # Exit thread if main proc exits
        self.thread.start()
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='localhost',
            service_port=self.port,
        )

    def tearDown(self):
        try:
            # shutdown() returns once serve_forever() has left its loop,
            # so the join below completes promptly.
            self.server.shutdown()
            self.thread.join()
            # Close only the listening socket. server_close() is avoided
            # on purpose: on recent Python versions it may also wait for
            # in-flight handler threads, which can still be sleeping in
            # an injected delay.
            self.server.socket.close()
        finally:
            super(StubKeepServers, self).tearDown()
41
42
class Server(socketserver.ThreadingMixIn, http.server.HTTPServer, object):
    """Threaded HTTP server holding the stub's block store and the
    fault-injection settings (delays, bandwidth) consulted by Handler.

    Attributes:
        store: dict of stored block data; Handler.do_PUT keys it by the
            md5 hex digest of the data.
        delays: dict mapping checkpoint name to a delay in seconds.
        bandwidth: maximum transfer rate in bytes per second, or None
            for unthrottled transfers.
    """

    allow_reuse_address = 1

    def __init__(self, *args, **kwargs):
        self.store = {}
        self.delays = {
            # before reading request headers
            'request': 0,
            # before reading request body
            'request_body': 0,
            # before setting response status and headers
            'response': 0,
            # before sending response body
            'response_body': 0,
            # before returning from handler (thus setting response EOF)
            'response_close': 0,
            # after writing over 1s worth of data at self.bandwidth
            'mid_write': 0,
            # after reading over 1s worth of data at self.bandwidth
            'mid_read': 0,
        }
        self.bandwidth = None
        super(Server, self).__init__(*args, **kwargs)

    def setdelays(self, **kwargs):
        """In future requests, induce delays at the given checkpoints.

        Each keyword names a checkpoint (a key of self.delays) and gives
        the delay in seconds.

        Raises:
            KeyError: if any keyword is not a known checkpoint. Nothing
                is changed in that case, so a typo cannot silently
                create a delay that is never applied.
        """
        unknown = sorted(k for k in kwargs if k not in self.delays)
        if unknown:
            raise KeyError(
                'unknown delay checkpoint(s): {}'.format(', '.join(unknown)))
        self.delays.update(kwargs)

    def setbandwidth(self, bandwidth):
        """For future requests, set the maximum bandwidth (number of bytes per
        second) to operate at. If setbandwidth is never called, function at
        maximum bandwidth possible"""
        self.bandwidth = float(bandwidth)

    def _sleep_at_least(self, seconds):
        """Sleep for given time, even if signals are received."""
        wake = time.time() + seconds
        todo = seconds
        # Re-check the clock after every sleep: keep sleeping until the
        # full interval has elapsed. A non-positive interval is a no-op.
        while todo > 0:
            time.sleep(todo)
            todo = wake - time.time()

    def _do_delay(self, k):
        """Sleep for the delay configured for checkpoint k."""
        self._sleep_at_least(self.delays[k])
90
91
class Handler(http.server.BaseHTTPRequestHandler, object):
    """Request handler for the stub Keep server.

    PUT stores the request body in self.server.store under its md5 hex
    digest; GET and HEAD look a block up by the first 32-hex-digit run
    found in the request path. At each checkpoint named in
    self.server.delays the handler sleeps for the configured time, and
    body transfers are throttled to self.server.bandwidth when set.
    """

    protocol_version = 'HTTP/1.1'

    # Chunk size for throttled transfers when none can be derived from
    # the configured bandwidth (bandwidth unset, or below 4 bytes/s).
    _DEFAULT_CHUNK_BYTES = 32768

    def _throttle_params(self):
        """Return (chunk_size, outage_threshold) for a throttled transfer.

        With a bandwidth configured, chunks are a quarter second's worth
        of data and the one-off mid-transfer delay fires once more than
        one second's worth has been transferred. Without a bandwidth,
        the default chunk size is used and the delay fires after the
        first chunk, provided more data remains.
        """
        bandwidth = self.server.bandwidth
        if bandwidth is None:
            return self._DEFAULT_CHUNK_BYTES, 0
        return (int(bandwidth / 4) or self._DEFAULT_CHUNK_BYTES), bandwidth

    def wfile_bandwidth_write(self, data_to_write):
        """Write data_to_write to the client, honoring the configured
        bandwidth limit and the 'mid_write' delay. Returns None."""
        bandwidth = self.server.bandwidth
        if bandwidth is None and self.server.delays['mid_write'] == 0:
            # Nothing to simulate: send everything in one call.
            self.wfile.write(data_to_write)
            return None
        chunk_size, outage_after = self._throttle_params()
        outage_happened = False
        num_bytes = len(data_to_write)
        num_sent_bytes = 0
        target_time = time.time()
        while num_sent_bytes < num_bytes:
            if num_sent_bytes > outage_after and not outage_happened:
                self.server._do_delay('mid_write')
                # The outage must not count against the bandwidth budget.
                target_time += self.server.delays['mid_write']
                outage_happened = True
            chunk = data_to_write[num_sent_bytes:num_sent_bytes + chunk_size]
            self.wfile.write(chunk)
            num_sent_bytes += len(chunk)
            if bandwidth is not None:
                # Sleep until the time at which this many bytes would
                # have been sent at exactly the configured rate.
                target_time += len(chunk) / bandwidth
                self.server._sleep_at_least(target_time - time.time())
        return None

    def rfile_bandwidth_read(self, bytes_to_read):
        """Read up to bytes_to_read bytes from the client, honoring the
        configured bandwidth limit and the 'mid_read' delay.

        Returns the bytes read; this is shorter than requested only if
        the stream ends early.
        """
        bandwidth = self.server.bandwidth
        if bandwidth is None and self.server.delays['mid_read'] == 0:
            return self.rfile.read(bytes_to_read)
        chunk_size, outage_after = self._throttle_params()
        chunks = []
        outage_happened = False
        bytes_read = 0
        target_time = time.time()
        while bytes_read < bytes_to_read:
            if bytes_read > outage_after and not outage_happened:
                self.server._do_delay('mid_read')
                target_time += self.server.delays['mid_read']
                outage_happened = True
            chunk = self.rfile.read(
                min(chunk_size, bytes_to_read - bytes_read))
            if not chunk:
                # Stream ended before the promised length arrived.
                break
            chunks.append(chunk)
            # Count what was actually received, not what was asked for.
            bytes_read += len(chunk)
            if bandwidth is not None:
                target_time += len(chunk) / bandwidth
                self.server._sleep_at_least(target_time - time.time())
        return b''.join(chunks)

    def finish(self, *args, **kwargs):
        """Finish the request, ignoring errors unless debugging."""
        try:
            return super(Handler, self).finish(*args, **kwargs)
        except Exception:
            if _debug:
                raise

    def handle(self, *args, **kwargs):
        """Handle the connection, ignoring errors unless debugging."""
        try:
            return super(Handler, self).handle(*args, **kwargs)
        except Exception:
            # Exception rather than a bare except, so KeyboardInterrupt
            # and SystemExit are never swallowed.
            if _debug:
                raise

    def handle_one_request(self, *args, **kwargs):
        """Apply the 'request' delay, then process one request."""
        self._sent_continue = False
        self.server._do_delay('request')
        return super(Handler, self).handle_one_request(*args, **kwargs)

    def handle_expect_100(self):
        """Apply the 'request_body' delay before the base class answers
        an Expect: 100-continue request."""
        self.server._do_delay('request_body')
        self._sent_continue = True
        return super(Handler, self).handle_expect_100()

    def _send_empty_response(self, status):
        """Send a complete, body-less response with the given status.

        end_headers() is required to terminate (and, where headers are
        buffered, actually transmit) the response; 'Connection: close'
        keeps the client from waiting on a keep-alive connection.
        """
        self.send_response(status)
        self.send_header('Connection', 'close')
        self.send_header('Content-length', '0')
        self.end_headers()

    def _find_requested_block(self):
        """Return the stored data for the block named in self.path.

        If the path contains no 32-hex-digit hash, respond 422; if the
        hash is not in the store, respond 404. In both cases return
        None. Stored data may be empty, so callers must compare the
        result against None explicitly.
        """
        found = re.search(r'[0-9a-f]{32}', self.path)
        if not found:
            self._send_empty_response(422)
            return None
        try:
            return self.server.store[found.group(0)]
        except KeyError:
            self._send_empty_response(404)
            return None

    def do_GET(self):
        """Send the requested block, or a 422/404 error response."""
        self.server._do_delay('response')
        data = self._find_requested_block()
        if data is None:
            return
        self.send_response(200)
        self.send_header('Connection', 'close')
        self.send_header('Content-type', 'application/octet-stream')
        # No Content-length header is sent here: the body is delimited
        # by the connection closing, so the 'response_close' delay
        # postpones the end of the response.
        self.end_headers()
        self.server._do_delay('response_body')
        self.wfile_bandwidth_write(data)
        self.server._do_delay('response_close')

    def do_HEAD(self):
        """Send the headers for the requested block, or a 422/404 error
        response."""
        self.server._do_delay('response')
        data = self._find_requested_block()
        if data is None:
            return
        self.send_response(200)
        self.send_header('Connection', 'close')
        self.send_header('Content-type', 'application/octet-stream')
        self.send_header('Content-length', str(len(data)))
        self.end_headers()
        self.server._do_delay('response_close')
        self.close_connection = True

    def do_PUT(self):
        """Store the request body and reply with '<md5 hex>+<size>'."""
        if not self._sent_continue and self.headers.get('expect') == '100-continue':
            # The comments at https://bugs.python.org/issue1491
            # implies that Python 2.7 BaseHTTPRequestHandler was
            # patched to support 100 Continue, but reading the actual
            # code that ships in Debian it clearly is not, so we need
            # to send the response on the socket directly.
            self.server._do_delay('request_body')
            self.wfile.write("{} {} {}\r\n\r\n".format(
                self.protocol_version, 100, "Continue").encode())
        data = self.rfile_bandwidth_read(
            int(self.headers.get('content-length')))
        datahash = hashlib.md5(data).hexdigest()
        self.server.store[datahash] = data
        resp = '{}+{}\n'.format(datahash, len(data)).encode()
        self.server._do_delay('response')
        self.send_response(200)
        self.send_header('Connection', 'close')
        self.send_header('Content-type', 'text/plain')
        self.send_header('Content-length', str(len(resp)))
        self.end_headers()
        self.server._do_delay('response_body')
        self.wfile_bandwidth_write(resp)
        self.server._do_delay('response_close')
        self.close_connection = True

    def log_request(self, *args, **kwargs):
        """Log the request only when debugging is enabled."""
        if _debug:
            super(Handler, self).log_request(*args, **kwargs)