13108: Rename parallel_submit_count to thread_count
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
#!/usr/bin/env python
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
import re
from functools import partial
import pkg_resources  # part of setuptools
import Queue
import time

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

DEFAULT_PRIORITY = 500

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None,
                 output_name=None, output_tags=None, num_retries=4,
                 thread_count=4):
        self.api = api_client
        self.processes = {}
        self.in_flight = 0
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.task_queue = Queue.Queue()
        self.task_queue_threads = []
        self.thread_count = thread_count
        self.poll_interval = 12

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
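        """Construct the Arvados-specific tool object for a tool document.

        Returns an ArvadosCommandTool or ArvadosWorkflow as appropriate,
        falling back to cwltool's default tool constructor for anything else.
        """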
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                num_retries=self.num_retries)
        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Complete"}).execute(num_retries=self.num_retries)
            else:
                logger.warn("Overall process status is %s", processStatus)
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out

    def task_queue_func(self):
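        """Worker thread body: run tasks from the queue until a None sentinel arrives."""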
        while True:
            task = self.task_queue.get()
            if task is None:
                return
            task()

    def task_queue_add(self, task):
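        """Hand a task to a worker thread, or run it inline if only one thread is configured."""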
        if self.thread_count > 1:
            self.task_queue.put(task)
        else:
            task()

    def start_run(self, runnable, kwargs):
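        """Count the runnable as in flight and schedule its run() on the task queue."""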
        with self.workflow_eval_lock:
            self.in_flight += 1
        self.task_queue_add(partial(runnable.run, **kwargs))

    def process_submitted(self, container):
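        """Track a submitted container or job for state polling and decrement the in-flight count."""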
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container
            self.in_flight -= 1

    def process_done(self, uuid):
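        """Stop tracking a container or job whose completion has been handled."""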
        with self.workflow_eval_lock:
            if uuid in self.processes:
                del self.processes[uuid]

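    # cwltool output callbacks are wrapped (see get_wrapped_callback) so that
    # they always run while holding workflow_eval_lock, keeping state updates
    # serialized with the polling and worker threads.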
    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
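        """Handle a state change event for a tracked container or job.

        Logs transitions to the Running state and, for finished processes,
        dispatches the completion callback onto the task queue.
        """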
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.workflow_eval_lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    with self.workflow_eval_lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                    def done_cb():
                        j.done(event["properties"]["new_attributes"])
                        with self.workflow_eval_lock:
                            self.workflow_eval_lock.notify()
                    self.task_queue_add(done_cb)


    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    remain_wait = self.poll_interval
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notify()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt:
                break

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

    def make_output_collection(self, name, tagsString, outputObj):
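        """Assemble the final output collection.

        Copies Keep references and file literals from the output object into a
        new collection, writes cwl.output.json, saves the collection, and
        creates a tag link for each entry in tagsString.  Returns the rewritten
        output object and the collection.
        """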
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
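        """Record the final output collection on the current Crunch record.

        When running inside a container, updates that container's output field;
        when running as a job task, updates the task's output, success, and
        progress fields.
        """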
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress':1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
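        """Execute the workflow (the executor entry point used by cwltool.main).

        Checks Arvados-specific requirements, uploads workflow dependencies and
        the job order, then either registers a workflow or pipeline template,
        submits a workflow runner, or evaluates the workflow locally, and
        finally returns (final_output, final_status).
        """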
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])
        self.secret_store = kwargs.get("secret_store")

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        # Don't validate this time because it will just print redundant errors.
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata,
                                  do_validate=False)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"],
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"],
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.eval_timeout = kwargs.get("eval_timeout")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            if kwargs["priority"] != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
                                                merged_map=merged_map,
                                                priority=kwargs.get("priority"),
                                                secret_store=self.secret_store)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"),
                                      merged_map=merged_map)
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(**kwargs)
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        for r in xrange(0, self.thread_count):
            t = threading.Thread(target=self.task_queue_func)
            self.task_queue_threads.append(t)
            t.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.workflow_eval_lock.acquire()
            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, kwargs)
                else:
                    if (self.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while (self.in_flight + len(self.processes)) > 0:
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            try:
                # Drain queue
                while not self.task_queue.empty():
                    self.task_queue.get()
            except Queue.Empty:
                pass
            self.stop_polling.set()
            self.polling_thread.join()
            for t in self.task_queue_threads:
                self.task_queue.put(None)
            for t in self.task_queue_threads:
                t.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)


def versionstring():
    """Return a version string for key packages, for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs.  If not provided, they will go to your home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
                        default=DEFAULT_PRIORITY)

    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    parser.add_argument("--thread-count", type=int,
                        default=4, help="Number of threads to use for job submit and output collection.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                        default=False, dest="trash_intermediate",
                        help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                        default=False, dest="trash_intermediate",
                        help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

def add_arv_hints():
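    """Register Arvados CWL extensions (custom schema and process requirements) with cwltool."""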
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])

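# Example invocation via the installed command-line entry point (typically
# "arvados-cwl-runner"; the workflow and input filenames here are placeholders):
#
#   arvados-cwl-runner --api=containers --thread-count 8 my-workflow.cwl my-inputs.yml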
def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
            keep_client = api_client.keep
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)