"flag"
"fmt"
"io"
+ "io/fs"
"io/ioutil"
"log"
"net"
type command struct{}
+var arvadosCertPath = "/etc/arvados/ca-certificates.crt"
+
var Command = command{}
// ConfigData contains environment variables and (when needed) cluster
ReadAt(locator string, p []byte, off int) (int, error)
ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
LocalLocator(locator string) (string, error)
- ClearBlockCache()
SetStorageClasses(sc []string)
}
-// NewLogWriter is a factory function to create a new log writer.
-type NewLogWriter func(name string) (io.WriteCloser, error)
-
type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error)
type MkTempDir func(string, string) (string, error)
Container arvados.Container
token string
ExitCode *int
- NewLogWriter NewLogWriter
- CrunchLog *ThrottledLogger
+ CrunchLog *logWriter
logUUID string
logMtx sync.Mutex
LogCollection arvados.CollectionFileSystem
hoststatLogger io.WriteCloser
hoststatReporter *crunchstat.Reporter
statInterval time.Duration
- cgroupRoot string
- // What we expect the container's cgroup parent to be.
- expectCgroupParent string
// What we tell docker to use as the container's cgroup
- // parent. Note: Ideally we would use the same field for both
- // expectCgroupParent and setCgroupParent, and just make it
- // default to "docker". However, when using docker < 1.10 with
- // systemd, specifying a non-empty cgroup parent (even the
- // default value "docker") hits a docker bug
- // (https://github.com/docker/docker/issues/17126). Using two
- // separate fields makes it possible to use the "expect cgroup
- // parent to be X" feature even on sites where the "specify
- // cgroup parent" feature breaks.
+ // parent.
setCgroupParent string
+ // Fake root dir from which crunchstat.Reporter reads OS
+ // files (used in tests).
+ crunchstatFakeFS fs.FS
cStateLock sync.Mutex
cCancelled bool // StopContainer() invoked
enableNetwork string // one of "default" or "always"
networkMode string // "none", "host", or "" -- passed through to executor
brokenNodeHook string // script to run if node appears to be broken
- arvMountLog *ThrottledLogger
+ arvMountLog io.WriteCloser
containerWatchdogInterval time.Duration
}
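The new crunchstatFakeFS field lets tests hand crunchstat.Reporter a synthetic root filesystem instead of the real /proc and /sys. A minimal sketch of how such a fake could be built with the standard library's testing/fstest package; the paths and contents here are hypothetical, not the actual crunchstat test fixtures. Note that fs.FS paths carry no leading slash:

package main

import (
	"io/fs"
	"os"
	"testing/fstest"
)

// fakeProcFS stands in for the real root filesystem:
// "/proc/self/cgroup" becomes the map key "proc/self/cgroup".
func fakeProcFS() fs.FS {
	return fstest.MapFS{
		"proc/self/cgroup": &fstest.MapFile{
			Data: []byte("0::/fakecgroup\n"),
		},
		"sys/fs/cgroup/fakecgroup/memory.current": &fstest.MapFile{
			Data: []byte("1048576\n"),
		},
	}
}

func main() {
	data, _ := fs.ReadFile(fakeProcFS(), "proc/self/cgroup")
	os.Stdout.Write(data)
}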
c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
- w, err := runner.NewLogWriter("arv-mount")
+ runner.arvMountLog, err = runner.openLogFile("arv-mount")
if err != nil {
return nil, err
}
- runner.arvMountLog = NewThrottledLogger(w)
scanner := logScanner{
Patterns: []string{
"Keep write error",
"Unhandled exception during FUSE operation",
},
ReportFunc: func(pattern, text string) {
- runner.updateRuntimeStatus("arv-mount: "+pattern, text)
+ runner.updateRuntimeStatus(arvadosclient.Dict{
+ "warning": "arv-mount: " + pattern,
+ "warningDetail": text,
+ })
},
}
c.Stdout = runner.arvMountLog
}
}
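For context, the logScanner attached to the arv-mount log stream above watches for known failure signatures and calls ReportFunc on a match. A minimal sketch of that pattern-scanning-writer idea, with hypothetical names (illustrative only; the real logScanner's behavior, e.g. how often it reports a given pattern, may differ):

package main

import (
	"bytes"
	"fmt"
	"strings"
)

// patternScanner: an io.Writer that watches for substrings in a
// line-oriented stream. Hypothetical stand-in for logScanner.
type patternScanner struct {
	patterns []string
	report   func(pattern, line string)
	buf      bytes.Buffer
}

func (s *patternScanner) Write(p []byte) (int, error) {
	s.buf.Write(p)
	for {
		line, err := s.buf.ReadString('\n')
		if err != nil {
			// No complete line yet: keep the partial line buffered.
			s.buf.WriteString(line)
			break
		}
		for _, pat := range s.patterns {
			if strings.Contains(line, pat) {
				s.report(pat, strings.TrimSuffix(line, "\n"))
			}
		}
	}
	return len(p), nil
}

func main() {
	s := &patternScanner{
		patterns: []string{"Keep write error"},
		report:   func(pat, line string) { fmt.Printf("warning %q: %s\n", pat, line) },
	}
	fmt.Fprintln(s, "2024-01-01T00:00:00Z Keep write error: connection refused")
}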
- if bind == "/etc/arvados/ca-certificates.crt" {
+ if bind == arvadosCertPath {
needCertMount = false
}
// OutputPath is a staging directory.
bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true}
}
-
- case mnt.Kind == "git_tree":
- tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree")
- if err != nil {
- return nil, fmt.Errorf("creating temp dir: %v", err)
- }
- err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
- if err != nil {
- return nil, err
- }
- bindmounts[bind] = bindmount{HostPath: tmpdir, ReadOnly: true}
}
}
}
if needCertMount && runner.Container.RuntimeConstraints.API {
- for _, certfile := range arvadosclient.CertFiles {
- _, err := os.Stat(certfile)
- if err == nil {
- bindmounts["/etc/arvados/ca-certificates.crt"] = bindmount{HostPath: certfile, ReadOnly: true}
+ for _, certfile := range []string{
+ // Populated by caller, or sdk/go/arvados init(), or test suite:
+ os.Getenv("SSL_CERT_FILE"),
+ // Copied from Go 1.21 stdlib (src/crypto/x509/root_linux.go):
+ "/etc/ssl/certs/ca-certificates.crt", // Debian/Ubuntu/Gentoo etc.
+ "/etc/pki/tls/certs/ca-bundle.crt", // Fedora/RHEL 6
+ "/etc/ssl/ca-bundle.pem", // OpenSUSE
+ "/etc/pki/tls/cacert.pem", // OpenELEC
+ "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", // CentOS/RHEL 7
+ "/etc/ssl/cert.pem", // Alpine Linux
+ } {
+ if _, err := os.Stat(certfile); err == nil {
+ bindmounts[arvadosCertPath] = bindmount{HostPath: certfile, ReadOnly: true}
break
}
}
return nil
}
runner.hoststatReporter.Stop()
+ runner.hoststatReporter.LogProcessMemMax(runner.CrunchLog)
err := runner.hoststatLogger.Close()
if err != nil {
return fmt.Errorf("error closing hoststat logs: %v", err)
}
func (runner *ContainerRunner) startHoststat() error {
- w, err := runner.NewLogWriter("hoststat")
+ var err error
+ runner.hoststatLogger, err = runner.openLogFile("hoststat")
if err != nil {
return err
}
- runner.hoststatLogger = NewThrottledLogger(w)
runner.hoststatReporter = &crunchstat.Reporter{
- Logger: log.New(runner.hoststatLogger, "", 0),
- CgroupRoot: runner.cgroupRoot,
+ Logger: newLogWriter(runner.hoststatLogger),
+ // Our own cgroup is the "host" cgroup, in the sense
+ // that it accounts for resource usage outside the
+ // container. It doesn't count _all_ resource usage on
+ // the system.
+ //
+ // TODO?: Use the furthest ancestor of our own cgroup
+ // that has stats available. (Currently crunchstat
+ // does not have that capability.)
+ Pid: os.Getpid,
PollPeriod: runner.statInterval,
}
runner.hoststatReporter.Start()
}
func (runner *ContainerRunner) startCrunchstat() error {
- w, err := runner.NewLogWriter("crunchstat")
+ var err error
+ runner.statLogger, err = runner.openLogFile("crunchstat")
if err != nil {
return err
}
- runner.statLogger = NewThrottledLogger(w)
runner.statReporter = &crunchstat.Reporter{
- CID: runner.executor.CgroupID(),
- Logger: log.New(runner.statLogger, "", 0),
- CgroupParent: runner.expectCgroupParent,
- CgroupRoot: runner.cgroupRoot,
- PollPeriod: runner.statInterval,
- TempDir: runner.parentTemp,
+ Pid: runner.executor.Pid,
+ FS: runner.crunchstatFakeFS,
+ Logger: newLogWriter(runner.statLogger),
+ MemThresholds: map[string][]crunchstat.Threshold{
+ "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}),
+ },
+ PollPeriod: runner.statInterval,
+ TempDir: runner.parentTemp,
+ ThresholdLogger: runner.CrunchLog,
}
runner.statReporter.Start()
return nil
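The MemThresholds entry above derives absolute warning levels from the container's RAM constraint, so crossings of 90/95/99% get logged via ThresholdLogger. A sketch of the arithmetic presumably behind crunchstat.NewThresholdsFromPercentages (the real helper returns crunchstat.Threshold values, which also carry formatting details):

package main

import "fmt"

// thresholdsFromPercentages mirrors the intent of
// crunchstat.NewThresholdsFromPercentages: scale a byte limit by
// each percentage to get absolute thresholds.
func thresholdsFromPercentages(limit int64, pcts []int64) []int64 {
	var out []int64
	for _, pct := range pcts {
		out = append(out, limit*pct/100)
	}
	return out
}

func main() {
	// e.g. an 8 GiB RAM constraint
	fmt.Println(thresholdsFromPercentages(8<<30, []int64{90, 95, 99}))
}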
// might differ from what's described in the node record (see
// LogNodeRecord).
func (runner *ContainerRunner) LogHostInfo() (err error) {
- w, err := runner.NewLogWriter("node-info")
+ w, err := runner.openLogFile("node-info")
if err != nil {
return
}
// LogContainerRecord gets and saves the raw JSON container record from the API server
func (runner *ContainerRunner) LogContainerRecord() error {
- logged, err := runner.logAPIResponse("container", "containers", map[string]interface{}{"filters": [][]string{{"uuid", "=", runner.Container.UUID}}}, nil)
+ logged, err := runner.logAPIResponse("container", "containers", map[string]interface{}{"filters": [][]string{{"uuid", "=", runner.Container.UUID}}})
if !logged && err == nil {
err = fmt.Errorf("error: no container record found for %s", runner.Container.UUID)
}
return err
}
-// LogNodeRecord logs the current host's InstanceType config entry (or
-// the arvados#node record, if running via crunch-dispatch-slurm).
+// LogNodeRecord logs the current host's InstanceType config entry, if
+// running via arvados-dispatch-cloud.
func (runner *ContainerRunner) LogNodeRecord() error {
- if it := os.Getenv("InstanceType"); it != "" {
- // Dispatched via arvados-dispatch-cloud. Save
- // InstanceType config fragment received from
- // dispatcher on stdin.
- w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666)
- if err != nil {
- return err
- }
- defer w.Close()
- _, err = io.WriteString(w, it)
- if err != nil {
- return err
- }
- return w.Close()
+ it := os.Getenv("InstanceType")
+ if it == "" {
+ // Not dispatched by arvados-dispatch-cloud.
+ return nil
}
- // Dispatched via crunch-dispatch-slurm. Look up
- // apiserver's node record corresponding to
- // $SLURMD_NODENAME.
- hostname := os.Getenv("SLURMD_NODENAME")
- if hostname == "" {
- hostname, _ = os.Hostname()
+ // Save InstanceType config fragment received from dispatcher
+ // on stdin.
+ w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666)
+ if err != nil {
+ return err
}
- _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
- // The "info" field has admin-only info when
- // obtained with a privileged token, and
- // should not be logged.
- node, ok := resp.(map[string]interface{})
- if ok {
- delete(node, "info")
- }
- })
- return err
+ defer w.Close()
+ _, err = io.WriteString(w, it)
+ if err != nil {
+ return err
+ }
+ return w.Close()
}
-func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
+func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}) (logged bool, err error) {
writer, err := runner.LogCollection.OpenFile(label+".json", os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return false, err
}
- w := &ArvLogWriter{
- ArvClient: runner.DispatcherArvClient,
- UUID: runner.Container.UUID,
- loggingStream: label,
- writeCloser: writer,
- }
-
reader, err := runner.DispatcherArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
if err != nil {
return false, fmt.Errorf("error getting %s record: %v", label, err)
} else if len(items) < 1 {
return false, nil
}
- if munge != nil {
- munge(items[0])
- }
// Re-encode it using indentation to improve readability
- enc := json.NewEncoder(w)
+ enc := json.NewEncoder(writer)
enc.SetIndent("", " ")
if err = enc.Encode(items[0]); err != nil {
return false, fmt.Errorf("error logging %s record: %v", label, err)
}
- err = w.Close()
+ err = writer.Close()
if err != nil {
return false, fmt.Errorf("error closing %s.json in log collection: %v", label, err)
}
return err
}
stdout = f
- } else if w, err := runner.NewLogWriter("stdout"); err != nil {
+ } else if w, err := runner.openLogFile("stdout"); err != nil {
return err
} else {
- stdout = NewThrottledLogger(w)
+ stdout = w
}
if mnt, ok := runner.Container.Mounts["stderr"]; ok {
return err
}
stderr = f
- } else if w, err := runner.NewLogWriter("stderr"); err != nil {
+ } else if w, err := runner.openLogFile("stderr"); err != nil {
return err
} else {
- stderr = NewThrottledLogger(w)
+ stderr = w
}
env := runner.Container.Environment
}
runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra)
err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+ "select": []string{"uuid"},
"container": arvadosclient.Dict{"exit_code": exitcode},
}, nil)
if err != nil {
if runner.statReporter != nil {
runner.statReporter.Stop()
+ runner.statReporter.LogMaxima(runner.CrunchLog, map[string]int64{
+ "rss": runner.Container.RuntimeConstraints.RAM,
+ })
err = runner.statLogger.Close()
if err != nil {
runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
}
err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
- "container": arvadosclient.Dict{"log": saved.PortableDataHash},
+ "select": []string{"uuid"},
+ "container": arvadosclient.Dict{
+ "log": saved.PortableDataHash,
+ },
}, nil)
if err != nil {
runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
failures = 0
if metadata != lastmetadata {
lastmetadata = metadata
- text := fmt.Sprintf("Cloud provider indicates instance action %q scheduled for time %q", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
+ text := fmt.Sprintf("Cloud provider scheduled instance %s at %s", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
runner.CrunchLog.Printf("%s", text)
- runner.updateRuntimeStatus("instance interruption", text)
+ runner.updateRuntimeStatus(arvadosclient.Dict{
+ "warning": "preemption notice",
+ "warningDetail": text,
+ "preemptionNotice": text,
+ })
if proc, err := os.FindProcess(os.Getpid()); err == nil {
// trigger updateLogs
proc.Signal(syscall.SIGUSR1)
}
}
-func (runner *ContainerRunner) updateRuntimeStatus(warning, detail string) {
+func (runner *ContainerRunner) updateRuntimeStatus(status arvadosclient.Dict) {
err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+ "select": []string{"uuid"},
"container": arvadosclient.Dict{
- "runtime_status": arvadosclient.Dict{
- "warning": warning,
- "warningDetail": detail,
- },
+ "runtime_status": status,
},
}, nil)
if err != nil {
// Output may have been set directly by the container, so
// refresh the container record to check.
err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
- nil, &runner.Container)
+ arvadosclient.Dict{
+ "select": []string{"output"},
+ }, &runner.Container)
if err != nil {
return err
}
txt, err := (&copier{
client: runner.containerClient,
- arvClient: runner.ContainerArvClient,
keepClient: runner.ContainerKeepClient,
hostOutputDir: runner.HostOutputDir,
ctrOutputDir: runner.Container.OutputPath,
+ globs: runner.Container.OutputGlob,
bindmounts: bindmounts,
mounts: runner.Container.Mounts,
secretMounts: runner.SecretMounts,
var resp arvados.Collection
err = runner.ContainerArvClient.Create("collections", arvadosclient.Dict{
"ensure_unique_name": true,
+ "select": []string{"portable_data_hash"},
"collection": arvadosclient.Dict{
"is_trashed": true,
"name": "output for " + runner.Container.UUID,
if runner.arvMountLog != nil {
runner.arvMountLog.Close()
}
- runner.CrunchLog.Close()
-
- // Closing CrunchLog above allows them to be committed to Keep at this
- // point, but re-open crunch log with ArvClient in case there are any
- // other further errors (such as failing to write the log to Keep!)
- // while shutting down
- runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{
- ArvClient: runner.DispatcherArvClient,
- UUID: runner.Container.UUID,
- loggingStream: "crunch-run",
- writeCloser: nil,
- })
- runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
+
+ // From now on just log to stderr, in case there are
+ // any other further errors (such as failing to write
+ // the log to Keep!) while shutting down
+ runner.CrunchLog = newLogWriter(newTimestamper(newStringPrefixer(os.Stderr, runner.Container.UUID+" ")))
}()
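newTimestamper and newStringPrefixer, used here and in NewContainerRunner below, are small io.Writer decorators defined elsewhere in this package; composed, they turn each log line into "<container-uuid> <timestamp> message" on stderr. A minimal sketch of the prefixing half (illustrative, not the actual implementation):

package main

import (
	"io"
	"os"
)

// stringPrefixer: hypothetical equivalent of newStringPrefixer. It
// assumes one whole line per Write call, which log-style writers
// typically guarantee.
type stringPrefixer struct {
	w      io.Writer
	prefix string
}

func (p stringPrefixer) Write(b []byte) (int, error) {
	if _, err := io.WriteString(p.w, p.prefix); err != nil {
		return 0, err
	}
	return p.w.Write(b)
}

func main() {
	w := stringPrefixer{os.Stderr, "zzzzz-dz642-xxxxxxxxxxxxxxx "}
	io.WriteString(w, "crunch-run finished\n")
}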
if runner.keepstoreLogger != nil {
return nil
}
+// Create/update the log collection. Return value has UUID and
+// PortableDataHash fields populated, but others may be blank.
func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) {
runner.logMtx.Lock()
defer runner.logMtx.Unlock()
if final {
updates["is_trashed"] = true
} else {
- exp := time.Now().Add(crunchLogUpdatePeriod * 24)
+ // We set trash_at so this collection gets cleaned up
+ // automatically eventually. The expiry used to be 12
+ // hours, but in one incident the API server was down
+ // over a weekend while containers kept running, and
+ // their log collections were trashed; now we use 2
+ // weeks. refs #20378
+ exp := time.Now().Add(time.Duration(24*14) * time.Hour)
updates["trash_at"] = exp
updates["delete_at"] = exp
}
- reqBody := arvadosclient.Dict{"collection": updates}
+ reqBody := arvadosclient.Dict{
+ "select": []string{"uuid", "portable_data_hash"},
+ "collection": updates,
+ }
var err2 error
if runner.logUUID == "" {
reqBody["ensure_unique_name"] = true
return runner.DispatcherArvClient.Update(
"containers",
runner.Container.UUID,
- arvadosclient.Dict{"container": updates},
+ arvadosclient.Dict{
+ "select": []string{"uuid"},
+ "container": updates,
+ },
nil,
)
}
update["output"] = *runner.OutputPDH
}
update["cost"] = runner.calculateCost(time.Now())
- return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
+ return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+ "select": []string{"uuid"},
+ "container": update,
+ }, nil)
}
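The "select" parameters added throughout this change ask the API server to return only the named fields instead of a full container record, trimming response payloads for calls whose results are mostly discarded. arvadosclient.Dict is a plain map[string]interface{}, so the request parameters are easy to visualize (field values here are illustrative):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Shape of the parameters passed to DispatcherArvClient.Update
	// above; the SDK handles the actual wire encoding.
	body, err := json.MarshalIndent(map[string]interface{}{
		"select":    []string{"uuid"},
		"container": map[string]interface{}{"cost": 1.23},
	}, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}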
// IsCancelled returns the value of Cancelled, with goroutine safety.
return runner.cCancelled
}
-// NewArvLogWriter creates an ArvLogWriter
-func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error) {
- writer, err := runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
- if err != nil {
- return nil, err
- }
- return &ArvLogWriter{
- ArvClient: runner.DispatcherArvClient,
- UUID: runner.Container.UUID,
- loggingStream: name,
- writeCloser: writer,
- }, nil
+func (runner *ContainerRunner) openLogFile(name string) (io.WriteCloser, error) {
+ return runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
}
// Run the full container lifecycle.
signal.Notify(sigusr2, syscall.SIGUSR2)
defer signal.Stop(sigusr2)
runner.loadPrices()
- go func() {
- for range sigusr2 {
- runner.loadPrices()
- }
- }()
+ go runner.handleSIGUSR2(sigusr2)
runner.finalState = "Queued"
defer func() {
runner.CleanupDirs()
-
runner.CrunchLog.Printf("crunch-run finished")
- runner.CrunchLog.Close()
}()
err = runner.fetchContainerRecord()
DispatcherArvClient: dispatcherArvClient,
DispatcherKeepClient: dispatcherKeepClient,
}
- cr.NewLogWriter = cr.NewArvLogWriter
cr.RunArvMount = cr.ArvMountCmd
cr.MkTempDir = ioutil.TempDir
cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
return nil, err
}
cr.Container.UUID = containerUUID
- w, err := cr.NewLogWriter("crunch-run")
+ f, err := cr.openLogFile("crunch-run")
if err != nil {
return nil, err
}
- cr.CrunchLog = NewThrottledLogger(w)
- cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+ cr.CrunchLog = newLogWriter(newTimestamper(io.MultiWriter(f, newStringPrefixer(os.Stderr, cr.Container.UUID+" "))))
- loadLogThrottleParams(dispatcherArvClient)
go cr.updateLogs()
return cr, nil
log := log.New(stderr, "", 0)
flags := flag.NewFlagSet(prog, flag.ContinueOnError)
statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
- cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
- cgroupParent := flags.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
- cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
+ flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree (obsolete, ignored)")
+ flags.String("cgroup-parent", "docker", "name of container's parent cgroup (obsolete, ignored)")
+ cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given `subsystem` as parent cgroup for container (subsystem argument is only relevant for cgroups v1; in cgroups v2 / unified mode, any non-empty value means use current cgroup); if empty, use the docker daemon's default cgroup parent. See https://doc.arvados.org/install/crunch2-slurm/install-dispatch.html#CrunchRunCommand-cgroups")
caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
time.Sleep(*sleep)
if *caCertsPath != "" {
- arvadosclient.CertFiles = []string{*caCertsPath}
+ os.Setenv("SSL_CERT_FILE", *caCertsPath)
}
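Replacing the arvadosclient.CertFiles override with SSL_CERT_FILE works because Go's crypto/x509 consults that variable when loading system roots on Unix, and the bind-mount candidate list above checks it first. One caveat: the system pool is loaded lazily exactly once, so the variable must be set before the process's first TLS use. A hypothetical check, not part of crunch-run:

package main

import (
	"crypto/x509"
	"fmt"
	"os"
)

func main() {
	// Must happen before anything triggers the one-time lazy load
	// of system roots (e.g. the first https request).
	os.Setenv("SSL_CERT_FILE", "/path/to/ca-certificates.crt")
	if _, err := x509.SystemCertPool(); err != nil {
		fmt.Fprintln(os.Stderr, "loading roots:", err)
		os.Exit(1)
	}
	fmt.Println("system roots loaded, honoring SSL_CERT_FILE")
}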
keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
log.Printf("%s: %v", containerUUID, err)
return 1
}
- api.Retries = 8
+ // arvadosclient now interprets Retries=10 to mean
+ // Timeout=10m, retrying with exponential backoff + jitter.
+ api.Retries = 10
kc, err := keepclient.MakeKeepClient(api)
if err != nil {
log.Printf("%s: %v", containerUUID, err)
return 1
}
- kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
kc.Retries = 4
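A sketch of the retry schedule the Retries=10 comment describes: exponential backoff with full jitter under an overall deadline. The constants are illustrative; arvadosclient's actual base delay, cap, and jitter strategy may differ:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoffDelays returns sleep intervals: each attempt waits a random
// fraction of an exponentially growing delay, until the budget is spent.
func backoffDelays(timeout time.Duration) []time.Duration {
	var out []time.Duration
	var elapsed time.Duration
	for delay := time.Second; elapsed < timeout && delay < timeout; delay *= 2 {
		d := time.Duration(rand.Int63n(int64(delay) + 1))
		out = append(out, d)
		elapsed += d
	}
	return out
}

func main() {
	fmt.Println(backoffDelays(10 * time.Minute))
}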
cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, containerUUID)
keepstoreLogbuf.SetWriter(io.Discard)
} else {
cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
- logwriter, err := cr.NewLogWriter("keepstore")
+ cr.keepstoreLogger, err = cr.openLogFile("keepstore")
if err != nil {
log.Print(err)
return 1
}
- cr.keepstoreLogger = NewThrottledLogger(logwriter)
var writer io.WriteCloser = cr.keepstoreLogger
if logWhat == "errors" {
cr.executor, err = newSingularityExecutor(cr.CrunchLog.Printf)
default:
cr.CrunchLog.Printf("%s: unsupported RuntimeEngine %q", containerUUID, *runtimeEngine)
- cr.CrunchLog.Close()
return 1
}
if err != nil {
cr.CrunchLog.Printf("%s: %v", containerUUID, err)
cr.checkBrokenNode(err)
- cr.CrunchLog.Close()
return 1
}
defer cr.executor.Close()
ContainerUUID: containerUUID,
Target: cr.executor,
Log: cr.CrunchLog,
+ LogCollection: cr.LogCollection,
}
if gwListen == "" {
// Direct connection won't work, so we use the
cr.gateway.UpdateTunnelURL = func(url string) {
cr.gateway.Address = "tunnel " + url
cr.DispatcherArvClient.Update("containers", containerUUID,
- arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil)
+ arvadosclient.Dict{
+ "select": []string{"uuid"},
+ "container": arvadosclient.Dict{"gateway_address": cr.gateway.Address},
+ }, nil)
}
}
err = cr.gateway.Start()
cr.parentTemp = parentTemp
cr.statInterval = *statInterval
- cr.cgroupRoot = *cgroupRoot
- cr.expectCgroupParent = *cgroupParent
cr.enableMemoryLimit = *enableMemoryLimit
cr.enableNetwork = *enableNetwork
cr.networkMode = *networkMode
if *cgroupParentSubsystem != "" {
- p, err := findCgroup(*cgroupParentSubsystem)
+ p, err := findCgroup(os.DirFS("/"), *cgroupParentSubsystem)
if err != nil {
log.Printf("fatal: cgroup parent subsystem: %s", err)
return 1
}
cr.setCgroupParent = p
- cr.expectCgroupParent = p
}
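findCgroup now takes an fs.FS, so tests can substitute a fake root like crunchstatFakeFS above. A sketch of the cgroups v2 case only: in unified mode, proc/self/cgroup contains a single line of the form "0::<path>" (the real findCgroup also handles v1 subsystem hierarchies):

package main

import (
	"fmt"
	"io/fs"
	"os"
	"strings"
)

// findCgroupV2: hypothetical v2-only analogue of findCgroup.
func findCgroupV2(fsys fs.FS) (string, error) {
	data, err := fs.ReadFile(fsys, "proc/self/cgroup")
	if err != nil {
		return "", err
	}
	for _, line := range strings.Split(string(data), "\n") {
		if path, ok := strings.CutPrefix(line, "0::"); ok {
			return path, nil
		}
	}
	return "", fmt.Errorf("no cgroups v2 entry in proc/self/cgroup")
}

func main() {
	fmt.Println(findCgroupV2(os.DirFS("/")))
}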
if conf.EC2SpotCheck {
fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
return conf
}
- arv.Retries = 8
+ // arvadosclient now interprets Retries=10 to mean
+ // Timeout=10m, retrying with exponential backoff + jitter.
+ arv.Retries = 10
var ctr arvados.Container
err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr)
if err != nil {
}
// Rather than have an alternate way to tell keepstore how
- // many buffers to use when starting it this way, we just
- // modify the cluster configuration that we feed it on stdin.
- configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
+ // many buffers to use, etc., when starting it this way, we
+ // just modify the cluster configuration that we feed it on
+ // stdin.
+ ccfg := *configData.Cluster
+ ccfg.API.MaxKeepBlobBuffers = configData.KeepBuffers
+ ccfg.Collections.BlobTrash = false
+ ccfg.Collections.BlobTrashConcurrency = 0
+ ccfg.Collections.BlobDeleteConcurrency = 0
localaddr := localKeepstoreAddr()
ln, err := net.Listen("tcp", net.JoinHostPort(localaddr, "0"))
var confJSON bytes.Buffer
err = json.NewEncoder(&confJSON).Encode(arvados.Config{
Clusters: map[string]arvados.Cluster{
- configData.Cluster.ClusterID: *configData.Cluster,
+ ccfg.ClusterID: ccfg,
},
})
if err != nil {
return cost
}
+
+func (runner *ContainerRunner) handleSIGUSR2(sigchan chan os.Signal) {
+ for range sigchan {
+ runner.loadPrices()
+ update := arvadosclient.Dict{
+ "select": []string{"uuid"},
+ "container": arvadosclient.Dict{
+ "cost": runner.calculateCost(time.Now()),
+ },
+ }
+ runner.DispatcherArvClient.Update("containers", runner.Container.UUID, update, nil)
+ }
+}
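With this handler in place, the dispatcher (or an operator) can send SIGUSR2 to prompt a running crunch-run to reload instance prices and push an updated cost immediately, instead of waiting for the next periodic update. A hypothetical sender, with a made-up pid:

package main

import (
	"os"
	"syscall"
)

func main() {
	// 12345 is a placeholder for a real crunch-run pid.
	proc, err := os.FindProcess(12345)
	if err != nil {
		panic(err)
	}
	proc.Signal(syscall.SIGUSR2)
}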