19967: Update container cost when crunch-run receives SIGUSR2 19967-crunch-run-cost-updates
authorBrett Smith <brett.smith@curii.com>
Mon, 20 Mar 2023 19:52:56 +0000 (15:52 -0400)
committerBrett Smith <brett.smith@curii.com>
Mon, 20 Mar 2023 19:52:56 +0000 (15:52 -0400)
The Crunch dispatcher sends this signal periodically, so this provides
regular cost updates for running containers.

Arvados-DCO-1.1-Signed-off-by: Brett Smith <brett.smith@curii.com>

lib/crunchrun/crunchrun.go
lib/crunchrun/crunchrun_test.go

index 3708be0c2417d5e603a671d1c2058517b4d0a807..3f254496ba488bb3aea553b65d50937e59652096 100644 (file)
@@ -1643,11 +1643,7 @@ func (runner *ContainerRunner) Run() (err error) {
        signal.Notify(sigusr2, syscall.SIGUSR2)
        defer signal.Stop(sigusr2)
        runner.loadPrices()
-       go func() {
-               for range sigusr2 {
-                       runner.loadPrices()
-               }
-       }()
+       go runner.handleSIGUSR2(sigusr2)
 
        runner.finalState = "Queued"
 
@@ -2453,3 +2449,15 @@ func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
 
        return cost
 }
+
+func (runner *ContainerRunner) handleSIGUSR2(sigchan chan os.Signal) {
+       for range sigchan {
+               runner.loadPrices()
+               update := arvadosclient.Dict{
+                       "container": arvadosclient.Dict{
+                               "cost": runner.calculateCost(time.Now()),
+                       },
+               }
+               runner.DispatcherArvClient.Update("containers", runner.Container.UUID, update, nil)
+       }
+}
index 701be4517b631178924d2be2fbcdef5fdfd85139..56a605bdb8bdf5f109018a0520d579fcdc9e1056 100644 (file)
@@ -18,6 +18,7 @@ import (
        "net/http/httptest"
        "os"
        "os/exec"
+       "path"
        "regexp"
        "runtime/pprof"
        "strconv"
@@ -2350,6 +2351,80 @@ func (s *TestSuite) TestCalculateCost(c *C) {
        c.Check(logbuf.String(), Not(Matches), `(?ms).*changed to 2\.00 .* changed to 2\.00 .*`)
 }
 
+func (s *TestSuite) TestSIGUSR2CostUpdate(c *C) {
+       pid := os.Getpid()
+       now := time.Now()
+       pricesJSON, err := json.Marshal([]cloud.InstancePrice{
+               {StartTime: now.Add(-4 * time.Hour), Price: 2.4},
+               {StartTime: now.Add(-2 * time.Hour), Price: 2.6},
+       })
+       c.Assert(err, IsNil)
+
+       os.Setenv("InstanceType", `{"Price":2.2}`)
+       defer os.Unsetenv("InstanceType")
+       defer func(s string) { lockdir = s }(lockdir)
+       lockdir = c.MkDir()
+
+       // We can't use s.api.CalledWith because timing differences will yield
+       // different cost values across runs. getCostUpdate iterates over API
+       // calls until it finds one that sets the cost, then writes that value
+       // to the next index of costUpdates.
+       deadline := now.Add(time.Second)
+       costUpdates := make([]float64, 2)
+       costIndex := 0
+       apiIndex := 0
+       getCostUpdate := func() {
+               for ; time.Now().Before(deadline); time.Sleep(time.Second / 10) {
+                       for apiIndex < len(s.api.Content) {
+                               update := s.api.Content[apiIndex]
+                               apiIndex++
+                               var ok bool
+                               var cost float64
+                               if update, ok = update["container"].(arvadosclient.Dict); !ok {
+                                       continue
+                               }
+                               if cost, ok = update["cost"].(float64); !ok {
+                                       continue
+                               }
+                               c.Logf("API call #%d updates cost to %v", apiIndex-1, cost)
+                               costUpdates[costIndex] = cost
+                               costIndex++
+                               return
+                       }
+               }
+       }
+
+       s.fullRunHelper(c, `{
+               "command": ["true"],
+               "container_image": "`+arvadostest.DockerImage112PDH+`",
+               "cwd": ".",
+               "environment": {},
+               "mounts": {"/tmp": {"kind": "tmp"} },
+               "output_path": "/tmp",
+               "priority": 1,
+               "runtime_constraints": {},
+               "state": "Locked",
+               "uuid": "zzzzz-dz642-20230320101530a"
+       }`, nil, func() int {
+               s.runner.costStartTime = now.Add(-3 * time.Hour)
+               err := syscall.Kill(pid, syscall.SIGUSR2)
+               c.Check(err, IsNil, Commentf("error sending first SIGUSR2 to runner"))
+               getCostUpdate()
+
+               err = os.WriteFile(path.Join(lockdir, pricesfile), pricesJSON, 0o700)
+               c.Check(err, IsNil, Commentf("error writing JSON prices file"))
+               err = syscall.Kill(pid, syscall.SIGUSR2)
+               c.Check(err, IsNil, Commentf("error sending second SIGUSR2 to runner"))
+               getCostUpdate()
+
+               return 0
+       })
+       // Comparing with format strings makes it easy to ignore minor variations
+       // in cost across runs while keeping diagnostics pretty.
+       c.Check(fmt.Sprintf("%.3f", costUpdates[0]), Equals, "6.600")
+       c.Check(fmt.Sprintf("%.3f", costUpdates[1]), Equals, "7.600")
+}
+
 type FakeProcess struct {
        cmdLine []string
 }