16950: add the costanalyzer to arvados-client.
authorWard Vandewege <ward@curii.com>
Sat, 3 Oct 2020 17:58:50 +0000 (13:58 -0400)
committerWard Vandewege <ward@curii.com>
Sat, 31 Oct 2020 01:27:17 +0000 (21:27 -0400)
Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward@curii.com>

cmd/arvados-client/cmd.go
lib/costanalyzer/command.go [new file with mode: 0644]
lib/costanalyzer/costanalyzer.go [new file with mode: 0644]

index bcc3dda09ac91559d4a35227ef81c95bf3e979cd..47fcd5ad7dc88275c5c9ce47369f3432ac861632 100644 (file)
@@ -9,6 +9,7 @@ import (
 
        "git.arvados.org/arvados.git/lib/cli"
        "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/costanalyzer"
        "git.arvados.org/arvados.git/lib/deduplicationreport"
        "git.arvados.org/arvados.git/lib/mount"
 )
@@ -55,6 +56,7 @@ var (
 
                "mount":                mount.Command,
                "deduplication-report": deduplicationreport.Command,
+               "costanalyzer":         costanalyzer.Command,
        })
 )
 
diff --git a/lib/costanalyzer/command.go b/lib/costanalyzer/command.go
new file mode 100644 (file)
index 0000000..3cca16e
--- /dev/null
@@ -0,0 +1,43 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package costanalyzer
+
+import (
+       "io"
+
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/sirupsen/logrus"
+)
+
+var Command command
+
+type command struct{}
+
+type NoPrefixFormatter struct{}
+
+func (f *NoPrefixFormatter) Format(entry *logrus.Entry) ([]byte, error) {
+       return []byte(entry.Message), nil
+}
+
+// RunCommand implements the subcommand "deduplication-report <collection> <collection> ..."
+func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       var err error
+       logger := ctxlog.New(stderr, "text", "info")
+       defer func() {
+               if err != nil {
+                       logger.WithError(err).Error("fatal")
+               }
+       }()
+
+       logger.SetFormatter(new(NoPrefixFormatter))
+
+       loader := config.NewLoader(stdin, logger)
+       loader.SkipLegacy = true
+
+       exitcode := costanalyzer(prog, args, loader, logger, stdout, stderr)
+
+       return exitcode
+}
diff --git a/lib/costanalyzer/costanalyzer.go b/lib/costanalyzer/costanalyzer.go
new file mode 100644 (file)
index 0000000..d754e88
--- /dev/null
@@ -0,0 +1,559 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package costanalyzer
+
+import (
+       "bytes"
+       "encoding/json"
+       "errors"
+       "flag"
+       "fmt"
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
+       "io"
+       "io/ioutil"
+       "log"
+       "net/http"
+       "os"
+       "strconv"
+       "strings"
+       "time"
+
+       "github.com/sirupsen/logrus"
+)
+
+// Dict is a helper type so we don't have to write out 'map[string]interface{}' every time.
+type Dict map[string]interface{}
+
+// LegacyNodeInfo is a struct for records created by Arvados Node Manager (Arvados <= 1.4.3)
+// Example:
+// {
+//    "total_cpu_cores":2,
+//    "total_scratch_mb":33770,
+//    "cloud_node":
+//      {
+//        "price":0.1,
+//        "size":"m4.large"
+//      },
+//     "total_ram_mb":7986
+// }
+type LegacyNodeInfo struct {
+       CPUCores  int64           `json:"total_cpu_cores"`
+       ScratchMb int64           `json:"total_scratch_mb"`
+       RAMMb     int64           `json:"total_ram_mb"`
+       CloudNode LegacyCloudNode `json:"cloud_node"`
+}
+
+// LegacyCloudNode is a struct for records created by Arvados Node Manager (Arvados <= 1.4.3)
+type LegacyCloudNode struct {
+       Price float64 `json:"price"`
+       Size  string  `json:"size"`
+}
+
+// Node is a struct for records created by Arvados Dispatch Cloud (Arvados >= 2.0.0)
+// Example:
+// {
+//    "Name": "Standard_D1_v2",
+//    "ProviderType": "Standard_D1_v2",
+//    "VCPUs": 1,
+//    "RAM": 3584000000,
+//    "Scratch": 50000000000,
+//    "IncludedScratch": 50000000000,
+//    "AddedScratch": 0,
+//    "Price": 0.057,
+//    "Preemptible": false
+//}
+type Node struct {
+       VCPUs        int64
+       Scratch      int64
+       RAM          int64
+       Price        float64
+       Name         string
+       ProviderType string
+       Preemptible  bool
+}
+
+type arrayFlags []string
+
+func (i *arrayFlags) String() string {
+       return ""
+}
+
+func (i *arrayFlags) Set(value string) error {
+       *i = append(*i, value)
+       return nil
+}
+
+func parseFlags(prog string, args []string, loader *config.Loader, logger *logrus.Logger, stderr io.Writer) (exitCode int, uuids arrayFlags, resultsDir string) {
+       flags := flag.NewFlagSet("", flag.ContinueOnError)
+       flags.SetOutput(stderr)
+       flags.Usage = func() {
+               fmt.Fprintf(flags.Output(), `
+Usage:
+  %s [options ...]
+
+       This program analyzes the cost of Arvados container requests. For each uuid
+       supplied, it creates a CSV report that lists all the containers used to
+       fulfill the container request, together with the machine type and cost of
+       each container.
+
+       When supplied with the uuid of a container request, it will calculate the
+       cost of that container request and all its children. When suplied with a
+       project uuid or when supplied with multiple container request uuids, it will
+       create a CSV report for each supplied uuid, as well as a CSV file with
+       aggregate cost accounting for all supplied uuids. The aggregate cost report
+       takes container reuse into account: if a container was reused between several
+       container requests, its cost will only be counted once.
+
+       To get the node costs, the progam queries the Arvados API for current cost
+       data for each node type used. This means that the reported cost always
+       reflects the cost data as currently defined in the Arvados API configuration
+       file.
+
+       Caveats:
+       - the Arvados API configuration cost data may be out of sync with the cloud
+       provider.
+       - when generating reports for older container requests, the cost data in the
+       Arvados API configuration file may have changed since the container request
+       was fulfilled.
+
+       In order to get the data for the uuids supplied, the ARVADOS_API_HOST and
+       ARVADOS_API_TOKEN environment variables must be set.
+
+Options:
+`, prog)
+               flags.PrintDefaults()
+       }
+       loglevel := flags.String("log-level", "info", "logging level (debug, info, ...)")
+       resultsDir = *flags.String("output", "results", "output directory for the CSV reports")
+       flags.Var(&uuids, "uuid", "Toplevel project or container request uuid. May be specified more than once.")
+       err := flags.Parse(args)
+       if err == flag.ErrHelp {
+               exitCode = 0
+               return
+       } else if err != nil {
+               exitCode = 2
+               return
+       }
+
+       if len(uuids) < 1 {
+               logger.Errorf("Error: no uuid(s) provided")
+               flags.Usage()
+               exitCode = 2
+               return
+       }
+
+       lvl, err := logrus.ParseLevel(*loglevel)
+       if err != nil {
+               exitCode = 2
+               return
+       }
+       logger.SetLevel(lvl)
+       return
+}
+
+func ensureDirectory(logger *logrus.Logger, dir string) {
+       statData, err := os.Stat(dir)
+       if os.IsNotExist(err) {
+               err = os.MkdirAll(dir, 0700)
+               if err != nil {
+                       logger.Errorf("Error creating directory %s: %s\n", dir, err.Error())
+                       os.Exit(1)
+               }
+       } else {
+               if !statData.IsDir() {
+                       logger.Errorf("The path %s is not a directory\n", dir)
+                       os.Exit(1)
+               }
+       }
+}
+
+func addContainerLine(logger *logrus.Logger, node interface{}, cr Dict, container Dict) (csv string, cost float64) {
+       csv = cr["uuid"].(string) + ","
+       csv += cr["name"].(string) + ","
+       csv += container["uuid"].(string) + ","
+       csv += container["state"].(string) + ","
+       if container["started_at"] != nil {
+               csv += container["started_at"].(string) + ","
+       } else {
+               csv += ","
+       }
+
+       var delta time.Duration
+       if container["finished_at"] != nil {
+               csv += container["finished_at"].(string) + ","
+               finishedTimestamp, err := time.Parse("2006-01-02T15:04:05.000000000Z", container["finished_at"].(string))
+               if err != nil {
+                       fmt.Println(err)
+               }
+               startedTimestamp, err := time.Parse("2006-01-02T15:04:05.000000000Z", container["started_at"].(string))
+               if err != nil {
+                       fmt.Println(err)
+               }
+               delta = finishedTimestamp.Sub(startedTimestamp)
+               csv += strconv.FormatFloat(delta.Seconds(), 'f', 0, 64) + ","
+       } else {
+               csv += ",,"
+       }
+       var price float64
+       var size string
+       switch n := node.(type) {
+       case Node:
+               price = n.Price
+               size = n.ProviderType
+       case LegacyNodeInfo:
+               price = n.CloudNode.Price
+               size = n.CloudNode.Size
+       default:
+               logger.Warn("WARNING: unknown node type found!")
+       }
+       cost = delta.Seconds() / 3600 * price
+       csv += size + "," + strconv.FormatFloat(price, 'f', 8, 64) + "," + strconv.FormatFloat(cost, 'f', 8, 64) + "\n"
+       return
+}
+
+func loadCachedObject(logger *logrus.Logger, file string, uuid string) (reload bool, object Dict) {
+       reload = true
+       // See if we have a cached copy of this object
+       if _, err := os.Stat(file); err == nil {
+               data, err := ioutil.ReadFile(file)
+               if err != nil {
+                       logger.Errorf("error reading %q: %s", file, err)
+                       return
+               }
+               err = json.Unmarshal(data, &object)
+               if err != nil {
+                       logger.Errorf("failed to unmarshal json: %s: %s", data, err)
+                       return
+               }
+
+               // See if it is in a final state, if that makes sense
+               // Projects (j7d0g) do not have state so they should always be reloaded
+               if !strings.Contains(uuid, "-j7d0g-") {
+                       if object["state"].(string) == "Complete" || object["state"].(string) == "Failed" {
+                               reload = false
+                               logger.Debugf("Loaded object %s from local cache (%s)\n", uuid, file)
+                               return
+                       }
+               }
+       }
+       return
+}
+
+// Load an Arvados object.
+func loadObject(logger *logrus.Logger, arv *arvadosclient.ArvadosClient, path string, uuid string) (object Dict) {
+
+       ensureDirectory(logger, path)
+
+       file := path + "/" + uuid + ".json"
+
+       var reload bool
+       reload, object = loadCachedObject(logger, file, uuid)
+
+       if reload {
+               var err error
+               if strings.Contains(uuid, "-d1hrv-") {
+                       err = arv.Get("pipeline_instances", uuid, nil, &object)
+               } else if strings.Contains(uuid, "-j7d0g-") {
+                       err = arv.Get("groups", uuid, nil, &object)
+               } else if strings.Contains(uuid, "-xvhdp-") {
+                       err = arv.Get("container_requests", uuid, nil, &object)
+               } else if strings.Contains(uuid, "-dz642-") {
+                       err = arv.Get("containers", uuid, nil, &object)
+               } else {
+                       err = arv.Get("jobs", uuid, nil, &object)
+               }
+               if err != nil {
+                       logger.Errorf("Error loading object with UUID %q:\n  %s\n", uuid, err)
+                       os.Exit(1)
+               }
+               encoded, err := json.MarshalIndent(object, "", " ")
+               if err != nil {
+                       logger.Errorf("Error marshaling object with UUID %q:\n  %s\n", uuid, err)
+                       os.Exit(1)
+               }
+               err = ioutil.WriteFile(file, encoded, 0644)
+               if err != nil {
+                       logger.Errorf("Error writing file %s:\n  %s\n", file, err)
+                       os.Exit(1)
+               }
+       }
+       return
+}
+
+func getNode(logger *logrus.Logger, arv *arvadosclient.ArvadosClient, arv2 *arvados.Client, kc *keepclient.KeepClient, itemMap Dict) (node interface{}, err error) {
+       if _, ok := itemMap["log_uuid"]; ok {
+               if itemMap["log_uuid"] == nil {
+                       err = errors.New("No log collection")
+                       return
+               }
+
+               var collection arvados.Collection
+               err = arv.Get("collections", itemMap["log_uuid"].(string), nil, &collection)
+               if err != nil {
+                       logger.Errorf("error getting collection: %s\n", err)
+                       return
+               }
+
+               var fs arvados.CollectionFileSystem
+               fs, err = collection.FileSystem(arv2, kc)
+               if err != nil {
+                       logger.Errorf("error opening collection as filesystem: %s\n", err)
+                       return
+               }
+               var f http.File
+               f, err = fs.Open("node.json")
+               if err != nil {
+                       logger.Errorf("error opening file in collection: %s\n", err)
+                       return
+               }
+
+               var nodeDict Dict
+               // TODO: checkout io (ioutil?) readall function
+               buf := new(bytes.Buffer)
+               _, err = buf.ReadFrom(f)
+               if err != nil {
+                       logger.Errorf("error reading %q: %s\n", f, err)
+                       return
+               }
+               contents := buf.String()
+               f.Close()
+
+               err = json.Unmarshal([]byte(contents), &nodeDict)
+               if err != nil {
+                       logger.Errorf("error unmarshalling: %s\n", err)
+                       return
+               }
+               if val, ok := nodeDict["properties"]; ok {
+                       var encoded []byte
+                       encoded, err = json.MarshalIndent(val, "", " ")
+                       if err != nil {
+                               logger.Errorf("error marshalling: %s\n", err)
+                               return
+                       }
+                       // node is type LegacyNodeInfo
+                       var newNode LegacyNodeInfo
+                       err = json.Unmarshal(encoded, &newNode)
+                       if err != nil {
+                               logger.Errorf("error unmarshalling: %s\n", err)
+                               return
+                       }
+                       node = newNode
+               } else {
+                       // node is type Node
+                       var newNode Node
+                       err = json.Unmarshal([]byte(contents), &newNode)
+                       if err != nil {
+                               logger.Errorf("error unmarshalling: %s\n", err)
+                               return
+                       }
+                       node = newNode
+               }
+       }
+       return
+}
+
+func handleProject(logger *logrus.Logger, uuid string, arv *arvadosclient.ArvadosClient, arv2 *arvados.Client, kc *keepclient.KeepClient, resultsDir string) (cost map[string]float64) {
+
+       cost = make(map[string]float64)
+
+       project := loadObject(logger, arv, resultsDir+"/"+uuid, uuid)
+
+       // arv -f uuid container_request list --filters '[["owner_uuid","=","<someuuid>"],["requesting_container_uuid","=",null]]'
+
+       // Now find all container requests that have the container we found above as requesting_container_uuid
+       var childCrs map[string]interface{}
+       filterset := []arvados.Filter{
+               {
+                       Attr:     "owner_uuid",
+                       Operator: "=",
+                       Operand:  project["uuid"].(string),
+               },
+               {
+                       Attr:     "requesting_container_uuid",
+                       Operator: "=",
+                       Operand:  nil,
+               },
+       }
+       err := arv.List("container_requests", arvadosclient.Dict{"filters": filterset, "limit": 10000}, &childCrs)
+       if err != nil {
+               logger.Fatalf("Error querying container_requests: %s\n", err.Error())
+       }
+       if value, ok := childCrs["items"]; ok {
+               logger.Infof("Collecting top level container requests in project %s\n", uuid)
+               items := value.([]interface{})
+               for _, item := range items {
+                       itemMap := item.(map[string]interface{})
+                       for k, v := range generateCrCsv(logger, itemMap["uuid"].(string), arv, arv2, kc, resultsDir) {
+                               cost[k] = v
+                       }
+               }
+       } else {
+               logger.Infof("No top level container requests found in project %s\n", uuid)
+       }
+       return
+}
+
+func generateCrCsv(logger *logrus.Logger, uuid string, arv *arvadosclient.ArvadosClient, arv2 *arvados.Client, kc *keepclient.KeepClient, resultsDir string) (cost map[string]float64) {
+
+       cost = make(map[string]float64)
+
+       csv := "CR UUID,CR name,Container UUID,State,Started At,Finished At,Duration in seconds,Compute node type,Hourly node cost,Total cost\n"
+       var tmpCsv string
+       var tmpTotalCost float64
+       var totalCost float64
+
+       // This is a container request, find the container
+       cr := loadObject(logger, arv, resultsDir+"/"+uuid, uuid)
+       container := loadObject(logger, arv, resultsDir+"/"+uuid, cr["container_uuid"].(string))
+
+       topNode, err := getNode(logger, arv, arv2, kc, cr)
+       if err != nil {
+               log.Fatalf("error getting node: %s", err)
+       }
+       tmpCsv, totalCost = addContainerLine(logger, topNode, cr, container)
+       csv += tmpCsv
+       totalCost += tmpTotalCost
+
+       cost[container["uuid"].(string)] = totalCost
+
+       // Now find all container requests that have the container we found above as requesting_container_uuid
+       var childCrs map[string]interface{}
+       filterset := []arvados.Filter{
+               {
+                       Attr:     "requesting_container_uuid",
+                       Operator: "=",
+                       Operand:  container["uuid"].(string),
+               }}
+       err = arv.List("container_requests", arvadosclient.Dict{"filters": filterset, "limit": 10000}, &childCrs)
+       if err != nil {
+               log.Fatal("error querying container_requests", err.Error())
+       }
+       if value, ok := childCrs["items"]; ok {
+               logger.Infof("Collecting child containers for container request %s", uuid)
+               items := value.([]interface{})
+               for _, item := range items {
+                       logger.Info(".")
+                       itemMap := item.(map[string]interface{})
+                       node, _ := getNode(logger, arv, arv2, kc, itemMap)
+                       logger.Debug("\nChild container: " + itemMap["container_uuid"].(string) + "\n")
+                       c2 := loadObject(logger, arv, resultsDir+"/"+uuid, itemMap["container_uuid"].(string))
+                       tmpCsv, tmpTotalCost = addContainerLine(logger, node, itemMap, c2)
+                       cost[itemMap["container_uuid"].(string)] = tmpTotalCost
+                       csv += tmpCsv
+                       totalCost += tmpTotalCost
+               }
+       }
+       logger.Info(" done\n")
+
+       csv += "TOTAL,,,,,,,,," + strconv.FormatFloat(totalCost, 'f', 8, 64) + "\n"
+
+       // Write the resulting CSV file
+       fName := resultsDir + "/" + uuid + ".csv"
+       err = ioutil.WriteFile(fName, []byte(csv), 0644)
+       if err != nil {
+               logger.Errorf("Error writing file with path %s: %s\n", fName, err.Error())
+               os.Exit(1)
+       }
+
+       return
+}
+
+func costanalyzer(prog string, args []string, loader *config.Loader, logger *logrus.Logger, stdout, stderr io.Writer) (exitcode int) {
+       exitcode, uuids, resultsDir := parseFlags(prog, args, loader, logger, stderr)
+       if exitcode != 0 {
+               return
+       }
+
+       ensureDirectory(logger, resultsDir)
+
+       // Arvados Client setup
+       arv, err := arvadosclient.MakeArvadosClient()
+       if err != nil {
+               logger.Errorf("error creating Arvados object: %s", err)
+               os.Exit(1)
+       }
+       kc, err := keepclient.MakeKeepClient(arv)
+       if err != nil {
+               logger.Errorf("error creating Keep object: %s", err)
+               os.Exit(1)
+       }
+
+       arv2 := arvados.NewClientFromEnv()
+
+       cost := make(map[string]float64)
+
+       for _, uuid := range uuids {
+               //csv := "CR UUID,CR name,Container UUID,State,Started At,Finished At,Duration in seconds,Compute node type,Hourly node cost,Total cost\n"
+
+               if strings.Contains(uuid, "-d1hrv-") {
+                       // This is a pipeline instance, not a job! Find the cwl-runner job.
+                       pi := loadObject(logger, arv, resultsDir+"/"+uuid, uuid)
+                       for _, v := range pi["components"].(map[string]interface{}) {
+                               x := v.(map[string]interface{})
+                               y := x["job"].(map[string]interface{})
+                               uuid = y["uuid"].(string)
+                       }
+               }
+
+               // for projects:
+               // arv -f uuid container_request list --filters '[["owner_uuid","=","<someuuid>"],["requesting_container_uuid","=",null]]'
+
+               if strings.Contains(uuid, "-j7d0g-") {
+                       // This is a project (group)
+                       for k, v := range handleProject(logger, uuid, arv, arv2, kc, resultsDir) {
+                               cost[k] = v
+                       }
+               } else if strings.Contains(uuid, "-xvhdp-") {
+                       // This is a container request
+                       for k, v := range generateCrCsv(logger, uuid, arv, arv2, kc, resultsDir) {
+                               cost[k] = v
+                       }
+               } else if strings.Contains(uuid, "-tpzed-") {
+                       // This is a user. The "Home" project for a user is not a real project.
+                       // It is identified by the user uuid. As such, cost analysis for the
+                       // "Home" project is not supported by this program.
+                       logger.Errorf("Cost analysis is not supported for the 'Home' project: %s", uuid)
+               }
+       }
+
+       logger.Info("\n")
+       for k := range cost {
+               logger.Infof("Uuid report in %s/%s.csv\n", resultsDir, k)
+       }
+
+       if len(cost) == 0 {
+               logger.Info("Nothing to do!\n")
+               os.Exit(0)
+       }
+
+       var csv string
+
+       csv = "# Aggregate cost accounting for uuids:\n"
+       for _, uuid := range uuids {
+               csv += "# " + uuid + "\n"
+       }
+
+       var total float64
+       for k, v := range cost {
+               csv += k + "," + strconv.FormatFloat(v, 'f', 8, 64) + "\n"
+               total += v
+       }
+
+       csv += "TOTAL," + strconv.FormatFloat(total, 'f', 8, 64) + "\n"
+
+       // Write the resulting CSV file
+       aFile := resultsDir + "/" + time.Now().Format("2006-01-02-15-04-05") + "-aggregate-costaccounting.csv"
+       err = ioutil.WriteFile(aFile, []byte(csv), 0644)
+       if err != nil {
+               logger.Errorf("Error writing file with path %s: %s\n", aFile, err.Error())
+               os.Exit(1)
+       } else {
+               logger.Infof("\nAggregate cost accounting for all supplied uuids in %s\n", aFile)
+       }
+       return
+}