+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
from __future__ import division
from future import standard_library
standard_library.install_aliases()
import hashlib
import os
import re
+import socket
import socketserver
import sys
+import threading
import time
+from . import arvados_testutil as tutil
+
# Raw value of the ARVADOS_DEBUG environment variable, or None when it is
# not set.  NOTE(review): presumably used as a truthy switch for extra debug
# output further down in this module -- confirm against the users of _debug.
_debug = os.environ.get('ARVADOS_DEBUG', None)
class StubKeepServers(tutil.ApiClientMock):
    """Test fixture that runs a stub Keep server around each test.

    ``setUp`` starts a ``Server`` (serving requests with ``Handler``) on an
    ephemeral port in a background thread and publishes these attributes:

    * ``self.server`` -- the running ``Server`` instance.
    * ``self.port`` -- the TCP port the server is listening on.
    * ``self.thread`` -- the daemon thread running ``serve_forever``.
    * ``self.api_client`` -- the value returned by ``mock_keep_services``
      for a single service at ``localhost:<port>``.

    ``tearDown`` stops the server, waits for the serving thread and closes
    the listening socket.
    """

    def setUp(self):
        """Start the stub server and build the matching API client mock."""
        super(StubKeepServers, self).setUp()
        # Let the server bind to port 0 itself and read back the port the
        # kernel assigned.  Probing for a free port with a throwaway socket
        # and re-binding afterwards leaves a window in which another process
        # can grab the port (and leaks the probe socket if bind() raises).
        # NOTE(review): assumes Server.__init__ forwards to
        # HTTPServer.__init__ and binds immediately (the default
        # bind_and_activate=True) -- confirm against Server's definition.
        self.server = Server(('0.0.0.0', 0), Handler)
        self.port = self.server.server_address[1]
        self.thread = threading.Thread(target=self.server.serve_forever)
        self.thread.daemon = True  # Exit thread if main proc exits
        self.thread.start()
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='localhost',
            service_port=self.port,
        )

    def tearDown(self):
        """Stop the stub server and release its resources."""
        try:
            # shutdown() only stops the serve_forever() loop; it blocks
            # until that loop has exited, so the join below returns promptly.
            self.server.shutdown()
            self.thread.join()
            # Close the listening socket explicitly; shutdown() alone
            # leaves it open, leaking one descriptor per test.
            self.server.server_close()
        finally:
            # Always let the base class clean up, even if stopping the
            # server failed.
            super(StubKeepServers, self).tearDown()
+
+
class Server(socketserver.ThreadingMixIn, http.server.HTTPServer, object):
allow_reuse_address = 1