"""
- def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
+ def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None):
self.api = api_client
self.processes = {}
self.lock = threading.Lock()
self.pipeline = None
self.final_output_collection = None
self.output_name = output_name
+ self.output_tags = output_tags
self.project_uuid = None
if keep_client is not None:
for v in obj:
self.check_writable(v)
- def make_output_collection(self, name, outputObj):
+ def make_output_collection(self, name, tagsString, outputObj):
outputObj = copy.deepcopy(outputObj)
files = []
final.api_response()["name"],
final.manifest_locator())
final_uuid = final.manifest_locator()
# Attach one "tag" link per requested tag to the final output collection.
# str.split(',') on an empty string returns [''], and a trailing or doubled
# comma also yields empty entries; skip those so no link with an empty name
# is created. A None tagsString is treated the same as "no tags".
tags = [tag for tag in (tagsString or "").split(',') if tag]
for tag in tags:
    self.api.links().create(body={
        "head_uuid": final_uuid, "link_class": "tag", "name": tag
    }).execute(num_retries=self.num_retries)
+
def finalcollection(fileobj):
fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
self.output_callback,
**kwargs).next()
else:
- runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
+ runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)
else:
- runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
+ runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)
if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
# Create pipeline for local run
else:
if self.output_name is None:
self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
- self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.final_output)
+ if self.output_tags is None:
+ self.output_tags = ""
+ self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
self.set_crunch_output()
if self.final_status != "success":
parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
+ parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
parser.add_argument("--ignore-docker-for-reuse", action="store_true",
help="Ignore Docker image version when deciding whether to reuse past jobs.",
default=False)
try:
if api_client is None:
api_client=arvados.api('v1', model=OrderedJsonModel())
- runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
+ runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name, output_tags=arvargs.output_tags)
except Exception as e:
logger.error(e)
return 1
if self.output_name:
command.append("--output-name=" + self.output_name)
+ if self.output_tags:
+ command.append("--output-tags=" + self.output_tags)
+
if self.enable_reuse:
command.append("--enable-reuse")
else:
if self.output_name:
self.job_order["arv:output_name"] = self.output_name
+ if self.output_tags:
+ self.job_order["arv:output_tags"] = self.output_tags
+
self.job_order["arv:enable_reuse"] = self.enable_reuse
return {
tool=tool,
job_order=job_order,
enable_reuse=enable_reuse,
- output_name=None)
+ output_name=None,
+ output_tags=None)
def pipeline_component_spec(self):
"""Return a component that Workbench and a-r-p-i will understand.
adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)))
output_name = None
+ output_tags = None
enable_reuse = True
if "arv:output_name" in job_order_object:
output_name = job_order_object["arv:output_name"]
del job_order_object["arv:output_name"]
+ if "arv:output_tags" in job_order_object:
+ output_tags = job_order_object["arv:output_tags"]
+ del job_order_object["arv:output_tags"]
+
if "arv:enable_reuse" in job_order_object:
enable_reuse = job_order_object["arv:enable_reuse"]
del job_order_object["arv:enable_reuse"]
runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
- output_name=output_name)
+ output_name=output_name, output_tags=output_tags)
t = load_tool(job_order_object, runner.arv_make_tool)
return img
class Runner(object):
- def __init__(self, runner, tool, job_order, enable_reuse, output_name):
+ def __init__(self, runner, tool, job_order, enable_reuse, output_name, output_tags):
self.arvrunner = runner
self.tool = tool
self.job_order = job_order
self.uuid = None
self.final_output = None
self.output_name = output_name
+ self.output_tags = output_tags
def update_pipeline_component(self, record):
pass
readermock = mock.MagicMock()
reader.return_value = readermock
+ final_uuid = final.manifest_locator()
+ num_retries = runner.num_retries
+
cwlout = StringIO.StringIO()
openmock = mock.MagicMock()
final.open.return_value = openmock
openmock.__enter__.return_value = cwlout
- _, runner.final_output_collection = runner.make_output_collection("Test output", {
+ _, runner.final_output_collection = runner.make_output_collection("Test output", "tag0,tag1,tag2", {
"foo": {
"class": "File",
"location": "keep:99999999999999999999999999999991+99/foo.txt",
}""", cwlout.getvalue())
self.assertIs(final, runner.final_output_collection)
+ self.assertIs(final_uuid, runner.final_output_collection.manifest_locator())
+ self.api.links().create.assert_has_calls([mock.call(body={"head_uuid": final_uuid, "link_class": "tag", "name": "tag0"}), mock.call().execute(num_retries=num_retries)])
+ self.api.links().create.assert_has_calls([mock.call(body={"head_uuid": final_uuid, "link_class": "tag", "name": "tag1"}), mock.call().execute(num_retries=num_retries)])
+ self.api.links().create.assert_has_calls([mock.call(body={"head_uuid": final_uuid, "link_class": "tag", "name": "tag2"}), mock.call().execute(num_retries=num_retries)])
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_pipeline_uuid + '\n')
@mock.patch("time.sleep")
@stubs
def test_submit_output_name(self, stubs, tm):
    """Submit with --output-name and check it lands in the runner job's script parameters.

    The expected pipeline instance from the stubs fixture is extended with
    "arv:output_name" and compared against the body passed to
    pipeline_instances().create; the printed pipeline UUID is checked too.
    """
    output_name = "test_output_name"
    argv = ["--submit", "--no-wait", "--debug", "--output-name", output_name,
            "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"]

    capture_stdout = cStringIO.StringIO()
    exited = arvados_cwl.main(argv, capture_stdout, sys.stderr, api_client=stubs.api)
    self.assertEqual(exited, 0)

    # Update the fixture in place (as the expectation source), then snapshot it.
    script_parameters = stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]
    script_parameters["arv:output_name"] = output_name

    expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
    expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
    stubs.api.pipeline_instances().create.assert_called_with(body=expect_pipeline)

    printed = capture_stdout.getvalue()
    self.assertEqual(printed, stubs.expect_pipeline_uuid + '\n')
+
@mock.patch("time.sleep")
@stubs
def test_submit_output_tags(self, stubs, tm):
    """Submit with --output-tags and check it lands in the runner job's script parameters.

    The expected pipeline instance from the stubs fixture is extended with
    "arv:output_tags" and compared against the body passed to
    pipeline_instances().create; the printed pipeline UUID is checked too.
    """
    output_tags = "tag0,tag1,tag2"
    argv = ["--submit", "--no-wait", "--debug", "--output-tags", output_tags,
            "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"]

    capture_stdout = cStringIO.StringIO()
    exited = arvados_cwl.main(argv, capture_stdout, sys.stderr, api_client=stubs.api)
    self.assertEqual(exited, 0)

    # Update the fixture in place (as the expectation source), then snapshot it.
    script_parameters = stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]
    script_parameters["arv:output_tags"] = output_tags

    expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
    expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
    stubs.api.pipeline_instances().create.assert_called_with(body=expect_pipeline)

    printed = capture_stdout.getvalue()
    self.assertEqual(printed, stubs.expect_pipeline_uuid + '\n')
+
@mock.patch("time.sleep")
@stubs
def test_submit_with_project_uuid(self, stubs, tm):
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
@stubs
def test_submit_container_output_name(self, stubs):
    """Check that --output-name is passed through on the submitted container's command.

    Runs arvados_cwl.main in submit mode with --api=containers, then compares
    the body given to container_requests().create and the printed request
    UUID with the expectations from the stubs fixture.
    """
    output_name = "test_output_name"

    capture_stdout = cStringIO.StringIO()
    try:
        exited = arvados_cwl.main(
            ["--submit", "--no-wait", "--api=containers", "--debug", "--output-name", output_name,
             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
            capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
    except Exception:
        # Keep the traceback in the log, but re-raise: swallowing the error
        # here would let a crashing main() go unnoticed by this test.
        logging.exception("")
        raise
    # Asserted outside the try block so a wrong exit code fails the test
    # instead of being caught and merely logged.
    self.assertEqual(exited, 0)

    stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', "--output-name="+output_name, '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json']

    expect_container = copy.deepcopy(stubs.expect_container_spec)
    expect_container["owner_uuid"] = stubs.fake_user_uuid
    stubs.api.container_requests().create.assert_called_with(
        body=expect_container)
    self.assertEqual(capture_stdout.getvalue(),
                     stubs.expect_container_request_uuid + '\n')
+
@stubs
def test_submit_container_output_tags(self, stubs):
    """Check that --output-tags is passed through on the submitted container's command.

    Runs arvados_cwl.main in submit mode with --api=containers, then compares
    the body given to container_requests().create and the printed request
    UUID with the expectations from the stubs fixture.
    """
    output_tags = "tag0,tag1,tag2"

    capture_stdout = cStringIO.StringIO()
    try:
        exited = arvados_cwl.main(
            ["--submit", "--no-wait", "--api=containers", "--debug", "--output-tags", output_tags,
             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
            capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
    except Exception:
        # Keep the traceback in the log, but re-raise: swallowing the error
        # here would let a crashing main() go unnoticed by this test.
        logging.exception("")
        raise
    # Asserted outside the try block so a wrong exit code fails the test
    # instead of being caught and merely logged.
    self.assertEqual(exited, 0)

    stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', "--output-tags="+output_tags, '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json']

    expect_container = copy.deepcopy(stubs.expect_container_spec)
    expect_container["owner_uuid"] = stubs.fake_user_uuid
    stubs.api.container_requests().create.assert_called_with(
        body=expect_container)
    self.assertEqual(capture_stdout.getvalue(),
                     stubs.expect_container_request_uuid + '\n')
+
@mock.patch("arvados.commands.keepdocker.find_one_image_hash")
@mock.patch("cwltool.docker.get_image")
@mock.patch("arvados.api")