Merge branch '10499-cwl-crunch2-docs' refs #10499
[arvados.git] / services / api / script / crunch_failure_report.py
1 #! /usr/bin/env python
2
3 import argparse
4 import datetime
5 import json
6 import re
7 import sys
8
9 import arvados
10
11 # Useful configuration variables:
12
13 # Number of log lines to use as context in diagnosing failure.
14 LOG_CONTEXT_LINES = 10
15
16 # Regex that signifies a failed task.
17 FAILED_TASK_REGEX = re.compile(' \d+ failure (.*permanent)')
18
19 # Regular expressions used to classify failure types.
20 JOB_FAILURE_TYPES = {
21     'sys/docker': 'Cannot destroy container',
22     'crunch/node': 'User not found on host',
23     'slurm/comm':  'Communication connection failure'
24 }
25
26 def parse_arguments(arguments):
27     arg_parser = argparse.ArgumentParser(
28         description='Produce a report of Crunch failures within a specified time range')
29
30     arg_parser.add_argument(
31         '--start',
32         help='Start date and time')
33     arg_parser.add_argument(
34         '--end',
35         help='End date and time')
36
37     args = arg_parser.parse_args(arguments)
38
39     if args.start and not is_valid_timestamp(args.start):
40         raise ValueError(args.start)
41     if args.end and not is_valid_timestamp(args.end):
42         raise ValueError(args.end)
43
44     return args
45
46
47 def api_timestamp(when=None):
48     """Returns a string representing the timestamp 'when' in a format
49     suitable for delivering to the API server.  Defaults to the
50     current time.
51     """
52     if when is None:
53         when = datetime.datetime.utcnow()
54     return when.strftime("%Y-%m-%dT%H:%M:%SZ")
55
56
57 def is_valid_timestamp(ts):
58     return re.match(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z', ts)
59
60
61 def jobs_created_between_dates(api, start, end):
62     return arvados.util.list_all(
63         api.jobs().list,
64         filters=json.dumps([ ['created_at', '>=', start],
65                              ['created_at', '<=', end] ]))
66
67
68 def job_logs(api, job):
69     # Returns the contents of the log for this job (as an array of lines).
70     if job['log']:
71         log_collection = arvados.CollectionReader(job['log'], api)
72         log_filename = "{}.log.txt".format(job['uuid'])
73         return log_collection.open(log_filename).readlines()
74     return []
75
76
77 user_names = {}
78 def job_user_name(api, user_uuid):
79     def _lookup_user_name(api, user_uuid):
80         try:
81             return api.users().get(uuid=user_uuid).execute()['full_name']
82         except arvados.errors.ApiError:
83             return user_uuid
84
85     if user_uuid not in user_names:
86         user_names[user_uuid] = _lookup_user_name(api, user_uuid)
87     return user_names[user_uuid]
88
89
90 job_pipeline_names = {}
91 def job_pipeline_name(api, job_uuid):
92     def _lookup_pipeline_name(api, job_uuid):
93         try:
94             pipelines = api.pipeline_instances().list(
95                 filters='[["components", "like", "%{}%"]]'.format(job_uuid)).execute()
96             pi = pipelines['items'][0]
97             if pi['name']:
98                 return pi['name']
99             else:
100                 # Use the pipeline template name
101                 pt = api.pipeline_templates().get(uuid=pi['pipeline_template_uuid']).execute()
102                 return pt['name']
103         except (TypeError, ValueError, IndexError):
104             return ""
105
106     if job_uuid not in job_pipeline_names:
107         job_pipeline_names[job_uuid] = _lookup_pipeline_name(api, job_uuid)
108     return job_pipeline_names[job_uuid]
109
110
111 def is_failed_task(logline):
112     return FAILED_TASK_REGEX.search(logline) != None
113
114
115 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
116     args = parse_arguments(arguments)
117
118     api = arvados.api('v1')
119
120     now = datetime.datetime.utcnow()
121     start_time = args.start or api_timestamp(now - datetime.timedelta(days=1))
122     end_time = args.end or api_timestamp(now)
123
124     # Find all jobs created within the specified window,
125     # and their corresponding job logs.
126     jobs_created = jobs_created_between_dates(api, start_time, end_time)
127     jobs_by_state = {}
128     for job in jobs_created:
129         jobs_by_state.setdefault(job['state'], [])
130         jobs_by_state[job['state']].append(job)
131
132     # Find failed jobs and record the job failure text.
133
134     # failure_stats maps failure types (e.g. "sys/docker") to
135     # a set of job UUIDs that failed for that reason.
136     failure_stats = {}
137     for job in jobs_by_state['Failed']:
138         job_uuid = job['uuid']
139         logs = job_logs(api, job)
140         # Find the first permanent task failure, and collect the
141         # preceding log lines.
142         failure_type = None
143         for i, lg in enumerate(logs):
144             if is_failed_task(lg):
145                 # Get preceding log record to provide context.
146                 log_start = i - LOG_CONTEXT_LINES if i >= LOG_CONTEXT_LINES else 0
147                 log_end = i + 1
148                 lastlogs = ''.join(logs[log_start:log_end])
149                 # try to identify the type of failure.
150                 for key, rgx in JOB_FAILURE_TYPES.iteritems():
151                     if re.search(rgx, lastlogs):
152                         failure_type = key
153                         break
154             if failure_type is not None:
155                 break
156         if failure_type is None:
157             failure_type = 'unknown'
158         failure_stats.setdefault(failure_type, set())
159         failure_stats[failure_type].add(job_uuid)
160
161     # Report percentages of successful, failed and unfinished jobs.
162     print "Start: {:20s}".format(start_time)
163     print "End:   {:20s}".format(end_time)
164     print ""
165
166     print "Overview"
167     print ""
168
169     job_start_count = len(jobs_created)
170     print "  {: <25s} {:4d}".format('Started', job_start_count)
171     for state in ['Complete', 'Failed', 'Queued', 'Cancelled', 'Running']:
172         if state in jobs_by_state:
173             job_count = len(jobs_by_state[state])
174             job_percentage = job_count / float(job_start_count)
175             print "  {: <25s} {:4d} ({: >4.0%})".format(state,
176                                                         job_count,
177                                                         job_percentage)
178     print ""
179
180     # Report failure types.
181     failure_summary = ""
182     failure_detail = ""
183
184     # Generate a mapping from failed job uuids to job records, to assist
185     # in generating detailed statistics for job failures.
186     jobs_failed_map = { job['uuid']: job for job in jobs_by_state.get('Failed', []) }
187
188     # sort the failure stats in descending order by occurrence.
189     sorted_failures = sorted(failure_stats,
190                              reverse=True,
191                              key=lambda failure_type: len(failure_stats[failure_type]))
192     for failtype in sorted_failures:
193         job_uuids = failure_stats[failtype]
194         failstat = "  {: <25s} {:4d} ({: >4.0%})\n".format(
195             failtype,
196             len(job_uuids),
197             len(job_uuids) / float(len(jobs_by_state['Failed'])))
198         failure_summary = failure_summary + failstat
199         failure_detail = failure_detail + failstat
200         for j in job_uuids:
201             job_info = jobs_failed_map[j]
202             job_owner = job_user_name(api, job_info['modified_by_user_uuid'])
203             job_name = job_pipeline_name(api, job_info['uuid'])
204             failure_detail = failure_detail + "    {}  {: <15.15s}  {:29.29s}\n".format(j, job_owner, job_name)
205         failure_detail = failure_detail + "\n"
206
207     print "Failures by class"
208     print ""
209     print failure_summary
210
211     print "Failures by class (detail)"
212     print ""
213     print failure_detail
214
215     return 0
216
217
218 if __name__ == "__main__":
219     sys.exit(main())