#!/usr/bin/env python
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
import re
from functools import partial
import pkg_resources  # part of setuptools
import Queue
import time
import signal
import thread

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate
import cwltool.argparser

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError
import arvados.commands._util as arv_cmd

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from .util import get_current_container
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums

from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime status updates on the
    runner container.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func

    def emit(self, record):
        kind = None
        if record.levelno == logging.ERROR:
            kind = 'error'
        elif record.levelno == logging.WARNING:
            kind = 'warning'
        if kind is not None:
            log_msg = record.getMessage()
            if '\n' in log_msg:
                # If the logged message is multi-line, use its first line as status
                # and the rest as detail.
                status, detail = log_msg.split('\n', 1)
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, status),
                    detail
                )
            else:
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, record.getMessage())
                )
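
# Illustrative sketch of how this handler gets wired up (ArvCwlRunner.__init__
# below does this when running inside a container); once attached, WARNING and
# ERROR records are mirrored into the runner container's runtime_status field:
#
#   handler = RuntimeStatusLoggingHandler(runner.runtime_status_update)
#   logging.getLogger('').addHandler(handler)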

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

        self.work_api = None
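        # Probe the API server's discovery document and select the first
        # supported work API ("jobs" or "containers"), honoring an explicit
        # --api choice if one was given.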
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.warn("""
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')
            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        else:
            return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()


    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext))

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record))
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called from a failing child container to record the first child error,
        or to bump the error count when further errors arrive.
        Also called from other parts of the code that need to report errors,
        warnings, or plain activity statuses.
        """
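        # runtime_status is a JSON object on the container record.  The keys
        # written here look like (illustrative shape, not a fixed schema):
        #   {"error": "step1: failed (and 2 more)", "errorDetail": "...",
        #    "warning": "...", "warningDetail": "...", "activity": "..."}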
        with self.workflow_eval_lock:
            current = get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # If the status is an error, only report the first one in full.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # Ignore tests stubbing errors
                        return
                    if more_failures:
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the latest warning/activity status, without regard to
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

    def wrapped_callback(self, cb, obj, st):
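        # Run the cwltool callback while holding the workflow evaluation
        # lock, then wake up the main loop so it can re-examine process state.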
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
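        # e.g. "[container foo]" or "[job foo]": work_api minus its trailing "s".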
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

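                # Subtract the time spent polling from the next wait so that
                # polls begin roughly every poll_interval seconds.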
                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    remain_wait = self.poll_interval
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                break

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i, v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k, v in generatemapper.items():
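            # Keys with a "_:" prefix are literals produced by the path
            # mapper (e.g. CreateFile entries) rather than Keep references.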
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            current = get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
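                # The runner's temporary copy of the output collection record
                # can be trashed now that its portable data hash has been
                # recorded as the container's output.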
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not runtimeContext.name:
            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps.
        # Don't validate this time because it will just print redundant errors.
        loadingContext = self.loadingContext.copy()
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.do_validate = False

        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  loadingContext)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     tool, job_order)

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      runtimeContext.enable_reuse,
                                      uuid=existing_uuid,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"
        elif self.work_api == "jobs":
            if runtimeContext.priority != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            runtimeContext.outdir = "$(task.outdir)"
            runtimeContext.docker_outdir = "$(task.outdir)"
            runtimeContext.tmpdir = "$(task.tmpdir)"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
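                    # A single CommandLineTool can be submitted directly as a
                    # container request, without wrapping it in a workflow runner.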
                    runtimeContext.runnerjob = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         runtimeContext).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=runtimeContext.submit_runner_ram,
                                                name=runtimeContext.name,
                                                on_error=runtimeContext.on_error,
                                                submit_runner_image=runtimeContext.submit_runner_image,
                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                merged_map=merged_map,
                                                priority=runtimeContext.priority,
                                                secret_store=self.secret_store)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      on_error=runtimeContext.on_error,
                                      submit_runner_image=runtimeContext.submit_runner_image,
                                      merged_map=merged_map)
        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not runtimeContext.wait:
            submitargs = runtimeContext.copy()
            submitargs.submit = False
            runnerjob.run(submitargs)
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if runtimeContext.cwl_runner_job is not None:
                self.uuid = runtimeContext.cwl_runner_job.get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               runtimeContext)

        try:
            self.workflow_eval_lock.acquire()
            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = runtimeContext.storage_classes.strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)


def versionstring():
    """Return a version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or to the current directory if inputs are piped or given on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs; if not provided, jobs will go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                         default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=None)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--submit-request-uuid", type=str,
                        default=None,
                        help="Update and commit supplied container request instead of creating a new one (containers API only).")

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)
    parser.add_argument('--storage-classes', default="default", type=str,
                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher takes precedence over lower; containers API only)",
                        default=DEFAULT_PRIORITY)

    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    parser.add_argument("--thread-count", type=int,
                        default=4, help="Number of threads to use for job submit and output collection.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
                         help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

def add_arv_hints():
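    # Relax cwltool's file name checks (the same regex --relax-path-checks
    # selects) and register the Arvados CWL extension schema so that arv:*
    # hints and requirements validate.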
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])

def exit_signal_handler(sigcode, frame):
    logger.error("Caught signal {}, exiting.".format(sigcode))
    sys.exit(-sigcode)

def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if len(arvargs.storage_classes.strip().split(',')) > 1:
        logger.error("Multiple storage classes are not supported currently.")
        return 1

    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    if arvargs.update_workflow:
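        # Arvados UUIDs look like "zzzzz-7fd4e-0123456789abcde": a 5-character
        # cluster prefix, then a 5-character object type infix at offset 5.
        # "7fd4e" denotes a workflow record (containers API); "p5p6p" denotes
        # a pipeline template (jobs API).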
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
            keep_client = api_client.keep
            # Make an API object now so errors are reported early.
            api_client.users().current().execute()
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    for key, val in cwltool.argparser.get_default_args().items():
        if not hasattr(arvargs, key):
            setattr(arvargs, key, val)

    runtimeContext = ArvRuntimeContext(vars(arvargs))
    runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                            collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints,
                             loadingContext=runner.loadingContext,
                             runtimeContext=runtimeContext)
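
# Example invocation of the command line entry point (illustrative only; the
# project UUID is made up):
#   arvados-cwl-runner --api=containers \
#       --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx \
#       workflow.cwl job.yml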