finalState string
parentTemp string
+ keepstoreLogger io.WriteCloser
+ keepstoreLogbuf *bufThenWrite
statLogger io.WriteCloser
statReporter *crunchstat.Reporter
hoststatLogger io.WriteCloser
arvMountCmd := []string{
"arv-mount",
"--foreground",
- "--allow-other",
"--read-write",
"--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
+ if runner.executor.Runtime() == "docker" {
+ arvMountCmd = append(arvMountCmd, "--allow-other")
+ }
+
if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
}
}
if pdhOnly {
- arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
+ // If we are only mounting collections by pdh, make
+ // sure we don't subscribe to websocket events to
+ // avoid putting undesired load on the API server
+ arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id", "--disable-event-listening")
} else {
arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
}
+ // the by_uuid mount point is used by singularity when writing
+ // out docker images converted to SIF
arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid")
arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
runner.executorStdout = stdout
runner.executorStderr = stderr
return runner.executor.Create(containerSpec{
- Image: imageID,
- VCPUs: runner.Container.RuntimeConstraints.VCPUs,
- RAM: ram,
- WorkingDir: workdir,
- Env: env,
- BindMounts: bindmounts,
- Command: runner.Container.Command,
- EnableNetwork: enableNetwork,
- NetworkMode: runner.networkMode,
- CgroupParent: runner.setCgroupParent,
- Stdin: stdin,
- Stdout: stdout,
- Stderr: stderr,
+ Image: imageID,
+ VCPUs: runner.Container.RuntimeConstraints.VCPUs,
+ RAM: ram,
+ WorkingDir: workdir,
+ Env: env,
+ BindMounts: bindmounts,
+ Command: runner.Container.Command,
+ EnableNetwork: enableNetwork,
+ CUDADeviceCount: runner.Container.RuntimeConstraints.CUDADeviceCount,
+ NetworkMode: runner.networkMode,
+ CgroupParent: runner.setCgroupParent,
+ Stdin: stdin,
+ Stdout: stdout,
+ Stderr: stderr,
})
}
runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
}()
+ if runner.keepstoreLogger != nil {
+ // Flush any buffered logs from our local keepstore
+ // process. Discard anything logged after this point
+ // -- it won't end up in the log collection, so
+ // there's no point writing it to the collectionfs.
+ runner.keepstoreLogbuf.SetWriter(io.Discard)
+ runner.keepstoreLogger.Close()
+ runner.keepstoreLogger = nil
+ }
+
if runner.LogsPDH != nil {
// If we have already assigned something to LogsPDH,
// we must be closing the re-opened log, which won't
// -- it exists only to send logs to other channels.
return nil
}
+
saved, err := runner.saveLogCollection(true)
if err != nil {
return fmt.Errorf("error saving log collection: %s", err)
}
func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ log := log.New(stderr, "", 0)
flags := flag.NewFlagSet(prog, flag.ContinueOnError)
statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
ignoreDetachFlag = true
}
- if err := flags.Parse(args); err == flag.ErrHelp {
- return 0
- } else if err != nil {
- log.Print(err)
- return 1
+ if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
+ return code
+ } else if !*list && flags.NArg() != 1 {
+ fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
+ return 2
}
containerUUID := flags.Arg(0)
var conf ConfigData
if *stdinConfig {
- err := json.NewDecoder(os.Stdin).Decode(&conf)
+ err := json.NewDecoder(stdin).Decode(&conf)
if err != nil {
- log.Print(err)
+ log.Printf("decode stdin: %s", err)
return 1
}
for k, v := range conf.Env {
arvadosclient.CertFiles = []string{*caCertsPath}
}
- var keepstoreLog bufThenWrite
- keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLog, stderr))
+ var keepstoreLogbuf bufThenWrite
+ keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
if err != nil {
log.Print(err)
return 1
}
api.Retries = 8
- kc, kcerr := keepclient.MakeKeepClient(api)
- if kcerr != nil {
- log.Printf("%s: %v", containerUUID, kcerr)
+ kc, err := keepclient.MakeKeepClient(api)
+ if err != nil {
+ log.Printf("%s: %v", containerUUID, err)
return 1
}
kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
return 1
}
- if keepstore != nil {
- w, err := cr.NewLogWriter("keepstore")
+ if keepstore == nil {
+ // Log explanation (if any) for why we're not running
+ // a local keepstore.
+ var buf bytes.Buffer
+ keepstoreLogbuf.SetWriter(&buf)
+ if buf.Len() > 0 {
+ cr.CrunchLog.Printf("%s", strings.TrimSpace(buf.String()))
+ }
+ } else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" {
+ cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+ keepstoreLogbuf.SetWriter(io.Discard)
+ } else {
+ cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+ logwriter, err := cr.NewLogWriter("keepstore")
if err != nil {
log.Print(err)
return 1
}
- err = keepstoreLog.SetWriter(NewThrottledLogger(w))
+ cr.keepstoreLogger = NewThrottledLogger(logwriter)
+
+ var writer io.WriteCloser = cr.keepstoreLogger
+ if logWhat == "errors" {
+ writer = &filterKeepstoreErrorsOnly{WriteCloser: writer}
+ } else if logWhat != "all" {
+ // should have been caught earlier by
+ // dispatcher's config loader
+ log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat)
+ return 1
+ }
+ err = keepstoreLogbuf.SetWriter(writer)
if err != nil {
log.Print(err)
return 1
}
+ cr.keepstoreLogbuf = &keepstoreLogbuf
}
switch *runtimeEngine {
if configData.Cluster == nil || configData.KeepBuffers < 1 {
return nil, nil
}
+ for uuid, vol := range configData.Cluster.Volumes {
+ if len(vol.AccessViaHosts) > 0 {
+ fmt.Fprintf(logbuf, "not starting a local keepstore process because a volume (%s) uses AccessViaHosts\n", uuid)
+ return nil, nil
+ }
+ if !vol.ReadOnly && vol.Replication < configData.Cluster.Collections.DefaultReplication {
+ fmt.Fprintf(logbuf, "not starting a local keepstore process because a writable volume (%s) has replication less than Collections.DefaultReplication (%d < %d)\n", uuid, vol.Replication, configData.Cluster.Collections.DefaultReplication)
+ return nil, nil
+ }
+ }
// Rather than have an alternate way to tell keepstore how
// many buffers to use when starting it this way, we just
return nil, err
}
cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
+ if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") {
+ // If we're a 'go test' process, running
+ // /proc/self/exe would start the test suite in a
+ // child process, which is not what we want.
+ cmd.Path, _ = exec.LookPath("go")
+ cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...)
+ cmd.Env = os.Environ()
+ }
cmd.Stdin = &confJSON
cmd.Stdout = logbuf
cmd.Stderr = logbuf
- cmd.Env = []string{
+ cmd.Env = append(cmd.Env,
"GOGC=10",
- "ARVADOS_SERVICE_INTERNAL_URL=" + url,
- }
+ "ARVADOS_SERVICE_INTERNAL_URL="+url)
err = cmd.Start()
if err != nil {
return nil, fmt.Errorf("error starting keepstore process: %w", err)
}
+ cmdExited := false
+ go func() {
+ cmd.Wait()
+ cmdExited = true
+ }()
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
defer cancel()
poll := time.NewTicker(time.Second / 10)
defer poll.Stop()
client := http.Client{}
for range poll.C {
- testReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
+ testReq, err := http.NewRequestWithContext(ctx, "GET", url+"/_health/ping", nil)
+ testReq.Header.Set("Authorization", "Bearer "+configData.Cluster.ManagementToken)
if err != nil {
return nil, err
}
resp, err := client.Do(testReq)
if err == nil {
- // Success -- don't need to check the
- // response, we just need to know it's
- // accepting requests.
resp.Body.Close()
- break
+ if resp.StatusCode == http.StatusOK {
+ break
+ }
+ }
+ if cmdExited {
+ return nil, fmt.Errorf("keepstore child process exited")
}
if ctx.Err() != nil {
- return nil, fmt.Errorf("timed out waiting for new keepstore process to accept a request")
+ return nil, fmt.Errorf("timed out waiting for new keepstore process to report healthy")
}
}
os.Setenv("ARVADOS_KEEP_SERVICES", url)