18071: Use dblock to avoid concurrent keep-balance ops.
[arvados.git] / lib / crunchrun / crunchrun.go
index ff02257f2b36739ccd241f4d49e7c4aa4acb2c1c..55790f727a61d289d5b0d5080fa2911ee7789515 100644 (file)
@@ -140,7 +140,9 @@ type ContainerRunner struct {
        MkArvClient   func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
        finalState    string
        parentTemp    string
+       costStartTime time.Time
 
+       keepstore        *exec.Cmd
        keepstoreLogger  io.WriteCloser
        keepstoreLogbuf  *bufThenWrite
        statLogger       io.WriteCloser
@@ -659,6 +661,9 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        if err != nil {
                return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
        }
+       if runner.hoststatReporter != nil && runner.ArvMount != nil {
+               runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
+       }
 
        for _, p := range collectionPaths {
                _, err = os.Stat(p)
@@ -732,6 +737,7 @@ func (runner *ContainerRunner) startHoststat() error {
                PollPeriod: runner.statInterval,
        }
        runner.hoststatReporter.Start()
+       runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
        return nil
 }
 
@@ -999,6 +1005,7 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
                env["ARVADOS_API_TOKEN"] = tok
                env["ARVADOS_API_HOST"] = os.Getenv("ARVADOS_API_HOST")
                env["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE")
+               env["ARVADOS_KEEP_SERVICES"] = os.Getenv("ARVADOS_KEEP_SERVICES")
        }
        workdir := runner.Container.Cwd
        if workdir == "." {
@@ -1456,6 +1463,10 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
        if runner.finalState == "Complete" && runner.OutputPDH != nil {
                update["output"] = *runner.OutputPDH
        }
+       var it arvados.InstanceType
+       if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
+               update["cost"] = it.Price * time.Now().Sub(runner.costStartTime).Seconds() / time.Hour.Seconds()
+       }
        return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
 }
 
@@ -1488,6 +1499,7 @@ func (runner *ContainerRunner) Run() (err error) {
        runner.CrunchLog.Printf("Using FUSE mount: %s", v)
        runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
        runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
+       runner.costStartTime = time.Now()
 
        hostname, hosterr := os.Hostname()
        if hosterr != nil {
@@ -1562,6 +1574,9 @@ func (runner *ContainerRunner) Run() (err error) {
        if err != nil {
                return
        }
+       if runner.keepstore != nil {
+               runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
+       }
 
        // set up FUSE mount and binds
        bindmounts, err = runner.SetupMounts()
@@ -1743,6 +1758,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
        brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
        flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+       version := flags.Bool("version", false, "Write version information to stdout and exit 0.")
 
        ignoreDetachFlag := false
        if len(args) > 0 && args[0] == "-no-detach" {
@@ -1758,6 +1774,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
        if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
                return code
+       } else if *version {
+               fmt.Fprintln(stdout, prog, cmd.Version.String())
+               return 0
        } else if !*list && flags.NArg() != 1 {
                fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
                return 2
@@ -1842,6 +1861,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
+       cr.keepstore = keepstore
        if keepstore == nil {
                // Log explanation (if any) for why we're not running
                // a local keepstore.
@@ -2054,7 +2074,8 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er
        // modify the cluster configuration that we feed it on stdin.
        configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
 
-       ln, err := net.Listen("tcp", "localhost:0")
+       localaddr := localKeepstoreAddr()
+       ln, err := net.Listen("tcp", net.JoinHostPort(localaddr, "0"))
        if err != nil {
                return nil, err
        }
@@ -2064,7 +2085,7 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er
                return nil, err
        }
        ln.Close()
-       url := "http://localhost:" + port
+       url := "http://" + net.JoinHostPort(localaddr, port)
 
        fmt.Fprintf(logbuf, "starting keepstore on %s\n", url)
 
@@ -2156,3 +2177,43 @@ func currentUserAndGroups() string {
        }
        return s
 }
+
+// Return a suitable local interface address for a local keepstore
+// service. Currently this is the numerically lowest non-loopback ipv4
+// address assigned to a local interface that is not in any of the
+// link-local/vpn/loopback ranges 169.254/16, 100.64/10, or 127/8.
+func localKeepstoreAddr() string {
+       var ips []net.IP
+       // Ignore error (proceed with zero IPs)
+       addrs, _ := processIPs(os.Getpid())
+       for addr := range addrs {
+               ip := net.ParseIP(addr)
+               if ip == nil {
+                       // invalid
+                       continue
+               }
+               if ip.Mask(net.CIDRMask(8, 32)).Equal(net.IPv4(127, 0, 0, 0)) ||
+                       ip.Mask(net.CIDRMask(10, 32)).Equal(net.IPv4(100, 64, 0, 0)) ||
+                       ip.Mask(net.CIDRMask(16, 32)).Equal(net.IPv4(169, 254, 0, 0)) {
+                       // unsuitable
+                       continue
+               }
+               ips = append(ips, ip)
+       }
+       if len(ips) == 0 {
+               return "0.0.0.0"
+       }
+       sort.Slice(ips, func(ii, jj int) bool {
+               i, j := ips[ii], ips[jj]
+               if len(i) != len(j) {
+                       return len(i) < len(j)
+               }
+               for x := range i {
+                       if i[x] != j[x] {
+                               return i[x] < j[x]
+                       }
+               }
+               return false
+       })
+       return ips[0].String()
+}