lib/crunchrun/crunchrun.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "errors"
12         "flag"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "log"
17         "net"
18         "net/http"
19         "os"
20         "os/exec"
21         "os/signal"
22         "path"
23         "path/filepath"
24         "regexp"
25         "runtime"
26         "runtime/pprof"
27         "sort"
28         "strings"
29         "sync"
30         "syscall"
31         "time"
32
33         "git.arvados.org/arvados.git/lib/cmd"
34         "git.arvados.org/arvados.git/lib/crunchstat"
35         "git.arvados.org/arvados.git/sdk/go/arvados"
36         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
37         "git.arvados.org/arvados.git/sdk/go/keepclient"
38         "git.arvados.org/arvados.git/sdk/go/manifest"
39         "golang.org/x/sys/unix"
40 )
41
42 type command struct{}
43
44 var Command = command{}
45
46 // ConfigData contains environment variables and (when needed) cluster
47 // configuration, passed from dispatchcloud to crunch-run on stdin.
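// For illustration only (field names follow the struct below; values depend
// on the dispatcher and site configuration), the JSON sent on stdin might
// look like: {"Env":{"ARVADOS_API_HOST":"zzzzz.example.org"},"KeepBuffers":2,"Cluster":{...}}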
48 type ConfigData struct {
49         Env         map[string]string
50         KeepBuffers int
51         Cluster     *arvados.Cluster
52 }
53
54 // IArvadosClient is the minimal set of Arvados API methods used by crunch-run.
55 type IArvadosClient interface {
56         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
57         Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
58         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
59         Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error
60         CallRaw(method string, resourceType string, uuid string, action string, parameters arvadosclient.Dict) (reader io.ReadCloser, err error)
61         Discovery(key string) (interface{}, error)
62 }
63
64 // ErrCancelled is the error returned when the container is cancelled.
65 var ErrCancelled = errors.New("Cancelled")
66
67 // IKeepClient is the minimal set of Keep API methods used by crunch-run.
68 type IKeepClient interface {
69         BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error)
70         ReadAt(locator string, p []byte, off int) (int, error)
71         ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
72         LocalLocator(locator string) (string, error)
73         ClearBlockCache()
74         SetStorageClasses(sc []string)
75 }
76
77 // NewLogWriter is a factory function to create a new log writer.
78 type NewLogWriter func(name string) (io.WriteCloser, error)
79
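// RunArvMount is a function type for starting arv-mount with a given
// command line and API token (see ArvMountCmd below).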
80 type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error)
81
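// MkTempDir is a function type for creating temporary directories; its
// signature matches ioutil.TempDir.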
82 type MkTempDir func(string, string) (string, error)
83
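// PsProcess is a minimal interface for inspecting a process's command line.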
84 type PsProcess interface {
85         CmdlineSlice() ([]string, error)
86 }
87
88 // ContainerRunner is the main stateful struct used for a single execution of a
89 // container.
90 type ContainerRunner struct {
91         executor       containerExecutor
92         executorStdin  io.Closer
93         executorStdout io.Closer
94         executorStderr io.Closer
95
96         // Dispatcher client is initialized with the Dispatcher token.
97         // This is a privileged token used to manage container status
98         // and logs.
99         //
100         // We have both dispatcherClient and DispatcherArvClient
101         // because there are two different incompatible Arvados Go
102         // SDKs and we have to use both (hopefully this gets fixed in
103         // #14467)
104         dispatcherClient     *arvados.Client
105         DispatcherArvClient  IArvadosClient
106         DispatcherKeepClient IKeepClient
107
108         // Container client is initialized with the Container token.
109         // This token controls the permissions of the container, and
110         // must be used for operations such as reading collections.
111         //
112         // Same comment as above applies to
113         // containerClient/ContainerArvClient.
114         containerClient     *arvados.Client
115         ContainerArvClient  IArvadosClient
116         ContainerKeepClient IKeepClient
117
118         Container     arvados.Container
119         token         string
120         ExitCode      *int
121         NewLogWriter  NewLogWriter
122         CrunchLog     *ThrottledLogger
123         logUUID       string
124         logMtx        sync.Mutex
125         LogCollection arvados.CollectionFileSystem
126         LogsPDH       *string
127         RunArvMount   RunArvMount
128         MkTempDir     MkTempDir
129         ArvMount      *exec.Cmd
130         ArvMountPoint string
131         HostOutputDir string
132         Volumes       map[string]struct{}
133         OutputPDH     *string
134         SigChan       chan os.Signal
135         ArvMountExit  chan error
136         SecretMounts  map[string]arvados.Mount
137         MkArvClient   func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
138         finalState    string
139         parentTemp    string
140
141         keepstoreLogger  io.WriteCloser
142         keepstoreLogbuf  *bufThenWrite
143         statLogger       io.WriteCloser
144         statReporter     *crunchstat.Reporter
145         hoststatLogger   io.WriteCloser
146         hoststatReporter *crunchstat.Reporter
147         statInterval     time.Duration
148         cgroupRoot       string
149         // What we expect the container's cgroup parent to be.
150         expectCgroupParent string
151         // What we tell docker to use as the container's cgroup
152         // parent. Note: Ideally we would use the same field for both
153         // expectCgroupParent and setCgroupParent, and just make it
154         // default to "docker". However, when using docker < 1.10 with
155         // systemd, specifying a non-empty cgroup parent (even the
156         // default value "docker") hits a docker bug
157         // (https://github.com/docker/docker/issues/17126). Using two
158         // separate fields makes it possible to use the "expect cgroup
159         // parent to be X" feature even on sites where the "specify
160         // cgroup parent" feature breaks.
161         setCgroupParent string
162
163         cStateLock sync.Mutex
164         cCancelled bool // stop() invoked (container cancelled)
165
166         enableMemoryLimit bool
167         enableNetwork     string // one of "default" or "always"
168         networkMode       string // "none", "host", or "" -- passed through to executor
169         arvMountLog       *ThrottledLogger
170
171         containerWatchdogInterval time.Duration
172
173         gateway Gateway
174 }
175
176 // setupSignals sets up signal handling to gracefully terminate the
177 // underlying container and update state when receiving a TERM, INT or
178 // QUIT signal.
179 func (runner *ContainerRunner) setupSignals() {
180         runner.SigChan = make(chan os.Signal, 1)
181         signal.Notify(runner.SigChan, syscall.SIGTERM)
182         signal.Notify(runner.SigChan, syscall.SIGINT)
183         signal.Notify(runner.SigChan, syscall.SIGQUIT)
184
185         go func(sig chan os.Signal) {
186                 for s := range sig {
187                         runner.stop(s)
188                 }
189         }(runner.SigChan)
190 }
191
192 // stop the underlying container.
193 func (runner *ContainerRunner) stop(sig os.Signal) {
194         runner.cStateLock.Lock()
195         defer runner.cStateLock.Unlock()
196         if sig != nil {
197                 runner.CrunchLog.Printf("caught signal: %v", sig)
198         }
199         runner.cCancelled = true
200         runner.CrunchLog.Printf("stopping container")
201         err := runner.executor.Stop()
202         if err != nil {
203                 runner.CrunchLog.Printf("error stopping container: %s", err)
204         }
205 }
206
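// errorBlacklist is a list of regexps matching error messages that indicate
// the node itself, rather than the container, is broken -- for example,
// because the Docker daemon is not responding.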
207 var errorBlacklist = []string{
208         "(?ms).*[Cc]annot connect to the Docker daemon.*",
209         "(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
210         "(?ms).*grpc: the connection is unavailable.*",
211 }
212 var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
213
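// runBrokenNodeHook runs the script given by the -broken-node-hook flag, if
// any; otherwise it writes a marker file under the lock directory to mark
// the node as broken.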
214 func (runner *ContainerRunner) runBrokenNodeHook() {
215         if *brokenNodeHook == "" {
216                 path := filepath.Join(lockdir, brokenfile)
217                 runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
218                 f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
219                 if err != nil {
220                         runner.CrunchLog.Printf("Error writing %s: %s", path, err)
221                         return
222                 }
223                 f.Close()
224         } else {
225                 runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
226                 // Run the broken node hook script.
227                 c := exec.Command(*brokenNodeHook)
228                 c.Stdout = runner.CrunchLog
229                 c.Stderr = runner.CrunchLog
230                 err := c.Run()
231                 if err != nil {
232                         runner.CrunchLog.Printf("Error running broken node hook: %v", err)
233                 }
234         }
235 }
236
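// checkBrokenNode reports whether the given error matches one of the
// errorBlacklist patterns, i.e., it suggests the node is unable to run
// containers. If so, it also runs the broken node hook.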
237 func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
238         for _, d := range errorBlacklist {
239                 if m, e := regexp.MatchString(d, goterr.Error()); m && e == nil {
240                         runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
241                         runner.runBrokenNodeHook()
242                         return true
243                 }
244         }
245         return false
246 }
247
248 // LoadImage determines the Docker image ID from the container record and
249 // asks the executor to load it from the image collection in Keep if it is
250 // not already available locally.
251 func (runner *ContainerRunner) LoadImage() (string, error) {
252         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
253
254         d, err := os.Open(runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage)
255         if err != nil {
256                 return "", err
257         }
258         defer d.Close()
259         allfiles, err := d.Readdirnames(-1)
260         if err != nil {
261                 return "", err
262         }
263         var tarfiles []string
264         for _, fnm := range allfiles {
265                 if strings.HasSuffix(fnm, ".tar") {
266                         tarfiles = append(tarfiles, fnm)
267                 }
268         }
269         if len(tarfiles) == 0 {
270                 return "", fmt.Errorf("image collection does not include a .tar image file")
271         }
272         if len(tarfiles) > 1 {
273                 return "", fmt.Errorf("cannot choose from multiple tar files in image collection: %v", tarfiles)
274         }
275         imageID := tarfiles[0][:len(tarfiles[0])-4]
276         imageTarballPath := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + imageID + ".tar"
277         runner.CrunchLog.Printf("Using Docker image id %q", imageID)
278
279         runner.CrunchLog.Print("Loading Docker image from keep")
280         err = runner.executor.LoadImage(imageID, imageTarballPath, runner.Container, runner.ArvMountPoint,
281                 runner.containerClient)
282         if err != nil {
283                 return "", err
284         }
285
286         return imageID, nil
287 }
288
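// ArvMountCmd starts an arv-mount command with ARVADOS_API_TOKEN set to the
// container token, wires its output to the arv-mount log (scanning stderr
// for known warning patterns), and waits until the mount point is usable
// (by_id/README appears) or arv-mount exits with an error.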
289 func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *exec.Cmd, err error) {
290         c = exec.Command(cmdline[0], cmdline[1:]...)
291
292         // Copy our environment, but override ARVADOS_API_TOKEN with
293         // the container auth token.
294         c.Env = nil
295         for _, s := range os.Environ() {
296                 if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") {
297                         c.Env = append(c.Env, s)
298                 }
299         }
300         c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
301
302         w, err := runner.NewLogWriter("arv-mount")
303         if err != nil {
304                 return nil, err
305         }
306         runner.arvMountLog = NewThrottledLogger(w)
307         scanner := logScanner{
308                 Patterns: []string{
309                         "Keep write error",
310                         "Block not found error",
311                         "Unhandled exception during FUSE operation",
312                 },
313                 ReportFunc: runner.reportArvMountWarning,
314         }
315         c.Stdout = runner.arvMountLog
316         c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
317
318         runner.CrunchLog.Printf("Running %v", c.Args)
319
320         err = c.Start()
321         if err != nil {
322                 return nil, err
323         }
324
325         statReadme := make(chan bool)
326         runner.ArvMountExit = make(chan error)
327
328         keepStatting := true
329         go func() {
330                 for keepStatting {
331                         time.Sleep(100 * time.Millisecond)
332                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
333                         if err == nil {
334                                 keepStatting = false
335                                 statReadme <- true
336                         }
337                 }
338                 close(statReadme)
339         }()
340
341         go func() {
342                 mnterr := c.Wait()
343                 if mnterr != nil {
344                         runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
345                 }
346                 runner.ArvMountExit <- mnterr
347                 close(runner.ArvMountExit)
348         }()
349
350         select {
351         case <-statReadme:
352                 break
353         case err := <-runner.ArvMountExit:
354                 runner.ArvMount = nil
355                 keepStatting = false
356                 return nil, err
357         }
358
359         return c, nil
360 }
361
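// SetupArvMountPoint creates a temporary directory to use as the arv-mount
// mount point, unless one has already been set.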
362 func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
363         if runner.ArvMountPoint == "" {
364                 runner.ArvMountPoint, err = runner.MkTempDir(runner.parentTemp, prefix)
365         }
366         return
367 }
368
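// copyfile copies the regular file src to dst, creating dst's parent
// directories as needed.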
369 func copyfile(src string, dst string) (err error) {
370         srcfile, err := os.Open(src)
371         if err != nil {
372                 return
373         }
374
375         os.MkdirAll(path.Dir(dst), 0777)
376
377         dstfile, err := os.Create(dst)
378         if err != nil {
379                 return
380         }
381         _, err = io.Copy(dstfile, srcfile)
382         if err != nil {
383                 return
384         }
385
386         err = srcfile.Close()
387         err2 := dstfile.Close()
388
389         if err != nil {
390                 return
391         }
392
393         if err2 != nil {
394                 return err2
395         }
396
397         return nil
398 }
399
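// SetupMounts validates the container's mounts, starts arv-mount, creates
// temporary directories and staged copies of writable files, and returns
// the bind mounts to pass to the container executor.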
400 func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
401         bindmounts := map[string]bindmount{}
402         err := runner.SetupArvMountPoint("keep")
403         if err != nil {
404                 return nil, fmt.Errorf("While creating keep mount temp dir: %v", err)
405         }
406
407         token, err := runner.ContainerToken()
408         if err != nil {
409                 return nil, fmt.Errorf("could not get container token: %s", err)
410         }
411         runner.CrunchLog.Printf("container token %q", token)
412
413         pdhOnly := true
414         tmpcount := 0
415         arvMountCmd := []string{
416                 "arv-mount",
417                 "--foreground",
418                 "--read-write",
419                 "--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
420                 fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
421
422         if runner.executor.Runtime() == "docker" {
423                 arvMountCmd = append(arvMountCmd, "--allow-other")
424         }
425
426         if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
427                 arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
428         }
429
430         collectionPaths := []string{}
431         needCertMount := true
432         type copyFile struct {
433                 src  string
434                 bind string
435         }
436         var copyFiles []copyFile
437
438         var binds []string
439         for bind := range runner.Container.Mounts {
440                 binds = append(binds, bind)
441         }
442         for bind := range runner.SecretMounts {
443                 if _, ok := runner.Container.Mounts[bind]; ok {
444                         return nil, fmt.Errorf("secret mount %q conflicts with regular mount", bind)
445                 }
446                 if runner.SecretMounts[bind].Kind != "json" &&
447                         runner.SecretMounts[bind].Kind != "text" {
448                         return nil, fmt.Errorf("secret mount %q type is %q but only 'json' and 'text' are permitted",
449                                 bind, runner.SecretMounts[bind].Kind)
450                 }
451                 binds = append(binds, bind)
452         }
453         sort.Strings(binds)
454
455         for _, bind := range binds {
456                 mnt, ok := runner.Container.Mounts[bind]
457                 if !ok {
458                         mnt = runner.SecretMounts[bind]
459                 }
460                 if bind == "stdout" || bind == "stderr" {
461                         // Is it a "file" mount kind?
462                         if mnt.Kind != "file" {
463                                 return nil, fmt.Errorf("unsupported mount kind '%s' for %s: only 'file' is supported", mnt.Kind, bind)
464                         }
465
466                         // Does path start with OutputPath?
467                         prefix := runner.Container.OutputPath
468                         if !strings.HasSuffix(prefix, "/") {
469                                 prefix += "/"
470                         }
471                         if !strings.HasPrefix(mnt.Path, prefix) {
472                                 return nil, fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
473                         }
474                 }
475
476                 if bind == "stdin" {
477                         // Is it a "collection" mount kind?
478                         if mnt.Kind != "collection" && mnt.Kind != "json" {
479                                 return nil, fmt.Errorf("unsupported mount kind '%s' for stdin: only 'collection' and 'json' are supported", mnt.Kind)
480                         }
481                 }
482
483                 if bind == "/etc/arvados/ca-certificates.crt" {
484                         needCertMount = false
485                 }
486
487                 if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" {
488                         if mnt.Kind != "collection" && mnt.Kind != "text" && mnt.Kind != "json" {
489                                 return nil, fmt.Errorf("only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind)
490                         }
491                 }
492
493                 switch {
494                 case mnt.Kind == "collection" && bind != "stdin":
495                         var src string
496                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
497                                 return nil, fmt.Errorf("cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
498                         }
499                         if mnt.UUID != "" {
500                                 if mnt.Writable {
501                                         return nil, fmt.Errorf("writing to existing collections currently not permitted")
502                                 }
503                                 pdhOnly = false
504                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
505                         } else if mnt.PortableDataHash != "" {
506                                 if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
507                                         return nil, fmt.Errorf("can never write to a collection specified by portable data hash")
508                                 }
509                                 idx := strings.Index(mnt.PortableDataHash, "/")
510                                 if idx > 0 {
511                                         mnt.Path = path.Clean(mnt.PortableDataHash[idx:])
512                                         mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
513                                         runner.Container.Mounts[bind] = mnt
514                                 }
515                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
516                                 if mnt.Path != "" && mnt.Path != "." {
517                                         if strings.HasPrefix(mnt.Path, "./") {
518                                                 mnt.Path = mnt.Path[2:]
519                                         } else if strings.HasPrefix(mnt.Path, "/") {
520                                                 mnt.Path = mnt.Path[1:]
521                                         }
522                                         src += "/" + mnt.Path
523                                 }
524                         } else {
525                                 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
526                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
527                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
528                                 tmpcount++
529                         }
530                         if mnt.Writable {
531                                 if bind == runner.Container.OutputPath {
532                                         runner.HostOutputDir = src
533                                         bindmounts[bind] = bindmount{HostPath: src}
534                                 } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
535                                         copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
536                                 } else {
537                                         bindmounts[bind] = bindmount{HostPath: src}
538                                 }
539                         } else {
540                                 bindmounts[bind] = bindmount{HostPath: src, ReadOnly: true}
541                         }
542                         collectionPaths = append(collectionPaths, src)
543
544                 case mnt.Kind == "tmp":
545                         var tmpdir string
546                         tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp")
547                         if err != nil {
548                                 return nil, fmt.Errorf("while creating mount temp dir: %v", err)
549                         }
550                         st, staterr := os.Stat(tmpdir)
551                         if staterr != nil {
552                                 return nil, fmt.Errorf("while Stat on temp dir: %v", staterr)
553                         }
554                         err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777)
555                         if err != nil {
556                                 return nil, fmt.Errorf("while Chmod temp dir: %v", err)
557                         }
558                         bindmounts[bind] = bindmount{HostPath: tmpdir}
559                         if bind == runner.Container.OutputPath {
560                                 runner.HostOutputDir = tmpdir
561                         }
562
563                 case mnt.Kind == "json" || mnt.Kind == "text":
564                         var filedata []byte
565                         if mnt.Kind == "json" {
566                                 filedata, err = json.Marshal(mnt.Content)
567                                 if err != nil {
568                                         return nil, fmt.Errorf("encoding json data: %v", err)
569                                 }
570                         } else {
571                                 text, ok := mnt.Content.(string)
572                                 if !ok {
573                                         return nil, fmt.Errorf("content for mount %q must be a string", bind)
574                                 }
575                                 filedata = []byte(text)
576                         }
577
578                         tmpdir, err := runner.MkTempDir(runner.parentTemp, mnt.Kind)
579                         if err != nil {
580                                 return nil, fmt.Errorf("creating temp dir: %v", err)
581                         }
582                         tmpfn := filepath.Join(tmpdir, "mountdata."+mnt.Kind)
583                         err = ioutil.WriteFile(tmpfn, filedata, 0444)
584                         if err != nil {
585                                 return nil, fmt.Errorf("writing temp file: %v", err)
586                         }
587                         if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
588                                 copyFiles = append(copyFiles, copyFile{tmpfn, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
589                         } else {
590                                 bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true}
591                         }
592
593                 case mnt.Kind == "git_tree":
594                         tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree")
595                         if err != nil {
596                                 return nil, fmt.Errorf("creating temp dir: %v", err)
597                         }
598                         err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
599                         if err != nil {
600                                 return nil, err
601                         }
602                         bindmounts[bind] = bindmount{HostPath: tmpdir, ReadOnly: true}
603                 }
604         }
605
606         if runner.HostOutputDir == "" {
607                 return nil, fmt.Errorf("output path does not correspond to a writable mount point")
608         }
609
610         if needCertMount && runner.Container.RuntimeConstraints.API {
611                 for _, certfile := range arvadosclient.CertFiles {
612                         _, err := os.Stat(certfile)
613                         if err == nil {
614                                 bindmounts["/etc/arvados/ca-certificates.crt"] = bindmount{HostPath: certfile, ReadOnly: true}
615                                 break
616                         }
617                 }
618         }
619
620         if pdhOnly {
621                 // If we are only mounting collections by pdh, make
622                 // sure we don't subscribe to websocket events to
623                 // avoid putting undesired load on the API server
624                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id", "--disable-event-listening")
625         } else {
626                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
627         }
628         // the by_uuid mount point is used by singularity when writing
629         // out docker images converted to SIF
630         arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid")
631         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
632
633         runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
634         if err != nil {
635                 return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
636         }
637
638         for _, p := range collectionPaths {
639                 _, err = os.Stat(p)
640                 if err != nil {
641                         return nil, fmt.Errorf("while checking that input files exist: %v", err)
642                 }
643         }
644
645         for _, cp := range copyFiles {
646                 st, err := os.Stat(cp.src)
647                 if err != nil {
648                         return nil, fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
649                 }
650                 if st.IsDir() {
651                         err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
652                                 if walkerr != nil {
653                                         return walkerr
654                                 }
655                                 target := path.Join(cp.bind, walkpath[len(cp.src):])
656                                 if walkinfo.Mode().IsRegular() {
657                                         copyerr := copyfile(walkpath, target)
658                                         if copyerr != nil {
659                                                 return copyerr
660                                         }
661                                         return os.Chmod(target, walkinfo.Mode()|0777)
662                                 } else if walkinfo.Mode().IsDir() {
663                                         mkerr := os.MkdirAll(target, 0777)
664                                         if mkerr != nil {
665                                                 return mkerr
666                                         }
667                                         return os.Chmod(target, walkinfo.Mode()|os.ModeSetgid|0777)
668                                 } else {
669                                         return fmt.Errorf("source %q is not a regular file or directory", cp.src)
670                                 }
671                         })
672                 } else if st.Mode().IsRegular() {
673                         err = copyfile(cp.src, cp.bind)
674                         if err == nil {
675                                 err = os.Chmod(cp.bind, st.Mode()|0777)
676                         }
677                 }
678                 if err != nil {
679                         return nil, fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
680                 }
681         }
682
683         return bindmounts, nil
684 }
685
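// stopHoststat stops the hoststat reporter, if it is running, and closes
// its log.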
686 func (runner *ContainerRunner) stopHoststat() error {
687         if runner.hoststatReporter == nil {
688                 return nil
689         }
690         runner.hoststatReporter.Stop()
691         err := runner.hoststatLogger.Close()
692         if err != nil {
693                 return fmt.Errorf("error closing hoststat logs: %v", err)
694         }
695         return nil
696 }
697
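// startHoststat starts a crunchstat reporter that logs resource usage for
// the host where crunch-run is running (as opposed to the container's own
// cgroup).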
698 func (runner *ContainerRunner) startHoststat() error {
699         w, err := runner.NewLogWriter("hoststat")
700         if err != nil {
701                 return err
702         }
703         runner.hoststatLogger = NewThrottledLogger(w)
704         runner.hoststatReporter = &crunchstat.Reporter{
705                 Logger:     log.New(runner.hoststatLogger, "", 0),
706                 CgroupRoot: runner.cgroupRoot,
707                 PollPeriod: runner.statInterval,
708         }
709         runner.hoststatReporter.Start()
710         return nil
711 }
712
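// startCrunchstat starts a crunchstat reporter that logs resource usage for
// the container's cgroup.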
713 func (runner *ContainerRunner) startCrunchstat() error {
714         w, err := runner.NewLogWriter("crunchstat")
715         if err != nil {
716                 return err
717         }
718         runner.statLogger = NewThrottledLogger(w)
719         runner.statReporter = &crunchstat.Reporter{
720                 CID:          runner.executor.CgroupID(),
721                 Logger:       log.New(runner.statLogger, "", 0),
722                 CgroupParent: runner.expectCgroupParent,
723                 CgroupRoot:   runner.cgroupRoot,
724                 PollPeriod:   runner.statInterval,
725                 TempDir:      runner.parentTemp,
726         }
727         runner.statReporter.Start()
728         return nil
729 }
730
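// infoCommand is a labeled command whose output is captured in the
// node-info log by LogHostInfo.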
731 type infoCommand struct {
732         label string
733         cmd   []string
734 }
735
736 // LogHostInfo logs info about the current host, for debugging and
737 // accounting purposes. Although it's logged as "node-info", this is
738 // about the environment where crunch-run is actually running, which
739 // might differ from what's described in the node record (see
740 // LogNodeRecord).
741 func (runner *ContainerRunner) LogHostInfo() (err error) {
742         w, err := runner.NewLogWriter("node-info")
743         if err != nil {
744                 return
745         }
746
747         commands := []infoCommand{
748                 {
749                         label: "Host Information",
750                         cmd:   []string{"uname", "-a"},
751                 },
752                 {
753                         label: "CPU Information",
754                         cmd:   []string{"cat", "/proc/cpuinfo"},
755                 },
756                 {
757                         label: "Memory Information",
758                         cmd:   []string{"cat", "/proc/meminfo"},
759                 },
760                 {
761                         label: "Disk Space",
762                         cmd:   []string{"df", "-m", "/", os.TempDir()},
763                 },
764                 {
765                         label: "Disk INodes",
766                         cmd:   []string{"df", "-i", "/", os.TempDir()},
767                 },
768         }
769
770         // Run commands with informational output to be logged.
771         for _, command := range commands {
772                 fmt.Fprintln(w, command.label)
773                 cmd := exec.Command(command.cmd[0], command.cmd[1:]...)
774                 cmd.Stdout = w
775                 cmd.Stderr = w
776                 if err := cmd.Run(); err != nil {
777                         err = fmt.Errorf("While running command %q: %v", command.cmd, err)
778                         fmt.Fprintln(w, err)
779                         return err
780                 }
781                 fmt.Fprintln(w, "")
782         }
783
784         err = w.Close()
785         if err != nil {
786                 return fmt.Errorf("While closing node-info logs: %v", err)
787         }
788         return nil
789 }
790
791 // LogContainerRecord gets and saves the raw JSON container record from the API server
792 func (runner *ContainerRunner) LogContainerRecord() error {
793         logged, err := runner.logAPIResponse("container", "containers", map[string]interface{}{"filters": [][]string{{"uuid", "=", runner.Container.UUID}}}, nil)
794         if !logged && err == nil {
795                 err = fmt.Errorf("error: no container record found for %s", runner.Container.UUID)
796         }
797         return err
798 }
799
800 // LogNodeRecord logs the current host's InstanceType config entry (or
801 // the arvados#node record, if running via crunch-dispatch-slurm).
802 func (runner *ContainerRunner) LogNodeRecord() error {
803         if it := os.Getenv("InstanceType"); it != "" {
804                 // Dispatched via arvados-dispatch-cloud. Save
805                 // InstanceType config fragment received from
806                 // dispatcher on stdin.
807                 w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666)
808                 if err != nil {
809                         return err
810                 }
811                 defer w.Close()
812                 _, err = io.WriteString(w, it)
813                 if err != nil {
814                         return err
815                 }
816                 return w.Close()
817         }
818         // Dispatched via crunch-dispatch-slurm. Look up
819         // apiserver's node record corresponding to
820         // $SLURMD_NODENAME.
821         hostname := os.Getenv("SLURMD_NODENAME")
822         if hostname == "" {
823                 hostname, _ = os.Hostname()
824         }
825         _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
826                 // The "info" field has admin-only info when
827                 // obtained with a privileged token, and
828                 // should not be logged.
829                 node, ok := resp.(map[string]interface{})
830                 if ok {
831                         delete(node, "info")
832                 }
833         })
834         return err
835 }
836
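// logAPIResponse fetches the first object matching params from the given
// API endpoint and writes it, as indented JSON, to <label>.json in the log
// collection. It returns logged=false if no matching object was found.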
837 func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
838         writer, err := runner.LogCollection.OpenFile(label+".json", os.O_CREATE|os.O_WRONLY, 0666)
839         if err != nil {
840                 return false, err
841         }
842         w := &ArvLogWriter{
843                 ArvClient:     runner.DispatcherArvClient,
844                 UUID:          runner.Container.UUID,
845                 loggingStream: label,
846                 writeCloser:   writer,
847         }
848
849         reader, err := runner.DispatcherArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
850         if err != nil {
851                 return false, fmt.Errorf("error getting %s record: %v", label, err)
852         }
853         defer reader.Close()
854
855         dec := json.NewDecoder(reader)
856         dec.UseNumber()
857         var resp map[string]interface{}
858         if err = dec.Decode(&resp); err != nil {
859                 return false, fmt.Errorf("error decoding %s list response: %v", label, err)
860         }
861         items, ok := resp["items"].([]interface{})
862         if !ok {
863                 return false, fmt.Errorf("error decoding %s list response: no \"items\" key in API list response", label)
864         } else if len(items) < 1 {
865                 return false, nil
866         }
867         if munge != nil {
868                 munge(items[0])
869         }
870         // Re-encode it using indentation to improve readability
871         enc := json.NewEncoder(w)
872         enc.SetIndent("", "    ")
873         if err = enc.Encode(items[0]); err != nil {
874                 return false, fmt.Errorf("error logging %s record: %v", label, err)
875         }
876         err = w.Close()
877         if err != nil {
878                 return false, fmt.Errorf("error closing %s.json in log collection: %v", label, err)
879         }
880         return true, nil
881 }
882
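// getStdoutFile creates and returns the file in the host output directory
// that corresponds to the given stdout/stderr mount path, creating any
// intermediate directories as needed.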
883 func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
884         stdoutPath := mntPath[len(runner.Container.OutputPath):]
885         index := strings.LastIndex(stdoutPath, "/")
886         if index > 0 {
887                 subdirs := stdoutPath[:index]
888                 if subdirs != "" {
889                         st, err := os.Stat(runner.HostOutputDir)
890                         if err != nil {
891                                 return nil, fmt.Errorf("While Stat on temp dir: %v", err)
892                         }
893                         stdoutPath := filepath.Join(runner.HostOutputDir, subdirs)
894                         err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
895                         if err != nil {
896                                 return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
897                         }
898                 }
899         }
900         stdoutFile, err := os.Create(filepath.Join(runner.HostOutputDir, stdoutPath))
901         if err != nil {
902                 return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
903         }
904
905         return stdoutFile, nil
906 }
907
908 // CreateContainer creates the container via the configured executor.
909 func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
910         var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
911         if mnt, ok := runner.Container.Mounts["stdin"]; ok {
912                 switch mnt.Kind {
913                 case "collection":
914                         var collID string
915                         if mnt.UUID != "" {
916                                 collID = mnt.UUID
917                         } else {
918                                 collID = mnt.PortableDataHash
919                         }
920                         path := runner.ArvMountPoint + "/by_id/" + collID + "/" + mnt.Path
921                         f, err := os.Open(path)
922                         if err != nil {
923                                 return err
924                         }
925                         stdin = f
926                 case "json":
927                         j, err := json.Marshal(mnt.Content)
928                         if err != nil {
929                                 return fmt.Errorf("error encoding stdin json data: %v", err)
930                         }
931                         stdin = ioutil.NopCloser(bytes.NewReader(j))
932                 default:
933                         return fmt.Errorf("stdin mount has unsupported kind %q", mnt.Kind)
934                 }
935         }
936
937         var stdout, stderr io.WriteCloser
938         if mnt, ok := runner.Container.Mounts["stdout"]; ok {
939                 f, err := runner.getStdoutFile(mnt.Path)
940                 if err != nil {
941                         return err
942                 }
943                 stdout = f
944         } else if w, err := runner.NewLogWriter("stdout"); err != nil {
945                 return err
946         } else {
947                 stdout = NewThrottledLogger(w)
948         }
949
950         if mnt, ok := runner.Container.Mounts["stderr"]; ok {
951                 f, err := runner.getStdoutFile(mnt.Path)
952                 if err != nil {
953                         return err
954                 }
955                 stderr = f
956         } else if w, err := runner.NewLogWriter("stderr"); err != nil {
957                 return err
958         } else {
959                 stderr = NewThrottledLogger(w)
960         }
961
962         env := runner.Container.Environment
963         enableNetwork := runner.enableNetwork == "always"
964         if runner.Container.RuntimeConstraints.API {
965                 enableNetwork = true
966                 tok, err := runner.ContainerToken()
967                 if err != nil {
968                         return err
969                 }
970                 env = map[string]string{}
971                 for k, v := range runner.Container.Environment {
972                         env[k] = v
973                 }
974                 env["ARVADOS_API_TOKEN"] = tok
975                 env["ARVADOS_API_HOST"] = os.Getenv("ARVADOS_API_HOST")
976                 env["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE")
977         }
978         workdir := runner.Container.Cwd
979         if workdir == "." {
980                 // both "" and "." mean default
981                 workdir = ""
982         }
983         ram := runner.Container.RuntimeConstraints.RAM
984         if !runner.enableMemoryLimit {
985                 ram = 0
986         }
987         runner.executorStdin = stdin
988         runner.executorStdout = stdout
989         runner.executorStderr = stderr
990
991         if runner.Container.RuntimeConstraints.CUDA.DeviceCount > 0 {
992                 nvidiaModprobe(runner.CrunchLog)
993         }
994
995         return runner.executor.Create(containerSpec{
996                 Image:           imageID,
997                 VCPUs:           runner.Container.RuntimeConstraints.VCPUs,
998                 RAM:             ram,
999                 WorkingDir:      workdir,
1000                 Env:             env,
1001                 BindMounts:      bindmounts,
1002                 Command:         runner.Container.Command,
1003                 EnableNetwork:   enableNetwork,
1004                 CUDADeviceCount: runner.Container.RuntimeConstraints.CUDA.DeviceCount,
1005                 NetworkMode:     runner.networkMode,
1006                 CgroupParent:    runner.setCgroupParent,
1007                 Stdin:           stdin,
1008                 Stdout:          stdout,
1009                 Stderr:          stderr,
1010         })
1011 }
1012
1013 // StartContainer starts the docker container created by CreateContainer.
1014 func (runner *ContainerRunner) StartContainer() error {
1015         runner.CrunchLog.Printf("Starting container")
1016         runner.cStateLock.Lock()
1017         defer runner.cStateLock.Unlock()
1018         if runner.cCancelled {
1019                 return ErrCancelled
1020         }
1021         err := runner.executor.Start()
1022         if err != nil {
1023                 var advice string
1024                 if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil {
1025                         advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0])
1026                 }
1027                 return fmt.Errorf("could not start container: %v%s", err, advice)
1028         }
1029         return nil
1030 }
1031
1032 // WaitFinish waits for the container to terminate, captures the exit code, and
1033 // closes the stdout/stderr logging.
1034 func (runner *ContainerRunner) WaitFinish() error {
1035         runner.CrunchLog.Print("Waiting for container to finish")
1036         var timeout <-chan time.Time
1037         if s := runner.Container.SchedulingParameters.MaxRunTime; s > 0 {
1038                 timeout = time.After(time.Duration(s) * time.Second)
1039         }
1040         ctx, cancel := context.WithCancel(context.Background())
1041         defer cancel()
1042         go func() {
1043                 select {
1044                 case <-timeout:
1045                         runner.CrunchLog.Printf("maximum run time exceeded. Stopping container.")
1046                         runner.stop(nil)
1047                 case <-runner.ArvMountExit:
1048                         runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.")
1049                         runner.stop(nil)
1050                 case <-ctx.Done():
1051                 }
1052         }()
1053         exitcode, err := runner.executor.Wait(ctx)
1054         if err != nil {
1055                 runner.checkBrokenNode(err)
1056                 return err
1057         }
1058         runner.ExitCode = &exitcode
1059
1060         extra := ""
1061         if exitcode&0x80 != 0 {
1062                 // Convert raw exit status (0x80 + signal number) to a
1063                 // string to log after the code, like " (signal 9, SIGKILL)",
1064                 // or just " (signal N)" if the signal has no name.
1065                 sig := syscall.WaitStatus(exitcode).Signal()
1066                 if name := unix.SignalName(sig); name != "" {
1067                         extra = fmt.Sprintf(" (signal %d, %s)", sig, name)
1068                 } else {
1069                         extra = fmt.Sprintf(" (signal %d)", sig)
1070                 }
1071         }
1072         runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra)
1073
1074         var returnErr error
1075         if err = runner.executorStdin.Close(); err != nil {
1076                 err = fmt.Errorf("error closing container stdin: %s", err)
1077                 runner.CrunchLog.Printf("%s", err)
1078                 returnErr = err
1079         }
1080         if err = runner.executorStdout.Close(); err != nil {
1081                 err = fmt.Errorf("error closing container stdout: %s", err)
1082                 runner.CrunchLog.Printf("%s", err)
1083                 if returnErr == nil {
1084                         returnErr = err
1085                 }
1086         }
1087         if err = runner.executorStderr.Close(); err != nil {
1088                 err = fmt.Errorf("error closing container stderr: %s", err)
1089                 runner.CrunchLog.Printf("%s", err)
1090                 if returnErr == nil {
1091                         returnErr = err
1092                 }
1093         }
1094
1095         if runner.statReporter != nil {
1096                 runner.statReporter.Stop()
1097                 err = runner.statLogger.Close()
1098                 if err != nil {
1099                         runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
1100                 }
1101         }
1102         return returnErr
1103 }
1104
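// updateLogs periodically saves the partial log collection and updates the
// container record's log field, so logs are visible while the container is
// still running. SIGUSR1 forces an immediate save. The loop exits once the
// final log collection has been saved (LogsPDH is set).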
1105 func (runner *ContainerRunner) updateLogs() {
1106         ticker := time.NewTicker(crunchLogUpdatePeriod / 360)
1107         defer ticker.Stop()
1108
1109         sigusr1 := make(chan os.Signal, 1)
1110         signal.Notify(sigusr1, syscall.SIGUSR1)
1111         defer signal.Stop(sigusr1)
1112
1113         saveAtTime := time.Now().Add(crunchLogUpdatePeriod)
1114         saveAtSize := crunchLogUpdateSize
1115         var savedSize int64
1116         for {
1117                 select {
1118                 case <-ticker.C:
1119                 case <-sigusr1:
1120                         saveAtTime = time.Now()
1121                 }
1122                 runner.logMtx.Lock()
1123                 done := runner.LogsPDH != nil
1124                 runner.logMtx.Unlock()
1125                 if done {
1126                         return
1127                 }
1128                 size := runner.LogCollection.Size()
1129                 if size == savedSize || (time.Now().Before(saveAtTime) && size < saveAtSize) {
1130                         continue
1131                 }
1132                 saveAtTime = time.Now().Add(crunchLogUpdatePeriod)
1133                 saveAtSize = runner.LogCollection.Size() + crunchLogUpdateSize
1134                 saved, err := runner.saveLogCollection(false)
1135                 if err != nil {
1136                         runner.CrunchLog.Printf("error updating log collection: %s", err)
1137                         continue
1138                 }
1139
1140                 var updated arvados.Container
1141                 err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
1142                         "container": arvadosclient.Dict{"log": saved.PortableDataHash},
1143                 }, &updated)
1144                 if err != nil {
1145                         runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
1146                         continue
1147                 }
1148
1149                 savedSize = size
1150         }
1151 }
1152
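// reportArvMountWarning records an arv-mount warning in the container
// record's runtime_status field.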
1153 func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) {
1154         var updated arvados.Container
1155         err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
1156                 "container": arvadosclient.Dict{
1157                         "runtime_status": arvadosclient.Dict{
1158                                 "warning":       "arv-mount: " + pattern,
1159                                 "warningDetail": text,
1160                         },
1161                 },
1162         }, &updated)
1163         if err != nil {
1164                 runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
1165         }
1166 }
1167
1168 // CaptureOutput saves data from the container's output directory if
1169 // needed, and updates the container output accordingly.
1170 func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error {
1171         if runner.Container.RuntimeConstraints.API {
1172                 // Output may have been set directly by the container, so
1173                 // refresh the container record to check.
1174                 err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
1175                         nil, &runner.Container)
1176                 if err != nil {
1177                         return err
1178                 }
1179                 if runner.Container.Output != "" {
1180                         // Container output is already set.
1181                         runner.OutputPDH = &runner.Container.Output
1182                         return nil
1183                 }
1184         }
1185
1186         txt, err := (&copier{
1187                 client:        runner.containerClient,
1188                 arvClient:     runner.ContainerArvClient,
1189                 keepClient:    runner.ContainerKeepClient,
1190                 hostOutputDir: runner.HostOutputDir,
1191                 ctrOutputDir:  runner.Container.OutputPath,
1192                 bindmounts:    bindmounts,
1193                 mounts:        runner.Container.Mounts,
1194                 secretMounts:  runner.SecretMounts,
1195                 logger:        runner.CrunchLog,
1196         }).Copy()
1197         if err != nil {
1198                 return err
1199         }
1200         if n := len(regexp.MustCompile(` [0-9a-f]+\+\S*\+R`).FindAllStringIndex(txt, -1)); n > 0 {
1201                 runner.CrunchLog.Printf("Copying %d data blocks from remote input collections...", n)
1202                 fs, err := (&arvados.Collection{ManifestText: txt}).FileSystem(runner.containerClient, runner.ContainerKeepClient)
1203                 if err != nil {
1204                         return err
1205                 }
1206                 txt, err = fs.MarshalManifest(".")
1207                 if err != nil {
1208                         return err
1209                 }
1210         }
1211         var resp arvados.Collection
1212         err = runner.ContainerArvClient.Create("collections", arvadosclient.Dict{
1213                 "ensure_unique_name": true,
1214                 "collection": arvadosclient.Dict{
1215                         "is_trashed":    true,
1216                         "name":          "output for " + runner.Container.UUID,
1217                         "manifest_text": txt,
1218                 },
1219         }, &resp)
1220         if err != nil {
1221                 return fmt.Errorf("error creating output collection: %v", err)
1222         }
1223         runner.OutputPDH = &resp.PortableDataHash
1224         return nil
1225 }
1226
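// CleanupDirs unmounts arv-mount (killing the processes if the unmount does
// not finish in time) and removes the temporary directories created for
// this container.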
1227 func (runner *ContainerRunner) CleanupDirs() {
1228         if runner.ArvMount != nil {
1229                 var delay int64 = 8
1230                 umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint)
1231                 umount.Stdout = runner.CrunchLog
1232                 umount.Stderr = runner.CrunchLog
1233                 runner.CrunchLog.Printf("Running %v", umount.Args)
1234                 umnterr := umount.Start()
1235
1236                 if umnterr != nil {
1237                         runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
1238                         runner.ArvMount.Process.Kill()
1239                 } else {
1240                         // If arv-mount --unmount gets stuck for any reason, we
1241                         // don't want to wait for it forever.  Do Wait() in a goroutine
1242                         // so it doesn't block crunch-run.
1243                         umountExit := make(chan error)
1244                         go func() {
1245                                 mnterr := umount.Wait()
1246                                 if mnterr != nil {
1247                                         runner.CrunchLog.Printf("Error unmounting: %v", mnterr)
1248                                 }
1249                                 umountExit <- mnterr
1250                         }()
1251
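                        // Wait for the unmount command and arv-mount itself to
                        // exit, killing both if they are still running when the
                        // timeout expires.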
1252                         for again := true; again; {
1253                                 again = false
1254                                 select {
1255                                 case <-umountExit:
1256                                         umount = nil
1257                                         again = true
1258                                 case <-runner.ArvMountExit:
1259                                         break
1260                                 case <-time.After(time.Duration(delay+1) * time.Second):
1261                                         runner.CrunchLog.Printf("Timed out waiting for unmount")
1262                                         if umount != nil {
1263                                                 umount.Process.Kill()
1264                                         }
1265                                         runner.ArvMount.Process.Kill()
1266                                 }
1267                         }
1268                 }
1269                 runner.ArvMount = nil
1270         }
1271
1272         if runner.ArvMountPoint != "" {
1273                 if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
1274                         runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
1275                 }
1276                 runner.ArvMountPoint = ""
1277         }
1278
1279         if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
1280                 runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", runner.parentTemp, rmerr)
1281         }
1282 }
1283
1284 // CommitLogs posts the collection containing the final container logs.
1285 func (runner *ContainerRunner) CommitLogs() error {
1286         func() {
1287                 // Hold cStateLock to prevent races on CrunchLog (e.g., stop()).
1288                 runner.cStateLock.Lock()
1289                 defer runner.cStateLock.Unlock()
1290
1291                 runner.CrunchLog.Print(runner.finalState)
1292
1293                 if runner.arvMountLog != nil {
1294                         runner.arvMountLog.Close()
1295                 }
1296                 runner.CrunchLog.Close()
1297
1298                 // Closing CrunchLog above allows the buffered logs to be committed
1299                 // to Keep at this point, but re-open the crunch log with an
1300                 // ArvLogWriter so any further errors while shutting down (such as
1301                 // failing to write the log to Keep!) are still reported.
1302                 runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{
1303                         ArvClient:     runner.DispatcherArvClient,
1304                         UUID:          runner.Container.UUID,
1305                         loggingStream: "crunch-run",
1306                         writeCloser:   nil,
1307                 })
1308                 runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
1309         }()
1310
1311         if runner.keepstoreLogger != nil {
1312                 // Flush any buffered logs from our local keepstore
1313                 // process.  Discard anything logged after this point
1314                 // -- it won't end up in the log collection, so
1315                 // there's no point writing it to the collectionfs.
1316                 runner.keepstoreLogbuf.SetWriter(io.Discard)
1317                 runner.keepstoreLogger.Close()
1318                 runner.keepstoreLogger = nil
1319         }
1320
1321         if runner.LogsPDH != nil {
1322                 // If we have already assigned something to LogsPDH,
1323                 // we must be closing the re-opened log, which won't
1324                 // end up getting attached to the container record and
1325                 // therefore doesn't need to be saved as a collection
1326                 // -- it exists only to send logs to other channels.
1327                 return nil
1328         }
1329
1330         saved, err := runner.saveLogCollection(true)
1331         if err != nil {
1332                 return fmt.Errorf("error saving log collection: %s", err)
1333         }
1334         runner.logMtx.Lock()
1335         defer runner.logMtx.Unlock()
1336         runner.LogsPDH = &saved.PortableDataHash
1337         return nil
1338 }
1339
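// saveLogCollection flushes the log collection and creates or updates its
// collection record.  When final is true the record is saved as trashed
// (the container record references the logs by portable data hash);
// otherwise its trash_at/delete_at times are pushed into the future so
// the logs are preserved while the container is still running.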
1340 func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) {
1341         runner.logMtx.Lock()
1342         defer runner.logMtx.Unlock()
1343         if runner.LogsPDH != nil {
1344                 // Already finalized.
1345                 return
1346         }
1347         updates := arvadosclient.Dict{
1348                 "name": "logs for " + runner.Container.UUID,
1349         }
1350         mt, err1 := runner.LogCollection.MarshalManifest(".")
1351         if err1 == nil {
1352                 // Only send updated manifest text if there was no
1353                 // error.
1354                 updates["manifest_text"] = mt
1355         }
1356
1357         // Even if flushing the manifest had an error, we still want
1358         // to update the log record, if possible, to push the trash_at
1359         // and delete_at times into the future.  See bug #17293 for
1360         // details.
1361         if final {
1362                 updates["is_trashed"] = true
1363         } else {
1364                 exp := time.Now().Add(crunchLogUpdatePeriod * 24)
1365                 updates["trash_at"] = exp
1366                 updates["delete_at"] = exp
1367         }
1368         reqBody := arvadosclient.Dict{"collection": updates}
1369         var err2 error
1370         if runner.logUUID == "" {
1371                 reqBody["ensure_unique_name"] = true
1372                 err2 = runner.DispatcherArvClient.Create("collections", reqBody, &response)
1373         } else {
1374                 err2 = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response)
1375         }
1376         if err2 == nil {
1377                 runner.logUUID = response.UUID
1378         }
1379
1380         if err1 != nil || err2 != nil {
1381                 err = fmt.Errorf("error recording logs: %q, %q", err1, err2)
1382         }
1383         return
1384 }
1385
1386 // UpdateContainerRunning updates the container state to "Running"
1387 func (runner *ContainerRunner) UpdateContainerRunning() error {
1388         runner.cStateLock.Lock()
1389         defer runner.cStateLock.Unlock()
1390         if runner.cCancelled {
1391                 return ErrCancelled
1392         }
1393         return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
1394                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gateway.Address}}, nil)
1395 }
1396
1397 // ContainerToken returns the api_token the container (and any
1398 // arv-mount processes) are allowed to use.
1399 func (runner *ContainerRunner) ContainerToken() (string, error) {
1400         if runner.token != "" {
1401                 return runner.token, nil
1402         }
1403
1404         var auth arvados.APIClientAuthorization
1405         err := runner.DispatcherArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
1406         if err != nil {
1407                 return "", err
1408         }
1409         runner.token = fmt.Sprintf("v2/%s/%s/%s", auth.UUID, auth.APIToken, runner.Container.UUID)
1410         return runner.token, nil
1411 }
1412
1413 // UpdateContainerFinal updates the container record state on the API
1414 // server to "Complete" or "Cancelled".
1415 func (runner *ContainerRunner) UpdateContainerFinal() error {
1416         update := arvadosclient.Dict{}
1417         update["state"] = runner.finalState
1418         if runner.LogsPDH != nil {
1419                 update["log"] = *runner.LogsPDH
1420         }
1421         if runner.finalState == "Complete" {
1422                 if runner.ExitCode != nil {
1423                         update["exit_code"] = *runner.ExitCode
1424                 }
1425                 if runner.OutputPDH != nil {
1426                         update["output"] = *runner.OutputPDH
1427                 }
1428         }
1429         return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
1430 }
1431
1432 // IsCancelled returns the value of Cancelled, with goroutine safety.
1433 func (runner *ContainerRunner) IsCancelled() bool {
1434         runner.cStateLock.Lock()
1435         defer runner.cStateLock.Unlock()
1436         return runner.cCancelled
1437 }
1438
1439 // NewArvLogWriter creates an ArvLogWriter for the named log stream, backed by a file in the log collection.
1440 func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error) {
1441         writer, err := runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
1442         if err != nil {
1443                 return nil, err
1444         }
1445         return &ArvLogWriter{
1446                 ArvClient:     runner.DispatcherArvClient,
1447                 UUID:          runner.Container.UUID,
1448                 loggingStream: name,
1449                 writeCloser:   writer,
1450         }, nil
1451 }
1452
1453 // Run runs the full container lifecycle.
1454 func (runner *ContainerRunner) Run() (err error) {
1455         runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
1456         runner.CrunchLog.Printf("Executing container '%s' using %s runtime", runner.Container.UUID, runner.executor.Runtime())
1457
1458         hostname, hosterr := os.Hostname()
1459         if hosterr != nil {
1460                 runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
1461         } else {
1462                 runner.CrunchLog.Printf("Executing on host '%s'", hostname)
1463         }
1464
1465         runner.finalState = "Queued"
1466
1467         defer func() {
1468                 runner.CleanupDirs()
1469
1470                 runner.CrunchLog.Printf("crunch-run finished")
1471                 runner.CrunchLog.Close()
1472         }()
1473
1474         err = runner.fetchContainerRecord()
1475         if err != nil {
1476                 return
1477         }
1478         if runner.Container.State != "Locked" {
1479                 return fmt.Errorf("dispatch error detected: container %q has state %q", runner.Container.UUID, runner.Container.State)
1480         }
1481
1482         var bindmounts map[string]bindmount
1483         defer func() {
1484                 // checkErr prints e (unless it's nil) and sets err to
1485                 // e (unless err is already non-nil). Thus, if err
1486                 // hasn't already been assigned when Run() returns,
1487                 // this cleanup func will cause Run() to return the
1488                 // first non-nil error that is passed to checkErr().
1489                 checkErr := func(errorIn string, e error) {
1490                         if e == nil {
1491                                 return
1492                         }
1493                         runner.CrunchLog.Printf("error in %s: %v", errorIn, e)
1494                         if err == nil {
1495                                 err = e
1496                         }
1497                         if runner.finalState == "Complete" {
1498                                 // There was an error in the finalization.
1499                                 runner.finalState = "Cancelled"
1500                         }
1501                 }
1502
1503                 // Log the error encountered in Run(), if any
1504                 checkErr("Run", err)
1505
1506                 if runner.finalState == "Queued" {
1507                         runner.UpdateContainerFinal()
1508                         return
1509                 }
1510
1511                 if runner.IsCancelled() {
1512                         runner.finalState = "Cancelled"
1513                         // but don't return yet -- we still want to
1514                         // capture partial output and write logs
1515                 }
1516
1517                 if bindmounts != nil {
1518                         checkErr("CaptureOutput", runner.CaptureOutput(bindmounts))
1519                 }
1520                 checkErr("stopHoststat", runner.stopHoststat())
1521                 checkErr("CommitLogs", runner.CommitLogs())
1522                 runner.CleanupDirs()
1523                 checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
1524         }()
1525
1526         runner.setupSignals()
1527         err = runner.startHoststat()
1528         if err != nil {
1529                 return
1530         }
1531
1532         // set up FUSE mount and binds
1533         bindmounts, err = runner.SetupMounts()
1534         if err != nil {
1535                 runner.finalState = "Cancelled"
1536                 err = fmt.Errorf("While setting up mounts: %v", err)
1537                 return
1538         }
1539
1540         // check for and/or load image
1541         imageID, err := runner.LoadImage()
1542         if err != nil {
1543                 if !runner.checkBrokenNode(err) {
1544                         // Failed to load image but not due to a "broken node"
1545                         // condition, probably user error.
1546                         runner.finalState = "Cancelled"
1547                 }
1548                 err = fmt.Errorf("While loading container image: %v", err)
1549                 return
1550         }
1551
1552         err = runner.CreateContainer(imageID, bindmounts)
1553         if err != nil {
1554                 return
1555         }
1556         err = runner.LogHostInfo()
1557         if err != nil {
1558                 return
1559         }
1560         err = runner.LogNodeRecord()
1561         if err != nil {
1562                 return
1563         }
1564         err = runner.LogContainerRecord()
1565         if err != nil {
1566                 return
1567         }
1568
1569         if runner.IsCancelled() {
1570                 return
1571         }
1572
1573         err = runner.UpdateContainerRunning()
1574         if err != nil {
1575                 return
1576         }
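        // From this point on, report the container as Cancelled unless
        // WaitFinish() below confirms that it ran to completion.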
1577         runner.finalState = "Cancelled"
1578
1579         err = runner.startCrunchstat()
1580         if err != nil {
1581                 return
1582         }
1583
1584         err = runner.StartContainer()
1585         if err != nil {
1586                 runner.checkBrokenNode(err)
1587                 return
1588         }
1589
1590         err = runner.WaitFinish()
1591         if err == nil && !runner.IsCancelled() {
1592                 runner.finalState = "Complete"
1593         }
1594         return
1595 }
1596
1597 // Fetch the current container record (uuid = runner.Container.UUID)
1598 // into runner.Container.
1599 func (runner *ContainerRunner) fetchContainerRecord() error {
1600         reader, err := runner.DispatcherArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
1601         if err != nil {
1602                 return fmt.Errorf("error fetching container record: %v", err)
1603         }
1604         defer reader.Close()
1605
1606         dec := json.NewDecoder(reader)
1607         dec.UseNumber()
1608         err = dec.Decode(&runner.Container)
1609         if err != nil {
1610                 return fmt.Errorf("error decoding container record: %v", err)
1611         }
1612
1613         var sm struct {
1614                 SecretMounts map[string]arvados.Mount `json:"secret_mounts"`
1615         }
1616
1617         containerToken, err := runner.ContainerToken()
1618         if err != nil {
1619                 return fmt.Errorf("error getting container token: %v", err)
1620         }
1621
1622         runner.ContainerArvClient, runner.ContainerKeepClient,
1623                 runner.containerClient, err = runner.MkArvClient(containerToken)
1624         if err != nil {
1625                 return fmt.Errorf("error creating container API client: %v", err)
1626         }
1627
1628         runner.ContainerKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
1629         runner.DispatcherKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
1630
1631         err = runner.ContainerArvClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
1632         if err != nil {
1633                 if apierr, ok := err.(arvadosclient.APIServerError); !ok || apierr.HttpStatusCode != 404 {
1634                         return fmt.Errorf("error fetching secret_mounts: %v", err)
1635                 }
1636                 // ok && apierr.HttpStatusCode == 404, which means
1637                 // secret_mounts isn't supported by this API server.
1638         }
1639         runner.SecretMounts = sm.SecretMounts
1640
1641         return nil
1642 }
1643
1644 // NewContainerRunner creates a new container runner.
1645 func NewContainerRunner(dispatcherClient *arvados.Client,
1646         dispatcherArvClient IArvadosClient,
1647         dispatcherKeepClient IKeepClient,
1648         containerUUID string) (*ContainerRunner, error) {
1649
1650         cr := &ContainerRunner{
1651                 dispatcherClient:     dispatcherClient,
1652                 DispatcherArvClient:  dispatcherArvClient,
1653                 DispatcherKeepClient: dispatcherKeepClient,
1654         }
1655         cr.NewLogWriter = cr.NewArvLogWriter
1656         cr.RunArvMount = cr.ArvMountCmd
1657         cr.MkTempDir = ioutil.TempDir
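        // MkArvClient returns API, Keep, and SDK clients that authenticate
        // with the given (container) token.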
1658         cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
1659                 cl, err := arvadosclient.MakeArvadosClient()
1660                 if err != nil {
1661                         return nil, nil, nil, err
1662                 }
1663                 cl.ApiToken = token
1664                 kc, err := keepclient.MakeKeepClient(cl)
1665                 if err != nil {
1666                         return nil, nil, nil, err
1667                 }
1668                 c2 := arvados.NewClientFromEnv()
1669                 c2.AuthToken = token
1670                 return cl, kc, c2, nil
1671         }
1672         var err error
1673         cr.LogCollection, err = (&arvados.Collection{}).FileSystem(cr.dispatcherClient, cr.DispatcherKeepClient)
1674         if err != nil {
1675                 return nil, err
1676         }
1677         cr.Container.UUID = containerUUID
1678         w, err := cr.NewLogWriter("crunch-run")
1679         if err != nil {
1680                 return nil, err
1681         }
1682         cr.CrunchLog = NewThrottledLogger(w)
1683         cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
1684
1685         loadLogThrottleParams(dispatcherArvClient)
1686         go cr.updateLogs()
1687
1688         return cr, nil
1689 }
1690
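// RunCommand implements the crunch-run command line: it parses flags,
// handles the -detach/-kill/-list modes, and otherwise runs the container
// with the given UUID to completion, returning an exit code.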
1691 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
1692         log := log.New(stderr, "", 0)
1693         flags := flag.NewFlagSet(prog, flag.ContinueOnError)
1694         statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
1695         cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
1696         cgroupParent := flags.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
1697         cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
1698         caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
1699         detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
1700         stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
1701         sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
1702         kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
1703         list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
1704         enableMemoryLimit := flags.Bool("enable-memory-limit", true, "tell container runtime to limit container's memory usage")
1705         enableNetwork := flags.String("container-enable-networking", "default", "enable networking \"always\" (for all containers) or \"default\" (for containers that request it)")
1706         networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
1707         memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
1708         runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
1709         flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
1710
1711         ignoreDetachFlag := false
1712         if len(args) > 0 && args[0] == "-no-detach" {
1713                 // This process was invoked by a parent process, which
1714                 // has passed along its own arguments, including
1715                 // -detach, after the leading -no-detach flag.  Strip
1716                 // the leading -no-detach flag (it's not recognized by
1717                 // flags.Parse()) and ignore the -detach flag that
1718                 // comes later.
1719                 args = args[1:]
1720                 ignoreDetachFlag = true
1721         }
1722
1723         if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
1724                 return code
1725         } else if !*list && flags.NArg() != 1 {
1726                 fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
1727                 return 2
1728         }
1729
1730         containerUUID := flags.Arg(0)
1731
1732         switch {
1733         case *detach && !ignoreDetachFlag:
1734                 return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr)
1735         case *kill >= 0:
1736                 return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
1737         case *list:
1738                 return ListProcesses(os.Stdout, os.Stderr)
1739         }
1740
1741         if len(containerUUID) != 27 {
1742                 log.Printf("usage: %s [options] UUID", prog)
1743                 return 1
1744         }
1745
1746         var conf ConfigData
1747         if *stdinConfig {
1748                 err := json.NewDecoder(stdin).Decode(&conf)
1749                 if err != nil {
1750                         log.Printf("decode stdin: %s", err)
1751                         return 1
1752                 }
1753                 for k, v := range conf.Env {
1754                         err = os.Setenv(k, v)
1755                         if err != nil {
1756                                 log.Printf("setenv(%q): %s", k, err)
1757                                 return 1
1758                         }
1759                 }
1760                 if conf.Cluster != nil {
1761                         // ClusterID is missing from the JSON
1762                         // representation, but we need it to generate
1763                         // a valid config file for keepstore, so we
1764                         // fill it using the container UUID prefix.
1765                         conf.Cluster.ClusterID = containerUUID[:5]
1766                 }
1767         }
1768
1769         log.Printf("crunch-run %s started", cmd.Version.String())
1770         time.Sleep(*sleep)
1771
1772         if *caCertsPath != "" {
1773                 arvadosclient.CertFiles = []string{*caCertsPath}
1774         }
1775
1776         var keepstoreLogbuf bufThenWrite
1777         keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
1778         if err != nil {
1779                 log.Print(err)
1780                 return 1
1781         }
1782         if keepstore != nil {
1783                 defer keepstore.Process.Kill()
1784         }
1785
1786         api, err := arvadosclient.MakeArvadosClient()
1787         if err != nil {
1788                 log.Printf("%s: %v", containerUUID, err)
1789                 return 1
1790         }
1791         api.Retries = 8
1792
1793         kc, err := keepclient.MakeKeepClient(api)
1794         if err != nil {
1795                 log.Printf("%s: %v", containerUUID, err)
1796                 return 1
1797         }
1798         kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
1799         kc.Retries = 4
1800
1801         cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, containerUUID)
1802         if err != nil {
1803                 log.Print(err)
1804                 return 1
1805         }
1806
1807         if keepstore == nil {
1808                 // Log explanation (if any) for why we're not running
1809                 // a local keepstore.
1810                 var buf bytes.Buffer
1811                 keepstoreLogbuf.SetWriter(&buf)
1812                 if buf.Len() > 0 {
1813                         cr.CrunchLog.Printf("%s", strings.TrimSpace(buf.String()))
1814                 }
1815         } else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" {
1816                 cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
1817                 keepstoreLogbuf.SetWriter(io.Discard)
1818         } else {
1819                 cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
1820                 logwriter, err := cr.NewLogWriter("keepstore")
1821                 if err != nil {
1822                         log.Print(err)
1823                         return 1
1824                 }
1825                 cr.keepstoreLogger = NewThrottledLogger(logwriter)
1826
1827                 var writer io.WriteCloser = cr.keepstoreLogger
1828                 if logWhat == "errors" {
1829                         writer = &filterKeepstoreErrorsOnly{WriteCloser: writer}
1830                 } else if logWhat != "all" {
1831                         // should have been caught earlier by
1832                         // dispatcher's config loader
1833                         log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat)
1834                         return 1
1835                 }
1836                 err = keepstoreLogbuf.SetWriter(writer)
1837                 if err != nil {
1838                         log.Print(err)
1839                         return 1
1840                 }
1841                 cr.keepstoreLogbuf = &keepstoreLogbuf
1842         }
1843
1844         switch *runtimeEngine {
1845         case "docker":
1846                 cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval)
1847         case "singularity":
1848                 cr.executor, err = newSingularityExecutor(cr.CrunchLog.Printf)
1849         default:
1850                 cr.CrunchLog.Printf("%s: unsupported RuntimeEngine %q", containerUUID, *runtimeEngine)
1851                 cr.CrunchLog.Close()
1852                 return 1
1853         }
1854         if err != nil {
1855                 cr.CrunchLog.Printf("%s: %v", containerUUID, err)
1856                 cr.checkBrokenNode(err)
1857                 cr.CrunchLog.Close()
1858                 return 1
1859         }
1860         defer cr.executor.Close()
1861
1862         gwAuthSecret := os.Getenv("GatewayAuthSecret")
1863         os.Unsetenv("GatewayAuthSecret")
1864         if gwAuthSecret == "" {
1865                 // not safe to run a gateway service without an auth
1866                 // secret
1867                 cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
1868         } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
1869                 // dispatcher did not tell us which external IP
1870                 // address to advertise --> no gateway service
1871                 cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
1872         } else if de, ok := cr.executor.(*dockerExecutor); ok {
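                // Both the auth secret and the address to advertise were
                // provided, and the container runs under Docker, so start
                // the container gateway server (used, e.g., for interactive
                // container shell sessions).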
1873                 cr.gateway = Gateway{
1874                         Address:            gwListen,
1875                         AuthSecret:         gwAuthSecret,
1876                         ContainerUUID:      containerUUID,
1877                         DockerContainerID:  &de.containerID,
1878                         Log:                cr.CrunchLog,
1879                         ContainerIPAddress: dockerContainerIPAddress(&de.containerID),
1880                 }
1881                 err = cr.gateway.Start()
1882                 if err != nil {
1883                         log.Printf("error starting gateway server: %s", err)
1884                         return 1
1885                 }
1886         }
1887
1888         parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerUUID+".")
1889         if tmperr != nil {
1890                 log.Printf("%s: %v", containerUUID, tmperr)
1891                 return 1
1892         }
1893
1894         cr.parentTemp = parentTemp
1895         cr.statInterval = *statInterval
1896         cr.cgroupRoot = *cgroupRoot
1897         cr.expectCgroupParent = *cgroupParent
1898         cr.enableMemoryLimit = *enableMemoryLimit
1899         cr.enableNetwork = *enableNetwork
1900         cr.networkMode = *networkMode
1901         if *cgroupParentSubsystem != "" {
1902                 p := findCgroup(*cgroupParentSubsystem)
1903                 cr.setCgroupParent = p
1904                 cr.expectCgroupParent = p
1905         }
1906
1907         runerr := cr.Run()
1908
1909         if *memprofile != "" {
1910                 f, err := os.Create(*memprofile)
1911                 if err != nil {
1912                         log.Printf("could not create memory profile: %s", err)
1913                 } else {
1914                         runtime.GC() // get up-to-date statistics
1915                         if err := pprof.WriteHeapProfile(f); err != nil {
1916                                 log.Printf("could not write memory profile: %s", err)
1917                         }
1918                         if closeerr := f.Close(); closeerr != nil {
1919                                 log.Printf("closing memprofile file: %s", closeerr)
1920                         }
1921                 }
1922         }
1923
1924         if runerr != nil {
1925                 log.Printf("%s: %v", containerUUID, runerr)
1926                 return 1
1927         }
1928         return 0
1929 }
1930
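// startLocalKeepstore starts a dedicated keepstore process on a random
// localhost port for this container to use, unless the cluster
// configuration rules it out.  It returns (nil, nil) when no local
// keepstore should be started.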
1931 func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
1932         if configData.Cluster == nil || configData.KeepBuffers < 1 {
1933                 return nil, nil
1934         }
1935         for uuid, vol := range configData.Cluster.Volumes {
1936                 if len(vol.AccessViaHosts) > 0 {
1937                         fmt.Fprintf(logbuf, "not starting a local keepstore process because a volume (%s) uses AccessViaHosts\n", uuid)
1938                         return nil, nil
1939                 }
1940                 if !vol.ReadOnly && vol.Replication < configData.Cluster.Collections.DefaultReplication {
1941                         fmt.Fprintf(logbuf, "not starting a local keepstore process because a writable volume (%s) has replication less than Collections.DefaultReplication (%d < %d)\n", uuid, vol.Replication, configData.Cluster.Collections.DefaultReplication)
1942                         return nil, nil
1943                 }
1944         }
1945
1946         // Rather than have an alternate way to tell keepstore how
1947         // many buffers to use when starting it this way, we just
1948         // modify the cluster configuration that we feed it on stdin.
1949         configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
1950
1951         ln, err := net.Listen("tcp", "localhost:0")
1952         if err != nil {
1953                 return nil, err
1954         }
1955         _, port, err := net.SplitHostPort(ln.Addr().String())
1956         if err != nil {
1957                 ln.Close()
1958                 return nil, err
1959         }
1960         ln.Close()
1961         url := "http://localhost:" + port
1962
1963         fmt.Fprintf(logbuf, "starting keepstore on %s\n", url)
1964
1965         var confJSON bytes.Buffer
1966         err = json.NewEncoder(&confJSON).Encode(arvados.Config{
1967                 Clusters: map[string]arvados.Cluster{
1968                         configData.Cluster.ClusterID: *configData.Cluster,
1969                 },
1970         })
1971         if err != nil {
1972                 return nil, err
1973         }
1974         cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
1975         if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") {
1976                 // If we're a 'go test' process, running
1977                 // /proc/self/exe would start the test suite in a
1978                 // child process, which is not what we want.
1979                 cmd.Path, _ = exec.LookPath("go")
1980                 cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...)
1981                 cmd.Env = os.Environ()
1982         }
1983         cmd.Stdin = &confJSON
1984         cmd.Stdout = logbuf
1985         cmd.Stderr = logbuf
1986         cmd.Env = append(cmd.Env,
1987                 "GOGC=10",
1988                 "ARVADOS_SERVICE_INTERNAL_URL="+url)
1989         err = cmd.Start()
1990         if err != nil {
1991                 return nil, fmt.Errorf("error starting keepstore process: %w", err)
1992         }
1993         cmdExited := false
1994         go func() {
1995                 cmd.Wait()
1996                 cmdExited = true
1997         }()
1998         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
1999         defer cancel()
2000         poll := time.NewTicker(time.Second / 10)
2001         defer poll.Stop()
2002         client := http.Client{}
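        // Wait for the new keepstore process to report healthy, giving up
        // if it exits or the deadline passes first.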
2003         for range poll.C {
2004                 testReq, err := http.NewRequestWithContext(ctx, "GET", url+"/_health/ping", nil)
2005                 if err != nil {
2006                         return nil, err
2007                 }
2008                 testReq.Header.Set("Authorization", "Bearer "+configData.Cluster.ManagementToken)
2009                 resp, err := client.Do(testReq)
2010                 if err == nil {
2011                         resp.Body.Close()
2012                         if resp.StatusCode == http.StatusOK {
2013                                 break
2014                         }
2015                 }
2016                 if cmdExited {
2017                         return nil, fmt.Errorf("keepstore child process exited")
2018                 }
2019                 if ctx.Err() != nil {
2020                         return nil, fmt.Errorf("timed out waiting for new keepstore process to report healthy")
2021                 }
2022         }
2023         os.Setenv("ARVADOS_KEEP_SERVICES", url)
2024         return cmd, nil
2025 }