16417: Installs and configures Grafana Alloy on all nodes.
[arvados.git] / tools / cluster-activity / arvados_cluster_activity / report.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 import logging
7 import ciso8601
8 import arvados.util
9 import re
10 import csv
11 import math
12 import collections
13 import json
14 from datetime import date, datetime, timedelta
15 from typing import List
16 import statistics
17
18 from dataclasses import dataclass
19 from arvados_cluster_activity.prometheus import get_metric_usage, get_data_usage
20 from arvados_cluster_activity.reportchart import ReportChart
21
22
@dataclass
class WorkflowRunSummary:
    """Aggregate statistics for all runs of one workflow within a project."""
    # Workflow name (or the container request name for command-line runs).
    name: str
    # Workflow UUID, or "none" for workflows run from the command line.
    uuid: str
    # Cumulative cost of each run, one entry per run.
    cost: List[float]
    # Wall-clock runtime of each run in fractional hours, one entry per run.
    hours: List[float]
    # Number of runs observed.
    count: int = 0
30
31
@dataclass
class ProjectSummary:
    """Aggregate activity statistics for one project over the reporting period."""
    # Names of users who ran workflows in this project.
    users: set
    # Project UUID.
    uuid: str
    # Maps workflow name -> WorkflowRunSummary.
    runs: dict[str, WorkflowRunSummary]
    # Earliest container start seen; sentinel far in the future so any
    # real timestamp replaces it.
    earliest: datetime = datetime(year=9999, month=1, day=1)
    # Latest container finish seen; sentinel far in the past.
    latest: datetime = datetime(year=1900, month=1, day=1)
    # Project display name.
    name: str = ""
    # Total compute cost across all runs.
    cost: float = 0
    # Total number of report rows attributed to this project.
    count: int = 0
    # Total compute hours across all runs.
    hours: float = 0
    # Human-readable date range, filled in by html_report().
    activityspan: str = ""
    # Pre-rendered HTML table cells, filled in by html_report().
    tablerow: str = ""
46
def aws_monthly_cost(value):
    """Estimate the monthly AWS S3 Standard storage cost for *value* bytes.

    Uses the published tiered GB/month rates: $0.023 for the first 50 TB,
    $0.022 for the next 450 TB, $0.021 beyond 500 TB.
    """
    gib = 1024 * 1024 * 1024
    value_gb = value / gib

    tier1_limit = 50 * 1024      # 50 TB expressed in GB
    tier2_limit = 500 * 1024     # 500 TB expressed in GB

    tier1_gb = min(value_gb, tier1_limit)
    tier2_gb = max(min(value_gb - tier1_limit, 450 * 1024), 0)
    tier3_gb = max(value_gb - tier2_limit, 0)

    return tier1_gb * 0.023 + tier2_gb * 0.022 + tier3_gb * 0.021
55
56
def format_with_suffix_base2(summary_value):
    """Format a byte count with binary (IEC) suffixes, e.g. "1.500 GiB".

    The value is always scaled at least once, so anything under 1024
    bytes is reported in KiB.  Fix: values of 1024 EiB or more used to
    fall off the end of the suffix list and return None; they are now
    reported in EiB.
    """
    for scale in ["KiB", "MiB", "GiB", "TiB", "PiB", "EiB"]:
        summary_value = summary_value / 1024
        if summary_value < 1024:
            break
    return "%.3f %s" % (summary_value, scale)
62
def format_with_suffix_base10(summary_value):
    """Format a byte count with decimal (SI) suffixes, e.g. "1.500 GB".

    The value is always scaled at least once, so anything under 1000
    bytes is reported in KB.  Fix: values of 1000 EB or more used to
    fall off the end of the suffix list and return None; they are now
    reported in EB.
    """
    for scale in ["KB", "MB", "GB", "TB", "PB", "EB"]:
        summary_value = summary_value / 1000
        if summary_value < 1000:
            break
    return "%.3f %s" % (summary_value, scale)
68
# (chart title, DOM element id) pairs; used as keys into
# ClusterActivityReport.graphs and passed through to ReportChart.
containers_graph = ('Concurrent running containers', 'containers')
storage_graph = ('Storage usage', 'used')
managed_graph = ('Data under management', 'managed')
72
73
def runtime_str(container_request, containers):
    """Return the run's wall-clock duration as an "H:MM:SS" string.

    :param container_request: record holding a "container_uuid" key.
    :param containers: dict mapping container UUID to container records;
        the referenced record must have non-null "started_at" and
        "finished_at" timestamps (callers filter those out beforehand).
    """
    length = ciso8601.parse_datetime(containers[container_request["container_uuid"]]["finished_at"]) - ciso8601.parse_datetime(containers[container_request["container_uuid"]]["started_at"])

    # timedelta only exposes days and seconds, so fold days into hours.
    hours = length.days * 24 + (length.seconds // 3600)
    minutes = (length.seconds // 60) % 60
    seconds = length.seconds % 60

    return "%i:%02i:%02i" % (hours, minutes, seconds)
82
def runtime_in_hours(runtime):
    """Convert an "H:MM:SS" string (as produced by runtime_str) to fractional hours."""
    parts = runtime.split(":")
    return float(parts[0]) + float(parts[1]) / 60 + float(parts[2]) / 3600
89
def hours_to_runtime_str(frac_hours):
    """Render fractional hours as "H:MM:SS", truncating (not rounding)
    each field, mirroring the inverse of runtime_in_hours()."""
    whole_hours, hour_frac = divmod(frac_hours, 1.0)
    minutes = hour_frac * 60.0
    seconds = (minutes % 1.0) * 60.0

    return "%i:%02i:%02i" % (whole_hours, minutes, seconds)
96
97
def csv_dateformat(datestr):
    """Normalize an API timestamp string to "YYYY-MM-DD HH:MM:SS" for CSV output."""
    dt = ciso8601.parse_datetime(datestr)
    return dt.strftime("%Y-%m-%d %H:%M:%S")
101
102
103 class ClusterActivityReport(object):
    def __init__(self, prom_client):
        """Create a report generator for the current Arvados cluster.

        :param prom_client: prometheus client connection, or None to
            skip the time-series graphs.
        """
        self.arv_client = arvados.api()
        self.prom_client = prom_client
        self.cluster = self.arv_client.config()["ClusterID"]

        # Users who ran at least one workflow in the period.
        self.active_users = set()
        # Maps project UUID -> ProjectSummary.
        self.project_summary = {}
        # Totals accumulated by collect_summary_stats().
        self.total_hours = 0
        self.total_cost = 0
        self.total_workflows = 0
        # Accumulated by collect_storage_cost() from prometheus samples.
        self.storage_cost = 0
        # True once report_from_api() has been fully consumed (e.g. by
        # csv_report), so html_report() need not refetch everything.
        self.summary_fetched = False
        # Maps (title, dom-id) graph keys -> list of [timestamp, value].
        self.graphs = {}
117
    def collect_graph(self, since, to, metric, resample_to, extra=None):
        """Fetch one Prometheus metric over the reporting period.

        :param metric: query template; '%s' is filled in with the cluster id.
        :param resample_to: resampling interval passed to
            get_metric_usage, e.g. "5min" or "60min".
        :param extra: optional callback invoked as extra(timestamp, value)
            for every sample point.

        Returns a flat list of [timestamp, value] pairs, or None when no
        prometheus client is configured.
        """
        if not self.prom_client:
            return

        flatdata = []

        for series in get_metric_usage(self.prom_client, since, to, metric % self.cluster, resampleTo=resample_to):
            for t in series.itertuples():
                flatdata.append([t[0], t[1]])
                if extra:
                    extra(t[0], t[1])

        return flatdata
131
    def collect_storage_cost(self, timestamp, value):
        # Callback for collect_graph over 60-minute samples:
        # aws_monthly_cost() is a monthly rate, so each hourly sample
        # contributes 1/(30*24) of a month's cost.
        self.storage_cost += aws_monthly_cost(value) / (30*24)
134
    def html_report(self, since, to, exclude, include_workflow_steps):
        """Get a cluster activity report for the desired time period,
        returning a string containing the report as an HTML document.

        :param since: datetime, start of the reporting period.
        :param to: datetime, end of the reporting period.
        :param exclude: optional regex; workflows whose name matches are
            excluded from the report.
        :param include_workflow_steps: if true, per-step rows were/are
            fetched as well (affects totals; see comment below).
        """

        self.label = "Cluster report for %s from %s to %s" % (self.cluster, since.date(), to.date())

        if not self.summary_fetched:
            # If we haven't done it already, need to fetch everything
            # from the API to collect summary stats (report_from_api
            # calls collect_summary_stats on each row).
            #
            # Because it is a Python generator, we need call it in a
            # loop to process all the rows.  This method also yields
            # each row which is used by a different function to create
            # the CSV report, but for the HTML report we just discard
            # them.
            for row in self.report_from_api(since, to, include_workflow_steps, exclude):
                pass

        container_cumulative_hours = 0
        def collect_container_hours(timestamp, value):
            nonlocal container_cumulative_hours
            # resampled to 5 minute increments but we want
            # a sum of hours
            container_cumulative_hours += value / 12

        logging.info("Getting container hours time series")

        self.graphs[containers_graph] = self.collect_graph(since, to,
                           "arvados_dispatchcloud_containers_running{cluster='%s'}",
                           resample_to="5min",
                           extra=collect_container_hours
                           )

        logging.info("Getting data usage time series")
        self.graphs[managed_graph] = self.collect_graph(since, to,
                           "arvados_keep_collection_bytes{cluster='%s'}", resample_to="60min")

        self.graphs[storage_graph] = self.collect_graph(since, to,
                           "arvados_keep_total_bytes{cluster='%s'}", resample_to="60min",
                                                        extra=self.collect_storage_cost)

        # Most recent data points give the "current" state of the cluster.
        managed_data_now = None
        storage_used_now = None

        if len(self.graphs.get(managed_graph, [])) > 0:
            managed_data_now = self.graphs[managed_graph][-1][1]

        if len(self.graphs.get(storage_graph, [])) > 0:
            storage_used_now = self.graphs[storage_graph][-1][1]

        if managed_data_now and storage_used_now:
            storage_cost = aws_monthly_cost(storage_used_now)
            dedup_ratio = managed_data_now/storage_used_now

        label = self.label

        # Each "card" is one HTML fragment; ReportChart assembles them
        # into the final document.
        cards = []

        workbench = self.arv_client.config()["Services"]["Workbench2"]["ExternalURL"]
        if workbench.endswith("/"):
            workbench = workbench[:-1]

        # The "current status" card only makes sense when the report
        # period extends to today.
        if to.date() == self.today():
            # The deduplication ratio overstates things a bit, you can
            # have collections which reference a small slice of a large
            # block, and this messes up the intuitive value of this ratio
            # and exagerates the effect.
            #
            # So for now, as much fun as this is, I'm excluding it from
            # the report.
            #
            # dedup_savings = aws_monthly_cost(managed_data_now) - storage_cost
            # <tr><th>Monthly savings from storage deduplication</th> <td>${dedup_savings:,.2f}</td></tr>

            data_rows = ""
            if managed_data_now and storage_used_now:
                data_rows = """
            <tr><th>Total data under management</th> <td>{managed_data_now}</td></tr>
            <tr><th>Total storage usage</th> <td>{storage_used_now}</td></tr>
            <tr><th>Deduplication ratio</th> <td>{dedup_ratio:.1f}</td></tr>
            <tr><th>Approximate monthly storage cost</th> <td>${storage_cost:,.2f}</td></tr>
                """.format(
                       managed_data_now=format_with_suffix_base10(managed_data_now),
                       storage_used_now=format_with_suffix_base10(storage_used_now),
                       storage_cost=storage_cost,
                       dedup_ratio=dedup_ratio,
                )

            cards.append("""<h2>Cluster status as of {now}</h2>
            <table class='aggtable'><tbody>
            <tr><th><a href="{workbench}/users">Total users</a></th><td>{total_users}</td></tr>
            <tr><th>Total projects</th><td>{total_projects}</td></tr>
            {data_rows}
            </tbody></table>
            <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
            """.format(now=self.today(),
                       total_users=self.total_users,
                       total_projects=self.total_projects,
                       workbench=workbench,
                       data_rows=data_rows))

        # We have a couple of options for getting total container hours
        #
        # total_hours=container_cumulative_hours
        #
        # calculates the sum from prometheus metrics
        #
        # total_hours=self.total_hours
        #
        # calculates the sum of the containers that were fetched
        #
        # The problem is these numbers tend not to match, especially
        # if the report generation was not called with "include
        # workflow steps".
        #
        # I decided to use the sum from containers fetched, because it
        # will match the sum of compute time for each project listed
        # in the report.

        cards.append("""<h2>Activity and cost over the {reporting_days} day period {since} to {to}</h2>
        <table class='aggtable'><tbody>
        <tr><th>Active users</th> <td>{active_users}</td></tr>
        <tr><th><a href="#Active_Projects">Active projects</a></th> <td>{active_projects}</td></tr>
        <tr><th>Workflow runs</th> <td>{total_workflows:,}</td></tr>
        <tr><th>Compute used</th> <td>{total_hours:,.1f} hours</td></tr>
        <tr><th>Compute cost</th> <td>${total_cost:,.2f}</td></tr>
        <tr><th>Storage cost</th> <td>${storage_cost:,.2f}</td></tr>
        </tbody></table>
        <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
        """.format(active_users=len(self.active_users),
                   total_users=self.total_users,
                   total_hours=self.total_hours,
                   total_cost=self.total_cost,
                   total_workflows=self.total_workflows,
                   active_projects=len(self.project_summary),
                   since=since.date(), to=to.date(),
                   reporting_days=(to - since).days,
                   storage_cost=self.storage_cost))

        # Most expensive projects first.
        projectlist = sorted(self.project_summary.items(), key=lambda x: x[1].cost, reverse=True)

        for k, prj in projectlist:
            if prj.earliest.date() == prj.latest.date():
                prj.activityspan = "{}".format(prj.earliest.date())
            else:
                prj.activityspan = "{} to {}".format(prj.earliest.date(), prj.latest.date())

            prj.tablerow = """<td>{users}</td> <td>{active}</td> <td>{hours:,.1f}</td> <td>${cost:,.2f}</td>""".format(
                active=prj.activityspan,
                cost=prj.cost,
                hours=prj.hours,
                users=", ".join(prj.users),
            )

        # Placeholder that ReportChart fills with the time-series charts.
        cards.append("""
                <div id="chart"></div>
            """)

        cards.append(
            """
            <a id="Active_Projects"><h2>Active Projects</h2></a>
            <table class='sortable active-projects'>
            <thead><tr><th>Project</th> <th>Users</th> <th>Active</th> <th>Compute usage (hours)</th> <th>Compute cost</th> </tr></thead>
            <tbody><tr>{projects}</tr></tbody>
            </table>
            <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
            """.format(projects="</tr>\n<tr>".join("""<td><a href="#{name}">{name}</a></td>{rest}""".format(name=prj.name, rest=prj.tablerow) for k, prj in projectlist)))

        # One detail card per project, with per-workflow statistics
        # sorted by run count.
        for k, prj in projectlist:
            wfsum = []
            for k2, r in sorted(prj.runs.items(), key=lambda x: x[1].count, reverse=True):
                wfsum.append("""
                <tr><td>{count}</td> <td>{workflowlink}</td> <td>{median_runtime}</td> <td>{mean_runtime}</td> <td>${median_cost:,.2f}</td> <td>${mean_cost:,.2f}</td> <td>${totalcost:,.2f}</td></tr>
                """.format(
                    count=r.count,
                    mean_runtime=hours_to_runtime_str(statistics.mean(r.hours)),
                    median_runtime=hours_to_runtime_str(statistics.median(r.hours)),
                    mean_cost=statistics.mean(r.cost),
                    median_cost=statistics.median(r.cost),
                    totalcost=sum(r.cost),
                    workflowlink="""<a href="{workbench}/workflows/{uuid}">{name}</a>""".format(workbench=workbench,uuid=r.uuid,name=r.name)
                    if r.uuid != "none" else r.name))

            cards.append(
                """<a id="{name}"></a><a href="{workbench}/projects/{uuid}"><h2>{name}</h2></a>

                <table class='sortable single-project'>
                <thead><tr> <th>Users</th> <th>Active</th> <th>Compute usage (hours)</th> <th>Compute cost</th> </tr></thead>
                <tbody><tr>{projectrow}</tr></tbody>
                </table>

                <table class='sortable project'>
                <thead><tr><th>Workflow run count</th> <th>Workflow name</th> <th>Median runtime</th> <th>Mean runtime</th> <th>Median cost per run</th> <th>Mean cost per run</th> <th>Sum cost over runs</th></tr></thead>
                <tbody>
                {wfsum}
                </tbody></table>
                """.format(name=prj.name,
                           wfsum=" ".join(wfsum),
                           projectrow=prj.tablerow,
                           workbench=workbench,
                           uuid=prj.uuid)
            )

        # The deduplication ratio overstates things a bit, you can
        # have collections which reference a small slice of a large
        # block, and this messes up the intuitive value of this ratio
        # and exagerates the effect.
        #
        # So for now, as much fun as this is, I'm excluding it from
        # the report.
        #
        # <p>"Monthly savings from storage deduplication" is the
        # estimated cost difference between "storage usage" and "data
        # under management" as a way of comparing with other
        # technologies that do not support data deduplication.</p>

        cards.append("""
        <h2 id="prices">Note on usage and cost calculations</h2>

        <div style="max-width: 60em">

        <p>The numbers presented in this report are estimates and will
        not perfectly match your cloud bill.  Nevertheless this report
        should be useful for identifying your main cost drivers.</p>

        <h3>Storage</h3>

        <p>"Total data under management" is what you get if you add up
        all blocks referenced by all collections in Workbench, without
        considering deduplication.</p>

        <p>"Total storage usage" is the actual underlying storage
        usage, accounting for data deduplication.</p>

        <p>Storage costs are based on AWS "S3 Standard"
        described on the <a href="https://aws.amazon.com/s3/pricing/">Amazon S3 pricing</a> page:</p>

        <ul>
        <li>$0.023 per GB / Month for the first 50 TB</li>
        <li>$0.022 per GB / Month for the next 450 TB</li>
        <li>$0.021 per GB / Month over 500 TB</li>
        </ul>

        <p>Finally, this only the base storage cost, and does not
        include any fees associated with S3 API usage.  However, there
        are generally no ingress/egress fees if your Arvados instance
        and S3 bucket are in the same region, which is the normal
        recommended configuration.</p>

        <h3>Compute</h3>

        <p>"Compute usage" are instance-hours used in running
        workflows.  Because multiple steps may run in parallel on
        multiple instances, a workflow that completes in four hours
        but runs parallel steps on five instances, would be reported
        as using 20 instance hours.</p>

        <p>"Runtime" is the actual wall clock time that it took to
        complete a workflow.  This does not include time spent in the
        queue for the workflow itself, but does include queuing time
        of individual workflow steps.</p>

        <p>Computational costs are derived from Arvados cost
        calculations of container runs.  For on-demand instances, this
        uses the prices from the InstanceTypes section of the Arvado
        config file, set by the system administrator.  For spot
        instances, this uses current spot prices retrieved on the fly
        the AWS API.</p>

        <p>Be aware that the cost calculations are only for the time
        the container is running and only do not take into account the
        overhead of launching instances or idle time between scheduled
        tasks or prior to automatic shutdown.</p>

        </div>
        """)

        return ReportChart(label, cards, self.graphs).html()
416
417     def iter_container_info(self, pending, include_steps, exclude):
418         # "pending" is a list of arvados-cwl-runner container requests
419         # returned by the API.  This method fetches detailed
420         # information about the runs and yields report rows.
421
422         # 1. Get container records corresponding to container requests.
423         containers = {}
424
425         for container in arvados.util.keyset_list_all(
426             self.arv_client.containers().list,
427             filters=[
428                 ["uuid", "in", [c["container_uuid"] for c in pending if c["container_uuid"]]],
429             ],
430             select=["uuid", "started_at", "finished_at", "cost"]):
431
432             containers[container["uuid"]] = container
433
434         # 2. Look for the template_uuid property and fetch the
435         # corresponding workflow record.
436         workflows = {}
437         workflows["none"] = "workflow run from command line"
438
439         for wf in arvados.util.keyset_list_all(
440                 self.arv_client.workflows().list,
441                 filters=[
442                     ["uuid", "in", list(set(c["properties"]["template_uuid"]
443                                             for c in pending
444                                             if "template_uuid" in c["properties"] and c["properties"]["template_uuid"].startswith(self.arv_client.config()["ClusterID"])))],
445                 ],
446                 select=["uuid", "name"]):
447             workflows[wf["uuid"]] = wf["name"]
448
449         # 3. Look at owner_uuid and fetch owning projects and users
450         projects = {}
451
452         for pr in arvados.util.keyset_list_all(
453                 self.arv_client.groups().list,
454                 filters=[
455                     ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'j7d0g'))],
456                 ],
457                 select=["uuid", "name"]):
458             projects[pr["uuid"]] = pr["name"]
459
460         # 4. Look at owner_uuid and modified_by_user_uuid and get user records
461         for pr in arvados.util.keyset_list_all(
462                 self.arv_client.users().list,
463                 filters=[
464                     ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'tpzed')|set(c["modified_by_user_uuid"] for c in pending))],
465                 ],
466                 select=["uuid", "full_name", "first_name", "last_name"]):
467             projects[pr["uuid"]] = pr["full_name"]
468
469         # 5. Optionally iterate over individual workflow steps.
470         if include_steps:
471             name_regex = re.compile(r"(.+)_[0-9]+")
472             child_crs = {}
473             child_cr_containers = set()
474             stepcount = 0
475
476             # 5.1. Go through the container requests owned by the toplevel workflow container
477             logging.info("Getting workflow steps")
478             for cr in arvados.util.keyset_list_all(
479                 self.arv_client.container_requests().list,
480                 filters=[
481                     ["requesting_container_uuid", "in", list(containers.keys())],
482                 ],
483                 select=["uuid", "name", "cumulative_cost", "requesting_container_uuid", "container_uuid"]):
484
485                 if cr["cumulative_cost"] == 0:
486                     continue
487
488                 g = name_regex.fullmatch(cr["name"])
489                 if g:
490                     cr["name"] = g[1]
491
492                 # 5.2. Get the containers corresponding to the
493                 # container requests.  This has the same logic as
494                 # report_from_api where we batch it into 1000 items at
495                 # a time.
496                 child_crs.setdefault(cr["requesting_container_uuid"], []).append(cr)
497                 child_cr_containers.add(cr["container_uuid"])
498                 if len(child_cr_containers) == 1000:
499                     stepcount += len(child_cr_containers)
500                     for container in arvados.util.keyset_list_all(
501                             self.arv_client.containers().list,
502                             filters=[
503                                 ["uuid", "in", list(child_cr_containers)],
504                             ],
505                             select=["uuid", "started_at", "finished_at", "cost"]):
506
507                         containers[container["uuid"]] = container
508
509                     logging.info("Got workflow steps %s - %s", stepcount-len(child_cr_containers), stepcount)
510                     child_cr_containers.clear()
511
512             # Get any remaining containers
513             if child_cr_containers:
514                 stepcount += len(child_cr_containers)
515                 for container in arvados.util.keyset_list_all(
516                         self.arv_client.containers().list,
517                         filters=[
518                             ["uuid", "in", list(child_cr_containers)],
519                         ],
520                         select=["uuid", "started_at", "finished_at", "cost"]):
521
522                     containers[container["uuid"]] = container
523                 logging.info("Got workflow steps %s - %s", stepcount-len(child_cr_containers), stepcount)
524
525         # 6. Now go through the list of workflow runs, yield a row
526         # with all the information we have collected, as well as the
527         # details for each workflow step (if enabled)
528         for container_request in pending:
529             if not container_request["container_uuid"] or not containers[container_request["container_uuid"]]["started_at"] or not containers[container_request["container_uuid"]]["finished_at"]:
530                 continue
531
532             template_uuid = container_request["properties"].get("template_uuid", "none")
533             workflowname = container_request["name"] if template_uuid == "none" else workflows.get(template_uuid, template_uuid)
534
535             if exclude and re.search(exclude, workflowname, flags=re.IGNORECASE):
536                 continue
537
538             yield {
539                 "Project": projects.get(container_request["owner_uuid"], "unknown owner"),
540                 "ProjectUUID": container_request["owner_uuid"],
541                 "Workflow": workflowname,
542                 "WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
543                 "Step": "workflow runner",
544                 "StepUUID": container_request["uuid"],
545                 "Sample": container_request["name"],
546                 "SampleUUID": container_request["uuid"],
547                 "User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
548                 "UserUUID": container_request["modified_by_user_uuid"],
549                 "Submitted": csv_dateformat(container_request["created_at"]),
550                 "Started": csv_dateformat(containers[container_request["container_uuid"]]["started_at"]),
551                 "Finished": csv_dateformat(containers[container_request["container_uuid"]]["finished_at"]),
552                 "Runtime": runtime_str(container_request, containers),
553                 "Cost": round(containers[container_request["container_uuid"]]["cost"] if include_steps else container_request["cumulative_cost"], 3),
554                 "CumulativeCost": round(container_request["cumulative_cost"], 3)
555                 }
556
557             if include_steps:
558                 for child_cr in child_crs.get(container_request["container_uuid"], []):
559                     if not child_cr["container_uuid"] or not containers[child_cr["container_uuid"]]["started_at"] or not containers[child_cr["container_uuid"]]["finished_at"]:
560                         continue
561                     yield {
562                         "Project": projects.get(container_request["owner_uuid"], "unknown owner"),
563                         "ProjectUUID": container_request["owner_uuid"],
564                         "Workflow": workflows.get(container_request["properties"].get("template_uuid", "none"), "workflow missing"),
565                         "WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
566                         "Step": child_cr["name"],
567                         "StepUUID": child_cr["uuid"],
568                         "Sample": container_request["name"],
569                         "SampleUUID": container_request["name"],
570                         "User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
571                         "UserUUID": container_request["modified_by_user_uuid"],
572                         "Submitted": csv_dateformat(child_cr["created_at"]),
573                         "Started": csv_dateformat(containers[child_cr["container_uuid"]]["started_at"]),
574                         "Finished": csv_dateformat(containers[child_cr["container_uuid"]]["finished_at"]),
575                         "Runtime": runtime_str(child_cr, containers),
576                         "Cost": round(containers[child_cr["container_uuid"]]["cost"], 3),
577                         "CumulativeCost": round(containers[child_cr["container_uuid"]]["cost"], 3),
578                         }
579
580
    def collect_summary_stats(self, row):
        """Accumulate one report row into the per-project and
        cluster-wide summary statistics used by html_report()."""
        self.active_users.add(row["User"])
        self.project_summary.setdefault(row["ProjectUUID"],
                                        ProjectSummary(users=set(),
                                                       runs={},
                                                       uuid=row["ProjectUUID"],
                                                       name=row["Project"]))
        prj = self.project_summary[row["ProjectUUID"]]
        cost = row["Cost"]
        prj.cost += cost
        prj.count += 1
        prj.users.add(row["User"])
        hrs = runtime_in_hours(row["Runtime"])
        prj.hours += hrs

        started = datetime.strptime(row["Started"], "%Y-%m-%d %H:%M:%S")
        finished = datetime.strptime(row["Finished"], "%Y-%m-%d %H:%M:%S")

        # Widen the project's activity window to cover this run.
        if started < prj.earliest:
            prj.earliest = started

        if finished > prj.latest:
            prj.latest = finished

        # Only toplevel runner rows count as workflow runs; step rows
        # still contribute to cost/hours above.
        if row["Step"] == "workflow runner":
            prj.runs.setdefault(row["Workflow"], WorkflowRunSummary(name=row["Workflow"],
                                                                    uuid=row["WorkflowUUID"],
                                                                    cost=[], hours=[]))
            # NOTE: despite the variable name, prj.runs is keyed by
            # workflow *name*, so same-named workflows aggregate together.
            wfuuid = row["Workflow"]
            prj.runs[wfuuid].count += 1
            prj.runs[wfuuid].cost.append(row["CumulativeCost"])
            prj.runs[wfuuid].hours.append(hrs)
            self.total_workflows += 1

        self.total_hours += hrs
        self.total_cost += cost
617
618     def report_from_api(self, since, to, include_steps, exclude):
619         pending = []
620
621         count = 0
622         for container_request in arvados.util.keyset_list_all(
623                 self.arv_client.container_requests().list,
624                 filters=[
625                     ["command", "like", "[\"arvados-cwl-runner%"],
626                     ["created_at", ">=", since.strftime("%Y%m%dT%H%M%SZ")],
627                     ["created_at", "<=", to.strftime("%Y%m%dT%H%M%SZ")],
628                 ],
629                 select=["uuid", "owner_uuid", "container_uuid", "name", "cumulative_cost", "properties", "modified_by_user_uuid", "created_at"]):
630
631             if container_request["cumulative_cost"] == 0:
632                 continue
633
634             # Every 1000 container requests, we fetch the
635             # corresponding container records.
636             #
637             # What's so special about 1000?  Because that's the
638             # maximum Arvados page size, so when we use ['uuid', 'in',
639             # [...]] to fetch associated records it doesn't make sense
640             # to provide more than 1000 uuids.
641             #
642             # TODO: use the ?include=container_uuid feature so a
643             # separate request to the containers table isn't necessary.
644             if len(pending) < 1000:
645                 pending.append(container_request)
646             else:
647                 count += len(pending)
648                 logging.info("Exporting workflow runs %s - %s", count-len(pending), count)
649                 for row in self.iter_container_info(pending, include_steps, exclude):
650                     self.collect_summary_stats(row)
651                     yield row
652                 pending.clear()
653
654         count += len(pending)
655         logging.info("Exporting workflow runs %s - %s", count-len(pending), count)
656         for row in self.iter_container_info(pending, include_steps, exclude):
657             self.collect_summary_stats(row)
658             yield row
659
660         userinfo = self.arv_client.users().list(filters=[["is_active", "=", True]], limit=0).execute()
661         self.total_users = userinfo["items_available"]
662
663         groupinfo = self.arv_client.groups().list(filters=[["group_class", "=", "project"]], limit=0).execute()
664         self.total_projects = groupinfo["items_available"]
665
666     def csv_report(self, since, to, out, include_steps, columns, exclude):
667         if columns:
668             columns = columns.split(",")
669         else:
670             if include_steps:
671                 columns = ("Project", "Workflow", "Step", "Sample", "User", "Submitted", "Runtime", "Cost")
672             else:
673                 columns = ("Project", "Workflow", "Sample", "User", "Submitted", "Runtime", "Cost")
674
675         csvwriter = csv.DictWriter(out, fieldnames=columns, extrasaction="ignore")
676         csvwriter.writeheader()
677
678         for row in self.report_from_api(since, to, include_steps, exclude):
679             csvwriter.writerow(row)
680
681         self.summary_fetched = True
682
683     def today(self):
684         return date.today()