import time
import subprocess
import logging
+import sys
import arvados.commands._util as arv_cmd
+from arvados._version import __version__
+
logger = logging.getLogger('arvados.arv-run')
logger.setLevel(logging.INFO)
arvrun_parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt])
-arvrun_parser.add_argument('--dry-run', action="store_true", help="Print out the pipeline that would be submitted and exit")
-arvrun_parser.add_argument('--local', action="store_true", help="Run locally using arv-run-pipeline-instance")
-arvrun_parser.add_argument('--docker-image', type=str, help="Docker image to use, otherwise use instance default.")
-arvrun_parser.add_argument('--ignore-rcode', action="store_true", help="Commands that return non-zero return codes should not be considered failed.")
-arvrun_parser.add_argument('--no-reuse', action="store_true", help="Do not reuse past jobs.")
-arvrun_parser.add_argument('--no-wait', action="store_true", help="Do not wait and display logs after submitting command, just exit.")
-arvrun_parser.add_argument('--project-uuid', type=str, help="Parent project of the pipeline")
-arvrun_parser.add_argument('--git-dir', type=str, default="", help="Git repository passed to arv-crunch-job when using --local")
-arvrun_parser.add_argument('--repository', type=str, default="arvados", help="repository field of component, default 'arvados'")
-arvrun_parser.add_argument('--script-version', type=str, default="master", help="script_version field of component, default 'master'")
+arvrun_parser.add_argument('--dry-run', action="store_true",
+ help="Print out the pipeline that would be submitted and exit")
+arvrun_parser.add_argument('--local', action="store_true",
+ help="Run locally using arv-run-pipeline-instance")
+arvrun_parser.add_argument('--docker-image', type=str,
+ help="Docker image to use, otherwise use instance default.")
+arvrun_parser.add_argument('--ignore-rcode', action="store_true",
+ help="Commands that return non-zero return codes should not be considered failed.")
+arvrun_parser.add_argument('--no-reuse', action="store_true",
+ help="Do not reuse past jobs.")
+arvrun_parser.add_argument('--no-wait', action="store_true",
+ help="Do not wait and display logs after submitting command, just exit.")
+arvrun_parser.add_argument('--project-uuid', type=str,
+ help="Parent project of the pipeline")
+arvrun_parser.add_argument('--git-dir', type=str, default="",
+ help="Git repository passed to arv-crunch-job when using --local")
+arvrun_parser.add_argument('--repository', type=str, default="arvados",
+ help="repository field of component, default 'arvados'")
+arvrun_parser.add_argument('--script-version', type=str, default="master",
+ help="script_version field of component, default 'master'")
+arvrun_parser.add_argument('--version', action='version',
+ version="%s %s" % (sys.argv[0], __version__),
+ help='Print version and exit.')
arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)
class ArvFile(object):
self.prefix = prefix
self.fn = fn
+ def __hash__(self):
+ return (self.prefix+self.fn).__hash__()
+
+ def __eq__(self, other):
+ return (self.prefix == other.prefix) and (self.fn == other.fn)
+
class UploadFile(ArvFile):
pass
def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None):
# Find the smallest path prefix that includes all the files that need to be uploaded.
# This starts at the root and iteratively removes common parent directory prefixes
- # until all file pathes no longer have a common parent.
+ # until all file paths no longer have a common parent.
n = True
pathprefix = "/"
while n:
stream = sp[0]
collection.start_new_stream(stream)
collection.write_file(f.fn, sp[1])
- body = {"owner_uuid": project, "manifest_text": collection.manifest_text()}
- if name is not None:
- body["name"] = name
- item = api.collections().create(body=body, ensure_unique_name=True).execute()
+
+ exists = api.collections().list(filters=[["owner_uuid", "=", project],
+ ["portable_data_hash", "=", collection.portable_data_hash()],
+ ["name", "=", name]]).execute(num_retries=num_retries)
+ if exists["items"]:
+ item = exists["items"][0]
+ logger.info("Using collection %s", item["uuid"])
+ else:
+ body = {"owner_uuid": project, "manifest_text": collection.manifest_text()}
+ if name is not None:
+ body["name"] = name
+ item = api.collections().create(body=body, ensure_unique_name=True).execute()
+ logger.info("Uploaded to %s", item["uuid"])
+
pdh = item["portable_data_hash"]
- logger.info("Uploaded to %s", item["uuid"])
for c in files:
+ c.keepref = "%s/%s" % (pdh, c.fn)
c.fn = fnPattern % (pdh, c.fn)
os.chdir(orgdir)