num_retries=self.num_retries)
for k,v in generatemapper.items():
- if k.startswith("_:"):
- if v.type == "Directory":
+ if v.type == "Directory" and v.resolved.startswith("_:"):
continue
- if v.type == "CreateFile":
- with final.open(v.target, "wb") as f:
- f.write(v.resolved.encode("utf-8"))
+ if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
+ with final.open(v.target, "wb") as f:
+ f.write(v.resolved.encode("utf-8"))
continue
- if not k.startswith("keep:"):
+ if not v.resolved.startswith("keep:"):
raise Exception("Output source is not in keep or a literal")
- sp = k.split("/")
+ sp = v.resolved.split("/")
srccollection = sp[0][5:]
try:
reader = self.collection_cache.get(srccollection)
logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
raise
except IOError as e:
- logger.warning("While preparing output collection: %s", e)
+ logger.error("While preparing output collection: %s", e)
+ raise
def rewrite(fileobj):
fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
class StagingPathMapper(PathMapper):
+ # Note that StagingPathMapper internally maps files from target to source.
+ # Specifically, the 'self._pathmap' dict keys are the target location and the
+ # values are 'MapperEnt' named tuples from which we use the 'resolved' attribute
+ # as the file identifier. This makes it possible to map an input file to multiple
+ # target directories. The exception is for file literals, which store the contents of
+ # the file in 'MapperEnt.resolved' and are therefore still mapped from source to target.
+
_follow_dirs = True
def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
loc = obj["location"]
tgt = os.path.join(stagedir, obj["basename"])
basetgt, baseext = os.path.splitext(tgt)
+
+ def targetExists():
+ return tgt in self.targets and ("contents" not in obj) and (self._pathmap[tgt].resolved != loc)
+ def literalTargetExists():
+ return tgt in self.targets and "contents" in obj
+
n = 1
- if tgt in self.targets and (self.reversemap(tgt)[0] != loc):
+ if targetExists() or literalTargetExists():
while tgt in self.targets:
n += 1
tgt = "%s_%i%s" % (basetgt, n, baseext)
self.targets.add(tgt)
if obj["class"] == "Directory":
if obj.get("writable"):
- self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
+ self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableDirectory", staged)
else:
- self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
+ self._pathmap[tgt] = MapperEnt(loc, tgt, "Directory", staged)
if loc.startswith("_:") or self._follow_dirs:
self.visitlisting(obj.get("listing", []), tgt, basedir)
elif obj["class"] == "File":
- if loc in self._pathmap:
+ if tgt in self._pathmap:
return
if "contents" in obj and loc.startswith("_:"):
self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
else:
if copy or obj.get("writable"):
- self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
+ self._pathmap[tgt] = MapperEnt(loc, tgt, "WritableFile", staged)
else:
- self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
+ self._pathmap[tgt] = MapperEnt(loc, tgt, "File", staged)
self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
+    def mapper(self, src):  # type: (Text) -> MapperEnt
+        # Overridden to maintain the use case of mapping by source (identifier) to
+        # target regardless of how the map is structured internally.
+        #
+        # 'src' is a source identifier, optionally followed by a "#fragment".
+        # Returns the matching MapperEnt; when a fragment is present, a copy of
+        # the entry is returned with the fragment appended to its target.
+        # Returns None when 'src' has no fragment and nothing matches.
+        def getMapperEnt(ident):
+            # Regular entries are matched on their 'resolved' attribute.
+            # File literals ("CreateFile") keep their contents in 'resolved',
+            # so they are matched on the dict key instead.
+            for k, v in viewitems(self._pathmap):
+                if (v.type != "CreateFile" and v.resolved == ident) or (v.type == "CreateFile" and k == ident):
+                    return v
+            return None
+
+        if u"#" in src:
+            i = src.index(u"#")
+            # Look up the identifier *before* the fragment; the fragment itself
+            # (src[i:]) is only re-attached to the target below.
+            # NOTE(review): an unknown identifier still fails on the attribute
+            # access in the next statement, as it did before this fix.
+            v = getMapperEnt(src[:i])
+            return MapperEnt(v.resolved, v.target + src[i:], v.type, v.staged)
+        return getMapperEnt(src)
+
class VwdPathMapper(StagingPathMapper):
def setup(self, referenced_files, basedir):
self.api.links().create.assert_has_calls([mock.call(body={"head_uuid": final_uuid, "link_class": "tag", "name": "tag0"}), mock.call().execute(num_retries=num_retries)])
self.api.links().create.assert_has_calls([mock.call(body={"head_uuid": final_uuid, "link_class": "tag", "name": "tag1"}), mock.call().execute(num_retries=num_retries)])
self.api.links().create.assert_has_calls([mock.call(body={"head_uuid": final_uuid, "link_class": "tag", "name": "tag2"}), mock.call().execute(num_retries=num_retries)])
+
+    @mock.patch("arvados.collection.Collection")
+    @mock.patch("arvados.collection.CollectionReader")
+    def test_make_output_for_multiple_file_targets(self, reader, col):
+        # A single Keep file that is listed in two different output directories
+        # must be copied into the output collection once per directory.
+        runner = arvados_cwl.executor.ArvCwlExecutor(self.api, keep_client=mock.MagicMock())
+        runner.project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+
+        # The patched Collection / CollectionReader constructors hand back mocks.
+        final = col.return_value = mock.MagicMock()
+        reader.return_value = mock.MagicMock()
+
+        def shared_file():
+            # Build a fresh dict on every call so the two listings do not
+            # share one object.
+            return {
+                'basename': 'test.txt',
+                'nameroot': 'test',
+                'nameext': '.txt',
+                'location': 'keep:99999999999999999999999999999991+99/test.txt',
+                'class': 'File',
+                'size': 16,
+            }
+
+        def directory(basename, location):
+            return {
+                'basename': basename,
+                'listing': [shared_file()],
+                'location': location,
+                'class': 'Directory',
+            }
+
+        # This output describes a single file listed in 2 different directories
+        outputs = {'out': [
+            directory('testdir1', '_:99999999999999999999999999999992+99'),
+            directory('testdir2', '_:99999999999999999999999999999993+99'),
+        ]}
+        _, runner.final_output_collection = runner.make_output_collection(
+            "Test output", ["foo"], "", outputs)
+
+        # Check that copy is called on the collection for both locations
+        for target in ("testdir1/test.txt", "testdir2/test.txt"):
+            final.copy.assert_any_call("test.txt", target, source_collection=mock.ANY, overwrite=mock.ANY)
+
+    @mock.patch("arvados.collection.Collection")
+    @mock.patch("arvados.collection.CollectionReader")
+    def test_make_output_for_literal_name_conflicts(self, reader, col):
+        # Two file literals that share a basename must both be written, with
+        # the second one receiving a de-duplicated name.
+        runner = arvados_cwl.executor.ArvCwlExecutor(self.api, keep_client=mock.MagicMock())
+        runner.project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+
+        # The patched Collection / CollectionReader constructors hand back mocks.
+        final = col.return_value = mock.MagicMock()
+        reader.return_value = mock.MagicMock()
+
+        def literal(location):
+            # Only the literal's identifier differs between the two outputs.
+            return {
+                'lit': {
+                    'basename': 'a_file',
+                    'nameext': '',
+                    'nameroot': 'a_file',
+                    'location': location,
+                    'class': 'File',
+                    'contents': 'Hello file literal.',
+                }
+            }
+
+        # This output describes two literals with the same basename
+        outputs = [
+            literal('_:f168fc0c-4291-40aa-a04e-366d57390560'),
+            literal('_:1728da8f-c64e-4a3e-b2e2-1ee356be7bc8'),
+        ]
+        _, runner.final_output_collection = runner.make_output_collection(
+            "Test output", ["foo"], "", outputs)
+
+        # Check that the file name conflict is resolved and open is called for both
+        for expected_name in ("a_file", "a_file_2"):
+            final.open.assert_any_call(expected_name, "wb")
\ No newline at end of file