18071: Use dblock to avoid concurrent keep-balance ops.
[arvados.git] / lib / crunchrun / crunchrun.go
index 68181395fadcbafba4227b0d7cc16eb4b2f8624e..55790f727a61d289d5b0d5080fa2911ee7789515 100644 (file)
@@ -140,7 +140,9 @@ type ContainerRunner struct {
        MkArvClient   func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
        finalState    string
        parentTemp    string
+       costStartTime time.Time
 
+       keepstore        *exec.Cmd
        keepstoreLogger  io.WriteCloser
        keepstoreLogbuf  *bufThenWrite
        statLogger       io.WriteCloser
@@ -659,6 +661,9 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        if err != nil {
                return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
        }
+       if runner.hoststatReporter != nil && runner.ArvMount != nil {
+               runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
+       }
 
        for _, p := range collectionPaths {
                _, err = os.Stat(p)
@@ -732,6 +737,7 @@ func (runner *ContainerRunner) startHoststat() error {
                PollPeriod: runner.statInterval,
        }
        runner.hoststatReporter.Start()
+       runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
        return nil
 }
 
@@ -1457,6 +1463,10 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
        if runner.finalState == "Complete" && runner.OutputPDH != nil {
                update["output"] = *runner.OutputPDH
        }
+       var it arvados.InstanceType
+       if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
+               update["cost"] = it.Price * time.Now().Sub(runner.costStartTime).Seconds() / time.Hour.Seconds()
+       }
        return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
 }
 
@@ -1489,6 +1499,7 @@ func (runner *ContainerRunner) Run() (err error) {
        runner.CrunchLog.Printf("Using FUSE mount: %s", v)
        runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
        runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
+       runner.costStartTime = time.Now()
 
        hostname, hosterr := os.Hostname()
        if hosterr != nil {
@@ -1563,6 +1574,9 @@ func (runner *ContainerRunner) Run() (err error) {
        if err != nil {
                return
        }
+       if runner.keepstore != nil {
+               runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
+       }
 
        // set up FUSE mount and binds
        bindmounts, err = runner.SetupMounts()
@@ -1744,6 +1758,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
        brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
        flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+       version := flags.Bool("version", false, "Write version information to stdout and exit 0.")
 
        ignoreDetachFlag := false
        if len(args) > 0 && args[0] == "-no-detach" {
@@ -1759,6 +1774,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
        if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
                return code
+       } else if *version {
+               fmt.Fprintln(stdout, prog, cmd.Version.String())
+               return 0
        } else if !*list && flags.NArg() != 1 {
                fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
                return 2
@@ -1843,6 +1861,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
+       cr.keepstore = keepstore
        if keepstore == nil {
                // Log explanation (if any) for why we're not running
                // a local keepstore.
@@ -1906,11 +1925,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                // not safe to run a gateway service without an auth
                // secret
                cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
-       } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
-               // dispatcher did not tell us which external IP
-               // address to advertise --> no gateway service
-               cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
        } else {
+               gwListen := os.Getenv("GatewayAddress")
                cr.gateway = Gateway{
                        Address:       gwListen,
                        AuthSecret:    gwAuthSecret,
@@ -1918,6 +1934,18 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                        Target:        cr.executor,
                        Log:           cr.CrunchLog,
                }
+               if gwListen == "" {
+                       // Direct connection won't work, so we use the
+                       // gateway_address field to indicate the
+                       // internalURL of the controller process that
+                       // has the current tunnel connection.
+                       cr.gateway.ArvadosClient = cr.dispatcherClient
+                       cr.gateway.UpdateTunnelURL = func(url string) {
+                               cr.gateway.Address = "tunnel " + url
+                               cr.DispatcherArvClient.Update("containers", containerUUID,
+                                       arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil)
+                       }
+               }
                err = cr.gateway.Start()
                if err != nil {
                        log.Printf("error starting gateway server: %s", err)