X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/420e857f8e8ac75beca258fa72b9edac680500cd..df5588faa32a61d40968cf5c0ef50bdfb36985e3:/lib/crunchrun/crunchrun.go

diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 68181395fa..3def8851ce 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -31,6 +31,7 @@ import (
 	"syscall"
 	"time"
 
+	"git.arvados.org/arvados.git/lib/cloud"
 	"git.arvados.org/arvados.git/lib/cmd"
 	"git.arvados.org/arvados.git/lib/config"
 	"git.arvados.org/arvados.git/lib/crunchstat"
@@ -140,7 +141,9 @@ type ContainerRunner struct {
 	MkArvClient   func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
 	finalState    string
 	parentTemp    string
+	costStartTime time.Time
 
+	keepstore        *exec.Cmd
 	keepstoreLogger  io.WriteCloser
 	keepstoreLogbuf  *bufThenWrite
 	statLogger       io.WriteCloser
@@ -175,6 +178,9 @@ type ContainerRunner struct {
 	containerWatchdogInterval time.Duration
 
 	gateway Gateway
+
+	prices     []cloud.InstancePrice
+	pricesLock sync.Mutex
 }
 
 // setupSignals sets up signal handling to gracefully terminate the
@@ -426,8 +432,14 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
 		arvMountCmd = append(arvMountCmd, "--allow-other")
 	}
 
-	if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
-		arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
+	if runner.Container.RuntimeConstraints.KeepCacheDisk > 0 {
+		keepcachedir, err := runner.MkTempDir(runner.parentTemp, "keepcache")
+		if err != nil {
+			return nil, fmt.Errorf("while creating keep cache temp dir: %v", err)
+		}
+		arvMountCmd = append(arvMountCmd, "--disk-cache", "--disk-cache-dir", keepcachedir, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheDisk))
+	} else if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
+		arvMountCmd = append(arvMountCmd, "--ram-cache", "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
 	}
 
 	collectionPaths := []string{}
@@ -659,6 +671,9 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
 	if err != nil {
 		return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
 	}
+	if runner.hoststatReporter != nil && runner.ArvMount != nil {
+		runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
+	}
 
 	for _, p := range collectionPaths {
 		_, err = os.Stat(p)
@@ -732,6 +747,7 @@ func (runner *ContainerRunner) startHoststat() error {
 		PollPeriod: runner.statInterval,
 	}
 	runner.hoststatReporter.Start()
+	runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
 	return nil
 }
 
@@ -1415,14 +1431,25 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C
 }
 
 // UpdateContainerRunning updates the container state to "Running"
-func (runner *ContainerRunner) UpdateContainerRunning() error {
+func (runner *ContainerRunner) UpdateContainerRunning(logId string) error {
 	runner.cStateLock.Lock()
 	defer runner.cStateLock.Unlock()
 	if runner.cCancelled {
 		return ErrCancelled
 	}
-	return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
-		arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gateway.Address}}, nil)
+	updates := arvadosclient.Dict{
+		"gateway_address": runner.gateway.Address,
+		"state":           "Running",
+	}
+	if logId != "" {
+		updates["log"] = logId
+	}
+	return runner.DispatcherArvClient.Update(
+		"containers",
+		runner.Container.UUID,
+		arvadosclient.Dict{"container": updates},
+		nil,
+	)
 }
 
 // ContainerToken returns the api_token the container (and any
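A note on the KeepCacheDisk/KeepCacheRAM hunk above: when both constraints are set, the disk cache takes precedence, and the same --file-cache flag carries the disk cache size instead of a RAM cache size. A minimal sketch of the flag selection with hypothetical byte counts (the real values come from the container's runtime constraints):

    package main

    import "fmt"

    // Sketch of the cache-flag selection in SetupMounts. The parameters
    // here are stand-ins for the container's runtime constraint fields.
    func cacheArgs(keepCacheDisk, keepCacheRAM int64, cacheDir string) []string {
    	if keepCacheDisk > 0 {
    		// Disk cache wins when both constraints are present.
    		return []string{"--disk-cache", "--disk-cache-dir", cacheDir,
    			"--file-cache", fmt.Sprintf("%d", keepCacheDisk)}
    	} else if keepCacheRAM > 0 {
    		return []string{"--ram-cache", "--file-cache", fmt.Sprintf("%d", keepCacheRAM)}
    	}
    	return nil
    }

    func main() {
    	fmt.Println(cacheArgs(8<<30, 0, "/tmp/keepcache")) // disk-backed cache
    	fmt.Println(cacheArgs(0, 256<<20, ""))             // RAM-backed cache
    }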
@@ -1457,6 +1484,7 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
 	if runner.finalState == "Complete" && runner.OutputPDH != nil {
 		update["output"] = *runner.OutputPDH
 	}
+	update["cost"] = runner.calculateCost(time.Now())
 	return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
 }
 
@@ -1489,6 +1517,7 @@ func (runner *ContainerRunner) Run() (err error) {
 	runner.CrunchLog.Printf("Using FUSE mount: %s", v)
 	runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
 	runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
+	runner.costStartTime = time.Now()
 
 	hostname, hosterr := os.Hostname()
 	if hosterr != nil {
@@ -1497,6 +1526,16 @@ func (runner *ContainerRunner) Run() (err error) {
 		runner.CrunchLog.Printf("Executing on host '%s'", hostname)
 	}
 
+	sigusr2 := make(chan os.Signal, 1)
+	signal.Notify(sigusr2, syscall.SIGUSR2)
+	defer signal.Stop(sigusr2)
+	runner.loadPrices()
+	go func() {
+		for range sigusr2 {
+			runner.loadPrices()
+		}
+	}()
+
 	runner.finalState = "Queued"
 
 	defer func() {
@@ -1563,6 +1602,9 @@ func (runner *ContainerRunner) Run() (err error) {
 	if err != nil {
 		return
 	}
+	if runner.keepstore != nil {
+		runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
+	}
 
 	// set up FUSE mount and binds
 	bindmounts, err = runner.SetupMounts()
@@ -1605,7 +1647,14 @@ func (runner *ContainerRunner) Run() (err error) {
 		return
 	}
 
-	err = runner.UpdateContainerRunning()
+	logCollection, err := runner.saveLogCollection(false)
+	var logId string
+	if err == nil {
+		logId = logCollection.PortableDataHash
+	} else {
+		runner.CrunchLog.Printf("Error committing initial log collection: %v", err)
+	}
+	err = runner.UpdateContainerRunning(logId)
 	if err != nil {
 		return
 	}
@@ -1736,7 +1785,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)")
 	sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
 	kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
-	list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
+	list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes (and notify them to use price data passed on stdin)")
 	enableMemoryLimit := flags.Bool("enable-memory-limit", true, "tell container runtime to limit container's memory usage")
 	enableNetwork := flags.String("container-enable-networking", "default", "enable networking \"always\" (for all containers) or \"default\" (for containers that request it)")
 	networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
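The SIGUSR2 hunk above follows the standard signal.Notify pattern: load the price data once at startup, then reload on every SIGUSR2 until the channel is stopped. A self-contained sketch of the same pattern, with a stand-in reload function in place of the real loadPrices:

    package main

    import (
    	"log"
    	"os"
    	"os/signal"
    	"syscall"
    	"time"
    )

    func main() {
    	reload := func() { log.Printf("reloading price data (pid %d)", os.Getpid()) }

    	// Buffered channel so a signal arriving between reloads is not lost.
    	sigusr2 := make(chan os.Signal, 1)
    	signal.Notify(sigusr2, syscall.SIGUSR2)
    	defer signal.Stop(sigusr2)

    	reload() // initial load at startup
    	go func() {
    		for range sigusr2 {
    			reload() // reload on each SIGUSR2 from the dispatcher
    		}
    	}()

    	// Simulate the dispatcher poking us (normally: kill -USR2 <pid>).
    	syscall.Kill(os.Getpid(), syscall.SIGUSR2)
    	time.Sleep(100 * time.Millisecond)
    }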
@@ -1744,6 +1793,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
 	brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
 	flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+	version := flags.Bool("version", false, "Write version information to stdout and exit 0.")
 
 	ignoreDetachFlag := false
 	if len(args) > 0 && args[0] == "-no-detach" {
@@ -1759,6 +1809,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
 	if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
 		return code
+	} else if *version {
+		fmt.Fprintln(stdout, prog, cmd.Version.String())
+		return 0
 	} else if !*list && flags.NArg() != 1 {
 		fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
 		return 2
@@ -1768,11 +1821,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
 	switch {
 	case *detach && !ignoreDetachFlag:
-		return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr)
+		return Detach(containerUUID, prog, args, stdin, stdout, stderr)
 	case *kill >= 0:
-		return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
+		return KillProcess(containerUUID, syscall.Signal(*kill), stdout, stderr)
 	case *list:
-		return ListProcesses(os.Stdout, os.Stderr)
+		return ListProcesses(stdin, stdout, stderr)
 	}
 
 	if len(containerUUID) != 27 {
@@ -1843,6 +1896,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		return 1
 	}
 
+	cr.keepstore = keepstore
 	if keepstore == nil {
 		// Log explanation (if any) for why we're not running
 		// a local keepstore.
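The Detach/KillProcess/ListProcesses changes above route through the stdin/stdout/stderr parameters already passed to RunCommand rather than the os.Stdin/os.Stdout/os.Stderr globals; that is what lets -list receive price data on stdin (per its updated help text) and lets tests drive the command with in-memory buffers. A toy sketch of the pattern, where runCommand is a hypothetical stand-in rather than the real entry point:

    package main

    import (
    	"bytes"
    	"fmt"
    	"io"
    	"strings"
    )

    // Commands that take stdin/stdout/stderr as parameters can be run
    // against in-memory buffers instead of the process's real streams.
    func runCommand(stdin io.Reader, stdout, stderr io.Writer) int {
    	data, err := io.ReadAll(stdin)
    	if err != nil {
    		fmt.Fprintln(stderr, "read error:", err)
    		return 1
    	}
    	fmt.Fprintf(stdout, "got %d bytes\n", len(data))
    	return 0
    }

    func main() {
    	var out, errBuf bytes.Buffer
    	code := runCommand(strings.NewReader(`[{"Price":0.123}]`), &out, &errBuf)
    	fmt.Print(out.String(), "exit=", code, "\n")
    }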
@@ -1906,11 +1960,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		// not safe to run a gateway service without an auth
 		// secret
 		cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
-	} else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
-		// dispatcher did not tell us which external IP
-		// address to advertise --> no gateway service
-		cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
 	} else {
+		gwListen := os.Getenv("GatewayAddress")
 		cr.gateway = Gateway{
 			Address:    gwListen,
 			AuthSecret: gwAuthSecret,
@@ -1918,6 +1969,18 @@
 			Target:     cr.executor,
 			Log:        cr.CrunchLog,
 		}
+		if gwListen == "" {
+			// Direct connection won't work, so we use the
+			// gateway_address field to indicate the
+			// internalURL of the controller process that
+			// has the current tunnel connection.
+			cr.gateway.ArvadosClient = cr.dispatcherClient
+			cr.gateway.UpdateTunnelURL = func(url string) {
+				cr.gateway.Address = "tunnel " + url
+				cr.DispatcherArvClient.Update("containers", containerUUID,
+					arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil)
+			}
+		}
 		err = cr.gateway.Start()
 		if err != nil {
 			log.Printf("error starting gateway server: %s", err)
@@ -2189,3 +2252,87 @@ func localKeepstoreAddr() string {
 	})
 	return ips[0].String()
 }
+
+func (cr *ContainerRunner) loadPrices() {
+	buf, err := os.ReadFile(filepath.Join(lockdir, pricesfile))
+	if err != nil {
+		if !os.IsNotExist(err) {
+			cr.CrunchLog.Printf("loadPrices: read: %s", err)
+		}
+		return
+	}
+	var prices []cloud.InstancePrice
+	err = json.Unmarshal(buf, &prices)
+	if err != nil {
+		cr.CrunchLog.Printf("loadPrices: decode: %s", err)
+		return
+	}
+	cr.pricesLock.Lock()
+	defer cr.pricesLock.Unlock()
+	var lastKnown time.Time
+	if len(cr.prices) > 0 {
+		lastKnown = cr.prices[0].StartTime
+	}
+	cr.prices = cloud.NormalizePriceHistory(append(prices, cr.prices...))
+	for i := len(cr.prices) - 1; i >= 0; i-- {
+		price := cr.prices[i]
+		if price.StartTime.After(lastKnown) {
+			cr.CrunchLog.Printf("Instance price changed to %#.3g at %s", price.Price, price.StartTime.UTC())
+		}
+	}
+}
+
+func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
+	cr.pricesLock.Lock()
+	defer cr.pricesLock.Unlock()
+
+	// First, make a "prices" slice with the real data as far back
+	// as it goes, and (if needed) a "since the beginning of time"
+	// placeholder containing a reasonable guess about what the
+	// price was between cr.costStartTime and the earliest real
+	// data point.
+	prices := cr.prices
+	if len(prices) == 0 {
+		// use price info in InstanceType record initially
+		// provided by cloud dispatcher
+		var p float64
+		var it arvados.InstanceType
+		if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
+			p = it.Price
+		}
+		prices = []cloud.InstancePrice{{Price: p}}
+	} else if prices[len(prices)-1].StartTime.After(cr.costStartTime) {
+		// guess earlier pricing was the same as the earliest
+		// price we know about
+		filler := prices[len(prices)-1]
+		filler.StartTime = time.Time{}
+		prices = append(prices, filler)
+	}
+
+	// Now that our history of price changes goes back at least as
+	// far as cr.costStartTime, add up the costs for each
+	// interval.
+	cost := 0.0
+	spanEnd := now
+	for _, ip := range prices {
+		spanStart := ip.StartTime
+		if spanStart.After(now) {
+			// pricing information from the future -- not
+			// expected from AWS, but possible in
+			// principle, and exercised by tests.
+			continue
+		}
+		last := false
+		if spanStart.Before(cr.costStartTime) {
+			spanStart = cr.costStartTime
+			last = true
+		}
+		cost += ip.Price * spanEnd.Sub(spanStart).Seconds() / 3600
+		if last {
+			break
+		}
+		spanEnd = spanStart
+	}
+
+	return cost
+}
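To make the interval arithmetic in calculateCost concrete, here is a standalone sketch with made-up numbers: a container starts at t0, the hourly price drops from $0.24 to $0.12 one hour in, and the cost is read two hours in. Price entries are newest-first, as in the code above; instancePrice is a local stand-in for cloud.InstancePrice.

    package main

    import (
    	"fmt"
    	"time"
    )

    // Local stand-in for cloud.InstancePrice.
    type instancePrice struct {
    	StartTime time.Time
    	Price     float64 // USD per hour
    }

    // Same walk as calculateCost: iterate newest to oldest, charging each
    // price for the span it was in effect, clamped to [start, now].
    func cost(prices []instancePrice, start, now time.Time) float64 {
    	total := 0.0
    	spanEnd := now
    	for _, ip := range prices {
    		spanStart := ip.StartTime
    		if spanStart.After(now) {
    			continue // price change from the future; skip it
    		}
    		last := false
    		if spanStart.Before(start) {
    			spanStart = start // clamp the oldest span to container start
    			last = true
    		}
    		total += ip.Price * spanEnd.Sub(spanStart).Seconds() / 3600
    		if last {
    			break
    		}
    		spanEnd = spanStart
    	}
    	return total
    }

    func main() {
    	t0 := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
    	prices := []instancePrice{
    		{StartTime: t0.Add(time.Hour), Price: 0.12},      // newest first
    		{StartTime: t0.Add(-24 * time.Hour), Price: 0.24}, // in effect at t0
    	}
    	// 1h at $0.24 + 1h at $0.12 = $0.36
    	fmt.Printf("%.2f\n", cost(prices, t0, t0.Add(2*time.Hour)))
    }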