19320: Use AWS spot price data to calculate container cost.
[arvados.git] / lib / crunchrun / crunchrun.go
index eadf22876f9d6016668f049ddda2f100ce8eb0a3..1dd232d3ed2e7cecf92d5eb55164c1a3d2e76fc6 100644 (file)
@@ -31,6 +31,7 @@ import (
        "syscall"
        "time"
 
+       "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/lib/cmd"
        "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/crunchstat"
@@ -140,7 +141,9 @@ type ContainerRunner struct {
        MkArvClient   func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
        finalState    string
        parentTemp    string
+       costStartTime time.Time
 
+       keepstore        *exec.Cmd
        keepstoreLogger  io.WriteCloser
        keepstoreLogbuf  *bufThenWrite
        statLogger       io.WriteCloser
@@ -175,6 +178,9 @@ type ContainerRunner struct {
        containerWatchdogInterval time.Duration
 
        gateway Gateway
+
+       prices     []cloud.InstancePrice
+       pricesLock sync.Mutex
 }
 
 // setupSignals sets up signal handling to gracefully terminate the
@@ -426,8 +432,14 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                arvMountCmd = append(arvMountCmd, "--allow-other")
        }
 
-       if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
-               arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
+       if runner.Container.RuntimeConstraints.KeepCacheDisk > 0 {
+               keepcachedir, err := runner.MkTempDir(runner.parentTemp, "keepcache")
+               if err != nil {
+                       return nil, fmt.Errorf("while creating keep cache temp dir: %v", err)
+               }
+               arvMountCmd = append(arvMountCmd, "--disk-cache", "--disk-cache-dir", keepcachedir, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheDisk))
+       } else if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
+               arvMountCmd = append(arvMountCmd, "--ram-cache", "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
        }
 
        collectionPaths := []string{}
@@ -659,6 +671,9 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        if err != nil {
                return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
        }
+       if runner.hoststatReporter != nil && runner.ArvMount != nil {
+               runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
+       }
 
        for _, p := range collectionPaths {
                _, err = os.Stat(p)
@@ -732,6 +747,7 @@ func (runner *ContainerRunner) startHoststat() error {
                PollPeriod: runner.statInterval,
        }
        runner.hoststatReporter.Start()
+       runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
        return nil
 }
 
@@ -1457,6 +1473,7 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
        if runner.finalState == "Complete" && runner.OutputPDH != nil {
                update["output"] = *runner.OutputPDH
        }
+       update["cost"] = runner.calculateCost(time.Now())
        return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
 }
 
@@ -1489,6 +1506,7 @@ func (runner *ContainerRunner) Run() (err error) {
        runner.CrunchLog.Printf("Using FUSE mount: %s", v)
        runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
        runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
+       runner.costStartTime = time.Now()
 
        hostname, hosterr := os.Hostname()
        if hosterr != nil {
@@ -1497,6 +1515,16 @@ func (runner *ContainerRunner) Run() (err error) {
                runner.CrunchLog.Printf("Executing on host '%s'", hostname)
        }
 
+       sigusr2 := make(chan os.Signal, 1)
+       signal.Notify(sigusr2, syscall.SIGUSR2)
+       defer signal.Stop(sigusr2)
+       runner.loadPrices()
+       go func() {
+               for range sigusr2 {
+                       runner.loadPrices()
+               }
+       }()
+
        runner.finalState = "Queued"
 
        defer func() {
@@ -1563,6 +1591,9 @@ func (runner *ContainerRunner) Run() (err error) {
        if err != nil {
                return
        }
+       if runner.keepstore != nil {
+               runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
+       }
 
        // set up FUSE mount and binds
        bindmounts, err = runner.SetupMounts()
@@ -1736,7 +1767,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)")
        sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
        kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
-       list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
+       list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes (and notify them to use price data passed on stdin)")
        enableMemoryLimit := flags.Bool("enable-memory-limit", true, "tell container runtime to limit container's memory usage")
        enableNetwork := flags.String("container-enable-networking", "default", "enable networking \"always\" (for all containers) or \"default\" (for containers that request it)")
        networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
@@ -1772,11 +1803,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
        switch {
        case *detach && !ignoreDetachFlag:
-               return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr)
+               return Detach(containerUUID, prog, args, stdin, stdout, stderr)
        case *kill >= 0:
-               return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
+               return KillProcess(containerUUID, syscall.Signal(*kill), stdout, stderr)
        case *list:
-               return ListProcesses(os.Stdout, os.Stderr)
+               return ListProcesses(stdin, stdout, stderr)
        }
 
        if len(containerUUID) != 27 {
@@ -1847,6 +1878,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
+       cr.keepstore = keepstore
        if keepstore == nil {
                // Log explanation (if any) for why we're not running
                // a local keepstore.
@@ -2202,3 +2234,70 @@ func localKeepstoreAddr() string {
        })
        return ips[0].String()
 }
+
+func (cr *ContainerRunner) loadPrices() {
+       buf, err := os.ReadFile(filepath.Join(lockdir, pricesfile))
+       if err != nil {
+               if !os.IsNotExist(err) {
+                       cr.CrunchLog.Printf("loadPrices: read: %s", err)
+               }
+               return
+       }
+       var prices []cloud.InstancePrice
+       err = json.Unmarshal(buf, &prices)
+       if err != nil {
+               cr.CrunchLog.Printf("loadPrices: decode: %s", err)
+               return
+       }
+       cr.pricesLock.Lock()
+       defer cr.pricesLock.Unlock()
+       cr.prices = cloud.NormalizePriceHistory(append(prices, cr.prices...))
+}
+
+func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
+       cr.pricesLock.Lock()
+       defer cr.pricesLock.Unlock()
+
+       // First, make a "prices" slice with the real data as far back
+       // as it goes, and (if needed) a "since the beginning of time"
+       // placeholder containing a reasonable guess about what the
+       // price was between cr.costStartTime and the earliest real
+       // data point.
+       prices := cr.prices
+       if len(prices) == 0 {
+               // use price info in InstanceType record initially
+               // provided by cloud dispatcher
+               var p float64
+               var it arvados.InstanceType
+               if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
+                       p = it.Price
+               }
+               prices = []cloud.InstancePrice{{Price: p}}
+       } else if prices[len(prices)-1].StartTime.After(cr.costStartTime) {
+               // guess earlier pricing was the same as the earliest
+               // price we know about
+               filler := prices[len(prices)-1]
+               filler.StartTime = time.Time{}
+               prices = append(prices, filler)
+       }
+
+       // Now that our history of price changes goes back at least as
+       // far as cr.costStartTime, add up the costs for each
+       // interval.
+       cost := 0.0
+       spanEnd := now
+       for _, ip := range prices {
+               spanStart := ip.StartTime
+               last := false
+               if spanStart.Before(cr.costStartTime) {
+                       spanStart = cr.costStartTime
+                       last = true
+               }
+               cost += ip.Price * spanEnd.Sub(spanStart).Seconds() / 3600
+               if last {
+                       break
+               }
+               spanEnd = spanStart
+       }
+       return cost
+}