#!/usr/bin/env python3
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

import logging
import ciso8601
import arvados.util
import re
import csv
import math
import collections
import json
from datetime import date, datetime, timedelta
from typing import List
import statistics

from dataclasses import dataclass

from arvados_cluster_activity.prometheus import get_metric_usage, get_data_usage
from arvados_cluster_activity.reportchart import ReportChart


@dataclass
class WorkflowRunSummary:
    name: str
    uuid: str
    cost: List[float]
    hours: List[float]
    count: int = 0


@dataclass
class ProjectSummary:
    users: set
    uuid: str
    runs: dict[str, WorkflowRunSummary]
    earliest: datetime = datetime(year=9999, month=1, day=1)
    latest: datetime = datetime(year=1900, month=1, day=1)
    name: str = ""
    cost: float = 0
    count: int = 0
    hours: float = 0
    activityspan: str = ""
    tablerow: str = ""


def aws_monthly_cost(value):
    value_gb = value / (1024*1024*1024)
    first_50tb = min(1024*50, value_gb)
    next_450tb = max(min(1024*450, value_gb-1024*50), 0)
    over_500tb = max(value_gb-1024*500, 0)

    monthly_cost = (first_50tb * 0.023) + (next_450tb * 0.022) + (over_500tb * 0.021)
    return monthly_cost


def format_with_suffix_base2(summary_value):
    for scale in ["KiB", "MiB", "GiB", "TiB", "PiB", "EiB"]:
        summary_value = summary_value / 1024
        if summary_value < 1024:
            return "%.3f %s" % (summary_value, scale)


def format_with_suffix_base10(summary_value):
    for scale in ["KB", "MB", "GB", "TB", "PB", "EB"]:
        summary_value = summary_value / 1000
        if summary_value < 1000:
            return "%.3f %s" % (summary_value, scale)


containers_graph = ('Concurrent running containers', 'containers')
storage_graph = ('Storage usage', 'used')
managed_graph = ('Data under management', 'managed')


def runtime_str(container_request, containers):
    length = ciso8601.parse_datetime(containers[container_request["container_uuid"]]["finished_at"]) - ciso8601.parse_datetime(containers[container_request["container_uuid"]]["started_at"])

    hours = length.days * 24 + (length.seconds // 3600)
    minutes = (length.seconds // 60) % 60
    seconds = length.seconds % 60

    return "%i:%02i:%02i" % (hours, minutes, seconds)


def runtime_in_hours(runtime):
    sp = runtime.split(":")
    hours = float(sp[0])
    hours += float(sp[1]) / 60
    hours += float(sp[2]) / 3600
    return hours


def hours_to_runtime_str(frac_hours):
    hours = math.floor(frac_hours)
    minutes = (frac_hours - math.floor(frac_hours)) * 60.0
    seconds = (minutes - math.floor(minutes)) * 60.0

    return "%i:%02i:%02i" % (hours, minutes, seconds)


def csv_dateformat(datestr):
    dt = ciso8601.parse_datetime(datestr)
    return dt.strftime("%Y-%m-%d %H:%M:%S")


class ClusterActivityReport(object):
    def __init__(self, prom_client):
        self.arv_client = arvados.api()
        self.prom_client = prom_client
        self.cluster = self.arv_client.config()["ClusterID"]

        self.active_users = set()
        self.project_summary = {}
        self.total_hours = 0
        self.total_cost = 0
        self.total_workflows = 0
        self.storage_cost = 0
        self.summary_fetched = False
        self.graphs = {}

    def collect_graph(self, since, to, metric, resample_to, extra=None):
        if not self.prom_client:
            return

        flatdata = []

        for series in get_metric_usage(self.prom_client, since, to, metric % self.cluster, resampleTo=resample_to):
            for t in series.itertuples():
                flatdata.append([t[0], t[1]])
                if extra:
                    extra(t[0], t[1])

        return flatdata

    def collect_storage_cost(self, timestamp, value):
        self.storage_cost += aws_monthly_cost(value) / (30*24)

    def html_report(self, since, to, exclude, include_workflow_steps):
        """Get a cluster activity
        report for the desired time period, returning a string
        containing the report as an HTML document."""

        self.label = "Cluster report for %s from %s to %s" % (self.cluster, since.date(), to.date())

        if not self.summary_fetched:
            # If we haven't done it already, need to fetch everything
            # from the API to collect summary stats (report_from_api
            # calls collect_summary_stats on each row).
            #
            # Because it is a Python generator, we need to call it in
            # a loop to process all the rows.  This method also yields
            # each row which is used by a different function to create
            # the CSV report, but for the HTML report we just discard
            # them.
            for row in self.report_from_api(since, to, include_workflow_steps, exclude):
                pass

        container_cumulative_hours = 0

        def collect_container_hours(timestamp, value):
            nonlocal container_cumulative_hours
            # resampled to 5 minute increments but we want
            # a sum of hours
            container_cumulative_hours += value / 12

        logging.info("Getting container hours time series")

        self.graphs[containers_graph] = self.collect_graph(since, to,
                                                           "arvados_dispatchcloud_containers_running{cluster='%s'}",
                                                           resample_to="5min",
                                                           extra=collect_container_hours)

        logging.info("Getting data usage time series")
        self.graphs[managed_graph] = self.collect_graph(since, to,
                                                        "arvados_keep_collection_bytes{cluster='%s'}",
                                                        resample_to="60min")

        self.graphs[storage_graph] = self.collect_graph(since, to,
                                                        "arvados_keep_total_bytes{cluster='%s'}",
                                                        resample_to="60min",
                                                        extra=self.collect_storage_cost)

        managed_data_now = None
        storage_used_now = None

        if len(self.graphs.get(managed_graph, [])) > 0:
            managed_data_now = self.graphs[managed_graph][-1][1]

        if len(self.graphs.get(storage_graph, [])) > 0:
            storage_used_now = self.graphs[storage_graph][-1][1]

        if managed_data_now and storage_used_now:
            storage_cost = aws_monthly_cost(storage_used_now)
            dedup_ratio = managed_data_now/storage_used_now

        label = self.label

        cards = []

        workbench = self.arv_client.config()["Services"]["Workbench2"]["ExternalURL"]
        if workbench.endswith("/"):
            workbench = workbench[:-1]

        if to.date() == self.today():
            # The deduplication ratio overstates things a bit: you can
            # have collections which reference a small slice of a large
            # block, and this messes up the intuitive value of this ratio
            # and exaggerates the effect.
            #
            # So for now, as much fun as this is, I'm excluding it from
            # the report.
            #
            # dedup_savings = aws_monthly_cost(managed_data_now) - storage_cost
            # Monthly savings from storage deduplication ${dedup_savings:,.2f}

            data_rows = ""
            if managed_data_now and storage_used_now:
                data_rows = """
                Total data under management {managed_data_now}
                Total storage usage {storage_used_now}
                Deduplication ratio {dedup_ratio:.1f}
                Approximate monthly storage cost ${storage_cost:,.2f}
                """.format(
                    managed_data_now=format_with_suffix_base10(managed_data_now),
                    storage_used_now=format_with_suffix_base10(storage_used_now),
                    storage_cost=storage_cost,
                    dedup_ratio=dedup_ratio,
                )

            cards.append("""

Cluster status as of {now}

{data_rows}
Total users {total_users}
Total projects {total_projects}

See note on usage and cost calculations for details on how costs are calculated.

""".format(now=self.today(), total_users=self.total_users, total_projects=self.total_projects, workbench=workbench, data_rows=data_rows)) # We have a couple of options for getting total container hours # # total_hours=container_cumulative_hours # # calculates the sum from prometheus metrics # # total_hours=self.total_hours # # calculates the sum of the containers that were fetched # # The problem is these numbers tend not to match, especially # if the report generation was not called with "include # workflow steps". # # I decided to use the sum from containers fetched, because it # will match the sum of compute time for each project listed # in the report. cards.append("""

Activity and cost over the {reporting_days} day period {since} to {to}

Active users {active_users}
Active projects {active_projects}
Workflow runs {total_workflows:,}
Compute used {total_hours:,.1f} hours
Compute cost ${total_cost:,.2f}
Storage cost ${storage_cost:,.2f}

See note on usage and cost calculations for details on how costs are calculated.

""".format(active_users=len(self.active_users), total_users=self.total_users, total_hours=self.total_hours, total_cost=self.total_cost, total_workflows=self.total_workflows, active_projects=len(self.project_summary), since=since.date(), to=to.date(), reporting_days=(to - since).days, storage_cost=self.storage_cost)) projectlist = sorted(self.project_summary.items(), key=lambda x: x[1].cost, reverse=True) for k, prj in projectlist: if prj.earliest.date() == prj.latest.date(): prj.activityspan = "{}".format(prj.earliest.date()) else: prj.activityspan = "{} to {}".format(prj.earliest.date(), prj.latest.date()) prj.tablerow = """{users} {active} {hours:,.1f} ${cost:,.2f}""".format( active=prj.activityspan, cost=prj.cost, hours=prj.hours, users=", ".join(prj.users), ) cards.append("""
""") cards.append( """

Active Projects

{projects}
Project Users Active Compute usage (hours) Compute cost

See note on usage and cost calculations for details on how costs are calculated.

""".format(projects="\n".join("""{name}{rest}""".format(name=prj.name, rest=prj.tablerow) for k, prj in projectlist))) for k, prj in projectlist: wfsum = [] for k2, r in sorted(prj.runs.items(), key=lambda x: x[1].count, reverse=True): wfsum.append(""" {count} {workflowlink} {median_runtime} {mean_runtime} ${median_cost:,.2f} ${mean_cost:,.2f} ${totalcost:,.2f} """.format( count=r.count, mean_runtime=hours_to_runtime_str(statistics.mean(r.hours)), median_runtime=hours_to_runtime_str(statistics.median(r.hours)), mean_cost=statistics.mean(r.cost), median_cost=statistics.median(r.cost), totalcost=sum(r.cost), workflowlink="""{name}""".format(workbench=workbench,uuid=r.uuid,name=r.name) if r.uuid != "none" else r.name)) cards.append( """

{name}

{projectrow}
Users Active Compute usage (hours) Compute cost
{wfsum}
Workflow run count Workflow name Median runtime Mean runtime Median cost per run Mean cost per run Sum cost over runs
""".format(name=prj.name, wfsum=" ".join(wfsum), projectrow=prj.tablerow, workbench=workbench, uuid=prj.uuid) ) # The deduplication ratio overstates things a bit, you can # have collections which reference a small slice of a large # block, and this messes up the intuitive value of this ratio # and exagerates the effect. # # So for now, as much fun as this is, I'm excluding it from # the report. # #

"Monthly savings from storage deduplication" is the # estimated cost difference between "storage usage" and "data # under management" as a way of comparing with other # technologies that do not support data deduplication.

cards.append("""

Note on usage and cost calculations

The numbers presented in this report are estimates and will not perfectly match your cloud bill. Nevertheless, this report should be useful for identifying your main cost drivers.

Storage

"Total data under management" is what you get if you add up all blocks referenced by all collections in Workbench, without considering deduplication.

"Total storage usage" is the actual underlying storage usage, accounting for data deduplication.

Storage costs are based on AWS "S3 Standard" described on the Amazon S3 pricing page:

Finally, this is only the base storage cost, and does not include any fees associated with S3 API usage. However, there are generally no ingress/egress fees if your Arvados instance and S3 bucket are in the same region, which is the normal recommended configuration.

Compute

"Compute usage" are instance-hours used in running workflows. Because multiple steps may run in parallel on multiple instances, a workflow that completes in four hours but runs parallel steps on five instances, would be reported as using 20 instance hours.

"Runtime" is the actual wall clock time that it took to complete a workflow. This does not include time spent in the queue for the workflow itself, but does include queuing time of individual workflow steps.

Computational costs are derived from Arvados cost calculations of container runs. For on-demand instances, this uses the prices from the InstanceTypes section of the Arvados config file, set by the system administrator. For spot instances, this uses current spot prices retrieved on the fly from the AWS API.

Be aware that the cost calculations are only for the time the container is running, and do not take into account the overhead of launching instances or idle time between scheduled tasks or prior to automatic shutdown.

""") return ReportChart(label, cards, self.graphs).html() def iter_container_info(self, pending, include_steps, exclude): # "pending" is a list of arvados-cwl-runner container requests # returned by the API. This method fetches detailed # information about the runs and yields report rows. # 1. Get container records corresponding to container requests. containers = {} for container in arvados.util.keyset_list_all( self.arv_client.containers().list, filters=[ ["uuid", "in", [c["container_uuid"] for c in pending if c["container_uuid"]]], ], select=["uuid", "started_at", "finished_at", "cost"]): containers[container["uuid"]] = container # 2. Look for the template_uuid property and fetch the # corresponding workflow record. workflows = {} workflows["none"] = "workflow run from command line" for wf in arvados.util.keyset_list_all( self.arv_client.workflows().list, filters=[ ["uuid", "in", list(set(c["properties"]["template_uuid"] for c in pending if "template_uuid" in c["properties"] and c["properties"]["template_uuid"].startswith(self.arv_client.config()["ClusterID"])))], ], select=["uuid", "name"]): workflows[wf["uuid"]] = wf["name"] # 3. Look at owner_uuid and fetch owning projects and users projects = {} for pr in arvados.util.keyset_list_all( self.arv_client.groups().list, filters=[ ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'j7d0g'))], ], select=["uuid", "name"]): projects[pr["uuid"]] = pr["name"] # 4. Look at owner_uuid and modified_by_user_uuid and get user records for pr in arvados.util.keyset_list_all( self.arv_client.users().list, filters=[ ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'tpzed')|set(c["modified_by_user_uuid"] for c in pending))], ], select=["uuid", "full_name", "first_name", "last_name"]): projects[pr["uuid"]] = pr["full_name"] # 5. Optionally iterate over individual workflow steps. if include_steps: name_regex = re.compile(r"(.+)_[0-9]+") child_crs = {} child_cr_containers = set() stepcount = 0 # 5.1. Go through the container requests owned by the toplevel workflow container logging.info("Getting workflow steps") for cr in arvados.util.keyset_list_all( self.arv_client.container_requests().list, filters=[ ["requesting_container_uuid", "in", list(containers.keys())], ], select=["uuid", "name", "cumulative_cost", "requesting_container_uuid", "container_uuid"]): if cr["cumulative_cost"] == 0: continue g = name_regex.fullmatch(cr["name"]) if g: cr["name"] = g[1] # 5.2. Get the containers corresponding to the # container requests. This has the same logic as # report_from_api where we batch it into 1000 items at # a time. 
                child_crs.setdefault(cr["requesting_container_uuid"], []).append(cr)
                child_cr_containers.add(cr["container_uuid"])
                if len(child_cr_containers) == 1000:
                    stepcount += len(child_cr_containers)
                    for container in arvados.util.keyset_list_all(
                            self.arv_client.containers().list,
                            filters=[
                                ["uuid", "in", list(child_cr_containers)],
                            ],
                            select=["uuid", "started_at", "finished_at", "cost"]):
                        containers[container["uuid"]] = container
                    logging.info("Got workflow steps %s - %s", stepcount - len(child_cr_containers), stepcount)
                    child_cr_containers.clear()

            # Get any remaining containers
            if child_cr_containers:
                stepcount += len(child_cr_containers)
                for container in arvados.util.keyset_list_all(
                        self.arv_client.containers().list,
                        filters=[
                            ["uuid", "in", list(child_cr_containers)],
                        ],
                        select=["uuid", "started_at", "finished_at", "cost"]):
                    containers[container["uuid"]] = container
                logging.info("Got workflow steps %s - %s", stepcount - len(child_cr_containers), stepcount)

        # 6. Now go through the list of workflow runs, yield a row
        # with all the information we have collected, as well as the
        # details for each workflow step (if enabled)
        for container_request in pending:
            if not container_request["container_uuid"] or not containers[container_request["container_uuid"]]["started_at"] or not containers[container_request["container_uuid"]]["finished_at"]:
                continue

            template_uuid = container_request["properties"].get("template_uuid", "none")
            workflowname = container_request["name"] if template_uuid == "none" else workflows.get(template_uuid, template_uuid)

            if exclude and re.search(exclude, workflowname, flags=re.IGNORECASE):
                continue

            yield {
                "Project": projects.get(container_request["owner_uuid"], "unknown owner"),
                "ProjectUUID": container_request["owner_uuid"],
                "Workflow": workflowname,
                "WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
                "Step": "workflow runner",
                "StepUUID": container_request["uuid"],
                "Sample": container_request["name"],
                "SampleUUID": container_request["uuid"],
                "User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
                "UserUUID": container_request["modified_by_user_uuid"],
                "Submitted": csv_dateformat(container_request["created_at"]),
                "Started": csv_dateformat(containers[container_request["container_uuid"]]["started_at"]),
                "Finished": csv_dateformat(containers[container_request["container_uuid"]]["finished_at"]),
                "Runtime": runtime_str(container_request, containers),
                "Cost": round(containers[container_request["container_uuid"]]["cost"] if include_steps else container_request["cumulative_cost"], 3),
                "CumulativeCost": round(container_request["cumulative_cost"], 3)
                }

            if include_steps:
                for child_cr in child_crs.get(container_request["container_uuid"], []):
                    if not child_cr["container_uuid"] or not containers[child_cr["container_uuid"]]["started_at"] or not containers[child_cr["container_uuid"]]["finished_at"]:
                        continue
                    yield {
                        "Project": projects.get(container_request["owner_uuid"], "unknown owner"),
                        "ProjectUUID": container_request["owner_uuid"],
                        "Workflow": workflows.get(container_request["properties"].get("template_uuid", "none"), "workflow missing"),
                        "WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
                        "Step": child_cr["name"],
                        "StepUUID": child_cr["uuid"],
                        "Sample": container_request["name"],
                        "SampleUUID": container_request["name"],
                        "User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
                        "UserUUID": container_request["modified_by_user_uuid"],
                        "Submitted": csv_dateformat(child_cr["created_at"]),
                        "Started": csv_dateformat(containers[child_cr["container_uuid"]]["started_at"]),
                        "Finished": csv_dateformat(containers[child_cr["container_uuid"]]["finished_at"]),
                        "Runtime": runtime_str(child_cr, containers),
                        "Cost": round(containers[child_cr["container_uuid"]]["cost"], 3),
                        "CumulativeCost": round(containers[child_cr["container_uuid"]]["cost"], 3),
                        }

    def collect_summary_stats(self, row):
        self.active_users.add(row["User"])
        self.project_summary.setdefault(row["ProjectUUID"],
                                        ProjectSummary(users=set(),
                                                       runs={},
                                                       uuid=row["ProjectUUID"],
                                                       name=row["Project"]))
        prj = self.project_summary[row["ProjectUUID"]]
        cost = row["Cost"]
        prj.cost += cost
        prj.count += 1
        prj.users.add(row["User"])
        hrs = runtime_in_hours(row["Runtime"])
        prj.hours += hrs

        started = datetime.strptime(row["Started"], "%Y-%m-%d %H:%M:%S")
        finished = datetime.strptime(row["Finished"], "%Y-%m-%d %H:%M:%S")

        if started < prj.earliest:
            prj.earliest = started

        if finished > prj.latest:
            prj.latest = finished

        if row["Step"] == "workflow runner":
            prj.runs.setdefault(row["Workflow"], WorkflowRunSummary(name=row["Workflow"],
                                                                    uuid=row["WorkflowUUID"],
                                                                    cost=[],
                                                                    hours=[]))
            wfuuid = row["Workflow"]
            prj.runs[wfuuid].count += 1
            prj.runs[wfuuid].cost.append(row["CumulativeCost"])
            prj.runs[wfuuid].hours.append(hrs)
            self.total_workflows += 1

        self.total_hours += hrs
        self.total_cost += cost

    def report_from_api(self, since, to, include_steps, exclude):
        pending = []

        count = 0
        for container_request in arvados.util.keyset_list_all(
                self.arv_client.container_requests().list,
                filters=[
                    ["command", "like", "[\"arvados-cwl-runner%"],
                    ["created_at", ">=", since.strftime("%Y%m%dT%H%M%SZ")],
                    ["created_at", "<=", to.strftime("%Y%m%dT%H%M%SZ")],
                ],
                select=["uuid", "owner_uuid", "container_uuid", "name", "cumulative_cost", "properties", "modified_by_user_uuid", "created_at"]):

            if container_request["cumulative_cost"] == 0:
                continue

            # Every 1000 container requests, we fetch the
            # corresponding container records.
            #
            # What's so special about 1000?  Because that's the
            # maximum Arvados page size, so when we use ['uuid', 'in',
            # [...]] to fetch associated records it doesn't make sense
            # to provide more than 1000 uuids.
            #
            # TODO: use the ?include=container_uuid feature so a
            # separate request to the containers table isn't necessary.
            if len(pending) < 1000:
                pending.append(container_request)
            else:
                count += len(pending)
                logging.info("Exporting workflow runs %s - %s", count - len(pending), count)

                for row in self.iter_container_info(pending, include_steps, exclude):
                    self.collect_summary_stats(row)
                    yield row

                pending.clear()

        count += len(pending)
        logging.info("Exporting workflow runs %s - %s", count - len(pending), count)

        for row in self.iter_container_info(pending, include_steps, exclude):
            self.collect_summary_stats(row)
            yield row

        userinfo = self.arv_client.users().list(filters=[["is_active", "=", True]], limit=0).execute()
        self.total_users = userinfo["items_available"]

        groupinfo = self.arv_client.groups().list(filters=[["group_class", "=", "project"]], limit=0).execute()
        self.total_projects = groupinfo["items_available"]

    def csv_report(self, since, to, out, include_steps, columns, exclude):
        if columns:
            columns = columns.split(",")
        else:
            if include_steps:
                columns = ("Project", "Workflow", "Step", "Sample", "User", "Submitted", "Runtime", "Cost")
            else:
                columns = ("Project", "Workflow", "Sample", "User", "Submitted", "Runtime", "Cost")

        csvwriter = csv.DictWriter(out, fieldnames=columns, extrasaction="ignore")
        csvwriter.writeheader()
        for row in self.report_from_api(since, to, include_steps, exclude):
            csvwriter.writerow(row)
        self.summary_fetched = True

    def today(self):
        return date.today()
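

# Worked example of the tiered S3 Standard arithmetic implemented in
# aws_monthly_cost() above, relating to the storage cost note embedded in
# the HTML report.  The per-GB-month rates are the ones hard-coded in that
# function ($0.023 for the first 50 TB, $0.022 for the next 450 TB, $0.021
# beyond 500 TB):
#
#     aws_monthly_cost(100 * 1024**4)   # 100 TiB stored
#     # = 51200 GiB * $0.023 + 51200 GiB * $0.022
#     # = $2,304.00 per month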
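

# Illustrative sketch only (not the packaged CLI entry point): drive the
# report generator directly to print a 30-day CSV report to stdout.  This
# assumes ARVADOS_API_HOST and ARVADOS_API_TOKEN are set in the environment;
# passing prom_client=None skips the Prometheus-backed graphs, which
# csv_report does not need.
if __name__ == "__main__":
    import sys

    to = datetime.utcnow()
    since = to - timedelta(days=30)

    report = ClusterActivityReport(prom_client=None)
    report.csv_report(since, to, sys.stdout,
                      include_steps=False, columns=None, exclude=None)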