# 21121: Optional prometheus support
# [arvados.git] / tools / cluster-activity / arvados_cluster_activity / main.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 import argparse
7 import sys
8
9 import arvados
10 import arvados.util
11 import datetime
12 import ciso8601
13 import csv
14 import os
15 import logging
16 import re
17
18 from datetime import timedelta, timezone
19 import base64
20
21 prometheus_support = True
22
23 def parse_arguments(arguments):
24     arg_parser = argparse.ArgumentParser()
25     arg_parser.add_argument('--start', help='Start date for the report in YYYY-MM-DD format (UTC)')
26     arg_parser.add_argument('--end', help='End date for the report in YYYY-MM-DD format (UTC), default "now"')
27     arg_parser.add_argument('--days', type=int, help='Number of days before "end" to start the report')
28     arg_parser.add_argument('--cost-report-file', type=str, help='Export cost report to specified CSV file')
29     if prometheus_support:
30         arg_parser.add_argument('--cluster', type=str, help='Cluster to query for prometheus stats')
31         arg_parser.add_argument('--prometheus-auth', type=str, help='Authorization file with prometheus info')
32
33     args = arg_parser.parse_args(arguments)
34
35     if args.days and args.start:
36         arg_parser.print_help()
37         print("Error: either specify --days or both --start and --end")
38         exit(1)
39
40     if not args.days and not args.start:
41         arg_parser.print_help()
42         print("\nError: either specify --days or both --start and --end")
43         exit(1)
44
45     if (args.start and not args.end):
46         arg_parser.print_help()
47         print("\nError: no start or end date found, either specify --days or both --start and --end")
48         exit(1)
49
50     if args.end:
51         try:
52             to = datetime.datetime.strptime(args.end,"%Y-%m-%d")
53         except:
54             arg_parser.print_help()
55             print("\nError: end date must be in YYYY-MM-DD format")
56             exit(1)
57     else:
58         to = datetime.datetime.now(timezone.utc)
59
60     if args.days:
61         since = to - datetime.timedelta(days=args.days)
62
63     if args.start:
64         try:
65             since = datetime.datetime.strptime(args.start,"%Y-%m-%d")
66         except:
67             arg_parser.print_help()
68             print("\nError: start date must be in YYYY-MM-DD format")
69             exit(1)
70
71
72     if prometheus_support and args.prometheus_auth:
73         with open(args.prometheus_auth, "rt") as f:
74             for line in f:
75                 sp = line.strip().split("=")
76                 if sp[0].startswith("PROMETHEUS_"):
77                     os.environ[sp[0]] = sp[1]
78
79     return args, since, to
80
def data_usage(prom, timestamp, cluster, label):
    """Print a one-line summary of Keep storage usage at 'timestamp'.

    Queries prometheus for total stored bytes and the deduplication
    ratio, then prints apparent size, stored size, and an estimated
    monthly S3 storage cost prefixed with 'label'.  Returns silently if
    prometheus has no data for the requested time.
    """
    # Import locally (as container_usage does) so prometheus_api_client
    # is only required when prometheus reporting is actually used.
    # NOTE: the original referenced MetricsList without importing it,
    # raising NameError on every call.
    from prometheus_api_client import MetricsList

    metric_data = prom.get_current_metric_value(metric_name='arvados_keep_total_bytes',
                                                label_config={"cluster": cluster},
                                                params={"time": timestamp.timestamp()})

    metric_object_list = MetricsList(metric_data)

    if len(metric_data) == 0:
        return

    my_metric_object = metric_object_list[0] # one of the metrics from the list
    value = my_metric_object.metric_values.iloc[0]["y"]
    summary_value = value

    metric_data = prom.get_current_metric_value(metric_name='arvados_keep_dedup_byte_ratio',
                                                label_config={"cluster": cluster},
                                                params={"time": timestamp.timestamp()})

    if len(metric_data) == 0:
        return

    my_metric_object = MetricsList(metric_data)[0]
    dedup_ratio = my_metric_object.metric_values.iloc[0]["y"]

    # Tiered per-GiB monthly storage pricing (presumably AWS S3 standard
    # tiers: first 50 TB, next 450 TB, over 500 TB -- TODO confirm rates).
    value_gb = value / (1024*1024*1024)
    first_50tb = min(1024*50, value_gb)
    next_450tb = max(min(1024*450, value_gb-1024*50), 0)
    over_500tb = max(value_gb-1024*500, 0)

    monthly_cost = (first_50tb * 0.023) + (next_450tb * 0.022) + (over_500tb * 0.021)

    # Scale the byte count to the largest unit that keeps it under 1024.
    for scale in ["KiB", "MiB", "GiB", "TiB", "PiB"]:
        summary_value = summary_value / 1024
        if summary_value < 1024:
            print(label,
                  "%.3f %s apparent," % (summary_value*dedup_ratio, scale),
                  "%.3f %s actually stored," % (summary_value, scale),
                  "$%.2f monthly S3 storage cost" % monthly_cost)
            break
120
121
122
123
def container_usage(prom, start_time, end_time, metric, label, fn=None):
    """Sum a prometheus range query over [start_time, end_time) and print it.

    Queries 'metric' in one-day chunks at 15 second resolution,
    resamples each chunk to one-minute means, and accumulates the sum of
    the per-minute values.  The optional 'fn' post-processes the total
    (e.g. dividing by 60 to convert minute-samples to hours) before it
    is printed via the printf-style 'label' format.
    """
    # Local imports keep prometheus_api_client/pandas optional for users
    # who only want the API-based cost report.  (Unused imports from the
    # original -- parse_datetime, PrometheusConnect, Metric -- removed.)
    from prometheus_api_client import MetricsList
    import pandas

    start = start_time
    chunk_size = timedelta(days=1)
    cumulative = 0

    while start < end_time:
        # Clamp the final chunk so we never query past end_time.
        if start + chunk_size > end_time:
            chunk_size = end_time - start

        metric_data = prom.custom_query_range(metric,
                                              start_time=start,
                                              end_time=(start + chunk_size),
                                              step=15
                                              )

        if len(metric_data) == 0:
            break

        # MetricsList requires a __name__ label; synthesized queries
        # (e.g. sum(...)) may not include one, so backfill it.
        if "__name__" not in metric_data[0]["metric"]:
            metric_data[0]["metric"]["__name__"] = metric

        metric_object_list = MetricsList(metric_data)
        my_metric_object = metric_object_list[0] # one of the metrics from the list

        series = my_metric_object.metric_values.set_index(pandas.DatetimeIndex(my_metric_object.metric_values['ds']))

        # Resample to 1 minute increments, fill in missing values
        rs = series.resample("min").mean(1).ffill()

        # Calculate the sum of values
        cumulative += rs.sum()["y"]

        start += chunk_size

    if fn is not None:
        cumulative = fn(cumulative)

    print(label % cumulative)
167
def report_from_prometheus(cluster, since, to):
    """Print a cluster activity summary using data from prometheus.

    Connects using PROMETHEUS_HOST plus either PROMETHEUS_APIKEY
    (bearer token) or PROMETHEUS_USER/PROMETHEUS_PASSWORD (basic auth)
    from the environment, then prints storage usage at the start and end
    of the period and total container hours / compute spend.
    """
    from prometheus_api_client import PrometheusConnect

    prom_host = os.environ.get("PROMETHEUS_HOST")
    prom_token = os.environ.get("PROMETHEUS_APIKEY")
    prom_user = os.environ.get("PROMETHEUS_USER")
    prom_pw = os.environ.get("PROMETHEUS_PASSWORD")

    headers = {}
    if prom_token:
        headers["Authorization"] = "Bearer %s" % prom_token

    # If both are set, basic auth wins (assigned last).
    if prom_user:
        headers["Authorization"] = "Basic %s" % str(base64.b64encode(bytes("%s:%s" % (prom_user, prom_pw), 'utf-8')), 'utf-8')

    prom = PrometheusConnect(url=prom_host, headers=headers)

    print(cluster, "between", since, "and", to, "timespan", (to-since))

    # Storage stats are best-effort: skip them if prometheus has no data
    # for the requested timestamp.  'except Exception' (not a bare
    # except) so SystemExit/KeyboardInterrupt still propagate.
    try:
        data_usage(prom, since, cluster, "at start:")
    except Exception:
        pass
    try:
        # Sample 4 hours back from 'to' so recent scrapes have landed.
        data_usage(prom, to - timedelta(minutes=240), cluster, "current :")
    except Exception:
        pass

    container_usage(prom, since, to, "arvados_dispatchcloud_containers_running{cluster='%s'}" % cluster, '%.1f container hours', lambda x: x/60)
    container_usage(prom, since, to, "sum(arvados_dispatchcloud_instances_price{cluster='%s'})" % cluster, '$%.2f spent on compute', lambda x: x/60)
    print()
199
def flush_containers(arv_client, csvwriter, pending):
    """Write one CSV row per workflow run in 'pending', plus one per child step.

    'pending' is a batch of top-level (arvados-cwl-runner) container
    requests.  This looks up their containers, workflow templates,
    owning projects/users, and child container requests in bulk, then
    emits rows of (project, workflow, step, sample, user, submitted,
    cost).  Requests whose container never started or finished are
    skipped.
    """
    # Map container uuid -> container record for every pending request
    # that actually has a container assigned.
    containers = {}

    for container in arvados.util.keyset_list_all(
        arv_client.containers().list,
        filters=[
            ["uuid", "in", [c["container_uuid"] for c in pending if c["container_uuid"]]],
        ],
        select=["uuid", "started_at", "finished_at", "cost"]):

        containers[container["uuid"]] = container

    # Map workflow template uuid -> display name; "none" is the sentinel
    # used below for requests without a template_uuid property.
    workflows = {}
    workflows["none"] = "workflow run from command line"

    # Only templates registered on this cluster (uuid prefix matches
    # ClusterID) can be looked up here.
    for wf in arvados.util.keyset_list_all(
            arv_client.workflows().list,
            filters=[
                ["uuid", "in", list(set(c["properties"]["template_uuid"]
                                        for c in pending
                                        if "template_uuid" in c["properties"] and c["properties"]["template_uuid"].startswith(arv_client.config()["ClusterID"])))],
            ],
            select=["uuid", "name"]):
        workflows[wf["uuid"]] = wf["name"]

    # Map owner uuid -> display name.  Owners with uuid type infix
    # 'j7d0g' are groups (projects); 'tpzed' are users.
    projects = {}

    for pr in arvados.util.keyset_list_all(
            arv_client.groups().list,
            filters=[
                ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'j7d0g'))],
            ],
            select=["uuid", "name"]):
        projects[pr["uuid"]] = pr["name"]

    # User owners, plus the submitting users (modified_by_user_uuid),
    # share the same lookup table as projects.
    for pr in arvados.util.keyset_list_all(
            arv_client.users().list,
            filters=[
                ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'tpzed')|set(c["modified_by_user_uuid"] for c in pending))],
            ],
            select=["uuid", "full_name", "first_name", "last_name"]):
        projects[pr["uuid"]] = pr["full_name"]

    # Strip trailing "_<number>" scatter/retry suffixes so repeated
    # steps aggregate under one name.
    name_regex = re.compile(r"(.+)_[0-9]+")
    # Map parent container uuid -> list of child container requests.
    child_crs = {}
    for cr in arvados.util.keyset_list_all(
        arv_client.container_requests().list,
        filters=[
            ["requesting_container_uuid", "in", list(containers.keys())],
        ],
        select=["uuid", "name", "cumulative_cost", "requesting_container_uuid", "container_uuid"]):

        g = name_regex.fullmatch(cr["name"])
        if g:
            cr["name"] = g[1]

        child_crs.setdefault(cr["requesting_container_uuid"], []).append(cr)

    for container_request in pending:
        # Skip runs that never got a container or never ran to completion.
        if not container_request["container_uuid"] or not containers[container_request["container_uuid"]]["started_at"] or not containers[container_request["container_uuid"]]["finished_at"]:
            continue

        length = ciso8601.parse_datetime(containers[container_request["container_uuid"]]["finished_at"]) - ciso8601.parse_datetime(containers[container_request["container_uuid"]]["started_at"])

        # Duration components; currently only used by the commented-out
        # elapsed-time column below.
        hours = length.seconds // 3600
        minutes = (length.seconds // 60) % 60
        seconds = length.seconds % 60

        # One row for the workflow-runner container itself.
        csvwriter.writerow((
            projects.get(container_request["owner_uuid"], "unknown owner"),
            workflows.get(container_request["properties"].get("template_uuid", "none"), "workflow missing"),
            "workflow runner",
            container_request["name"],
            projects.get(container_request["modified_by_user_uuid"], "unknown user"),
            container_request["created_at"],
            #"%i:%02i:%02i:%02i" % (length.days, hours, minutes, seconds),
            round(containers[container_request["container_uuid"]]["cost"], 3),
            ))

        # One row per child step, attributed to the same workflow run.
        for child_cr in child_crs.get(container_request["container_uuid"], []):
            csvwriter.writerow((
                projects.get(container_request["owner_uuid"], "unknown owner"),
                workflows.get(container_request["properties"].get("template_uuid", "none"), "workflow missing"),
                child_cr["name"],
                container_request["name"],
                projects.get(container_request["modified_by_user_uuid"], "unknown user"),
                child_cr["created_at"],
                round(child_cr["cumulative_cost"], 3),
                ))
289
290
def report_from_api(since, to, out):
    """Write a CSV cost report for workflow runs created since 'since'.

    Lists all arvados-cwl-runner container requests created on or after
    'since' and writes one row per run (plus one per child step) to the
    file object 'out' via flush_containers().  NOTE: 'to' is accepted
    for interface symmetry but the created_at filter only bounds the
    start of the period.
    """
    arv_client = arvados.api()

    csvwriter = csv.writer(out)
    csvwriter.writerow(("Project", "Workflow", "Step", "Sample", "User", "Submitted", "Cost"))

    pending = []

    for container_request in arvados.util.keyset_list_all(
            arv_client.container_requests().list,
            filters=[
                ["command", "like", "[\"arvados-cwl-runner%"],
                ["created_at", ">=", since.strftime("%Y%m%dT%H%M%SZ")],
            ],
            select=["uuid", "owner_uuid", "container_uuid", "name", "cumulative_cost", "properties", "modified_by_user_uuid", "created_at"]):

        # Buffer requests and flush in batches of 1000.  Append before
        # checking the batch size: the previous version flushed in an
        # 'else' branch and silently dropped the current record every
        # time the batch filled up.
        pending.append(container_request)
        if len(pending) >= 1000:
            flush_containers(arv_client, csvwriter, pending)
            pending.clear()

    # Flush the final partial batch.
    flush_containers(arv_client, csvwriter, pending)
314
def main(arguments=None):
    """Entry point: parse arguments and run the requested reports.

    Runs the prometheus activity report when PROMETHEUS_HOST and
    --cluster are available, and writes the API-based cost report when
    --cost-report-file is given.  Each report is skipped with a warning
    when its prerequisites are missing.
    """
    if arguments is None:
        arguments = sys.argv[1:]

    args, since, to = parse_arguments(arguments)

    if prometheus_support:
        # logging.warning instead of the deprecated logging.warn alias.
        if "PROMETHEUS_HOST" in os.environ:
            if args.cluster:
                report_from_prometheus(args.cluster, since, to)
            else:
                logging.warning("--cluster not provided, not collecting activity from Prometheus")
        else:
            logging.warning("PROMETHEUS_HOST not found, not collecting activity from Prometheus")

    if args.cost_report_file:
        with open(args.cost_report_file, "wt") as f:
            report_from_api(since, to, f)
    else:
        logging.warning("--cost-report-file not provided, not writing cost report")
335
# Allow running as a script as well as via the installed entry point.
if __name__ == "__main__":
    main()