13773: Intercept logging calls to update runtime_status on runner containers.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import hashlib
15 import copy
16 import json
17 import re
18 from functools import partial
19 import pkg_resources  # part of setuptools
20 import Queue
21 import time
22 import signal
23 import thread
24
25 from cwltool.errors import WorkflowException
26 import cwltool.main
27 import cwltool.workflow
28 import cwltool.process
29 from schema_salad.sourceline import SourceLine
30 import schema_salad.validate as validate
31 import cwltool.argparser
32
33 import arvados
34 import arvados.config
35 from arvados.keep import KeepClient
36 from arvados.errors import ApiError
37 import arvados.commands._util as arv_cmd
38
39 from .arvcontainer import ArvadosContainer, RunnerContainer
40 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
41 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
42 from .arvtool import ArvadosCommandTool
43 from .arvworkflow import ArvadosWorkflow, upload_workflow
44 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
45 from .perf import Perf
46 from .pathmapper import NoFollowPathMapper
47 from .task_queue import TaskQueue
48 from .context import ArvLoadingContext, ArvRuntimeContext
49 from .util import get_current_container
50 from ._version import __version__
51
52 from cwltool.pack import pack
53 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
54 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
55 from cwltool.command_line_tool import compute_checksums
56
57 from arvados.api import OrderedJsonModel
58
59 logger = logging.getLogger('arvados.cwl-runner')
60 metrics = logging.getLogger('arvados.cwl-runner.metrics')
61 logger.setLevel(logging.INFO)
62
63 arvados.log_handler.setFormatter(logging.Formatter(
64         '%(asctime)s %(name)s %(levelname)s: %(message)s',
65         '%Y-%m-%d %H:%M:%S'))
66
67 DEFAULT_PRIORITY = 500
68
69 class RuntimeStatusLoggingHandler(logging.Handler):
70     """
71     Intercepts logging calls and reports them as runtime statuses on runner
72     containers.
73     """
74     def __init__(self, runtime_status_update_func):
75         super(RuntimeStatusLoggingHandler, self).__init__()
76         self.runtime_status_update = runtime_status_update_func
77
78     def emit(self, record):
79         kind = None
80         if record.levelno == logging.ERROR:
81             kind = 'error'
82         elif record.levelno == logging.WARNING:
83             kind = 'warning'
84         elif record.levelno == logging.INFO:
85             kind = 'activity'
86         if kind is not None:
87             log_msg = record.getMessage()
88             if '\n' in log_msg:
89                 # If the logged message is multi-line, include it as a detail
90                 self.runtime_status_update(
91                     kind,
92                     "%s from %s (please see details)" % (kind, record.name),
93                     log_msg
94                 )
95             else:
96                 self.runtime_status_update(
97                     kind,
98                     "%s: %s" % (record.name, log_msg)
99                 )
100
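A minimal sketch (annotation, not part of the file) of how the handler above behaves once attached: ERROR/WARNING/INFO records become 'error'/'warning'/'activity' updates, and a multi-line message is reported as a summary plus a detail field. `record_update` is a hypothetical stand-in for ArvCwlRunner.runtime_status_update:

    updates = []
    def record_update(kind, message, detail=None):  # hypothetical callback
        updates.append((kind, message, detail))

    demo = logging.getLogger('demo')
    demo.addHandler(RuntimeStatusLoggingHandler(record_update))
    demo.error("step failed")      # ('error', 'demo: step failed', None)
    demo.warning("oops\ndetails")  # ('warning', 'warning from demo (please see details)', 'oops\ndetails')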
101 class ArvCwlRunner(object):
102     """Execute a CWL tool or workflow, submit work (using either jobs or
103     containers API), wait for them to complete, and report output.
104
105     """
106
107     def __init__(self, api_client,
108                  arvargs=None,
109                  keep_client=None,
110                  num_retries=4,
111                  thread_count=4):
112
113         if arvargs is None:
114             arvargs = argparse.Namespace()
115             arvargs.work_api = None
116             arvargs.output_name = None
117             arvargs.output_tags = None
118             arvargs.thread_count = 1
119
120         self.api = api_client
121         self.processes = {}
122         self.workflow_eval_lock = threading.Condition(threading.RLock())
123         self.final_output = None
124         self.final_status = None
125         self.num_retries = num_retries
126         self.uuid = None
127         self.stop_polling = threading.Event()
128         self.poll_api = None
129         self.pipeline = None
130         self.final_output_collection = None
131         self.output_name = arvargs.output_name
132         self.output_tags = arvargs.output_tags
133         self.project_uuid = None
134         self.intermediate_output_ttl = 0
135         self.intermediate_output_collections = []
136         self.trash_intermediate = False
137         self.thread_count = arvargs.thread_count
138         self.poll_interval = 12
139         self.loadingContext = None
140
141         if keep_client is not None:
142             self.keep_client = keep_client
143         else:
144             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
145
146         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
147
148         self.fetcher_constructor = partial(CollectionFetcher,
149                                            api_client=self.api,
150                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
151                                            num_retries=self.num_retries)
152
153         self.work_api = None
154         expected_api = ["jobs", "containers"]
155         for api in expected_api:
156             try:
157                 methods = self.api._rootDesc.get('resources')[api]['methods']
158                 if ('httpMethod' in methods['create'] and
159                     (arvargs.work_api == api or arvargs.work_api is None)):
160                     self.work_api = api
161                     break
162             except KeyError:
163                 pass
164
165         if not self.work_api:
166             if arvargs.work_api is None:
167                 raise Exception("No supported APIs")
168             else:
169                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
170
171         if self.work_api == "jobs":
172             logger.warn("""
173 *******************************
174 Using the deprecated 'jobs' API.
175
176 To get rid of this warning:
177
178 Users: read about migrating at
179 http://doc.arvados.org/user/cwl/cwl-style.html#migrate
180 and use the option --api=containers
181
182 Admins: configure the cluster to disable the 'jobs' API as described at:
183 http://doc.arvados.org/install/install-api-server.html#disable_api_methods
184 *******************************""")
185
186         self.loadingContext = ArvLoadingContext(vars(arvargs))
187         self.loadingContext.fetcher_constructor = self.fetcher_constructor
188         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
189         self.loadingContext.construct_tool_object = self.arv_make_tool
190
191         # Add a custom logging handler to the root logger for runtime status reporting
192         # if running inside a container
193         if get_current_container(self.api, self.num_retries, logger):
194             root_logger = logging.getLogger('')
195             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
196             root_logger.addHandler(handler)
197
198     def arv_make_tool(self, toolpath_object, loadingContext):
199         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
200             return ArvadosCommandTool(self, toolpath_object, loadingContext)
201         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
202             return ArvadosWorkflow(self, toolpath_object, loadingContext)
203         else:
204             return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
205
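arv_make_tool dispatches on the CWL "class" field; anything other than a CommandLineTool or Workflow (an ExpressionTool, for example) falls through to cwltool's default factory:

    # {"class": "CommandLineTool", ...} -> ArvadosCommandTool
    # {"class": "Workflow", ...}        -> ArvadosWorkflow
    # anything else                     -> cwltool.workflow.default_make_tool(...)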
206     def output_callback(self, out, processStatus):
207         with self.workflow_eval_lock:
208             if processStatus == "success":
209                 logger.info("Overall process status is %s", processStatus)
210                 state = "Complete"
211             else:
212                 logger.error("Overall process status is %s", processStatus)
213                 state = "Failed"
214             if self.pipeline:
215                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
216                                                         body={"state": state}).execute(num_retries=self.num_retries)
217             self.final_status = processStatus
218             self.final_output = out
219             self.workflow_eval_lock.notifyAll()
220
221
222     def start_run(self, runnable, runtimeContext):
223         self.task_queue.add(partial(runnable.run, runtimeContext))
224
225     def process_submitted(self, container):
226         with self.workflow_eval_lock:
227             self.processes[container.uuid] = container
228
229     def process_done(self, uuid, record):
230         with self.workflow_eval_lock:
231             j = self.processes[uuid]
232             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
233             self.task_queue.add(partial(j.done, record))
234             del self.processes[uuid]
235
236     def runtime_status_update(self, kind, message, detail=None):
237         """
238         Updates the runtime_status field on the runner container.
239         Called from a failing child container: records the first child error
240         or updates the error count on subsequent error statuses.
241         Also called from other parts of the code that need to report errors,
242         warnings, or activity statuses.
243         """
244         with self.workflow_eval_lock:
245             current = get_current_container(self.api, self.num_retries, logger)
246             if current is None:
247                 return
248             runtime_status = current.get('runtime_status', {})
249             # If the status is an error, only the first one is reported in full.
250             if kind == 'error':
251                 if not runtime_status.get('error'):
252                     runtime_status.update({
253                         'error': message,
254                         'errorDetail': detail or "No error logs available"
255                     })
256                 # Further errors are only mentioned as a count.
257                 else:
258                     # Get anything before an optional 'and N more' string.
259                     try:
260                         error_msg = re.match(
261                             r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
262                         more_failures = re.match(
263                             r'.*\(and (\d+) more\)', runtime_status.get('error'))
264                     except TypeError:
265                         # Ignore non-string errors stubbed in by tests
266                         return
267                     if more_failures:
268                         failure_qty = int(more_failures.groups()[0])
269                         runtime_status.update({
270                             'error': "%s (and %d more)" % (error_msg, failure_qty+1)
271                         })
272                     else:
273                         runtime_status.update({
274                             'error': "%s (and 1 more)" % error_msg
275                         })
276             elif kind in ['warning', 'activity']:
277                 # Record the last warning/activity status regardless of
278                 # previous occurrences.
279                 runtime_status.update({
280                     kind: message
281                 })
282                 if detail is not None:
283                     runtime_status.update({
284                         kind+"Detail": detail
285                     })
286             else:
287                 # Ignore any other status kind
288                 return
289             try:
290                 self.api.containers().update(uuid=current['uuid'],
291                                             body={
292                                                 'runtime_status': runtime_status,
293                                             }).execute(num_retries=self.num_retries)
294             except Exception as e:
295                 logger.info("Couldn't update runtime_status: %s", e)
296
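The error-compaction branch above can be exercised on its own; a sketch (reusing the method's two regexes) of how repeated errors collapse into a single counted message:

    status = {'error': 'step X failed'}
    for _ in range(2):
        msg = re.match(r'^(.*?)(?=\s*\(and \d+ more\)|$)', status['error']).groups()[0]
        more = re.match(r'.*\(and (\d+) more\)', status['error'])
        if more:
            status['error'] = "%s (and %d more)" % (msg, int(more.groups()[0]) + 1)
        else:
            status['error'] = "%s (and 1 more)" % msg
    # status['error'] == 'step X failed (and 2 more)'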
297     def wrapped_callback(self, cb, obj, st):
298         with self.workflow_eval_lock:
299             cb(obj, st)
300             self.workflow_eval_lock.notifyAll()
301
302     def get_wrapped_callback(self, cb):
303         return partial(self.wrapped_callback, cb)
304
305     def on_message(self, event):
306         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
307             uuid = event["object_uuid"]
308             if event["properties"]["new_attributes"]["state"] == "Running":
309                 with self.workflow_eval_lock:
310                     j = self.processes[uuid]
311                     if j.running is False:
312                         j.running = True
313                         j.update_pipeline_component(event["properties"]["new_attributes"])
314                         logger.info("%s %s is Running", self.label(j), uuid)
315             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
316                 self.process_done(uuid, event["properties"]["new_attributes"])
317
318     def label(self, obj):
319         return "[%s %s]" % (self.work_api[0:-1], obj.name)
320
321     def poll_states(self):
322         """Poll status of jobs or containers listed in the processes dict.
323
324         Runs in a separate thread.
325         """
326
327         try:
328             remain_wait = self.poll_interval
329             while True:
330                 if remain_wait > 0:
331                     self.stop_polling.wait(remain_wait)
332                 if self.stop_polling.is_set():
333                     break
334                 with self.workflow_eval_lock:
335                     keys = list(self.processes.keys())
336                 if not keys:
337                     remain_wait = self.poll_interval
338                     continue
339
340                 begin_poll = time.time()
341                 if self.work_api == "containers":
342                     table = self.poll_api.container_requests()
343                 elif self.work_api == "jobs":
344                     table = self.poll_api.jobs()
345
346                 try:
347                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
348                 except Exception as e:
349                     logger.warn("Error checking states on API server: %s", e)
350                     remain_wait = self.poll_interval
351                     continue
352
353                 for p in proc_states["items"]:
354                     self.on_message({
355                         "object_uuid": p["uuid"],
356                         "event_type": "update",
357                         "properties": {
358                             "new_attributes": p
359                         }
360                     })
361                 finish_poll = time.time()
362                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
363         except:
364             logger.exception("Fatal error in state polling thread.")
365             with self.workflow_eval_lock:
366                 self.processes.clear()
367                 self.workflow_eval_lock.notifyAll()
368         finally:
369             self.stop_polling.set()
370
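poll_states funnels each polled record through the same on_message path used for live websocket events by wrapping it in a synthetic update event; the shape, with a hypothetical container request UUID:

    # {
    #     "object_uuid": "zzzzz-xvhdp-012345678901234",
    #     "event_type": "update",
    #     "properties": {"new_attributes": {"state": "Complete", ...}}
    # }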
371     def add_intermediate_output(self, uuid):
372         if uuid:
373             self.intermediate_output_collections.append(uuid)
374
375     def trash_intermediate_output(self):
376         logger.info("Cleaning up intermediate output collections")
377         for i in self.intermediate_output_collections:
378             try:
379                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
380             except:
381                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
382             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
383                 break
384
385     def check_features(self, obj):
386         if isinstance(obj, dict):
387             if obj.get("writable") and self.work_api != "containers":
388                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
389             if obj.get("class") == "DockerRequirement":
390                 if obj.get("dockerOutputDirectory"):
391                     if self.work_api != "containers":
392                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
393                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
394                     if not obj.get("dockerOutputDirectory").startswith('/'):
395                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
396                             "Option 'dockerOutputDirectory' must be an absolute path.")
397             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
398                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
399             for v in obj.itervalues():
400                 self.check_features(v)
401         elif isinstance(obj, list):
402             for i,v in enumerate(obj):
403                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
404                     self.check_features(v)
405
406     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
407         outputObj = copy.deepcopy(outputObj)
408
409         files = []
410         def capture(fileobj):
411             files.append(fileobj)
412
413         adjustDirObjs(outputObj, capture)
414         adjustFileObjs(outputObj, capture)
415
416         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
417
418         final = arvados.collection.Collection(api_client=self.api,
419                                               keep_client=self.keep_client,
420                                               num_retries=self.num_retries)
421
422         for k,v in generatemapper.items():
423             if k.startswith("_:"):
424                 if v.type == "Directory":
425                     continue
426                 if v.type == "CreateFile":
427                     with final.open(v.target, "wb") as f:
428                         f.write(v.resolved.encode("utf-8"))
429                     continue
430
431             if not k.startswith("keep:"):
432                 raise Exception("Output source is not in keep or a literal")
433             sp = k.split("/")
434             srccollection = sp[0][5:]
435             try:
436                 reader = self.collection_cache.get(srccollection)
437                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
438                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
439             except arvados.errors.ArgumentError as e:
440                 logger.error("While creating CollectionReader for '%s' '%s': %s", k, v, e)
441                 raise
442             except IOError as e:
443                 logger.warn("While preparing output collection: %s", e)
444
445         def rewrite(fileobj):
446             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
447             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
448                 if k in fileobj:
449                     del fileobj[k]
450
451         adjustDirObjs(outputObj, rewrite)
452         adjustFileObjs(outputObj, rewrite)
453
454         with final.open("cwl.output.json", "w") as f:
455             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
456
457         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
458
459         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
460                     final.api_response()["name"],
461                     final.manifest_locator())
462
463         final_uuid = final.manifest_locator()
464         tags = tagsString.split(',')
465         for tag in tags:
466             self.api.links().create(body={
467                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
468                 }).execute(num_retries=self.num_retries)
469
470         def finalcollection(fileobj):
471             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
472
473         adjustDirObjs(outputObj, finalcollection)
474         adjustFileObjs(outputObj, finalcollection)
475
476         return (outputObj, final)
477
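Output sources reaching the copy loop above must be either literals ("_:...") or "keep:" references; the split peels a reference into its source collection and the path within it. A sketch with a made-up portable data hash:

    k = "keep:0123456789abcdef0123456789abcdef+57/dir/out.txt"
    sp = k.split("/")           # ['keep:0123...+57', 'dir', 'out.txt']
    srccollection = sp[0][5:]   # the hash, with the 'keep:' prefix stripped
    srcpath = "/".join(sp[1:])  # 'dir/out.txt'; '.' when the reference is bare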
478     def set_crunch_output(self):
479         if self.work_api == "containers":
480             current = get_current_container(self.api, self.num_retries, logger)
481             if current is None:
482                 return
483             try:
484                 self.api.containers().update(uuid=current['uuid'],
485                                              body={
486                                                  'output': self.final_output_collection.portable_data_hash(),
487                                              }).execute(num_retries=self.num_retries)
488                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
489                                               body={
490                                                   'is_trashed': True
491                                               }).execute(num_retries=self.num_retries)
492             except Exception as e:
493                 logger.info("Setting container output: %s", e)
494         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
495             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
496                                    body={
497                                        'output': self.final_output_collection.portable_data_hash(),
498                                        'success': self.final_status == "success",
499                                        'progress':1.0
500                                    }).execute(num_retries=self.num_retries)
501
502     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
503         self.debug = runtimeContext.debug
504
505         tool.visit(self.check_features)
506
507         self.project_uuid = runtimeContext.project_uuid
508         self.pipeline = None
509         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
510         self.secret_store = runtimeContext.secret_store
511
512         self.trash_intermediate = runtimeContext.trash_intermediate
513         if self.trash_intermediate and self.work_api != "containers":
514             raise Exception("--trash-intermediate is only supported with --api=containers.")
515
516         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
517         if self.intermediate_output_ttl and self.work_api != "containers":
518             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
519         if self.intermediate_output_ttl < 0:
520             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
521
522         if runtimeContext.submit_request_uuid and self.work_api != "containers":
523             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
524
525         if not runtimeContext.name:
526             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
527
528         # Upload direct dependencies of workflow steps, and get back a mapping of files to Keep references.
529         # Also uploads Docker images.
530         merged_map = upload_workflow_deps(self, tool)
531
532         # Reload the tool object, which may have been updated by
533         # upload_workflow_deps.
534         # Skip validation this time; it would only print redundant errors.
535         loadingContext = self.loadingContext.copy()
536         loadingContext.loader = tool.doc_loader
537         loadingContext.avsc_names = tool.doc_schema
538         loadingContext.metadata = tool.metadata
539         loadingContext.do_validate = False
540
541         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
542                                   loadingContext)
543
544         # Upload local file references in the job order.
545         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
546                                      tool, job_order)
547
548         existing_uuid = runtimeContext.update_workflow
549         if existing_uuid or runtimeContext.create_workflow:
550             # Create a pipeline template or workflow record and exit.
551             if self.work_api == "jobs":
552                 tmpl = RunnerTemplate(self, tool, job_order,
553                                       runtimeContext.enable_reuse,
554                                       uuid=existing_uuid,
555                                       submit_runner_ram=runtimeContext.submit_runner_ram,
556                                       name=runtimeContext.name,
557                                       merged_map=merged_map)
558                 tmpl.save()
559                 # cwltool.main will write our return value to stdout.
560                 return (tmpl.uuid, "success")
561             elif self.work_api == "containers":
562                 return (upload_workflow(self, tool, job_order,
563                                         self.project_uuid,
564                                         uuid=existing_uuid,
565                                         submit_runner_ram=runtimeContext.submit_runner_ram,
566                                         name=runtimeContext.name,
567                                         merged_map=merged_map),
568                         "success")
569
570         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
571         self.eval_timeout = runtimeContext.eval_timeout
572
573         runtimeContext = runtimeContext.copy()
574         runtimeContext.use_container = True
575         runtimeContext.tmpdir_prefix = "tmp"
576         runtimeContext.work_api = self.work_api
577
578         if self.work_api == "containers":
579             if self.ignore_docker_for_reuse:
580                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
581             runtimeContext.outdir = "/var/spool/cwl"
582             runtimeContext.docker_outdir = "/var/spool/cwl"
583             runtimeContext.tmpdir = "/tmp"
584             runtimeContext.docker_tmpdir = "/tmp"
585         elif self.work_api == "jobs":
586             if runtimeContext.priority != DEFAULT_PRIORITY:
587                 raise Exception("--priority not implemented for jobs API.")
588             runtimeContext.outdir = "$(task.outdir)"
589             runtimeContext.docker_outdir = "$(task.outdir)"
590             runtimeContext.tmpdir = "$(task.tmpdir)"
591
592         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
593             raise Exception("--priority must be in the range 1..1000.")
594
595         runnerjob = None
596         if runtimeContext.submit:
597             # Submit a runner job to run the workflow for us.
598             if self.work_api == "containers":
599                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
600                     runtimeContext.runnerjob = tool.tool["id"]
601                     runnerjob = tool.job(job_order,
602                                          self.output_callback,
603                                          runtimeContext).next()
604                 else:
605                     runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
606                                                 self.output_name,
607                                                 self.output_tags,
608                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
609                                                 name=runtimeContext.name,
610                                                 on_error=runtimeContext.on_error,
611                                                 submit_runner_image=runtimeContext.submit_runner_image,
612                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
613                                                 merged_map=merged_map,
614                                                 priority=runtimeContext.priority,
615                                                 secret_store=self.secret_store)
616             elif self.work_api == "jobs":
617                 runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
618                                       self.output_name,
619                                       self.output_tags,
620                                       submit_runner_ram=runtimeContext.submit_runner_ram,
621                                       name=runtimeContext.name,
622                                       on_error=runtimeContext.on_error,
623                                       submit_runner_image=runtimeContext.submit_runner_image,
624                                       merged_map=merged_map)
625         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
626             # Create pipeline for local run
627             self.pipeline = self.api.pipeline_instances().create(
628                 body={
629                     "owner_uuid": self.project_uuid,
630                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
631                     "components": {},
632                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
633             logger.info("Pipeline instance %s", self.pipeline["uuid"])
634
635         if runnerjob and not runtimeContext.wait:
636             submitargs = runtimeContext.copy()
637             submitargs.submit = False
638             runnerjob.run(submitargs)
639             return (runnerjob.uuid, "success")
640
641         self.poll_api = arvados.api('v1')
642         self.polling_thread = threading.Thread(target=self.poll_states)
643         self.polling_thread.start()
644
645         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
646
647         if runnerjob:
648             jobiter = iter((runnerjob,))
649         else:
650             if runtimeContext.cwl_runner_job is not None:
651                 self.uuid = runtimeContext.cwl_runner_job.get('uuid')
652             jobiter = tool.job(job_order,
653                                self.output_callback,
654                                runtimeContext)
655
656         try:
657             self.workflow_eval_lock.acquire()
658             # Holds the lock while this code runs and releases it when
659             # it is safe to do so in self.workflow_eval_lock.wait(),
660             # at which point on_message can update job state and
661             # process output callbacks.
662
663             loopperf = Perf(metrics, "jobiter")
664             loopperf.__enter__()
665             for runnable in jobiter:
666                 loopperf.__exit__()
667
668                 if self.stop_polling.is_set():
669                     break
670
671                 if self.task_queue.error is not None:
672                     raise self.task_queue.error
673
674                 if runnable:
675                     with Perf(metrics, "run"):
676                         self.start_run(runnable, runtimeContext)
677                 else:
678                     if (self.task_queue.in_flight + len(self.processes)) > 0:
679                         self.workflow_eval_lock.wait(3)
680                     else:
681                         logger.error("Workflow is deadlocked: no runnable processes and not waiting on any pending processes.")
682                         break
683                 loopperf.__enter__()
684             loopperf.__exit__()
685
686             while (self.task_queue.in_flight + len(self.processes)) > 0:
687                 if self.task_queue.error is not None:
688                     raise self.task_queue.error
689                 self.workflow_eval_lock.wait(3)
690
691         except UnsupportedRequirement:
692             raise
693         except:
694             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
695                 logger.error("Interrupted, workflow will be cancelled")
696             else:
697                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
698             if self.pipeline:
699                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
700                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
701             if runnerjob and runnerjob.uuid and self.work_api == "containers":
702                 self.api.container_requests().update(uuid=runnerjob.uuid,
703                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
704         finally:
705             self.workflow_eval_lock.release()
706             self.task_queue.drain()
707             self.stop_polling.set()
708             self.polling_thread.join()
709             self.task_queue.join()
710
711         if self.final_status == "UnsupportedRequirement":
712             raise UnsupportedRequirement("Check log for details.")
713
714         if self.final_output is None:
715             raise WorkflowException("Workflow did not return a result.")
716
717         if runtimeContext.submit and isinstance(runnerjob, Runner):
718             logger.info("Final output collection %s", runnerjob.final_output)
719         else:
720             if self.output_name is None:
721                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
722             if self.output_tags is None:
723                 self.output_tags = ""
724
725             storage_classes = runtimeContext.storage_classes.strip().split(",")
726             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
727             self.set_crunch_output()
728
729         if runtimeContext.compute_checksum:
730             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
731             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
732
733         if self.trash_intermediate and self.final_status == "success":
734             self.trash_intermediate_output()
735
736         return (self.final_output, self.final_status)
737
738
739 def versionstring():
740     """Return a version string of key packages for provenance and debugging."""
741
742     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
743     arvpkg = pkg_resources.require("arvados-python-client")
744     cwlpkg = pkg_resources.require("cwltool")
745
746     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
747                                     "arvados-python-client", arvpkg[0].version,
748                                     "cwltool", cwlpkg[0].version)
749
750
751 def arg_parser():  # type: () -> argparse.ArgumentParser
752     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
753
754     parser.add_argument("--basedir", type=str,
755                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
756     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
757                         help="Output directory, default current directory")
758
759     parser.add_argument("--eval-timeout",
760                         help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
761                         type=float,
762                         default=20)
763
764     exgroup = parser.add_mutually_exclusive_group()
765     exgroup.add_argument("--print-dot", action="store_true",
766                          help="Print workflow visualization in graphviz format and exit")
767     exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
768     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
769
770     exgroup = parser.add_mutually_exclusive_group()
771     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
772     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
773     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
774
775     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
776
777     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
778
779     exgroup = parser.add_mutually_exclusive_group()
780     exgroup.add_argument("--enable-reuse", action="store_true",
781                         default=True, dest="enable_reuse",
782                         help="Enable job or container reuse (default)")
783     exgroup.add_argument("--disable-reuse", action="store_false",
784                         default=True, dest="enable_reuse",
785                         help="Disable job or container reuse")
786
787     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, they go to your home project.")
788     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
789     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
790     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
791                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
792                         default=False)
793
794     exgroup = parser.add_mutually_exclusive_group()
795     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
796                         default=True, dest="submit")
797     exgroup.add_argument("--local", action="store_false", help="Run the workflow runner on the local host (individual steps are still submitted to Arvados).",
798                         default=True, dest="submit")
799     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
800                          dest="create_workflow")
801     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
802     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
803
804     exgroup = parser.add_mutually_exclusive_group()
805     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
806                         default=True, dest="wait")
807     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
808                         default=True, dest="wait")
809
810     exgroup = parser.add_mutually_exclusive_group()
811     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
812                         default=True, dest="log_timestamps")
813     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
814                         default=True, dest="log_timestamps")
815
816     parser.add_argument("--api", type=str,
817                         default=None, dest="work_api",
818                         choices=("jobs", "containers"),
819                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
820
821     parser.add_argument("--compute-checksum", action="store_true", default=False,
822                         help="Compute checksum of contents while collecting outputs",
823                         dest="compute_checksum")
824
825     parser.add_argument("--submit-runner-ram", type=int,
826                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
827                         default=None)
828
829     parser.add_argument("--submit-runner-image", type=str,
830                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
831                         default=None)
832
833     parser.add_argument("--submit-request-uuid", type=str,
834                         default=None,
835                         help="Update and commit supplied container request instead of creating a new one (containers API only).")
836
837     parser.add_argument("--name", type=str,
838                         help="Name to use for workflow execution instance.",
839                         default=None)
840
841     parser.add_argument("--on-error", type=str,
842                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
843                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
844
845     parser.add_argument("--enable-dev", action="store_true",
846                         help="Enable loading and running development versions "
847                              "of CWL spec.", default=False)
848     parser.add_argument('--storage-classes', default="default", type=str,
849                         help="Specify a comma-separated list of storage classes to be used when saving workflow output to Keep.")
850
851     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
852                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
853                         default=0)
854
855     parser.add_argument("--priority", type=int,
856                         help="Workflow priority (range 1..1000; higher takes precedence over lower; containers API only)",
857                         default=DEFAULT_PRIORITY)
858
859     parser.add_argument("--disable-validate", dest="do_validate",
860                         action="store_false", default=True,
861                         help=argparse.SUPPRESS)
862
863     parser.add_argument("--disable-js-validation",
864                         action="store_true", default=False,
865                         help=argparse.SUPPRESS)
866
867     parser.add_argument("--thread-count", type=int,
868                         default=4, help="Number of threads to use for job submit and output collection.")
869
870     exgroup = parser.add_mutually_exclusive_group()
871     exgroup.add_argument("--trash-intermediate", action="store_true",
872                         default=False, dest="trash_intermediate",
873                          help="Immediately trash intermediate outputs on workflow success.")
874     exgroup.add_argument("--no-trash-intermediate", action="store_false",
875                         default=False, dest="trash_intermediate",
876                         help="Do not trash intermediate outputs (default).")
877
878     parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
879     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
880
881     return parser
882
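The paired flags above (--submit/--local, --wait/--no-wait, --enable-reuse/--disable-reuse, ...) share a dest, so whichever appears last on the command line wins. A sketch of the resulting namespace for a typical run (file names assumed):

    args = arg_parser().parse_args(
        ["--api=containers", "--disable-reuse", "wf.cwl", "job.yml"])
    # args.work_api == 'containers', args.enable_reuse == False
    # args.workflow == 'wf.cwl',     args.job_order == ['job.yml']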
883 def add_arv_hints():
884     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
885     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
886     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
887     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
888     res.close()
889     cwltool.process.supportedProcessRequirements.extend([
890         "http://arvados.org/cwl#RunInSingleContainer",
891         "http://arvados.org/cwl#OutputDirType",
892         "http://arvados.org/cwl#RuntimeConstraints",
893         "http://arvados.org/cwl#PartitionRequirement",
894         "http://arvados.org/cwl#APIRequirement",
895         "http://commonwl.org/cwltool#LoadListingRequirement",
896         "http://arvados.org/cwl#IntermediateOutput",
897         "http://arvados.org/cwl#ReuseRequirement"
898     ])
899
900 def exit_signal_handler(sigcode, frame):
901     logger.error("Caught signal {}, exiting.".format(sigcode))
902     sys.exit(-sigcode)
903
904 def main(args, stdout, stderr, api_client=None, keep_client=None,
905          install_sig_handlers=True):
906     parser = arg_parser()
907
908     job_order_object = None
909     arvargs = parser.parse_args(args)
910
911     if len(arvargs.storage_classes.strip().split(',')) > 1:
912         logger.error("Multiple storage classes are not currently supported.")
913         return 1
914
915     arvargs.use_container = True
916     arvargs.relax_path_checks = True
917     arvargs.print_supported_versions = False
918
919     if install_sig_handlers:
920         arv_cmd.install_signal_handlers()
921
922     if arvargs.update_workflow:
923         if arvargs.update_workflow.find('-7fd4e-') == 5:
924             want_api = 'containers'
925         elif arvargs.update_workflow.find('-p5p6p-') == 5:
926             want_api = 'jobs'
927         else:
928             want_api = None
929         if want_api and arvargs.work_api and want_api != arvargs.work_api:
930             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
931                 arvargs.update_workflow, want_api, arvargs.work_api))
932             return 1
933         arvargs.work_api = want_api
934
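The offset check above relies on the Arvados UUID layout: a five-character cluster prefix, a five-character object-type infix, and a fifteen-character suffix, so find() locates the infix delimiter at offset 5. For a hypothetical UUID:

    # 'zzzzz-7fd4e-0123456789abcde'
    #       ^ find('-7fd4e-') == 5: a workflow record, so want_api = 'containers'
    #         find('-p5p6p-') == 5: a pipeline template, so want_api = 'jobs'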
935     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
936         job_order_object = ({}, "")
937
938     add_arv_hints()
939
940     try:
941         if api_client is None:
942             api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
943             keep_client = api_client.keep
944             # Contact the API server now so errors are reported early.
945             api_client.users().current().execute()
946         if keep_client is None:
947             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
948         runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
949     except Exception as e:
950         logger.error(e)
951         return 1
952
953     if arvargs.debug:
954         logger.setLevel(logging.DEBUG)
955         logging.getLogger('arvados').setLevel(logging.DEBUG)
956
957     if arvargs.quiet:
958         logger.setLevel(logging.WARN)
959         logging.getLogger('arvados').setLevel(logging.WARN)
960         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
961
962     if arvargs.metrics:
963         metrics.setLevel(logging.DEBUG)
964         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
965
966     if arvargs.log_timestamps:
967         arvados.log_handler.setFormatter(logging.Formatter(
968             '%(asctime)s %(name)s %(levelname)s: %(message)s',
969             '%Y-%m-%d %H:%M:%S'))
970     else:
971         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
972
973     for key, val in cwltool.argparser.get_default_args().items():
974         if not hasattr(arvargs, key):
975             setattr(arvargs, key, val)
976
977     runtimeContext = ArvRuntimeContext(vars(arvargs))
978     runtimeContext.make_fs_access = partial(CollectionFsAccess,
979                              collection_cache=runner.collection_cache)
980
981     return cwltool.main.main(args=arvargs,
982                              stdout=stdout,
983                              stderr=stderr,
984                              executor=runner.arv_executor,
985                              versionfunc=versionstring,
986                              job_order_object=job_order_object,
987                              logger_handler=arvados.log_handler,
988                              custom_schema_callback=add_arv_hints,
989                              loadingContext=runner.loadingContext,
990                              runtimeContext=runtimeContext)
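The arvados-cwl-runner command ultimately lands in main() above; a minimal sketch of an equivalent direct invocation (argument values assumed):

    import sys
    from arvados_cwl import main
    rc = main(["--api=containers", "wf.cwl", "job.yml"], sys.stdout, sys.stderr)
    sys.exit(rc)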