10293: Improve error messages for --api and unsupported InitialWorkDirRequirement.
[arvados.git] sdk/cwl/arvados_cwl/__init__.py
#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad

import arvados
import arvados.config
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = 4
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

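        # Probe the API server's discovery document for a usable work
        # submission API ("jobs" or "containers"), honoring an explicit
        # work_api request if one was given.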
        self.work_api = None
        for api in ["jobs", "containers"]:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass
        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected 'jobs' or 'containers'" % work_api)

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

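    # Invoked with the final output object and overall status ("success" or a
    # failure state) once the top-level workflow finishes.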
    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

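    # Process a state-change event for a tracked job or container. Events are
    # synthesized in this form by the poll_states() thread below.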
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
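                        # "jobs" -> "Job", "containers" -> "Container" for log output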
                        txt = self.work_api[0].upper() + self.work_api[1:-1]
                        logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

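        # Map each captured file/directory location (a keep: reference or a
        # "_:" literal) to a path inside the new flattened output collection.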
        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k, v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

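            # Anything else must be a "keep:<hash>/<path>" reference; the
            # slice below strips the "keep:" prefix to get the collection.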
            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        # Skip tagging entirely when tagsString is empty; splitting "" would
        # otherwise create a single tag link with an empty name.
        if tagsString:
            for tag in tagsString.split(','):
                self.api.links().create(body={
                    "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

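    # Top-level executor: handed to cwltool.main.main() below, which calls it
    # with the loaded tool object and job order to run the whole workflow.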
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return tmpl.uuid
            else:
                return upload_workflow(self, tool, job_order,
                                       self.project_uuid,
                                       uuid=existing_uuid)

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

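        # Containers run with fixed paths inside the container image, while
        # Crunch v1 jobs use $(task.outdir)/$(task.tmpdir) placeholders that
        # are substituted at task runtime.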
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

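            # Perf is entered/exited by hand (rather than as a "with" block) so
            # the timer covers only the time spent pulling the next runnable
            # from the job iterator, not the body of the loop.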
            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return a version string for key packages, for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                       "arvados-python-client", arvpkg[0].version,
                                       "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or given on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, they go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run the workflow runner on the local host, submitting individual jobs to Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

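# Example invocations of the command line built above (illustrative; assumes
# "arvados-cwl-runner" is the installed console-script name for main()):
#   arvados-cwl-runner my-workflow.cwl my-inputs.yml           # submit and wait
#   arvados-cwl-runner --local my-workflow.cwl my-inputs.yml   # run runner locally
#   arvados-cwl-runner --create-workflow my-workflow.cwl       # register, don't run
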
def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.update_workflow:
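        # Arvados UUIDs carry a five-character type infix starting at offset
        # 5: '7fd4e' marks a workflow (containers API), 'p5p6p' a pipeline
        # template (jobs API).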
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        if want_api:
            arvargs.work_api = want_api

    if arvargs.work_api is not None and arvargs.work_api not in ("jobs", "containers"):
        logger.error("Unknown --api '%s', expected 'jobs' or 'containers'", arvargs.work_api)
        return 1

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name, output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))