# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0
import csv
import logging
import math
import re
import statistics

import arvados
import arvados.util
import ciso8601

from datetime import date, datetime, timedelta
from typing import List
from dataclasses import dataclass
from arvados_cluster_activity.prometheus import get_metric_usage, get_data_usage
from arvados_cluster_activity.reportchart import ReportChart


@dataclass
class WorkflowRunSummary:
    name: str
    uuid: str
    cost: List[float]
    hours: List[float]
    count: int = 0


@dataclass
class ProjectSummary:
    users: set
    runs: dict[str, WorkflowRunSummary]
    uuid: str = ""
    name: str = ""
    earliest: datetime = datetime(year=9999, month=1, day=1)
    latest: datetime = datetime(year=1900, month=1, day=1)
    cost: float = 0
    count: int = 0
    hours: float = 0
    activityspan: str = ""
    tablerow: str = ""


def aws_monthly_cost(value):
    value_gb = value / (1024*1024*1024)
    first_50tb = min(1024*50, value_gb)
    next_450tb = max(min(1024*450, value_gb-1024*50), 0)
    over_500tb = max(value_gb-1024*500, 0)

    monthly_cost = (first_50tb * 0.023) + (next_450tb * 0.022) + (over_500tb * 0.021)
    return monthly_cost
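
# Worked example (illustration only, not called by the report): 600 TiB
# of storage spans all three pricing tiers:
#   value_gb = 600 * 1024 = 614,400 GiB
#   first 50 TB  ->  51,200 GiB * $0.023 = $ 1,177.60
#   next 450 TB  -> 460,800 GiB * $0.022 = $10,137.60
#   over 500 TB  -> 102,400 GiB * $0.021 = $ 2,150.40
# so aws_monthly_cost(600 * 1024**4) == 13465.6 (dollars per month).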


def format_with_suffix_base2(summary_value):
    for scale in ["KiB", "MiB", "GiB", "TiB", "PiB", "EiB"]:
        summary_value = summary_value / 1024
        if summary_value < 1024:
            return "%.3f %s" % (summary_value, scale)
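
# Example: format_with_suffix_base2(3 * 1024**3) == "3.000 GiB".  The loop
# divides before comparing, so anything under 1 KiB is reported as a
# fraction of a KiB rather than in bytes.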


def format_with_suffix_base10(summary_value):
    for scale in ["KB", "MB", "GB", "TB", "PB", "EB"]:
        summary_value = summary_value / 1000
        if summary_value < 1000:
            return "%.3f %s" % (summary_value, scale)
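
# Example: format_with_suffix_base10(2.5e12) == "2.500 TB".  The base-10
# variant is used for the storage figures in the HTML report below,
# matching how cloud providers quote storage prices.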

containers_graph = ('Concurrent running containers', 'containers')
storage_graph = ('Storage usage', 'used')
managed_graph = ('Data under management', 'managed')


def runtime_str(container_request, containers):
    container = containers[container_request["container_uuid"]]
    length = ciso8601.parse_datetime(container["finished_at"]) - ciso8601.parse_datetime(container["started_at"])

    hours = length.days * 24 + (length.seconds // 3600)
    minutes = (length.seconds // 60) % 60
    seconds = length.seconds % 60

    return "%i:%02i:%02i" % (hours, minutes, seconds)
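
# Illustration of the record shapes runtime_str() expects (the uuid is
# made up):
#   cr = {"container_uuid": "zzzzz-dz642-000000000000000"}
#   containers = {cr["container_uuid"]: {"started_at": "2024-01-01T10:00:00Z",
#                                        "finished_at": "2024-01-02T13:05:42Z"}}
#   runtime_str(cr, containers) == "27:05:42"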


def runtime_in_hours(runtime):
    # Convert an "H:MM:SS" string produced by runtime_str() to fractional hours.
    sp = runtime.split(":")
    hours = float(sp[0])
    hours += float(sp[1]) / 60
    hours += float(sp[2]) / 3600
    return hours
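
# Example: runtime_in_hours("27:05:42") == 27 + 5/60 + 42/3600, i.e. about
# 27.095 hours.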


def hours_to_runtime_str(frac_hours):
    hours = math.floor(frac_hours)
    minutes = (frac_hours - math.floor(frac_hours)) * 60.0
    seconds = (minutes - math.floor(minutes)) * 60.0

    return "%i:%02i:%02i" % (hours, minutes, seconds)
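
# Approximate inverse of runtime_in_hours(); the %i conversions truncate,
# so floating point remainders can shave off a second.  Example:
#   hours_to_runtime_str(2.5) == "2:30:00"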


def csv_dateformat(datestr):
    dt = ciso8601.parse_datetime(datestr)
    return dt.strftime("%Y-%m-%d %H:%M:%S")
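
# Example: csv_dateformat("2024-01-02T13:05:42Z") == "2024-01-02 13:05:42"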


class ClusterActivityReport(object):
    def __init__(self, prom_client):
        self.arv_client = arvados.api()
        self.prom_client = prom_client
        self.cluster = self.arv_client.config()["ClusterID"]

        self.active_users = set()
        self.project_summary = {}
        self.total_users = 0
        self.total_projects = 0
        self.total_hours = 0
        self.total_cost = 0
        self.total_workflows = 0
        self.storage_cost = 0
        self.graphs = {}
        self.summary_fetched = False

    def today(self):
        return date.today()

    def collect_graph(self, since, to, metric, resample_to, extra=None):
        if not self.prom_client:
            return []

        flatdata = []

        for series in get_metric_usage(self.prom_client, since, to, metric % self.cluster, resampleTo=resample_to):
            for t in series.itertuples():
                flatdata.append([t[0], t[1]])
                if extra:
                    extra(t[0], t[1])

        return flatdata
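
    # The returned series is a flat list of [timestamp, value] pairs;
    # html_report() stores one list per graph in self.graphs and reads the
    # most recent sample as graph[-1][1].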

    def collect_storage_cost(self, timestamp, value):
        # Prorate the monthly cost of the sampled usage down to one
        # hourly sample, approximating a month as 30 days.
        self.storage_cost += aws_monthly_cost(value) / (30*24)

    def html_report(self, since, to, exclude, include_workflow_steps):
        """Get a cluster activity report for the desired time period,
        returning a string containing the report as an HTML document."""

        self.label = "Cluster report for %s from %s to %s" % (self.cluster, since.date(), to.date())

        if not self.summary_fetched:
            # If we haven't done it already, we need to fetch everything
            # from the API to collect summary stats (report_from_api
            # calls collect_summary_stats on each row).
            #
            # Because it is a Python generator, we need to call it in a
            # loop to process all the rows.  This method also yields
            # each row, which is used by a different function to create
            # the CSV report, but for the HTML report we just discard
            # them.
            for row in self.report_from_api(since, to, include_workflow_steps, exclude):
                pass

        cards = []

        container_cumulative_hours = 0
        def collect_container_hours(timestamp, value):
            nonlocal container_cumulative_hours
            # resampled to 5 minute increments but we want a sum of
            # hours, so each sample contributes 1/12 of an hour
            container_cumulative_hours += value / 12

        logging.info("Getting container hours time series")

        self.graphs[containers_graph] = self.collect_graph(since, to,
                           "arvados_dispatchcloud_containers_running{cluster='%s'}",
                           resample_to="5min",
                           extra=collect_container_hours
                           )

        logging.info("Getting data usage time series")
        self.graphs[managed_graph] = self.collect_graph(since, to,
                           "arvados_keep_collection_bytes{cluster='%s'}", resample_to="60min")

        self.graphs[storage_graph] = self.collect_graph(since, to,
                           "arvados_keep_total_bytes{cluster='%s'}", resample_to="60min",
                           extra=self.collect_storage_cost)

        managed_data_now = None
        storage_used_now = None

        if len(self.graphs.get(managed_graph, [])) > 0:
            managed_data_now = self.graphs[managed_graph][-1][1]

        if len(self.graphs.get(storage_graph, [])) > 0:
            storage_used_now = self.graphs[storage_graph][-1][1]

        if managed_data_now and storage_used_now:
            storage_cost = aws_monthly_cost(storage_used_now)
            dedup_ratio = managed_data_now/storage_used_now

        workbench = self.arv_client.config()["Services"]["Workbench2"]["ExternalURL"]
        if workbench.endswith("/"):
            workbench = workbench[:-1]

        if to.date() == self.today():
            # The deduplication ratio overstates things a bit: you can
            # have collections which reference a small slice of a large
            # block, and this messes up the intuitive value of this ratio
            # and exaggerates the effect.
            #
            # So for now, as much fun as this is, I'm excluding it from
            # the report.
            #
            # dedup_savings = aws_monthly_cost(managed_data_now) - storage_cost
            # <tr><th>Monthly savings from storage deduplication</th> <td>${dedup_savings:,.2f}</td></tr>

            data_rows = ""
            if managed_data_now and storage_used_now:
                data_rows = """
            <tr><th>Total data under management</th> <td>{managed_data_now}</td></tr>
            <tr><th>Total storage usage</th> <td>{storage_used_now}</td></tr>
            <tr><th>Deduplication ratio</th> <td>{dedup_ratio:.1f}</td></tr>
            <tr><th>Approximate monthly storage cost</th> <td>${storage_cost:,.2f}</td></tr>
                """.format(
                    managed_data_now=format_with_suffix_base10(managed_data_now),
                    storage_used_now=format_with_suffix_base10(storage_used_now),
                    storage_cost=storage_cost,
                    dedup_ratio=dedup_ratio,
                )

            cards.append("""<h2>Cluster status as of {now}</h2>
            <table class='aggtable'><tbody>
            <tr><th><a href="{workbench}/users">Total users</a></th><td>{total_users}</td></tr>
            <tr><th>Total projects</th><td>{total_projects}</td></tr>
            {data_rows}
            </tbody></table>
            <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
            """.format(now=self.today(),
                       workbench=workbench,
                       total_users=self.total_users,
                       total_projects=self.total_projects,
                       data_rows=data_rows))

        # We have a couple of options for getting total container hours:
        #
        # total_hours=container_cumulative_hours
        #
        # calculates the sum from prometheus metrics
        #
        # total_hours=self.total_hours
        #
        # calculates the sum of the containers that were fetched
        #
        # The problem is these numbers tend not to match, especially
        # if the report generation was not called with "include
        # steps".
        #
        # I decided to use the sum from containers fetched, because it
        # will match the sum of compute time for each project listed
        # in the report.

        cards.append("""<h2>Activity and cost over the {reporting_days} day period {since} to {to}</h2>
        <table class='aggtable'><tbody>
        <tr><th>Active users</th> <td>{active_users}</td></tr>
        <tr><th><a href="#Active_Projects">Active projects</a></th> <td>{active_projects}</td></tr>
        <tr><th>Workflow runs</th> <td>{total_workflows:,}</td></tr>
        <tr><th>Compute used</th> <td>{total_hours:,.1f} hours</td></tr>
        <tr><th>Compute cost</th> <td>${total_cost:,.2f}</td></tr>
        <tr><th>Storage cost</th> <td>${storage_cost:,.2f}</td></tr>
        </tbody></table>
        <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
        """.format(active_users=len(self.active_users),
                   total_users=self.total_users,
                   total_hours=self.total_hours,
                   total_cost=self.total_cost,
                   total_workflows=self.total_workflows,
                   active_projects=len(self.project_summary),
                   since=since.date(), to=to.date(),
                   reporting_days=(to - since).days,
                   storage_cost=self.storage_cost))

        projectlist = sorted(self.project_summary.items(), key=lambda x: x[1].cost, reverse=True)

        for k, prj in projectlist:
            if prj.earliest.date() == prj.latest.date():
                prj.activityspan = "{}".format(prj.earliest.date())
            else:
                prj.activityspan = "{} to {}".format(prj.earliest.date(), prj.latest.date())

            prj.tablerow = """<td>{users}</td> <td>{active}</td> <td>{hours:,.1f}</td> <td>${cost:,.2f}</td>""".format(
                active=prj.activityspan,
                hours=prj.hours,
                cost=prj.cost,
                users=", ".join(prj.users),
            )

        cards.append("""
            <div id="chart"></div>
        """)

        cards.append(
            """
        <a id="Active_Projects"><h2>Active Projects</h2></a>
        <table class='sortable active-projects'>
        <thead><tr><th>Project</th> <th>Users</th> <th>Active</th> <th>Compute usage (hours)</th> <th>Compute cost</th> </tr></thead>
        <tbody><tr>{projects}</tr></tbody>
        </table>
        <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
        """.format(projects="</tr>\n<tr>".join("""<td><a href="#{name}">{name}</a></td>{rest}""".format(name=prj.name, rest=prj.tablerow) for k, prj in projectlist)))

        for k, prj in projectlist:
            wfsum = []
            for k2, r in sorted(prj.runs.items(), key=lambda x: x[1].count, reverse=True):
                wfsum.append("""
                <tr><td>{count}</td> <td>{workflowlink}</td> <td>{median_runtime}</td> <td>{mean_runtime}</td> <td>${median_cost:,.2f}</td> <td>${mean_cost:,.2f}</td> <td>${totalcost:,.2f}</td></tr>
                """.format(
                    count=r.count,
                    mean_runtime=hours_to_runtime_str(statistics.mean(r.hours)),
                    median_runtime=hours_to_runtime_str(statistics.median(r.hours)),
                    mean_cost=statistics.mean(r.cost),
                    median_cost=statistics.median(r.cost),
                    totalcost=sum(r.cost),
                    workflowlink="""<a href="{workbench}/workflows/{uuid}">{name}</a>""".format(workbench=workbench, uuid=r.uuid, name=r.name)
                    if r.uuid != "none" else r.name))
321 """<a id="{name}"></a><a href="{workbench}/projects/{uuid}"><h2>{name}</h2></a>
323 <table class='sortable single-project'>
324 <thead><tr> <th>Users</th> <th>Active</th> <th>Compute usage (hours)</th> <th>Compute cost</th> </tr></thead>
325 <tbody><tr>{projectrow}</tr></tbody>
328 <table class='sortable project'>
329 <thead><tr><th>Workflow run count</th> <th>Workflow name</th> <th>Median runtime</th> <th>Mean runtime</th> <th>Median cost per run</th> <th>Mean cost per run</th> <th>Sum cost over runs</th></tr></thead>
333 """.format(name=prj.name,
334 wfsum=" ".join(wfsum),
335 projectrow=prj.tablerow,

        # The deduplication ratio overstates things a bit: you can
        # have collections which reference a small slice of a large
        # block, and this messes up the intuitive value of this ratio
        # and exaggerates the effect.
        #
        # So for now, as much fun as this is, I'm excluding it from
        # the report.
        #
        # <p>"Monthly savings from storage deduplication" is the
        # estimated cost difference between "storage usage" and "data
        # under management" as a way of comparing with other
        # technologies that do not support data deduplication.</p>

        cards.append("""
        <h2 id="prices">Note on usage and cost calculations</h2>

        <div style="max-width: 60em">

        <p>The numbers presented in this report are estimates and will
        not perfectly match your cloud bill.  Nevertheless, this report
        should be useful for identifying your main cost drivers.</p>

        <p>"Total data under management" is what you get if you add up
        all blocks referenced by all collections in Workbench, without
        considering deduplication.</p>

        <p>"Total storage usage" is the actual underlying storage
        usage, accounting for data deduplication.</p>

        <p>Storage costs are based on AWS "S3 Standard" pricing
        described on the <a href="https://aws.amazon.com/s3/pricing/">Amazon S3 pricing</a> page:</p>

        <ul>
        <li>$0.023 per GB / Month for the first 50 TB</li>
        <li>$0.022 per GB / Month for the next 450 TB</li>
        <li>$0.021 per GB / Month over 500 TB</li>
        </ul>

        <p>Finally, this is only the base storage cost, and does not
        include any fees associated with S3 API usage.  However, there
        are generally no ingress/egress fees if your Arvados instance
        and S3 bucket are in the same region, which is the normal
        recommended configuration.</p>

        <p>"Compute usage" is the number of instance-hours used in
        running workflows.  Because multiple steps may run in parallel
        on multiple instances, a workflow that completes in four hours
        but runs parallel steps on five instances would be reported as
        using 20 instance-hours.</p>

        <p>"Runtime" is the actual wall clock time that it took to
        complete a workflow.  This does not include time spent in the
        queue for the workflow itself, but does include queuing time
        of individual workflow steps.</p>

        <p>Computational costs are derived from Arvados cost
        calculations of container runs.  For on-demand instances, this
        uses the prices from the InstanceTypes section of the Arvados
        config file, set by the system administrator.  For spot
        instances, this uses current spot prices retrieved on the fly
        from the cloud provider.</p>

        <p>Be aware that the cost calculations are only for the time
        the container is running and do not take into account the
        overhead of launching instances or idle time between scheduled
        tasks or prior to automatic shutdown.</p>

        </div>
        """)

        return ReportChart(self.label, cards, self.graphs).html()
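
    # Usage sketch (wiring is hypothetical): generate the report for the
    # last 30 days and write it to a file.
    #
    #   report = ClusterActivityReport(prom_client=None)  # graphs need Prometheus
    #   html = report.html_report(since=datetime.utcnow() - timedelta(days=30),
    #                             to=datetime.utcnow(), exclude=None,
    #                             include_workflow_steps=False)
    #   with open("report.html", "w") as f:
    #       f.write(html)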

    def iter_container_info(self, pending, include_steps, exclude):
        # "pending" is a list of arvados-cwl-runner container requests
        # returned by the API.  This method fetches detailed
        # information about the runs and yields report rows.

        # 1. Get container records corresponding to container requests.
        containers = {}

        for container in arvados.util.keyset_list_all(
                self.arv_client.containers().list,
                filters=[
                    ["uuid", "in", [c["container_uuid"] for c in pending if c["container_uuid"]]],
                ],
                select=["uuid", "started_at", "finished_at", "cost"]):

            containers[container["uuid"]] = container

        # 2. Look for the template_uuid property and fetch the
        # corresponding workflow record.
        workflows = {}
        workflows["none"] = "workflow run from command line"

        for wf in arvados.util.keyset_list_all(
                self.arv_client.workflows().list,
                filters=[
                    ["uuid", "in", list(set(c["properties"]["template_uuid"]
                                            for c in pending
                                            if "template_uuid" in c["properties"] and c["properties"]["template_uuid"].startswith(self.arv_client.config()["ClusterID"])))],
                ],
                select=["uuid", "name"]):
            workflows[wf["uuid"]] = wf["name"]

        # 3. Look at owner_uuid and fetch owning projects and users
        # ("j7d0g" is the infix for group/project uuids).
        projects = {}

        for pr in arvados.util.keyset_list_all(
                self.arv_client.groups().list,
                filters=[
                    ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'j7d0g'))],
                ],
                select=["uuid", "name"]):
            projects[pr["uuid"]] = pr["name"]

        # 4. Look at owner_uuid and modified_by_user_uuid and get user
        # records ("tpzed" is the infix for user uuids).  User names go
        # into the same lookup table as project names.
        for pr in arvados.util.keyset_list_all(
                self.arv_client.users().list,
                filters=[
                    ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'tpzed')|set(c["modified_by_user_uuid"] for c in pending))],
                ],
                select=["uuid", "full_name", "first_name", "last_name"]):
            projects[pr["uuid"]] = pr["full_name"]

        # 5. Optionally iterate over individual workflow steps.
        child_crs = {}
        name_regex = re.compile(r"(.+)_[0-9]+")
        child_cr_containers = set()
        stepcount = 0

        if include_steps:
            # 5.1. Go through the container requests owned by the toplevel workflow container
            logging.info("Getting workflow steps")
            for cr in arvados.util.keyset_list_all(
                    self.arv_client.container_requests().list,
                    filters=[
                        ["requesting_container_uuid", "in", list(containers.keys())],
                    ],
                    select=["uuid", "name", "cumulative_cost", "requesting_container_uuid", "container_uuid"]):

                if cr["cumulative_cost"] == 0:
                    continue

                g = name_regex.fullmatch(cr["name"])
                if g:
                    cr["name"] = g[1]
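
                # Stripping the numeric suffix means scattered steps such
                # as "bwa_mem_1" ... "bwa_mem_64" all aggregate under one
                # name, e.g. name_regex.fullmatch("bwa_mem_23")[1] == "bwa_mem".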

                # 5.2. Get the containers corresponding to the
                # container requests.  This has the same logic as
                # report_from_api where we batch it into 1000 items at
                # a time.
                child_crs.setdefault(cr["requesting_container_uuid"], []).append(cr)
                child_cr_containers.add(cr["container_uuid"])
                if len(child_cr_containers) == 1000:
                    stepcount += len(child_cr_containers)
                    for container in arvados.util.keyset_list_all(
                            self.arv_client.containers().list,
                            filters=[
                                ["uuid", "in", list(child_cr_containers)],
                            ],
                            select=["uuid", "started_at", "finished_at", "cost"]):

                        containers[container["uuid"]] = container

                    logging.info("Got workflow steps %s - %s", stepcount-len(child_cr_containers), stepcount)
                    child_cr_containers.clear()

            # Get any remaining containers
            if child_cr_containers:
                stepcount += len(child_cr_containers)
                for container in arvados.util.keyset_list_all(
                        self.arv_client.containers().list,
                        filters=[
                            ["uuid", "in", list(child_cr_containers)],
                        ],
                        select=["uuid", "started_at", "finished_at", "cost"]):

                    containers[container["uuid"]] = container
                logging.info("Got workflow steps %s - %s", stepcount-len(child_cr_containers), stepcount)

        # 6. Now go through the list of workflow runs and yield a row
        # with all the information we have collected, as well as the
        # details for each workflow step (if enabled).
        for container_request in pending:
            if not container_request["container_uuid"] or not containers[container_request["container_uuid"]]["started_at"] or not containers[container_request["container_uuid"]]["finished_at"]:
                continue

            template_uuid = container_request["properties"].get("template_uuid", "none")
            workflowname = container_request["name"] if template_uuid == "none" else workflows.get(template_uuid, template_uuid)

            if exclude and re.search(exclude, workflowname, flags=re.IGNORECASE):
                continue

            yield {
                "Project": projects.get(container_request["owner_uuid"], "unknown owner"),
                "ProjectUUID": container_request["owner_uuid"],
                "Workflow": workflowname,
                "WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
                "Step": "workflow runner",
                "StepUUID": container_request["uuid"],
                "Sample": container_request["name"],
                "SampleUUID": container_request["uuid"],
                "User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
                "UserUUID": container_request["modified_by_user_uuid"],
                "Submitted": csv_dateformat(container_request["created_at"]),
                "Started": csv_dateformat(containers[container_request["container_uuid"]]["started_at"]),
                "Finished": csv_dateformat(containers[container_request["container_uuid"]]["finished_at"]),
                "Runtime": runtime_str(container_request, containers),
                "Cost": round(containers[container_request["container_uuid"]]["cost"] if include_steps else container_request["cumulative_cost"], 3),
                "CumulativeCost": round(container_request["cumulative_cost"], 3),
            }

            for child_cr in child_crs.get(container_request["container_uuid"], []):
                if not child_cr["container_uuid"] or not containers[child_cr["container_uuid"]]["started_at"] or not containers[child_cr["container_uuid"]]["finished_at"]:
                    continue

                yield {
                    "Project": projects.get(container_request["owner_uuid"], "unknown owner"),
                    "ProjectUUID": container_request["owner_uuid"],
                    "Workflow": workflows.get(container_request["properties"].get("template_uuid", "none"), "workflow missing"),
                    "WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
                    "Step": child_cr["name"],
                    "StepUUID": child_cr["uuid"],
                    "Sample": container_request["name"],
                    "SampleUUID": container_request["uuid"],
                    "User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
                    "UserUUID": container_request["modified_by_user_uuid"],
                    "Submitted": csv_dateformat(child_cr["created_at"]),
                    "Started": csv_dateformat(containers[child_cr["container_uuid"]]["started_at"]),
                    "Finished": csv_dateformat(containers[child_cr["container_uuid"]]["finished_at"]),
                    "Runtime": runtime_str(child_cr, containers),
                    "Cost": round(containers[child_cr["container_uuid"]]["cost"], 3),
                    "CumulativeCost": round(containers[child_cr["container_uuid"]]["cost"], 3),
                }

    def collect_summary_stats(self, row):
        self.active_users.add(row["User"])
        self.project_summary.setdefault(row["ProjectUUID"],
                                        ProjectSummary(users=set(),
                                                       runs={},
                                                       uuid=row["ProjectUUID"],
                                                       name=row["Project"]))
        prj = self.project_summary[row["ProjectUUID"]]
        cost = row["Cost"]
        prj.cost += cost
        prj.count += 1
        prj.users.add(row["User"])
        hrs = runtime_in_hours(row["Runtime"])
        prj.hours += hrs

        started = datetime.strptime(row["Started"], "%Y-%m-%d %H:%M:%S")
        finished = datetime.strptime(row["Finished"], "%Y-%m-%d %H:%M:%S")

        if started < prj.earliest:
            prj.earliest = started

        if finished > prj.latest:
            prj.latest = finished

        if row["Step"] == "workflow runner":
            prj.runs.setdefault(row["Workflow"], WorkflowRunSummary(name=row["Workflow"],
                                                                    uuid=row["WorkflowUUID"],
                                                                    cost=[],
                                                                    hours=[]))
            wfuuid = row["Workflow"]
            prj.runs[wfuuid].count += 1
            prj.runs[wfuuid].cost.append(row["CumulativeCost"])
            prj.runs[wfuuid].hours.append(hrs)
            self.total_workflows += 1

        self.total_hours += hrs
        self.total_cost += cost

    def report_from_api(self, since, to, include_steps, exclude):
        pending = []
        count = 0

        for container_request in arvados.util.keyset_list_all(
                self.arv_client.container_requests().list,
                filters=[
                    ["command", "like", "[\"arvados-cwl-runner%"],
                    ["created_at", ">=", since.strftime("%Y%m%dT%H%M%SZ")],
                    ["created_at", "<=", to.strftime("%Y%m%dT%H%M%SZ")],
                ],
                select=["uuid", "owner_uuid", "container_uuid", "name", "cumulative_cost", "properties", "modified_by_user_uuid", "created_at"]):

            if container_request["cumulative_cost"] == 0:
                continue

            # Every 1000 container requests, we fetch the
            # corresponding container records.
            #
            # What's so special about 1000?  Because that's the
            # maximum Arvados page size, so when we use ['uuid', 'in',
            # [...]] to fetch associated records it doesn't make sense
            # to provide more than 1000 uuids.
            #
            # TODO: use the ?include=container_uuid feature so a
            # separate request to the containers table isn't necessary.
            if len(pending) < 1000:
                pending.append(container_request)
            else:
                count += len(pending)
                logging.info("Exporting workflow runs %s - %s", count-len(pending), count)
                for row in self.iter_container_info(pending, include_steps, exclude):
                    self.collect_summary_stats(row)
                    yield row
                pending.clear()
                pending.append(container_request)

        count += len(pending)
        logging.info("Exporting workflow runs %s - %s", count-len(pending), count)
        for row in self.iter_container_info(pending, include_steps, exclude):
            self.collect_summary_stats(row)
            yield row

        userinfo = self.arv_client.users().list(filters=[["is_active", "=", True]], limit=0).execute()
        self.total_users = userinfo["items_available"]

        groupinfo = self.arv_client.groups().list(filters=[["group_class", "=", "project"]], limit=0).execute()
        self.total_projects = groupinfo["items_available"]
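
    # report_from_api() is a generator: consuming it yields one row dict
    # per workflow run (and per step, when enabled) while also
    # accumulating the summary fields used by html_report().  Sketch:
    #
    #   for row in report.report_from_api(since, to, False, None):
    #       print(row["Workflow"], row["Cost"])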

    def csv_report(self, since, to, out, include_steps, columns, exclude):
        if columns:
            columns = columns.split(",")
        else:
            if include_steps:
                columns = ("Project", "Workflow", "Step", "Sample", "User", "Submitted", "Runtime", "Cost")
            else:
                columns = ("Project", "Workflow", "Sample", "User", "Submitted", "Runtime", "Cost")

        csvwriter = csv.DictWriter(out, fieldnames=columns, extrasaction="ignore")
        csvwriter.writeheader()

        for row in self.report_from_api(since, to, include_steps, exclude):
            csvwriter.writerow(row)

        self.summary_fetched = True
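
    # Usage sketch (hypothetical wiring): stream the CSV to stdout with
    # the default column set.
    #
    #   import sys
    #   report.csv_report(since, to, sys.stdout, include_steps=False,
    #                     columns=None, exclude=None)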