Initial commit of the costanalyzer code.
[arvados-dev.git] / costanalyzer / costanalyzer.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "encoding/json"
10         "errors"
11         "flag"
12         "fmt"
13         "git.arvados.org/arvados.git/sdk/go/arvados"
14         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
15         "git.arvados.org/arvados.git/sdk/go/keepclient"
16         "io/ioutil"
17         "log"
18         "net/http"
19         "os"
20         "strconv"
21         "strings"
22         "time"
23 )
24
25 // Dict is a helper type so we don't have to write out 'map[string]interface{}' every time.
26 type Dict map[string]interface{}
27
28 // LegacyNodeInfo is a struct for records created by Arvados Node Manager (Arvados <= 1.4.3)
29 // Example:
30 // {
31 //    "total_cpu_cores":2,
32 //    "total_scratch_mb":33770,
33 //    "cloud_node":
34 //      {
35 //        "price":0.1,
36 //        "size":"m4.large"
37 //      },
38 //     "total_ram_mb":7986
39 // }
40 type LegacyNodeInfo struct {
41         CPUCores  int64           `json:"total_cpu_cores"`
42         ScratchMb int64           `json:"total_scratch_mb"`
43         RAMMb     int64           `json:"total_ram_mb"`
44         CloudNode LegacyCloudNode `json:"cloud_node"`
45 }
46
47 // LegacyCloudNode is a struct for records created by Arvados Node Manager (Arvados <= 1.4.3)
48 type LegacyCloudNode struct {
49         Price float64 `json:"price"`
50         Size  string  `json:"size"`
51 }
52
53 // Node is a struct for records created by Arvados Dispatch Cloud (Arvados >= 2.0.0)
54 // Example:
55 // {
56 //    "Name": "Standard_D1_v2",
57 //    "ProviderType": "Standard_D1_v2",
58 //    "VCPUs": 1,
59 //    "RAM": 3584000000,
60 //    "Scratch": 50000000000,
61 //    "IncludedScratch": 50000000000,
62 //    "AddedScratch": 0,
63 //    "Price": 0.057,
64 //    "Preemptible": false
65 //}
66 type Node struct {
67         VCPUs        int64
68         Scratch      int64
69         RAM          int64
70         Price        float64
71         Name         string
72         ProviderType string
73         Preemptible  bool
74 }
75
76 type report struct {
77         Type string
78         Msg  string
79 }
80
81 type arrayFlags []string
82
83 func (i *arrayFlags) String() string {
84         return ""
85 }
86
87 func (i *arrayFlags) Set(value string) error {
88         *i = append(*i, value)
89         return nil
90 }
91
92 func logError(m []string) {
93         log.Print(string(marshal(report{"Error", strings.Join(m, " ")})))
94 }
95
96 func marshal(message interface{}) (encoded []byte) {
97         encoded, err := json.Marshal(message)
98         if err != nil {
99                 // do not call logError here because that would create an infinite loop
100                 fmt.Fprintln(os.Stderr, "{\"Error\": \"Unable to marshal message into json:", message, "\"}")
101                 return nil
102         }
103         return
104 }
105
106 func parseFlags() (uuids arrayFlags) {
107
108         flags := flag.NewFlagSet("cost-analyzer", flag.ExitOnError)
109         flags.Var(&uuids, "uuid", "Toplevel project or container request uuid. May be specified more than once.")
110
111         flags.Usage = func() { usage(flags) }
112
113         // Parse args; omit the first arg which is the command name
114         err := flags.Parse(os.Args[1:])
115         if err != nil {
116                 logError([]string{"Unable to parse command line arguments:", err.Error()})
117                 os.Exit(1)
118         }
119
120         if len(uuids) == 0 {
121                 usage(flags)
122                 os.Exit(1)
123         }
124
125         return
126 }
127
128 func ensureDirectory(dir string) {
129         statData, err := os.Stat(dir)
130         if os.IsNotExist(err) {
131                 err = os.MkdirAll(dir, 0700)
132                 if err != nil {
133                         logError([]string{"Error creating directory", dir, ":", err.Error()})
134                         os.Exit(1)
135                 }
136         } else {
137                 if !statData.IsDir() {
138                         logError([]string{"The path", dir, "is not a directory"})
139                         os.Exit(1)
140                 }
141         }
142 }
143
144 func addContainerLine(node interface{}, cr Dict, container Dict) (csv string, cost float64) {
145         csv = cr["uuid"].(string) + ","
146         csv += cr["name"].(string) + ","
147         csv += container["uuid"].(string) + ","
148         csv += container["state"].(string) + ","
149         if container["started_at"] != nil {
150                 csv += container["started_at"].(string) + ","
151         } else {
152                 csv += ","
153         }
154
155         var delta time.Duration
156         if container["finished_at"] != nil {
157                 csv += container["finished_at"].(string) + ","
158                 finishedTimestamp, err := time.Parse("2006-01-02T15:04:05.000000000Z", container["finished_at"].(string))
159                 if err != nil {
160                         fmt.Println(err)
161                 }
162                 startedTimestamp, err := time.Parse("2006-01-02T15:04:05.000000000Z", container["started_at"].(string))
163                 if err != nil {
164                         fmt.Println(err)
165                 }
166                 delta = finishedTimestamp.Sub(startedTimestamp)
167                 csv += strconv.FormatFloat(delta.Seconds(), 'f', 0, 64) + ","
168         } else {
169                 csv += ",,"
170         }
171         var price float64
172         var size string
173         switch n := node.(type) {
174         case Node:
175                 price = n.Price
176                 size = n.ProviderType
177         case LegacyNodeInfo:
178                 price = n.CloudNode.Price
179                 size = n.CloudNode.Size
180         default:
181                 log.Printf("WARNING: unknown node type found!")
182         }
183         cost = delta.Seconds() / 3600 * price
184         csv += size + "," + strconv.FormatFloat(price, 'f', 8, 64) + "," + strconv.FormatFloat(cost, 'f', 8, 64) + "\n"
185         return
186 }
187
188 func loadCachedObject(file string, uuid string) (reload bool, object Dict) {
189         reload = true
190         // See if we have a cached copy of this object
191         if _, err := os.Stat(file); err == nil {
192                 data, err := ioutil.ReadFile(file)
193                 if err != nil {
194                         log.Printf("error reading %q: %s", file, err)
195                         return
196                 }
197                 err = json.Unmarshal(data, &object)
198                 if err != nil {
199                         log.Printf("failed to unmarshal json: %s: %s", data, err)
200                         return
201                 }
202
203                 // See if it is in a final state, if that makes sense
204                 // Projects (j7d0g) do not have state so they should always be reloaded
205                 if !strings.Contains(uuid, "-j7d0g-") {
206                         if object["state"].(string) == "Complete" || object["state"].(string) == "Failed" {
207                                 reload = false
208                                 return
209                         }
210                 }
211         }
212         return
213 }
214
215 // Load an Arvados object.
216 func loadObject(arv *arvadosclient.ArvadosClient, path string, uuid string) (object Dict) {
217
218         ensureDirectory(path)
219
220         file := path + "/" + uuid + ".json"
221
222         var reload bool
223         reload, object = loadCachedObject(file, uuid)
224
225         if reload {
226                 var err error
227                 if strings.Contains(uuid, "-d1hrv-") {
228                         err = arv.Get("pipeline_instances", uuid, nil, &object)
229                 } else if strings.Contains(uuid, "-j7d0g-") {
230                         err = arv.Get("groups", uuid, nil, &object)
231                 } else if strings.Contains(uuid, "-xvhdp-") {
232                         err = arv.Get("container_requests", uuid, nil, &object)
233                 } else if strings.Contains(uuid, "-dz642-") {
234                         err = arv.Get("containers", uuid, nil, &object)
235                 } else {
236                         err = arv.Get("jobs", uuid, nil, &object)
237                 }
238                 if err != nil {
239                         logError([]string{fmt.Sprintf("error loading object with UUID %q: %s", uuid, err)})
240                         os.Exit(1)
241                 }
242                 encoded, err := json.MarshalIndent(object, "", " ")
243                 if err != nil {
244                         logError([]string{fmt.Sprintf("error marshaling object with UUID %q: %s", uuid, err)})
245                         os.Exit(1)
246                 }
247                 err = ioutil.WriteFile(file, encoded, 0644)
248                 if err != nil {
249                         logError([]string{fmt.Sprintf("error writing file %s: %s", file, err)})
250                         os.Exit(1)
251                 }
252         }
253         return
254 }
255
256 func getNode(arv *arvadosclient.ArvadosClient, arv2 *arvados.Client, kc *keepclient.KeepClient, itemMap Dict) (node interface{}, err error) {
257         if _, ok := itemMap["log_uuid"]; ok {
258                 if itemMap["log_uuid"] == nil {
259                         err = errors.New("No log collection")
260                         return
261                 }
262
263                 var collection arvados.Collection
264                 err = arv.Get("collections", itemMap["log_uuid"].(string), nil, &collection)
265                 if err != nil {
266                         log.Printf("error getting collection: %s\n", err)
267                         return
268                 }
269
270                 var fs arvados.CollectionFileSystem
271                 fs, err = collection.FileSystem(arv2, kc)
272                 if err != nil {
273                         log.Printf("error opening collection as filesystem: %s\n", err)
274                         return
275                 }
276                 var f http.File
277                 f, err = fs.Open("node.json")
278                 if err != nil {
279                         log.Printf("error opening file in collection: %s\n", err)
280                         return
281                 }
282
283                 var nodeDict Dict
284                 // TODO: checkout io (ioutil?) readall function
285                 buf := new(bytes.Buffer)
286                 _, err = buf.ReadFrom(f)
287                 if err != nil {
288                         log.Printf("error reading %q: %s\n", f, err)
289                         return
290                 }
291                 contents := buf.String()
292                 f.Close()
293
294                 err = json.Unmarshal([]byte(contents), &nodeDict)
295                 if err != nil {
296                         log.Printf("error unmarshalling: %s\n", err)
297                         return
298                 }
299                 if val, ok := nodeDict["properties"]; ok {
300                         var encoded []byte
301                         encoded, err = json.MarshalIndent(val, "", " ")
302                         if err != nil {
303                                 log.Printf("error marshalling: %s\n", err)
304                                 return
305                         }
306                         // node is type LegacyNodeInfo
307                         var newNode LegacyNodeInfo
308                         err = json.Unmarshal(encoded, &newNode)
309                         if err != nil {
310                                 log.Printf("error unmarshalling: %s\n", err)
311                                 return
312                         }
313                         node = newNode
314                 } else {
315                         // node is type Node
316                         var newNode Node
317                         err = json.Unmarshal([]byte(contents), &newNode)
318                         if err != nil {
319                                 log.Printf("error unmarshalling: %s\n", err)
320                                 return
321                         }
322                         node = newNode
323                 }
324         }
325         return
326 }
327
328 func handleProject(uuid string, arv *arvadosclient.ArvadosClient, arv2 *arvados.Client, kc *keepclient.KeepClient) (cost map[string]float64) {
329
330         cost = make(map[string]float64)
331
332         project := loadObject(arv, "results"+"/"+uuid, uuid)
333
334         // arv -f uuid container_request list --filters '[["owner_uuid","=","<someuuid>"],["requesting_container_uuid","=",null]]'
335
336         // Now find all container requests that have the container we found above as requesting_container_uuid
337         var childCrs map[string]interface{}
338         filterset := []arvados.Filter{
339                 {
340                         Attr:     "owner_uuid",
341                         Operator: "=",
342                         Operand:  project["uuid"].(string),
343                 },
344                 {
345                         Attr:     "requesting_container_uuid",
346                         Operator: "=",
347                         Operand:  nil,
348                 },
349         }
350         err := arv.List("container_requests", arvadosclient.Dict{"filters": filterset, "limit": 10000}, &childCrs)
351         if err != nil {
352                 log.Fatal("error querying container_requests", err.Error())
353         }
354         if value, ok := childCrs["items"]; ok {
355                 log.Println("Collecting top level container requests in project")
356                 items := value.([]interface{})
357                 for _, item := range items {
358                         itemMap := item.(map[string]interface{})
359                         for k, v := range generateCrCsv(itemMap["uuid"].(string), arv, arv2, kc) {
360                                 cost[k] = v
361                         }
362                 }
363         }
364         return
365 }
366
367 func generateCrCsv(uuid string, arv *arvadosclient.ArvadosClient, arv2 *arvados.Client, kc *keepclient.KeepClient) (cost map[string]float64) {
368
369         cost = make(map[string]float64)
370
371         csv := "CR UUID,CR name,Container UUID,State,Started At,Finished At,Duration in seconds,Compute node type,Hourly node cost,Total cost\n"
372         var tmpCsv string
373         var tmpTotalCost float64
374         var totalCost float64
375
376         // This is a container request, find the container
377         cr := loadObject(arv, "results"+"/"+uuid, uuid)
378         container := loadObject(arv, "results"+"/"+uuid, cr["container_uuid"].(string))
379
380         topNode, err := getNode(arv, arv2, kc, cr)
381         if err != nil {
382                 log.Fatalf("error getting node: %s", err)
383         }
384         tmpCsv, totalCost = addContainerLine(topNode, cr, container)
385         csv += tmpCsv
386         totalCost += tmpTotalCost
387
388         cost[container["uuid"].(string)] = totalCost
389
390         // Now find all container requests that have the container we found above as requesting_container_uuid
391         var childCrs map[string]interface{}
392         filterset := []arvados.Filter{
393                 {
394                         Attr:     "requesting_container_uuid",
395                         Operator: "=",
396                         Operand:  container["uuid"].(string),
397                 }}
398         err = arv.List("container_requests", arvadosclient.Dict{"filters": filterset, "limit": 10000}, &childCrs)
399         if err != nil {
400                 log.Fatal("error querying container_requests", err.Error())
401         }
402         if value, ok := childCrs["items"]; ok {
403                 log.Println("Collecting child containers")
404                 items := value.([]interface{})
405                 for _, item := range items {
406                         fmt.Fprintf(os.Stderr, ".")
407                         itemMap := item.(map[string]interface{})
408                         node, _ := getNode(arv, arv2, kc, itemMap)
409                         c2 := loadObject(arv, "results"+"/"+uuid, itemMap["container_uuid"].(string))
410                         tmpCsv, tmpTotalCost = addContainerLine(node, itemMap, c2)
411                         cost[itemMap["container_uuid"].(string)] = tmpTotalCost
412                         csv += tmpCsv
413                         totalCost += tmpTotalCost
414                 }
415         }
416         fmt.Fprintf(os.Stderr, "\n")
417         log.Println("Done")
418
419         csv += "TOTAL,,,,,,,,," + strconv.FormatFloat(totalCost, 'f', 8, 64) + "\n"
420
421         // Write the resulting CSV file
422         err = ioutil.WriteFile("results"+"/"+uuid+".csv", []byte(csv), 0644)
423         if err != nil {
424                 logError([]string{"Error writing file", ":", err.Error()})
425                 os.Exit(1)
426         }
427
428         log.Println("Results in results/" + uuid + ".csv")
429         return
430 }
431
432 func main() {
433
434         uuids := parseFlags()
435
436         ensureDirectory("results")
437
438         // Arvados Client setup
439         arv, err := arvadosclient.MakeArvadosClient()
440         if err != nil {
441                 logError([]string{fmt.Sprintf("error creating Arvados object: %s", err)})
442                 os.Exit(1)
443         }
444         kc, err := keepclient.MakeKeepClient(arv)
445         if err != nil {
446                 logError([]string{fmt.Sprintf("error creating Keep object: %s", err)})
447                 os.Exit(1)
448         }
449
450         arv2 := arvados.NewClientFromEnv()
451
452         cost := make(map[string]float64)
453
454         for _, uuid := range uuids {
455                 //csv := "CR UUID,CR name,Container UUID,State,Started At,Finished At,Duration in seconds,Compute node type,Hourly node cost,Total cost\n"
456
457                 if strings.Contains(uuid, "-d1hrv-") {
458                         // This is a pipeline instance, not a job! Find the cwl-runner job.
459                         pi := loadObject(arv, "results"+"/"+uuid, uuid)
460                         for _, v := range pi["components"].(map[string]interface{}) {
461                                 x := v.(map[string]interface{})
462                                 y := x["job"].(map[string]interface{})
463                                 uuid = y["uuid"].(string)
464                         }
465                 }
466
467                 // for projects:
468                 // arv -f uuid container_request list --filters '[["owner_uuid","=","<someuuid>"],["requesting_container_uuid","=",null]]'
469
470                 // Is this a project?
471                 if strings.Contains(uuid, "-j7d0g-") {
472                         for k, v := range handleProject(uuid, arv, arv2, kc) {
473                                 cost[k] = v
474                         }
475                 }
476                 // Is this a container request?
477                 if strings.Contains(uuid, "-xvhdp-") {
478                         for k, v := range generateCrCsv(uuid, arv, arv2, kc) {
479                                 cost[k] = v
480                         }
481                 }
482         }
483
484         var csv string
485
486         csv = "# Aggregate cost accounting for uuids:\n"
487         for _, uuid := range uuids {
488                 csv += "# " + uuid + "\n"
489         }
490
491         var total float64
492         for k, v := range cost {
493                 csv += k + "," + strconv.FormatFloat(v, 'f', 8, 64) + "\n"
494                 total += v
495         }
496
497         csv += "TOTAL," + strconv.FormatFloat(total, 'f', 8, 64) + "\n"
498
499         // Write the resulting CSV file
500         err = ioutil.WriteFile("results"+"/"+time.Now().Format("2006-01-02-15-04-05")+"-aggregate-costaccounting.csv", []byte(csv), 0644)
501         if err != nil {
502                 logError([]string{"Error writing file", ":", err.Error()})
503                 os.Exit(1)
504         }
505 }