21121: Report generator and charting WIP
[arvados.git] / tools / cluster-activity / arvados_cluster_activity / report.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 import logging
7 import ciso8601
8 import arvados.util
9 import re
10 import csv
11 import math
12 import collections
13 import json
14 from datetime import datetime, timedelta
15 import pkg_resources
16
17 from dataclasses import dataclass
18
19 import crunchstat_summary.dygraphs
20 from crunchstat_summary.summarizer import Task
21
@dataclass(init=True, repr=True, eq=True)
class WorkflowRunSummary:
    """Aggregated statistics for the runs of one workflow within a project."""

    # Display name of the workflow.
    name: str
    # UUID of the workflow definition the runs came from.
    uuid: str
    # Number of runs counted.
    count: int = 0
    # Sum of the runs' costs.
    cost: float = 0
    # Sum of the runs' runtimes, in hours.
    hours: float = 0
29
@dataclass(init=True, repr=True, eq=True)
class ProjectSummary:
    """Aggregated usage for one owner (a project or a user) in the report."""

    # Display names of the users whose rows were counted for this owner.
    users: set
    # UUID of the owner (project group or user).
    uuid: str
    # Per-workflow run summaries, keyed by workflow name.
    runs: dict[str, WorkflowRunSummary]
    # Display name of the owner.
    name: str = ""
    # Sum of the cost of every row counted.
    cost: float = 0
    # Number of rows counted (workflow runs and, if included, steps).
    count: int = 0
    # Sum of the runtime of every row counted, in hours.
    hours: float = 0
39
40
class Summarizer:
    """Minimal summarizer object consumed by ReportChart.

    ReportChart.sections() reads ``label``, ``tasks`` and ``long_label()``.
    Both attributes are initialised by the constructor, so a freshly created
    instance is always usable; callers may still assign them afterwards.
    """
    label: str
    tasks: collections.defaultdict[str, Task]

    def __init__(self, label="", tasks=None):
        """Create a summarizer.

        :param label: label for this summarizer's chart section.
        :param tasks: mapping of task name to Task; when omitted a new
            ``collections.defaultdict(Task)`` is created.
        """
        self.label = label
        self.tasks = tasks if tasks is not None else collections.defaultdict(Task)

    def long_label(self):
        """Return the label used for this summarizer's chart section."""
        return self.label
47
48
def date_export(item):
    """``json.dumps`` *default* hook that encodes datetimes as JS Date markers.

    A datetime is rendered as ``@new Date("YYYY-MM-DDTHH:MM:SSZ")@``; the
    ``@`` markers let ReportChart.js() strip the surrounding JSON quotes so
    the generated JavaScript contains a literal ``new Date(...)`` expression.
    The value is formatted as-is with a literal ``Z`` suffix.
    NOTE(review): naive local times are not converted to UTC -- confirm.

    :raises TypeError: for any other type, as ``json.dumps`` requires of a
        *default* hook (returning None would silently serialise as ``null``).
    """
    if isinstance(item, datetime):
        return '@new Date("{}")@'.format(item.strftime("%Y-%m-%dT%H:%M:%SZ"))
    raise TypeError(
        "Object of type {} is not JSON serializable".format(type(item).__name__))
52
class ReportChart(crunchstat_summary.dygraphs.DygraphsChart):
    """Dygraphs chart page for the cluster activity report.

    Builds on crunchstat_summary's DygraphsChart: each summarizer gets one
    section holding a "Compute" (hours) chart and a "Disk" (GiB) chart.
    """

    def sections(self):
        """Return one chart section per summarizer in ``self.summarizers``."""
        return [
            {
                'label': s.long_label(),
                'charts': [
                    self.chartdata(s.label, s.tasks, stat)
                    for stat in (('Compute', ['hours']),
                                 ('Disk', ['GiB']),
                                 )
                    ],
            }
            for s in self.summarizers]

    def js(self):
        """Return the page JavaScript: ``chartdata`` followed by the JS assets.

        Datetimes are encoded by date_export() as ``"@new Date(...)@"``
        strings; the replace() chain strips those markers and unescapes the
        inner quotes so the browser evaluates ``new Date(...)``.
        """
        # NOTE(review): the replace() chain is global, so any other string in
        # the chart data containing '"@', '@"' or an escaped quote is also
        # rewritten -- e.g. a label with a double quote would yield broken JS.
        return 'var chartdata = {};\n{}'.format(
            json.dumps(self.sections(), default=date_export).replace('"@', '').replace('@"', '').replace('\\"', '"'),
            '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa).decode('utf-8') for jsa in self.JSASSETS]))

    def _collate_data(self, tasks, stats):
        """Merge each task's series for one chart into sorted Dygraphs rows.

        *stats* is ``(category, [series_name, ...])``.  Each row is
        ``[x, <task columns...>, <extra series values...>]``: the first
        series' value lands in the current task's column, preceded by one
        ``None`` for every task processed before it.
        """
        category, series_names = stats[0], stats[1]
        first_name, other_names = series_names[0], series_names[1:]
        data = []
        nulls = []
        for task in tasks.values():
            # All series in a category are assumed to share the first
            # series' time base and number of samples.
            series = task.series[(category, first_name)]
            for i, pt in enumerate(series):
                vals = [task.series[(category, name)][i][1] for name in other_names]
                data.append([pt[0]] + nulls + [pt[1]] + vals)
            nulls.append(None)
        return sorted(data)
89
90
# Chart class that ClusterActivityReport.html_report() instantiates to render the page.
WEBCHART_CLASS = ReportChart
92
93
def runtime_str(container_request, containers):
    """Format the runtime of a request's container as ``H:MM:SS``.

    The container is looked up in *containers* (keyed by container UUID);
    the runtime is ``finished_at - started_at``.  Hours are not wrapped at
    24 and sub-second precision is dropped.
    """
    finished = ciso8601.parse_datetime(containers[container_request["container_uuid"]]["finished_at"])
    started = ciso8601.parse_datetime(containers[container_request["container_uuid"]]["started_at"])
    elapsed = finished - started

    # days * 86400 is a multiple of 3600, so divmod reproduces
    # days*24 + seconds//3600 for the hour field.
    whole_seconds = elapsed.days * 86400 + elapsed.seconds
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return "%i:%02i:%02i" % (hours, minutes, seconds)
102
def runtime_in_hours(runtime):
    """Convert an ``H:MM:SS`` runtime string to a float number of hours.

    Only the first three colon-separated fields (hours, minutes, seconds)
    are read; a string with fewer fields raises IndexError.
    """
    fields = runtime.split(":")
    total = float(fields[0])
    for position, per_hour in ((1, 60), (2, 3600)):
        total += float(fields[position]) / per_hour
    return total
109
def hours_to_runtime_str(frac_hours):
    """Format a duration given in fractional hours as ``H:MM:SS``.

    The duration is rounded to the nearest whole second before being split
    into fields.  Splitting the float first and truncating each field lets
    floating-point error drop a second (e.g. 2/60 + 3/3600 printed as
    0:02:02).  Hours are not wrapped at 24.
    """
    total_seconds = round(frac_hours * 3600)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return "%i:%02i:%02i" % (hours, minutes, seconds)
116
117
def csv_dateformat(date):
    """Convert an ISO 8601 timestamp string to ``YYYY-MM-DD HH:MM:SS`` for CSV output.

    The time is formatted as parsed, without any timezone conversion.
    """
    dt = ciso8601.parse_datetime(date)
    # "%H:%M:%S" -- the colon between minutes and seconds was missing.
    return dt.strftime("%Y-%m-%d %H:%M:%S")
121
122
class ClusterActivityReport(object):
    """Report on arvados-cwl-runner workflow runs: owner, runtime and cost.

    report_from_api() yields one row (a dict) per workflow run and, when
    requested, one per workflow step.  csv_report() writes those rows as CSV;
    html_report() renders overall and per-project summaries.  Every row that
    report_from_api() yields is also passed to collect_summary_stats(), which
    accumulates the totals html_report() displays.
    """

    # Upper bound on how many items are gathered before a batch of
    # "uuid in [...]" lookups is sent to the API.
    _BATCH_SIZE = 1000

    def __init__(self, label=None, threads=1, **kwargs):
        """Create a report using a new Arvados API client.

        :param label: report title; "Cluster report" when not given.
        :param threads: stored as ``threadcount``; not used by this class.
        :param kwargs: accepted and ignored.
        """
        self.label = label or "Cluster report"
        self.threadcount = threads
        self.arv_client = arvados.api()

        self.active_users = set()
        self.project_summary = {}
        self.total_hours = 0
        self.total_cost = 0
        # Set by report_from_api() once all rows have been produced.
        self.total_users = 0

    def run(self):
        """Placeholder; does nothing yet."""
        pass

    def html_report(self, since, to, exclude):
        """Return an HTML page summarising workflow runs created between *since* and *to*.

        All rows, including workflow steps, are generated first so that the
        summary statistics are complete.  Names coming from the API are
        HTML-escaped before being inserted into the page.

        :param exclude: optional regex of workflow names to leave out.
        """
        import html

        def esc(value):
            return html.escape(str(value))

        # Drain the generator; collect_summary_stats() runs for every row.
        for _ in self.report_from_api(since, to, True, exclude):
            pass

        tophtml = """<h2>Summary</h2>
        <p>Total user accounts: {total_users}</p>
        <p>Active users: {active_users}</p>
        <p>Aggregate compute hours: {total_hours}</p>
        <p>Aggregate compute cost: ${total_cost}</p>
        """.format(active_users=len(self.active_users),
                   total_users=self.total_users,
                   total_hours=round(self.total_hours, 1),
                   total_cost=round(self.total_cost, 2))

        bottomhtml = []
        # Most expensive projects first; within each, most expensive workflows first.
        for prj in sorted(self.project_summary.values(), key=lambda p: p.cost, reverse=True):
            wfsum = []
            for wf in sorted(prj.runs.values(), key=lambda r: r.cost, reverse=True):
                # wf.count >= 1: a run summary is only created when a run is counted.
                wfsum.append("""
                {name} -- count {count} -- average runtime {runtime} -- average cost per run ${cost}
                """.format(name=esc(wf.name),
                           count=wf.count,
                           runtime=hours_to_runtime_str(wf.hours/wf.count),
                           cost=round(wf.cost/wf.count, 2)))

            bottomhtml.append(
                """<h2>{name}</h2>
                <p>Users: {users}</p>
                <p>{wfsum}</p>
                <p>Compute hours: {hours}</p>
                <p>Compute cost: ${cost}</p>
                """.format(name=esc(prj.name),
                           users=", ".join(sorted(esc(u) for u in prj.users)),
                           cost=round(prj.cost, 2),
                           hours=round(prj.hours, 1),
                           wfsum="</p><p>".join(wfsum))
            )

        # NOTE(review): placeholder chart -- three fixed points around the
        # current time, not derived from the report rows.  TODO: real data.
        s1 = Summarizer()
        s1.label = "Compute"
        s1.tasks = collections.defaultdict(Task)
        now = datetime.now()
        series = s1.tasks["Compute"].series["Compute", "hours"]
        series.append((now, 1))
        series.append((now + timedelta(minutes=2), 2))
        series.append((now + timedelta(minutes=4), 1))

        return WEBCHART_CLASS(self.label, [s1]).html(tophtml, bottomhtml)

    def _fetch_containers(self, uuids, containers):
        """Fetch the containers with the given UUIDs into *containers* (keyed by UUID)."""
        uuids = list(uuids)
        if not uuids:
            return
        for container in arvados.util.keyset_list_all(
                self.arv_client.containers().list,
                filters=[["uuid", "in", uuids]],
                select=["uuid", "started_at", "finished_at", "cost"]):
            containers[container["uuid"]] = container

    @staticmethod
    def _has_run(container_request, containers):
        """True if the request's container was fetched and has start and finish times."""
        container = containers.get(container_request["container_uuid"])
        return bool(container and container["started_at"] and container["finished_at"])

    def _fetch_workflow_names(self, pending):
        """Return {workflow UUID: name} for this cluster's workflows used by *pending*."""
        cluster_id = self.arv_client.config()["ClusterID"]
        template_uuids = set()
        for c in pending:
            template_uuid = c["properties"].get("template_uuid")
            if template_uuid and template_uuid.startswith(cluster_id):
                template_uuids.add(template_uuid)

        workflows = {}
        if template_uuids:
            for wf in arvados.util.keyset_list_all(
                    self.arv_client.workflows().list,
                    filters=[["uuid", "in", list(template_uuids)]],
                    select=["uuid", "name"]):
                workflows[wf["uuid"]] = wf["name"]
        return workflows

    def _fetch_owner_names(self, pending):
        """Return {UUID: display name} for the projects and users referenced by *pending*.

        Group UUIDs map to the group name; user UUIDs (user-owned requests
        and each request's modified_by_user_uuid) map to the user's full name.
        """
        names = {}
        # Characters 6-10 of an Arvados UUID give the object type:
        # 'j7d0g' for groups, 'tpzed' for users.
        group_uuids = {c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'j7d0g'}
        if group_uuids:
            for group in arvados.util.keyset_list_all(
                    self.arv_client.groups().list,
                    filters=[["uuid", "in", list(group_uuids)]],
                    select=["uuid", "name"]):
                names[group["uuid"]] = group["name"]

        user_uuids = ({c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'tpzed'}
                      | {c["modified_by_user_uuid"] for c in pending})
        if user_uuids:
            for user in arvados.util.keyset_list_all(
                    self.arv_client.users().list,
                    filters=[["uuid", "in", list(user_uuids)]],
                    select=["uuid", "full_name", "first_name", "last_name"]):
                names[user["uuid"]] = user["full_name"]
        return names

    def _fetch_steps(self, containers):
        """Fetch the costed child requests (workflow steps) of *containers*.

        Returns a dict mapping each requesting container UUID to its list of
        child container requests.  The children's containers are fetched in
        batches and added to *containers*.
        """
        # Strip a trailing "_<digits>" from step names (e.g. "align_2" ->
        # "align") so numbered instances of a step share one name.
        name_regex = re.compile(r"(.+)_[0-9]+")
        # Snapshot the parents before child containers are added below.
        parent_uuids = list(containers.keys())
        child_crs = {}
        batch = set()
        stepcount = 0

        def fetch_batch():
            nonlocal stepcount
            stepcount += len(batch)
            self._fetch_containers(batch, containers)
            logging.info("Got workflow steps %s - %s", stepcount-len(batch), stepcount)
            batch.clear()

        logging.info("Getting workflow steps")
        if not parent_uuids:
            return child_crs

        for cr in arvados.util.keyset_list_all(
                self.arv_client.container_requests().list,
                filters=[["requesting_container_uuid", "in", parent_uuids]],
                # created_at is needed for the step rows' "Submitted" column.
                select=["uuid", "name", "cumulative_cost", "requesting_container_uuid",
                        "container_uuid", "created_at"]):
            if cr["cumulative_cost"] == 0:
                continue

            g = name_regex.fullmatch(cr["name"] or "")
            if g:
                cr["name"] = g[1]

            child_crs.setdefault(cr["requesting_container_uuid"], []).append(cr)
            if cr["container_uuid"]:
                batch.add(cr["container_uuid"])
            if len(batch) >= self._BATCH_SIZE:
                fetch_batch()

        if batch:
            fetch_batch()
        return child_crs

    def flush_containers(self, pending, include_steps, exclude):
        """Yield report rows for one batch of workflow-runner container requests.

        Looks up, in bulk, the containers, workflow names and owner/user
        names the batch refers to (and the workflow steps with their
        containers when *include_steps* is set).  It then yields one row per
        request whose container has both started and finished, each followed
        by rows for its finished steps.  Requests whose container was not
        returned by the API are skipped.

        :param pending: container request records as selected by report_from_api().
        :param include_steps: also yield a row for each finished child request.
        :param exclude: optional regex; requests whose workflow name matches
            it (case-insensitively) are skipped together with their steps.
        """
        if not pending:
            return

        containers = {}
        self._fetch_containers(
            (c["container_uuid"] for c in pending if c["container_uuid"]), containers)

        workflows = self._fetch_workflow_names(pending)
        projects = self._fetch_owner_names(pending)
        child_crs = self._fetch_steps(containers) if include_steps else {}

        for container_request in pending:
            if not self._has_run(container_request, containers):
                continue

            container = containers[container_request["container_uuid"]]
            template_uuid = container_request["properties"].get("template_uuid", "none")
            # Runs without a registered workflow are labelled by request name.
            if template_uuid == "none":
                workflowname = container_request["name"]
            else:
                workflowname = workflows.get(template_uuid, "workflow missing")

            if exclude and re.search(exclude, workflowname or "", flags=re.IGNORECASE):
                continue

            project = projects.get(container_request["owner_uuid"], "unknown owner")
            user = projects.get(container_request["modified_by_user_uuid"], "unknown user")

            yield {
                "Project": project,
                "ProjectUUID": container_request["owner_uuid"],
                "Workflow": workflowname,
                "WorkflowUUID": template_uuid,
                "Step": "workflow runner",
                "StepUUID": container_request["uuid"],
                "Sample": container_request["name"],
                "SampleUUID": container_request["uuid"],
                "User": user,
                "UserUUID": container_request["modified_by_user_uuid"],
                "Submitted": csv_dateformat(container_request["created_at"]),
                "Started": csv_dateformat(container["started_at"]),
                "Runtime": runtime_str(container_request, containers),
                # With steps listed separately, the runner row carries only
                # its own container's cost; otherwise the cumulative cost.
                "Cost": round(container["cost"] if include_steps else container_request["cumulative_cost"], 3),
                "CumulativeCost": round(container_request["cumulative_cost"], 3),
                }

            for child_cr in child_crs.get(container_request["container_uuid"], []):
                if not self._has_run(child_cr, containers):
                    continue
                child_container = containers[child_cr["container_uuid"]]
                yield {
                    "Project": project,
                    "ProjectUUID": container_request["owner_uuid"],
                    "Workflow": workflowname,
                    "WorkflowUUID": template_uuid,
                    "Step": child_cr["name"],
                    "StepUUID": child_cr["uuid"],
                    "Sample": container_request["name"],
                    "SampleUUID": container_request["uuid"],
                    "User": user,
                    "UserUUID": container_request["modified_by_user_uuid"],
                    "Submitted": csv_dateformat(child_cr["created_at"]),
                    "Started": csv_dateformat(child_container["started_at"]),
                    "Runtime": runtime_str(child_cr, containers),
                    "Cost": round(child_container["cost"], 3),
                    "CumulativeCost": round(child_container["cost"], 3),
                    }

    def collect_summary_stats(self, row):
        """Add one report row to the per-project and overall totals."""
        self.active_users.add(row["User"])
        prj = self.project_summary.setdefault(
            row["ProjectUUID"],
            ProjectSummary(users=set(),
                           runs={},
                           uuid=row["ProjectUUID"],
                           name=row["Project"]))
        cost = row["Cost"]
        hrs = runtime_in_hours(row["Runtime"])

        prj.cost += cost
        prj.count += 1
        prj.users.add(row["User"])
        prj.hours += hrs

        if row["Step"] == "workflow runner":
            # Per-workflow stats count top-level runs only, keyed by workflow
            # name, using the cumulative (runner plus steps) cost.
            wf_name = row["Workflow"]
            wf = prj.runs.setdefault(wf_name, WorkflowRunSummary(name=wf_name,
                                                                 uuid=row["WorkflowUUID"]))
            wf.count += 1
            wf.cost += row["CumulativeCost"]
            wf.hours += hrs

        self.total_hours += hrs
        self.total_cost += cost

    def _export_batch(self, batch, count, include_steps, exclude):
        """Yield the rows for one batch of requests, recording summary stats for each."""
        logging.info("Exporting workflow runs %s - %s", count-len(batch), count)
        for row in self.flush_containers(batch, include_steps, exclude):
            self.collect_summary_stats(row)
            yield row

    def report_from_api(self, since, to, include_steps, exclude):
        """Yield report rows for arvados-cwl-runner requests created between *since* and *to*.

        :param since: earliest ``created_at`` to include (datetime).
        :param to: latest ``created_at`` to include (datetime), or None for
            no upper bound.
        :param include_steps: also yield one row per workflow step.
        :param exclude: optional regex of workflow names to skip.

        Requests with zero cumulative cost are skipped.  Every yielded row is
        also passed to collect_summary_stats().  After the last row,
        ``total_users`` is set to the number of active user accounts.
        """
        filters = [
            ["command", "like", "[\"arvados-cwl-runner%"],
            ["created_at", ">=", since.strftime("%Y%m%dT%H%M%SZ")],
        ]
        if to is not None:
            filters.append(["created_at", "<=", to.strftime("%Y%m%dT%H%M%SZ")])

        pending = []
        count = 0
        for container_request in arvados.util.keyset_list_all(
                self.arv_client.container_requests().list,
                filters=filters,
                select=["uuid", "owner_uuid", "container_uuid", "name", "cumulative_cost",
                        "properties", "modified_by_user_uuid", "created_at"]):

            if container_request["cumulative_cost"] == 0:
                continue

            # Append first, then flush: every request lands in some batch.
            pending.append(container_request)
            if len(pending) >= self._BATCH_SIZE:
                count += len(pending)
                yield from self._export_batch(pending, count, include_steps, exclude)
                pending = []

        count += len(pending)
        yield from self._export_batch(pending, count, include_steps, exclude)

        userinfo = self.arv_client.users().list(filters=[["is_active", "=", True]], limit=0).execute()
        self.total_users = userinfo["items_available"]

    def csv_report(self, since, to, out, include_steps, columns, exclude):
        """Write the report rows to *out* as CSV.

        :param columns: comma-separated row keys to output; when empty a
            default set is used (including "Step" when *include_steps*).
            Row keys not listed are ignored.
        """
        if columns:
            columns = columns.split(",")
        elif include_steps:
            columns = ("Project", "Workflow", "Step", "Sample", "User", "Submitted", "Runtime", "Cost")
        else:
            columns = ("Project", "Workflow", "Sample", "User", "Submitted", "Runtime", "Cost")

        csvwriter = csv.DictWriter(out, fieldnames=columns, extrasaction="ignore")
        csvwriter.writeheader()

        for row in self.report_from_api(since, to, include_steps, exclude):
            csvwriter.writerow(row)