#!/usr/bin/env python3
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0
import logging
import ciso8601
import arvados.util
import re
import csv
import math
import collections
import json
from datetime import date, datetime, timedelta
import pkg_resources
from dataclasses import dataclass
import crunchstat_summary.dygraphs
from crunchstat_summary.summarizer import Task
from arvados_cluster_activity.prometheus import get_metric_usage, get_data_usage
@dataclass
class WorkflowRunSummary:
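    """Per-workflow aggregate: run count, total cost, and total runtime hours."""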
name: str
uuid: str
count: int = 0
cost: float = 0
hours: float = 0
@dataclass
class ProjectSummary:
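    """Per-project aggregate of workflow runs.

    `earliest` and `latest` start as far-future/far-past sentinels so the
    first observed run always narrows the activity span.
    """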
users: set
uuid: str
runs: dict[str, WorkflowRunSummary]
earliest: datetime = datetime(year=9999, month=1, day=1)
latest: datetime = datetime(year=1900, month=1, day=1)
name: str = ""
cost: float = 0
count: int = 0
hours: float = 0
activityspan: str = ""
tablerow: str = ""
@dataclass
class Summarizer:
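    """A label plus named Task time series, in the shape ReportChart expects."""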
label: str
tasks: collections.defaultdict[str, Task]
def long_label(self):
return self.label
def date_export(item):
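    """Serialize datetimes for json.dumps as JavaScript Date constructors.

    The surrounding @ markers are stripped again in ReportChart.js(), so the
    value is emitted as a bare JS expression rather than a JSON string.

    >>> date_export(datetime(2024, 1, 1))
    '@new Date("2024-01-01T00:00:00Z")@'
    """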
if isinstance(item, datetime):
return """@new Date("{}")@""".format(item.strftime("%Y-%m-%dT%H:%M:%SZ"))
def aws_monthly_cost(value):
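    """Estimate the monthly cost of storing `value` bytes in AWS S3 Standard.

    The tier boundaries and $/GB rates below are assumptions based on
    published S3 Standard pricing and may not match your region or
    current prices.

    >>> round(aws_monthly_cost(1024**4), 2)  # 1 TiB
    23.55
    """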
value_gb = value / (1024*1024*1024)
first_50tb = min(1024*50, value_gb)
next_450tb = max(min(1024*450, value_gb-1024*50), 0)
over_500tb = max(value_gb-1024*500, 0)
monthly_cost = (first_50tb * 0.023) + (next_450tb * 0.022) + (over_500tb * 0.021)
return monthly_cost
def format_with_suffix_base2(summary_value):
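    """Format a byte count with a binary (power-of-1024) suffix.

    Returns None for values of 1024 EiB or more, which the scale list
    never reaches in practice.

    >>> format_with_suffix_base2(3 * 1024**3)
    '3.000 GiB'
    """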
for scale in ["KiB", "MiB", "GiB", "TiB", "PiB", "EiB"]:
summary_value = summary_value / 1024
if summary_value < 1024:
return "%.3f %s" % (summary_value, scale)
def format_with_suffix_base10(summary_value):
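    """Format a byte count with a decimal (power-of-1000) suffix.

    >>> format_with_suffix_base10(2_500_000)
    '2.500 MB'
    """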
for scale in ["KB", "MB", "GB", "TB", "PB", "EB"]:
summary_value = summary_value / 1000
if summary_value < 1000:
return "%.3f %s" % (summary_value, scale)
class ReportChart(crunchstat_summary.dygraphs.DygraphsChart):
def sections(self):
return [
{
'label': s.long_label(),
'charts': [
self.chartdata(s.label, s.tasks, stat)
for stat in (('Concurrent running containers', ['containers']),
('Data under management', ['actual storage used']),
)
],
}
for s in self.summarizers]
def js(self):
return 'var chartdata = {};\n{}'.format(
json.dumps(self.sections(), default=date_export).replace('"@', '').replace('@"', '').replace('\\"', '"'),
'\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa).decode('utf-8') for jsa in self.JSASSETS]))
def _collate_data(self, tasks, stats):
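        """Merge the per-task series into dygraphs-style rows.

        Each output row is [time, v1, v2, ...]; `nulls` grows by one None
        per task processed, padding the columns of earlier tasks so every
        task's values land in their own column. Rows are sorted by time.
        """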
data = []
nulls = []
# uuid is category for crunch2
for uuid, task in tasks.items():
# All stats in a category are assumed to have the same time base and same number of samples
category = stats[0]
series_names = stats[1]
sn0 = series_names[0]
series = task.series[(category,sn0)]
for i in range(len(series)):
pt = series[i]
vals = [task.series[(category,stat)][i][1] for stat in series_names[1:]]
data.append([pt[0]] + nulls + [pt[1]] + vals)
nulls.append(None)
return sorted(data)
WEBCHART_CLASS = ReportChart
def runtime_str(container_request, containers):
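    """Render a container's wall-clock runtime as H:MM:SS."""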
    container = containers[container_request["container_uuid"]]
    length = ciso8601.parse_datetime(container["finished_at"]) - ciso8601.parse_datetime(container["started_at"])
hours = length.days * 24 + (length.seconds // 3600)
minutes = (length.seconds // 60) % 60
seconds = length.seconds % 60
return "%i:%02i:%02i" % (hours, minutes, seconds)
def runtime_in_hours(runtime):
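    """Convert an H:MM:SS string (as produced by runtime_str) to fractional hours.

    >>> runtime_in_hours("1:30:00")
    1.5
    """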
sp = runtime.split(":")
hours = float(sp[0])
hours += float(sp[1]) / 60
hours += float(sp[2]) / 3600
return hours
def hours_to_runtime_str(frac_hours):
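    """Inverse of runtime_in_hours: fractional hours back to H:MM:SS.

    >>> hours_to_runtime_str(1.5)
    '1:30:00'
    """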
hours = math.floor(frac_hours)
minutes = (frac_hours - math.floor(frac_hours)) * 60.0
seconds = (minutes - math.floor(minutes)) * 60.0
return "%i:%02i:%02i" % (hours, minutes, seconds)
def csv_dateformat(datestr):
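    """Normalize an ISO 8601 timestamp to the CSV report's date format.

    >>> csv_dateformat("2024-01-02T03:04:05Z")
    '2024-01-02 03:04:05'
    """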
dt = ciso8601.parse_datetime(datestr)
return dt.strftime("%Y-%m-%d %H:%M:%S")
class ClusterActivityReport(object):
def __init__(self, prom_client, label=None, threads=1, **kwargs):
self.threadcount = threads
self.arv_client = arvados.api()
self.prom_client = prom_client
self.cluster = self.arv_client.config()["ClusterID"]
self.active_users = set()
self.project_summary = {}
self.total_hours = 0
self.total_cost = 0
self.total_workflows = 0
self.summarizers = []
self.storage_cost = 0
def run(self):
pass
def collect_graph(self, s1, since, to, taskname, legend, metric, resampleTo, extra=None):
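        """Fetch a Prometheus metric over [since, to] and append its samples
        to the named task series; `extra`, if given, is called on each sample.

        No-op when no Prometheus client is configured.
        """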
if not self.prom_client:
return
task = s1.tasks[taskname]
for series in get_metric_usage(self.prom_client, since, to, metric % self.cluster, resampleTo=resampleTo):
for t in series.itertuples():
task.series[taskname, legend].append(t)
if extra:
extra(t)
def collect_storage_cost(self, t):
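        # Called once per hourly sample: accrue 1/(30*24) of the monthly rate
        # per sample, so the sum approximates storage cost over the reporting
        # period (assumes the 60-minute resampling used in html_report).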
self.storage_cost += aws_monthly_cost(t.y) / (30*24)
def html_report(self, since, to, exclude):
self.label = "Cluster report for %s from %s to %s" % (self.cluster, since.date(), to.date())
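        # Drain the row generator once so project/user summary statistics
        # are populated before rendering.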
for row in self.report_from_api(since, to, True, exclude):
pass
logging.info("Getting container hours time series")
s1 = Summarizer(label="", tasks=collections.defaultdict(Task))
self.collect_graph(s1, since, to, "Concurrent running containers", "containers",
"arvados_dispatchcloud_containers_running{cluster='%s'}", resampleTo="5min")
logging.info("Getting data usage time series")
s2 = Summarizer(label="", tasks=collections.defaultdict(Task))
self.collect_graph(s2, since, to, "Data under management", "managed",
"arvados_keep_collection_bytes{cluster='%s'}", resampleTo="60min")
self.collect_graph(s2, since, to, "Data under management", "actual storage used",
"arvados_keep_total_bytes{cluster='%s'}", resampleTo="60min", extra=self.collect_storage_cost)
managed_data_now = s2.tasks["Data under management"].series["Data under management","managed"][-1]
storage_used_now = s2.tasks["Data under management"].series["Data under management","actual storage used"][-1]
storage_cost = aws_monthly_cost(storage_used_now.y)
dedup_ratio = managed_data_now.y/storage_used_now.y
dedup_savings = aws_monthly_cost(managed_data_now.y) - storage_cost
self.summarizers = [s1, s2]
tophtml = ""
bottomhtml = ""
label = self.label
tophtml = []
workbench = self.arv_client.config()["Services"]["Workbench2"]["ExternalURL"]
if workbench.endswith("/"):
workbench = workbench[:-1]
if to.date() == date.today():
            tophtml.append("""<h2>Cluster status as of {now}</h2>
            <table><tbody>
            <tr><td>Total users</td><td>{total_users}</td></tr>
            <tr><td>Total projects</td><td>{total_projects}</td></tr>
            <tr><td>Total data under management</td><td>{managed_data_now}</td></tr>
            <tr><td>Total storage usage</td><td>{storage_used_now}</td></tr>
            <tr><td>Deduplication ratio</td><td>{dedup_ratio:.1f}</td></tr>
            <tr><td>Approximate monthly storage cost</td><td>${storage_cost:,.2f}</td></tr>
            <tr><td>Monthly savings from storage deduplication</td><td>${dedup_savings:,.2f}</td></tr>
            </tbody></table>
            """.format(now=date.today(),
                       total_users=self.total_users,
                       total_projects=self.total_projects,
                       managed_data_now=format_with_suffix_base10(managed_data_now.y),
                       storage_used_now=format_with_suffix_base10(storage_used_now.y),
                       dedup_savings=dedup_savings,
                       storage_cost=storage_cost,
                       dedup_ratio=dedup_ratio,
                       workbench=workbench))
tophtml.append("""Activity and cost over the {reporting_days} day period {since} to {to}
Active users | {active_users} |
Active projects | {active_projects} |
Workflow runs | {total_workflows:,} |
Compute used | {total_hours:,.1f} hours |
Compute cost | ${total_cost:,.2f} |
Storage cost | ${storage_cost:,.2f} |
""".format(active_users=len(self.active_users),
total_users=self.total_users,
total_hours=self.total_hours,
total_cost=self.total_cost,
total_workflows=self.total_workflows,
active_projects=len(self.project_summary),
since=since.date(), to=to.date(),
reporting_days=(to - since).days,
storage_cost=self.storage_cost))
bottomhtml = []
projectlist = sorted(self.project_summary.items(), key=lambda x: x[1].cost, reverse=True)
for k, prj in projectlist:
if prj.earliest.date() == prj.latest.date():
prj.activityspan = "{}".format(prj.earliest.date())
else:
prj.activityspan = "{} to {}".format(prj.earliest.date(), prj.latest.date())
            prj.tablerow = """<td>{users}</td> <td>{active}</td> <td>{hours:,.1f}</td> <td>${cost:,.2f}</td>""".format(
                name=prj.name,
                active=prj.activityspan,
                cost=prj.cost,
                hours=prj.hours,
                users=", ".join(prj.users),
            )
        bottomhtml.append(
            """<h2>Active Projects</h2>
            <table><tbody>
            <tr><th>Project</th> <th>Users</th> <th>Active</th> <th>Compute usage (hours)</th> <th>Compute cost</th></tr>
            {projects}
            </tbody></table>
            """.format(projects="\n".join("""<tr><td>{name}</td>{rest}</tr>""".format(name=prj.name, rest=prj.tablerow) for k, prj in projectlist)))
for k, prj in projectlist:
wfsum = []
for k2, r in sorted(prj.runs.items(), key=lambda x: x[1].count, reverse=True):
                wfsum.append("""
                <tr><td>{count}</td> <td>{workflowlink}</td> <td>{runtime}</td> <td>${cost:,.2f}</td></tr>
                """.format(
                    count=r.count,
                    runtime=hours_to_runtime_str(r.hours/r.count),
                    cost=r.cost/r.count,
                    workflowlink="""<a href="{workbench}/workflows/{uuid}">{name}</a>""".format(workbench=workbench, uuid=r.uuid, name=r.name)
                    if r.uuid != "none" else r.name))
#
# Users | Active | Compute usage | Compute cost |
# {projectrow}
#
            bottomhtml.append(
                """<h2><a href="{workbench}/projects/{uuid}">{name}</a></h2>
                <table><tbody>
                <tr><th>Workflow run count</th> <th>Workflow name</th> <th>Avg runtime</th> <th>Avg cost per run</th></tr>
                {wfsum}
                </tbody></table>
                """.format(name=prj.name,
                           users=", ".join(prj.users),
                           cost=prj.cost,
                           hours=prj.hours,
                           wfsum=" ".join(wfsum),
                           earliest=prj.earliest.date(),
                           latest=prj.latest.date(),
                           activity=prj.activityspan,
                           userplural='s' if len(prj.users) > 1 else '',
                           projectrow=prj.tablerow,
                           workbench=workbench,
                           uuid=prj.uuid)
)
return WEBCHART_CLASS(label, self.summarizers).html(tophtml, bottomhtml)
def flush_containers(self, pending, include_steps, exclude):
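        """Resolve a batch of container requests into report rows.

        Looks up the requests' containers, workflow names, and owning
        project/user names in bulk, then yields one dict per workflow run
        (and one per step when include_steps is set).
        """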
containers = {}
for container in arvados.util.keyset_list_all(
self.arv_client.containers().list,
filters=[
["uuid", "in", [c["container_uuid"] for c in pending if c["container_uuid"]]],
],
select=["uuid", "started_at", "finished_at", "cost"]):
containers[container["uuid"]] = container
workflows = {}
workflows["none"] = "workflow run from command line"
for wf in arvados.util.keyset_list_all(
self.arv_client.workflows().list,
filters=[
["uuid", "in", list(set(c["properties"]["template_uuid"]
for c in pending
if "template_uuid" in c["properties"] and c["properties"]["template_uuid"].startswith(self.arv_client.config()["ClusterID"])))],
],
select=["uuid", "name"]):
workflows[wf["uuid"]] = wf["name"]
projects = {}
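        # Arvados UUIDs carry their object type in characters 6-10:
        # 'j7d0g' is a group/project, 'tpzed' is a user.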
for pr in arvados.util.keyset_list_all(
self.arv_client.groups().list,
filters=[
["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'j7d0g'))],
],
select=["uuid", "name"]):
projects[pr["uuid"]] = pr["name"]
for pr in arvados.util.keyset_list_all(
self.arv_client.users().list,
filters=[
["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'tpzed')|set(c["modified_by_user_uuid"] for c in pending))],
],
select=["uuid", "full_name", "first_name", "last_name"]):
projects[pr["uuid"]] = pr["full_name"]
if include_steps:
name_regex = re.compile(r"(.+)_[0-9]+")
child_crs = {}
child_cr_containers = set()
stepcount = 0
logging.info("Getting workflow steps")
for cr in arvados.util.keyset_list_all(
self.arv_client.container_requests().list,
filters=[
["requesting_container_uuid", "in", list(containers.keys())],
],
select=["uuid", "name", "cumulative_cost", "requesting_container_uuid", "container_uuid"]):
if cr["cumulative_cost"] == 0:
continue
g = name_regex.fullmatch(cr["name"])
if g:
cr["name"] = g[1]
child_crs.setdefault(cr["requesting_container_uuid"], []).append(cr)
child_cr_containers.add(cr["container_uuid"])
if len(child_cr_containers) == 1000:
stepcount += len(child_cr_containers)
for container in arvados.util.keyset_list_all(
self.arv_client.containers().list,
filters=[
["uuid", "in", list(child_cr_containers)],
],
select=["uuid", "started_at", "finished_at", "cost"]):
containers[container["uuid"]] = container
logging.info("Got workflow steps %s - %s", stepcount-len(child_cr_containers), stepcount)
child_cr_containers.clear()
if child_cr_containers:
stepcount += len(child_cr_containers)
for container in arvados.util.keyset_list_all(
self.arv_client.containers().list,
filters=[
["uuid", "in", list(child_cr_containers)],
],
select=["uuid", "started_at", "finished_at", "cost"]):
containers[container["uuid"]] = container
logging.info("Got workflow steps %s - %s", stepcount-len(child_cr_containers), stepcount)
for container_request in pending:
if not container_request["container_uuid"] or not containers[container_request["container_uuid"]]["started_at"] or not containers[container_request["container_uuid"]]["finished_at"]:
continue
template_uuid = container_request["properties"].get("template_uuid", "none")
workflowname = container_request["name"] if template_uuid == "none" else workflows.get(template_uuid, "workflow missing")
if exclude and re.search(exclude, workflowname, flags=re.IGNORECASE):
continue
yield {
"Project": projects.get(container_request["owner_uuid"], "unknown owner"),
"ProjectUUID": container_request["owner_uuid"],
"Workflow": workflowname,
"WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
"Step": "workflow runner",
"StepUUID": container_request["uuid"],
"Sample": container_request["name"],
"SampleUUID": container_request["uuid"],
"User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
"UserUUID": container_request["modified_by_user_uuid"],
"Submitted": csv_dateformat(container_request["created_at"]),
"Started": csv_dateformat(containers[container_request["container_uuid"]]["started_at"]),
"Finished": csv_dateformat(containers[container_request["container_uuid"]]["finished_at"]),
"Runtime": runtime_str(container_request, containers),
"Cost": round(containers[container_request["container_uuid"]]["cost"] if include_steps else container_request["cumulative_cost"], 3),
"CumulativeCost": round(container_request["cumulative_cost"], 3)
}
if include_steps:
for child_cr in child_crs.get(container_request["container_uuid"], []):
if not child_cr["container_uuid"] or not containers[child_cr["container_uuid"]]["started_at"] or not containers[child_cr["container_uuid"]]["finished_at"]:
continue
yield {
"Project": projects.get(container_request["owner_uuid"], "unknown owner"),
"ProjectUUID": container_request["owner_uuid"],
"Workflow": workflows.get(container_request["properties"].get("template_uuid", "none"), "workflow missing"),
"WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
"Step": child_cr["name"],
"StepUUID": child_cr["uuid"],
"Sample": container_request["name"],
"SampleUUID": container_request["name"],
"User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
"UserUUID": container_request["modified_by_user_uuid"],
"Submitted": csv_dateformat(child_cr["created_at"]),
"Started": csv_dateformat(containers[child_cr["container_uuid"]]["started_at"]),
"Finished": csv_dateformat(containers[child_cr["container_uuid"]]["finished_at"]),
"Runtime": runtime_str(child_cr, containers),
"Cost": round(containers[child_cr["container_uuid"]]["cost"], 3),
"CumulativeCost": round(containers[child_cr["container_uuid"]]["cost"], 3),
}
def collect_summary_stats(self, row):
self.active_users.add(row["User"])
self.project_summary.setdefault(row["ProjectUUID"],
ProjectSummary(users=set(),
runs={},
uuid=row["ProjectUUID"],
name=row["Project"]))
prj = self.project_summary[row["ProjectUUID"]]
cost = row["Cost"]
prj.cost += cost
prj.count += 1
prj.users.add(row["User"])
hrs = runtime_in_hours(row["Runtime"])
prj.hours += hrs
started = datetime.strptime(row["Started"], "%Y-%m-%d %H:%M:%S")
finished = datetime.strptime(row["Finished"], "%Y-%m-%d %H:%M:%S")
if started < prj.earliest:
prj.earliest = started
if finished > prj.latest:
prj.latest = finished
if row["Step"] == "workflow runner":
prj.runs.setdefault(row["Workflow"], WorkflowRunSummary(name=row["Workflow"],
uuid=row["WorkflowUUID"]))
wfuuid = row["Workflow"]
prj.runs[wfuuid].count += 1
prj.runs[wfuuid].cost += row["CumulativeCost"]
prj.runs[wfuuid].hours += hrs
self.total_workflows += 1
self.total_hours += hrs
self.total_cost += cost
def report_from_api(self, since, to, include_steps, exclude):
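        """Yield report rows for workflow runs created since `since`.

        Container requests are accumulated and flushed in batches of 1000
        so the "uuid in [...]" filters in flush_containers stay bounded.
        """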
pending = []
count = 0
for container_request in arvados.util.keyset_list_all(
self.arv_client.container_requests().list,
filters=[
["command", "like", "[\"arvados-cwl-runner%"],
["created_at", ">=", since.strftime("%Y%m%dT%H%M%SZ")],
],
select=["uuid", "owner_uuid", "container_uuid", "name", "cumulative_cost", "properties", "modified_by_user_uuid", "created_at"]):
if container_request["cumulative_cost"] == 0:
continue
            if len(pending) < 1000:
                pending.append(container_request)
            else:
                count += len(pending)
                logging.info("Exporting workflow runs %s - %s", count-len(pending), count)
                for row in self.flush_containers(pending, include_steps, exclude):
                    self.collect_summary_stats(row)
                    yield row
                pending.clear()
                # Keep the request that triggered the flush; it starts the next batch.
                pending.append(container_request)
count += len(pending)
logging.info("Exporting workflow runs %s - %s", count-len(pending), count)
for row in self.flush_containers(pending, include_steps, exclude):
self.collect_summary_stats(row)
yield row
userinfo = self.arv_client.users().list(filters=[["is_active", "=", True]], limit=0).execute()
self.total_users = userinfo["items_available"]
groupinfo = self.arv_client.groups().list(filters=[["group_class", "=", "project"]], limit=0).execute()
self.total_projects = groupinfo["items_available"]
def csv_report(self, since, to, out, include_steps, columns, exclude):
if columns:
columns = columns.split(",")
else:
if include_steps:
columns = ("Project", "Workflow", "Step", "Sample", "User", "Submitted", "Runtime", "Cost")
else:
columns = ("Project", "Workflow", "Sample", "User", "Submitted", "Runtime", "Cost")
csvwriter = csv.DictWriter(out, fieldnames=columns, extrasaction="ignore")
csvwriter.writeheader()
for row in self.report_from_api(since, to, include_steps, exclude):
csvwriter.writerow(row)
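# Example usage (a minimal sketch): assumes ARVADOS_API_HOST and
# ARVADOS_API_TOKEN are configured in the environment. Prometheus is
# optional for CSV output; pass prom_client=None to skip the graphs.
#
#     import sys
#     from datetime import datetime, timedelta
#
#     report = ClusterActivityReport(prom_client=None)
#     report.csv_report(since=datetime.utcnow() - timedelta(days=30),
#                       to=datetime.utcnow(), out=sys.stdout,
#                       include_steps=False, columns=None, exclude=None)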