- for s in xrange(2, len(slots)):
- for i in xrange(0, len(slots[s])):
- if slots[s][i] == '--':
- inp = "input%i" % (s-2)
- groupargs = group_parser.parse_args(slots[2][i+1:])
- if groupargs.batch_size:
- component["script_parameters"][inp] = {"value": {"batch":groupargs.args, "size":groupargs.batch_size}}
- slots[s] = slots[s][0:i] + [{"foreach": inp, "command": "$(%s)" % inp}]
+ if dry_run:
+ logger.info("$(input) is %s", pathprefix.rstrip('/'))
+ pdh = "$(input)"
+ else:
+ files = sorted(files, key=lambda x: x.fn)
+ if collection is None:
+ collection = arvados.collection.Collection(api_client=api, num_retries=num_retries)
+ prev = ""
+ for f in files:
+ localpath = os.path.join(pathprefix, f.fn)
+ if prev and localpath.startswith(prev+"/"):
+ # If this path is inside an already uploaded subdirectory,
+ # don't redundantly re-upload it.
+ # e.g. we uploaded /tmp/foo and the next file is /tmp/foo/bar
+ # skip it because it starts with "/tmp/foo/"
+ continue
+ prev = localpath
+ if os.path.isfile(localpath):
+ write_file(collection, pathprefix, f.fn, not packed)
+ elif os.path.isdir(localpath):
+ for root, dirs, iterfiles in os.walk(localpath):
+ root = root[len(pathprefix):]
+ for src in iterfiles:
+ write_file(collection, pathprefix, os.path.join(root, src), not packed)
+
+ pdh = None
+ if len(collection) > 0:
+ # non-empty collection
+ filters = [["portable_data_hash", "=", collection.portable_data_hash()]]
+ name_pdh = "%s (%s)" % (name, collection.portable_data_hash())
+ if name:
+ filters.append(["name", "=", name_pdh])
+ if project:
+ filters.append(["owner_uuid", "=", project])
+
+ # do the list / create in a loop with up to 2 tries as we are using `ensure_unique_name=False`
+ # and there is a potential race with other workflows that may have created the collection
+ # between when we list it and find it does not exist and when we attempt to create it.
+ tries = 2
+ while pdh is None and tries > 0:
+ exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)
+
+ if exists["items"]:
+ item = exists["items"][0]
+ pdh = item["portable_data_hash"]
+ logger.info("Using collection %s (%s)", pdh, item["uuid"])