9701: Merge branch '9463-change-arvput-use-collection-class' into 9701-collection...
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 20bbc41bd32627d020f156c4b4e86ac31663971b..5262cb4971aaa09c002c1d52a56cc1899bf9ed4b 100644 (file)
@@ -23,9 +23,8 @@ import arvados.config
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from .arvtool import ArvadosCommandTool
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from .arvtool import ArvadosCommandTool
-from .arvworkflow import ArvadosWorkflow
+from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess
 from .fsaccess import CollectionFsAccess
-from .arvworkflow import upload_workflow
 from .perf import Perf
 from cwltool.pack import pack
 
 from .perf import Perf
 from cwltool.pack import pack
 
@@ -35,8 +34,10 @@ from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
 logger.setLevel(logging.INFO)
 
 logger.setLevel(logging.INFO)
 
+
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
     containers API), wait for them to complete, and report output.
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
     containers API), wait for them to complete, and report output.
@@ -56,6 +57,7 @@ class ArvCwlRunner(object):
         self.work_api = work_api
         self.stop_polling = threading.Event()
         self.poll_api = None
         self.work_api = work_api
         self.stop_polling = threading.Event()
         self.poll_api = None
+        self.pipeline = None
 
         if self.work_api is None:
             # todo: autodetect API to use.
 
         if self.work_api is None:
             # todo: autodetect API to use.
@@ -103,7 +105,7 @@ class ArvCwlRunner(object):
                         self.cond.acquire()
                         j = self.processes[uuid]
                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                         self.cond.acquire()
                         j = self.processes[uuid]
                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
-                        with Perf(logger, "done %s" % j.name):
+                        with Perf(metrics, "done %s" % j.name):
                             j.done(event["properties"]["new_attributes"])
                         self.cond.notify()
                     finally:
                             j.done(event["properties"]["new_attributes"])
                         self.cond.notify()
                     finally:
@@ -244,9 +246,12 @@ class ArvCwlRunner(object):
             # except when in cond.wait(), at which point on_message can update
             # job state and process output callbacks.
 
             # except when in cond.wait(), at which point on_message can update
             # job state and process output callbacks.
 
+            loopperf = Perf(metrics, "jobiter")
+            loopperf.__enter__()
             for runnable in jobiter:
             for runnable in jobiter:
+                loopperf.__exit__()
                 if runnable:
                 if runnable:
-                    with Perf(logger, "run"):
+                    with Perf(metrics, "run"):
                         runnable.run(**kwargs)
                 else:
                     if self.processes:
                         runnable.run(**kwargs)
                 else:
                     if self.processes:
@@ -254,6 +259,8 @@ class ArvCwlRunner(object):
                     else:
                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                         break
                     else:
                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                         break
+                loopperf.__enter__()
+            loopperf.__exit__()
 
             while self.processes:
                 self.cond.wait(1)
 
             while self.processes:
                 self.cond.wait(1)
@@ -322,6 +329,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
 
     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
 
+    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
+
     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
 
     exgroup = parser.add_mutually_exclusive_group()
     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
 
     exgroup = parser.add_mutually_exclusive_group()
@@ -368,12 +377,13 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 def add_arv_hints():
     cache = {}
     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
 def add_arv_hints():
     cache = {}
     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
-    cache["https://w3id.org/cwl/arv-cwl-schema.yml"] = res.read()
+    cache["http://arvados.org/cwl"] = res.read()
     res.close()
     _, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
     res.close()
     _, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
-    _, extnames, _, _ = schema_salad.schema.load_schema("https://w3id.org/cwl/arv-cwl-schema.yml", cache=cache)
+    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
     for n in extnames.names:
     for n in extnames.names:
-        cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
+        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
+            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
 
 def main(args, stdout, stderr, api_client=None):
     parser = arg_parser()
 
 def main(args, stdout, stderr, api_client=None):
     parser = arg_parser()
@@ -400,6 +410,10 @@ def main(args, stdout, stderr, api_client=None):
         logger.setLevel(logging.WARN)
         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
         logger.setLevel(logging.WARN)
         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
+    if arvargs.metrics:
+        metrics.setLevel(logging.DEBUG)
+        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
+
     arvargs.conformance_test = None
     arvargs.use_container = True
 
     arvargs.conformance_test = None
     arvargs.use_container = True