X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5e20c073d84304c3e84770bb7d89035bf1fb9626..aa9507e1633819259794ea4d6cf391dc88621dac:/lib/crunchrun/crunchrun.go diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index ff02257f2b..55790f727a 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -140,7 +140,9 @@ type ContainerRunner struct { MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) finalState string parentTemp string + costStartTime time.Time + keepstore *exec.Cmd keepstoreLogger io.WriteCloser keepstoreLogbuf *bufThenWrite statLogger io.WriteCloser @@ -659,6 +661,9 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { if err != nil { return nil, fmt.Errorf("while trying to start arv-mount: %v", err) } + if runner.hoststatReporter != nil && runner.ArvMount != nil { + runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid) + } for _, p := range collectionPaths { _, err = os.Stat(p) @@ -732,6 +737,7 @@ func (runner *ContainerRunner) startHoststat() error { PollPeriod: runner.statInterval, } runner.hoststatReporter.Start() + runner.hoststatReporter.ReportPID("crunch-run", os.Getpid()) return nil } @@ -999,6 +1005,7 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st env["ARVADOS_API_TOKEN"] = tok env["ARVADOS_API_HOST"] = os.Getenv("ARVADOS_API_HOST") env["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE") + env["ARVADOS_KEEP_SERVICES"] = os.Getenv("ARVADOS_KEEP_SERVICES") } workdir := runner.Container.Cwd if workdir == "." { @@ -1456,6 +1463,10 @@ func (runner *ContainerRunner) UpdateContainerFinal() error { if runner.finalState == "Complete" && runner.OutputPDH != nil { update["output"] = *runner.OutputPDH } + var it arvados.InstanceType + if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 { + update["cost"] = it.Price * time.Now().Sub(runner.costStartTime).Seconds() / time.Hour.Seconds() + } return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil) } @@ -1488,6 +1499,7 @@ func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Printf("Using FUSE mount: %s", v) runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime()) runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID) + runner.costStartTime = time.Now() hostname, hosterr := os.Hostname() if hosterr != nil { @@ -1562,6 +1574,9 @@ func (runner *ContainerRunner) Run() (err error) { if err != nil { return } + if runner.keepstore != nil { + runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid) + } // set up FUSE mount and binds bindmounts, err = runner.SetupMounts() @@ -1743,6 +1758,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity") brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)") flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.") + version := flags.Bool("version", false, "Write version information to stdout and exit 0.") ignoreDetachFlag := false if len(args) > 0 && args[0] == "-no-detach" { @@ -1758,6 +1774,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok { return code + } else if *version { + fmt.Fprintln(stdout, prog, cmd.Version.String()) + return 0 } else if !*list && flags.NArg() != 1 { fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n") return 2 @@ -1842,6 +1861,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 1 } + cr.keepstore = keepstore if keepstore == nil { // Log explanation (if any) for why we're not running // a local keepstore. @@ -2054,7 +2074,8 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er // modify the cluster configuration that we feed it on stdin. configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers - ln, err := net.Listen("tcp", "localhost:0") + localaddr := localKeepstoreAddr() + ln, err := net.Listen("tcp", net.JoinHostPort(localaddr, "0")) if err != nil { return nil, err } @@ -2064,7 +2085,7 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er return nil, err } ln.Close() - url := "http://localhost:" + port + url := "http://" + net.JoinHostPort(localaddr, port) fmt.Fprintf(logbuf, "starting keepstore on %s\n", url) @@ -2156,3 +2177,43 @@ func currentUserAndGroups() string { } return s } + +// Return a suitable local interface address for a local keepstore +// service. Currently this is the numerically lowest non-loopback ipv4 +// address assigned to a local interface that is not in any of the +// link-local/vpn/loopback ranges 169.254/16, 100.64/10, or 127/8. +func localKeepstoreAddr() string { + var ips []net.IP + // Ignore error (proceed with zero IPs) + addrs, _ := processIPs(os.Getpid()) + for addr := range addrs { + ip := net.ParseIP(addr) + if ip == nil { + // invalid + continue + } + if ip.Mask(net.CIDRMask(8, 32)).Equal(net.IPv4(127, 0, 0, 0)) || + ip.Mask(net.CIDRMask(10, 32)).Equal(net.IPv4(100, 64, 0, 0)) || + ip.Mask(net.CIDRMask(16, 32)).Equal(net.IPv4(169, 254, 0, 0)) { + // unsuitable + continue + } + ips = append(ips, ip) + } + if len(ips) == 0 { + return "0.0.0.0" + } + sort.Slice(ips, func(ii, jj int) bool { + i, j := ips[ii], ips[jj] + if len(i) != len(j) { + return len(i) < len(j) + } + for x := range i { + if i[x] != j[x] { + return i[x] < j[x] + } + } + return false + }) + return ips[0].String() +}