21121: Optional prometheus support
[arvados.git] / tools / cluster-activity / arvados_cluster_activity / main.py
index 2fe008d67d275387b2e7fa57720b6cadd93bd799..e01b00230b20f9374d6b53f9b7b416034c41a3c3 100755 (executable)
@@ -12,20 +12,24 @@ import datetime
 import ciso8601
 import csv
 import os
-from prometheus_api_client.utils import parse_datetime
-from datetime import timedelta
-import pandas
+import logging
+import re
+
+from datetime import timedelta, timezone
 import base64
 
-from prometheus_api_client import PrometheusConnect, MetricsList, Metric
+prometheus_support = True
 
 def parse_arguments(arguments):
     arg_parser = argparse.ArgumentParser()
     arg_parser.add_argument('--start', help='Start date for the report in YYYY-MM-DD format (UTC)')
     arg_parser.add_argument('--end', help='End date for the report in YYYY-MM-DD format (UTC), default "now"')
     arg_parser.add_argument('--days', type=int, help='Number of days before "end" to start the report')
-    arg_parser.add_argument('--cluster', type=str, help='Cluster to query')
     arg_parser.add_argument('--cost-report-file', type=str, help='Export cost report to specified CSV file')
+    if prometheus_support:
+        arg_parser.add_argument('--cluster', type=str, help='Cluster to query for prometheus stats')
+        arg_parser.add_argument('--prometheus-auth', type=str, help='Authorization file with prometheus info')
+
     args = arg_parser.parse_args(arguments)
 
     if args.days and args.start:
@@ -51,7 +55,7 @@ def parse_arguments(arguments):
             print("\nError: end date must be in YYYY-MM-DD format")
             exit(1)
     else:
-        to = datetime.datetime.utcnow()
+        to = datetime.datetime.now(timezone.utc)
 
     if args.days:
         since = to - datetime.timedelta(days=args.days)
@@ -65,6 +69,13 @@ def parse_arguments(arguments):
             exit(1)
 
 
+    if prometheus_support and args.prometheus_auth:  # load PROMETHEUS_* settings from a KEY=VALUE file into the environment
+        with open(args.prometheus_auth, "rt") as f:
+            for line in f:
+                key, sep, value = line.strip().partition("=")  # split on the first "=" only: values (e.g. base64 API keys) may contain "="
+                if sep and key.startswith("PROMETHEUS_"):  # skip lines with no "=" instead of raising IndexError
+                    os.environ[key] = value
+
     return args, since, to
 
 def data_usage(prom, timestamp, cluster, label):
@@ -111,6 +122,10 @@ def data_usage(prom, timestamp, cluster, label):
 
 
 def container_usage(prom, start_time, end_time, metric, label, fn=None):
+    from prometheus_api_client.utils import parse_datetime
+    from prometheus_api_client import PrometheusConnect, MetricsList, Metric
+    import pandas
+
     start = start_time
     chunk_size = timedelta(days=1)
     cumulative = 0
@@ -151,6 +166,8 @@ def container_usage(prom, start_time, end_time, metric, label, fn=None):
     print(label % cumulative)
 
 def report_from_prometheus(cluster, since, to):
+    from prometheus_api_client import PrometheusConnect
+
     prom_host = os.environ.get("PROMETHEUS_HOST")
     prom_token = os.environ.get("PROMETHEUS_APIKEY")
     prom_user = os.environ.get("PROMETHEUS_USER")
@@ -186,9 +203,9 @@ def flush_containers(arv_client, csvwriter, pending):
     for container in arvados.util.keyset_list_all(
         arv_client.containers().list,
         filters=[
-            ["uuid", "in", [c["container_uuid"] for c in pending]],
+            ["uuid", "in", [c["container_uuid"] for c in pending if c["container_uuid"]]],
         ],
-        select=["uuid", "started_at", "finished_at"]):
+        select=["uuid", "started_at", "finished_at", "cost"]):
 
         containers[container["uuid"]] = container
 
@@ -198,7 +215,9 @@ def flush_containers(arv_client, csvwriter, pending):
     for wf in arvados.util.keyset_list_all(
             arv_client.workflows().list,
             filters=[
-                ["uuid", "in", [c["properties"]["template_uuid"] for c in pending if "template_uuid" in c["properties"]]],
+                ["uuid", "in", list(set(c["properties"]["template_uuid"]
+                                        for c in pending
+                                        if "template_uuid" in c["properties"] and c["properties"]["template_uuid"].startswith(arv_client.config()["ClusterID"])))],
             ],
             select=["uuid", "name"]):
         workflows[wf["uuid"]] = wf["name"]
@@ -208,7 +227,7 @@ def flush_containers(arv_client, csvwriter, pending):
     for pr in arvados.util.keyset_list_all(
             arv_client.groups().list,
             filters=[
-                ["uuid", "in", [c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'j7d0g']],
+                ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'j7d0g'))],
             ],
             select=["uuid", "name"]):
         projects[pr["uuid"]] = pr["name"]
@@ -216,12 +235,30 @@ def flush_containers(arv_client, csvwriter, pending):
     for pr in arvados.util.keyset_list_all(
             arv_client.users().list,
             filters=[
-                ["uuid", "in", [c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'tpzed']],
+                ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'tpzed')|set(c["modified_by_user_uuid"] for c in pending))],
             ],
             select=["uuid", "full_name", "first_name", "last_name"]):
         projects[pr["uuid"]] = pr["full_name"]
 
+    name_regex = re.compile(r"(.+)_[0-9]+")
+    child_crs = {}
+    for cr in arvados.util.keyset_list_all(  # fetch the child requests (workflow steps) submitted by each runner container
+        arv_client.container_requests().list,
+        filters=[
+            ["requesting_container_uuid", "in", list(containers.keys())],
+        ],
+        select=["uuid", "name", "cumulative_cost", "requesting_container_uuid", "container_uuid", "created_at"]):  # created_at is read below for the "Submitted" column, so request it explicitly
+
+        g = name_regex.fullmatch(cr["name"])
+        if g:
+            cr["name"] = g[1]
+
+        child_crs.setdefault(cr["requesting_container_uuid"], []).append(cr)
+
     for container_request in pending:
+        if not container_request["container_uuid"] or not containers[container_request["container_uuid"]]["started_at"] or not containers[container_request["container_uuid"]]["finished_at"]:
+            continue
+
         length = ciso8601.parse_datetime(containers[container_request["container_uuid"]]["finished_at"]) - ciso8601.parse_datetime(containers[container_request["container_uuid"]]["started_at"])
 
         hours = length.seconds // 3600
@@ -231,29 +268,41 @@ def flush_containers(arv_client, csvwriter, pending):
         csvwriter.writerow((
             projects.get(container_request["owner_uuid"], "unknown owner"),
             workflows.get(container_request["properties"].get("template_uuid", "none"), "workflow missing"),
+            "workflow runner",
             container_request["name"],
-            containers[container_request["container_uuid"]]["started_at"],
-            "%i:%02i:%02i:%02i" % (length.days, hours, minutes, seconds),
-            container_request["cumulative_cost"],
+            projects.get(container_request["modified_by_user_uuid"], "unknown user"),
+            container_request["created_at"],
+            #"%i:%02i:%02i:%02i" % (length.days, hours, minutes, seconds),
+            round(containers[container_request["container_uuid"]]["cost"], 3),
             ))
 
+        for child_cr in child_crs.get(container_request["container_uuid"], []):
+            csvwriter.writerow((
+                projects.get(container_request["owner_uuid"], "unknown owner"),
+                workflows.get(container_request["properties"].get("template_uuid", "none"), "workflow missing"),
+                child_cr["name"],
+                container_request["name"],
+                projects.get(container_request["modified_by_user_uuid"], "unknown user"),
+                child_cr["created_at"],
+                round(child_cr["cumulative_cost"], 3),
+                ))
+
 
 def report_from_api(since, to, out):
     arv_client = arvados.api()
 
     csvwriter = csv.writer(out)
-    csvwriter.writerow(("Project", "Workflow", "Sample", "Started", "Runtime", "Cost"))
+    csvwriter.writerow(("Project", "Workflow", "Step", "Sample", "User", "Submitted", "Cost"))
 
     pending = []
 
-    print(since.isoformat())
     for container_request in arvados.util.keyset_list_all(
             arv_client.container_requests().list,
             filters=[
                 ["command", "like", "[\"arvados-cwl-runner%"],
                 ["created_at", ">=", since.strftime("%Y%m%dT%H%M%SZ")],
             ],
-            select=["uuid", "owner_uuid", "container_uuid", "name", "cumulative_cost", "properties"]):
+            select=["uuid", "owner_uuid", "container_uuid", "name", "cumulative_cost", "properties", "modified_by_user_uuid", "created_at"]):
 
         if len(pending) < 1000:
             pending.append(container_request)
@@ -269,12 +318,20 @@ def main(arguments=None):
 
     args, since, to = parse_arguments(arguments)
 
-    if "PROMETHEUS_HOST" in os.environ:
-        report_from_prometheus(args.cluster, since, to)
+    if prometheus_support:  # Prometheus reporting is optional; explain why it is skipped rather than failing
+        if "PROMETHEUS_HOST" in os.environ:
+            if args.cluster:
+                report_from_prometheus(args.cluster, since, to)
+            else:
+                logging.warning("--cluster not provided, not collecting activity from Prometheus")  # logging.warn is a deprecated alias
+        else:
+            logging.warning("PROMETHEUS_HOST not found, not collecting activity from Prometheus")
 
     if args.cost_report_file:
         with open(args.cost_report_file, "wt") as f:
             report_from_api(since, to, f)
+    else:
+        logging.warning("--cost-report-file not provided, not writing cost report")
 
 if __name__ == "__main__":
     main()