Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / lib / crunchrun / crunchrun.go
index 68181395fadcbafba4227b0d7cc16eb4b2f8624e..51e154c0ecfb3b978844947480f1efe7fe2f6fa9 100644 (file)
@@ -140,7 +140,9 @@ type ContainerRunner struct {
        MkArvClient   func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
        finalState    string
        parentTemp    string
+       costStartTime time.Time
 
+       keepstore        *exec.Cmd
        keepstoreLogger  io.WriteCloser
        keepstoreLogbuf  *bufThenWrite
        statLogger       io.WriteCloser
@@ -426,7 +428,13 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                arvMountCmd = append(arvMountCmd, "--allow-other")
        }
 
-       if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
+       if runner.Container.RuntimeConstraints.KeepCacheDisk > 0 {
+               keepcachedir, err := runner.MkTempDir(runner.parentTemp, "keepcache")
+               if err != nil {
+                       return nil, fmt.Errorf("while creating keep cache temp dir: %v", err)
+               }
+               arvMountCmd = append(arvMountCmd, "--disk-cache", "--disk-cache-dir", keepcachedir, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheDisk))
+       } else if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
                arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
        }
 
@@ -659,6 +667,9 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        if err != nil {
                return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
        }
+       if runner.hoststatReporter != nil && runner.ArvMount != nil {
+               runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
+       }
 
        for _, p := range collectionPaths {
                _, err = os.Stat(p)
@@ -732,6 +743,7 @@ func (runner *ContainerRunner) startHoststat() error {
                PollPeriod: runner.statInterval,
        }
        runner.hoststatReporter.Start()
+       runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
        return nil
 }
 
@@ -1457,6 +1469,10 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
        if runner.finalState == "Complete" && runner.OutputPDH != nil {
                update["output"] = *runner.OutputPDH
        }
+       var it arvados.InstanceType
+       if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
+               update["cost"] = it.Price * time.Now().Sub(runner.costStartTime).Seconds() / time.Hour.Seconds()
+       }
        return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
 }
 
@@ -1489,6 +1505,7 @@ func (runner *ContainerRunner) Run() (err error) {
        runner.CrunchLog.Printf("Using FUSE mount: %s", v)
        runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
        runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
+       runner.costStartTime = time.Now()
 
        hostname, hosterr := os.Hostname()
        if hosterr != nil {
@@ -1563,6 +1580,9 @@ func (runner *ContainerRunner) Run() (err error) {
        if err != nil {
                return
        }
+       if runner.keepstore != nil {
+               runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
+       }
 
        // set up FUSE mount and binds
        bindmounts, err = runner.SetupMounts()
@@ -1744,6 +1764,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
        brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
        flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+       version := flags.Bool("version", false, "Write version information to stdout and exit 0.")
 
        ignoreDetachFlag := false
        if len(args) > 0 && args[0] == "-no-detach" {
@@ -1759,6 +1780,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
        if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
                return code
+       } else if *version {
+               fmt.Fprintln(stdout, prog, cmd.Version.String())
+               return 0
        } else if !*list && flags.NArg() != 1 {
                fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
                return 2
@@ -1843,6 +1867,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
+       cr.keepstore = keepstore
        if keepstore == nil {
                // Log explanation (if any) for why we're not running
                // a local keepstore.
@@ -1906,11 +1931,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                // not safe to run a gateway service without an auth
                // secret
                cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
-       } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
-               // dispatcher did not tell us which external IP
-               // address to advertise --> no gateway service
-               cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
        } else {
+               gwListen := os.Getenv("GatewayAddress")
                cr.gateway = Gateway{
                        Address:       gwListen,
                        AuthSecret:    gwAuthSecret,
@@ -1918,6 +1940,18 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                        Target:        cr.executor,
                        Log:           cr.CrunchLog,
                }
+               if gwListen == "" {
+                       // Direct connection won't work, so we use the
+                       // gateway_address field to indicate the
+                       // internalURL of the controller process that
+                       // has the current tunnel connection.
+                       cr.gateway.ArvadosClient = cr.dispatcherClient
+                       cr.gateway.UpdateTunnelURL = func(url string) {
+                               cr.gateway.Address = "tunnel " + url
+                               cr.DispatcherArvClient.Update("containers", containerUUID,
+                                       arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil)
+                       }
+               }
                err = cr.gateway.Start()
                if err != nil {
                        log.Printf("error starting gateway server: %s", err)