11308: pep8
[arvados.git] / sdk / python / tests / keepstub.py
1 from __future__ import division
2 from future import standard_library
3 standard_library.install_aliases()
4 from builtins import str
5 import http.server
6 import hashlib
7 import os
8 import re
9 import socketserver
10 import sys
11 import time
12
class Server(socketserver.ThreadingMixIn, http.server.HTTPServer, object):
    """Threaded HTTP server stub with injectable delays and bandwidth limit.

    Instance state consulted by the request handler:

    * ``store``: dict holding the data blocks that have been stored.
    * ``delays``: dict mapping checkpoint name to a sleep time in seconds.
    * ``bandwidth``: maximum bytes per second as a float, or None for
      unlimited.
    """

    allow_reuse_address = 1

    def __init__(self, *args, **kwargs):
        """Set up empty storage and zero delays, then start the HTTP server.

        All arguments are passed through to the base server constructor.
        State is initialised before calling the base constructor so it is
        already present when the server is bound and activated.
        """
        self.store = {}
        self.delays = {
            # before reading request headers
            'request': 0,
            # before reading request body
            'request_body': 0,
            # before setting response status and headers
            'response': 0,
            # before sending response body
            'response_body': 0,
            # before returning from handler (thus setting response EOF)
            'response_close': 0,
            # after writing over 1s worth of data at self.bandwidth
            'mid_write': 0,
            # after reading over 1s worth of data at self.bandwidth
            'mid_read': 0,
        }
        self.bandwidth = None
        super(Server, self).__init__(*args, **kwargs)

    def setdelays(self, **kwargs):
        """In future requests, induce delays at the given checkpoints.

        Each keyword is a checkpoint name (a key of ``self.delays``) and
        each value is the number of seconds to sleep at that checkpoint.

        Raises:
            KeyError: if any keyword is not a known checkpoint. Nothing is
                changed in that case, so a typo cannot leave the delays
                half-updated or silently add an unused entry.
        """
        unknown = sorted(k for k in kwargs if k not in self.delays)
        if unknown:
            raise KeyError(
                "unknown delay checkpoint(s): {}".format(", ".join(unknown)))
        self.delays.update(kwargs)

    def setbandwidth(self, bandwidth):
        """For future requests, set the maximum bandwidth (number of bytes per
        second) to operate at. If setbandwidth is never called, function at
        maximum bandwidth possible"""
        self.bandwidth = float(bandwidth)

    def _sleep_at_least(self, seconds):
        """Sleep for given time, even if signals are received.

        A non-positive ``seconds`` returns immediately.
        """
        wake = time.time() + seconds
        todo = seconds
        while todo > 0:
            time.sleep(todo)
            # Recompute the remainder in case sleep() returned early.
            todo = wake - time.time()

    def _do_delay(self, k):
        """Sleep for the delay configured for checkpoint ``k``.

        Raises KeyError if ``k`` is not a key of ``self.delays``.
        """
        self._sleep_at_least(self.delays[k])
60
61
class Handler(http.server.BaseHTTPRequestHandler, object):
    """Request handler for the stub server.

    Stores PUT bodies in ``self.server.store`` keyed by their MD5 hex
    digest and serves them back on GET/HEAD. At each checkpoint it calls
    ``self.server._do_delay(name)``, and body transfers are throttled to
    ``self.server.bandwidth`` bytes per second when that is set.
    """

    # Chunk size used when there is no bandwidth limit, or when the limit
    # is so small that a quarter of it rounds down to zero bytes.
    _DEFAULT_CHUNK_BYTES = 32768

    def _throttle_params(self):
        """Return ``(chunk_size, outage_threshold)`` for a throttled transfer.

        With a bandwidth limit, data moves in chunks of a quarter second's
        worth and the one-off mid-transfer outage is induced once more than
        one second's worth has been transferred. Without a limit, default
        sized chunks are used and the outage is induced after the first
        chunk.
        """
        bandwidth = self.server.bandwidth
        if bandwidth is None:
            return self._DEFAULT_CHUNK_BYTES, 0
        return int(bandwidth / 4) or self._DEFAULT_CHUNK_BYTES, bandwidth

    def wfile_bandwidth_write(self, data_to_write):
        """Write ``data_to_write`` to the client, honouring throttle settings.

        If neither a bandwidth limit nor a ``mid_write`` delay is
        configured, the data is written in a single call. Otherwise it is
        written in chunks, sleeping after each chunk to stay at the
        configured bandwidth, and inducing the ``mid_write`` delay once.

        Returns None.
        """
        bandwidth = self.server.bandwidth
        if bandwidth is None and self.server.delays['mid_write'] == 0:
            self.wfile.write(data_to_write)
            return None

        chunk_size, outage_threshold = self._throttle_params()
        outage_happened = False
        num_bytes = len(data_to_write)
        num_sent_bytes = 0
        target_time = time.time()
        while num_sent_bytes < num_bytes:
            if num_sent_bytes > outage_threshold and not outage_happened:
                self.server._do_delay('mid_write')
                # Push the schedule back so the outage is not "caught up"
                # by skipping the following bandwidth sleeps.
                target_time += self.server.delays['mid_write']
                outage_happened = True
            chunk = data_to_write[num_sent_bytes:num_sent_bytes + chunk_size]
            self.wfile.write(chunk)
            num_sent_bytes += len(chunk)
            if bandwidth is not None:
                target_time += len(chunk) / bandwidth
                self.server._sleep_at_least(target_time - time.time())
        return None

    def rfile_bandwidth_read(self, bytes_to_read):
        """Read ``bytes_to_read`` bytes from the client, honouring throttling.

        If neither a bandwidth limit nor a ``mid_read`` delay is
        configured, the data is read in a single call. Otherwise it is read
        in chunks, sleeping after each chunk to stay at the configured
        bandwidth, and inducing the ``mid_read`` delay once.

        Returns the bytes read, which may be shorter than requested if the
        underlying file returns short reads.
        """
        bandwidth = self.server.bandwidth
        if bandwidth is None and self.server.delays['mid_read'] == 0:
            return self.rfile.read(bytes_to_read)

        chunk_size, outage_threshold = self._throttle_params()
        chunks = []
        outage_happened = False
        bytes_read = 0
        target_time = time.time()
        while bytes_to_read > bytes_read:
            if bytes_read > outage_threshold and not outage_happened:
                self.server._do_delay('mid_read')
                target_time += self.server.delays['mid_read']
                outage_happened = True
            next_bytes_to_read = min(chunk_size, bytes_to_read - bytes_read)
            chunks.append(self.rfile.read(next_bytes_to_read))
            # Progress is counted in bytes requested, not bytes returned,
            # so the loop still terminates if the read comes back short.
            bytes_read += next_bytes_to_read
            if bandwidth is not None:
                target_time += next_bytes_to_read / bandwidth
                self.server._sleep_at_least(target_time - time.time())
        return b''.join(chunks)

    def handle(self, *args, **kwargs):
        """Induce the 'request' delay, then handle the connection as usual."""
        self.server._do_delay('request')
        return super(Handler, self).handle(*args, **kwargs)

    def _send_status_only(self, code):
        """Send a response consisting of a status line and headers only."""
        self.send_response(code)
        # Without end_headers() the header block is never terminated, and
        # on Python 3 the buffered status line is never written at all.
        self.end_headers()

    def _requested_datahash(self):
        """Return the stored hash named in ``self.path``, or None.

        When the path contains no 32-digit lowercase hex string a 422
        response is sent; when the hash is not in the store a 404 response
        is sent. In both cases None is returned and the caller should stop.
        """
        r = re.search(r'[0-9a-f]{32}', self.path)
        if not r:
            self._send_status_only(422)
            return None
        datahash = r.group(0)
        if datahash not in self.server.store:
            self._send_status_only(404)
            return None
        return datahash

    def do_GET(self):
        """Serve the stored block whose hash appears in the request path."""
        self.server._do_delay('response')
        datahash = self._requested_datahash()
        if datahash is None:
            return None
        self.send_response(200)
        self.send_header('Content-type', 'application/octet-stream')
        self.end_headers()
        self.server._do_delay('response_body')
        self.wfile_bandwidth_write(self.server.store[datahash])
        self.server._do_delay('response_close')

    def do_HEAD(self):
        """Send the headers (including length) for the requested block."""
        self.server._do_delay('response')
        datahash = self._requested_datahash()
        if datahash is None:
            return None
        self.send_response(200)
        self.send_header('Content-type', 'application/octet-stream')
        self.send_header('Content-length', str(len(self.server.store[datahash])))
        self.end_headers()
        self.server._do_delay('response_close')

    def handle_expect_100(self):
        """Induce the 'request_body' delay, then send "100 Continue".

        The base class result is returned because a false return value
        makes the base request parser abandon the request.
        """
        self.server._do_delay('request_body')
        return super(Handler, self).handle_expect_100()

    def do_PUT(self):
        """Store the request body and reply with ``<md5 hex>+<size>``."""
        if sys.version_info < (3, 0):
            # The comments at https://bugs.python.org/issue1491
            # imply that Python 2.7 BaseHTTPRequestHandler was
            # patched to support 100 Continue, but reading the actual
            # code that ships in Debian it clearly is not, so we need
            # to send the response on the socket directly.
            self.server._do_delay('request_body')
            self.wfile.write("{} {} {}\r\n\r\n".format(
                self.protocol_version, 100, "Continue"))
        data = self.rfile_bandwidth_read(
            int(self.headers.get('content-length')))
        datahash = hashlib.md5(data).hexdigest()
        self.server.store[datahash] = data
        self.server._do_delay('response')
        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.end_headers()
        self.server._do_delay('response_body')
        # wfile is a binary stream, so the reply must be encoded to bytes.
        locator = '{}+{}'.format(datahash, len(data)).encode('ascii')
        self.wfile_bandwidth_write(locator)
        self.server._do_delay('response_close')

    def log_request(self, *args, **kwargs):
        """Log requests only when the ARVADOS_DEBUG env variable is set."""
        if os.environ.get('ARVADOS_DEBUG', None):
            super(Handler, self).log_request(*args, **kwargs)

    def finish(self, *args, **kwargs):
        """Ignore exceptions, notably "Broken pipe" when client times out."""
        try:
            return super(Handler, self).finish(*args, **kwargs)
        except Exception:
            # Deliberately best-effort; KeyboardInterrupt and SystemExit
            # are still allowed to propagate.
            pass

    def handle_one_request(self, *args, **kwargs):
        """Ignore exceptions, notably "Broken pipe" when client times out."""
        try:
            return super(Handler, self).handle_one_request(*args, **kwargs)
        except Exception:
            # Deliberately best-effort; KeyboardInterrupt and SystemExit
            # are still allowed to propagate.
            pass