# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

import csv
import logging
import math
import re
import statistics

import arvados
import arvados.util
import ciso8601

from datetime import date, datetime, timedelta
from typing import Dict, List

from dataclasses import dataclass
from arvados_cluster_activity.prometheus import get_metric_usage, get_data_usage
from arvados_cluster_activity.reportchart import ReportChart
@dataclass
class WorkflowRunSummary:
    name: str
    uuid: str
    cost: List[float]
    hours: List[float]
    count: int = 0


@dataclass
class ProjectSummary:
    users: set
    uuid: str
    runs: Dict[str, WorkflowRunSummary]
    earliest: datetime = datetime(year=9999, month=1, day=1)
    latest: datetime = datetime(year=1900, month=1, day=1)
    name: str = ""
    cost: float = 0
    count: int = 0
    hours: float = 0
    activityspan: str = ""
    tablerow: str = ""
def aws_monthly_cost(value):
    value_gb = value / (1024*1024*1024)
    first_50tb = min(1024*50, value_gb)
    next_450tb = max(min(1024*450, value_gb-1024*50), 0)
    over_500tb = max(value_gb-1024*500, 0)
    monthly_cost = (first_50tb * 0.023) + (next_450tb * 0.022) + (over_500tb * 0.021)
    return monthly_cost
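# Illustrative check of the tiering math (comment only, not executed):
# 100 TiB spans the first two tiers, so
#   aws_monthly_cost(100 * 1024**4)
#   == (50 * 1024 * 0.023) + (50 * 1024 * 0.022)
#   == 1177.60 + 1126.40 == 2304.00 dollars per month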
def format_with_suffix_base2(summary_value):
    for scale in ["KiB", "MiB", "GiB", "TiB", "PiB", "EiB"]:
        summary_value = summary_value / 1024
        if summary_value < 1024:
            return "%.3f %s" % (summary_value, scale)
def format_with_suffix_base10(summary_value):
    for scale in ["KB", "MB", "GB", "TB", "PB", "EB"]:
        summary_value = summary_value / 1000
        if summary_value < 1000:
            return "%.3f %s" % (summary_value, scale)
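# Examples of the two scales (comment only, not executed):
#   format_with_suffix_base2(1024**4)  -> "1.000 TiB"
#   format_with_suffix_base10(10**12)  -> "1.000 TB"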
containers_graph = ('Concurrent running containers', 'containers')
storage_graph = ('Storage usage', 'used')
managed_graph = ('Data under management', 'managed')
def runtime_str(container_request, containers):
    length = ciso8601.parse_datetime(containers[container_request["container_uuid"]]["finished_at"]) - ciso8601.parse_datetime(containers[container_request["container_uuid"]]["started_at"])

    hours = length.days * 24 + (length.seconds // 3600)
    minutes = (length.seconds // 60) % 60
    seconds = length.seconds % 60

    return "%i:%02i:%02i" % (hours, minutes, seconds)
def runtime_in_hours(runtime):
    sp = runtime.split(":")
    hours = float(sp[0])
    hours += float(sp[1]) / 60
    hours += float(sp[2]) / 3600
    return hours
def hours_to_runtime_str(frac_hours):
    hours = math.floor(frac_hours)
    minutes = (frac_hours - math.floor(frac_hours)) * 60.0
    seconds = (minutes - math.floor(minutes)) * 60.0

    return "%i:%02i:%02i" % (hours, minutes, seconds)
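# The runtime helpers round-trip through the same "H:MM:SS" string format
# (comment only, not executed):
#   hours_to_runtime_str(1.5)   -> "1:30:00"
#   runtime_in_hours("1:30:00") -> 1.5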
def csv_dateformat(datestr):
    dt = ciso8601.parse_datetime(datestr)
    return dt.strftime("%Y-%m-%d %H:%M:%S")
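# e.g. csv_dateformat("2024-03-05T12:34:56Z") -> "2024-03-05 12:34:56",
# normalizing the API's ISO 8601 timestamps for the CSV output and for
# datetime.strptime() in collect_summary_stats().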
class ClusterActivityReport(object):
    def __init__(self, prom_client):
        self.arv_client = arvados.api()
        self.prom_client = prom_client
        self.cluster = self.arv_client.config()["ClusterID"]

        self.active_users = set()
        self.project_summary = {}
        self.total_hours = 0
        self.total_cost = 0
        self.total_workflows = 0
        self.storage_cost = 0
        self.summary_fetched = False
        self.graphs = {}

    def today(self):
        return date.today()
    def collect_graph(self, since, to, metric, resample_to, extra=None):
        if not self.prom_client:
            return None

        flatdata = []

        for series in get_metric_usage(self.prom_client, since, to, metric % self.cluster, resampleTo=resample_to):
            for t in series.itertuples():
                flatdata.append([t[0], t[1]])
                if extra:
                    extra(t[0], t[1])

        return flatdata
    def collect_storage_cost(self, timestamp, value):
        self.storage_cost += aws_monthly_cost(value) / (30*24)
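        # Illustrative arithmetic (comment only, not executed): storage is
        # sampled hourly (resample_to="60min"), so each sample contributes
        # 1/(30*24) of a month's cost.  Holding 1 TiB steady for a full day
        # adds 24 * aws_monthly_cost(1024**4) / 720, roughly $0.79, assuming
        # the 30-day month used as the pricing basis above.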
    def html_report(self, since, to, exclude, include_workflow_steps):
        """Get a cluster activity report for the desired time period,
        returning a string containing the report as an HTML document."""

        self.label = "Cluster report for %s from %s to %s" % (self.cluster, since.date(), to.date())

        if not self.summary_fetched:
            # If we haven't done it already, we need to fetch everything
            # from the API to collect summary stats (report_from_api
            # calls collect_summary_stats on each row).
            #
            # Because it is a Python generator, we need to call it in a
            # loop to process all the rows.  This method also yields
            # each row, which is used by a different function to create
            # the CSV report, but for the HTML report we just discard
            # the rows.
            for row in self.report_from_api(since, to, include_workflow_steps, exclude):
                pass
        container_cumulative_hours = 0
        def collect_container_hours(timestamp, value):
            nonlocal container_cumulative_hours
            # The series is resampled to 5 minute increments, but we want
            # hours, so each sample counts as 1/12 of an hour.
            container_cumulative_hours += value / 12

        logging.info("Getting container hours time series")

        self.graphs[containers_graph] = self.collect_graph(since, to,
                           "arvados_dispatchcloud_containers_running{cluster='%s'}",
                           resample_to="5min",
                           extra=collect_container_hours)
        logging.info("Getting data usage time series")
        self.graphs[managed_graph] = self.collect_graph(since, to,
                           "arvados_keep_collection_bytes{cluster='%s'}", resample_to="60min")

        self.graphs[storage_graph] = self.collect_graph(since, to,
                           "arvados_keep_total_bytes{cluster='%s'}", resample_to="60min",
                           extra=self.collect_storage_cost)
        label = self.label
        cards = []

        workbench = self.arv_client.config()["Services"]["Workbench2"]["ExternalURL"]
        if workbench.endswith("/"):
            workbench = workbench[:-1]
        if to.date() == self.today():
            # The deduplication ratio overstates things a bit: you can
            # have collections which reference a small slice of a large
            # block, and this messes up the intuitive value of this ratio
            # and exaggerates the effect.
            #
            # So for now, as much fun as this is, I'm excluding it from
            # the report.
            #
            # dedup_savings = aws_monthly_cost(managed_data_now) - storage_cost
            # <tr><th>Monthly savings from storage deduplication</th> <td>${dedup_savings:,.2f}</td></tr>

            data_rows = ""
            if self.graphs[managed_graph] and self.graphs[storage_graph]:
                managed_data_now = self.graphs[managed_graph][-1][1]
                storage_used_now = self.graphs[storage_graph][-1][1]
                data_rows = """
        <tr><th>Total data under management</th> <td>{managed_data_now}</td></tr>
        <tr><th>Total storage usage</th> <td>{storage_used_now}</td></tr>
        <tr><th>Deduplication ratio</th> <td>{dedup_ratio:.1f}</td></tr>
        <tr><th>Approximate monthly storage cost</th> <td>${storage_cost:,.2f}</td></tr>
                """.format(
                    managed_data_now=format_with_suffix_base10(managed_data_now),
                    storage_used_now=format_with_suffix_base10(storage_used_now),
                    storage_cost=aws_monthly_cost(storage_used_now),
                    dedup_ratio=managed_data_now / storage_used_now)
            cards.append("""<h2>Cluster status as of {now}</h2>
            <table class='aggtable'><tbody>
            <tr><th><a href="{workbench}/users">Total users</a></th><td>{total_users}</td></tr>
            <tr><th>Total projects</th><td>{total_projects}</td></tr>
            {data_rows}
            </tbody></table>
            <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
            """.format(now=self.today(),
                       total_users=self.total_users,
                       total_projects=self.total_projects,
                       workbench=workbench,
                       data_rows=data_rows))
        # We have a couple of options for getting total container hours:
        #
        # total_hours=container_cumulative_hours
        #
        # calculates the sum from prometheus metrics
        #
        # total_hours=self.total_hours
        #
        # calculates the sum of the containers that were fetched
        #
        # The problem is these numbers tend not to match, especially
        # if the report generation was not called with "include
        # steps".
        #
        # I decided to use the sum from containers fetched, because it
        # will match the sum of compute time for each project listed
        # in the report.
        cards.append("""<h2>Activity and cost over the {reporting_days} day period {since} to {to}</h2>
        <table class='aggtable'><tbody>
        <tr><th>Active users</th> <td>{active_users}</td></tr>
        <tr><th><a href="#Active_Projects">Active projects</a></th> <td>{active_projects}</td></tr>
        <tr><th>Workflow runs</th> <td>{total_workflows:,}</td></tr>
        <tr><th>Compute used</th> <td>{total_hours:,.1f} hours</td></tr>
        <tr><th>Compute cost</th> <td>${total_cost:,.2f}</td></tr>
        <tr><th>Storage cost</th> <td>${storage_cost:,.2f}</td></tr>
        </tbody></table>
        <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
        """.format(active_users=len(self.active_users),
                   total_users=self.total_users,
                   total_hours=self.total_hours,
                   total_cost=self.total_cost,
                   total_workflows=self.total_workflows,
                   active_projects=len(self.project_summary),
                   since=since.date(), to=to.date(),
                   reporting_days=(to - since).days,
                   storage_cost=self.storage_cost))
        projectlist = sorted(self.project_summary.items(), key=lambda x: x[1].cost, reverse=True)

        for k, prj in projectlist:
            if prj.earliest.date() == prj.latest.date():
                prj.activityspan = "{}".format(prj.earliest.date())
            else:
                prj.activityspan = "{} to {}".format(prj.earliest.date(), prj.latest.date())

            prj.tablerow = """<td>{users}</td> <td>{active}</td> <td>{hours:,.1f}</td> <td>${cost:,.2f}</td>""".format(
                active=prj.activityspan,
                hours=prj.hours,
                cost=prj.cost,
                users=", ".join(prj.users))
        if any(self.graphs.values()):
            cards.append("""
                <div id="chart"></div>
            """)

        cards.append(
            """
            <a id="Active_Projects"><h2>Active Projects</h2></a>
            <table class='sortable active-projects'>
            <thead><tr><th>Project</th> <th>Users</th> <th>Active</th> <th>Compute usage (hours)</th> <th>Compute cost</th> </tr></thead>
            <tbody><tr>{projects}</tr></tbody>
            </table>
            <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
            """.format(projects="</tr>\n<tr>".join("""<td><a href="#{name}">{name}</a></td>{rest}""".format(name=prj.name, rest=prj.tablerow) for k, prj in projectlist)))
        for k, prj in projectlist:
            wfsum = []
            for k2, r in sorted(prj.runs.items(), key=lambda x: x[1].count, reverse=True):
                wfsum.append("""
                <tr><td>{count}</td> <td>{workflowlink}</td> <td>{median_runtime}</td> <td>{mean_runtime}</td> <td>${median_cost:,.2f}</td> <td>${mean_cost:,.2f}</td> <td>${totalcost:,.2f}</td></tr>
                """.format(
                    count=r.count,
                    mean_runtime=hours_to_runtime_str(statistics.mean(r.hours)),
                    median_runtime=hours_to_runtime_str(statistics.median(r.hours)),
                    mean_cost=statistics.mean(r.cost),
                    median_cost=statistics.median(r.cost),
                    totalcost=sum(r.cost),
                    workflowlink="""<a href="{workbench}/workflows/{uuid}">{name}</a>""".format(workbench=workbench, uuid=r.uuid, name=r.name)
                    if r.uuid != "none" else r.name))

            cards.append(
                """<a id="{name}"></a><a href="{workbench}/projects/{uuid}"><h2>{name}</h2></a>

                <table class='sortable single-project'>
                <thead><tr> <th>Users</th> <th>Active</th> <th>Compute usage (hours)</th> <th>Compute cost</th> </tr></thead>
                <tbody><tr>{projectrow}</tr></tbody>
                </table>

                <table class='sortable project'>
                <thead><tr><th>Workflow run count</th> <th>Workflow name</th> <th>Median runtime</th> <th>Mean runtime</th> <th>Median cost per run</th> <th>Mean cost per run</th> <th>Sum cost over runs</th></tr></thead>
                <tbody>
                {wfsum}
                </tbody></table>
                """.format(name=prj.name,
                           wfsum=" ".join(wfsum),
                           projectrow=prj.tablerow,
                           workbench=workbench,
                           uuid=prj.uuid))
        # The deduplication ratio overstates things a bit: you can
        # have collections which reference a small slice of a large
        # block, and this messes up the intuitive value of this ratio
        # and exaggerates the effect.
        #
        # So for now, as much fun as this is, I'm excluding it from
        # the report.
        #
        # <p>"Monthly savings from storage deduplication" is the
        # estimated cost difference between "storage usage" and "data
        # under management" as a way of comparing with other
        # technologies that do not support data deduplication.</p>
        cards.append("""
        <h2 id="prices">Note on usage and cost calculations</h2>

        <div style="max-width: 60em">

        <p>The numbers presented in this report are estimates and will
        not perfectly match your cloud bill.  Nevertheless this report
        should be useful for identifying your main cost drivers.</p>

        <p>"Total data under management" is what you get if you add up
        all blocks referenced by all collections in Workbench, without
        considering deduplication.</p>

        <p>"Total storage usage" is the actual underlying storage
        usage, accounting for data deduplication.</p>

        <p>Storage costs are based on AWS "S3 Standard"
        described on the <a href="https://aws.amazon.com/s3/pricing/">Amazon S3 pricing</a> page:</p>

        <ul>
        <li>$0.023 per GB / Month for the first 50 TB</li>
        <li>$0.022 per GB / Month for the next 450 TB</li>
        <li>$0.021 per GB / Month over 500 TB</li>
        </ul>

        <p>Finally, this is only the base storage cost, and does not
        include any fees associated with S3 API usage.  However, there
        are generally no ingress/egress fees if your Arvados instance
        and S3 bucket are in the same region, which is the normal
        recommended configuration.</p>

        <p>"Compute usage" is the number of instance-hours used in
        running workflows.  Because multiple steps may run in parallel
        on multiple instances, a workflow that completes in four hours
        but runs parallel steps on five instances would be reported as
        using 20 instance hours.</p>

        <p>"Runtime" is the actual wall clock time that it took to
        complete a workflow.  This does not include time spent in the
        queue for the workflow itself, but does include queuing time
        of individual workflow steps.</p>

        <p>Computational costs are derived from Arvados cost
        calculations of container runs.  For on-demand instances, this
        uses the prices from the InstanceTypes section of the Arvados
        config file, set by the system administrator.  For spot
        instances, this uses current spot prices retrieved on the fly
        from AWS.</p>

        <p>Be aware that the cost calculations are only for the time
        the container is running, and do not take into account the
        overhead of launching instances or idle time between scheduled
        tasks or prior to automatic shutdown.</p>

        </div>
        """)

        return ReportChart(label, cards, self.graphs).html()
    def iter_container_info(self, pending, include_steps, exclude):
        # "pending" is a list of arvados-cwl-runner container requests
        # returned by the API.  This method fetches detailed
        # information about the runs and yields report rows.

        # 1. Get container records corresponding to container requests.
        containers = {}

        for container in arvados.util.keyset_list_all(
                self.arv_client.containers().list,
                filters=[
                    ["uuid", "in", [c["container_uuid"] for c in pending if c["container_uuid"]]],
                ],
                select=["uuid", "started_at", "finished_at", "cost"]):

            containers[container["uuid"]] = container
        # 2. Look for the template_uuid property and fetch the
        # corresponding workflow record.
        workflows = {}
        workflows["none"] = "workflow run from command line"

        for wf in arvados.util.keyset_list_all(
                self.arv_client.workflows().list,
                filters=[
                    ["uuid", "in", list(set(c["properties"]["template_uuid"]
                                            for c in pending
                                            if "template_uuid" in c["properties"] and c["properties"]["template_uuid"].startswith(self.arv_client.config()["ClusterID"])))],
                ],
                select=["uuid", "name"]):
            workflows[wf["uuid"]] = wf["name"]
        # 3. Look at owner_uuid and fetch owning projects and users
        projects = {}

        for pr in arvados.util.keyset_list_all(
                self.arv_client.groups().list,
                filters=[
                    ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'j7d0g'))],
                ],
                select=["uuid", "name"]):
            projects[pr["uuid"]] = pr["name"]
        # 4. Look at owner_uuid and modified_by_user_uuid and get user records
        for pr in arvados.util.keyset_list_all(
                self.arv_client.users().list,
                filters=[
                    ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'tpzed')|set(c["modified_by_user_uuid"] for c in pending))],
                ],
                select=["uuid", "full_name", "first_name", "last_name"]):
            projects[pr["uuid"]] = pr["full_name"]
        # 5. Optionally iterate over individual workflow steps.
        child_crs = {}
        if include_steps:
            name_regex = re.compile(r"(.+)_[0-9]+")
            child_cr_containers = set()
            stepcount = 0

            # 5.1. Go through the container requests owned by the toplevel workflow container
            logging.info("Getting workflow steps")
            for cr in arvados.util.keyset_list_all(
                    self.arv_client.container_requests().list,
                    filters=[
                        ["requesting_container_uuid", "in", list(containers.keys())],
                    ],
                    select=["uuid", "name", "cumulative_cost", "requesting_container_uuid", "container_uuid"]):

                if cr["cumulative_cost"] == 0:
                    continue

                # Strip the numeric suffix that scattered steps get
                # (e.g. "step_1", "step_2") so runs of the same step
                # are aggregated under one name.
                g = name_regex.fullmatch(cr["name"])
                if g:
                    cr["name"] = g[1]

                # 5.2. Get the containers corresponding to the
                # container requests.  This has the same logic as
                # report_from_api where we batch it into 1000 items at
                # a time.
                child_crs.setdefault(cr["requesting_container_uuid"], []).append(cr)
                child_cr_containers.add(cr["container_uuid"])
                if len(child_cr_containers) == 1000:
                    stepcount += len(child_cr_containers)
                    for container in arvados.util.keyset_list_all(
                            self.arv_client.containers().list,
                            filters=[
                                ["uuid", "in", list(child_cr_containers)],
                            ],
                            select=["uuid", "started_at", "finished_at", "cost"]):

                        containers[container["uuid"]] = container

                    logging.info("Got workflow steps %s - %s", stepcount-len(child_cr_containers), stepcount)
                    child_cr_containers.clear()

            # Get any remaining containers
            if child_cr_containers:
                stepcount += len(child_cr_containers)
                for container in arvados.util.keyset_list_all(
                        self.arv_client.containers().list,
                        filters=[
                            ["uuid", "in", list(child_cr_containers)],
                        ],
                        select=["uuid", "started_at", "finished_at", "cost"]):

                    containers[container["uuid"]] = container
                logging.info("Got workflow steps %s - %s", stepcount-len(child_cr_containers), stepcount)
        # 6. Now go through the list of workflow runs, yield a row
        # with all the information we have collected, as well as the
        # details for each workflow step (if enabled)
        for container_request in pending:
            if not container_request["container_uuid"] or not containers[container_request["container_uuid"]]["started_at"] or not containers[container_request["container_uuid"]]["finished_at"]:
                continue

            template_uuid = container_request["properties"].get("template_uuid", "none")
            workflowname = container_request["name"] if template_uuid == "none" else workflows.get(template_uuid, template_uuid)

            if exclude and re.search(exclude, workflowname, flags=re.IGNORECASE):
                continue

            yield {
                "Project": projects.get(container_request["owner_uuid"], "unknown owner"),
                "ProjectUUID": container_request["owner_uuid"],
                "Workflow": workflowname,
                "WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
                "Step": "workflow runner",
                "StepUUID": container_request["uuid"],
                "Sample": container_request["name"],
                "SampleUUID": container_request["uuid"],
                "User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
                "UserUUID": container_request["modified_by_user_uuid"],
                "Submitted": csv_dateformat(container_request["created_at"]),
                "Started": csv_dateformat(containers[container_request["container_uuid"]]["started_at"]),
                "Finished": csv_dateformat(containers[container_request["container_uuid"]]["finished_at"]),
                "Runtime": runtime_str(container_request, containers),
                "Cost": round(containers[container_request["container_uuid"]]["cost"] if include_steps else container_request["cumulative_cost"], 3),
                "CumulativeCost": round(container_request["cumulative_cost"], 3)
            }
            if include_steps:
                for child_cr in child_crs.get(container_request["container_uuid"], []):
                    if not child_cr["container_uuid"] or not containers[child_cr["container_uuid"]]["started_at"] or not containers[child_cr["container_uuid"]]["finished_at"]:
                        continue
                    yield {
                        "Project": projects.get(container_request["owner_uuid"], "unknown owner"),
                        "ProjectUUID": container_request["owner_uuid"],
                        "Workflow": workflows.get(container_request["properties"].get("template_uuid", "none"), "workflow missing"),
                        "WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
                        "Step": child_cr["name"],
                        "StepUUID": child_cr["uuid"],
                        "Sample": container_request["name"],
                        "SampleUUID": container_request["uuid"],
                        "User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
                        "UserUUID": container_request["modified_by_user_uuid"],
                        "Submitted": csv_dateformat(child_cr["created_at"]),
                        "Started": csv_dateformat(containers[child_cr["container_uuid"]]["started_at"]),
                        "Finished": csv_dateformat(containers[child_cr["container_uuid"]]["finished_at"]),
                        "Runtime": runtime_str(child_cr, containers),
                        "Cost": round(containers[child_cr["container_uuid"]]["cost"], 3),
                        "CumulativeCost": round(containers[child_cr["container_uuid"]]["cost"], 3),
                    }
    def collect_summary_stats(self, row):
        self.active_users.add(row["User"])
        self.project_summary.setdefault(row["ProjectUUID"],
                                        ProjectSummary(users=set(),
                                                       runs={},
                                                       uuid=row["ProjectUUID"],
                                                       name=row["Project"]))
        prj = self.project_summary[row["ProjectUUID"]]
        cost = row["Cost"]
        prj.cost += cost
        prj.count += 1
        prj.users.add(row["User"])
        hrs = runtime_in_hours(row["Runtime"])
        prj.hours += hrs

        started = datetime.strptime(row["Started"], "%Y-%m-%d %H:%M:%S")
        finished = datetime.strptime(row["Finished"], "%Y-%m-%d %H:%M:%S")

        if started < prj.earliest:
            prj.earliest = started

        if finished > prj.latest:
            prj.latest = finished

        if row["Step"] == "workflow runner":
            prj.runs.setdefault(row["Workflow"], WorkflowRunSummary(name=row["Workflow"],
                                                                    uuid=row["WorkflowUUID"],
                                                                    cost=[], hours=[]))
            wfuuid = row["Workflow"]
            prj.runs[wfuuid].count += 1
            prj.runs[wfuuid].cost.append(row["CumulativeCost"])
            prj.runs[wfuuid].hours.append(hrs)
            self.total_workflows += 1

        self.total_hours += hrs
        self.total_cost += cost
    def report_from_api(self, since, to, include_steps, exclude):
        count = 0
        pending = []

        for container_request in arvados.util.keyset_list_all(
                self.arv_client.container_requests().list,
                filters=[
                    ["command", "like", "[\"arvados-cwl-runner%"],
                    ["created_at", ">=", since.strftime("%Y%m%dT%H%M%SZ")],
                    ["created_at", "<=", to.strftime("%Y%m%dT%H%M%SZ")],
                ],
                select=["uuid", "owner_uuid", "container_uuid", "name", "cumulative_cost", "properties", "modified_by_user_uuid", "created_at"]):

            if container_request["cumulative_cost"] == 0:
                continue

            # Every 1000 container requests, we fetch the
            # corresponding container records.
            #
            # What's so special about 1000?  Because that's the
            # maximum Arvados page size, so when we use ['uuid', 'in',
            # [...]] to fetch associated records it doesn't make sense
            # to provide more than 1000 uuids.
            #
            # TODO: use the ?include=container_uuid feature so a
            # separate request to the containers table isn't necessary.
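            # (For scale: a reporting period with 25,000 qualifying
            # container requests is fetched as 25 batches of 1,000.)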
            if len(pending) < 1000:
                pending.append(container_request)
            else:
                count += len(pending)
                logging.info("Exporting workflow runs %s - %s", count-len(pending), count)
                for row in self.iter_container_info(pending, include_steps, exclude):
                    self.collect_summary_stats(row)
                    yield row
                pending.clear()
                # Start the next batch with the current record so it
                # isn't dropped.
                pending.append(container_request)

        count += len(pending)
        logging.info("Exporting workflow runs %s - %s", count-len(pending), count)
        for row in self.iter_container_info(pending, include_steps, exclude):
            self.collect_summary_stats(row)
            yield row

        userinfo = self.arv_client.users().list(filters=[["is_active", "=", True]], limit=0).execute()
        self.total_users = userinfo["items_available"]

        groupinfo = self.arv_client.groups().list(filters=[["group_class", "=", "project"]], limit=0).execute()
        self.total_projects = groupinfo["items_available"]
    def csv_report(self, since, to, out, include_steps, columns, exclude):
        if columns:
            columns = columns.split(",")
        else:
            if include_steps:
                columns = ("Project", "Workflow", "Step", "Sample", "User", "Submitted", "Runtime", "Cost")
            else:
                columns = ("Project", "Workflow", "Sample", "User", "Submitted", "Runtime", "Cost")

        csvwriter = csv.DictWriter(out, fieldnames=columns, extrasaction="ignore")
        csvwriter.writeheader()

        for row in self.report_from_api(since, to, include_steps, exclude):
            csvwriter.writerow(row)

        self.summary_fetched = True
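
# Illustrative usage (a hedged sketch; the real entry point is the cluster
# activity command line tool, which also constructs the Prometheus client
# and parses the reporting window):
#
#   report = ClusterActivityReport(prom_client)
#   with open("report.csv", "w") as out:
#       report.csv_report(since, to, out, include_steps=True,
#                         columns=None, exclude=None)
#   html = report.html_report(since, to, exclude=None,
#                             include_workflow_steps=True)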