gobins = [os.path.join(path, 'bin') for path in gopaths]
os.environ['PATH'] = ':'.join(gobins) + ':' + os.environ['PATH']
+if os.path.isdir('tests'):
+ TEST_TMPDIR = 'tests/tmp'
+else:
+ TEST_TMPDIR = 'tmp'
+
def find_server_pid(PID_PATH, wait=10):
now = time.time()
timeout = now + wait
keep_cmd = ["keepstore",
"-volumes={}".format(keep0),
"-listen=:{}".format(25107+n),
- "-pid={}".format("tests/tmp/keep{}.pid".format(n))]
+ "-pid={}".format("{}/keep{}.pid".format(TEST_TMPDIR, n))]
for arg, val in keep_args.iteritems():
keep_cmd.append("{}={}".format(arg, val))
kp0 = subprocess.Popen(keep_cmd)
- with open("tests/tmp/keep{}.pid".format(n), 'w') as f:
+ with open("{}/keep{}.pid".format(TEST_TMPDIR, n), 'w') as f:
f.write(str(kp0.pid))
- with open("tests/tmp/keep{}.volume".format(n), 'w') as f:
+ with open("{}/keep{}.volume".format(TEST_TMPDIR, n), 'w') as f:
f.write(keep0)
def run_keep(blob_signing_key=None, enforce_permissions=False):
stop_keep()
- if not os.path.exists("tests/tmp"):
- os.mkdir("tests/tmp")
+ if not os.path.exists(TEST_TMPDIR):
+ os.mkdir(TEST_TMPDIR)
keep_args = {}
if blob_signing_key:
- with open("tests/tmp/keep.blob_signing_key", "w") as f:
+ with open(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"), "w") as f:
+ keep_args['--permission-key-file'] = f.name
f.write(blob_signing_key)
- keep_args['--permission-key-file'] = 'tests/tmp/keep.blob_signing_key'
if enforce_permissions:
keep_args['--enforce-permissions'] = 'true'
api.keep_disks().create(body={"keep_disk": {"keep_service_uuid": s2["uuid"] } }).execute()
def _stop_keep(n):
- kill_server_pid("tests/tmp/keep{}.pid".format(n), 0)
- if os.path.exists("tests/tmp/keep{}.volume".format(n)):
- with open("tests/tmp/keep{}.volume".format(n), 'r') as r:
+ kill_server_pid("{}/keep{}.pid".format(TEST_TMPDIR, n), 0)
+ if os.path.exists("{}/keep{}.volume".format(TEST_TMPDIR, n)):
+ with open("{}/keep{}.volume".format(TEST_TMPDIR, n), 'r') as r:
shutil.rmtree(r.read(), True)
- os.unlink("tests/tmp/keep{}.volume".format(n))
- if os.path.exists("tests/tmp/keep.blob_signing_key"):
- os.remove("tests/tmp/keep.blob_signing_key")
+ os.unlink("{}/keep{}.volume".format(TEST_TMPDIR, n))
+ if os.path.exists(os.path.join(TEST_TMPDIR, "keep.blob_signing_key")):
+ os.remove(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"))
def stop_keep():
_stop_keep(0)
def run_keep_proxy(auth):
stop_keep_proxy()
- if not os.path.exists("tests/tmp"):
- os.mkdir("tests/tmp")
+ if not os.path.exists(TEST_TMPDIR):
+ os.mkdir(TEST_TMPDIR)
os.environ["ARVADOS_API_HOST"] = "127.0.0.1:3001"
os.environ["ARVADOS_API_HOST_INSECURE"] = "true"
os.environ["ARVADOS_API_TOKEN"] = fixture("api_client_authorizations")[auth]["api_token"]
kp0 = subprocess.Popen(["keepproxy",
- "-pid=tests/tmp/keepproxy.pid",
+ "-pid={}/keepproxy.pid".format(TEST_TMPDIR),
"-listen=:{}".format(25101)])
authorize_with("admin")
os.environ["ARVADOS_KEEP_PROXY"] = "http://localhost:25101"
def stop_keep_proxy():
- kill_server_pid("tests/tmp/keepproxy.pid", 0)
+ kill_server_pid(os.path.join(TEST_TMPDIR, "keepproxy.pid"), 0)
def fixture(fix):
'''load a fixture yaml file'''