1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
13 "git.arvados.org/arvados.git/sdk/go/arvados"
14 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
15 "git.arvados.org/arvados.git/sdk/go/keepclient"
25 // Dict is a helper type so we don't have to write out 'map[string]interface{}' every time.
// It is the shape this file uses for decoded JSON objects: loadCachedObject
// unmarshals cached records into it and addContainerLine reads fields from it.
26 type Dict map[string]interface{}
28 // LegacyNodeInfo is a struct for records created by Arvados Node Manager (Arvados <= 1.4.3)
// The quoted lines below are sample keys from such a record; the struct tags
// map each field onto the corresponding JSON key.
31 // "total_cpu_cores":2,
32 // "total_scratch_mb":33770,
38 // "total_ram_mb":7986
40 type LegacyNodeInfo struct {
// CPUCores is decoded from the "total_cpu_cores" key.
41 CPUCores int64 `json:"total_cpu_cores"`
// ScratchMb is decoded from the "total_scratch_mb" key (presumably megabytes, going by the key name).
42 ScratchMb int64 `json:"total_scratch_mb"`
// RAMMb is decoded from the "total_ram_mb" key (presumably megabytes, going by the key name).
43 RAMMb int64 `json:"total_ram_mb"`
// CloudNode is decoded from the nested "cloud_node" object; addContainerLine
// reads its Price and Size fields.
44 CloudNode LegacyCloudNode `json:"cloud_node"`
47 // LegacyCloudNode is a struct for records created by Arvados Node Manager (Arvados <= 1.4.3)
48 type LegacyCloudNode struct {
// Price is decoded from the "price" key. addContainerLine multiplies it by the
// run time in hours, i.e. it is treated as an hourly price.
49 Price float64 `json:"price"`
// Size is decoded from the "size" key and is written to the
// "Compute node type" CSV column by addContainerLine.
50 Size string `json:"size"`
53 // Node is a struct for records created by Arvados Dispatch Cloud (Arvados >= 2.0.0)
56 // "Name": "Standard_D1_v2",
57 // "ProviderType": "Standard_D1_v2",
60 // "Scratch": 50000000000,
61 // "IncludedScratch": 50000000000,
64 // "Preemptible": false
// arrayFlags is a list of strings used to collect the values of a command line
// flag that may be given several times; parseFlags registers it with flags.Var
// for the -uuid flag.
81 type arrayFlags []string
// String is one of the two methods (with Set) that the flag package requires
// of a value passed to Var.
83 func (i *arrayFlags) String() string {
// Set appends value to the list, so each occurrence of the flag on the
// command line adds one more entry.
87 func (i *arrayFlags) Set(value string) error {
88 *i = append(*i, value)
// logError reports an error through the standard logger as a JSON message.
// The strings in m are joined with single spaces, placed in a report value
// whose first field is "Error" (the report type is declared elsewhere),
// encoded with marshal and written with log.Print.
92 func logError(m []string) {
93 log.Print(string(marshal(report{"Error", strings.Join(m, " ")})))
// marshal encodes message as JSON with json.Marshal and returns the bytes in
// the named result encoded. A failure is reported on stderr rather than
// through logError (see the comment below).
96 func marshal(message interface{}) (encoded []byte) {
97 encoded, err := json.Marshal(message)
99 // do not call logError here because that would create an infinite loop
// NOTE(review): message is printed with default formatting inside a
// hand-written JSON string, so this fallback line is not guaranteed to be
// valid JSON (no escaping is applied).
100 fmt.Fprintln(os.Stderr, "{\"Error\": \"Unable to marshal message into json:", message, "\"}")
// parseFlags builds the "cost-analyzer" flag set, parses the process
// arguments and returns the values collected for the repeatable -uuid flag in
// uuids.
106 func parseFlags() (uuids arrayFlags) {
// flag.ExitOnError makes the flag package terminate the process itself when
// parsing fails.
108 flags := flag.NewFlagSet("cost-analyzer", flag.ExitOnError)
109 flags.Var(&uuids, "uuid", "Toplevel project or container request uuid. May be specified more than once.")
// Replace the default usage output with this program's own usage function.
111 flags.Usage = func() { usage(flags) }
113 // Parse args; omit the first arg which is the command name
114 err := flags.Parse(os.Args[1:])
// NOTE(review): with flag.ExitOnError, Parse is documented to exit on error,
// so this error report is not expected to be reached in practice - confirm.
116 logError([]string{"Unable to parse command line arguments:", err.Error()})
// ensureDirectory makes sure dir exists and is a directory. A missing path is
// created, including parents, with mode 0700. Problems are reported through
// logError.
128 func ensureDirectory(dir string) {
129 statData, err := os.Stat(dir)
130 if os.IsNotExist(err) {
// The path does not exist yet: create it, readable only by the owner.
131 err = os.MkdirAll(dir, 0700)
133 logError([]string{"Error creating directory", dir, ":", err.Error()})
// NOTE(review): if os.Stat failed with an error other than "not exist"
// (e.g. permission denied), statData is nil and this call would panic -
// confirm that case is guarded.
137 if !statData.IsDir() {
138 logError([]string{"The path", dir, "is not a directory"})
// addContainerLine builds one CSV row for a container and computes its cost.
//
// node is the decoded node record for the container (see getNode), cr is the
// container request and container is the container record. The row holds the
// container request uuid and name, the container uuid and state, the start
// and finish timestamps, the run time in seconds, the node size, the hourly
// price and the resulting cost. The row is returned in csv and the cost in
// cost.
//
// NOTE(review): every field is read with an unchecked type assertion, so a
// missing or non-string "uuid", "name" or "state" would panic.
144 func addContainerLine(node interface{}, cr Dict, container Dict) (csv string, cost float64) {
145 csv = cr["uuid"].(string) + ","
146 csv += cr["name"].(string) + ","
147 csv += container["uuid"].(string) + ","
148 csv += container["state"].(string) + ","
149 if container["started_at"] != nil {
150 csv += container["started_at"].(string) + ","
// delta starts at zero; the assignment visible below is next to the
// finished_at handling, so an unfinished container presumably ends up with
// zero duration and zero cost - TODO confirm.
155 var delta time.Duration
156 if container["finished_at"] != nil {
157 csv += container["finished_at"].(string) + ","
// The layout expects nine fractional-second digits followed by a literal "Z".
158 finishedTimestamp, err := time.Parse("2006-01-02T15:04:05.000000000Z", container["finished_at"].(string))
// NOTE(review): started_at is asserted to be a string here without the nil
// check used above - confirm finished_at can never be set while started_at
// is nil.
162 startedTimestamp, err := time.Parse("2006-01-02T15:04:05.000000000Z", container["started_at"].(string))
166 delta = finishedTimestamp.Sub(startedTimestamp)
// Run time is written as whole seconds (zero decimal places).
167 csv += strconv.FormatFloat(delta.Seconds(), 'f', 0, 64) + ","
// Pick the price and size fields according to the concrete node record type.
173 switch n := node.(type) {
176 size = n.ProviderType
// CloudNode is the nested record of LegacyNodeInfo.
178 price = n.CloudNode.Price
179 size = n.CloudNode.Size
181 log.Printf("WARNING: unknown node type found!")
// price is per hour: convert the run time from seconds to hours first.
183 cost = delta.Seconds() / 3600 * price
184 csv += size + "," + strconv.FormatFloat(price, 'f', 8, 64) + "," + strconv.FormatFloat(cost, 'f', 8, 64) + "\n"
// loadCachedObject tries to read a previously saved JSON copy of the object
// identified by uuid from file and decodes it into object.
//
// reload presumably tells the caller whether a fresh copy must still be
// fetched from the API (loadObject assigns both results before querying) -
// TODO confirm against the assignments to reload.
188 func loadCachedObject(file string, uuid string) (reload bool, object Dict) {
190 // See if we have a cached copy of this object
191 if _, err := os.Stat(file); err == nil {
192 data, err := ioutil.ReadFile(file)
194 log.Printf("error reading %q: %s", file, err)
197 err = json.Unmarshal(data, &object)
199 log.Printf("failed to unmarshal json: %s: %s", data, err)
203 // See if it is in a final state, if that makes sense
204 // Projects (j7d0g) do not have state so they should always be reloaded
205 if !strings.Contains(uuid, "-j7d0g-") {
// NOTE(review): the unchecked assertion panics if the cached record has no
// string "state" field. Only "Complete" and "Failed" count as final here -
// confirm whether other terminal states should be included.
206 if object["state"].(string) == "Complete" || object["state"].(string) == "Failed" {
215 // Load an Arvados object.
//
// loadObject returns the record identified by uuid as a Dict. It makes sure
// the directory path exists, consults the cached copy at path/<uuid>.json via
// loadCachedObject, and otherwise fetches the record through arv. The API
// resource is chosen from the infix of the uuid:
//
//	-d1hrv-  pipeline_instances
//	-j7d0g-  groups
//	-xvhdp-  container_requests
//	-dz642-  containers
//	other    jobs
//
// A fetched record is written back to the cache file as indented JSON with
// mode 0644. Failures are reported through logError.
216 func loadObject(arv *arvadosclient.ArvadosClient, path string, uuid string) (object Dict) {
218 ensureDirectory(path)
220 file := path + "/" + uuid + ".json"
223 reload, object = loadCachedObject(file, uuid)
227 if strings.Contains(uuid, "-d1hrv-") {
228 err = arv.Get("pipeline_instances", uuid, nil, &object)
229 } else if strings.Contains(uuid, "-j7d0g-") {
230 err = arv.Get("groups", uuid, nil, &object)
231 } else if strings.Contains(uuid, "-xvhdp-") {
232 err = arv.Get("container_requests", uuid, nil, &object)
233 } else if strings.Contains(uuid, "-dz642-") {
234 err = arv.Get("containers", uuid, nil, &object)
// Any uuid that matched none of the infixes above is looked up as a job.
236 err = arv.Get("jobs", uuid, nil, &object)
239 logError([]string{fmt.Sprintf("error loading object with UUID %q: %s", uuid, err)})
// Save the fetched record so later runs can use the cached copy.
242 encoded, err := json.MarshalIndent(object, "", " ")
244 logError([]string{fmt.Sprintf("error marshaling object with UUID %q: %s", uuid, err)})
247 err = ioutil.WriteFile(file, encoded, 0644)
249 logError([]string{fmt.Sprintf("error writing file %s: %s", file, err)})
// getNode loads the node record of the container (request) described by
// itemMap. It fetches the collection named by itemMap["log_uuid"], opens it
// as a filesystem, reads the file "node.json" from it and decodes the JSON.
//
// Two record layouts are handled: when the decoded document has a
// "properties" key, that value is re-encoded and decoded into a
// LegacyNodeInfo; otherwise the whole document is decoded into the other node
// type. The result is returned in node; err is set when a step fails, and
// each failure is also logged.
256 func getNode(arv *arvadosclient.ArvadosClient, arv2 *arvados.Client, kc *keepclient.KeepClient, itemMap Dict) (node interface{}, err error) {
257 if _, ok := itemMap["log_uuid"]; ok {
// A log_uuid key that is present but null means no log collection exists.
258 if itemMap["log_uuid"] == nil {
259 err = errors.New("No log collection")
263 var collection arvados.Collection
264 err = arv.Get("collections", itemMap["log_uuid"].(string), nil, &collection)
266 log.Printf("error getting collection: %s\n", err)
270 var fs arvados.CollectionFileSystem
271 fs, err = collection.FileSystem(arv2, kc)
273 log.Printf("error opening collection as filesystem: %s\n", err)
277 f, err = fs.Open("node.json")
279 log.Printf("error opening file in collection: %s\n", err)
284 // TODO: check out the io (ioutil?) ReadAll function
285 buf := new(bytes.Buffer)
286 _, err = buf.ReadFrom(f)
// NOTE(review): f is the opened file, not its name, so %q will not print a
// readable path here - confirm what this message should show.
288 log.Printf("error reading %q: %s\n", f, err)
291 contents := buf.String()
294 err = json.Unmarshal([]byte(contents), &nodeDict)
296 log.Printf("error unmarshalling: %s\n", err)
// Legacy layout: the node details are nested under "properties".
299 if val, ok := nodeDict["properties"]; ok {
301 encoded, err = json.MarshalIndent(val, "", " ")
303 log.Printf("error marshalling: %s\n", err)
306 // node is type LegacyNodeInfo
307 var newNode LegacyNodeInfo
308 err = json.Unmarshal(encoded, &newNode)
310 log.Printf("error unmarshalling: %s\n", err)
// Otherwise the whole document is the node record.
317 err = json.Unmarshal([]byte(contents), &newNode)
319 log.Printf("error unmarshalling: %s\n", err)
// handleProject computes the costs for the project identified by uuid. It
// loads the project record, lists container requests filtered on the project
// uuid and on requesting_container_uuid, and runs generateCrCsv on each
// listed container request, iterating over the per-container costs it
// returns. The result is returned in cost.
328 func handleProject(uuid string, arv *arvadosclient.ArvadosClient, arv2 *arvados.Client, kc *keepclient.KeepClient) (cost map[string]float64) {
330 cost = make(map[string]float64)
332 project := loadObject(arv, "results"+"/"+uuid, uuid)
334 // arv -f uuid container_request list --filters '[["owner_uuid","=","<someuuid>"],["requesting_container_uuid","=",null]]'
336 // Now find the container requests for this project, using filters like the ones in the command above
337 var childCrs map[string]interface{}
338 filterset := []arvados.Filter{
342 Operand: project["uuid"].(string),
345 Attr: "requesting_container_uuid",
// NOTE(review): the limit is hard-coded; a project with more matching
// container requests than this would presumably be only partly accounted -
// confirm whether paging is needed.
350 err := arv.List("container_requests", arvadosclient.Dict{"filters": filterset, "limit": 10000}, &childCrs)
// NOTE(review): log.Fatal concatenates two string operands without a
// separator, so the message and the error text run together.
352 log.Fatal("error querying container_requests", err.Error())
354 if value, ok := childCrs["items"]; ok {
355 log.Println("Collecting top level container requests in project")
356 items := value.([]interface{})
357 for _, item := range items {
358 itemMap := item.(map[string]interface{})
359 for k, v := range generateCrCsv(itemMap["uuid"].(string), arv, arv2, kc) {
// generateCrCsv produces the cost report for the container request
// identified by uuid. It loads the container request and its container, adds
// a CSV row for it, then lists the container requests whose
// requesting_container_uuid is that container and adds a row for each child.
// A TOTAL row is appended and the CSV is written to results/<uuid>.csv.
//
// The returned cost map is keyed by container uuid and holds the cost
// computed for each container.
367 func generateCrCsv(uuid string, arv *arvadosclient.ArvadosClient, arv2 *arvados.Client, kc *keepclient.KeepClient) (cost map[string]float64) {
369 cost = make(map[string]float64)
371 csv := "CR UUID,CR name,Container UUID,State,Started At,Finished At,Duration in seconds,Compute node type,Hourly node cost,Total cost\n"
373 var tmpTotalCost float64
374 var totalCost float64
376 // This is a container request, find the container
// Records for this request are cached under results/<uuid>/.
377 cr := loadObject(arv, "results"+"/"+uuid, uuid)
378 container := loadObject(arv, "results"+"/"+uuid, cr["container_uuid"].(string))
// A failure to load the node record of the top-level request is fatal.
380 topNode, err := getNode(arv, arv2, kc, cr)
382 log.Fatalf("error getting node: %s", err)
384 tmpCsv, totalCost = addContainerLine(topNode, cr, container)
// NOTE(review): no earlier assignment to tmpTotalCost is visible, so this
// addition looks like a no-op at this point - confirm.
386 totalCost += tmpTotalCost
388 cost[container["uuid"].(string)] = totalCost
390 // Now find all container requests that have the container we found above as requesting_container_uuid
391 var childCrs map[string]interface{}
392 filterset := []arvados.Filter{
394 Attr: "requesting_container_uuid",
396 Operand: container["uuid"].(string),
// NOTE(review): the limit is hard-coded; confirm whether more children than
// this can occur and would need paging.
398 err = arv.List("container_requests", arvadosclient.Dict{"filters": filterset, "limit": 10000}, &childCrs)
// NOTE(review): log.Fatal concatenates two string operands without a
// separator, so the message and the error text run together.
400 log.Fatal("error querying container_requests", err.Error())
402 if value, ok := childCrs["items"]; ok {
403 log.Println("Collecting child containers")
404 items := value.([]interface{})
405 for _, item := range items {
// Progress indicator: one dot per child container request.
406 fmt.Fprintf(os.Stderr, ".")
407 itemMap := item.(map[string]interface{})
// NOTE(review): the error from getNode is discarded here, unlike for the
// top-level request; a nil node presumably ends up in the
// "unknown node type" warning of addContainerLine - confirm this is the
// intended best-effort behavior.
408 node, _ := getNode(arv, arv2, kc, itemMap)
409 c2 := loadObject(arv, "results"+"/"+uuid, itemMap["container_uuid"].(string))
410 tmpCsv, tmpTotalCost = addContainerLine(node, itemMap, c2)
411 cost[itemMap["container_uuid"].(string)] = tmpTotalCost
413 totalCost += tmpTotalCost
// Terminate the line of progress dots.
416 fmt.Fprintf(os.Stderr, "\n")
// The nine leading commas put the total in the last ("Total cost") column.
419 csv += "TOTAL,,,,,,,,," + strconv.FormatFloat(totalCost, 'f', 8, 64) + "\n"
421 // Write the resulting CSV file
422 err = ioutil.WriteFile("results"+"/"+uuid+".csv", []byte(csv), 0644)
424 logError([]string{"Error writing file", ":", err.Error()})
428 log.Println("Results in results/" + uuid + ".csv")
// Program flow: parse the -uuid flags, make sure the "results" directory
// exists, set up the Arvados and Keep clients, process every requested uuid
// according to its kind, and finally write an aggregate CSV into "results".
434 uuids := parseFlags()
436 ensureDirectory("results")
438 // Arvados Client setup
439 arv, err := arvadosclient.MakeArvadosClient()
441 logError([]string{fmt.Sprintf("error creating Arvados object: %s", err)})
444 kc, err := keepclient.MakeKeepClient(arv)
446 logError([]string{fmt.Sprintf("error creating Keep object: %s", err)})
// A second client, built from the environment, is passed along to getNode for
// opening collections as filesystems.
450 arv2 := arvados.NewClientFromEnv()
// cost is the aggregate cost map for all requested uuids.
452 cost := make(map[string]float64)
454 for _, uuid := range uuids {
455 //csv := "CR UUID,CR name,Container UUID,State,Started At,Finished At,Duration in seconds,Compute node type,Hourly node cost,Total cost\n"
457 if strings.Contains(uuid, "-d1hrv-") {
458 // This is a pipeline instance, not a job! Find the cwl-runner job.
459 pi := loadObject(arv, "results"+"/"+uuid, uuid)
// NOTE(review): these unchecked assertions panic if a component has no "job"
// object. uuid is overwritten on every iteration, so with several components
// the surviving value depends on map iteration order - confirm.
460 for _, v := range pi["components"].(map[string]interface{}) {
461 x := v.(map[string]interface{})
462 y := x["job"].(map[string]interface{})
463 uuid = y["uuid"].(string)
468 // arv -f uuid container_request list --filters '[["owner_uuid","=","<someuuid>"],["requesting_container_uuid","=",null]]'
470 // Is this a project?
471 if strings.Contains(uuid, "-j7d0g-") {
472 for k, v := range handleProject(uuid, arv, arv2, kc) {
476 // Is this a container request?
477 if strings.Contains(uuid, "-xvhdp-") {
478 for k, v := range generateCrCsv(uuid, arv, arv2, kc) {
// The aggregate report starts with comment lines naming the requested uuids.
486 csv = "# Aggregate cost accounting for uuids:\n"
487 for _, uuid := range uuids {
488 csv += "# " + uuid + "\n"
// Map iteration order is unspecified, so the order of these rows can differ
// between runs.
492 for k, v := range cost {
493 csv += k + "," + strconv.FormatFloat(v, 'f', 8, 64) + "\n"
497 csv += "TOTAL," + strconv.FormatFloat(total, 'f', 8, 64) + "\n"
499 // Write the resulting CSV file
// The file name carries the current local time so successive runs do not
// overwrite each other's report (at one-second resolution).
500 err = ioutil.WriteFile("results"+"/"+time.Now().Format("2006-01-02-15-04-05")+"-aggregate-costaccounting.csv", []byte(csv), 0644)
502 logError([]string{"Error writing file", ":", err.Error()})