#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad

import arvados
import arvados.config

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from .pathmapper import FinalOutputPathMapper

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
46     """Execute a CWL tool or workflow, submit work (using either jobs or
47     containers API), wait for them to complete, and report output.
48
49     """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

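        # Choose the work submission API by inspecting the API server's
        # discovery document: use the first of "jobs"/"containers" that is
        # available and matches the requested work_api, if one was given.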
        self.work_api = None
        for api in ["jobs", "containers"]:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass
        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s'" % work_api)

    def arv_make_tool(self, toolpath_object, **kwargs):
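        """Construct an Arvados-aware tool object for the given CWL document
        node, falling back to cwltool's default factory for other classes."""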
89         kwargs["work_api"] = self.work_api
90         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
91             return ArvadosCommandTool(self, toolpath_object, **kwargs)
92         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
93             return ArvadosWorkflow(self, toolpath_object, **kwargs)
94         else:
95             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
96
97     def output_callback(self, out, processStatus):
98         if processStatus == "success":
99             logger.info("Overall process status is %s", processStatus)
100             if self.pipeline:
101                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
102                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
103         else:
104             logger.warn("Overall process status is %s", processStatus)
105             if self.pipeline:
106                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
107                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
108         self.final_status = processStatus
109         self.final_output = out
110
111     def on_message(self, event):
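        """Handle a state-change event for a tracked job or container.

        Invoked by the polling thread with update events for processes in
        self.processes; logs transitions, records completion, and notifies
        the main loop waiting on self.cond.
        """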
112         if "object_uuid" in event:
113             if event["object_uuid"] in self.processes and event["event_type"] == "update":
114                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
115                     uuid = event["object_uuid"]
116                     with self.lock:
117                         j = self.processes[uuid]
118                         logger.info("Job %s (%s) is Running", j.name, uuid)
119                         j.running = True
120                         j.update_pipeline_component(event["properties"]["new_attributes"])
121                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
122                     uuid = event["object_uuid"]
123                     try:
124                         self.cond.acquire()
125                         j = self.processes[uuid]
126                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
127                         with Perf(metrics, "done %s" % j.name):
128                             j.done(event["properties"]["new_attributes"])
129                         self.cond.notify()
130                     finally:
131                         self.cond.release()
132
133     def poll_states(self):
134         """Poll status of jobs or containers listed in the processes dict.
135
136         Runs in a separate thread.
137         """
138
139         while True:
140             self.stop_polling.wait(15)
141             if self.stop_polling.is_set():
142                 break
143             with self.lock:
144                 keys = self.processes.keys()
145             if not keys:
146                 continue
147
148             if self.work_api == "containers":
149                 table = self.poll_api.containers()
150             elif self.work_api == "jobs":
151                 table = self.poll_api.jobs()
152
153             try:
154                 proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
155             except Exception as e:
156                 logger.warn("Error checking states on API server: %s", e)
157                 continue
158
159             for p in proc_states["items"]:
160                 self.on_message({
161                     "object_uuid": p["uuid"],
162                     "event_type": "update",
163                     "properties": {
164                         "new_attributes": p
165                     }
166                 })
167
168     def get_uploaded(self):
169         return self.uploaded.copy()
170
171     def add_uploaded(self, src, pair):
172         self.uploaded[src] = pair
173
174     def check_writable(self, obj):
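        """Recursively check the tool document and raise UnsupportedRequirement
        if any object requests 'writable: true', which this runner does not
        support."""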
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def make_output_collection(self, name, outputObj):
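        """Copy the files referenced by the CWL output object into a new Keep
        collection, rewrite their locations to point at it, and save the
        collection (including a cwl.output.json manifest) under the given name."""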
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k,v in generatemapper.items():
            sp = k.split("/")
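            # Mapper keys are Keep references of the form "keep:<pdh>/path";
            # strip the leading "keep:" to get the source collection identifier.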
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                srccollections[srccollection] = arvados.collection.CollectionReader(
                    srccollection,
                    api_client=self.api,
                    keep_client=self.keep_client,
                    num_retries=self.num_retries)
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "size", "listing"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        self.final_output_collection = final

    def arv_executor(self, tool, job_order, **kwargs):
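        """Run a CWL tool or workflow on Arvados.

        Depending on the options, this creates a pipeline template or
        registered workflow, submits a runner job or container request, or
        drives the workflow from this process and returns the final output.
        """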
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

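        # Output and scratch directory conventions differ between the two
        # work APIs: the containers API uses fixed paths inside the container,
        # while the jobs API uses crunch task substitution variables.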
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

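        # Upload the tool document and the local files referenced by the job
        # order to Keep so they are available to the work API.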
        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

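        # Poll the API server in a background thread for state changes of
        # submitted jobs or containers.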
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()
                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            self.make_output_collection(self.output_name, self.final_output)

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
386     """Print version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

400     parser.add_argument("--basedir", type=str,
401                         help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
402     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
403                         help="Output directory, default current directory")
404
405     parser.add_argument("--eval-timeout",
406                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
407                         type=float,
408                         default=20)
409     parser.add_argument("--version", action="store_true", help="Print version and exit")
410
411     exgroup = parser.add_mutually_exclusive_group()
412     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
413     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
414     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
415
416     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
417
418     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
419
420     exgroup = parser.add_mutually_exclusive_group()
421     exgroup.add_argument("--enable-reuse", action="store_true",
422                         default=True, dest="enable_reuse",
423                         help="")
424     exgroup.add_argument("--disable-reuse", action="store_false",
425                         default=True, dest="enable_reuse",
426                         help="")
427
428     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
429     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
430     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
431                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
432                         default=False)
433
434     exgroup = parser.add_mutually_exclusive_group()
435     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
436                         default=True, dest="submit")
437     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
438                         default=True, dest="submit")
439     exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
440     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
441     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

def add_arv_hints():
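    """Load the Arvados CWL extension schema and register its names so that
    Arvados-specific hints validate alongside the standard CWL v1.0 names."""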
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

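    # Set attributes that cwltool.main expects but this argument parser does
    # not define.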
    arvargs.conformance_test = None
    arvargs.use_container = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))