import (
"bytes"
+ "context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
+ "io/fs"
"io/ioutil"
"log"
+ "net"
+ "net/http"
"os"
"os/exec"
"os/signal"
+ "os/user"
"path"
"path/filepath"
"regexp"
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/cloud"
"git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/crunchstat"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/keepclient"
"git.arvados.org/arvados.git/sdk/go/manifest"
- "golang.org/x/net/context"
+ "golang.org/x/sys/unix"
)
type command struct{}
var Command = command{}
+// ConfigData contains environment variables and (when needed) cluster
+// configuration, passed from dispatchcloud to crunch-run on stdin.
+type ConfigData struct {
+ Env map[string]string
+ KeepBuffers int
+ EC2SpotCheck bool
+ Cluster *arvados.Cluster
+}
+
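// Illustrative sketch (editorial, not part of this patch): the dispatcher
// side of the stdin handshake could look like the hypothetical helper below;
// crunch-run's side is the json.NewDecoder(stdin).Decode(&conf) call in
// RunCommand when -stdin-config is given.
//
//	func exampleSendConfigData(child *exec.Cmd, conf ConfigData) error {
//		w, err := child.StdinPipe()
//		if err != nil {
//			return err
//		}
//		defer w.Close()
//		return json.NewEncoder(w).Encode(conf)
//	}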
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
type IArvadosClient interface {
Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
// NewLogWriter is a factory function to create a new log writer.
type NewLogWriter func(name string) (io.WriteCloser, error)
-type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
+type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error)
type MkTempDir func(string, string) (string, error)
MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
finalState string
parentTemp string
+ costStartTime time.Time
+ keepstore *exec.Cmd
+ keepstoreLogger io.WriteCloser
+ keepstoreLogbuf *bufThenWrite
statLogger io.WriteCloser
statReporter *crunchstat.Reporter
hoststatLogger io.WriteCloser
hoststatReporter *crunchstat.Reporter
statInterval time.Duration
- cgroupRoot string
- // What we expect the container's cgroup parent to be.
- expectCgroupParent string
// What we tell docker to use as the container's cgroup
- // parent. Note: Ideally we would use the same field for both
- // expectCgroupParent and setCgroupParent, and just make it
- // default to "docker". However, when using docker < 1.10 with
- // systemd, specifying a non-empty cgroup parent (even the
- // default value "docker") hits a docker bug
- // (https://github.com/docker/docker/issues/17126). Using two
- // separate fields makes it possible to use the "expect cgroup
- // parent to be X" feature even on sites where the "specify
- // cgroup parent" feature breaks.
+ // parent.
setCgroupParent string
+ // Fake root dir where crunchstat.Reporter should read OS
+ // files, for testing.
+ crunchstatFakeFS fs.FS
cStateLock sync.Mutex
cCancelled bool // StopContainer() invoked
enableMemoryLimit bool
enableNetwork string // one of "default" or "always"
networkMode string // "none", "host", or "" -- passed through to executor
+ brokenNodeHook string // script to run if node appears to be broken
arvMountLog *ThrottledLogger
containerWatchdogInterval time.Duration
gateway Gateway
+
+ prices []cloud.InstancePrice
+ pricesLock sync.Mutex
}
// setupSignals sets up signal handling to gracefully terminate the
"(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
"(?ms).*grpc: the connection is unavailable.*",
}
-var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
func (runner *ContainerRunner) runBrokenNodeHook() {
- if *brokenNodeHook == "" {
+ if runner.brokenNodeHook == "" {
path := filepath.Join(lockdir, brokenfile)
runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
}
f.Close()
} else {
- runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+ runner.CrunchLog.Printf("Running broken node hook %q", runner.brokenNodeHook)
// run the broken node hook script
- c := exec.Command(*brokenNodeHook)
+ c := exec.Command(runner.brokenNodeHook)
c.Stdout = runner.CrunchLog
c.Stderr = runner.CrunchLog
err := c.Run()
return "", fmt.Errorf("cannot choose from multiple tar files in image collection: %v", tarfiles)
}
imageID := tarfiles[0][:len(tarfiles[0])-4]
- imageFile := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + tarfiles[0]
+ imageTarballPath := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + imageID + ".tar"
runner.CrunchLog.Printf("Using Docker image id %q", imageID)
- if !runner.executor.ImageLoaded(imageID) {
- runner.CrunchLog.Print("Loading Docker image from keep")
- err = runner.executor.LoadImage(imageFile)
- if err != nil {
- return "", err
- }
- } else {
- runner.CrunchLog.Print("Docker image is available")
+ runner.CrunchLog.Print("Loading Docker image from keep")
+ err = runner.executor.LoadImage(imageID, imageTarballPath, runner.Container, runner.ArvMountPoint,
+ runner.containerClient)
+ if err != nil {
+ return "", err
}
+
return imageID, nil
}
-func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
- c = exec.Command("arv-mount", arvMountCmd...)
+func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *exec.Cmd, err error) {
+ c = exec.Command(cmdline[0], cmdline[1:]...)
// Copy our environment, but override ARVADOS_API_TOKEN with
// the container auth token.
return nil, err
}
runner.arvMountLog = NewThrottledLogger(w)
+ scanner := logScanner{
+ Patterns: []string{
+ "Keep write error",
+ "Block not found error",
+ "Unhandled exception during FUSE operation",
+ },
+ ReportFunc: func(pattern, text string) {
+ runner.updateRuntimeStatus(arvadosclient.Dict{
+ "warning": "arv-mount: " + pattern,
+ "warningDetail": text,
+ })
+ },
+ }
c.Stdout = runner.arvMountLog
- c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr)
+ c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
runner.CrunchLog.Printf("Running %v", c.Args)
pdhOnly := true
tmpcount := 0
arvMountCmd := []string{
+ "arv-mount",
"--foreground",
- "--allow-other",
"--read-write",
"--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
- if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
- arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
+ if _, isdocker := runner.executor.(*dockerExecutor); isdocker {
+ arvMountCmd = append(arvMountCmd, "--allow-other")
+ }
+
+ if runner.Container.RuntimeConstraints.KeepCacheDisk > 0 {
+ keepcachedir, err := runner.MkTempDir(runner.parentTemp, "keepcache")
+ if err != nil {
+ return nil, fmt.Errorf("while creating keep cache temp dir: %v", err)
+ }
+ arvMountCmd = append(arvMountCmd, "--disk-cache", "--disk-cache-dir", keepcachedir, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheDisk))
+ } else if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
+ arvMountCmd = append(arvMountCmd, "--ram-cache", "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
}
collectionPaths := []string{}
sort.Strings(binds)
for _, bind := range binds {
- mnt, ok := runner.Container.Mounts[bind]
- if !ok {
+ mnt, notSecret := runner.Container.Mounts[bind]
+ if !notSecret {
mnt = runner.SecretMounts[bind]
}
if bind == "stdout" || bind == "stderr" {
}
} else {
src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
- arvMountCmd = append(arvMountCmd, "--mount-tmp")
- arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
+ arvMountCmd = append(arvMountCmd, "--mount-tmp", fmt.Sprintf("tmp%d", tmpcount))
tmpcount++
}
if mnt.Writable {
if err != nil {
return nil, fmt.Errorf("writing temp file: %v", err)
}
- if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
+ if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && (notSecret || runner.Container.Mounts[runner.Container.OutputPath].Kind != "collection") {
+ // In most cases, if the container
+ // specifies a literal file inside the
+ // output path, we copy it into the
+ // output directory (either a mounted
+ // collection or a staging area on the
+ // host fs). If it's a secret, it will
+ // be skipped when copying output from
+ // staging to Keep later.
copyFiles = append(copyFiles, copyFile{tmpfn, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
} else {
+ // If a secret is outside OutputPath,
+ // we bind mount the secret file
+ // directly just like other mounts. We
+ // also use this strategy when a
+ // secret is inside OutputPath but
+ // OutputPath is a live collection, to
+ // avoid writing the secret to
+ // Keep. Attempting to remove a
+ // bind-mounted secret file from
+ // inside the container will return a
+ // "Device or resource busy" error
+ // that might not be handled well by
+ // the container, which is why we
+ // don't use this strategy when
+ // OutputPath is a staging directory.
bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true}
}
if err != nil {
return nil, fmt.Errorf("creating temp dir: %v", err)
}
- err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
+ err = gitMount(mnt).extractTree(runner.containerClient, tmpdir, token)
if err != nil {
return nil, err
}
}
if pdhOnly {
- arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
+ // If we are only mounting collections by pdh, make
+ // sure we don't subscribe to websocket events to
+ // avoid putting undesired load on the API server
+ arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id", "--disable-event-listening")
} else {
arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
}
+ // the by_uuid mount point is used by singularity when writing
+ // out docker images converted to SIF
+ arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid")
arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
if err != nil {
return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
}
+ if runner.hoststatReporter != nil && runner.ArvMount != nil {
+ runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
+ }
for _, p := range collectionPaths {
_, err = os.Stat(p)
return nil
}
runner.hoststatReporter.Stop()
+ runner.hoststatReporter.LogProcessMemMax(runner.CrunchLog)
err := runner.hoststatLogger.Close()
if err != nil {
return fmt.Errorf("error closing hoststat logs: %v", err)
}
runner.hoststatLogger = NewThrottledLogger(w)
runner.hoststatReporter = &crunchstat.Reporter{
- Logger: log.New(runner.hoststatLogger, "", 0),
- CgroupRoot: runner.cgroupRoot,
+ Logger: log.New(runner.hoststatLogger, "", 0),
+ // Our own cgroup is the "host" cgroup, in the sense
+ // that it accounts for resource usage outside the
+ // container. It doesn't count _all_ resource usage on
+ // the system.
+ //
+ // TODO?: Use the furthest ancestor of our own cgroup
+ // that has stats available. (Currently crunchstat
+ // does not have that capability.)
+ Pid: os.Getpid,
PollPeriod: runner.statInterval,
}
runner.hoststatReporter.Start()
+ runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
return nil
}
}
runner.statLogger = NewThrottledLogger(w)
runner.statReporter = &crunchstat.Reporter{
- CID: runner.executor.CgroupID(),
- Logger: log.New(runner.statLogger, "", 0),
- CgroupParent: runner.expectCgroupParent,
- CgroupRoot: runner.cgroupRoot,
- PollPeriod: runner.statInterval,
- TempDir: runner.parentTemp,
+ Pid: runner.executor.Pid,
+ FS: runner.crunchstatFakeFS,
+ Logger: log.New(runner.statLogger, "", 0),
+ MemThresholds: map[string][]crunchstat.Threshold{
+ "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}),
+ },
+ PollPeriod: runner.statInterval,
+ TempDir: runner.parentTemp,
+ ThresholdLogger: runner.CrunchLog,
}
runner.statReporter.Start()
return nil
env["ARVADOS_API_TOKEN"] = tok
env["ARVADOS_API_HOST"] = os.Getenv("ARVADOS_API_HOST")
env["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE")
+ env["ARVADOS_KEEP_SERVICES"] = os.Getenv("ARVADOS_KEEP_SERVICES")
}
workdir := runner.Container.Cwd
if workdir == "." {
runner.executorStdin = stdin
runner.executorStdout = stdout
runner.executorStderr = stderr
+
+ if runner.Container.RuntimeConstraints.CUDA.DeviceCount > 0 {
+ nvidiaModprobe(runner.CrunchLog)
+ }
+
return runner.executor.Create(containerSpec{
- Image: imageID,
- VCPUs: runner.Container.RuntimeConstraints.VCPUs,
- RAM: ram,
- WorkingDir: workdir,
- Env: env,
- BindMounts: bindmounts,
- Command: runner.Container.Command,
- EnableNetwork: enableNetwork,
- NetworkMode: runner.networkMode,
- CgroupParent: runner.setCgroupParent,
- Stdin: stdin,
- Stdout: stdout,
- Stderr: stderr,
+ Image: imageID,
+ VCPUs: runner.Container.RuntimeConstraints.VCPUs,
+ RAM: ram,
+ WorkingDir: workdir,
+ Env: env,
+ BindMounts: bindmounts,
+ Command: runner.Container.Command,
+ EnableNetwork: enableNetwork,
+ CUDADeviceCount: runner.Container.RuntimeConstraints.CUDA.DeviceCount,
+ NetworkMode: runner.networkMode,
+ CgroupParent: runner.setCgroupParent,
+ Stdin: stdin,
+ Stdout: stdout,
+ Stderr: stderr,
})
}
}
runner.ExitCode = &exitcode
+ extra := ""
+ if exitcode&0x80 != 0 {
+ // Convert raw exit status (0x80 + signal number) to a
+ // string to log after the code: e.g. exit status 137 is
+ // logged as "137 (signal 9, SIGKILL)"; a signal with no
+ // known name falls back to " (signal NN)".
+ sig := syscall.WaitStatus(exitcode).Signal()
+ if name := unix.SignalName(sig); name != "" {
+ extra = fmt.Sprintf(" (signal %d, %s)", sig, name)
+ } else {
+ extra = fmt.Sprintf(" (signal %d)", sig)
+ }
+ }
+ runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra)
+ err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+ "select": []string{"uuid"},
+ "container": arvadosclient.Dict{"exit_code": exitcode},
+ }, nil)
+ if err != nil {
+ runner.CrunchLog.Printf("ignoring error updating exit_code: %s", err)
+ }
+
var returnErr error
if err = runner.executorStdin.Close(); err != nil {
err = fmt.Errorf("error closing container stdin: %s", err)
if runner.statReporter != nil {
runner.statReporter.Stop()
+ runner.statReporter.LogMaxima(runner.CrunchLog, map[string]int64{
+ "rss": runner.Container.RuntimeConstraints.RAM,
+ })
err = runner.statLogger.Close()
if err != nil {
runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
continue
}
- var updated arvados.Container
err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
- "container": arvadosclient.Dict{"log": saved.PortableDataHash},
- }, &updated)
+ "select": []string{"uuid"},
+ "container": arvadosclient.Dict{
+ "log": saved.PortableDataHash,
+ },
+ }, nil)
if err != nil {
runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
continue
}
}
+var spotInterruptionCheckInterval = 5 * time.Second
+var ec2MetadataBaseURL = "http://169.254.169.254"
+
+const ec2TokenTTL = time.Second * 21600
+
+func (runner *ContainerRunner) checkSpotInterruptionNotices() {
+ type ec2metadata struct {
+ Action string `json:"action"`
+ Time time.Time `json:"time"`
+ }
+ runner.CrunchLog.Printf("Checking for spot interruptions every %v using instance metadata at %s", spotInterruptionCheckInterval, ec2MetadataBaseURL)
+ var metadata ec2metadata
+ var token string
+ var tokenExp time.Time
+ check := func() error {
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+ defer cancel()
+ if token == "" || tokenExp.Sub(time.Now()) < time.Minute {
+ req, err := http.NewRequestWithContext(ctx, http.MethodPut, ec2MetadataBaseURL+"/latest/api/token", nil)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", fmt.Sprintf("%d", int(ec2TokenTTL/time.Second)))
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("%s", resp.Status)
+ }
+ newtoken, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+ token = strings.TrimSpace(string(newtoken))
+ tokenExp = time.Now().Add(ec2TokenTTL)
+ }
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, ec2MetadataBaseURL+"/latest/meta-data/spot/instance-action", nil)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("X-aws-ec2-metadata-token", token)
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ metadata = ec2metadata{}
+ switch resp.StatusCode {
+ case http.StatusOK:
+ break
+ case http.StatusNotFound:
+ // "If Amazon EC2 is not preparing to stop or
+ // terminate the instance, or if you
+ // terminated the instance yourself,
+ // instance-action is not present in the
+ // instance metadata and you receive an HTTP
+ // 404 error when you try to retrieve it."
+ return nil
+ case http.StatusUnauthorized:
+ token = ""
+ return fmt.Errorf("%s", resp.Status)
+ default:
+ return fmt.Errorf("%s", resp.Status)
+ }
+ err = json.NewDecoder(resp.Body).Decode(&metadata)
+ if err != nil {
+ return err
+ }
+ return nil
+ }
+ failures := 0
+ var lastmetadata ec2metadata
+ for range time.NewTicker(spotInterruptionCheckInterval).C {
+ err := check()
+ if err != nil {
+ runner.CrunchLog.Printf("Error checking spot interruptions: %s", err)
+ failures++
+ if failures > 5 {
+ runner.CrunchLog.Printf("Giving up on checking spot interruptions after too many consecutive failures")
+ return
+ }
+ continue
+ }
+ failures = 0
+ if metadata != lastmetadata {
+ lastmetadata = metadata
+ text := fmt.Sprintf("Cloud provider scheduled instance %s at %s", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
+ runner.CrunchLog.Printf("%s", text)
+ runner.updateRuntimeStatus(arvadosclient.Dict{
+ "warning": "preemption notice",
+ "warningDetail": text,
+ "preemptionNotice": text,
+ })
+ if proc, err := os.FindProcess(os.Getpid()); err == nil {
+ // trigger updateLogs
+ proc.Signal(syscall.SIGUSR1)
+ }
+ }
+ }
+}
+
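// Illustrative test sketch (assumption): because ec2MetadataBaseURL and
// spotInterruptionCheckInterval are package variables, a test can point the
// checker at a stub IMDSv2 endpoint instead of 169.254.169.254, e.g.:
//
//	stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
//		switch {
//		case r.Method == http.MethodPut && r.URL.Path == "/latest/api/token":
//			fmt.Fprint(w, "fake-imds-token")
//		case r.URL.Path == "/latest/meta-data/spot/instance-action":
//			fmt.Fprint(w, `{"action":"stop","time":"2024-02-03T04:05:06Z"}`)
//		default:
//			http.Error(w, "not found", http.StatusNotFound)
//		}
//	}))
//	defer stub.Close()
//	ec2MetadataBaseURL = stub.URL
//	spotInterruptionCheckInterval = time.Second / 8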
+func (runner *ContainerRunner) updateRuntimeStatus(status arvadosclient.Dict) {
+ err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+ "select": []string{"uuid"},
+ "container": arvadosclient.Dict{
+ "runtime_status": status,
+ },
+ }, nil)
+ if err != nil {
+ runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
+ }
+}
+
// CaptureOutput saves data from the container's output directory if
// needed, and updates the container output accordingly.
func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error {
// Output may have been set directly by the container, so
// refresh the container record to check.
err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
- nil, &runner.Container)
+ arvadosclient.Dict{
+ "select": []string{"output"},
+ }, &runner.Container)
if err != nil {
return err
}
txt, err := (&copier{
client: runner.containerClient,
- arvClient: runner.ContainerArvClient,
keepClient: runner.ContainerKeepClient,
hostOutputDir: runner.HostOutputDir,
ctrOutputDir: runner.Container.OutputPath,
var resp arvados.Collection
err = runner.ContainerArvClient.Create("collections", arvadosclient.Dict{
"ensure_unique_name": true,
+ "select": []string{"portable_data_hash"},
"collection": arvadosclient.Dict{
"is_trashed": true,
"name": "output for " + runner.Container.UUID,
if umnterr != nil {
runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
+ runner.ArvMount.Process.Kill()
} else {
// If arv-mount --unmount gets stuck for any reason, we
// don't want to wait for it forever. Do Wait() in a goroutine
}
}
}
+ runner.ArvMount = nil
}
if runner.ArvMountPoint != "" {
if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
}
+ runner.ArvMountPoint = ""
}
if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
}()
+ if runner.keepstoreLogger != nil {
+ // Flush any buffered logs from our local keepstore
+ // process. Discard anything logged after this point
+ // -- it won't end up in the log collection, so
+ // there's no point writing it to the collectionfs.
+ runner.keepstoreLogbuf.SetWriter(io.Discard)
+ runner.keepstoreLogger.Close()
+ runner.keepstoreLogger = nil
+ }
+
if runner.LogsPDH != nil {
// If we have already assigned something to LogsPDH,
// we must be closing the re-opened log, which won't
// -- it exists only to send logs to other channels.
return nil
}
+
saved, err := runner.saveLogCollection(true)
if err != nil {
return fmt.Errorf("error saving log collection: %s", err)
return nil
}
+// Create/update the log collection. Return value has UUID and
+// PortableDataHash fields populated, but others may be blank.
func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) {
runner.logMtx.Lock()
defer runner.logMtx.Unlock()
if final {
updates["is_trashed"] = true
} else {
- exp := time.Now().Add(crunchLogUpdatePeriod * 24)
+ // We set trash_at so this collection gets
+ // automatically cleaned up eventually. It used to be
+ // 12 hours but we had a situation where the API
+ // server was down over a weekend but the containers
+ // kept running such that the log collection got
+ // trashed, so now we make it 2 weeks. refs #20378
+ exp := time.Now().Add(time.Duration(24*14) * time.Hour)
updates["trash_at"] = exp
updates["delete_at"] = exp
}
- reqBody := arvadosclient.Dict{"collection": updates}
+ reqBody := arvadosclient.Dict{
+ "select": []string{"uuid", "portable_data_hash"},
+ "collection": updates,
+ }
var err2 error
if runner.logUUID == "" {
reqBody["ensure_unique_name"] = true
}
// UpdateContainerRunning updates the container state to "Running"
-func (runner *ContainerRunner) UpdateContainerRunning() error {
+func (runner *ContainerRunner) UpdateContainerRunning(logId string) error {
runner.cStateLock.Lock()
defer runner.cStateLock.Unlock()
if runner.cCancelled {
return ErrCancelled
}
- return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
- arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gateway.Address}}, nil)
+ updates := arvadosclient.Dict{
+ "gateway_address": runner.gateway.Address,
+ "state": "Running",
+ }
+ if logId != "" {
+ updates["log"] = logId
+ }
+ return runner.DispatcherArvClient.Update(
+ "containers",
+ runner.Container.UUID,
+ arvadosclient.Dict{
+ "select": []string{"uuid"},
+ "container": updates,
+ },
+ nil,
+ )
}
// ContainerToken returns the api_token the container (and any
if runner.LogsPDH != nil {
update["log"] = *runner.LogsPDH
}
- if runner.finalState == "Complete" {
- if runner.ExitCode != nil {
- update["exit_code"] = *runner.ExitCode
- }
- if runner.OutputPDH != nil {
- update["output"] = *runner.OutputPDH
- }
+ if runner.ExitCode != nil {
+ update["exit_code"] = *runner.ExitCode
+ } else {
+ update["exit_code"] = nil
+ }
+ if runner.finalState == "Complete" && runner.OutputPDH != nil {
+ update["output"] = *runner.OutputPDH
}
- return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
+ update["cost"] = runner.calculateCost(time.Now())
+ return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+ "select": []string{"uuid"},
+ "container": update,
+ }, nil)
}
// IsCancelled returns the value of Cancelled, with goroutine safety.
// Run the full container lifecycle.
func (runner *ContainerRunner) Run() (err error) {
runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
- runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
+ runner.CrunchLog.Printf("%s", currentUserAndGroups())
+ v, _ := exec.Command("arv-mount", "--version").CombinedOutput()
+ runner.CrunchLog.Printf("Using FUSE mount: %s", v)
+ runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
+ runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
+ runner.costStartTime = time.Now()
hostname, hosterr := os.Hostname()
if hosterr != nil {
runner.CrunchLog.Printf("Executing on host '%s'", hostname)
}
+ sigusr2 := make(chan os.Signal, 1)
+ signal.Notify(sigusr2, syscall.SIGUSR2)
+ defer signal.Stop(sigusr2)
+ runner.loadPrices()
+ go runner.handleSIGUSR2(sigusr2)
+
runner.finalState = "Queued"
defer func() {
}
checkErr("stopHoststat", runner.stopHoststat())
checkErr("CommitLogs", runner.CommitLogs())
+ runner.CleanupDirs()
checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
}()
if err != nil {
return
}
+ if runner.keepstore != nil {
+ runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
+ }
// set up FUSE mount and binds
bindmounts, err = runner.SetupMounts()
return
}
- err = runner.UpdateContainerRunning()
+ logCollection, err := runner.saveLogCollection(false)
+ var logId string
+ if err == nil {
+ logId = logCollection.PortableDataHash
+ } else {
+ runner.CrunchLog.Printf("Error committing initial log collection: %v", err)
+ }
+ err = runner.UpdateContainerRunning(logId)
if err != nil {
return
}
}
func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ log := log.New(stderr, "", 0)
flags := flag.NewFlagSet(prog, flag.ContinueOnError)
statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
- cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
- cgroupParent := flags.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
- cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
+ flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree (obsolete, ignored)")
+ flags.String("cgroup-parent", "docker", "name of container's parent cgroup (obsolete, ignored)")
+ cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given `subsystem` as parent cgroup for container (subsystem argument is only relevant for cgroups v1; in cgroups v2 / unified mode, any non-empty value means use current cgroup)")
caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
- stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
+ stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
+ configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)")
sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
- list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
+ list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes (and notify them to use price data passed on stdin)")
enableMemoryLimit := flags.Bool("enable-memory-limit", true, "tell container runtime to limit container's memory usage")
enableNetwork := flags.String("container-enable-networking", "default", "enable networking \"always\" (for all containers) or \"default\" (for containers that request it)")
networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
+ brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+ version := flags.Bool("version", false, "Write version information to stdout and exit 0.")
ignoreDetachFlag := false
if len(args) > 0 && args[0] == "-no-detach" {
ignoreDetachFlag = true
}
- if err := flags.Parse(args); err == flag.ErrHelp {
+ if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
+ return code
+ } else if *version {
+ fmt.Fprintln(stdout, prog, cmd.Version.String())
return 0
- } else if err != nil {
- log.Print(err)
- return 1
- }
-
- if *stdinEnv && !ignoreDetachFlag {
- // Load env vars on stdin if asked (but not in a
- // detached child process, in which case stdin is
- // /dev/null).
- err := loadEnv(os.Stdin)
- if err != nil {
- log.Print(err)
- return 1
- }
+ } else if !*list && flags.NArg() != 1 {
+ fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
+ return 2
}
containerUUID := flags.Arg(0)
switch {
case *detach && !ignoreDetachFlag:
- return Detach(containerUUID, prog, args, os.Stdout, os.Stderr)
+ return Detach(containerUUID, prog, args, stdin, stdout, stderr)
case *kill >= 0:
- return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
+ return KillProcess(containerUUID, syscall.Signal(*kill), stdout, stderr)
case *list:
- return ListProcesses(os.Stdout, os.Stderr)
+ return ListProcesses(stdin, stdout, stderr)
}
- if containerUUID == "" {
+ if len(containerUUID) != 27 {
log.Printf("usage: %s [options] UUID", prog)
return 1
}
+ var keepstoreLogbuf bufThenWrite
+ var conf ConfigData
+ if *stdinConfig {
+ err := json.NewDecoder(stdin).Decode(&conf)
+ if err != nil {
+ log.Printf("decode stdin: %s", err)
+ return 1
+ }
+ for k, v := range conf.Env {
+ err = os.Setenv(k, v)
+ if err != nil {
+ log.Printf("setenv(%q): %s", k, err)
+ return 1
+ }
+ }
+ if conf.Cluster != nil {
+ // ClusterID is missing from the JSON
+ // representation, but we need it to generate
+ // a valid config file for keepstore, so we
+ // fill it using the container UUID prefix.
+ conf.Cluster.ClusterID = containerUUID[:5]
+ }
+ } else {
+ conf = hpcConfData(containerUUID, *configFile, io.MultiWriter(&keepstoreLogbuf, stderr))
+ }
+
log.Printf("crunch-run %s started", cmd.Version.String())
time.Sleep(*sleep)
arvadosclient.CertFiles = []string{*caCertsPath}
}
+ keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ if keepstore != nil {
+ defer keepstore.Process.Kill()
+ }
+
api, err := arvadosclient.MakeArvadosClient()
if err != nil {
log.Printf("%s: %v", containerUUID, err)
return 1
}
- api.Retries = 8
+ // arvadosclient now interprets Retries=10 to mean
+ // Timeout=10m, retrying with exponential backoff + jitter.
+ api.Retries = 10
- kc, kcerr := keepclient.MakeKeepClient(api)
- if kcerr != nil {
- log.Printf("%s: %v", containerUUID, kcerr)
+ kc, err := keepclient.MakeKeepClient(api)
+ if err != nil {
+ log.Printf("%s: %v", containerUUID, err)
return 1
}
kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
return 1
}
+ cr.keepstore = keepstore
+ if keepstore == nil {
+ // Log explanation (if any) for why we're not running
+ // a local keepstore.
+ var buf bytes.Buffer
+ keepstoreLogbuf.SetWriter(&buf)
+ if buf.Len() > 0 {
+ cr.CrunchLog.Printf("%s", strings.TrimSpace(buf.String()))
+ }
+ } else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" {
+ cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+ keepstoreLogbuf.SetWriter(io.Discard)
+ } else {
+ cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+ logwriter, err := cr.NewLogWriter("keepstore")
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ cr.keepstoreLogger = NewThrottledLogger(logwriter)
+
+ var writer io.WriteCloser = cr.keepstoreLogger
+ if logWhat == "errors" {
+ writer = &filterKeepstoreErrorsOnly{WriteCloser: writer}
+ } else if logWhat != "all" {
+ // should have been caught earlier by
+ // dispatcher's config loader
+ log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat)
+ return 1
+ }
+ err = keepstoreLogbuf.SetWriter(writer)
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ cr.keepstoreLogbuf = &keepstoreLogbuf
+ }
+
switch *runtimeEngine {
case "docker":
cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval)
}
defer cr.executor.Close()
+ cr.brokenNodeHook = *brokenNodeHook
+
gwAuthSecret := os.Getenv("GatewayAuthSecret")
os.Unsetenv("GatewayAuthSecret")
if gwAuthSecret == "" {
// not safe to run a gateway service without an auth
// secret
cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
- } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
- // dispatcher did not tell us which external IP
- // address to advertise --> no gateway service
- cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
- } else if de, ok := cr.executor.(*dockerExecutor); ok {
+ } else {
+ gwListen := os.Getenv("GatewayAddress")
cr.gateway = Gateway{
- Address: gwListen,
- AuthSecret: gwAuthSecret,
- ContainerUUID: containerUUID,
- DockerContainerID: &de.containerID,
- Log: cr.CrunchLog,
- ContainerIPAddress: dockerContainerIPAddress(&de.containerID),
+ Address: gwListen,
+ AuthSecret: gwAuthSecret,
+ ContainerUUID: containerUUID,
+ Target: cr.executor,
+ Log: cr.CrunchLog,
+ LogCollection: cr.LogCollection,
+ }
+ if gwListen == "" {
+ // Direct connection won't work, so we use the
+ // gateway_address field to indicate the
+ // internalURL of the controller process that
+ // has the current tunnel connection.
+ cr.gateway.ArvadosClient = cr.dispatcherClient
+ cr.gateway.UpdateTunnelURL = func(url string) {
+ cr.gateway.Address = "tunnel " + url
+ cr.DispatcherArvClient.Update("containers", containerUUID,
+ arvadosclient.Dict{
+ "select": []string{"uuid"},
+ "container": arvadosclient.Dict{"gateway_address": cr.gateway.Address},
+ }, nil)
+ }
}
err = cr.gateway.Start()
if err != nil {
cr.parentTemp = parentTemp
cr.statInterval = *statInterval
- cr.cgroupRoot = *cgroupRoot
- cr.expectCgroupParent = *cgroupParent
cr.enableMemoryLimit = *enableMemoryLimit
cr.enableNetwork = *enableNetwork
cr.networkMode = *networkMode
if *cgroupParentSubsystem != "" {
- p := findCgroup(*cgroupParentSubsystem)
+ p, err := findCgroup(os.DirFS("/"), *cgroupParentSubsystem)
+ if err != nil {
+ log.Printf("fatal: cgroup parent subsystem: %s", err)
+ return 1
+ }
cr.setCgroupParent = p
- cr.expectCgroupParent = p
+ }
+
+ if conf.EC2SpotCheck {
+ go cr.checkSpotInterruptionNotices()
}
runerr := cr.Run()
return 0
}
-func loadEnv(rdr io.Reader) error {
- buf, err := ioutil.ReadAll(rdr)
+// Try to load ConfigData in an HPC (slurm/lsf) environment. This means
+// loading the cluster config from the specified file and (if that
+// works) getting the runtime_constraints container field from
+// controller to determine # VCPUs so we can calculate KeepBuffers.
+func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData {
+ var conf ConfigData
+ conf.Cluster = loadClusterConfigFile(configFile, stderr)
+ if conf.Cluster == nil {
+ // skip loading the container record -- we won't be
+ // able to start local keepstore anyway.
+ return conf
+ }
+ arv, err := arvadosclient.MakeArvadosClient()
+ if err != nil {
+ fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
+ return conf
+ }
+ // arvadosclient now interprets Retries=10 to mean
+ // Timeout=10m, retrying with exponential backoff + jitter.
+ arv.Retries = 10
+ var ctr arvados.Container
+ err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr)
+ if err != nil {
+ fmt.Fprintf(stderr, "error getting container record: %s\n", err)
+ return conf
+ }
+ if ctr.RuntimeConstraints.VCPUs > 0 {
+ conf.KeepBuffers = ctr.RuntimeConstraints.VCPUs * conf.Cluster.Containers.LocalKeepBlobBuffersPerVCPU
+ }
+ return conf
+}
+
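// Worked example (illustrative numbers): with
// Containers.LocalKeepBlobBuffersPerVCPU: 1 in the cluster config and a
// container whose runtime_constraints request 4 VCPUs, hpcConfData returns
// conf.KeepBuffers == 4; startLocalKeepstore below then sets
// API.MaxKeepBlobBuffers = 4 in the config it feeds to the child keepstore.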
+// Load cluster config file from given path. If an error occurs, log
+// the error to stderr and return nil.
+func loadClusterConfigFile(path string, stderr io.Writer) *arvados.Cluster {
+ ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(stderr, "plain", "info"))
+ ldr.Path = path
+ cfg, err := ldr.Load()
+ if err != nil {
+ fmt.Fprintf(stderr, "could not load config file %s: %s\n", path, err)
+ return nil
+ }
+ cluster, err := cfg.GetCluster("")
+ if err != nil {
+ fmt.Fprintf(stderr, "could not use config file %s: %s\n", path, err)
+ return nil
+ }
+ fmt.Fprintf(stderr, "loaded config file %s\n", path)
+ return cluster
+}
+
+func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
+ if configData.KeepBuffers < 1 {
+ fmt.Fprintf(logbuf, "not starting a local keepstore process because KeepBuffers=%v in config\n", configData.KeepBuffers)
+ return nil, nil
+ }
+ if configData.Cluster == nil {
+ fmt.Fprint(logbuf, "not starting a local keepstore process because cluster config file was not loaded\n")
+ return nil, nil
+ }
+ for uuid, vol := range configData.Cluster.Volumes {
+ if len(vol.AccessViaHosts) > 0 {
+ fmt.Fprintf(logbuf, "not starting a local keepstore process because a volume (%s) uses AccessViaHosts\n", uuid)
+ return nil, nil
+ }
+ if !vol.ReadOnly && vol.Replication < configData.Cluster.Collections.DefaultReplication {
+ fmt.Fprintf(logbuf, "not starting a local keepstore process because a writable volume (%s) has replication less than Collections.DefaultReplication (%d < %d)\n", uuid, vol.Replication, configData.Cluster.Collections.DefaultReplication)
+ return nil, nil
+ }
+ }
+
+ // Rather than have an alternate way to tell keepstore how
+ // many buffers to use when starting it this way, we just
+ // modify the cluster configuration that we feed it on stdin.
+ configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
+
+ localaddr := localKeepstoreAddr()
+ ln, err := net.Listen("tcp", net.JoinHostPort(localaddr, "0"))
+ if err != nil {
+ return nil, err
+ }
+ _, port, err := net.SplitHostPort(ln.Addr().String())
+ if err != nil {
+ ln.Close()
+ return nil, err
+ }
+ ln.Close()
+ url := "http://" + net.JoinHostPort(localaddr, port)
+
+ fmt.Fprintf(logbuf, "starting keepstore on %s\n", url)
+
+ var confJSON bytes.Buffer
+ err = json.NewEncoder(&confJSON).Encode(arvados.Config{
+ Clusters: map[string]arvados.Cluster{
+ configData.Cluster.ClusterID: *configData.Cluster,
+ },
+ })
if err != nil {
- return fmt.Errorf("read stdin: %s", err)
+ return nil, err
}
- var env map[string]string
- err = json.Unmarshal(buf, &env)
+ cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
+ if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") {
+ // If we're a 'go test' process, running
+ // /proc/self/exe would start the test suite in a
+ // child process, which is not what we want.
+ cmd.Path, _ = exec.LookPath("go")
+ cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...)
+ cmd.Env = os.Environ()
+ }
+ cmd.Stdin = &confJSON
+ cmd.Stdout = logbuf
+ cmd.Stderr = logbuf
+ cmd.Env = append(cmd.Env,
+ "GOGC=10",
+ "ARVADOS_SERVICE_INTERNAL_URL="+url)
+ err = cmd.Start()
if err != nil {
- return fmt.Errorf("decode stdin: %s", err)
+ return nil, fmt.Errorf("error starting keepstore process: %w", err)
}
- for k, v := range env {
- err = os.Setenv(k, v)
+ cmdExited := false
+ go func() {
+ cmd.Wait()
+ cmdExited = true
+ }()
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
+ defer cancel()
+ poll := time.NewTicker(time.Second / 10)
+ defer poll.Stop()
+ client := http.Client{}
+ for range poll.C {
+ testReq, err := http.NewRequestWithContext(ctx, "GET", url+"/_health/ping", nil)
if err != nil {
- return fmt.Errorf("setenv(%q): %s", k, err)
+ return nil, err
+ }
+ testReq.Header.Set("Authorization", "Bearer "+configData.Cluster.ManagementToken)
+ resp, err := client.Do(testReq)
+ if err == nil {
+ resp.Body.Close()
+ if resp.StatusCode == http.StatusOK {
+ break
+ }
+ }
+ if cmdExited {
+ return nil, fmt.Errorf("keepstore child process exited")
+ }
+ if ctx.Err() != nil {
+ return nil, fmt.Errorf("timed out waiting for new keepstore process to report healthy")
}
}
- return nil
+ os.Setenv("ARVADOS_KEEP_SERVICES", url)
+ return cmd, nil
+}
+
+// return current uid, gid, groups in a format suitable for logging:
+// "crunch-run process has uid=1234(arvados) gid=1234(arvados)
+// groups=1234(arvados),114(fuse)"
+func currentUserAndGroups() string {
+ u, err := user.Current()
+ if err != nil {
+ return fmt.Sprintf("error getting current user ID: %s", err)
+ }
+ s := fmt.Sprintf("crunch-run process has uid=%s(%s) gid=%s", u.Uid, u.Username, u.Gid)
+ if g, err := user.LookupGroupId(u.Gid); err == nil {
+ s += fmt.Sprintf("(%s)", g.Name)
+ }
+ s += " groups="
+ if gids, err := u.GroupIds(); err == nil {
+ for i, gid := range gids {
+ if i > 0 {
+ s += ","
+ }
+ s += gid
+ if g, err := user.LookupGroupId(gid); err == nil {
+ s += fmt.Sprintf("(%s)", g.Name)
+ }
+ }
+ }
+ return s
+}
+
+// Return a suitable local interface address for a local keepstore
+// service. Currently this is the numerically lowest ipv4 address
+// assigned to a local interface, excluding the loopback, link-local,
+// and shared/CGNAT ranges 127/8, 169.254/16, and 100.64/10.
+func localKeepstoreAddr() string {
+ var ips []net.IP
+ // Ignore error (proceed with zero IPs)
+ addrs, _ := processIPs(os.Getpid())
+ for addr := range addrs {
+ ip := net.ParseIP(addr)
+ if ip == nil {
+ // invalid
+ continue
+ }
+ if ip.Mask(net.CIDRMask(8, 32)).Equal(net.IPv4(127, 0, 0, 0)) ||
+ ip.Mask(net.CIDRMask(10, 32)).Equal(net.IPv4(100, 64, 0, 0)) ||
+ ip.Mask(net.CIDRMask(16, 32)).Equal(net.IPv4(169, 254, 0, 0)) {
+ // unsuitable
+ continue
+ }
+ ips = append(ips, ip)
+ }
+ if len(ips) == 0 {
+ return "0.0.0.0"
+ }
+ sort.Slice(ips, func(ii, jj int) bool {
+ i, j := ips[ii], ips[jj]
+ if len(i) != len(j) {
+ return len(i) < len(j)
+ }
+ for x := range i {
+ if i[x] != j[x] {
+ return i[x] < j[x]
+ }
+ }
+ return false
+ })
+ return ips[0].String()
+}
+
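// Worked example (illustrative addresses): if the process has 127.0.0.1,
// 169.254.10.2, 100.64.3.4, 192.168.1.9, and 10.0.0.5 assigned, the
// loopback/link-local/shared ranges are filtered out and the numerically
// lowest remaining address, 10.0.0.5, is returned.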
+func (cr *ContainerRunner) loadPrices() {
+ buf, err := os.ReadFile(filepath.Join(lockdir, pricesfile))
+ if err != nil {
+ if !os.IsNotExist(err) {
+ cr.CrunchLog.Printf("loadPrices: read: %s", err)
+ }
+ return
+ }
+ var prices []cloud.InstancePrice
+ err = json.Unmarshal(buf, &prices)
+ if err != nil {
+ cr.CrunchLog.Printf("loadPrices: decode: %s", err)
+ return
+ }
+ cr.pricesLock.Lock()
+ defer cr.pricesLock.Unlock()
+ var lastKnown time.Time
+ if len(cr.prices) > 0 {
+ lastKnown = cr.prices[0].StartTime
+ }
+ cr.prices = cloud.NormalizePriceHistory(append(prices, cr.prices...))
+ for i := len(cr.prices) - 1; i >= 0; i-- {
+ price := cr.prices[i]
+ if price.StartTime.After(lastKnown) {
+ cr.CrunchLog.Printf("Instance price changed to %#.3g at %s", price.Price, price.StartTime.UTC())
+ }
+ }
+}
+
+func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
+ cr.pricesLock.Lock()
+ defer cr.pricesLock.Unlock()
+
+ // First, make a "prices" slice with the real data as far back
+ // as it goes, and (if needed) a "since the beginning of time"
+ // placeholder containing a reasonable guess about what the
+ // price was between cr.costStartTime and the earliest real
+ // data point.
+ prices := cr.prices
+ if len(prices) == 0 {
+ // use price info in InstanceType record initially
+ // provided by cloud dispatcher
+ var p float64
+ var it arvados.InstanceType
+ if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
+ p = it.Price
+ }
+ prices = []cloud.InstancePrice{{Price: p}}
+ } else if prices[len(prices)-1].StartTime.After(cr.costStartTime) {
+ // guess earlier pricing was the same as the earliest
+ // price we know about
+ filler := prices[len(prices)-1]
+ filler.StartTime = time.Time{}
+ prices = append(prices, filler)
+ }
+
+ // Now that our history of price changes goes back at least as
+ // far as cr.costStartTime, add up the costs for each
+ // interval.
+ cost := 0.0
+ spanEnd := now
+ for _, ip := range prices {
+ spanStart := ip.StartTime
+ if spanStart.After(now) {
+ // pricing information from the future -- not
+ // expected from AWS, but possible in
+ // principle, and exercised by tests.
+ continue
+ }
+ last := false
+ if spanStart.Before(cr.costStartTime) {
+ spanStart = cr.costStartTime
+ last = true
+ }
+ cost += ip.Price * spanEnd.Sub(spanStart).Seconds() / 3600
+ if last {
+ break
+ }
+ spanEnd = spanStart
+ }
+
+ return cost
+}
+
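// Worked example (illustrative numbers): suppose costStartTime is 10:00, the
// instance cost $0.20/h until a price change to $0.10/h at 10:30, and now is
// 11:00. Walking the newest-first price history:
//
//	cost = 0.10*(11:00-10:30) + 0.20*(10:30-10:00)
//	     = 0.10*0.5 + 0.20*0.5
//	     = 0.15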
+func (runner *ContainerRunner) handleSIGUSR2(sigchan chan os.Signal) {
+ for range sigchan {
+ runner.loadPrices()
+ update := arvadosclient.Dict{
+ "select": []string{"uuid"},
+ "container": arvadosclient.Dict{
+ "cost": runner.calculateCost(time.Now()),
+ },
+ }
+ runner.DispatcherArvClient.Update("containers", runner.Container.UUID, update, nil)
+ }
}
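// Illustrative sketch (assumption, not from this patch): the writer side of
// the price-update mechanism is expected to store the price history in the
// shared prices file and then send SIGUSR2, roughly:
//
//	// prices is a []cloud.InstancePrice; proc is the target crunch-run's
//	// *os.Process (e.g. from os.FindProcess).
//	buf, _ := json.Marshal(prices)
//	os.WriteFile(filepath.Join(lockdir, pricesfile), buf, 0o644)
//	proc.Signal(syscall.SIGUSR2)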