- print("Upload local files: \"%s\"" % '" "'.join([c.fn for c in files]))
-
- if args.dry_run:
- print("cd %s" % pathprefix)
- pdh = "$(input)"
- else:
- files = sorted(files, key=lambda x: x.fn)
- collection = arvados.CollectionWriter(api, num_retries=args.retries)
- stream = None
- for f in files:
- sp = os.path.split(f.fn)
- if sp[0] != stream:
- stream = sp[0]
- collection.start_new_stream(stream)
- collection.write_file(f.fn, sp[1])
- item = api.collections().create(body={"owner_uuid": project, "manifest_text": collection.manifest_text()}).execute()
- pdh = item["portable_data_hash"]
- print "Uploaded to %s" % item["uuid"]
-
- for c in files:
- c.fn = "$(file %s/%s)" % (pdh, c.fn)
-
- os.chdir(orgdir)
-
- for i in xrange(1, len(slots)):
- slots[i] = [("%s%s" % (c.prefix, c.fn)) if isinstance(c, ArvFile) else c for c in slots[i]]
-
- component = {
- "script": "run-command",
- "script_version": args.script_version,
- "repository": args.repository,
- "script_parameters": {
- },
- "runtime_constraints": {
- "docker_image": args.docker_image
- }
- }
-
- task_foreach = []
- group_parser = argparse.ArgumentParser()
- group_parser.add_argument('-b', '--batch-size', type=int)
- group_parser.add_argument('args', nargs=argparse.REMAINDER)
-
- for s in xrange(2, len(slots)):
- for i in xrange(0, len(slots[s])):
- if slots[s][i] == '--':
- inp = "input%i" % (s-2)
- groupargs = group_parser.parse_args(slots[2][i+1:])
- if groupargs.batch_size:
- component["script_parameters"][inp] = {"value": {"batch":groupargs.args, "size":groupargs.batch_size}}
- slots[s] = slots[s][0:i] + [{"foreach": inp, "command": "$(%s)" % inp}]
+    if dry_run:
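+        # Dry run: nothing is uploaded; just report the directory that
+        # $(input) would expand to.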
+        logger.info("$(input) is %s", pathprefix.rstrip('/'))
+        pdh = "$(input)"
+    else:
+        files = sorted(files, key=lambda x: x.fn)
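+        # Sorting puts each directory path immediately before the paths
+        # inside it, which the subdirectory skip below depends on.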
+        if collection is None:
+            collection = arvados.collection.Collection(api_client=api, num_retries=num_retries)
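+        # The collection is built client-side; its portable data hash can be
+        # computed before any API record exists (used for the lookup below).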
+ prev = ""
+ for f in files:
+ localpath = os.path.join(pathprefix, f.fn)
+            if prev and localpath.startswith(prev+"/"):
+                # This path is inside a directory that was already uploaded:
+                # e.g. if we just uploaded /tmp/foo, then /tmp/foo/bar starts
+                # with "/tmp/foo/" and is already covered, so skip it.
+                continue
+            prev = localpath
+            if os.path.isfile(localpath):
+                write_file(collection, pathprefix, f.fn, not packed)
+            elif os.path.isdir(localpath):
+                for root, dirs, iterfiles in os.walk(localpath):
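+                    # Strip the common prefix so the path stored in the
+                    # collection is relative to pathprefix.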
+                    root = root[len(pathprefix):]
+                    for src in iterfiles:
+                        write_file(collection, pathprefix, os.path.join(root, src), not packed)
+
+        pdh = None
+        if len(collection) > 0:
+            # Non-empty collection: reuse an existing collection record with
+            # the same content if there is one, otherwise create it.
+ filters = [["portable_data_hash", "=", collection.portable_data_hash()]]
+ name_pdh = "%s (%s)" % (name, collection.portable_data_hash())
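+            # Embed the content hash in the collection name so the name is
+            # stable for identical content and distinct otherwise.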
+            if name:
+                filters.append(["name", "=", name_pdh])
+            if project:
+                filters.append(["owner_uuid", "=", project])
+
+            # List, then create if absent, with up to 2 tries: because we use
+            # `ensure_unique_name=False`, another workflow could create the
+            # same collection between our list call (finding nothing) and our
+            # create attempt, in which case we loop and list again.
+            tries = 2
+            while pdh is None and tries > 0:
+                exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)
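+                # num_retries covers transient API failures; the outer `tries`
+                # loop covers the create race described above.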
+
+ if exists["items"]:
+ item = exists["items"][0]
+ pdh = item["portable_data_hash"]
+ logger.info("Using collection %s (%s)", pdh, item["uuid"])