21121: Fix warnings
[arvados.git] / tools / cluster-activity / arvados_cluster_activity / report.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 import logging
7 import ciso8601
8 import arvados.util
9 import re
10 import csv
11 import math
12 import collections
13 import json
14 from datetime import date, datetime, timedelta
15 from typing import List
16 import statistics
17
18 from dataclasses import dataclass
19 from arvados_cluster_activity.prometheus import get_metric_usage, get_data_usage
20 from arvados_cluster_activity.reportchart import ReportChart
21
22
@dataclass
class WorkflowRunSummary:
    """Aggregated statistics over all runs of one workflow within a project."""
    # Workflow display name.
    name: str
    # Workflow UUID, or the string "none" for workflows run directly from
    # the command line (see iter_container_info / html_report link logic).
    uuid: str
    # Cumulative cost of each run, one entry per run.
    cost: List[float]
    # Runtime of each run in fractional hours, one entry per run.
    hours: List[float]
    # Number of runs observed.
    count: int = 0
30
31
@dataclass
class ProjectSummary:
    """Aggregated activity for a single project over the reporting period."""
    # Names of users who ran workflows in this project.
    users: set
    uuid: str
    # Maps workflow name -> WorkflowRunSummary for runs in this project.
    runs: dict[str, WorkflowRunSummary]
    # Sentinel defaults: any real timestamp compares earlier than the
    # year-9999 'earliest' and later than the year-1900 'latest', so the
    # first observed run always replaces them.
    earliest: datetime = datetime(year=9999, month=1, day=1)
    latest: datetime = datetime(year=1900, month=1, day=1)
    name: str = ""
    cost: float = 0
    count: int = 0
    hours: float = 0
    # Pre-rendered HTML fragments, filled in by html_report().
    activityspan: str = ""
    tablerow: str = ""
45
46
def aws_monthly_cost(value):
    """Estimate the monthly AWS S3 Standard cost in USD for *value* bytes.

    Applies the published tiered pricing: $0.023/GB for the first 50 TB,
    $0.022/GB for the next 450 TB, and $0.021/GB beyond 500 TB.
    """
    gib = 1024 * 1024 * 1024
    value_gb = value / gib

    tier1_gb = min(value_gb, 50 * 1024)
    tier2_gb = max(0, min(value_gb - 50 * 1024, 450 * 1024))
    tier3_gb = max(0, value_gb - 500 * 1024)

    return tier1_gb * 0.023 + tier2_gb * 0.022 + tier3_gb * 0.021
55
56
def format_with_suffix_base2(summary_value):
    """Format a byte count with a binary (1024-based) suffix, e.g. "1.500 GiB".

    Values below 1024 bytes are still shown in KiB (as a fraction),
    matching the original behavior.
    """
    for scale in ["KiB", "MiB", "GiB", "TiB", "PiB", "EiB"]:
        summary_value = summary_value / 1024
        if summary_value < 1024:
            return "%.3f %s" % (summary_value, scale)
    # Previously values of 1024 EiB or more fell off the end of the loop
    # and returned None; clamp to the largest suffix instead.
    return "%.3f %s" % (summary_value, "EiB")
62
def format_with_suffix_base10(summary_value):
    """Format a byte count with a decimal (1000-based) suffix, e.g. "1.500 GB".

    Values below 1000 bytes are still shown in KB (as a fraction),
    matching the original behavior.
    """
    for scale in ["KB", "MB", "GB", "TB", "PB", "EB"]:
        summary_value = summary_value / 1000
        if summary_value < 1000:
            return "%.3f %s" % (summary_value, scale)
    # Previously values of 1000 EB or more fell off the end of the loop
    # and returned None; clamp to the largest suffix instead.
    return "%.3f %s" % (summary_value, "EB")
68
# Graph identity tuples: (human-readable chart title, series id).
# Used as keys into ClusterActivityReport.graphs and passed through to
# ReportChart; the second element is presumably the trace/series id used
# by the charting code -- confirm against reportchart.py.
containers_graph = ('Concurrent running containers', 'containers')
storage_graph = ('Storage usage', 'used')
managed_graph = ('Data under management', 'managed')
72
73
def runtime_str(container_request, containers):
    """Return the wall-clock runtime of a container request as "H:MM:SS".

    Looks up the container record for *container_request* in *containers*
    (a dict keyed by container UUID) and subtracts started_at from
    finished_at.
    """
    ctr = containers[container_request["container_uuid"]]
    elapsed = ciso8601.parse_datetime(ctr["finished_at"]) - ciso8601.parse_datetime(ctr["started_at"])

    total_seconds = elapsed.days * 86400 + elapsed.seconds
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)

    return "%i:%02i:%02i" % (hours, minutes, seconds)
82
def runtime_in_hours(runtime):
    """Convert an "H:MM:SS" runtime string into fractional hours."""
    parts = runtime.split(":")
    return float(parts[0]) + float(parts[1]) / 60 + float(parts[2]) / 3600
89
def hours_to_runtime_str(frac_hours):
    """Format a fractional number of hours as "H:MM:SS" (truncating)."""
    whole_hours = math.floor(frac_hours)
    frac_minutes = (frac_hours - whole_hours) * 60.0
    frac_seconds = (frac_minutes - math.floor(frac_minutes)) * 60.0

    # "%i" truncates the fractional parts, matching integer-floor display.
    return "%i:%02i:%02i" % (whole_hours, frac_minutes, frac_seconds)
96
97
def csv_dateformat(datestr):
    """Parse an ISO 8601 timestamp string and reformat it as
    "YYYY-MM-DD HH:MM:SS" for CSV output."""
    parsed = ciso8601.parse_datetime(datestr)
    return parsed.strftime("%Y-%m-%d %H:%M:%S")
101
102
class ClusterActivityReport(object):
    """Generates CSV and HTML activity/cost reports for an Arvados cluster."""

    def __init__(self, prom_client):
        # prom_client: optional Prometheus client used to build the
        # time-series graphs; when falsy, graph collection is skipped
        # (see collect_graph).
        self.arv_client = arvados.api()
        self.prom_client = prom_client
        self.cluster = self.arv_client.config()["ClusterID"]

        # Summary statistics accumulated by collect_summary_stats() as
        # report rows are generated.
        self.active_users = set()
        self.project_summary = {}
        self.total_hours = 0
        self.total_cost = 0
        self.total_workflows = 0
        self.storage_cost = 0
        # True once a report pass has populated the summary stats, so
        # html_report() can skip re-fetching everything.
        self.summary_fetched = False
        # Maps (title, id) graph-key tuples to flattened
        # [timestamp, value] series (see collect_graph).
        self.graphs = {}
117
118     def collect_graph(self, since, to, metric, resample_to, extra=None):
119         if not self.prom_client:
120             return
121
122         flatdata = []
123
124         for series in get_metric_usage(self.prom_client, since, to, metric % self.cluster, resampleTo=resample_to):
125             for t in series.itertuples():
126                 flatdata.append([t[0], t[1]])
127                 if extra:
128                     extra(t[0], t[1])
129
130         return flatdata
131
132     def collect_storage_cost(self, timestamp, value):
133         self.storage_cost += aws_monthly_cost(value) / (30*24)
134
135     def html_report(self, since, to, exclude, include_workflow_steps):
136         """Get a cluster activity report for the desired time period,
137         returning a string containing the report as an HTML document."""
138
139         self.label = "Cluster report for %s from %s to %s" % (self.cluster, since.date(), to.date())
140
141         # If we already produced a CSV report we have summary stats
142         # and don't need to fetch everything all over again.
143         if not self.summary_fetched:
144             for row in self.report_from_api(since, to, include_workflow_steps, exclude):
145                 pass
146
147         container_cumulative_hours = 0
148         def collect_container_hours(timestamp, value):
149             nonlocal container_cumulative_hours
150             # resampled to 5 minute increments but we want
151             # a sum of hours
152             container_cumulative_hours += value / 12
153
154         logging.info("Getting container hours time series")
155
156         self.graphs[containers_graph] = self.collect_graph(since, to,
157                            "arvados_dispatchcloud_containers_running{cluster='%s'}",
158                            resample_to="5min",
159                            extra=collect_container_hours
160                            )
161
162         logging.info("Getting data usage time series")
163         self.graphs[managed_graph] = self.collect_graph(since, to,
164                            "arvados_keep_collection_bytes{cluster='%s'}", resample_to="60min")
165
166         self.graphs[storage_graph] = self.collect_graph(since, to,
167                            "arvados_keep_total_bytes{cluster='%s'}", resample_to="60min",
168                                                         extra=self.collect_storage_cost)
169
170         managed_data_now = None
171         storage_used_now = None
172
173         if len(self.graphs.get(managed_graph, [])) > 0:
174             managed_data_now = self.graphs[managed_graph][-1][1]
175
176         if len(self.graphs.get(storage_graph, [])) > 0:
177             storage_used_now = self.graphs[storage_graph][-1][1]
178
179         if managed_data_now and storage_used_now:
180             storage_cost = aws_monthly_cost(storage_used_now)
181             dedup_ratio = managed_data_now/storage_used_now
182
183
184         label = self.label
185
186         cards = []
187
188         workbench = self.arv_client.config()["Services"]["Workbench2"]["ExternalURL"]
189         if workbench.endswith("/"):
190             workbench = workbench[:-1]
191
192         print(to.date(), self.today())
193         if to.date() == self.today():
194             # The deduplication ratio overstates things a bit, you can
195             # have collections which reference a small slice of a large
196             # block, and this messes up the intuitive value of this ratio
197             # and exagerates the effect.
198             #
199             # So for now, as much fun as this is, I'm excluding it from
200             # the report.
201             #
202             # dedup_savings = aws_monthly_cost(managed_data_now) - storage_cost
203             # <tr><th>Monthly savings from storage deduplication</th> <td>${dedup_savings:,.2f}</td></tr>
204
205             data_rows = ""
206             if managed_data_now and storage_used_now:
207                 data_rows = """
208             <tr><th>Total data under management</th> <td>{managed_data_now}</td></tr>
209             <tr><th>Total storage usage</th> <td>{storage_used_now}</td></tr>
210             <tr><th>Deduplication ratio</th> <td>{dedup_ratio:.1f}</td></tr>
211             <tr><th>Approximate monthly storage cost</th> <td>${storage_cost:,.2f}</td></tr>
212                 """.format(
213                        managed_data_now=format_with_suffix_base10(managed_data_now),
214                        storage_used_now=format_with_suffix_base10(storage_used_now),
215                        storage_cost=storage_cost,
216                        dedup_ratio=dedup_ratio,
217                 )
218
219             cards.append("""<h2>Cluster status as of {now}</h2>
220             <table class='aggtable'><tbody>
221             <tr><th><a href="{workbench}/users">Total users</a></th><td>{total_users}</td></tr>
222             <tr><th>Total projects</th><td>{total_projects}</td></tr>
223             {data_rows}
224             </tbody></table>
225             <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
226             """.format(now=self.today(),
227                        total_users=self.total_users,
228                        total_projects=self.total_projects,
229                        workbench=workbench,
230                        data_rows=data_rows))
231
232         cards.append("""<h2>Activity and cost over the {reporting_days} day period {since} to {to}</h2>
233         <table class='aggtable'><tbody>
234         <tr><th>Active users</th> <td>{active_users}</td></tr>
235         <tr><th><a href="#Active_Projects">Active projects</a></th> <td>{active_projects}</td></tr>
236         <tr><th>Workflow runs</th> <td>{total_workflows:,}</td></tr>
237         <tr><th>Compute used</th> <td>{total_hours:,.1f} hours</td></tr>
238         <tr><th>Compute cost</th> <td>${total_cost:,.2f}</td></tr>
239         <tr><th>Storage cost</th> <td>${storage_cost:,.2f}</td></tr>
240         </tbody></table>
241         <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
242         """.format(active_users=len(self.active_users),
243                    total_users=self.total_users,
244                    #total_hours=container_cumulative_hours,
245                    total_hours=self.total_hours,
246                    total_cost=self.total_cost,
247                    total_workflows=self.total_workflows,
248                    active_projects=len(self.project_summary),
249                    since=since.date(), to=to.date(),
250                    reporting_days=(to - since).days,
251                    storage_cost=self.storage_cost))
252
253         projectlist = sorted(self.project_summary.items(), key=lambda x: x[1].cost, reverse=True)
254
255         for k, prj in projectlist:
256             if prj.earliest.date() == prj.latest.date():
257                 prj.activityspan = "{}".format(prj.earliest.date())
258             else:
259                 prj.activityspan = "{} to {}".format(prj.earliest.date(), prj.latest.date())
260
261             prj.tablerow = """<td>{users}</td> <td>{active}</td> <td>{hours:,.1f}</td> <td>${cost:,.2f}</td>""".format(
262                 active=prj.activityspan,
263                 cost=prj.cost,
264                 hours=prj.hours,
265                 users=", ".join(prj.users),
266             )
267
268         cards.append("""
269                 <div id="chart"></div>
270             """)
271
272         cards.append(
273             """
274             <a id="Active_Projects"><h2>Active Projects</h2></a>
275             <table class='sortable active-projects'>
276             <thead><tr><th>Project</th> <th>Users</th> <th>Active</th> <th>Compute usage (hours)</th> <th>Compute cost</th> </tr></thead>
277             <tbody><tr>{projects}</tr></tbody>
278             </table>
279             <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
280             """.format(projects="</tr>\n<tr>".join("""<td><a href="#{name}">{name}</a></td>{rest}""".format(name=prj.name, rest=prj.tablerow) for k, prj in projectlist)))
281
282         for k, prj in projectlist:
283             wfsum = []
284             for k2, r in sorted(prj.runs.items(), key=lambda x: x[1].count, reverse=True):
285                 wfsum.append("""
286                 <tr><td>{count}</td> <td>{workflowlink}</td> <td>{median_runtime}</td> <td>{mean_runtime}</td> <td>${median_cost:,.2f}</td> <td>${mean_cost:,.2f}</td> <td>${totalcost:,.2f}</td></tr>
287                 """.format(
288                     count=r.count,
289                     mean_runtime=hours_to_runtime_str(statistics.mean(r.hours)),
290                     median_runtime=hours_to_runtime_str(statistics.median(r.hours)),
291                     mean_cost=statistics.mean(r.cost),
292                     median_cost=statistics.median(r.cost),
293                     totalcost=sum(r.cost),
294                     workflowlink="""<a href="{workbench}/workflows/{uuid}">{name}</a>""".format(workbench=workbench,uuid=r.uuid,name=r.name)
295                     if r.uuid != "none" else r.name))
296
297             cards.append(
298                 """<a id="{name}"></a><a href="{workbench}/projects/{uuid}"><h2>{name}</h2></a>
299
300                 <table class='sortable single-project'>
301                 <thead><tr> <th>Users</th> <th>Active</th> <th>Compute usage (hours)</th> <th>Compute cost</th> </tr></thead>
302                 <tbody><tr>{projectrow}</tr></tbody>
303                 </table>
304
305                 <table class='sortable project'>
306                 <thead><tr><th>Workflow run count</th> <th>Workflow name</th> <th>Median runtime</th> <th>Mean runtime</th> <th>Median cost per run</th> <th>Mean cost per run</th> <th>Sum cost over runs</th></tr></thead>
307                 <tbody>
308                 {wfsum}
309                 </tbody></table>
310                 """.format(name=prj.name,
311                            users=", ".join(prj.users),
312                            cost=prj.cost,
313                            hours=prj.hours,
314                            wfsum=" ".join(wfsum),
315                            earliest=prj.earliest.date(),
316                            latest=prj.latest.date(),
317                            activity=prj.activityspan,
318                            userplural='s' if len(prj.users) > 1 else '',
319                            projectrow=prj.tablerow,
320                            workbench=workbench,
321                            uuid=prj.uuid)
322             )
323
324         # The deduplication ratio overstates things a bit, you can
325         # have collections which reference a small slice of a large
326         # block, and this messes up the intuitive value of this ratio
327         # and exagerates the effect.
328         #
329         # So for now, as much fun as this is, I'm excluding it from
330         # the report.
331         #
332         # <p>"Monthly savings from storage deduplication" is the
333         # estimated cost difference between "storage usage" and "data
334         # under management" as a way of comparing with other
335         # technologies that do not support data deduplication.</p>
336
337
338         cards.append("""
339         <h2 id="prices">Note on usage and cost calculations</h2>
340
341         <div style="max-width: 60em">
342
343         <p>The numbers presented in this report are estimates and will
344         not perfectly match your cloud bill.  Nevertheless this report
345         should be useful for identifying your main cost drivers.</p>
346
347         <h3>Storage</h3>
348
349         <p>"Total data under management" is what you get if you add up
350         all blocks referenced by all collections in Workbench, without
351         considering deduplication.</p>
352
353         <p>"Total storage usage" is the actual underlying storage
354         usage, accounting for data deduplication.</p>
355
356         <p>Storage costs are based on AWS "S3 Standard"
357         described on the <a href="https://aws.amazon.com/s3/pricing/">Amazon S3 pricing</a> page:</p>
358
359         <ul>
360         <li>$0.023 per GB / Month for the first 50 TB</li>
361         <li>$0.022 per GB / Month for the next 450 TB</li>
362         <li>$0.021 per GB / Month over 500 TB</li>
363         </ul>
364
365         <p>Finally, this only the base storage cost, and does not
366         include any fees associated with S3 API usage.  However, there
367         are generally no ingress/egress fees if your Arvados instance
368         and S3 bucket are in the same region, which is the normal
369         recommended configuration.</p>
370
371         <h3>Compute</h3>
372
373         <p>"Compute usage" are instance-hours used in running
374         workflows.  Because multiple steps may run in parallel on
375         multiple instances, a workflow that completes in four hours
376         but runs parallel steps on five instances, would be reported
377         as using 20 instance hours.</p>
378
379         <p>"Runtime" is the actual wall clock time that it took to
380         complete a workflow.  This does not include time spent in the
381         queue for the workflow itself, but does include queuing time
382         of individual workflow steps.</p>
383
384         <p>Computational costs are derived from Arvados cost
385         calculations of container runs.  For on-demand instances, this
386         uses the prices from the InstanceTypes section of the Arvado
387         config file, set by the system administrator.  For spot
388         instances, this uses current spot prices retrieved on the fly
389         the AWS API.</p>
390
391         <p>Be aware that the cost calculations are only for the time
392         the container is running and only do not take into account the
393         overhead of launching instances or idle time between scheduled
394         tasks or prior to automatic shutdown.</p>
395
396         </div>
397         """)
398
399         return ReportChart(label, cards, self.graphs).html()
400
    def iter_container_info(self, pending, include_steps, exclude):
        """Fetch details for a batch of workflow-runner container requests
        and yield one report-row dict per run (and per step, if enabled).

        *pending* is a batch of arvados-cwl-runner container requests;
        *exclude* is an optional regex -- workflows whose name matches it
        are skipped.
        """
        # "pending" is a list of arvados-cwl-runner container requests
        # returned by the API.  This method fetches detailed
        # information about the runs and yields report rows.

        # 1. Get container records corresponding to container requests.
        containers = {}

        for container in arvados.util.keyset_list_all(
            self.arv_client.containers().list,
            filters=[
                ["uuid", "in", [c["container_uuid"] for c in pending if c["container_uuid"]]],
            ],
            select=["uuid", "started_at", "finished_at", "cost"]):

            containers[container["uuid"]] = container

        # 2. Look for the template_uuid property and fetch the
        # corresponding workflow record.
        workflows = {}
        workflows["none"] = "workflow run from command line"

        for wf in arvados.util.keyset_list_all(
                self.arv_client.workflows().list,
                filters=[
                    # Only look up template UUIDs that belong to this cluster.
                    ["uuid", "in", list(set(c["properties"]["template_uuid"]
                                            for c in pending
                                            if "template_uuid" in c["properties"] and c["properties"]["template_uuid"].startswith(self.arv_client.config()["ClusterID"])))],
                ],
                select=["uuid", "name"]):
            workflows[wf["uuid"]] = wf["name"]

        # 3. Look at owner_uuid and fetch owning projects and users
        projects = {}

        for pr in arvados.util.keyset_list_all(
                self.arv_client.groups().list,
                filters=[
                    # 'j7d0g' is the UUID infix for groups/projects.
                    ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'j7d0g'))],
                ],
                select=["uuid", "name"]):
            projects[pr["uuid"]] = pr["name"]

        # 4. Look at owner_uuid and modified_by_user_uuid and get user records
        for pr in arvados.util.keyset_list_all(
                self.arv_client.users().list,
                filters=[
                    # 'tpzed' is the UUID infix for users; user names share
                    # the same lookup table as project names.
                    ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'tpzed')|set(c["modified_by_user_uuid"] for c in pending))],
                ],
                select=["uuid", "full_name", "first_name", "last_name"]):
            projects[pr["uuid"]] = pr["full_name"]

        # 5. Optionally iterate over individual workflow steps.
        if include_steps:
            # Strips a trailing "_<number>" from step names so repeated
            # steps aggregate under one name -- presumably scatter/retry
            # numbering; confirm against a-c-r naming conventions.
            name_regex = re.compile(r"(.+)_[0-9]+")
            child_crs = {}
            child_cr_containers = set()
            stepcount = 0

            # 5.1. Go through the container requests owned by the toplevel workflow container
            logging.info("Getting workflow steps")
            for cr in arvados.util.keyset_list_all(
                self.arv_client.container_requests().list,
                filters=[
                    ["requesting_container_uuid", "in", list(containers.keys())],
                ],
                select=["uuid", "name", "cumulative_cost", "requesting_container_uuid", "container_uuid"]):

                # Zero-cost steps are not worth reporting.
                if cr["cumulative_cost"] == 0:
                    continue

                g = name_regex.fullmatch(cr["name"])
                if g:
                    cr["name"] = g[1]

                # 5.2. Get the containers corresponding to the
                # container requests.  This has the same logic as
                # report_from_api where we batch it into 1000 items at
                # a time.
                child_crs.setdefault(cr["requesting_container_uuid"], []).append(cr)
                child_cr_containers.add(cr["container_uuid"])
                if len(child_cr_containers) == 1000:
                    stepcount += len(child_cr_containers)
                    for container in arvados.util.keyset_list_all(
                            self.arv_client.containers().list,
                            filters=[
                                ["uuid", "in", list(child_cr_containers)],
                            ],
                            select=["uuid", "started_at", "finished_at", "cost"]):

                        containers[container["uuid"]] = container

                    logging.info("Got workflow steps %s - %s", stepcount-len(child_cr_containers), stepcount)
                    child_cr_containers.clear()

            # Get any remaining containers
            if child_cr_containers:
                stepcount += len(child_cr_containers)
                for container in arvados.util.keyset_list_all(
                        self.arv_client.containers().list,
                        filters=[
                            ["uuid", "in", list(child_cr_containers)],
                        ],
                        select=["uuid", "started_at", "finished_at", "cost"]):

                    containers[container["uuid"]] = container
                logging.info("Got workflow steps %s - %s", stepcount-len(child_cr_containers), stepcount)

        # 6. Now go through the list of workflow runs, yield a row
        # with all the information we have collected, as well as the
        # details for each workflow step (if enabled)
        for container_request in pending:
            # Skip runs that never started or haven't finished yet.
            if not container_request["container_uuid"] or not containers[container_request["container_uuid"]]["started_at"] or not containers[container_request["container_uuid"]]["finished_at"]:
                continue

            template_uuid = container_request["properties"].get("template_uuid", "none")
            workflowname = container_request["name"] if template_uuid == "none" else workflows.get(template_uuid, template_uuid)

            if exclude and re.search(exclude, workflowname, flags=re.IGNORECASE):
                continue

            yield {
                "Project": projects.get(container_request["owner_uuid"], "unknown owner"),
                "ProjectUUID": container_request["owner_uuid"],
                "Workflow": workflowname,
                "WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
                "Step": "workflow runner",
                "StepUUID": container_request["uuid"],
                "Sample": container_request["name"],
                "SampleUUID": container_request["uuid"],
                "User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
                "UserUUID": container_request["modified_by_user_uuid"],
                "Submitted": csv_dateformat(container_request["created_at"]),
                "Started": csv_dateformat(containers[container_request["container_uuid"]]["started_at"]),
                "Finished": csv_dateformat(containers[container_request["container_uuid"]]["finished_at"]),
                "Runtime": runtime_str(container_request, containers),
                # With steps enabled, "Cost" is the runner container's own
                # cost; otherwise it is the whole run's cumulative cost.
                "Cost": round(containers[container_request["container_uuid"]]["cost"] if include_steps else container_request["cumulative_cost"], 3),
                "CumulativeCost": round(container_request["cumulative_cost"], 3)
                }

            if include_steps:
                for child_cr in child_crs.get(container_request["container_uuid"], []):
                    if not child_cr["container_uuid"] or not containers[child_cr["container_uuid"]]["started_at"] or not containers[child_cr["container_uuid"]]["finished_at"]:
                        continue
                    yield {
                        "Project": projects.get(container_request["owner_uuid"], "unknown owner"),
                        "ProjectUUID": container_request["owner_uuid"],
                        "Workflow": workflows.get(container_request["properties"].get("template_uuid", "none"), "workflow missing"),
                        "WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
                        "Step": child_cr["name"],
                        "StepUUID": child_cr["uuid"],
                        "Sample": container_request["name"],
                        # NOTE(review): the top-level row uses
                        # container_request["uuid"] for SampleUUID; using
                        # ["name"] here looks like a copy/paste slip --
                        # confirm before relying on SampleUUID in step rows.
                        "SampleUUID": container_request["name"],
                        "User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
                        "UserUUID": container_request["modified_by_user_uuid"],
                        "Submitted": csv_dateformat(child_cr["created_at"]),
                        "Started": csv_dateformat(containers[child_cr["container_uuid"]]["started_at"]),
                        "Finished": csv_dateformat(containers[child_cr["container_uuid"]]["finished_at"]),
                        "Runtime": runtime_str(child_cr, containers),
                        "Cost": round(containers[child_cr["container_uuid"]]["cost"], 3),
                        "CumulativeCost": round(containers[child_cr["container_uuid"]]["cost"], 3),
                        }
563
564
565     def collect_summary_stats(self, row):
566         self.active_users.add(row["User"])
567         self.project_summary.setdefault(row["ProjectUUID"],
568                                         ProjectSummary(users=set(),
569                                                        runs={},
570                                                        uuid=row["ProjectUUID"],
571                                                        name=row["Project"]))
572         prj = self.project_summary[row["ProjectUUID"]]
573         cost = row["Cost"]
574         prj.cost += cost
575         prj.count += 1
576         prj.users.add(row["User"])
577         hrs = runtime_in_hours(row["Runtime"])
578         prj.hours += hrs
579
580         started = datetime.strptime(row["Started"], "%Y-%m-%d %H:%M:%S")
581         finished = datetime.strptime(row["Finished"], "%Y-%m-%d %H:%M:%S")
582
583         if started < prj.earliest:
584             prj.earliest = started
585
586         if finished > prj.latest:
587             prj.latest = finished
588
589         if row["Step"] == "workflow runner":
590             prj.runs.setdefault(row["Workflow"], WorkflowRunSummary(name=row["Workflow"],
591                                                                     uuid=row["WorkflowUUID"],
592                                                                     cost=[], hours=[]))
593             wfuuid = row["Workflow"]
594             prj.runs[wfuuid].count += 1
595             prj.runs[wfuuid].cost.append(row["CumulativeCost"])
596             prj.runs[wfuuid].hours.append(hrs)
597             self.total_workflows += 1
598
599         self.total_hours += hrs
600         self.total_cost += cost
601
602     def report_from_api(self, since, to, include_steps, exclude):
603         pending = []
604
605         count = 0
606         for container_request in arvados.util.keyset_list_all(
607                 self.arv_client.container_requests().list,
608                 filters=[
609                     ["command", "like", "[\"arvados-cwl-runner%"],
610                     ["created_at", ">=", since.strftime("%Y%m%dT%H%M%SZ")],
611                     ["created_at", "<=", to.strftime("%Y%m%dT%H%M%SZ")],
612                 ],
613                 select=["uuid", "owner_uuid", "container_uuid", "name", "cumulative_cost", "properties", "modified_by_user_uuid", "created_at"]):
614
615             if container_request["cumulative_cost"] == 0:
616                 continue
617
618             # Every 1000 container requests, we fetch the
619             # corresponding container records.
620             #
621             # What's so special about 1000?  Because that's the
622             # maximum Arvados page size, so when we use ['uuid', 'in',
623             # [...]] to fetch associated records it doesn't make sense
624             # to provide more than 1000 uuids.
625             #
626             # TODO: use the ?include=container_uuid feature so a
627             # separate request to the containers table isn't necessary.
628             if len(pending) < 1000:
629                 pending.append(container_request)
630             else:
631                 count += len(pending)
632                 logging.info("Exporting workflow runs %s - %s", count-len(pending), count)
633                 for row in self.iter_container_info(pending, include_steps, exclude):
634                     self.collect_summary_stats(row)
635                     yield row
636                 pending.clear()
637
638         count += len(pending)
639         logging.info("Exporting workflow runs %s - %s", count-len(pending), count)
640         for row in self.iter_container_info(pending, include_steps, exclude):
641             self.collect_summary_stats(row)
642             yield row
643
644         userinfo = self.arv_client.users().list(filters=[["is_active", "=", True]], limit=0).execute()
645         self.total_users = userinfo["items_available"]
646
647         groupinfo = self.arv_client.groups().list(filters=[["group_class", "=", "project"]], limit=0).execute()
648         self.total_projects = groupinfo["items_available"]
649
650     def csv_report(self, since, to, out, include_steps, columns, exclude):
651         if columns:
652             columns = columns.split(",")
653         else:
654             if include_steps:
655                 columns = ("Project", "Workflow", "Step", "Sample", "User", "Submitted", "Runtime", "Cost")
656             else:
657                 columns = ("Project", "Workflow", "Sample", "User", "Submitted", "Runtime", "Cost")
658
659         csvwriter = csv.DictWriter(out, fieldnames=columns, extrasaction="ignore")
660         csvwriter.writeheader()
661
662         for row in self.report_from_api(since, to, include_steps, exclude):
663             csvwriter.writerow(row)
664
665         self.summary_fetched = True
666
667     def today(self):
668         return date.today()