2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: AGPL-3.0
14 from datetime import datetime, timedelta
17 from dataclasses import dataclass
19 import crunchstat_summary.dygraphs
20 from crunchstat_summary.summarizer import Task
23 class WorkflowRunSummary:
# Per-workflow aggregate shown in the HTML report.  Only the class header is
# visible in this excerpt; collect_summary_stats constructs it with
# name=/uuid= and updates its count, cost and hours attributes.
# `runs` appears to be a field of ProjectSummary (its class header is not
# visible here).  collect_summary_stats keys this mapping by row["Workflow"],
# i.e. the workflow *name*, not its UUID.
34 runs: dict[str, WorkflowRunSummary]
# Field of a summarizer-like object consumed by ReportChart (owning class not
# visible here — NOTE(review): confirm).  html_report assigns
# collections.defaultdict(Task) to it and indexes it by category ("Compute").
43 tasks: collections.defaultdict[str, Task]
def date_export(item):
    """JSON ``default=`` hook that renders datetimes as JavaScript Date calls.

    A ``datetime`` becomes ``@new Date("YYYY-MM-DDTHH:MM:SSZ")@``.  The ``@``
    markers let the caller strip the JSON quotes that ``json.dumps`` places
    around the returned string.  The trailing ``Z`` is literal and is added
    regardless of the value's tzinfo.

    Any other object maps to ``None``, which ``json.dumps`` writes as
    ``null``.
    """
    if not isinstance(item, datetime):
        return None
    iso_stamp = item.strftime("%Y-%m-%dT%H:%M:%SZ")
    return """@new Date("{}")@""".format(iso_stamp)
53 class ReportChart(crunchstat_summary.dygraphs.DygraphsChart):
# Dygraphs chart used by the cluster activity HTML report.  Only fragments
# of its members are visible in this excerpt:
#   - a section builder that emits one entry per summarizer, labelled with
#     s.long_label(), with data built by self.chartdata for each stat;
#   - js();
#   - _collate_data().
57 'label': s.long_label(),
59 self.chartdata(s.label, s.tasks, stat)
# The first stat in this tuple is ('Compute', ['hours']); any further
# entries are on lines not visible here.
60 for stat in (('Compute', ['hours']),
65 for s in self.summarizers]
# js(): emit the chart data as a `var chartdata = ...;` assignment followed
# by the bundled JS assets.  date_export() serializes datetimes as
# "@new Date(...)@".  The chained replace() calls strip the quote/@ pairs so
# the Date constructor becomes live JavaScript.  The final replace then turns
# every remaining \" in the document back into a bare ".
# NOTE(review): that last replace is global.  An ordinary string value
# containing a double quote (e.g. a label), or one starting/ending with '@',
# would be emitted as invalid JavaScript.
# NOTE(review): pkg_resources is deprecated in setuptools;
# importlib.resources is the standard-library replacement.
68 return 'var chartdata = {};\n{}'.format(
69 json.dumps(self.sections(), default=date_export).replace('"@', '').replace('@"', '').replace('\\"', '"'),
70 '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa).decode('utf-8') for jsa in self.JSASSETS]))
# _collate_data(): flatten each task's series into rows of
# [pt[0], *nulls, pt[1], *values of the remaining stats].  pt[0] is
# presumably the sample time.  `data`, `nulls`, `category`, `sn0` and `pt`
# are bound on lines not visible in this excerpt (pt is presumably
# series[i]).
72 def _collate_data(self, tasks, stats):
75 # uuid is category for crunch2
77 for uuid, task in tasks.items():
78 # All stats in a category are assumed to have the same time base and same number of samples
80 series_names = stats[1]
82 series = task.series[(category,sn0)]
83 for i in range(len(series)):
85 vals = [task.series[(category,stat)][i][1] for stat in series_names[1:]]
# NOTE(review): if another stat's series is shorter than the first one, the
# [i] lookup above raises IndexError (the same-length assumption noted above).
86 data.append([pt[0]] + nulls + [pt[1]] + vals)
# Chart class that html_report instantiates to render the page.
91 WEBCHART_CLASS = ReportChart
def runtime_str(container_request, containers):
    """Return the wall-clock runtime of a request's container as ``H:MM:SS``.

    :param container_request: container request record; only its
        ``container_uuid`` is read.
    :param containers: mapping of container UUID to container record.  Its
        ``started_at`` / ``finished_at`` timestamp strings are parsed with
        ciso8601.
    :returns: the elapsed time formatted as ``"%i:%02i:%02i"``.  The hour
        field is not capped at 24, and fractional seconds are dropped.
    """
    container = containers[container_request["container_uuid"]]
    finished = ciso8601.parse_datetime(container["finished_at"])
    started = ciso8601.parse_datetime(container["started_at"])
    elapsed = finished - started

    # timedelta stores whole days separately from the sub-day seconds.
    whole_minutes, secs = divmod(elapsed.seconds, 60)
    sub_day_hours, mins = divmod(whole_minutes, 60)
    hrs = elapsed.days * 24 + sub_day_hours
    return "%i:%02i:%02i" % (hrs, mins, secs)
103 def runtime_in_hours(runtime):
# Convert an "H:MM:SS" string (the format runtime_str produces for
# row["Runtime"]) into fractional hours.
# NOTE(review): the initialisation of `hours` (presumably from sp[0]) and the
# return statement are not visible in this excerpt.  Input with fewer than
# three ':'-separated fields raises IndexError; non-numeric fields raise
# ValueError.
104 sp = runtime.split(":")
106 hours += float(sp[1]) / 60
107 hours += float(sp[2]) / 3600
def hours_to_runtime_str(frac_hours):
    """Format a fractional number of hours as ``H:MM:SS``.

    :param frac_hours: duration in hours, e.g. ``1.5`` for 90 minutes.
    :returns: the duration formatted as ``"%i:%02i:%02i"``, rounded to the
        nearest whole second.  The hour field is not capped at 24.
    """
    # Round once to whole seconds and split with integer arithmetic.
    # Truncating each field of a floating-point product turns values such as
    # 1.15 h (really 1.1499999...) into "1:08:59" instead of "1:09:00".
    total_seconds = int(round(frac_hours * 3600))
    minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return "%i:%02i:%02i" % (hours, minutes, seconds)
def csv_dateformat(date):
    """Reformat an API timestamp string as ``YYYY-MM-DD HH:MM:SS`` for CSV output.

    :param date: timestamp string, such as a container request's
        ``created_at`` or a container's ``started_at``, parsed with ciso8601.
    :returns: the timestamp rendered with ``strftime``.
    """
    dt = ciso8601.parse_datetime(date)
    # Seconds need their own ':' separator.  The previous pattern "%H:%M%S"
    # fused minutes and seconds, producing e.g. "03:0405".
    return dt.strftime("%Y-%m-%d %H:%M:%S")
123 class ClusterActivityReport(object):
# Builds the cluster activity report (CSV rows via csv_report, an HTML
# summary via html_report) from arvados-cwl-runner container requests
# fetched through the Arvados API.
124 def __init__(self, label=None, threads=1, **kwargs):
125 self.label = "Cluster report"
# NOTE(review): the `label` argument is accepted but ignored; the title is
# always "Cluster report".  Possibly intended:
# self.label = label or "Cluster report".
126 self.threadcount = threads
127 self.arv_client = arvados.api()
# Running totals filled in by collect_summary_stats.
129 self.active_users = set()
130 self.project_summary = {}
137 def html_report(self, since, to, exclude):
# Render the HTML report.  It first consumes report_from_api (with steps
# included) so that collect_summary_stats and total_users are populated.
# It then formats a summary section and per-project / per-workflow sections
# sorted by cost, and hands them to WEBCHART_CLASS for the page.
# Several lines of this method are not visible in this excerpt.
139 for row in self.report_from_api(since, to, True, exclude):
146 tophtml = """<h2>Summary</h2>
147 <p>Total user accounts: {total_users}</p>
148 <p>Active users: {active_users}</p>
149 <p>Aggregate compute hours: {total_hours}</p>
150 <p>Aggregate compute cost: ${total_cost}</p>
151 """.format(active_users=len(self.active_users),
152 total_users=self.total_users,
153 total_hours=round(self.total_hours, 1),
154 total_cost=round(self.total_cost, 2))
# NOTE(review): workflow, project and user names below come from API records
# and are interpolated into HTML without escaping (e.g. html.escape).  A name
# containing '<' or '&' would break the markup.
158 for k, prj in sorted(self.project_summary.items(), key=lambda x: x[1].cost, reverse=True):
160 for k2, r in sorted(prj.runs.items(), key=lambda x: x[1].cost, reverse=True):
162 {name} -- count {count} -- average runtime {runtime} -- average cost per run ${cost}
163 """.format(name=r.name, count=r.count, runtime=hours_to_runtime_str(r.hours/r.count), cost=round(r.cost/r.count, 2)))
167 <p>Users: {users}</p>
169 <p>Compute hours: {hours}</p>
170 <p>Compute cost: ${cost}</p>
171 """.format(name=prj.name,
172 users=", ".join(prj.users),
173 cost=round(prj.cost, 2),
174 hours=round(prj.hours, 1),
175 wfsum="</p><p>".join(wfsum))
# NOTE(review): the chart series below is hard-coded sample data (values
# 1, 2, 1 at now, +2 min and +4 min), not figures derived from the report
# rows.  The timestamps are naive local times from datetime.now()/today(),
# while date_export labels every timestamp with a literal 'Z' (UTC).
182 s1.tasks = collections.defaultdict(Task)
184 task = s1.tasks["Compute"]
185 task.series["Compute", "hours"].append((datetime.now() + timedelta(minutes=0), 1))
186 task.series["Compute", "hours"].append((datetime.today() + timedelta(minutes=2), 2))
187 task.series["Compute", "hours"].append((datetime.today() + timedelta(minutes=4), 1))
189 summarizers.append(s1)
191 return WEBCHART_CLASS(label, summarizers).html(tophtml, bottomhtml)
193 def flush_containers(self, pending, include_steps, exclude):
# Turn one batch of workflow-runner container requests (`pending`) into
# report rows:
#   - fetch their containers, workflow names, and owner project / user
#     names;
#   - fetch the child step requests and containers;
#   - build one row dict per runner and one per child step.
# The caller iterates over the result.
# NOTE(review): initialisations, some loop bodies and the statements that
# emit rows are on lines not visible in this excerpt.
196 for container in arvados.util.keyset_list_all(
197 self.arv_client.containers().list,
199 ["uuid", "in", [c["container_uuid"] for c in pending if c["container_uuid"]]],
201 select=["uuid", "started_at", "finished_at", "cost"]):
203 containers[container["uuid"]] = container
# Requests without a template_uuid are looked up under the "none" key.
206 workflows["none"] = "workflow run from command line"
# Only templates whose UUID starts with this cluster's ID are fetched.  Other
# templates fall back to "workflow missing" when rows are built.
# NOTE(review): self.arv_client.config() is evaluated once per candidate
# request inside this generator expression; hoisting the ClusterID lookup
# would avoid the repeated calls.
208 for wf in arvados.util.keyset_list_all(
209 self.arv_client.workflows().list,
211 ["uuid", "in", list(set(c["properties"]["template_uuid"]
213 if "template_uuid" in c["properties"] and c["properties"]["template_uuid"].startswith(self.arv_client.config()["ClusterID"])))],
215 select=["uuid", "name"]):
216 workflows[wf["uuid"]] = wf["name"]
# The UUID type infix (characters 6-11) decides the lookup: 'j7d0g' owners
# are fetched as groups (projects), 'tpzed' owners and all
# modified_by_user_uuid values as users.  User full names are stored in the
# same `projects` dict, which is why the "User" fields below read from it.
220 for pr in arvados.util.keyset_list_all(
221 self.arv_client.groups().list,
223 ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'j7d0g'))],
225 select=["uuid", "name"]):
226 projects[pr["uuid"]] = pr["name"]
228 for pr in arvados.util.keyset_list_all(
229 self.arv_client.users().list,
231 ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'tpzed')|set(c["modified_by_user_uuid"] for c in pending))],
233 select=["uuid", "full_name", "first_name", "last_name"]):
234 projects[pr["uuid"]] = pr["full_name"]
# Matches step names ending in "_<digits>"; how the match is used is not
# visible in this excerpt.
237 name_regex = re.compile(r"(.+)_[0-9]+")
239 child_cr_containers = set()
242 logging.info("Getting workflow steps")
243 for cr in arvados.util.keyset_list_all(
244 self.arv_client.container_requests().list,
246 ["requesting_container_uuid", "in", list(containers.keys())],
248 select=["uuid", "name", "cumulative_cost", "requesting_container_uuid", "container_uuid"]):
# Zero-cost child requests are presumably skipped (the statement that
# follows this test is not visible).
250 if cr["cumulative_cost"] == 0:
253 g = name_regex.fullmatch(cr["name"])
257 child_crs.setdefault(cr["requesting_container_uuid"], []).append(cr)
258 child_cr_containers.add(cr["container_uuid"])
# Child containers are fetched in batches of 1000 UUIDs.  The block after
# the loop fetches the remaining partial batch.
259 if len(child_cr_containers) == 1000:
260 stepcount += len(child_cr_containers)
261 for container in arvados.util.keyset_list_all(
262 self.arv_client.containers().list,
264 ["uuid", "in", list(child_cr_containers)],
266 select=["uuid", "started_at", "finished_at", "cost"]):
268 containers[container["uuid"]] = container
270 logging.info("Got workflow steps %s - %s", stepcount-len(child_cr_containers), stepcount)
271 child_cr_containers.clear()
273 if child_cr_containers:
274 stepcount += len(child_cr_containers)
275 for container in arvados.util.keyset_list_all(
276 self.arv_client.containers().list,
278 ["uuid", "in", list(child_cr_containers)],
280 select=["uuid", "started_at", "finished_at", "cost"]):
282 containers[container["uuid"]] = container
283 logging.info("Got workflow steps %s - %s", stepcount-len(child_cr_containers), stepcount)
285 for container_request in pending:
# Requests whose container never started or finished are presumably skipped.
# NOTE(review): `containers` is indexed directly.  A container_uuid that the
# containers().list call did not return would raise KeyError here.
286 if not container_request["container_uuid"] or not containers[container_request["container_uuid"]]["started_at"] or not containers[container_request["container_uuid"]]["finished_at"]:
289 template_uuid = container_request["properties"].get("template_uuid", "none")
290 workflowname = container_request["name"] if template_uuid == "none" else workflows.get(template_uuid, "workflow missing")
# `exclude` is a case-insensitive regex matched against the workflow name.
292 if exclude and re.search(exclude, workflowname, flags=re.IGNORECASE):
# Row for the workflow runner itself.  With include_steps, "Cost" is the
# runner container's own cost, since the steps get rows of their own.
# Otherwise it is the request's cumulative cost.
296 "Project": projects.get(container_request["owner_uuid"], "unknown owner"),
297 "ProjectUUID": container_request["owner_uuid"],
298 "Workflow": workflowname,
299 "WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
300 "Step": "workflow runner",
301 "StepUUID": container_request["uuid"],
302 "Sample": container_request["name"],
303 "SampleUUID": container_request["uuid"],
304 "User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
305 "UserUUID": container_request["modified_by_user_uuid"],
306 "Submitted": csv_dateformat(container_request["created_at"]),
307 "Started": csv_dateformat(containers[container_request["container_uuid"]]["started_at"]),
308 "Runtime": runtime_str(container_request, containers),
309 "Cost": round(containers[container_request["container_uuid"]]["cost"] if include_steps else container_request["cumulative_cost"], 3),
310 "CumulativeCost": round(container_request["cumulative_cost"], 3)
# One row per child step of this runner.  Project, user and sample fields
# are inherited from the parent request.
314 for child_cr in child_crs.get(container_request["container_uuid"], []):
315 if not child_cr["container_uuid"] or not containers[child_cr["container_uuid"]]["started_at"] or not containers[child_cr["container_uuid"]]["finished_at"]:
318 "Project": projects.get(container_request["owner_uuid"], "unknown owner"),
319 "ProjectUUID": container_request["owner_uuid"],
# NOTE(review): unlike the runner row (which uses container_request["name"]
# when there is no template), this lookup gives command-line runs the label
# "workflow run from command line", so runner and step rows disagree.
320 "Workflow": workflows.get(container_request["properties"].get("template_uuid", "none"), "workflow missing"),
321 "WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
322 "Step": child_cr["name"],
323 "StepUUID": child_cr["uuid"],
324 "Sample": container_request["name"],
# NOTE(review): SampleUUID is set to the request *name* here, while the
# runner row uses container_request["uuid"]; this looks like it should be
# the UUID.
325 "SampleUUID": container_request["name"],
326 "User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
327 "UserUUID": container_request["modified_by_user_uuid"],
328 "Submitted": csv_dateformat(child_cr["created_at"]),
329 "Started": csv_dateformat(containers[child_cr["container_uuid"]]["started_at"]),
330 "Runtime": runtime_str(child_cr, containers),
331 "Cost": round(containers[child_cr["container_uuid"]]["cost"], 3),
332 "CumulativeCost": round(containers[child_cr["container_uuid"]]["cost"], 3),
336 def collect_summary_stats(self, row):
# Fold one report row into the running totals used by html_report:
#   - active users;
#   - per-project summaries keyed by project UUID;
#   - per-workflow run statistics (runner rows only);
#   - cluster-wide hours and cost.
# NOTE(review): active_users collects display names (row["User"] is the
# user's full_name, or "unknown user").  Distinct accounts that share a full
# name, and all unresolved users, are therefore counted once.
# row["UserUUID"] would count accounts instead.
337 self.active_users.add(row["User"])
338 self.project_summary.setdefault(row["ProjectUUID"],
339 ProjectSummary(users=set(),
341 uuid=row["ProjectUUID"],
342 name=row["Project"]))
343 prj = self.project_summary[row["ProjectUUID"]]
347 prj.users.add(row["User"])
348 hrs = runtime_in_hours(row["Runtime"])
351 if row["Step"] == "workflow runner":
# NOTE(review): runs are keyed by workflow *name* even though the local is
# called wfuuid.  Different workflows with the same name are merged, and the
# stored uuid is whichever one was seen first (setdefault).
352 prj.runs.setdefault(row["Workflow"], WorkflowRunSummary(name=row["Workflow"],
353 uuid=row["WorkflowUUID"]))
354 wfuuid = row["Workflow"]
355 prj.runs[wfuuid].count += 1
356 prj.runs[wfuuid].cost += row["CumulativeCost"]
357 prj.runs[wfuuid].hours += hrs
# `cost` is assigned on lines not visible in this excerpt.
359 self.total_hours += hrs
360 self.total_cost += cost
362 def report_from_api(self, since, to, include_steps, exclude):
# Page through arvados-cwl-runner container requests created at or after
# `since` and pass them to flush_containers in batches of up to 1000.
# Each resulting row goes to collect_summary_stats; the rows are presumably
# also yielded, since html_report and csv_report iterate over this method.
# Finally, the number of active user accounts is recorded.
# NOTE(review): how `to` is used is not visible in this excerpt (a filter
# line after the created_at >= since filter is missing).
366 for container_request in arvados.util.keyset_list_all(
367 self.arv_client.container_requests().list,
369 ["command", "like", "[\"arvados-cwl-runner%"],
# NOTE(review): a literal 'Z' is appended, so `since` is assumed to be UTC.
# Confirm that callers pass UTC datetimes.
370 ["created_at", ">=", since.strftime("%Y%m%dT%H%M%SZ")],
372 select=["uuid", "owner_uuid", "container_uuid", "name", "cumulative_cost", "properties", "modified_by_user_uuid", "created_at"]):
# Zero-cost requests are presumably skipped (the next statement is not visible).
374 if container_request["cumulative_cost"] == 0:
377 if len(pending) < 1000:
378 pending.append(container_request)
# NOTE(review): in the visible lines, a request that arrives when `pending`
# already holds 1000 entries is not appended before the batch is flushed.
# Confirm that the hidden lines after the flush add it; otherwise every
# 1001st request is dropped from the report.
380 count += len(pending)
381 logging.info("Exporting workflow runs %s - %s", count-len(pending), count)
382 for row in self.flush_containers(pending, include_steps, exclude):
383 self.collect_summary_stats(row)
# Flush the final, partially filled batch.
387 count += len(pending)
388 logging.info("Exporting workflow runs %s - %s", count-len(pending), count)
389 for row in self.flush_containers(pending, include_steps, exclude):
390 self.collect_summary_stats(row)
# limit=0: only items_available (the count of active users) is read.
# This runs after the loop, so total_users is only set once the caller has
# consumed every row.
393 userinfo = self.arv_client.users().list(filters=[["is_active", "=", True]], limit=0).execute()
394 self.total_users = userinfo["items_available"]
396 def csv_report(self, since, to, out, include_steps, columns, exclude):
# Write the report as CSV to the file-like `out`.  `columns`, when given, is
# a comma-separated list of row keys.  Otherwise one of two default column
# sets is used; the set containing "Step" is presumably chosen when
# include_steps is true (the branch conditions are not visible here).
# NOTE(review): split(",") does not strip whitespace, so "Project, Cost"
# requests a column named " Cost".
398 columns = columns.split(",")
401 columns = ("Project", "Workflow", "Step", "Sample", "User", "Submitted", "Runtime", "Cost")
403 columns = ("Project", "Workflow", "Sample", "User", "Submitted", "Runtime", "Cost")
# extrasaction="ignore" drops row keys that were not requested.  A requested
# column that no row contains is written as an empty field (DictWriter's
# default restval), so misspelled column names fail silently.
405 csvwriter = csv.DictWriter(out, fieldnames=columns, extrasaction="ignore")
406 csvwriter.writeheader()
408 for row in self.report_from_api(since, to, include_steps, exclude):
409 csvwriter.writerow(row)