"""
- def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
+ def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None):
self.api = api_client
self.processes = {}
self.lock = threading.Lock()
self.pipeline = None
self.final_output_collection = None
self.output_name = output_name
+ self.output_tags = output_tags
self.project_uuid = None
if keep_client is not None:
for v in obj:
self.check_writable(v)
- def make_output_collection(self, name, outputObj):
+ def make_output_collection(self, name, tagsString, outputObj):
outputObj = copy.deepcopy(outputObj)
files = []
srccollections = {}
for k,v in generatemapper.items():
+ if k.startswith("_:"):
+ if v.type == "Directory":
+ continue
+ if v.type == "CreateFile":
+ with final.open(v.target, "wb") as f:
+ f.write(v.resolved.encode("utf-8"))
+ continue
+
+ if not k.startswith("keep:"):
+ raise Exception("Output source is not in keep or a literal")
sp = k.split("/")
srccollection = sp[0][5:]
if srccollection not in srccollections:
- srccollections[srccollection] = arvados.collection.CollectionReader(
- srccollection,
- api_client=self.api,
- keep_client=self.keep_client,
- num_retries=self.num_retries)
+ try:
+ srccollections[srccollection] = arvados.collection.CollectionReader(
+ srccollection,
+ api_client=self.api,
+ keep_client=self.keep_client,
+ num_retries=self.num_retries)
+ except arvados.errors.ArgumentError as e:
+ logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+ raise
reader = srccollections[srccollection]
try:
srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
def rewrite(fileobj):
fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
- for k in ("basename", "size", "listing"):
+ for k in ("basename", "listing", "contents"):
if k in fileobj:
del fileobj[k]
final.api_response()["name"],
final.manifest_locator())
- self.final_output_collection = final
+ final_uuid = final.manifest_locator()
+ tags = tagsString.split(',')
+ for tag in tags:
+ self.api.links().create(body={
+ "head_uuid": final_uuid, "link_class": "tag", "name": tag
+ }).execute(num_retries=self.num_retries)
+
+ def finalcollection(fileobj):
+ fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
+
+ adjustDirObjs(outputObj, finalcollection)
+ adjustFileObjs(outputObj, finalcollection)
+
+ return (outputObj, final)
def set_crunch_output(self):
if self.work_api == "containers":
self.output_callback,
**kwargs).next()
else:
- runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
+ runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)
else:
- runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
+ runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)
if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
# Create pipeline for local run
else:
if self.output_name is None:
self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
- self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.final_output)
+ if self.output_tags is None:
+ self.output_tags = ""
- self.make_output_collection(self.output_name, self.output_tags, self.final_output)
++ self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
self.set_crunch_output()
if self.final_status != "success":
parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
+ parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
parser.add_argument("--ignore-docker-for-reuse", action="store_true",
help="Ignore Docker image version when deciding whether to reuse past jobs.",
default=False)
try:
if api_client is None:
api_client=arvados.api('v1', model=OrderedJsonModel())
- runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
+ runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name, output_tags=arvargs.output_tags)
except Exception as e:
logger.error(e)
return 1
import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
- from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
+ from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
+ from cwltool.utils import aslist
+ from cwltool.builder import substitute
import arvados.collection
import ruamel.yaml as yaml
if isinstance(tool, CommandLineTool):
(docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
if docker_req:
+ if docker_req.get("dockerOutputDirectory"):
+ # TODO: can be supported by containers API, but not jobs API.
+ raise UnsupportedRequirement("Option 'dockerOutputDirectory' of DockerRequirement not supported.")
arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
def upload_instance(arvrunner, name, tool, job_order):
upload_docker(arvrunner, tool)
+ for t in tool.tool["inputs"]:
+ def setSecondary(fileobj):
+ if isinstance(fileobj, dict) and fileobj.get("class") == "File":
+ if "secondaryFiles" not in fileobj:
+ fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
+
+ if isinstance(fileobj, list):
+ for e in fileobj:
+ setSecondary(e)
+
+ if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
+ setSecondary(job_order[shortname(t["id"])])
+
workflowmapper = upload_dependencies(arvrunner,
name,
tool.doc_loader,
return img
class Runner(object):
- def __init__(self, runner, tool, job_order, enable_reuse, output_name):
+ def __init__(self, runner, tool, job_order, enable_reuse, output_name, output_tags):
self.arvrunner = runner
self.tool = tool
self.job_order = job_order
self.uuid = None
self.final_output = None
self.output_name = output_name
+ self.output_tags = output_tags
def update_pipeline_component(self, record):
pass
readermock = mock.MagicMock()
reader.return_value = readermock
+ final_uuid = final.manifest_locator()
+ num_retries = runner.num_retries
+
cwlout = StringIO.StringIO()
openmock = mock.MagicMock()
final.open.return_value = openmock
openmock.__enter__.return_value = cwlout
- runner.make_output_collection("Test output", "tag0,tag1,tag2", {
- _, runner.final_output_collection = runner.make_output_collection("Test output", {
++ _, runner.final_output_collection = runner.make_output_collection("Test output", "tag0,tag1,tag2", {
"foo": {
"class": "File",
"location": "keep:99999999999999999999999999999991+99/foo.txt",
"bar": {
"class": "File",
"location": "keep:99999999999999999999999999999992+99/bar.txt",
- "basename": "baz.txt"
+ "basename": "baz.txt",
+ "size": 4
}
})
self.assertEqual("""{
"bar": {
"class": "File",
- "location": "baz.txt"
+ "location": "baz.txt",
+ "size": 4
},
"foo": {
"class": "File",
- "location": "foo.txt"
+ "location": "foo.txt",
+ "size": 3
}
}""", cwlout.getvalue())
self.assertIs(final, runner.final_output_collection)
+ self.assertIs(final_uuid, runner.final_output_collection.manifest_locator())
+ self.api.links().create.assert_has_calls([mock.call(body={"head_uuid": final_uuid, "link_class": "tag", "name": "tag0"}), mock.call().execute(num_retries=num_retries)])
+ self.api.links().create.assert_has_calls([mock.call(body={"head_uuid": final_uuid, "link_class": "tag", "name": "tag1"}), mock.call().execute(num_retries=num_retries)])
+ self.api.links().create.assert_has_calls([mock.call(body={"head_uuid": final_uuid, "link_class": "tag", "name": "tag2"}), mock.call().execute(num_retries=num_retries)])