100032: Add metrics to job submission in --debug mode.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 65a36f68773a6293aca0ee53d4442e5b3c71d0c2..7bfdba8b80e48d9cb24d7054933b428aa3729764 100644 (file)
@@ -23,6 +23,8 @@ from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from .arvtool import ArvadosCommandTool
 from .fsaccess import CollectionFsAccess
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from .arvtool import ArvadosCommandTool
 from .fsaccess import CollectionFsAccess
+from .arvworkflow import make_workflow
+from .perf import Perf
 
 from cwltool.process import shortname, UnsupportedRequirement
 from cwltool.pathmapper import adjustFileObjs
 
 from cwltool.process import shortname, UnsupportedRequirement
 from cwltool.pathmapper import adjustFileObjs
@@ -96,7 +98,8 @@ class ArvCwlRunner(object):
                         self.cond.acquire()
                         j = self.processes[uuid]
                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                         self.cond.acquire()
                         j = self.processes[uuid]
                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
-                        j.done(event["properties"]["new_attributes"])
+                        with Perf(logger, "done %s" % j.name):
+                            j.done(event["properties"]["new_attributes"])
                         self.cond.notify()
                     finally:
                         self.cond.release()
                         self.cond.notify()
                     finally:
                         self.cond.release()
@@ -161,6 +164,9 @@ class ArvCwlRunner(object):
             logger.setLevel(logging.WARN)
             logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
             logger.setLevel(logging.WARN)
             logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
+        if self.debug:
+            logger.setLevel(logging.DEBUG)
+
         useruuid = self.api.users().current().execute()["uuid"]
         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
         self.pipeline = None
         useruuid = self.api.users().current().execute()["uuid"]
         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
         self.pipeline = None
@@ -173,7 +179,9 @@ class ArvCwlRunner(object):
             # cwltool.main will write our return value to stdout.
             return tmpl.uuid
 
             # cwltool.main will write our return value to stdout.
             return tmpl.uuid
 
-        self.debug = kwargs.get("debug")
+        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
+            return make_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
+
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
         kwargs["make_fs_access"] = make_fs_access
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
         kwargs["make_fs_access"] = make_fs_access
@@ -240,7 +248,8 @@ class ArvCwlRunner(object):
 
             for runnable in jobiter:
                 if runnable:
 
             for runnable in jobiter:
                 if runnable:
-                    runnable.run(**kwargs)
+                    with Perf(logger, "run"):
+                        runnable.run(**kwargs)
                 else:
                     if self.processes:
                         self.cond.wait(1)
                 else:
                     if self.processes:
                         self.cond.wait(1)
@@ -325,7 +334,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=True, dest="enable_reuse",
                         help="")
 
                         default=True, dest="enable_reuse",
                         help="")
 
-    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
+    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
                         default=False)
     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
                         default=False)
@@ -336,6 +345,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
     exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
     exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
+    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
+    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
@@ -362,7 +373,7 @@ def main(args, stdout, stderr, api_client=None):
 
     job_order_object = None
     arvargs = parser.parse_args(args)
 
     job_order_object = None
     arvargs = parser.parse_args(args)
-    if arvargs.create_template and not arvargs.job_order:
+    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
         job_order_object = ({}, "")
 
     try:
         job_order_object = ({}, "")
 
     try: