13108: Simplify locking, add methods for recording process status
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import hashlib
15 import copy
16 import json
17 import re
18 from functools import partial
19 import pkg_resources  # part of setuptools
20
21 from cwltool.errors import WorkflowException
22 import cwltool.main
23 import cwltool.workflow
24 import cwltool.process
25 from schema_salad.sourceline import SourceLine
26 import schema_salad.validate as validate
27
28 import arvados
29 import arvados.config
30 from arvados.keep import KeepClient
31 from arvados.errors import ApiError
32
33 from .arvcontainer import ArvadosContainer, RunnerContainer
34 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
35 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
36 from .arvtool import ArvadosCommandTool
37 from .arvworkflow import ArvadosWorkflow, upload_workflow
38 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
39 from .perf import Perf
40 from .pathmapper import NoFollowPathMapper
41 from ._version import __version__
42
43 from cwltool.pack import pack
44 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
45 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
46 from cwltool.command_line_tool import compute_checksums
47 from arvados.api import OrderedJsonModel
48
49 logger = logging.getLogger('arvados.cwl-runner')
50 metrics = logging.getLogger('arvados.cwl-runner.metrics')
51 logger.setLevel(logging.INFO)
52
53 arvados.log_handler.setFormatter(logging.Formatter(
54         '%(asctime)s %(name)s %(levelname)s: %(message)s',
55         '%Y-%m-%d %H:%M:%S'))
56
57 DEFAULT_PRIORITY = 500
58
59 class ArvCwlRunner(object):
60     """Execute a CWL tool or workflow, submit work (using either the jobs or
61     containers API), wait for it to complete, and report output.
62
63     """
64
65     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
66         self.api = api_client
67         self.processes = {}
68         self.in_flight = 0
69         self.workflow_eval_lock = threading.Condition(threading.RLock())
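        # workflow_eval_lock is a condition variable (around a reentrant lock)
        # guarding self.processes and self.in_flight; on_message() and
        # poll_states() notify it so the wait loop in arv_executor() wakes up.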
70         self.final_output = None
71         self.final_status = None
72         self.uploaded = {}
73         self.num_retries = num_retries
74         self.uuid = None
75         self.stop_polling = threading.Event()
76         self.poll_api = None
77         self.pipeline = None
78         self.final_output_collection = None
79         self.output_name = output_name
80         self.output_tags = output_tags
81         self.project_uuid = None
82         self.intermediate_output_ttl = 0
83         self.intermediate_output_collections = []
84         self.trash_intermediate = False
85
86         if keep_client is not None:
87             self.keep_client = keep_client
88         else:
89             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
90
91         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
92
93         self.work_api = None
94         expected_api = ["jobs", "containers"]
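        # Probe the API server's discovery document and use the first API family
        # it supports (and that matches the requested work_api, if any); missing
        # resources or methods raise KeyError and are skipped.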
95         for api in expected_api:
96             try:
97                 methods = self.api._rootDesc.get('resources')[api]['methods']
98                 if ('httpMethod' in methods['create'] and
99                     (work_api == api or work_api is None)):
100                     self.work_api = api
101                     break
102             except KeyError:
103                 pass
104
105         if not self.work_api:
106             if work_api is None:
107                 raise Exception("No supported APIs")
108             else:
109                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
110
111     def arv_make_tool(self, toolpath_object, **kwargs):
112         kwargs["work_api"] = self.work_api
113         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
114                                                 api_client=self.api,
115                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
116                                                 num_retries=self.num_retries)
117         kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
118         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
119             return ArvadosCommandTool(self, toolpath_object, **kwargs)
120         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
121             return ArvadosWorkflow(self, toolpath_object, **kwargs)
122         else:
123             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
124
125     def output_callback(self, out, processStatus):
126         if processStatus == "success":
127             logger.info("Overall process status is %s", processStatus)
128             if self.pipeline:
129                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
130                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
131         else:
132             logger.warn("Overall process status is %s", processStatus)
133             if self.pipeline:
134                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
135                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
136         self.final_status = processStatus
137         self.final_output = out
138
139     def start_run(self, runnable, kwargs):
140         with self.workflow_eval_lock:
141             self.in_flight += 1
142         runnable.run(**kwargs)
143
144     def process_submitted(self, container):
145         with self.workflow_eval_lock:
146             self.processes[container.uuid] = container
147             self.in_flight -= 1
148
149     def process_done(self, uuid):
150         with self.workflow_eval_lock:
151             if uuid in self.processes:
152                 del self.processes[uuid]
153
154     def on_message(self, event):
155         if "object_uuid" in event:
156             if event["object_uuid"] in self.processes and event["event_type"] == "update":
157                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
158                     uuid = event["object_uuid"]
159                     with self.workflow_eval_lock:
160                         j = self.processes[uuid]
161                         logger.info("%s %s is Running", self.label(j), uuid)
162                         j.running = True
163                         j.update_pipeline_component(event["properties"]["new_attributes"])
164                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
165                     uuid = event["object_uuid"]
166                     with self.workflow_eval_lock:
167                         j = self.processes[uuid]
168                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
169                         with Perf(metrics, "done %s" % j.name):
170                             j.done(event["properties"]["new_attributes"])
171                         self.workflow_eval_lock.notify()
172
173     def label(self, obj):
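        # self.work_api is plural ("jobs"/"containers"); drop the trailing "s" for the label.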
174         return "[%s %s]" % (self.work_api[0:-1], obj.name)
175
176     def poll_states(self):
177         """Poll status of jobs or containers listed in the processes dict.
178
179         Runs in a separate thread.
180         """
181
182         try:
183             while True:
184                 self.stop_polling.wait(15)
185                 if self.stop_polling.is_set():
186                     break
187                 with self.workflow_eval_lock:
188                     keys = list(self.processes.keys())
189                 if not keys:
190                     continue
191
192                 if self.work_api == "containers":
193                     table = self.poll_api.container_requests()
194                 elif self.work_api == "jobs":
195                     table = self.poll_api.jobs()
196
197                 try:
198                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
199                 except Exception as e:
200                     logger.warn("Error checking states on API server: %s", e)
201                     continue
202
203                 for p in proc_states["items"]:
204                     self.on_message({
205                         "object_uuid": p["uuid"],
206                         "event_type": "update",
207                         "properties": {
208                             "new_attributes": p
209                         }
210                     })
211         except:
212             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
213             with self.workflow_eval_lock:
214                 self.processes.clear()
215                 self.workflow_eval_lock.notify()
216         finally:
217             self.stop_polling.set()
218
219     def get_uploaded(self):
220         return self.uploaded.copy()
221
222     def add_uploaded(self, src, pair):
223         self.uploaded[src] = pair
224
225     def add_intermediate_output(self, uuid):
226         if uuid:
227             self.intermediate_output_collections.append(uuid)
228
229     def trash_intermediate_output(self):
230         logger.info("Cleaning up intermediate output collections")
231         for i in self.intermediate_output_collections:
232             try:
233                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
234             except:
235                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
236             if sys.exc_info()[0] is KeyboardInterrupt:
237                 break
238
239     def check_features(self, obj):
240         if isinstance(obj, dict):
241             if obj.get("writable") and self.work_api != "containers":
242                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
243             if obj.get("class") == "DockerRequirement":
244                 if obj.get("dockerOutputDirectory"):
245                     if self.work_api != "containers":
246                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
247                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
248                     if not obj.get("dockerOutputDirectory").startswith('/'):
249                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
250                             "Option 'dockerOutputDirectory' must be an absolute path.")
251             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
252                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
253             for v in obj.itervalues():
254                 self.check_features(v)
255         elif isinstance(obj, list):
256             for i,v in enumerate(obj):
257                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
258                     self.check_features(v)
259
260     def make_output_collection(self, name, tagsString, outputObj):
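        # Collect every File/Directory reference in the final output, copy the
        # underlying data (and any file literals) into a new collection, save and
        # tag it, then rewrite output locations to point at the saved collection.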
261         outputObj = copy.deepcopy(outputObj)
262
263         files = []
264         def capture(fileobj):
265             files.append(fileobj)
266
267         adjustDirObjs(outputObj, capture)
268         adjustFileObjs(outputObj, capture)
269
270         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
271
272         final = arvados.collection.Collection(api_client=self.api,
273                                               keep_client=self.keep_client,
274                                               num_retries=self.num_retries)
275
276         for k,v in generatemapper.items():
277             if k.startswith("_:"):
278                 if v.type == "Directory":
279                     continue
280                 if v.type == "CreateFile":
281                     with final.open(v.target, "wb") as f:
282                         f.write(v.resolved.encode("utf-8"))
283                     continue
284
285             if not k.startswith("keep:"):
286                 raise Exception("Output source is not in keep or a literal")
287             sp = k.split("/")
288             srccollection = sp[0][5:]
289             try:
290                 reader = self.collection_cache.get(srccollection)
291                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
292                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
293             except arvados.errors.ArgumentError as e:
294                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
295                 raise
296             except IOError as e:
297                 logger.warn("While preparing output collection: %s", e)
298
299         def rewrite(fileobj):
300             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
301             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
302                 if k in fileobj:
303                     del fileobj[k]
304
305         adjustDirObjs(outputObj, rewrite)
306         adjustFileObjs(outputObj, rewrite)
307
308         with final.open("cwl.output.json", "w") as f:
309             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
310
311         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
312
313         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
314                     final.api_response()["name"],
315                     final.manifest_locator())
316
317         final_uuid = final.manifest_locator()
318         tags = tagsString.split(',')
319         for tag in tags:
320             self.api.links().create(body={
321                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
322                 }).execute(num_retries=self.num_retries)
323
324         def finalcollection(fileobj):
325             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
326
327         adjustDirObjs(outputObj, finalcollection)
328         adjustFileObjs(outputObj, finalcollection)
329
330         return (outputObj, final)
331
332     def set_crunch_output(self):
333         if self.work_api == "containers":
334             try:
335                 current = self.api.containers().current().execute(num_retries=self.num_retries)
336             except ApiError as e:
337                 # Status code 404 just means we're not running in a container.
338                 if e.resp.status != 404:
339                     logger.info("Getting current container: %s", e)
340                 return
341             try:
342                 self.api.containers().update(uuid=current['uuid'],
343                                              body={
344                                                  'output': self.final_output_collection.portable_data_hash(),
345                                              }).execute(num_retries=self.num_retries)
346                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
347                                               body={
348                                                   'is_trashed': True
349                                               }).execute(num_retries=self.num_retries)
350             except Exception as e:
351                 logger.info("Setting container output: %s", e)
352         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
353             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
354                                    body={
355                                        'output': self.final_output_collection.portable_data_hash(),
356                                        'success': self.final_status == "success",
357                                        'progress':1.0
358                                    }).execute(num_retries=self.num_retries)
359
360     def arv_executor(self, tool, job_order, **kwargs):
361         self.debug = kwargs.get("debug")
362
363         tool.visit(self.check_features)
364
365         self.project_uuid = kwargs.get("project_uuid")
366         self.pipeline = None
367         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
368                                                                  collection_cache=self.collection_cache)
369         self.fs_access = make_fs_access(kwargs["basedir"])
370         self.secret_store = kwargs.get("secret_store")
371
372         self.trash_intermediate = kwargs["trash_intermediate"]
373         if self.trash_intermediate and self.work_api != "containers":
374             raise Exception("--trash-intermediate is only supported with --api=containers.")
375
376         self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
377         if self.intermediate_output_ttl and self.work_api != "containers":
378             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
379         if self.intermediate_output_ttl < 0:
380             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
381
382         if not kwargs.get("name"):
383             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
384
385         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
386         # Also uploads docker images.
387         merged_map = upload_workflow_deps(self, tool)
388
389         # Reload tool object which may have been updated by
390         # upload_workflow_deps
391         # Don't validate this time because it will just print redundant errors.
392         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
393                                   makeTool=self.arv_make_tool,
394                                   loader=tool.doc_loader,
395                                   avsc_names=tool.doc_schema,
396                                   metadata=tool.metadata,
397                                   do_validate=False)
398
399         # Upload local file references in the job order.
400         job_order = upload_job_order(self, "%s input" % kwargs["name"],
401                                      tool, job_order)
402
403         existing_uuid = kwargs.get("update_workflow")
404         if existing_uuid or kwargs.get("create_workflow"):
405             # Create a pipeline template or workflow record and exit.
406             if self.work_api == "jobs":
407                 tmpl = RunnerTemplate(self, tool, job_order,
408                                       kwargs.get("enable_reuse"),
409                                       uuid=existing_uuid,
410                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
411                                       name=kwargs["name"],
412                                       merged_map=merged_map)
413                 tmpl.save()
414                 # cwltool.main will write our return value to stdout.
415                 return (tmpl.uuid, "success")
416             elif self.work_api == "containers":
417                 return (upload_workflow(self, tool, job_order,
418                                         self.project_uuid,
419                                         uuid=existing_uuid,
420                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
421                                         name=kwargs["name"],
422                                         merged_map=merged_map),
423                         "success")
424
425         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
426         self.eval_timeout = kwargs.get("eval_timeout")
427
428         kwargs["make_fs_access"] = make_fs_access
429         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
430         kwargs["use_container"] = True
431         kwargs["tmpdir_prefix"] = "tmp"
432         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
433
434         if self.work_api == "containers":
435             if self.ignore_docker_for_reuse:
436                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
437             kwargs["outdir"] = "/var/spool/cwl"
438             kwargs["docker_outdir"] = "/var/spool/cwl"
439             kwargs["tmpdir"] = "/tmp"
440             kwargs["docker_tmpdir"] = "/tmp"
441         elif self.work_api == "jobs":
442             if kwargs["priority"] != DEFAULT_PRIORITY:
443                 raise Exception("--priority not implemented for jobs API.")
444             kwargs["outdir"] = "$(task.outdir)"
445             kwargs["docker_outdir"] = "$(task.outdir)"
446             kwargs["tmpdir"] = "$(task.tmpdir)"
447
448         if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
449             raise Exception("--priority must be in the range 1..1000.")
450
451         runnerjob = None
452         if kwargs.get("submit"):
453             # Submit a runner job to run the workflow for us.
454             if self.work_api == "containers":
455                 if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
456                     kwargs["runnerjob"] = tool.tool["id"]
457                     runnerjob = tool.job(job_order,
458                                          self.output_callback,
459                                          **kwargs).next()
460                 else:
461                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
462                                                 self.output_name,
463                                                 self.output_tags,
464                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
465                                                 name=kwargs.get("name"),
466                                                 on_error=kwargs.get("on_error"),
467                                                 submit_runner_image=kwargs.get("submit_runner_image"),
468                                                 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
469                                                 merged_map=merged_map,
470                                                 priority=kwargs.get("priority"),
471                                                 secret_store=self.secret_store)
472             elif self.work_api == "jobs":
473                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
474                                       self.output_name,
475                                       self.output_tags,
476                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
477                                       name=kwargs.get("name"),
478                                       on_error=kwargs.get("on_error"),
479                                       submit_runner_image=kwargs.get("submit_runner_image"),
480                                       merged_map=merged_map)
481         elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
482             # Create pipeline for local run
483             self.pipeline = self.api.pipeline_instances().create(
484                 body={
485                     "owner_uuid": self.project_uuid,
486                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
487                     "components": {},
488                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
489             logger.info("Pipeline instance %s", self.pipeline["uuid"])
490
491         if runnerjob and not kwargs.get("wait"):
492             runnerjob.run(wait=kwargs.get("wait"))
493             return (runnerjob.uuid, "success")
494
495         self.poll_api = arvados.api('v1')
496         self.polling_thread = threading.Thread(target=self.poll_states)
497         self.polling_thread.start()
498
499         if runnerjob:
500             jobiter = iter((runnerjob,))
501         else:
502             if "cwl_runner_job" in kwargs:
503                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
504             jobiter = tool.job(job_order,
505                                self.output_callback,
506                                **kwargs)
507
508         try:
509             self.workflow_eval_lock.acquire()
510             # Holds the lock while this code runs and releases it when
511             # it is safe to do so in self.workflow_eval_lock.wait(),
512             # at which point on_message can update job state and
513             # process output callbacks.
514
515             loopperf = Perf(metrics, "jobiter")
516             loopperf.__enter__()
517             for runnable in jobiter:
518                 loopperf.__exit__()
519
520                 if self.stop_polling.is_set():
521                     break
522
523                 if runnable:
524                     with Perf(metrics, "run"):
525                         self.start_run(runnable, kwargs)
526                 else:
527                     if (self.in_flight + len(self.processes)) > 0:
528                         self.workflow_eval_lock.wait(1)
529                     else:
530                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
531                         break
532                 loopperf.__enter__()
533             loopperf.__exit__()
534
535             while self.processes:
536                 self.workflow_eval_lock.wait(1)
537
538         except UnsupportedRequirement:
539             raise
540         except:
541             if sys.exc_info()[0] is KeyboardInterrupt:
542                 logger.error("Interrupted, marking pipeline as failed")
543             else:
544                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
545             if self.pipeline:
546                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
547                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
548             if runnerjob and runnerjob.uuid and self.work_api == "containers":
549                 self.api.container_requests().update(uuid=runnerjob.uuid,
550                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
551         finally:
552             self.workflow_eval_lock.release()
553             self.stop_polling.set()
554             self.polling_thread.join()
555
556         if self.final_status == "UnsupportedRequirement":
557             raise UnsupportedRequirement("Check log for details.")
558
559         if self.final_output is None:
560             raise WorkflowException("Workflow did not return a result.")
561
562         if kwargs.get("submit") and isinstance(runnerjob, Runner):
563             logger.info("Final output collection %s", runnerjob.final_output)
564         else:
565             if self.output_name is None:
566                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
567             if self.output_tags is None:
568                 self.output_tags = ""
569             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
570             self.set_crunch_output()
571
572         if kwargs.get("compute_checksum"):
573             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
574             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
575
576         if self.trash_intermediate and self.final_status == "success":
577             self.trash_intermediate_output()
578
579         return (self.final_output, self.final_status)
580
581
582 def versionstring():
583     """Print version string of key packages for provenance and debugging."""
584
585     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
586     arvpkg = pkg_resources.require("arvados-python-client")
587     cwlpkg = pkg_resources.require("cwltool")
588
589     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
590                                     "arvados-python-client", arvpkg[0].version,
591                                     "cwltool", cwlpkg[0].version)
592
593
594 def arg_parser():  # type: () -> argparse.ArgumentParser
595     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
596
597     parser.add_argument("--basedir", type=str,
598                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
599     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
600                         help="Output directory, default current directory")
601
602     parser.add_argument("--eval-timeout",
603                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
604                         type=float,
605                         default=20)
606
607     exgroup = parser.add_mutually_exclusive_group()
608     exgroup.add_argument("--print-dot", action="store_true",
609                          help="Print workflow visualization in graphviz format and exit")
610     exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
611     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
612
613     exgroup = parser.add_mutually_exclusive_group()
614     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
615     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
616     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
617
618     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
619
620     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
621
622     exgroup = parser.add_mutually_exclusive_group()
623     exgroup.add_argument("--enable-reuse", action="store_true",
624                         default=True, dest="enable_reuse",
625                         help="Enable job or container reuse (default)")
626     exgroup.add_argument("--disable-reuse", action="store_false",
627                         default=True, dest="enable_reuse",
628                         help="Disable job or container reuse")
629
630     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs will go to the user's home project.")
631     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
632     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
633     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
634                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
635                         default=False)
636
637     exgroup = parser.add_mutually_exclusive_group()
638     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
639                         default=True, dest="submit")
640     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
641                         default=True, dest="submit")
642     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
643                          dest="create_workflow")
644     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
645     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
646
647     exgroup = parser.add_mutually_exclusive_group()
648     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
649                         default=True, dest="wait")
650     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
651                         default=True, dest="wait")
652
653     exgroup = parser.add_mutually_exclusive_group()
654     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
655                         default=True, dest="log_timestamps")
656     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
657                         default=True, dest="log_timestamps")
658
659     parser.add_argument("--api", type=str,
660                         default=None, dest="work_api",
661                         choices=("jobs", "containers"),
662                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
663
664     parser.add_argument("--compute-checksum", action="store_true", default=False,
665                         help="Compute checksum of contents while collecting outputs",
666                         dest="compute_checksum")
667
668     parser.add_argument("--submit-runner-ram", type=int,
669                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
670                         default=1024)
671
672     parser.add_argument("--submit-runner-image", type=str,
673                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
674                         default=None)
675
676     parser.add_argument("--name", type=str,
677                         help="Name to use for workflow execution instance.",
678                         default=None)
679
680     parser.add_argument("--on-error", type=str,
681                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
682                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
683
684     parser.add_argument("--enable-dev", action="store_true",
685                         help="Enable loading and running development versions "
686                              "of CWL spec.", default=False)
687
688     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
689                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
690                         default=0)
691
692     parser.add_argument("--priority", type=int,
693                         help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
694                         default=DEFAULT_PRIORITY)
695
696     parser.add_argument("--disable-validate", dest="do_validate",
697                         action="store_false", default=True,
698                         help=argparse.SUPPRESS)
699
700     parser.add_argument("--disable-js-validation",
701                         action="store_true", default=False,
702                         help=argparse.SUPPRESS)
703
704     exgroup = parser.add_mutually_exclusive_group()
705     exgroup.add_argument("--trash-intermediate", action="store_true",
706                         default=False, dest="trash_intermediate",
707                          help="Immediately trash intermediate outputs on workflow success.")
708     exgroup.add_argument("--no-trash-intermediate", action="store_false",
709                         default=False, dest="trash_intermediate",
710                         help="Do not trash intermediate outputs (default).")
711
712     parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
713     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
714
715     return parser
716
717 def add_arv_hints():
718     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
719     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
720     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
721     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
722     res.close()
723     cwltool.process.supportedProcessRequirements.extend([
724         "http://arvados.org/cwl#RunInSingleContainer",
725         "http://arvados.org/cwl#OutputDirType",
726         "http://arvados.org/cwl#RuntimeConstraints",
727         "http://arvados.org/cwl#PartitionRequirement",
728         "http://arvados.org/cwl#APIRequirement",
729         "http://commonwl.org/cwltool#LoadListingRequirement",
730         "http://arvados.org/cwl#IntermediateOutput",
731         "http://arvados.org/cwl#ReuseRequirement"
732     ])
733
734 def main(args, stdout, stderr, api_client=None, keep_client=None):
735     parser = arg_parser()
736
737     job_order_object = None
738     arvargs = parser.parse_args(args)
739
740     if arvargs.update_workflow:
741         if arvargs.update_workflow.find('-7fd4e-') == 5:
742             want_api = 'containers'
743         elif arvargs.update_workflow.find('-p5p6p-') == 5:
744             want_api = 'jobs'
745         else:
746             want_api = None
747         if want_api and arvargs.work_api and want_api != arvargs.work_api:
748             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
749                 arvargs.update_workflow, want_api, arvargs.work_api))
750             return 1
751         arvargs.work_api = want_api
752
753     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
754         job_order_object = ({}, "")
755
756     add_arv_hints()
757
758     try:
759         if api_client is None:
760             api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
761             keep_client = api_client.keep
762         if keep_client is None:
763             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
764         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
765                               num_retries=4, output_name=arvargs.output_name,
766                               output_tags=arvargs.output_tags)
767     except Exception as e:
768         logger.error(e)
769         return 1
770
771     if arvargs.debug:
772         logger.setLevel(logging.DEBUG)
773         logging.getLogger('arvados').setLevel(logging.DEBUG)
774
775     if arvargs.quiet:
776         logger.setLevel(logging.WARN)
777         logging.getLogger('arvados').setLevel(logging.WARN)
778         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
779
780     if arvargs.metrics:
781         metrics.setLevel(logging.DEBUG)
782         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
783
784     if arvargs.log_timestamps:
785         arvados.log_handler.setFormatter(logging.Formatter(
786             '%(asctime)s %(name)s %(levelname)s: %(message)s',
787             '%Y-%m-%d %H:%M:%S'))
788     else:
789         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
790
791     arvargs.conformance_test = None
792     arvargs.use_container = True
793     arvargs.relax_path_checks = True
794     arvargs.print_supported_versions = False
795
796     make_fs_access = partial(CollectionFsAccess,
797                            collection_cache=runner.collection_cache)
798
799     return cwltool.main.main(args=arvargs,
800                              stdout=stdout,
801                              stderr=stderr,
802                              executor=runner.arv_executor,
803                              makeTool=runner.arv_make_tool,
804                              versionfunc=versionstring,
805                              job_order_object=job_order_object,
806                              make_fs_access=make_fs_access,
807                              fetcher_constructor=partial(CollectionFetcher,
808                                                          api_client=api_client,
809                                                          fs_access=make_fs_access(""),
810                                                          num_retries=runner.num_retries),
811                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
812                              logger_handler=arvados.log_handler,
813                              custom_schema_callback=add_arv_hints)