#! /usr/bin/env python
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

import argparse
import datetime
import json
import re
import sys

import arvados

# Useful configuration variables:

# Number of log lines to use as context in diagnosing failure.
LOG_CONTEXT_LINES = 10

# Regex that signifies a failed task.
FAILED_TASK_REGEX = re.compile(r' \d+ failure (.*permanent)')
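
# For illustration only: a hypothetical crunch log line that this
# pattern would match (the job UUID and task numbers are made up):
#   2017-01-01_12:34:56 zzzzz-8i9sb-1234567890abcde 4321 14 failure (#3, permanent)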

# Regular expressions used to classify failure types.
JOB_FAILURE_TYPES = {
    'sys/docker': 'Cannot destroy container',
    'crunch/node': 'User not found on host',
    'slurm/comm': 'Communication connection failure'
}


def parse_arguments(arguments):
    arg_parser = argparse.ArgumentParser(
        description='Produce a report of Crunch failures within a specified time range')

    arg_parser.add_argument(
        '--start',
        help='Start date and time')
    arg_parser.add_argument(
        '--end',
        help='End date and time')

    args = arg_parser.parse_args(arguments)

    if args.start and not is_valid_timestamp(args.start):
        raise ValueError(args.start)
    if args.end and not is_valid_timestamp(args.end):
        raise ValueError(args.end)

    return args


def api_timestamp(when=None):
    """Returns a string representing the timestamp 'when' in a format
    suitable for delivering to the API server.  Defaults to the
    current time.
    """
    if when is None:
        when = datetime.datetime.utcnow()
    return when.strftime("%Y-%m-%dT%H:%M:%SZ")
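
# For illustration: api_timestamp(datetime.datetime(2017, 1, 2, 3, 4, 5))
# returns "2017-01-02T03:04:05Z".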


def is_valid_timestamp(ts):
    return re.match(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z', ts)


def jobs_created_between_dates(api, start, end):
    return arvados.util.list_all(
        api.jobs().list,
        filters=json.dumps([ ['created_at', '>=', start],
                             ['created_at', '<=', end] ]))
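
# Note: arvados.util.list_all pages through the entire result set rather
# than returning only the first page.  The JSON filter above serializes
# to, e.g.:
#   [["created_at", ">=", "2017-01-01T00:00:00Z"],
#    ["created_at", "<=", "2017-01-02T00:00:00Z"]]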


def job_logs(api, job):
    # Returns the contents of the log for this job (as an array of lines).
    if job['log']:
        log_collection = arvados.CollectionReader(job['log'], api)
        log_filename = "{}.log.txt".format(job['uuid'])
        return log_collection.open(log_filename).readlines()
    return []


user_names = {}
def job_user_name(api, user_uuid):
    # Returns the full name of the user with the given uuid, caching
    # lookups in the module-level 'user_names' dict.
    def _lookup_user_name(api, user_uuid):
        try:
            return api.users().get(uuid=user_uuid).execute()['full_name']
        except arvados.errors.ApiError:
            return "unknown"

    if user_uuid not in user_names:
        user_names[user_uuid] = _lookup_user_name(api, user_uuid)
    return user_names[user_uuid]


job_pipeline_names = {}
def job_pipeline_name(api, job_uuid):
    # Returns the name of the pipeline this job belongs to, caching
    # lookups in the module-level 'job_pipeline_names' dict.
    def _lookup_pipeline_name(api, job_uuid):
        try:
            pipelines = api.pipeline_instances().list(
                filters='[["components", "like", "%{}%"]]'.format(job_uuid)).execute()
            pi = pipelines['items'][0]
            if pi['name']:
                return pi['name']
            else:
                # Use the pipeline template name
                pt = api.pipeline_templates().get(uuid=pi['pipeline_template_uuid']).execute()
                return pt['name']
        except (TypeError, ValueError, IndexError):
            return ""

    if job_uuid not in job_pipeline_names:
        job_pipeline_names[job_uuid] = _lookup_pipeline_name(api, job_uuid)
    return job_pipeline_names[job_uuid]


def is_failed_task(logline):
    return FAILED_TASK_REGEX.search(logline) is not None


def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    args = parse_arguments(arguments)

    api = arvados.api('v1')

    now = datetime.datetime.utcnow()
    start_time = args.start or api_timestamp(now - datetime.timedelta(days=1))
    end_time = args.end or api_timestamp(now)

    # Find all jobs created within the specified window,
    # and their corresponding job logs.
    jobs_created = jobs_created_between_dates(api, start_time, end_time)
    jobs_by_state = {}
    for job in jobs_created:
        jobs_by_state.setdefault(job['state'], [])
        jobs_by_state[job['state']].append(job)

    # Find failed jobs and record the job failure text.

    # failure_stats maps failure types (e.g. "sys/docker") to
    # a set of job UUIDs that failed for that reason.
    failure_stats = {}
    for job in jobs_by_state.get('Failed', []):
        job_uuid = job['uuid']
        logs = job_logs(api, job)
        # Find the first permanent task failure, and collect the
        # preceding log lines.
        failure_type = None
        for i, lg in enumerate(logs):
            if is_failed_task(lg):
                # Get preceding log records to provide context.
                log_start = i - LOG_CONTEXT_LINES if i >= LOG_CONTEXT_LINES else 0
                log_end = i + 1
                lastlogs = ''.join(logs[log_start:log_end])
                # Try to identify the type of failure.
                for key, rgx in JOB_FAILURE_TYPES.iteritems():
                    if re.search(rgx, lastlogs):
                        failure_type = key
                        break
            if failure_type is not None:
                break
        if failure_type is None:
            failure_type = 'unknown'
        failure_stats.setdefault(failure_type, set())
        failure_stats[failure_type].add(job_uuid)
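
    # For illustration, failure_stats might now look like this (the job
    # UUIDs are made up):
    #   {'sys/docker': set(['zzzzz-8i9sb-000000000000001']),
    #    'unknown':    set(['zzzzz-8i9sb-000000000000002'])}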

    # Report percentages of successful, failed and unfinished jobs.
    print "Start: {:20s}".format(start_time)
    print "End:   {:20s}".format(end_time)
    print ""

    print "Overview"
    print ""

    job_start_count = len(jobs_created)
    print " {: <25s} {:4d}".format('Started', job_start_count)
    for state in ['Complete', 'Failed', 'Queued', 'Cancelled', 'Running']:
        if state in jobs_by_state:
            job_count = len(jobs_by_state[state])
            job_percentage = job_count / float(job_start_count)
            print " {: <25s} {:4d} ({: >4.0%})".format(state,
                                                       job_count,
                                                       job_percentage)
    print ""
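
    # For illustration, the overview section might read (numbers made up):
    #    Started                    100
    #    Complete                    80 ( 80%)
    #    Failed                      15 ( 15%)
    #    Running                      5 (  5%)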

    # Report failure types.
    failure_summary = ""
    failure_detail = ""

    # Generate a mapping from failed job uuids to job records, to assist
    # in generating detailed statistics for job failures.
    jobs_failed_map = { job['uuid']: job for job in jobs_by_state.get('Failed', []) }

    # Sort the failure stats in descending order by occurrence.
    sorted_failures = sorted(failure_stats,
                             reverse=True,
                             key=lambda failure_type: len(failure_stats[failure_type]))
    for failtype in sorted_failures:
        job_uuids = failure_stats[failtype]
        failstat = " {: <25s} {:4d} ({: >4.0%})\n".format(
            failtype,
            len(job_uuids),
            len(job_uuids) / float(len(jobs_by_state['Failed'])))
        failure_summary = failure_summary + failstat
        failure_detail = failure_detail + failstat
        for j in job_uuids:
            job_info = jobs_failed_map[j]
            job_owner = job_user_name(api, job_info['modified_by_user_uuid'])
            job_name = job_pipeline_name(api, job_info['uuid'])
            failure_detail = failure_detail + " {} {: <15.15s} {:29.29s}\n".format(j, job_owner, job_name)
        failure_detail = failure_detail + "\n"

    print "Failures by class"
    print ""
    print failure_summary

    print "Failures by class (detail)"
    print ""
    print failure_detail
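
# Example invocation (the script filename is illustrative; adjust to
# wherever this file is installed):
#   python crunch_failure_report.py --start 2017-01-01T00:00:00Z --end 2017-01-02T00:00:00Z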


if __name__ == "__main__":
    sys.exit(main())