13773: Update runtime_status_update() method to report warning & activity status
[arvados.git] sdk/cwl/arvados_cwl/__init__.py
#!/usr/bin/env python
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
import re
from functools import partial
import pkg_resources  # part of setuptools
import Queue
import time
import signal
import thread

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate
import cwltool.argparser

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError
import arvados.commands._util as arv_cmd

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums

from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

DEFAULT_PRIORITY = 500

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.warn("""
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool


    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        else:
            return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()


    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext))

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record))
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called from a failing child container: records the first child error
        or updates the error count on subsequent error statuses.
        Also called from other parts of the code that need to report errors,
        warnings, or activity status.
        """
        with self.workflow_eval_lock:
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            runtime_status = current.get('runtime_status', {})
            # When the status kind is an error, only report the first one in full.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message,
                        'errorDetail': detail or "No error logs available"
                    })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
                    error_msg = re.match(
                        r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                    more_failures = re.match(
                        r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    if more_failures:
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status without regard to
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

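    # Illustrative sketch of how successive runtime_status_update() calls
    # evolve the runtime_status field (the messages and detail strings here
    # are hypothetical):
    #
    #   runtime_status_update('error', 'step1 failed')
    #       => {'error': 'step1 failed', 'errorDetail': 'No error logs available'}
    #   runtime_status_update('error', 'step2 failed')
    #       => {'error': 'step1 failed (and 1 more)', ...}
    #   runtime_status_update('warning', 'low disk', detail='on node x1')
    #       => adds {'warning': 'low disk', 'warningDetail': 'on node x1'}
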
    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

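    # Sketch of the event dict shape on_message() expects; poll_states()
    # below synthesizes exactly this structure (the UUID is hypothetical):
    #
    #   {
    #       "object_uuid": "zzzzz-xvhdp-012345678901234",
    #       "event_type": "update",
    #       "properties": {"new_attributes": {"state": "Running", ...}}
    #   }
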
    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    remain_wait = self.poll_interval
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                break

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i, v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k, v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

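    # Example of the keep reference handling above (the hash is hypothetical):
    # a source key like
    #     keep:99999999999999999999999999999999+99/dir/file.txt
    # splits into srccollection '99999999999999999999999999999999+99' and
    # srcpath 'dir/file.txt', which is copied into the final collection.
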
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not runtimeContext.name:
            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        # Don't validate this time because it will just print redundant errors.
        loadingContext = self.loadingContext.copy()
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.do_validate = False

        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  loadingContext)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     tool, job_order)

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      runtimeContext.enable_reuse,
                                      uuid=existing_uuid,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"
        elif self.work_api == "jobs":
            if runtimeContext.priority != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            runtimeContext.outdir = "$(task.outdir)"
            runtimeContext.docker_outdir = "$(task.outdir)"
            runtimeContext.tmpdir = "$(task.tmpdir)"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
                    runtimeContext.runnerjob = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         runtimeContext).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=runtimeContext.submit_runner_ram,
                                                name=runtimeContext.name,
                                                on_error=runtimeContext.on_error,
                                                submit_runner_image=runtimeContext.submit_runner_image,
                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                merged_map=merged_map,
                                                priority=runtimeContext.priority,
                                                secret_store=self.secret_store)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      on_error=runtimeContext.on_error,
                                      submit_runner_image=runtimeContext.submit_runner_image,
                                      merged_map=merged_map)
        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not runtimeContext.wait:
            submitargs = runtimeContext.copy()
            submitargs.submit = False
            runnerjob.run(submitargs)
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if runtimeContext.cwl_runner_job is not None:
                self.uuid = runtimeContext.cwl_runner_job.get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               runtimeContext)

        try:
            self.workflow_eval_lock.acquire()
            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = runtimeContext.storage_classes.strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)


def versionstring():
    """Return a version string of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)

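# Illustrative return value (the version numbers are hypothetical):
#   '/usr/bin/arvados-cwl-runner 1.1.4, arvados-python-client 1.1.4, cwltool 1.0.20180403145700'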

def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input. Defaults to the directory of the input object file, or the current directory if inputs are piped/provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs will go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=None)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--submit-request-uuid", type=str,
                        default=None,
                        help="Update and commit supplied container request instead of creating a new one (containers API only).")

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)
    parser.add_argument('--storage-classes', default="default", type=str,
                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers API only)",
                        default=DEFAULT_PRIORITY)

    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    parser.add_argument("--thread-count", type=int,
                        default=4, help="Number of threads to use for job submit and output collection.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                        default=False, dest="trash_intermediate",
                        help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                        default=False, dest="trash_intermediate",
                        help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

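# Illustrative invocation exercising the options defined above (the project
# UUID and file names are hypothetical):
#
#   arvados-cwl-runner --api=containers \
#       --project-uuid zzzzz-j7d0g-012345678901234 \
#       --output-name "analysis output" my-workflow.cwl my-inputs.yml
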
def add_arv_hints():
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    res.close()
    cwltool.process.supportedProcessRequirements.extend([
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement"
    ])

def exit_signal_handler(sigcode, frame):
    logger.error("Caught signal {}, exiting.".format(sigcode))
    sys.exit(-sigcode)

def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if len(arvargs.storage_classes.strip().split(',')) > 1:
        logger.error("Multiple storage classes are not supported currently.")
        return 1

    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
            keep_client = api_client.keep
            # Make an API call now so errors are reported early.
            api_client.users().current().execute()
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    for key, val in cwltool.argparser.get_default_args().items():
        if not hasattr(arvargs, key):
            setattr(arvargs, key, val)

    runtimeContext = ArvRuntimeContext(vars(arvargs))
    runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                            collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints,
                             loadingContext=runner.loadingContext,
                             runtimeContext=runtimeContext)
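
# Minimal sketch of how main() is typically wired up as a console entry
# point (assuming a console_scripts hook named 'arvados-cwl-runner' in this
# package's setup configuration):
#
#   import sys
#   from arvados_cwl import main
#
#   if __name__ == "__main__":
#       sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))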