- sp = os.path.split(f.fn)
- if sp[0] != stream:
- stream = sp[0]
- collection.start_new_stream(stream)
- collection.write_file(f.fn, sp[1])
-
- exists = api.collections().list(filters=[["owner_uuid", "=", project],
- ["portable_data_hash", "=", collection.portable_data_hash()],
- ["name", "=", name]]).execute(num_retries=num_retries)
- if exists["items"]:
- item = exists["items"][0]
- logger.info("Using collection %s", item["uuid"])
+ localpath = os.path.join(pathprefix, f.fn)
+ if prev and localpath.startswith(prev+"/"):
+ # If this path is inside an already uploaded subdirectory,
+ # don't redundantly re-upload it.
+ # e.g. we uploaded /tmp/foo and the next file is /tmp/foo/bar
+ # skip it because it starts with "/tmp/foo/"
+ continue
+ prev = localpath
+ if os.path.isfile(localpath):
+ write_file(collection, pathprefix, f.fn, not packed)
+ elif os.path.isdir(localpath):
+ for root, dirs, iterfiles in os.walk(localpath):
+ root = root[len(pathprefix):]
+ for src in iterfiles:
+ write_file(collection, pathprefix, os.path.join(root, src), not packed)
+
+ pdh = None
+ if len(collection) > 0:
+ # non-empty collection
+ filters = [["portable_data_hash", "=", collection.portable_data_hash()]]
+ name_pdh = "%s (%s)" % (name, collection.portable_data_hash())
+ if name:
+ filters.append(["name", "=", name_pdh])
+ if project:
+ filters.append(["owner_uuid", "=", project])
+
+ # do the list / create in a loop with up to 2 tries as we are using `ensure_unique_name=False`
+ # and there is a potential race with other workflows that may have created the collection
+ # between when we list it and find it does not exist and when we attempt to create it.
+ tries = 2
+ while pdh is None and tries > 0:
+ exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)
+
+ if exists["items"]:
+ item = exists["items"][0]
+ pdh = item["portable_data_hash"]
+ logger.info("Using collection %s (%s)", pdh, item["uuid"])
+ else:
+ try:
+ collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=False)
+ pdh = collection.portable_data_hash()
+ logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
+ except arvados.errors.ApiError as ae:
+ tries -= 1
+ if pdh is None:
+ # Something weird going on here, probably a collection
+ # with a conflicting name but wrong PDH. We won't
+ # able to reuse it but we still need to save our
+ # collection, so so save it with unique name.
+ logger.info("Name conflict on '%s', existing collection has an unexpected portable data hash", name_pdh)
+ collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=True)
+ pdh = collection.portable_data_hash()
+ logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())