X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c36ec856598f214e340e3335ddd347d131335bf8..09cbdc3074b3f1e69c9c537875146f6da0a6ed8f:/lib/crunchrun/crunchrun.go diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index 0253ac3fa8..51e154c0ec 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -140,7 +140,9 @@ type ContainerRunner struct { MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) finalState string parentTemp string + costStartTime time.Time + keepstore *exec.Cmd keepstoreLogger io.WriteCloser keepstoreLogbuf *bufThenWrite statLogger io.WriteCloser @@ -426,7 +428,13 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { arvMountCmd = append(arvMountCmd, "--allow-other") } - if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 { + if runner.Container.RuntimeConstraints.KeepCacheDisk > 0 { + keepcachedir, err := runner.MkTempDir(runner.parentTemp, "keepcache") + if err != nil { + return nil, fmt.Errorf("while creating keep cache temp dir: %v", err) + } + arvMountCmd = append(arvMountCmd, "--disk-cache", "--disk-cache-dir", keepcachedir, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheDisk)) + } else if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 { arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM)) } @@ -659,6 +667,9 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { if err != nil { return nil, fmt.Errorf("while trying to start arv-mount: %v", err) } + if runner.hoststatReporter != nil && runner.ArvMount != nil { + runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid) + } for _, p := range collectionPaths { _, err = os.Stat(p) @@ -732,6 +743,7 @@ func (runner *ContainerRunner) startHoststat() error { PollPeriod: runner.statInterval, } runner.hoststatReporter.Start() + runner.hoststatReporter.ReportPID("crunch-run", os.Getpid()) return nil } @@ -999,6 +1011,7 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st env["ARVADOS_API_TOKEN"] = tok env["ARVADOS_API_HOST"] = os.Getenv("ARVADOS_API_HOST") env["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE") + env["ARVADOS_KEEP_SERVICES"] = os.Getenv("ARVADOS_KEEP_SERVICES") } workdir := runner.Container.Cwd if workdir == "." { @@ -1095,6 +1108,12 @@ func (runner *ContainerRunner) WaitFinish() error { } } runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra) + err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{ + "container": arvadosclient.Dict{"exit_code": exitcode}, + }, nil) + if err != nil { + runner.CrunchLog.Printf("ignoring error updating exit_code: %s", err) + } var returnErr error if err = runner.executorStdin.Close(); err != nil { @@ -1162,10 +1181,9 @@ func (runner *ContainerRunner) updateLogs() { continue } - var updated arvados.Container err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{ "container": arvadosclient.Dict{"log": saved.PortableDataHash}, - }, &updated) + }, nil) if err != nil { runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err) continue @@ -1443,13 +1461,17 @@ func (runner *ContainerRunner) UpdateContainerFinal() error { if runner.LogsPDH != nil { update["log"] = *runner.LogsPDH } - if runner.finalState == "Complete" { - if runner.ExitCode != nil { - update["exit_code"] = *runner.ExitCode - } - if runner.OutputPDH != nil { - update["output"] = *runner.OutputPDH - } + if runner.ExitCode != nil { + update["exit_code"] = *runner.ExitCode + } else { + update["exit_code"] = nil + } + if runner.finalState == "Complete" && runner.OutputPDH != nil { + update["output"] = *runner.OutputPDH + } + var it arvados.InstanceType + if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 { + update["cost"] = it.Price * time.Now().Sub(runner.costStartTime).Seconds() / time.Hour.Seconds() } return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil) } @@ -1483,6 +1505,7 @@ func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Printf("Using FUSE mount: %s", v) runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime()) runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID) + runner.costStartTime = time.Now() hostname, hosterr := os.Hostname() if hosterr != nil { @@ -1557,6 +1580,9 @@ func (runner *ContainerRunner) Run() (err error) { if err != nil { return } + if runner.keepstore != nil { + runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid) + } // set up FUSE mount and binds bindmounts, err = runner.SetupMounts() @@ -1738,6 +1764,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity") brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)") flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.") + version := flags.Bool("version", false, "Write version information to stdout and exit 0.") ignoreDetachFlag := false if len(args) > 0 && args[0] == "-no-detach" { @@ -1753,6 +1780,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok { return code + } else if *version { + fmt.Fprintln(stdout, prog, cmd.Version.String()) + return 0 } else if !*list && flags.NArg() != 1 { fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n") return 2 @@ -1837,6 +1867,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 1 } + cr.keepstore = keepstore if keepstore == nil { // Log explanation (if any) for why we're not running // a local keepstore. @@ -1900,18 +1931,26 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s // not safe to run a gateway service without an auth // secret cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)") - } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" { - // dispatcher did not tell us which external IP - // address to advertise --> no gateway service - cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)") - } else if de, ok := cr.executor.(*dockerExecutor); ok { + } else { + gwListen := os.Getenv("GatewayAddress") cr.gateway = Gateway{ - Address: gwListen, - AuthSecret: gwAuthSecret, - ContainerUUID: containerUUID, - DockerContainerID: &de.containerID, - Log: cr.CrunchLog, - ContainerIPAddress: dockerContainerIPAddress(&de.containerID), + Address: gwListen, + AuthSecret: gwAuthSecret, + ContainerUUID: containerUUID, + Target: cr.executor, + Log: cr.CrunchLog, + } + if gwListen == "" { + // Direct connection won't work, so we use the + // gateway_address field to indicate the + // internalURL of the controller process that + // has the current tunnel connection. + cr.gateway.ArvadosClient = cr.dispatcherClient + cr.gateway.UpdateTunnelURL = func(url string) { + cr.gateway.Address = "tunnel " + url + cr.DispatcherArvClient.Update("containers", containerUUID, + arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil) + } } err = cr.gateway.Start() if err != nil { @@ -2041,7 +2080,8 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er // modify the cluster configuration that we feed it on stdin. configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers - ln, err := net.Listen("tcp", "localhost:0") + localaddr := localKeepstoreAddr() + ln, err := net.Listen("tcp", net.JoinHostPort(localaddr, "0")) if err != nil { return nil, err } @@ -2051,7 +2091,7 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er return nil, err } ln.Close() - url := "http://localhost:" + port + url := "http://" + net.JoinHostPort(localaddr, port) fmt.Fprintf(logbuf, "starting keepstore on %s\n", url) @@ -2143,3 +2183,43 @@ func currentUserAndGroups() string { } return s } + +// Return a suitable local interface address for a local keepstore +// service. Currently this is the numerically lowest non-loopback ipv4 +// address assigned to a local interface that is not in any of the +// link-local/vpn/loopback ranges 169.254/16, 100.64/10, or 127/8. +func localKeepstoreAddr() string { + var ips []net.IP + // Ignore error (proceed with zero IPs) + addrs, _ := processIPs(os.Getpid()) + for addr := range addrs { + ip := net.ParseIP(addr) + if ip == nil { + // invalid + continue + } + if ip.Mask(net.CIDRMask(8, 32)).Equal(net.IPv4(127, 0, 0, 0)) || + ip.Mask(net.CIDRMask(10, 32)).Equal(net.IPv4(100, 64, 0, 0)) || + ip.Mask(net.CIDRMask(16, 32)).Equal(net.IPv4(169, 254, 0, 0)) { + // unsuitable + continue + } + ips = append(ips, ip) + } + if len(ips) == 0 { + return "0.0.0.0" + } + sort.Slice(ips, func(ii, jj int) bool { + i, j := ips[ii], ips[jj] + if len(i) != len(j) { + return len(i) < len(j) + } + for x := range i { + if i[x] != j[x] { + return i[x] < j[x] + } + } + return false + }) + return ips[0].String() +}