Merge branch 'master' into 7490-datamanager-dont-die-return-error
[arvados.git] / sdk / python / tests / keepstub.py
1 import BaseHTTPServer
2 import hashlib
3 import os
4 import re
5 import SocketServer
6 import time
7
class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
    """Threaded HTTP stub server with injectable delays and bandwidth limit.

    Attributes:
        store: dict of stored data; the request handler keys it by the
            MD5 hex digest of the data.
        delays: dict mapping checkpoint name to the number of seconds to
            pause when a request reaches that checkpoint.
        bandwidth: maximum bytes per second as a float, or None for
            unthrottled operation.
    """

    # Passed through to the socket server so the listening address can be
    # reused.
    allow_reuse_address = 1

    def __init__(self, *args, **kwargs):
        """Initialize stub state, then the underlying HTTP server.

        All arguments are passed unchanged to the HTTPServer constructor.
        """
        self.store = {}
        self.delays = {
            # before reading request headers
            'request': 0,
            # before reading request body
            'request_body': 0,
            # before setting response status and headers
            'response': 0,
            # before sending response body
            'response_body': 0,
            # before returning from handler (thus setting response EOF)
            'response_close': 0,
            # after writing over 1s worth of data at self.bandwidth
            'mid_write': 0,
            # after reading over 1s worth of data at self.bandwidth
            'mid_read': 0,
        }
        self.bandwidth = None
        super(Server, self).__init__(*args, **kwargs)

    def setdelays(self, **kwargs):
        """In future requests, induce delays at the given checkpoints.

        Each keyword names a checkpoint (a key of self.delays) and gives
        the delay in seconds.

        Raises:
            KeyError: if any keyword is not a known checkpoint. In that
                case no delay is changed.
        """
        # Validate everything before touching self.delays so that a typo
        # cannot leave the configuration half-applied.
        unknown = sorted(k for k in kwargs if k not in self.delays)
        if unknown:
            raise KeyError(
                "unknown delay checkpoint(s): %s" % ', '.join(unknown))
        self.delays.update(kwargs)

    def setbandwidth(self, bandwidth):
        """For future requests, set the maximum bandwidth (number of bytes per
        second) to operate at. If setbandwidth is never called, function at
        maximum bandwidth possible"""
        self.bandwidth = float(bandwidth)

    def _sleep_at_least(self, seconds):
        """Sleep for given time, even if signals are received.

        A zero or negative duration returns immediately.
        """
        wake = time.time() + seconds
        todo = seconds
        while todo > 0:
            time.sleep(todo)
            # If the sleep returned early, go around again for the rest.
            todo = wake - time.time()

    def _do_delay(self, k):
        """Pause for the delay configured for checkpoint k.

        Raises KeyError if k is not a key of self.delays.
        """
        self._sleep_at_least(self.delays[k])
55
56
class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
    """Request handler for the stub server: GET and PUT of stored data.

    Reads its configuration from self.server, which must provide
    `store`, `delays`, `bandwidth`, `_do_delay()` and `_sleep_at_least()`.
    """

    # Chunk size used when no bandwidth is set, or when a quarter second's
    # worth of data at the configured bandwidth rounds down to zero bytes.
    _DEFAULT_CHUNK_BYTES = 32768

    def _pacing_params(self):
        """Return (chunk_size, outage_threshold) for throttled transfers.

        chunk_size is the number of bytes moved per read/write call.
        outage_threshold is the byte count that must be exceeded before
        the one-off 'mid_read'/'mid_write' delay is induced: one second's
        worth of data at the configured bandwidth, or 0 (i.e. right after
        the first chunk) when no bandwidth is configured.
        """
        bandwidth = self.server.bandwidth
        if bandwidth is None:
            return self._DEFAULT_CHUNK_BYTES, 0
        return (int(bandwidth / 4.0) or self._DEFAULT_CHUNK_BYTES), bandwidth

    def wfile_bandwidth_write(self, data_to_write):
        """Write data_to_write to the client, honoring bandwidth and delay.

        With no bandwidth limit and no 'mid_write' delay the data is
        written in one call. Otherwise it is written in chunks, pausing
        after each chunk to stay at the configured bandwidth, and the
        'mid_write' delay is induced once after the outage threshold has
        been passed.

        Always returns None.
        """
        bandwidth = self.server.bandwidth
        if bandwidth is None and self.server.delays['mid_write'] == 0:
            self.wfile.write(data_to_write)
            return None
        chunk_size, outage_after = self._pacing_params()
        outage_happened = False
        num_bytes = len(data_to_write)
        num_sent_bytes = 0
        target_time = time.time()
        while num_sent_bytes < num_bytes:
            if num_sent_bytes > outage_after and not outage_happened:
                self.server._do_delay('mid_write')
                # Shift the pacing schedule by the outage so the loop does
                # not try to "catch up" on the time spent delayed.
                target_time += self.server.delays['mid_write']
                outage_happened = True
            num_write_bytes = min(chunk_size, num_bytes - num_sent_bytes)
            self.wfile.write(
                data_to_write[num_sent_bytes:num_sent_bytes + num_write_bytes])
            num_sent_bytes += num_write_bytes
            if bandwidth is not None:
                target_time += num_write_bytes / bandwidth
                self.server._sleep_at_least(target_time - time.time())
        return None

    def rfile_bandwidth_read(self, bytes_to_read):
        """Read up to bytes_to_read bytes from the client and return them.

        With no bandwidth limit and no 'mid_read' delay this is a single
        read. Otherwise data is read in chunks, pausing after each chunk
        to stay at the configured bandwidth, and the 'mid_read' delay is
        induced once after the outage threshold has been passed.

        Returns fewer bytes than requested if the stream ends early.
        """
        bandwidth = self.server.bandwidth
        if bandwidth is None and self.server.delays['mid_read'] == 0:
            return self.rfile.read(bytes_to_read)
        chunk_size, outage_after = self._pacing_params()
        chunks = []
        outage_happened = False
        bytes_read = 0
        target_time = time.time()
        while bytes_to_read > bytes_read:
            if bytes_read > outage_after and not outage_happened:
                self.server._do_delay('mid_read')
                target_time += self.server.delays['mid_read']
                outage_happened = True
            chunk = self.rfile.read(
                min(chunk_size, bytes_to_read - bytes_read))
            if not chunk:
                # End of stream: nothing more will arrive.
                break
            chunks.append(chunk)
            # Count what was actually received, not what was asked for.
            bytes_read += len(chunk)
            if bandwidth is not None:
                target_time += len(chunk) / bandwidth
                self.server._sleep_at_least(target_time - time.time())
        return b''.join(chunks)

    def handle(self, *args, **kwargs):
        """Induce the 'request' delay, then handle the connection."""
        self.server._do_delay('request')
        return super(Handler, self).handle(*args, **kwargs)

    def do_GET(self):
        """Serve stored data whose 32-hex-digit hash appears in the path.

        Responds 422 if the path contains no such hash, 404 if the hash
        is not in the server's store, and otherwise 200 with the data as
        application/octet-stream.
        """
        self.server._do_delay('response')
        r = re.search(r'[0-9a-f]{32}', self.path)
        if not r:
            self.send_response(422)
            # Terminate the header block so the response is well-formed.
            self.end_headers()
            return
        datahash = r.group(0)
        if datahash not in self.server.store:
            self.send_response(404)
            self.end_headers()
            return
        self.send_response(200)
        self.send_header('Content-type', 'application/octet-stream')
        self.end_headers()
        self.server._do_delay('response_body')
        self.wfile_bandwidth_write(self.server.store[datahash])
        self.server._do_delay('response_close')

    def do_PUT(self):
        """Store the request body and reply with "<md5 hex digest>+<size>".

        Responds 411 if the Content-Length header is missing and 400 if
        it is not a non-negative integer; in both cases nothing is read
        or stored.
        """
        self.server._do_delay('request_body')
        raw_length = self.headers.get('content-length')
        try:
            content_length = int(raw_length)
        except (TypeError, ValueError):
            content_length = -1
        if content_length < 0:
            self.send_response(411 if raw_length is None else 400)
            self.end_headers()
            return
        # The comments at https://bugs.python.org/issue1491 imply that Python
        # 2.7 BaseHTTPRequestHandler was patched to support 100 Continue, but
        # reading the actual code that ships in Debian it clearly is not, so we
        # need to send the response on the socket directly.
        self.wfile_bandwidth_write(
            ("%s %d %s\r\n\r\n" %
             (self.protocol_version, 100, "Continue")).encode('ascii'))
        data = self.rfile_bandwidth_read(content_length)
        datahash = hashlib.md5(data).hexdigest()
        self.server.store[datahash] = data
        self.server._do_delay('response')
        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.end_headers()
        self.server._do_delay('response_body')
        self.wfile_bandwidth_write(
            (datahash + '+' + str(len(data))).encode('ascii'))
        self.server._do_delay('response_close')

    def log_request(self, *args, **kwargs):
        """Log the request only when ARVADOS_DEBUG is set in the environment."""
        if os.environ.get('ARVADOS_DEBUG', None):
            super(Handler, self).log_request(*args, **kwargs)

    def finish(self, *args, **kwargs):
        """Ignore exceptions, notably "Broken pipe" when client times out."""
        try:
            return super(Handler, self).finish(*args, **kwargs)
        except Exception:
            # Deliberately best-effort. KeyboardInterrupt and SystemExit
            # are not subclasses of Exception and still propagate.
            pass

    def handle_one_request(self, *args, **kwargs):
        """Ignore exceptions, notably "Broken pipe" when client times out."""
        try:
            return super(Handler, self).handle_one_request(*args, **kwargs)
        except Exception:
            # Deliberately best-effort. KeyboardInterrupt and SystemExit
            # are not subclasses of Exception and still propagate.
            pass