2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: AGPL-3.0
14 from datetime import date, datetime, timedelta
15 from typing import List
18 from dataclasses import dataclass
19 from arvados_cluster_activity.prometheus import get_metric_usage, get_data_usage
20 from arvados_cluster_activity.reportchart import ReportChart
class WorkflowRunSummary:
    # NOTE(review): the chunk is elided here — WorkflowRunSummary's own
    # fields are not visible (later usage suggests name, uuid, count, cost,
    # hours; see r.name / r.uuid / r.count / r.cost / r.hours in html_report
    # and collect_summary_stats).  The fields below reference
    # WorkflowRunSummary and, from their usage (prj.runs, prj.earliest,
    # prj.latest, prj.activityspan), appear to belong to the separate
    # ProjectSummary dataclass constructed in collect_summary_stats, whose
    # class header is also elided — TODO confirm against the full file.

    # Workflow name -> per-workflow run summary for this project.
    runs: dict[str, WorkflowRunSummary]
    # Sentinel "far future" start so any real timestamp compares earlier.
    earliest: datetime = datetime(year=9999, month=1, day=1)
    # Sentinel "far past" end so any real timestamp compares later.
    latest: datetime = datetime(year=1900, month=1, day=1)
    # Human-readable activity date range, filled in by html_report.
    activityspan: str = ""
def aws_monthly_cost(value):
    """Estimate the monthly AWS S3 Standard storage cost for ``value`` bytes.

    Uses the published tiered pricing: $0.023/GB for the first 50 TB,
    $0.022/GB for the next 450 TB, and $0.021/GB beyond 500 TB.

    :param value: storage size in bytes.
    :return: estimated monthly cost in dollars (float).
    """
    value_gb = value / (1024*1024*1024)
    first_50tb = min(1024*50, value_gb)
    next_450tb = max(min(1024*450, value_gb-1024*50), 0)
    over_500tb = max(value_gb-1024*500, 0)

    monthly_cost = (first_50tb * 0.023) + (next_450tb * 0.022) + (over_500tb * 0.021)
    # Bug fix: the computed cost was never returned (function implicitly
    # returned None, which breaks every caller doing arithmetic on it).
    return monthly_cost
def format_with_suffix_base2(summary_value):
    """Format a byte count with a binary (power-of-1024) suffix, e.g.
    ``"1.500 KiB"``.

    The value is always scaled at least once, so inputs below 1024 bytes
    are still reported as fractional KiB.

    :param summary_value: size in bytes (int or float).
    :return: formatted string with three decimal places and a unit suffix.
    """
    for scale in ["KiB", "MiB", "GiB", "TiB", "PiB", "EiB"]:
        summary_value = summary_value / 1024
        if summary_value < 1024:
            return "%.3f %s" % (summary_value, scale)
    # Bug fix: values beyond the table previously fell off the loop and
    # returned None; report them in the largest unit instead.
    return "%.3f %s" % (summary_value, "EiB")
def format_with_suffix_base10(summary_value):
    """Format a byte count with a decimal (power-of-1000) suffix, e.g.
    ``"1.500 MB"``.

    The value is always scaled at least once, so inputs below 1000 bytes
    are still reported as fractional KB.

    :param summary_value: size in bytes (int or float).
    :return: formatted string with three decimal places and a unit suffix.
    """
    for scale in ["KB", "MB", "GB", "TB", "PB", "EB"]:
        summary_value = summary_value / 1000
        if summary_value < 1000:
            return "%.3f %s" % (summary_value, scale)
    # Bug fix: values beyond the table previously fell off the loop and
    # returned None; report them in the largest unit instead.
    return "%.3f %s" % (summary_value, "EB")
# (display title, short key) pairs identifying each time-series graph.
# Used as keys into ClusterActivityReport.self.graphs and handed to
# ReportChart along with the collected series.
containers_graph = ('Concurrent running containers', 'containers')
storage_graph = ('Storage usage', 'used')
managed_graph = ('Data under management', 'managed')
def runtime_str(container_request, containers):
    """Format the wall-clock runtime of a container request's container as
    an "H:MM:SS" string (hours are not zero-padded and may exceed 24)."""
    ctr = containers[container_request["container_uuid"]]
    elapsed = ciso8601.parse_datetime(ctr["finished_at"]) - ciso8601.parse_datetime(ctr["started_at"])

    # timedelta stores days and sub-day seconds separately; fold the days
    # into the hour count.
    total_minutes, seconds = divmod(elapsed.seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    hours += elapsed.days * 24

    return "%i:%02i:%02i" % (hours, minutes, seconds)
def runtime_in_hours(runtime):
    """Convert an "H:MM:SS" runtime string (as produced by runtime_str)
    into fractional hours.

    :param runtime: "H:MM:SS" string; the hours field may exceed 24.
    :return: total hours as a float.
    """
    sp = runtime.split(":")
    # Bug fix: `hours` was used before assignment and the total was never
    # returned; seed it from the hours field and return the sum.
    hours = float(sp[0])
    hours += float(sp[1]) / 60
    hours += float(sp[2]) / 3600
    return hours
def hours_to_runtime_str(frac_hours):
    """Render fractional hours as an "H:MM:SS" string (the inverse of
    runtime_in_hours, up to truncation of fractional seconds)."""
    h = math.floor(frac_hours)
    # Remaining fraction of an hour expressed in minutes, then the
    # remaining fraction of a minute in seconds; %i truncates the floats.
    m = (frac_hours - h) * 60.0
    s = (m - math.floor(m)) * 60.0
    return "%i:%02i:%02i" % (h, m, s)
def csv_dateformat(datestr):
    """Normalize an ISO 8601 timestamp string to "YYYY-MM-DD HH:MM:SS"
    for CSV/report output."""
    return ciso8601.parse_datetime(datestr).strftime("%Y-%m-%d %H:%M:%S")
103 class ClusterActivityReport(object):
    def __init__(self, prom_client):
        """Set up API clients and zeroed summary accumulators.

        :param prom_client: Prometheus client used to fetch time-series
            metrics; may be falsy (see collect_graph).
        """
        self.arv_client = arvados.api()
        self.prom_client = prom_client
        self.cluster = self.arv_client.config()["ClusterID"]

        # Users seen in report rows (populated by collect_summary_stats).
        self.active_users = set()
        # ProjectUUID -> per-project summary (populated by collect_summary_stats).
        self.project_summary = {}
        # NOTE(review): lines appear elided here — accumulators used later
        # (self.graphs, self.total_hours, self.total_cost) are not visibly
        # initialized in this chunk; confirm against the full file.
        self.total_workflows = 0
        self.storage_cost = 0
        # Set True by csv_report once summary stats have been collected,
        # letting html_report skip re-fetching everything.
        self.summary_fetched = False
    def collect_graph(self, since, to, metric, resample_to, extra=None):
        """Fetch a Prometheus metric over [since, to] and flatten it into a
        [[timestamp, value], ...] list.

        ``metric`` is a query template with a %s placeholder for the cluster
        id.  ``extra``, when given, is presumably a per-sample callback —
        its invocation is not visible in this chunk.

        NOTE(review): several lines are elided here (the body of the
        no-Prometheus early exit, the ``flatdata`` initialization, the
        ``extra`` callback invocation, and the return) — confirm against
        the full file.
        """
        if not self.prom_client:
        for series in get_metric_usage(self.prom_client, since, to, metric % self.cluster, resampleTo=resample_to):
            for t in series.itertuples():
                flatdata.append([t[0], t[1]])
132 def collect_storage_cost(self, timestamp, value):
133 self.storage_cost += aws_monthly_cost(value) / (30*24)
    def html_report(self, since, to, exclude, include_workflow_steps):
        """Get a cluster activity report for the desired time period,
        returning a string containing the report as an HTML document."""

        # NOTE(review): this chunk has many elided lines (e.g. `cards`
        # initialization, closing parentheses, `else:` branches, the
        # openings of several triple-quoted template strings).  Comments
        # below flag visible gaps rather than guessing at content.

        self.label = "Cluster report for %s from %s to %s" % (self.cluster, since.date(), to.date())

        # If we already produced a CSV report we have summary stats
        # and don't need to fetch everything all over again.
        if not self.summary_fetched:
            for row in self.report_from_api(since, to, include_workflow_steps, exclude):

        # Running total of container-hours accumulated by the sampling
        # callback below.
        container_cumulative_hours = 0
        def collect_container_hours(timestamp, value):
            nonlocal container_cumulative_hours
            # resampled to 5 minute increments but we want hours, so each
            # sample contributes 1/12 of an hour.
            container_cumulative_hours += value / 12

        logging.info("Getting container hours time series")

        self.graphs[containers_graph] = self.collect_graph(since, to,
                           "arvados_dispatchcloud_containers_running{cluster='%s'}",
                           extra=collect_container_hours

        logging.info("Getting data usage time series")
        self.graphs[managed_graph] = self.collect_graph(since, to,
                           "arvados_keep_collection_bytes{cluster='%s'}", resample_to="60min")

        self.graphs[storage_graph] = self.collect_graph(since, to,
                           "arvados_keep_total_bytes{cluster='%s'}", resample_to="60min",
                           extra=self.collect_storage_cost)

        # Most recent sample of each storage series, if Prometheus gave us any.
        managed_data_now = None
        storage_used_now = None

        if len(self.graphs.get(managed_graph, [])) > 0:
            managed_data_now = self.graphs[managed_graph][-1][1]

        if len(self.graphs.get(storage_graph, [])) > 0:
            storage_used_now = self.graphs[storage_graph][-1][1]

        if managed_data_now and storage_used_now:
            storage_cost = aws_monthly_cost(storage_used_now)
            dedup_ratio = managed_data_now/storage_used_now

        workbench = self.arv_client.config()["Services"]["Workbench2"]["ExternalURL"]
        if workbench.endswith("/"):
            workbench = workbench[:-1]

        # NOTE(review): leftover debug print — should be removed or demoted
        # to logging.debug before release.
        print(to.date(), self.today())
        if to.date() == self.today():
            # The deduplication ratio overstates things a bit, you can
            # have collections which reference a small slice of a large
            # block, and this messes up the intuitive value of this ratio
            # and exagerates the effect.

            # So for now, as much fun as this is, I'm excluding it from
            # the report.

            # dedup_savings = aws_monthly_cost(managed_data_now) - storage_cost
            # <tr><th>Monthly savings from storage deduplication</th> <td>${dedup_savings:,.2f}</td></tr>

        # NOTE(review): the assignment opening the template string below
        # (presumably data_rows = """...) is elided in this chunk.
        if managed_data_now and storage_used_now:
        <tr><th>Total data under management</th> <td>{managed_data_now}</td></tr>
        <tr><th>Total storage usage</th> <td>{storage_used_now}</td></tr>
        <tr><th>Deduplication ratio</th> <td>{dedup_ratio:.1f}</td></tr>
        <tr><th>Approximate monthly storage cost</th> <td>${storage_cost:,.2f}</td></tr>
                managed_data_now=format_with_suffix_base10(managed_data_now),
                storage_used_now=format_with_suffix_base10(storage_used_now),
                storage_cost=storage_cost,
                dedup_ratio=dedup_ratio,

        cards.append("""<h2>Cluster status as of {now}</h2>
        <table class='aggtable'><tbody>
        <tr><th><a href="{workbench}/users">Total users</a></th><td>{total_users}</td></tr>
        <tr><th>Total projects</th><td>{total_projects}</td></tr>
        <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
        """.format(now=self.today(),
                   total_users=self.total_users,
                   total_projects=self.total_projects,
                   data_rows=data_rows))

        cards.append("""<h2>Activity and cost over the {reporting_days} day period {since} to {to}</h2>
        <table class='aggtable'><tbody>
        <tr><th>Active users</th> <td>{active_users}</td></tr>
        <tr><th><a href="#Active_Projects">Active projects</a></th> <td>{active_projects}</td></tr>
        <tr><th>Workflow runs</th> <td>{total_workflows:,}</td></tr>
        <tr><th>Compute used</th> <td>{total_hours:,.1f} hours</td></tr>
        <tr><th>Compute cost</th> <td>${total_cost:,.2f}</td></tr>
        <tr><th>Storage cost</th> <td>${storage_cost:,.2f}</td></tr>
        <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
        """.format(active_users=len(self.active_users),
                   total_users=self.total_users,
                   #total_hours=container_cumulative_hours,
                   total_hours=self.total_hours,
                   total_cost=self.total_cost,
                   total_workflows=self.total_workflows,
                   active_projects=len(self.project_summary),
                   since=since.date(), to=to.date(),
                   reporting_days=(to - since).days,
                   storage_cost=self.storage_cost))

        # Most expensive projects first.
        projectlist = sorted(self.project_summary.items(), key=lambda x: x[1].cost, reverse=True)

        for k, prj in projectlist:
            if prj.earliest.date() == prj.latest.date():
                prj.activityspan = "{}".format(prj.earliest.date())
            # NOTE(review): the `else:` introducing this branch is elided.
                prj.activityspan = "{} to {}".format(prj.earliest.date(), prj.latest.date())

            prj.tablerow = """<td>{users}</td> <td>{active}</td> <td>{hours:,.1f}</td> <td>${cost:,.2f}</td>""".format(
                active=prj.activityspan,
                users=", ".join(prj.users),

        <div id="chart"></div>
        <a id="Active_Projects"><h2>Active Projects</h2></a>
        <table class='sortable active-projects'>
        <thead><tr><th>Project</th> <th>Users</th> <th>Active</th> <th>Compute usage (hours)</th> <th>Compute cost</th> </tr></thead>
        <tbody><tr>{projects}</tr></tbody>
        <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
        """.format(projects="</tr>\n<tr>".join("""<td><a href="#{name}">{name}</a></td>{rest}""".format(name=prj.name, rest=prj.tablerow) for k, prj in projectlist)))

        # Per-project cards with one row per workflow, ordered by run count.
        for k, prj in projectlist:
            for k2, r in sorted(prj.runs.items(), key=lambda x: x[1].count, reverse=True):
        <tr><td>{count}</td> <td>{workflowlink}</td> <td>{median_runtime}</td> <td>{mean_runtime}</td> <td>${median_cost:,.2f}</td> <td>${mean_cost:,.2f}</td> <td>${totalcost:,.2f}</td></tr>
                           mean_runtime=hours_to_runtime_str(statistics.mean(r.hours)),
                           median_runtime=hours_to_runtime_str(statistics.median(r.hours)),
                           mean_cost=statistics.mean(r.cost),
                           median_cost=statistics.median(r.cost),
                           totalcost=sum(r.cost),
                           workflowlink="""<a href="{workbench}/workflows/{uuid}">{name}</a>""".format(workbench=workbench,uuid=r.uuid,name=r.name)
                           if r.uuid != "none" else r.name))

            """<a id="{name}"></a><a href="{workbench}/projects/{uuid}"><h2>{name}</h2></a>
            <table class='sortable single-project'>
            <thead><tr> <th>Users</th> <th>Active</th> <th>Compute usage (hours)</th> <th>Compute cost</th> </tr></thead>
            <tbody><tr>{projectrow}</tr></tbody>
            <table class='sortable project'>
            <thead><tr><th>Workflow run count</th> <th>Workflow name</th> <th>Median runtime</th> <th>Mean runtime</th> <th>Median cost per run</th> <th>Mean cost per run</th> <th>Sum cost over runs</th></tr></thead>
            """.format(name=prj.name,
                       users=", ".join(prj.users),
                       wfsum=" ".join(wfsum),
                       earliest=prj.earliest.date(),
                       latest=prj.latest.date(),
                       activity=prj.activityspan,
                       userplural='s' if len(prj.users) > 1 else '',
                       projectrow=prj.tablerow,

        # The deduplication ratio overstates things a bit, you can
        # have collections which reference a small slice of a large
        # block, and this messes up the intuitive value of this ratio
        # and exagerates the effect.

        # So for now, as much fun as this is, I'm excluding it from
        # the report.

        # <p>"Monthly savings from storage deduplication" is the
        # estimated cost difference between "storage usage" and "data
        # under management" as a way of comparing with other
        # technologies that do not support data deduplication.</p>

        <h2 id="prices">Note on usage and cost calculations</h2>

        <div style="max-width: 60em">

        <p>The numbers presented in this report are estimates and will
        not perfectly match your cloud bill. Nevertheless this report
        should be useful for identifying your main cost drivers.</p>

        <p>"Total data under management" is what you get if you add up
        all blocks referenced by all collections in Workbench, without
        considering deduplication.</p>

        <p>"Total storage usage" is the actual underlying storage
        usage, accounting for data deduplication.</p>

        <p>Storage costs are based on AWS "S3 Standard"
        described on the <a href="https://aws.amazon.com/s3/pricing/">Amazon S3 pricing</a> page:</p>

        <li>$0.023 per GB / Month for the first 50 TB</li>
        <li>$0.022 per GB / Month for the next 450 TB</li>
        <li>$0.021 per GB / Month over 500 TB</li>

        <p>Finally, this only the base storage cost, and does not
        include any fees associated with S3 API usage. However, there
        are generally no ingress/egress fees if your Arvados instance
        and S3 bucket are in the same region, which is the normal
        recommended configuration.</p>

        <p>"Compute usage" are instance-hours used in running
        workflows. Because multiple steps may run in parallel on
        multiple instances, a workflow that completes in four hours
        but runs parallel steps on five instances, would be reported
        as using 20 instance hours.</p>

        <p>"Runtime" is the actual wall clock time that it took to
        complete a workflow. This does not include time spent in the
        queue for the workflow itself, but does include queuing time
        of individual workflow steps.</p>

        <p>Computational costs are derived from Arvados cost
        calculations of container runs. For on-demand instances, this
        uses the prices from the InstanceTypes section of the Arvado
        config file, set by the system administrator. For spot
        instances, this uses current spot prices retrieved on the fly

        <p>Be aware that the cost calculations are only for the time
        the container is running and only do not take into account the
        overhead of launching instances or idle time between scheduled
        tasks or prior to automatic shutdown.</p>

        # NOTE(review): `label` is presumably bound on an elided line
        # (self.label is set at the top of this method) — confirm.
        return ReportChart(label, cards, self.graphs).html()
    def iter_container_info(self, pending, include_steps, exclude):
        """Yield one report-row dict per toplevel workflow run in ``pending``
        and, when ``include_steps`` is true, one per workflow step.

        :param pending: list of arvados-cwl-runner container request records.
        :param include_steps: also fetch and yield per-step rows.
        :param exclude: optional regex; runs whose workflow name matches
            (case-insensitively) are skipped.

        NOTE(review): several lines are elided in this chunk (accumulator
        initializations such as `containers`, `workflows`, `projects`,
        `child_crs`, `stepcount`; the `filters=` keyword openings; various
        `continue` statements and the `yield {` openings) — confirm against
        the full file.
        """
        # "pending" is a list of arvados-cwl-runner container requests
        # returned by the API. This method fetches detailed
        # information about the runs and yields report rows.

        # 1. Get container records corresponding to container requests.
        for container in arvados.util.keyset_list_all(
                self.arv_client.containers().list,
            ["uuid", "in", [c["container_uuid"] for c in pending if c["container_uuid"]]],
                select=["uuid", "started_at", "finished_at", "cost"]):
            containers[container["uuid"]] = container

        # 2. Look for the template_uuid property and fetch the
        # corresponding workflow record.
        workflows["none"] = "workflow run from command line"
        for wf in arvados.util.keyset_list_all(
                self.arv_client.workflows().list,
            ["uuid", "in", list(set(c["properties"]["template_uuid"]
                if "template_uuid" in c["properties"] and c["properties"]["template_uuid"].startswith(self.arv_client.config()["ClusterID"])))],
                select=["uuid", "name"]):
            workflows[wf["uuid"]] = wf["name"]

        # 3. Look at owner_uuid and fetch owning projects and users
        for pr in arvados.util.keyset_list_all(
                self.arv_client.groups().list,
            # uuids with the 'j7d0g' infix are groups/projects.
            ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'j7d0g'))],
                select=["uuid", "name"]):
            projects[pr["uuid"]] = pr["name"]

        # 4. Look at owner_uuid and modified_by_user_uuid and get user records
        for pr in arvados.util.keyset_list_all(
                self.arv_client.users().list,
            # uuids with the 'tpzed' infix are users; user names share the
            # same `projects` lookup table as project names.
            ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'tpzed')|set(c["modified_by_user_uuid"] for c in pending))],
                select=["uuid", "full_name", "first_name", "last_name"]):
            projects[pr["uuid"]] = pr["full_name"]

        # 5. Optionally iterate over individual workflow steps.
        # Pattern used to strip trailing _<number> from step names
        # (presumably so scattered/repeated steps aggregate — confirm).
        name_regex = re.compile(r"(.+)_[0-9]+")
        child_cr_containers = set()

        # 5.1. Go through the container requests owned by the toplevel workflow container
        logging.info("Getting workflow steps")
        for cr in arvados.util.keyset_list_all(
                self.arv_client.container_requests().list,
            ["requesting_container_uuid", "in", list(containers.keys())],
                select=["uuid", "name", "cumulative_cost", "requesting_container_uuid", "container_uuid"]):

            # Skip zero-cost steps (NOTE(review): the `continue` appears
            # elided here).
            if cr["cumulative_cost"] == 0:

            g = name_regex.fullmatch(cr["name"])

            # 5.2. Get the containers corresponding to the
            # container requests. This has the same logic as
            # report_from_api where we batch it into 1000 items at
            # a time.
            child_crs.setdefault(cr["requesting_container_uuid"], []).append(cr)
            child_cr_containers.add(cr["container_uuid"])
            if len(child_cr_containers) == 1000:
                stepcount += len(child_cr_containers)
                for container in arvados.util.keyset_list_all(
                        self.arv_client.containers().list,
                    ["uuid", "in", list(child_cr_containers)],
                        select=["uuid", "started_at", "finished_at", "cost"]):
                    containers[container["uuid"]] = container

                logging.info("Got workflow steps %s - %s", stepcount-len(child_cr_containers), stepcount)
                child_cr_containers.clear()

        # Get any remaining containers
        if child_cr_containers:
            stepcount += len(child_cr_containers)
            for container in arvados.util.keyset_list_all(
                    self.arv_client.containers().list,
                ["uuid", "in", list(child_cr_containers)],
                    select=["uuid", "started_at", "finished_at", "cost"]):
                containers[container["uuid"]] = container
            logging.info("Got workflow steps %s - %s", stepcount-len(child_cr_containers), stepcount)

        # 6. Now go through the list of workflow runs, yield a row
        # with all the information we have collected, as well as the
        # details for each workflow step (if enabled)
        for container_request in pending:
            # Skip runs that never started or never finished
            # (NOTE(review): the `continue` appears elided here).
            if not container_request["container_uuid"] or not containers[container_request["container_uuid"]]["started_at"] or not containers[container_request["container_uuid"]]["finished_at"]:

            template_uuid = container_request["properties"].get("template_uuid", "none")
            workflowname = container_request["name"] if template_uuid == "none" else workflows.get(template_uuid, template_uuid)

            if exclude and re.search(exclude, workflowname, flags=re.IGNORECASE):

            # Toplevel "workflow runner" row (NOTE(review): the `yield {`
            # opening this dict appears elided).
                "Project": projects.get(container_request["owner_uuid"], "unknown owner"),
                "ProjectUUID": container_request["owner_uuid"],
                "Workflow": workflowname,
                "WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
                "Step": "workflow runner",
                "StepUUID": container_request["uuid"],
                "Sample": container_request["name"],
                "SampleUUID": container_request["uuid"],
                "User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
                "UserUUID": container_request["modified_by_user_uuid"],
                "Submitted": csv_dateformat(container_request["created_at"]),
                "Started": csv_dateformat(containers[container_request["container_uuid"]]["started_at"]),
                "Finished": csv_dateformat(containers[container_request["container_uuid"]]["finished_at"]),
                "Runtime": runtime_str(container_request, containers),
                # Runner-container cost only when steps are reported
                # separately; otherwise the run's whole cumulative cost.
                "Cost": round(containers[container_request["container_uuid"]]["cost"] if include_steps else container_request["cumulative_cost"], 3),
                "CumulativeCost": round(container_request["cumulative_cost"], 3)

            # Per-step rows for this run, if any were collected.
            for child_cr in child_crs.get(container_request["container_uuid"], []):
                if not child_cr["container_uuid"] or not containers[child_cr["container_uuid"]]["started_at"] or not containers[child_cr["container_uuid"]]["finished_at"]:
                    "Project": projects.get(container_request["owner_uuid"], "unknown owner"),
                    "ProjectUUID": container_request["owner_uuid"],
                    "Workflow": workflows.get(container_request["properties"].get("template_uuid", "none"), "workflow missing"),
                    "WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
                    "Step": child_cr["name"],
                    "StepUUID": child_cr["uuid"],
                    "Sample": container_request["name"],
                    # NOTE(review): this uses the request *name*, unlike the
                    # toplevel row above which uses the uuid — possibly
                    # should be container_request["uuid"]; confirm.
                    "SampleUUID": container_request["name"],
                    "User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
                    "UserUUID": container_request["modified_by_user_uuid"],
                    "Submitted": csv_dateformat(child_cr["created_at"]),
                    "Started": csv_dateformat(containers[child_cr["container_uuid"]]["started_at"]),
                    "Finished": csv_dateformat(containers[child_cr["container_uuid"]]["finished_at"]),
                    "Runtime": runtime_str(child_cr, containers),
                    "Cost": round(containers[child_cr["container_uuid"]]["cost"], 3),
                    "CumulativeCost": round(containers[child_cr["container_uuid"]]["cost"], 3),
    def collect_summary_stats(self, row):
        """Fold one report row into the running summary accumulators
        (active users, per-project summaries, cluster-wide totals)."""
        self.active_users.add(row["User"])
        self.project_summary.setdefault(row["ProjectUUID"],
                                        ProjectSummary(users=set(),
                                                       uuid=row["ProjectUUID"],
                                                       name=row["Project"]))
        prj = self.project_summary[row["ProjectUUID"]]
        prj.users.add(row["User"])
        hrs = runtime_in_hours(row["Runtime"])

        started = datetime.strptime(row["Started"], "%Y-%m-%d %H:%M:%S")
        finished = datetime.strptime(row["Finished"], "%Y-%m-%d %H:%M:%S")

        # Widen the project's activity window to include this run.
        if started < prj.earliest:
            prj.earliest = started

        if finished > prj.latest:
            prj.latest = finished

        # Only toplevel runner rows count as workflow runs; per-step rows
        # contribute to hours/cost totals only.
        if row["Step"] == "workflow runner":
            prj.runs.setdefault(row["Workflow"], WorkflowRunSummary(name=row["Workflow"],
                                                                    uuid=row["WorkflowUUID"],
            wfuuid = row["Workflow"]
            prj.runs[wfuuid].count += 1
            prj.runs[wfuuid].cost.append(row["CumulativeCost"])
            prj.runs[wfuuid].hours.append(hrs)
            self.total_workflows += 1

        self.total_hours += hrs
        # NOTE(review): `cost` is presumably bound from the row (likely
        # row["Cost"]) on an elided line above — confirm against the full
        # file.
        self.total_cost += cost
    def report_from_api(self, since, to, include_steps, exclude):
        """Fetch toplevel workflow runs in [since, to] and yield report
        rows, batching detail lookups 1000 requests at a time.

        Also records cluster-wide user/project counts as a side effect.

        NOTE(review): several lines are elided in this chunk (the
        `count`/`pending` initializations, the `filters=` opening, a
        `continue` for zero-cost requests, and the `else:` that flushes a
        full batch) — confirm against the full file.
        """
        for container_request in arvados.util.keyset_list_all(
                self.arv_client.container_requests().list,
            # Toplevel workflow runs are identified by the
            # arvados-cwl-runner command line.
            ["command", "like", "[\"arvados-cwl-runner%"],
            ["created_at", ">=", since.strftime("%Y%m%dT%H%M%SZ")],
            ["created_at", "<=", to.strftime("%Y%m%dT%H%M%SZ")],
                select=["uuid", "owner_uuid", "container_uuid", "name", "cumulative_cost", "properties", "modified_by_user_uuid", "created_at"]):

            # Skip runs that incurred no cost (NOTE(review): the `continue`
            # appears elided here).
            if container_request["cumulative_cost"] == 0:

            # Every 1000 container requests, we fetch the
            # corresponding container records.
            #
            # What's so special about 1000? Because that's the
            # maximum Arvados page size, so when we use ['uuid', 'in',
            # [...]] to fetch associated records it doesn't make sense
            # to provide more than 1000 uuids.
            #
            # TODO: use the ?include=container_uuid feature so a
            # separate request to the containers table isn't necessary.
            if len(pending) < 1000:
                pending.append(container_request)

            # Flush a full batch (NOTE(review): the `else:` introducing
            # this branch appears elided).
                count += len(pending)
                logging.info("Exporting workflow runs %s - %s", count-len(pending), count)
                for row in self.iter_container_info(pending, include_steps, exclude):
                    self.collect_summary_stats(row)

        # Process the final partial batch.
        count += len(pending)
        logging.info("Exporting workflow runs %s - %s", count-len(pending), count)
        for row in self.iter_container_info(pending, include_steps, exclude):
            self.collect_summary_stats(row)

        # Only the counts are needed, so limit=0 and read items_available.
        userinfo = self.arv_client.users().list(filters=[["is_active", "=", True]], limit=0).execute()
        self.total_users = userinfo["items_available"]

        groupinfo = self.arv_client.groups().list(filters=[["group_class", "=", "project"]], limit=0).execute()
        self.total_projects = groupinfo["items_available"]
    def csv_report(self, since, to, out, include_steps, columns, exclude):
        """Write workflow-run rows for [since, to] to ``out`` as CSV.

        NOTE(review): the conditional structure choosing among the three
        `columns` assignments below (user-supplied list vs. defaults with
        and without the Step column) is elided in this chunk — as written
        the assignments would simply override each other; confirm against
        the full file.
        """
        columns = columns.split(",")
        columns = ("Project", "Workflow", "Step", "Sample", "User", "Submitted", "Runtime", "Cost")
        columns = ("Project", "Workflow", "Sample", "User", "Submitted", "Runtime", "Cost")

        # extrasaction="ignore" drops row keys not in the selected columns.
        csvwriter = csv.DictWriter(out, fieldnames=columns, extrasaction="ignore")
        csvwriter.writeheader()

        for row in self.report_from_api(since, to, include_steps, exclude):
            csvwriter.writerow(row)

        # Summary stats were accumulated as a side effect of
        # report_from_api; this lets html_report skip re-fetching.
        self.summary_fetched = True