22274: Only generate a chart card when chart data is available
[arvados.git] / tools / cluster-activity / arvados_cluster_activity / report.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 import logging
7 import ciso8601
8 import arvados.util
9 import re
10 import csv
11 import math
12 import collections
13 import json
14 from datetime import date, datetime, timedelta
15 from typing import Dict, List
16 import statistics
17
18 from dataclasses import dataclass
19 from arvados_cluster_activity.prometheus import get_metric_usage, get_data_usage
20 from arvados_cluster_activity.reportchart import ReportChart
21
22
@dataclass
class WorkflowRunSummary:
    # Aggregated statistics for every run of one workflow within a
    # single project (see collect_summary_stats).
    name: str           # workflow name, or the container request name for command-line runs
    uuid: str           # workflow uuid, or "none" for command-line runs
    cost: List[float]   # one entry per run: that run's cumulative cost
    hours: List[float]  # one entry per run: that run's wall-clock runtime in hours
    count: int = 0      # number of runs observed
30
31
@dataclass
class ProjectSummary:
    # Aggregated activity for one project over the reporting period.
    users: set                           # users who ran workflows in this project
    uuid: str                            # project (or user home project) uuid
    runs: Dict[str, WorkflowRunSummary]  # workflow name -> per-workflow summary
    # Sentinel extremes; narrowed toward the real span as rows are collected.
    earliest: datetime = datetime(year=9999, month=1, day=1)
    latest: datetime = datetime(year=1900, month=1, day=1)
    name: str = ""          # project display name
    cost: float = 0         # total compute cost across all rows
    count: int = 0          # number of report rows attributed to this project
    hours: float = 0        # total compute hours across all rows
    activityspan: str = ""  # human-readable date range, filled in by html_report
    tablerow: str = ""      # pre-rendered HTML table cells, filled in by html_report
46
def aws_monthly_cost(value):
    """Estimate the monthly AWS S3 Standard storage cost for `value` bytes.

    Applies the published tiered GB/month pricing: $0.023 for the first
    50 TB, $0.022 for the next 450 TB, $0.021 beyond 500 TB.
    """
    gib = 1024 * 1024 * 1024
    value_gb = value / gib

    tier1_gb = min(value_gb, 50 * 1024)                        # first 50 TB
    tier2_gb = min(max(value_gb - 50 * 1024, 0), 450 * 1024)   # next 450 TB
    tier3_gb = max(value_gb - 500 * 1024, 0)                   # over 500 TB

    return (tier1_gb * 0.023) + (tier2_gb * 0.022) + (tier3_gb * 0.021)
55
56
def format_with_suffix_base2(summary_value):
    """Format a byte count with binary (1024-based) unit suffixes.

    :param summary_value: numeric byte count
    :returns: string like "1.000 GiB"

    Fix: values of 1024 EiB or more previously fell off the end of the
    loop and returned None; they are now clamped to the EiB suffix.
    """
    for scale in ["KiB", "MiB", "GiB", "TiB", "PiB", "EiB"]:
        summary_value = summary_value / 1024
        if summary_value < 1024:
            return "%.3f %s" % (summary_value, scale)
    # Value exceeds 1024 EiB: report in the largest unit instead of
    # implicitly returning None.
    return "%.3f %s" % (summary_value, "EiB")
62
def format_with_suffix_base10(summary_value):
    """Format a byte count with decimal (1000-based) unit suffixes.

    :param summary_value: numeric byte count
    :returns: string like "1.000 GB"

    Fix: values of 1000 EB or more previously fell off the end of the
    loop and returned None; they are now clamped to the EB suffix.
    """
    for scale in ["KB", "MB", "GB", "TB", "PB", "EB"]:
        summary_value = summary_value / 1000
        if summary_value < 1000:
            return "%.3f %s" % (summary_value, scale)
    # Value exceeds 1000 EB: report in the largest unit instead of
    # implicitly returning None.
    return "%.3f %s" % (summary_value, "EB")
68
# (chart title, HTML element id) pairs used as keys into
# ClusterActivityReport.graphs and passed through to ReportChart.
containers_graph = ('Concurrent running containers', 'containers')
storage_graph = ('Storage usage', 'used')
managed_graph = ('Data under management', 'managed')
72
73
def runtime_str(container_request, containers):
    """Return the container's wall-clock runtime as an "H:MM:SS" string.

    :param container_request: record with a "container_uuid" key
    :param containers: dict mapping container uuid -> container record
        with "started_at" / "finished_at" timestamps
    """
    ctr = containers[container_request["container_uuid"]]
    elapsed = (ciso8601.parse_datetime(ctr["finished_at"]) -
               ciso8601.parse_datetime(ctr["started_at"]))

    total_secs = elapsed.seconds
    hours = elapsed.days * 24 + (total_secs // 3600)
    minutes = (total_secs // 60) % 60
    seconds = total_secs % 60

    return "%i:%02i:%02i" % (hours, minutes, seconds)
82
def runtime_in_hours(runtime):
    """Convert an "H:MM:SS" string (as built by runtime_str) to fractional hours."""
    hours_part, minutes_part, seconds_part = (float(p) for p in runtime.split(":"))
    return hours_part + minutes_part / 60 + seconds_part / 3600
89
def hours_to_runtime_str(frac_hours):
    """Format fractional hours as an "H:MM:SS" string.

    The "%i" conversions truncate the fractional minute/second values,
    matching the inverse of runtime_in_hours for exact inputs.
    """
    whole_hours = math.floor(frac_hours)
    frac_minutes = (frac_hours - whole_hours) * 60.0
    frac_seconds = (frac_minutes - math.floor(frac_minutes)) * 60.0
    return "%i:%02i:%02i" % (whole_hours, frac_minutes, frac_seconds)
96
97
def csv_dateformat(datestr):
    """Reformat an ISO 8601 timestamp string as "YYYY-MM-DD HH:MM:SS"."""
    return ciso8601.parse_datetime(datestr).strftime("%Y-%m-%d %H:%M:%S")
101
102
103 class ClusterActivityReport(object):
    def __init__(self, prom_client):
        # prom_client: optional Prometheus client used for time series
        # graphs; when falsy, collect_graph() is a no-op.
        self.arv_client = arvados.api()
        self.prom_client = prom_client
        self.cluster = self.arv_client.config()["ClusterID"]

        # Aggregates populated by collect_summary_stats() as report
        # rows are generated.
        self.active_users = set()
        self.project_summary = {}
        self.total_hours = 0
        self.total_cost = 0
        self.total_workflows = 0
        # Accumulated by collect_storage_cost() from hourly usage samples.
        self.storage_cost = 0
        # Set True by csv_report() once report_from_api() has been fully
        # consumed, so html_report() can skip re-fetching.
        self.summary_fetched = False
        # graph key (title, id) -> list of [timestamp, value] points.
        self.graphs = {}
117
118     def collect_graph(self, since, to, metric, resample_to, extra=None):
119         if not self.prom_client:
120             return
121
122         flatdata = []
123
124         for series in get_metric_usage(self.prom_client, since, to, metric % self.cluster, resampleTo=resample_to):
125             for t in series.itertuples():
126                 flatdata.append([t[0], t[1]])
127                 if extra:
128                     extra(t[0], t[1])
129
130         return flatdata
131
132     def collect_storage_cost(self, timestamp, value):
133         self.storage_cost += aws_monthly_cost(value) / (30*24)
134
    def html_report(self, since, to, exclude, include_workflow_steps):
        """Get a cluster activity report for the desired time period,
        returning a string containing the report as an HTML document.

        :param since: datetime, start of the reporting period
        :param to: datetime, end of the reporting period
        :param exclude: optional regex; workflows whose name matches are
            excluded from the report
        :param include_workflow_steps: bool, also fetch per-step records
        """

        self.label = "Cluster report for %s from %s to %s" % (self.cluster, since.date(), to.date())

        if not self.summary_fetched:
            # If we haven't done it already, need to fetch everything
            # from the API to collect summary stats (report_from_api
            # calls collect_summary_stats on each row).
            #
            # Because it is a Python generator, we need call it in a
            # loop to process all the rows.  This method also yields
            # each row which is used by a different function to create
            # the CSV report, but for the HTML report we just discard
            # them.
            for row in self.report_from_api(since, to, include_workflow_steps, exclude):
                pass

        container_cumulative_hours = 0
        def collect_container_hours(timestamp, value):
            nonlocal container_cumulative_hours
            # resampled to 5 minute increments but we want
            # a sum of hours
            container_cumulative_hours += value / 12

        logging.info("Getting container hours time series")

        self.graphs[containers_graph] = self.collect_graph(since, to,
                           "arvados_dispatchcloud_containers_running{cluster='%s'}",
                           resample_to="5min",
                           extra=collect_container_hours
                           )

        logging.info("Getting data usage time series")
        self.graphs[managed_graph] = self.collect_graph(since, to,
                           "arvados_keep_collection_bytes{cluster='%s'}", resample_to="60min")

        self.graphs[storage_graph] = self.collect_graph(since, to,
                           "arvados_keep_total_bytes{cluster='%s'}", resample_to="60min",
                                                        extra=self.collect_storage_cost)

        label = self.label

        # Each entry in "cards" becomes one section of the final HTML
        # document, in order.
        cards = []

        workbench = self.arv_client.config()["Services"]["Workbench2"]["ExternalURL"]
        if workbench.endswith("/"):
            workbench = workbench[:-1]

        # Card: current cluster status -- only meaningful when the
        # report period ends today.
        if to.date() == self.today():
            # The deduplication ratio overstates things a bit, you can
            # have collections which reference a small slice of a large
            # block, and this messes up the intuitive value of this ratio
            # and exagerates the effect.
            #
            # So for now, as much fun as this is, I'm excluding it from
            # the report.
            #
            # dedup_savings = aws_monthly_cost(managed_data_now) - storage_cost
            # <tr><th>Monthly savings from storage deduplication</th> <td>${dedup_savings:,.2f}</td></tr>

            # Storage rows are only included when both storage graphs
            # have data (they are None/empty without Prometheus).
            data_rows = ""
            if self.graphs[managed_graph] and self.graphs[storage_graph]:
                managed_data_now = self.graphs[managed_graph][-1][1]
                storage_used_now = self.graphs[storage_graph][-1][1]
                data_rows = """
            <tr><th>Total data under management</th> <td>{managed_data_now}</td></tr>
            <tr><th>Total storage usage</th> <td>{storage_used_now}</td></tr>
            <tr><th>Deduplication ratio</th> <td>{dedup_ratio:.1f}</td></tr>
            <tr><th>Approximate monthly storage cost</th> <td>${storage_cost:,.2f}</td></tr>
                """.format(
                       managed_data_now=format_with_suffix_base10(managed_data_now),
                       storage_used_now=format_with_suffix_base10(storage_used_now),
                       storage_cost=aws_monthly_cost(storage_used_now),
                       dedup_ratio=managed_data_now / storage_used_now,
                )

            cards.append("""<h2>Cluster status as of {now}</h2>
            <table class='aggtable'><tbody>
            <tr><th><a href="{workbench}/users">Total users</a></th><td>{total_users}</td></tr>
            <tr><th>Total projects</th><td>{total_projects}</td></tr>
            {data_rows}
            </tbody></table>
            <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
            """.format(now=self.today(),
                       total_users=self.total_users,
                       total_projects=self.total_projects,
                       workbench=workbench,
                       data_rows=data_rows))

        # We have a couple of options for getting total container hours
        #
        # total_hours=container_cumulative_hours
        #
        # calculates the sum from prometheus metrics
        #
        # total_hours=self.total_hours
        #
        # calculates the sum of the containers that were fetched
        #
        # The problem is these numbers tend not to match, especially
        # if the report generation was not called with "include
        # workflow steps".
        #
        # I decided to use the sum from containers fetched, because it
        # will match the sum of compute time for each project listed
        # in the report.

        # Card: aggregate activity and cost over the reporting period.
        cards.append("""<h2>Activity and cost over the {reporting_days} day period {since} to {to}</h2>
        <table class='aggtable'><tbody>
        <tr><th>Active users</th> <td>{active_users}</td></tr>
        <tr><th><a href="#Active_Projects">Active projects</a></th> <td>{active_projects}</td></tr>
        <tr><th>Workflow runs</th> <td>{total_workflows:,}</td></tr>
        <tr><th>Compute used</th> <td>{total_hours:,.1f} hours</td></tr>
        <tr><th>Compute cost</th> <td>${total_cost:,.2f}</td></tr>
        <tr><th>Storage cost</th> <td>${storage_cost:,.2f}</td></tr>
        </tbody></table>
        <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
        """.format(active_users=len(self.active_users),
                   total_users=self.total_users,
                   total_hours=self.total_hours,
                   total_cost=self.total_cost,
                   total_workflows=self.total_workflows,
                   active_projects=len(self.project_summary),
                   since=since.date(), to=to.date(),
                   reporting_days=(to - since).days,
                   storage_cost=self.storage_cost))

        # Most expensive projects first.
        projectlist = sorted(self.project_summary.items(), key=lambda x: x[1].cost, reverse=True)

        # Pre-render each project's activity span and table cells; these
        # are reused by both the "Active Projects" table and the
        # per-project cards below.
        for k, prj in projectlist:
            if prj.earliest.date() == prj.latest.date():
                prj.activityspan = "{}".format(prj.earliest.date())
            else:
                prj.activityspan = "{} to {}".format(prj.earliest.date(), prj.latest.date())

            prj.tablerow = """<td>{users}</td> <td>{active}</td> <td>{hours:,.1f}</td> <td>${cost:,.2f}</td>""".format(
                active=prj.activityspan,
                cost=prj.cost,
                hours=prj.hours,
                users=", ".join(prj.users),
            )

        # Card: chart placeholder -- only when at least one graph
        # actually has data (ReportChart renders into the "chart" div).
        if any(self.graphs.values()):
            cards.append("""
                <div id="chart"></div>
            """)

        # Card: the Active Projects summary table.
        cards.append(
            """
            <a id="Active_Projects"><h2>Active Projects</h2></a>
            <table class='sortable active-projects'>
            <thead><tr><th>Project</th> <th>Users</th> <th>Active</th> <th>Compute usage (hours)</th> <th>Compute cost</th> </tr></thead>
            <tbody><tr>{projects}</tr></tbody>
            </table>
            <p>See <a href="#prices">note on usage and cost calculations</a> for details on how costs are calculated.</p>
            """.format(projects="</tr>\n<tr>".join("""<td><a href="#{name}">{name}</a></td>{rest}""".format(name=prj.name, rest=prj.tablerow) for k, prj in projectlist)))

        # Cards: one detail card per project, with per-workflow run
        # statistics sorted by run count.
        for k, prj in projectlist:
            wfsum = []
            for k2, r in sorted(prj.runs.items(), key=lambda x: x[1].count, reverse=True):
                wfsum.append("""
                <tr><td>{count}</td> <td>{workflowlink}</td> <td>{median_runtime}</td> <td>{mean_runtime}</td> <td>${median_cost:,.2f}</td> <td>${mean_cost:,.2f}</td> <td>${totalcost:,.2f}</td></tr>
                """.format(
                    count=r.count,
                    mean_runtime=hours_to_runtime_str(statistics.mean(r.hours)),
                    median_runtime=hours_to_runtime_str(statistics.median(r.hours)),
                    mean_cost=statistics.mean(r.cost),
                    median_cost=statistics.median(r.cost),
                    totalcost=sum(r.cost),
                    workflowlink="""<a href="{workbench}/workflows/{uuid}">{name}</a>""".format(workbench=workbench,uuid=r.uuid,name=r.name)
                    if r.uuid != "none" else r.name))

            cards.append(
                """<a id="{name}"></a><a href="{workbench}/projects/{uuid}"><h2>{name}</h2></a>

                <table class='sortable single-project'>
                <thead><tr> <th>Users</th> <th>Active</th> <th>Compute usage (hours)</th> <th>Compute cost</th> </tr></thead>
                <tbody><tr>{projectrow}</tr></tbody>
                </table>

                <table class='sortable project'>
                <thead><tr><th>Workflow run count</th> <th>Workflow name</th> <th>Median runtime</th> <th>Mean runtime</th> <th>Median cost per run</th> <th>Mean cost per run</th> <th>Sum cost over runs</th></tr></thead>
                <tbody>
                {wfsum}
                </tbody></table>
                """.format(name=prj.name,
                           wfsum=" ".join(wfsum),
                           projectrow=prj.tablerow,
                           workbench=workbench,
                           uuid=prj.uuid)
            )

        # The deduplication ratio overstates things a bit, you can
        # have collections which reference a small slice of a large
        # block, and this messes up the intuitive value of this ratio
        # and exagerates the effect.
        #
        # So for now, as much fun as this is, I'm excluding it from
        # the report.
        #
        # <p>"Monthly savings from storage deduplication" is the
        # estimated cost difference between "storage usage" and "data
        # under management" as a way of comparing with other
        # technologies that do not support data deduplication.</p>

        # Card: static methodology notes explaining how costs were
        # estimated.
        cards.append("""
        <h2 id="prices">Note on usage and cost calculations</h2>

        <div style="max-width: 60em">

        <p>The numbers presented in this report are estimates and will
        not perfectly match your cloud bill.  Nevertheless this report
        should be useful for identifying your main cost drivers.</p>

        <h3>Storage</h3>

        <p>"Total data under management" is what you get if you add up
        all blocks referenced by all collections in Workbench, without
        considering deduplication.</p>

        <p>"Total storage usage" is the actual underlying storage
        usage, accounting for data deduplication.</p>

        <p>Storage costs are based on AWS "S3 Standard"
        described on the <a href="https://aws.amazon.com/s3/pricing/">Amazon S3 pricing</a> page:</p>

        <ul>
        <li>$0.023 per GB / Month for the first 50 TB</li>
        <li>$0.022 per GB / Month for the next 450 TB</li>
        <li>$0.021 per GB / Month over 500 TB</li>
        </ul>

        <p>Finally, this only the base storage cost, and does not
        include any fees associated with S3 API usage.  However, there
        are generally no ingress/egress fees if your Arvados instance
        and S3 bucket are in the same region, which is the normal
        recommended configuration.</p>

        <h3>Compute</h3>

        <p>"Compute usage" are instance-hours used in running
        workflows.  Because multiple steps may run in parallel on
        multiple instances, a workflow that completes in four hours
        but runs parallel steps on five instances, would be reported
        as using 20 instance hours.</p>

        <p>"Runtime" is the actual wall clock time that it took to
        complete a workflow.  This does not include time spent in the
        queue for the workflow itself, but does include queuing time
        of individual workflow steps.</p>

        <p>Computational costs are derived from Arvados cost
        calculations of container runs.  For on-demand instances, this
        uses the prices from the InstanceTypes section of the Arvado
        config file, set by the system administrator.  For spot
        instances, this uses current spot prices retrieved on the fly
        the AWS API.</p>

        <p>Be aware that the cost calculations are only for the time
        the container is running and only do not take into account the
        overhead of launching instances or idle time between scheduled
        tasks or prior to automatic shutdown.</p>

        </div>
        """)

        return ReportChart(label, cards, self.graphs).html()
405
    def iter_container_info(self, pending, include_steps, exclude):
        """Yield one report row dict per workflow run (and per step).

        :param pending: list of arvados-cwl-runner container request
            records returned by the API (at most 1000, the maximum
            page size usable with ["uuid", "in", ...] filters)
        :param include_steps: when true, also fetch the child container
            requests of each run and yield a row per workflow step
        :param exclude: optional regex; runs whose workflow name matches
            (case-insensitively) are skipped
        """
        # "pending" is a list of arvados-cwl-runner container requests
        # returned by the API.  This method fetches detailed
        # information about the runs and yields report rows.

        # 1. Get container records corresponding to container requests.
        containers = {}

        for container in arvados.util.keyset_list_all(
            self.arv_client.containers().list,
            filters=[
                ["uuid", "in", [c["container_uuid"] for c in pending if c["container_uuid"]]],
            ],
            select=["uuid", "started_at", "finished_at", "cost"]):

            containers[container["uuid"]] = container

        # 2. Look for the template_uuid property and fetch the
        # corresponding workflow record.
        workflows = {}
        workflows["none"] = "workflow run from command line"

        for wf in arvados.util.keyset_list_all(
                self.arv_client.workflows().list,
                filters=[
                    ["uuid", "in", list(set(c["properties"]["template_uuid"]
                                            for c in pending
                                            if "template_uuid" in c["properties"] and c["properties"]["template_uuid"].startswith(self.arv_client.config()["ClusterID"])))],
                ],
                select=["uuid", "name"]):
            workflows[wf["uuid"]] = wf["name"]

        # 3. Look at owner_uuid and fetch owning projects and users
        projects = {}

        # 'j7d0g' is the uuid infix identifying group records.
        for pr in arvados.util.keyset_list_all(
                self.arv_client.groups().list,
                filters=[
                    ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'j7d0g'))],
                ],
                select=["uuid", "name"]):
            projects[pr["uuid"]] = pr["name"]

        # 4. Look at owner_uuid and modified_by_user_uuid and get user records
        # ('tpzed' is the uuid infix identifying user records).
        for pr in arvados.util.keyset_list_all(
                self.arv_client.users().list,
                filters=[
                    ["uuid", "in", list(set(c["owner_uuid"] for c in pending if c["owner_uuid"][6:11] == 'tpzed')|set(c["modified_by_user_uuid"] for c in pending))],
                ],
                select=["uuid", "full_name", "first_name", "last_name"]):
            projects[pr["uuid"]] = pr["full_name"]

        # 5. Optionally iterate over individual workflow steps.
        if include_steps:
            # Step names commonly end in "_<number>" (scatter/instance
            # suffix); strip it so runs of the same step aggregate.
            name_regex = re.compile(r"(.+)_[0-9]+")
            child_crs = {}
            child_cr_containers = set()
            stepcount = 0

            # 5.1. Go through the container requests owned by the toplevel workflow container
            logging.info("Getting workflow steps")
            for cr in arvados.util.keyset_list_all(
                self.arv_client.container_requests().list,
                filters=[
                    ["requesting_container_uuid", "in", list(containers.keys())],
                ],
                select=["uuid", "name", "cumulative_cost", "requesting_container_uuid", "container_uuid"]):

                # Skip zero-cost steps entirely.
                if cr["cumulative_cost"] == 0:
                    continue

                g = name_regex.fullmatch(cr["name"])
                if g:
                    cr["name"] = g[1]

                # 5.2. Get the containers corresponding to the
                # container requests.  This has the same logic as
                # report_from_api where we batch it into 1000 items at
                # a time.
                child_crs.setdefault(cr["requesting_container_uuid"], []).append(cr)
                child_cr_containers.add(cr["container_uuid"])
                if len(child_cr_containers) == 1000:
                    stepcount += len(child_cr_containers)
                    for container in arvados.util.keyset_list_all(
                            self.arv_client.containers().list,
                            filters=[
                                ["uuid", "in", list(child_cr_containers)],
                            ],
                            select=["uuid", "started_at", "finished_at", "cost"]):

                        containers[container["uuid"]] = container

                    logging.info("Got workflow steps %s - %s", stepcount-len(child_cr_containers), stepcount)
                    child_cr_containers.clear()

            # Get any remaining containers
            if child_cr_containers:
                stepcount += len(child_cr_containers)
                for container in arvados.util.keyset_list_all(
                        self.arv_client.containers().list,
                        filters=[
                            ["uuid", "in", list(child_cr_containers)],
                        ],
                        select=["uuid", "started_at", "finished_at", "cost"]):

                    containers[container["uuid"]] = container
                logging.info("Got workflow steps %s - %s", stepcount-len(child_cr_containers), stepcount)

        # 6. Now go through the list of workflow runs, yield a row
        # with all the information we have collected, as well as the
        # details for each workflow step (if enabled)
        for container_request in pending:
            # Skip runs that never started or haven't finished yet.
            if not container_request["container_uuid"] or not containers[container_request["container_uuid"]]["started_at"] or not containers[container_request["container_uuid"]]["finished_at"]:
                continue

            template_uuid = container_request["properties"].get("template_uuid", "none")
            workflowname = container_request["name"] if template_uuid == "none" else workflows.get(template_uuid, template_uuid)

            if exclude and re.search(exclude, workflowname, flags=re.IGNORECASE):
                continue

            yield {
                "Project": projects.get(container_request["owner_uuid"], "unknown owner"),
                "ProjectUUID": container_request["owner_uuid"],
                "Workflow": workflowname,
                "WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
                "Step": "workflow runner",
                "StepUUID": container_request["uuid"],
                "Sample": container_request["name"],
                "SampleUUID": container_request["uuid"],
                "User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
                "UserUUID": container_request["modified_by_user_uuid"],
                "Submitted": csv_dateformat(container_request["created_at"]),
                "Started": csv_dateformat(containers[container_request["container_uuid"]]["started_at"]),
                "Finished": csv_dateformat(containers[container_request["container_uuid"]]["finished_at"]),
                # When steps are included, the runner's own cost is its
                # container cost; otherwise use the request's
                # cumulative cost (runner + all steps).
                "Runtime": runtime_str(container_request, containers),
                "Cost": round(containers[container_request["container_uuid"]]["cost"] if include_steps else container_request["cumulative_cost"], 3),
                "CumulativeCost": round(container_request["cumulative_cost"], 3)
                }

            if include_steps:
                for child_cr in child_crs.get(container_request["container_uuid"], []):
                    if not child_cr["container_uuid"] or not containers[child_cr["container_uuid"]]["started_at"] or not containers[child_cr["container_uuid"]]["finished_at"]:
                        continue
                    yield {
                        "Project": projects.get(container_request["owner_uuid"], "unknown owner"),
                        "ProjectUUID": container_request["owner_uuid"],
                        "Workflow": workflows.get(container_request["properties"].get("template_uuid", "none"), "workflow missing"),
                        "WorkflowUUID": container_request["properties"].get("template_uuid", "none"),
                        "Step": child_cr["name"],
                        "StepUUID": child_cr["uuid"],
                        "Sample": container_request["name"],
                        # NOTE(review): step rows use the request *name*
                        # as SampleUUID while run rows use the uuid --
                        # looks inconsistent; confirm intended.
                        "SampleUUID": container_request["name"],
                        "User": projects.get(container_request["modified_by_user_uuid"], "unknown user"),
                        "UserUUID": container_request["modified_by_user_uuid"],
                        "Submitted": csv_dateformat(child_cr["created_at"]),
                        "Started": csv_dateformat(containers[child_cr["container_uuid"]]["started_at"]),
                        "Finished": csv_dateformat(containers[child_cr["container_uuid"]]["finished_at"]),
                        "Runtime": runtime_str(child_cr, containers),
                        "Cost": round(containers[child_cr["container_uuid"]]["cost"], 3),
                        "CumulativeCost": round(containers[child_cr["container_uuid"]]["cost"], 3),
                        }
568
569
570     def collect_summary_stats(self, row):
571         self.active_users.add(row["User"])
572         self.project_summary.setdefault(row["ProjectUUID"],
573                                         ProjectSummary(users=set(),
574                                                        runs={},
575                                                        uuid=row["ProjectUUID"],
576                                                        name=row["Project"]))
577         prj = self.project_summary[row["ProjectUUID"]]
578         cost = row["Cost"]
579         prj.cost += cost
580         prj.count += 1
581         prj.users.add(row["User"])
582         hrs = runtime_in_hours(row["Runtime"])
583         prj.hours += hrs
584
585         started = datetime.strptime(row["Started"], "%Y-%m-%d %H:%M:%S")
586         finished = datetime.strptime(row["Finished"], "%Y-%m-%d %H:%M:%S")
587
588         if started < prj.earliest:
589             prj.earliest = started
590
591         if finished > prj.latest:
592             prj.latest = finished
593
594         if row["Step"] == "workflow runner":
595             prj.runs.setdefault(row["Workflow"], WorkflowRunSummary(name=row["Workflow"],
596                                                                     uuid=row["WorkflowUUID"],
597                                                                     cost=[], hours=[]))
598             wfuuid = row["Workflow"]
599             prj.runs[wfuuid].count += 1
600             prj.runs[wfuuid].cost.append(row["CumulativeCost"])
601             prj.runs[wfuuid].hours.append(hrs)
602             self.total_workflows += 1
603
604         self.total_hours += hrs
605         self.total_cost += cost
606
607     def report_from_api(self, since, to, include_steps, exclude):
608         pending = []
609
610         count = 0
611         for container_request in arvados.util.keyset_list_all(
612                 self.arv_client.container_requests().list,
613                 filters=[
614                     ["command", "like", "[\"arvados-cwl-runner%"],
615                     ["created_at", ">=", since.strftime("%Y%m%dT%H%M%SZ")],
616                     ["created_at", "<=", to.strftime("%Y%m%dT%H%M%SZ")],
617                 ],
618                 select=["uuid", "owner_uuid", "container_uuid", "name", "cumulative_cost", "properties", "modified_by_user_uuid", "created_at"]):
619
620             if container_request["cumulative_cost"] == 0:
621                 continue
622
623             # Every 1000 container requests, we fetch the
624             # corresponding container records.
625             #
626             # What's so special about 1000?  Because that's the
627             # maximum Arvados page size, so when we use ['uuid', 'in',
628             # [...]] to fetch associated records it doesn't make sense
629             # to provide more than 1000 uuids.
630             #
631             # TODO: use the ?include=container_uuid feature so a
632             # separate request to the containers table isn't necessary.
633             if len(pending) < 1000:
634                 pending.append(container_request)
635             else:
636                 count += len(pending)
637                 logging.info("Exporting workflow runs %s - %s", count-len(pending), count)
638                 for row in self.iter_container_info(pending, include_steps, exclude):
639                     self.collect_summary_stats(row)
640                     yield row
641                 pending.clear()
642
643         count += len(pending)
644         logging.info("Exporting workflow runs %s - %s", count-len(pending), count)
645         for row in self.iter_container_info(pending, include_steps, exclude):
646             self.collect_summary_stats(row)
647             yield row
648
649         userinfo = self.arv_client.users().list(filters=[["is_active", "=", True]], limit=0).execute()
650         self.total_users = userinfo["items_available"]
651
652         groupinfo = self.arv_client.groups().list(filters=[["group_class", "=", "project"]], limit=0).execute()
653         self.total_projects = groupinfo["items_available"]
654
655     def csv_report(self, since, to, out, include_steps, columns, exclude):
656         if columns:
657             columns = columns.split(",")
658         else:
659             if include_steps:
660                 columns = ("Project", "Workflow", "Step", "Sample", "User", "Submitted", "Runtime", "Cost")
661             else:
662                 columns = ("Project", "Workflow", "Sample", "User", "Submitted", "Runtime", "Cost")
663
664         csvwriter = csv.DictWriter(out, fieldnames=columns, extrasaction="ignore")
665         csvwriter.writeheader()
666
667         for row in self.report_from_api(since, to, include_steps, exclude):
668             csvwriter.writerow(row)
669
670         self.summary_fetched = True
671
672     def today(self):
673         return date.today()