#!/usr/bin/env python
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
import re
from functools import partial
import pkg_resources  # part of setuptools
import Queue
import time
import signal
import thread

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate
import cwltool.argparser

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError
import arvados.commands._util as arv_cmd

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from .util import get_current_container
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums

from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

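# Default value for --priority; arv_executor below enforces the valid
# range 1..1000.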
DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on the
    runner container.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None:
            log_msg = record.getMessage()
            if '\n' in log_msg:
                # If the logged message is multi-line, use its first line as
                # the status and the rest as detail.
                status, detail = log_msg.split('\n', 1)
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, status),
                    detail
                )
            else:
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, record.getMessage())
                )

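# Illustrative example: once RuntimeStatusLoggingHandler is attached to the
# root logger (see ArvCwlRunner.__init__ below), a call such as
#     logger.error("step1 failed\nstderr was: ...")
# is reported as runtime_status_update('error',
# 'arvados.cwl-runner: step1 failed', 'stderr was: ...').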
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs
    or containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

        self.work_api = None
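        # Probe the API server's discovery document for a usable work API,
        # taking the first entry in expected_api that is available and
        # matches --api (when one was given).  A cluster with an API
        # disabled typically omits its create method from the discovery
        # document, producing the KeyError skipped below.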
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.warn("""
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status
        # reporting if running inside a container.
        if get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')
            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        else:
            return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()

    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext))

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record))
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
        """
        with self.workflow_eval_lock:
            current = get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # If the status is an error, only report the first one in full.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
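                    # Worked example: if runtime_status['error'] is
                    # "step1 failed (and 2 more)", error_msg captures
                    # "step1 failed", more_failures captures "2", and the
                    # stored error becomes "step1 failed (and 3 more)".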
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # Ignore tests stubbing errors
                        return
                    if more_failures:
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status regardless of
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
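        # work_api is "jobs" or "containers"; dropping the trailing "s"
        # yields labels like "[container stepname]".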
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

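        # Each pass subtracts the time spent querying the API from
        # poll_interval, so states are checked roughly every
        # poll_interval (12) seconds; e.g. a poll that takes 2s is
        # followed by a 10s wait.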
        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    remain_wait = self.poll_interval
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                break

    def check_features(self, obj):
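        # Recursively walk a tool document and reject features the
        # current work API does not support (most require
        # --api=containers).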
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i, v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
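        # Build the final output collection by copying each referenced
        # file out of its source collection in Keep, then write a
        # cwl.output.json describing the rewritten locations.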
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
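        # generatemapper maps each captured location to a flat target
        # path.  Keys are either literals ("_:...") or Keep references
        # such as "keep:<portable data hash>/out.txt" (hypothetical
        # file name).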

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k, v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            current = get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not runtimeContext.name:
            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload the tool object, which may have been updated by
        # upload_workflow_deps.
        # Don't validate this time because it will just print redundant errors.
        loadingContext = self.loadingContext.copy()
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.do_validate = False

        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  loadingContext)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     tool, job_order)

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      runtimeContext.enable_reuse,
                                      uuid=existing_uuid,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"
        elif self.work_api == "jobs":
            if runtimeContext.priority != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            runtimeContext.outdir = "$(task.outdir)"
            runtimeContext.docker_outdir = "$(task.outdir)"
            runtimeContext.tmpdir = "$(task.tmpdir)"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
                    runtimeContext.runnerjob = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         runtimeContext).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=runtimeContext.submit_runner_ram,
                                                name=runtimeContext.name,
                                                on_error=runtimeContext.on_error,
                                                submit_runner_image=runtimeContext.submit_runner_image,
                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                merged_map=merged_map,
                                                priority=runtimeContext.priority,
                                                secret_store=self.secret_store)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      on_error=runtimeContext.on_error,
                                      submit_runner_image=runtimeContext.submit_runner_image,
                                      merged_map=merged_map)
        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not runtimeContext.wait:
            submitargs = runtimeContext.copy()
            submitargs.submit = False
            runnerjob.run(submitargs)
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

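        # The TaskQueue runs queued work (job submission and output
        # collection) on self.thread_count worker threads; see
        # --thread-count.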
        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if runtimeContext.cwl_runner_job is not None:
                self.uuid = runtimeContext.cwl_runner_job.get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               runtimeContext)

        try:
            self.workflow_eval_lock.acquire()
            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = runtimeContext.storage_classes.strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)


def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs; if not provided, they will go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=None)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--submit-request-uuid", type=str,
                        default=None,
                        help="Update and commit supplied container request instead of creating a new one (containers API only).")

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)
    parser.add_argument('--storage-classes', default="default", type=str,
                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers API only)",
                        default=DEFAULT_PRIORITY)

    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    parser.add_argument("--thread-count", type=int,
                        default=4, help="Number of threads to use for job submit and output collection.")

    parser.add_argument("--http-timeout", type=int,
                        default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

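# Example invocations (sketches; the workflow file, input file, and project
# UUID below are hypothetical):
#   arvados-cwl-runner --api=containers workflow.cwl job.yml
#   arvados-cwl-runner --submit --no-wait --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx workflow.cwl job.yml
#   arvados-cwl-runner --create-workflow workflow.cwl
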
def add_arv_hints():
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])

def exit_signal_handler(sigcode, frame):
    logger.error("Caught signal {}, exiting.".format(sigcode))
    sys.exit(-sigcode)

def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if len(arvargs.storage_classes.strip().split(',')) > 1:
        logger.error("Multiple storage classes are not supported currently.")
        return 1

    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    if arvargs.update_workflow:
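        # Arvados UUIDs look like "<cluster>-<type>-<id>" with a
        # 5-character cluster prefix, so find() == 5 checks the object
        # type infix: '7fd4e' is a workflow (containers API), 'p5p6p' a
        # pipeline template (jobs API).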
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(
                api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
                keep_params={"num_retries": 4})
            keep_client = api_client.keep
            # Make an API call now so errors are reported early.
            api_client.users().current().execute()
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    for key, val in cwltool.argparser.get_default_args().items():
        if not hasattr(arvargs, key):
            setattr(arvargs, key, val)

    runtimeContext = ArvRuntimeContext(vars(arvargs))
    runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                            collection_cache=runner.collection_cache)
    runtimeContext.http_timeout = arvargs.http_timeout

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints,
                             loadingContext=runner.loadingContext,
                             runtimeContext=runtimeContext)