X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/714c555bda26a6a27fad7caef382d1d6705ad215..e9c78ef7855e7ae263fe461e069c89ff7fc0b798:/sdk/python/tests/keepstub.py

diff --git a/sdk/python/tests/keepstub.py b/sdk/python/tests/keepstub.py
index ef724ed5c5..f074f8d6cf 100644
--- a/sdk/python/tests/keepstub.py
+++ b/sdk/python/tests/keepstub.py
@@ -22,7 +22,12 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
             'response_body': 0,
             # before returning from handler (thus setting response EOF)
             'response_close': 0,
+            # after writing over 1s worth of data at self.bandwidth
+            'mid_write': 0,
+            # after reading over 1s worth of data at self.bandwidth
+            'mid_read': 0,
         }
+        self.bandwidth = None
         super(Server, self).__init__(*args, **kwargs)
 
     def setdelays(self, **kwargs):
@@ -31,6 +36,12 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
             self.delays.get(k) # NameError if unknown key
             self.delays[k] = v
 
+    def setbandwidth(self, bandwidth):
+        """For future requests, set the maximum bandwidth (number of bytes per
+        second) to operate at. If setbandwidth is never called, function at
+        maximum bandwidth possible"""
+        self.bandwidth = float(bandwidth)
+
     def _sleep_at_least(self, seconds):
         """Sleep for given time, even if signals are received."""
         wake = time.time() + seconds
@@ -44,6 +55,86 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
 
 
 class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
+    def wfile_bandwidth_write(self, data_to_write):
+        """Write data_to_write to the client, throttled for tests.
+
+        With no bandwidth limit and no 'mid_write' delay configured, the
+        data is written in a single call.  Otherwise it is written in
+        chunks (a quarter second's worth at the server bandwidth, or
+        32768 bytes when that truncates to zero or no limit is set),
+        sleeping after each chunk to hold the configured rate.  The
+        'mid_write' delay is injected once, after more than one second's
+        worth of data has been sent (after the first chunk if no limit).
+        """
+        bandwidth = self.server.bandwidth
+        if bandwidth is None and self.server.delays['mid_write'] == 0:
+            self.wfile.write(data_to_write)
+            return
+        # bandwidth can still be None here (delay set without a limit),
+        # so it must not be used in arithmetic or comparisons unguarded.
+        if bandwidth is None:
+            chunk_size = 32768
+            outage_after = 0
+        else:
+            chunk_size = int(bandwidth / 4.0) or 32768
+            outage_after = bandwidth
+        outage_happened = False
+        num_bytes = len(data_to_write)
+        num_sent_bytes = 0
+        target_time = time.time()
+        while num_sent_bytes < num_bytes:
+            if num_sent_bytes > outage_after and not outage_happened:
+                self.server._do_delay('mid_write')
+                # Push the pacing deadline back by the outage so the loop
+                # does not try to catch up by skipping later sleeps.
+                target_time += self.server.delays['mid_write']
+                outage_happened = True
+            num_write_bytes = min(chunk_size, num_bytes - num_sent_bytes)
+            self.wfile.write(data_to_write[
+                num_sent_bytes:num_sent_bytes+num_write_bytes])
+            num_sent_bytes += num_write_bytes
+            if bandwidth is not None:
+                target_time += num_write_bytes / bandwidth
+                self.server._sleep_at_least(target_time - time.time())
+
+    def rfile_bandwidth_read(self, bytes_to_read):
+        """Read up to bytes_to_read bytes from the client and return them.
+
+        Mirrors wfile_bandwidth_write: a single read when neither a
+        bandwidth limit nor a 'mid_read' delay is configured, otherwise
+        paced chunked reads with the 'mid_read' delay injected once.
+        Fewer bytes are returned if the client stops sending early.
+        """
+        bandwidth = self.server.bandwidth
+        if bandwidth is None and self.server.delays['mid_read'] == 0:
+            return self.rfile.read(bytes_to_read)
+        if bandwidth is None:
+            chunk_size = 32768
+            outage_after = 0
+        else:
+            chunk_size = int(bandwidth / 4.0) or 32768
+            outage_after = bandwidth
+        chunks = []
+        outage_happened = False
+        bytes_read = 0
+        target_time = time.time()
+        while bytes_to_read > bytes_read:
+            if bytes_read > outage_after and not outage_happened:
+                self.server._do_delay('mid_read')
+                target_time += self.server.delays['mid_read']
+                outage_happened = True
+            chunk = self.rfile.read(
+                min(chunk_size, bytes_to_read - bytes_read))
+            if not chunk:
+                # EOF: the client closed early; return what arrived.
+                break
+            chunks.append(chunk)
+            bytes_read += len(chunk)
+            if bandwidth is not None:
+                target_time += len(chunk) / bandwidth
+                self.server._sleep_at_least(target_time - time.time())
+        return ''.join(chunks)
+
     def handle(self, *args, **kwargs):
         self.server._do_delay('request')
         return super(Handler, self).handle(*args, **kwargs)
@@ -60,21 +151,18 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
         self.send_header('Content-type', 'application/octet-stream')
         self.end_headers()
         self.server._do_delay('response_body')
-        self.wfile.write(self.server.store[datahash])
+        self.wfile_bandwidth_write(self.server.store[datahash])
         self.server._do_delay('response_close')
 
     def do_PUT(self):
         self.server._do_delay('request_body')
-
         # The comments at https://bugs.python.org/issue1491 implies that Python
         # 2.7 BaseHTTPRequestHandler was patched to support 100 Continue, but
         # reading the actual code that ships in Debian it clearly is not, so we
         # need to send the response on the socket directly.
-
-        self.wfile.write("%s %d %s\r\n\r\n" %
+        self.wfile_bandwidth_write("%s %d %s\r\n\r\n" %
                          (self.protocol_version, 100, "Continue"))
-
-        data = self.rfile.read(int(self.headers.getheader('content-length')))
+        data = self.rfile_bandwidth_read(int(self.headers.getheader('content-length')))
         datahash = hashlib.md5(data).hexdigest()
         self.server.store[datahash] = data
         self.server._do_delay('response')
@@ -82,7 +170,7 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
         self.send_header('Content-type', 'text/plain')
         self.end_headers()
         self.server._do_delay('response_body')
-        self.wfile.write(datahash + '+' + str(len(data)))
+        self.wfile_bandwidth_write(datahash + '+' + str(len(data)))
         self.server._do_delay('response_close')
 
     def log_request(self, *args, **kwargs):