19886: DRY up test code for better readability
[arvados.git] / lib / crunchrun / crunchrun.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "errors"
12         "flag"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "log"
17         "net"
18         "net/http"
19         "os"
20         "os/exec"
21         "os/signal"
22         "os/user"
23         "path"
24         "path/filepath"
25         "regexp"
26         "runtime"
27         "runtime/pprof"
28         "sort"
29         "strings"
30         "sync"
31         "syscall"
32         "time"
33
34         "git.arvados.org/arvados.git/lib/cmd"
35         "git.arvados.org/arvados.git/lib/config"
36         "git.arvados.org/arvados.git/lib/crunchstat"
37         "git.arvados.org/arvados.git/sdk/go/arvados"
38         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
39         "git.arvados.org/arvados.git/sdk/go/ctxlog"
40         "git.arvados.org/arvados.git/sdk/go/keepclient"
41         "git.arvados.org/arvados.git/sdk/go/manifest"
42         "golang.org/x/sys/unix"
43 )
44
// command implements the crunch-run subcommand. Its behavior (the
// RunCommand method) is defined elsewhere in this file.
type command struct{}

// Command is the exported entry point for the crunch-run subcommand,
// used by the cmd dispatch table.
var Command = command{}
48
// ConfigData contains environment variables and (when needed) cluster
// configuration, passed from dispatchcloud to crunch-run on stdin.
type ConfigData struct {
	// Env is the set of environment variables supplied by the
	// dispatcher.
	Env map[string]string
	// KeepBuffers is a Keep buffer count -- presumably sizing the
	// local keepstore process's memory; confirm against the code
	// that consumes this field.
	KeepBuffers int
	// Cluster is the cluster configuration; nil when not needed.
	Cluster *arvados.Cluster
}
56
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
type IArvadosClient interface {
	// Create, Get, and Update operate on a single record of the
	// given resource type, decoding the response into output.
	Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
	Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
	Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
	// Call invokes an arbitrary API method and decodes the
	// response into output; CallRaw returns the undecoded
	// response body instead.
	Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error
	CallRaw(method string, resourceType string, uuid string, action string, parameters arvadosclient.Dict) (reader io.ReadCloser, err error)
	// Discovery returns a value from the API server's discovery
	// document.
	Discovery(key string) (interface{}, error)
}
66
// ErrCancelled is the error returned when the container is cancelled.
var ErrCancelled = errors.New("Cancelled")
69
// IKeepClient is the minimal Keep API methods used by crunch-run.
type IKeepClient interface {
	// BlockWrite stores a block of data in Keep.
	BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error)
	// ReadAt reads part of the block identified by locator into p,
	// starting at offset off within the block.
	ReadAt(locator string, p []byte, off int) (int, error)
	// ManifestFileReader opens the named file within a manifest
	// for reading.
	ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
	// LocalLocator converts a locator to a local-cluster form --
	// TODO confirm exact semantics against the SDK docs.
	LocalLocator(locator string) (string, error)
	ClearBlockCache()
	SetStorageClasses(sc []string)
}
79
// NewLogWriter is a factory function to create a new log writer.
type NewLogWriter func(name string) (io.WriteCloser, error)

// RunArvMount is a hook function that starts an arv-mount process
// with the given command line and auth token, returning the running
// command (see ContainerRunner.ArvMountCmd).
type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error)

// MkTempDir is a hook function for creating temporary directories;
// arguments are the parent directory and a name prefix, matching
// ioutil.TempDir's signature.
type MkTempDir func(string, string) (string, error)

// PsProcess is the minimal interface for inspecting a process's
// command line.
type PsProcess interface {
	CmdlineSlice() ([]string, error)
}
90
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
	// Low-level container execution backend, plus the stdio
	// streams attached to the running container (closed during
	// shutdown).
	executor       containerExecutor
	executorStdin  io.Closer
	executorStdout io.Closer
	executorStderr io.Closer

	// Dispatcher client is initialized with the Dispatcher token.
	// This is a privileged token used to manage container status
	// and logs.
	//
	// We have both dispatcherClient and DispatcherArvClient
	// because there are two different incompatible Arvados Go
	// SDKs and we have to use both (hopefully this gets fixed in
	// #14467)
	dispatcherClient     *arvados.Client
	DispatcherArvClient  IArvadosClient
	DispatcherKeepClient IKeepClient

	// Container client is initialized with the Container token
	// This token controls the permissions of the container, and
	// must be used for operations such as reading collections.
	//
	// Same comment as above applies to
	// containerClient/ContainerArvClient.
	containerClient     *arvados.Client
	ContainerArvClient  IArvadosClient
	ContainerKeepClient IKeepClient

	// Container is the container record being executed.
	Container arvados.Container
	// token is the container auth token (see ContainerToken,
	// defined elsewhere in this file).
	token string
	// ExitCode is the container's exit code, nil until known.
	ExitCode *int
	// NewLogWriter creates per-stream log writers (e.g.
	// "arv-mount" in ArvMountCmd).
	NewLogWriter NewLogWriter
	// CrunchLog is the main crunch-run event log.
	CrunchLog *ThrottledLogger
	// logUUID/logMtx/LogCollection/LogsPDH track the log
	// collection being accumulated and saved -- details of the
	// save path are not visible in this file section.
	logUUID       string
	logMtx        sync.Mutex
	LogCollection arvados.CollectionFileSystem
	LogsPDH       *string
	// RunArvMount and MkTempDir are test seams; in production
	// they presumably point at ArvMountCmd and a real temp-dir
	// maker.
	RunArvMount RunArvMount
	MkTempDir   MkTempDir
	// ArvMount is the running arv-mount process; ArvMountPoint is
	// its mount point directory.
	ArvMount      *exec.Cmd
	ArvMountPoint string
	// HostOutputDir is the host directory staged as the
	// container's writable output directory (see SetupMounts).
	HostOutputDir string
	Volumes       map[string]struct{}
	// OutputPDH is the portable data hash of the saved output
	// collection, nil until set.
	OutputPDH *string
	// SigChan receives TERM/INT/QUIT (see setupSignals);
	// ArvMountExit receives arv-mount's exit status (see
	// ArvMountCmd).
	SigChan      chan os.Signal
	ArvMountExit chan error
	// SecretMounts are mounts whose content must be kept out of
	// the container record and output collection.
	SecretMounts map[string]arvados.Mount
	// MkArvClient builds API/Keep clients for a given token.
	MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
	// finalState is the container state to record at the end of
	// the run -- valid values not visible in this section.
	finalState string
	// parentTemp is the parent directory for all temp dirs
	// created during this run.
	parentTemp    string
	costStartTime time.Time

	// Optional local keepstore process and its logging plumbing,
	// plus crunchstat reporters for the container and the host.
	keepstore        *exec.Cmd
	keepstoreLogger  io.WriteCloser
	keepstoreLogbuf  *bufThenWrite
	statLogger       io.WriteCloser
	statReporter     *crunchstat.Reporter
	hoststatLogger   io.WriteCloser
	hoststatReporter *crunchstat.Reporter
	statInterval     time.Duration
	cgroupRoot       string
	// What we expect the container's cgroup parent to be.
	expectCgroupParent string
	// What we tell docker to use as the container's cgroup
	// parent. Note: Ideally we would use the same field for both
	// expectCgroupParent and setCgroupParent, and just make it
	// default to "docker". However, when using docker < 1.10 with
	// systemd, specifying a non-empty cgroup parent (even the
	// default value "docker") hits a docker bug
	// (https://github.com/docker/docker/issues/17126). Using two
	// separate fields makes it possible to use the "expect cgroup
	// parent to be X" feature even on sites where the "specify
	// cgroup parent" feature breaks.
	setCgroupParent string

	// cStateLock guards cCancelled (see stop()).
	cStateLock sync.Mutex
	cCancelled bool // StopContainer() invoked

	enableMemoryLimit bool
	enableNetwork     string // one of "default" or "always"
	networkMode       string // "none", "host", or "" -- passed through to executor
	brokenNodeHook    string // script to run if node appears to be broken
	// arvMountLog is the throttled writer receiving arv-mount's
	// stdout/stderr (see ArvMountCmd).
	arvMountLog *ThrottledLogger

	containerWatchdogInterval time.Duration

	// gateway provides interactive access to the container --
	// TODO confirm; implementation not visible here.
	gateway Gateway
}
181
182 // setupSignals sets up signal handling to gracefully terminate the
183 // underlying container and update state when receiving a TERM, INT or
184 // QUIT signal.
185 func (runner *ContainerRunner) setupSignals() {
186         runner.SigChan = make(chan os.Signal, 1)
187         signal.Notify(runner.SigChan, syscall.SIGTERM)
188         signal.Notify(runner.SigChan, syscall.SIGINT)
189         signal.Notify(runner.SigChan, syscall.SIGQUIT)
190
191         go func(sig chan os.Signal) {
192                 for s := range sig {
193                         runner.stop(s)
194                 }
195         }(runner.SigChan)
196 }
197
198 // stop the underlying container.
199 func (runner *ContainerRunner) stop(sig os.Signal) {
200         runner.cStateLock.Lock()
201         defer runner.cStateLock.Unlock()
202         if sig != nil {
203                 runner.CrunchLog.Printf("caught signal: %v", sig)
204         }
205         runner.cCancelled = true
206         runner.CrunchLog.Printf("stopping container")
207         err := runner.executor.Stop()
208         if err != nil {
209                 runner.CrunchLog.Printf("error stopping container: %s", err)
210         }
211 }
212
// errorBlacklist is a list of regexps matching error messages that
// indicate the compute node itself (not the container) is broken,
// e.g., an unreachable Docker daemon. See checkBrokenNode.
var errorBlacklist = []string{
	"(?ms).*[Cc]annot connect to the Docker daemon.*",
	"(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
	"(?ms).*grpc: the connection is unavailable.*",
}
218
219 func (runner *ContainerRunner) runBrokenNodeHook() {
220         if runner.brokenNodeHook == "" {
221                 path := filepath.Join(lockdir, brokenfile)
222                 runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
223                 f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
224                 if err != nil {
225                         runner.CrunchLog.Printf("Error writing %s: %s", path, err)
226                         return
227                 }
228                 f.Close()
229         } else {
230                 runner.CrunchLog.Printf("Running broken node hook %q", runner.brokenNodeHook)
231                 // run killme script
232                 c := exec.Command(runner.brokenNodeHook)
233                 c.Stdout = runner.CrunchLog
234                 c.Stderr = runner.CrunchLog
235                 err := c.Run()
236                 if err != nil {
237                         runner.CrunchLog.Printf("Error running broken node hook: %v", err)
238                 }
239         }
240 }
241
242 func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
243         for _, d := range errorBlacklist {
244                 if m, e := regexp.MatchString(d, goterr.Error()); m && e == nil {
245                         runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
246                         runner.runBrokenNodeHook()
247                         return true
248                 }
249         }
250         return false
251 }
252
253 // LoadImage determines the docker image id from the container record and
254 // checks if it is available in the local Docker image store.  If not, it loads
255 // the image from Keep.
256 func (runner *ContainerRunner) LoadImage() (string, error) {
257         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
258
259         d, err := os.Open(runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage)
260         if err != nil {
261                 return "", err
262         }
263         defer d.Close()
264         allfiles, err := d.Readdirnames(-1)
265         if err != nil {
266                 return "", err
267         }
268         var tarfiles []string
269         for _, fnm := range allfiles {
270                 if strings.HasSuffix(fnm, ".tar") {
271                         tarfiles = append(tarfiles, fnm)
272                 }
273         }
274         if len(tarfiles) == 0 {
275                 return "", fmt.Errorf("image collection does not include a .tar image file")
276         }
277         if len(tarfiles) > 1 {
278                 return "", fmt.Errorf("cannot choose from multiple tar files in image collection: %v", tarfiles)
279         }
280         imageID := tarfiles[0][:len(tarfiles[0])-4]
281         imageTarballPath := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + imageID + ".tar"
282         runner.CrunchLog.Printf("Using Docker image id %q", imageID)
283
284         runner.CrunchLog.Print("Loading Docker image from keep")
285         err = runner.executor.LoadImage(imageID, imageTarballPath, runner.Container, runner.ArvMountPoint,
286                 runner.containerClient)
287         if err != nil {
288                 return "", err
289         }
290
291         return imageID, nil
292 }
293
294 func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *exec.Cmd, err error) {
295         c = exec.Command(cmdline[0], cmdline[1:]...)
296
297         // Copy our environment, but override ARVADOS_API_TOKEN with
298         // the container auth token.
299         c.Env = nil
300         for _, s := range os.Environ() {
301                 if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") {
302                         c.Env = append(c.Env, s)
303                 }
304         }
305         c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
306
307         w, err := runner.NewLogWriter("arv-mount")
308         if err != nil {
309                 return nil, err
310         }
311         runner.arvMountLog = NewThrottledLogger(w)
312         scanner := logScanner{
313                 Patterns: []string{
314                         "Keep write error",
315                         "Block not found error",
316                         "Unhandled exception during FUSE operation",
317                 },
318                 ReportFunc: runner.reportArvMountWarning,
319         }
320         c.Stdout = runner.arvMountLog
321         c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
322
323         runner.CrunchLog.Printf("Running %v", c.Args)
324
325         err = c.Start()
326         if err != nil {
327                 return nil, err
328         }
329
330         statReadme := make(chan bool)
331         runner.ArvMountExit = make(chan error)
332
333         keepStatting := true
334         go func() {
335                 for keepStatting {
336                         time.Sleep(100 * time.Millisecond)
337                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
338                         if err == nil {
339                                 keepStatting = false
340                                 statReadme <- true
341                         }
342                 }
343                 close(statReadme)
344         }()
345
346         go func() {
347                 mnterr := c.Wait()
348                 if mnterr != nil {
349                         runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
350                 }
351                 runner.ArvMountExit <- mnterr
352                 close(runner.ArvMountExit)
353         }()
354
355         select {
356         case <-statReadme:
357                 break
358         case err := <-runner.ArvMountExit:
359                 runner.ArvMount = nil
360                 keepStatting = false
361                 return nil, err
362         }
363
364         return c, nil
365 }
366
367 func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
368         if runner.ArvMountPoint == "" {
369                 runner.ArvMountPoint, err = runner.MkTempDir(runner.parentTemp, prefix)
370         }
371         return
372 }
373
374 func copyfile(src string, dst string) (err error) {
375         srcfile, err := os.Open(src)
376         if err != nil {
377                 return
378         }
379
380         os.MkdirAll(path.Dir(dst), 0777)
381
382         dstfile, err := os.Create(dst)
383         if err != nil {
384                 return
385         }
386         _, err = io.Copy(dstfile, srcfile)
387         if err != nil {
388                 return
389         }
390
391         err = srcfile.Close()
392         err2 := dstfile.Close()
393
394         if err != nil {
395                 return
396         }
397
398         if err2 != nil {
399                 return err2
400         }
401
402         return nil
403 }
404
405 func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
406         bindmounts := map[string]bindmount{}
407         err := runner.SetupArvMountPoint("keep")
408         if err != nil {
409                 return nil, fmt.Errorf("While creating keep mount temp dir: %v", err)
410         }
411
412         token, err := runner.ContainerToken()
413         if err != nil {
414                 return nil, fmt.Errorf("could not get container token: %s", err)
415         }
416         runner.CrunchLog.Printf("container token %q", token)
417
418         pdhOnly := true
419         tmpcount := 0
420         arvMountCmd := []string{
421                 "arv-mount",
422                 "--foreground",
423                 "--read-write",
424                 "--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
425                 fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
426
427         if _, isdocker := runner.executor.(*dockerExecutor); isdocker {
428                 arvMountCmd = append(arvMountCmd, "--allow-other")
429         }
430
431         if runner.Container.RuntimeConstraints.KeepCacheDisk > 0 {
432                 keepcachedir, err := runner.MkTempDir(runner.parentTemp, "keepcache")
433                 if err != nil {
434                         return nil, fmt.Errorf("while creating keep cache temp dir: %v", err)
435                 }
436                 arvMountCmd = append(arvMountCmd, "--disk-cache", "--disk-cache-dir", keepcachedir, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheDisk))
437         } else if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
438                 arvMountCmd = append(arvMountCmd, "--ram-cache", "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
439         }
440
441         collectionPaths := []string{}
442         needCertMount := true
443         type copyFile struct {
444                 src  string
445                 bind string
446         }
447         var copyFiles []copyFile
448
449         var binds []string
450         for bind := range runner.Container.Mounts {
451                 binds = append(binds, bind)
452         }
453         for bind := range runner.SecretMounts {
454                 if _, ok := runner.Container.Mounts[bind]; ok {
455                         return nil, fmt.Errorf("secret mount %q conflicts with regular mount", bind)
456                 }
457                 if runner.SecretMounts[bind].Kind != "json" &&
458                         runner.SecretMounts[bind].Kind != "text" {
459                         return nil, fmt.Errorf("secret mount %q type is %q but only 'json' and 'text' are permitted",
460                                 bind, runner.SecretMounts[bind].Kind)
461                 }
462                 binds = append(binds, bind)
463         }
464         sort.Strings(binds)
465
466         for _, bind := range binds {
467                 mnt, notSecret := runner.Container.Mounts[bind]
468                 if !notSecret {
469                         mnt = runner.SecretMounts[bind]
470                 }
471                 if bind == "stdout" || bind == "stderr" {
472                         // Is it a "file" mount kind?
473                         if mnt.Kind != "file" {
474                                 return nil, fmt.Errorf("unsupported mount kind '%s' for %s: only 'file' is supported", mnt.Kind, bind)
475                         }
476
477                         // Does path start with OutputPath?
478                         prefix := runner.Container.OutputPath
479                         if !strings.HasSuffix(prefix, "/") {
480                                 prefix += "/"
481                         }
482                         if !strings.HasPrefix(mnt.Path, prefix) {
483                                 return nil, fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
484                         }
485                 }
486
487                 if bind == "stdin" {
488                         // Is it a "collection" mount kind?
489                         if mnt.Kind != "collection" && mnt.Kind != "json" {
490                                 return nil, fmt.Errorf("unsupported mount kind '%s' for stdin: only 'collection' and 'json' are supported", mnt.Kind)
491                         }
492                 }
493
494                 if bind == "/etc/arvados/ca-certificates.crt" {
495                         needCertMount = false
496                 }
497
498                 if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" {
499                         if mnt.Kind != "collection" && mnt.Kind != "text" && mnt.Kind != "json" {
500                                 return nil, fmt.Errorf("only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind)
501                         }
502                 }
503
504                 switch {
505                 case mnt.Kind == "collection" && bind != "stdin":
506                         var src string
507                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
508                                 return nil, fmt.Errorf("cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
509                         }
510                         if mnt.UUID != "" {
511                                 if mnt.Writable {
512                                         return nil, fmt.Errorf("writing to existing collections currently not permitted")
513                                 }
514                                 pdhOnly = false
515                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
516                         } else if mnt.PortableDataHash != "" {
517                                 if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
518                                         return nil, fmt.Errorf("can never write to a collection specified by portable data hash")
519                                 }
520                                 idx := strings.Index(mnt.PortableDataHash, "/")
521                                 if idx > 0 {
522                                         mnt.Path = path.Clean(mnt.PortableDataHash[idx:])
523                                         mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
524                                         runner.Container.Mounts[bind] = mnt
525                                 }
526                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
527                                 if mnt.Path != "" && mnt.Path != "." {
528                                         if strings.HasPrefix(mnt.Path, "./") {
529                                                 mnt.Path = mnt.Path[2:]
530                                         } else if strings.HasPrefix(mnt.Path, "/") {
531                                                 mnt.Path = mnt.Path[1:]
532                                         }
533                                         src += "/" + mnt.Path
534                                 }
535                         } else {
536                                 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
537                                 arvMountCmd = append(arvMountCmd, "--mount-tmp", fmt.Sprintf("tmp%d", tmpcount))
538                                 tmpcount++
539                         }
540                         if mnt.Writable {
541                                 if bind == runner.Container.OutputPath {
542                                         runner.HostOutputDir = src
543                                         bindmounts[bind] = bindmount{HostPath: src}
544                                 } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
545                                         copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
546                                 } else {
547                                         bindmounts[bind] = bindmount{HostPath: src}
548                                 }
549                         } else {
550                                 bindmounts[bind] = bindmount{HostPath: src, ReadOnly: true}
551                         }
552                         collectionPaths = append(collectionPaths, src)
553
554                 case mnt.Kind == "tmp":
555                         var tmpdir string
556                         tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp")
557                         if err != nil {
558                                 return nil, fmt.Errorf("while creating mount temp dir: %v", err)
559                         }
560                         st, staterr := os.Stat(tmpdir)
561                         if staterr != nil {
562                                 return nil, fmt.Errorf("while Stat on temp dir: %v", staterr)
563                         }
564                         err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777)
565                         if staterr != nil {
566                                 return nil, fmt.Errorf("while Chmod temp dir: %v", err)
567                         }
568                         bindmounts[bind] = bindmount{HostPath: tmpdir}
569                         if bind == runner.Container.OutputPath {
570                                 runner.HostOutputDir = tmpdir
571                         }
572
573                 case mnt.Kind == "json" || mnt.Kind == "text":
574                         var filedata []byte
575                         if mnt.Kind == "json" {
576                                 filedata, err = json.Marshal(mnt.Content)
577                                 if err != nil {
578                                         return nil, fmt.Errorf("encoding json data: %v", err)
579                                 }
580                         } else {
581                                 text, ok := mnt.Content.(string)
582                                 if !ok {
583                                         return nil, fmt.Errorf("content for mount %q must be a string", bind)
584                                 }
585                                 filedata = []byte(text)
586                         }
587
588                         tmpdir, err := runner.MkTempDir(runner.parentTemp, mnt.Kind)
589                         if err != nil {
590                                 return nil, fmt.Errorf("creating temp dir: %v", err)
591                         }
592                         tmpfn := filepath.Join(tmpdir, "mountdata."+mnt.Kind)
593                         err = ioutil.WriteFile(tmpfn, filedata, 0444)
594                         if err != nil {
595                                 return nil, fmt.Errorf("writing temp file: %v", err)
596                         }
597                         if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && (notSecret || runner.Container.Mounts[runner.Container.OutputPath].Kind != "collection") {
598                                 // In most cases, if the container
599                                 // specifies a literal file inside the
600                                 // output path, we copy it into the
601                                 // output directory (either a mounted
602                                 // collection or a staging area on the
603                                 // host fs). If it's a secret, it will
604                                 // be skipped when copying output from
605                                 // staging to Keep later.
606                                 copyFiles = append(copyFiles, copyFile{tmpfn, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
607                         } else {
608                                 // If a secret is outside OutputPath,
609                                 // we bind mount the secret file
610                                 // directly just like other mounts. We
611                                 // also use this strategy when a
612                                 // secret is inside OutputPath but
613                                 // OutputPath is a live collection, to
614                                 // avoid writing the secret to
615                                 // Keep. Attempting to remove a
616                                 // bind-mounted secret file from
617                                 // inside the container will return a
618                                 // "Device or resource busy" error
619                                 // that might not be handled well by
620                                 // the container, which is why we
621                                 // don't use this strategy when
622                                 // OutputPath is a staging directory.
623                                 bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true}
624                         }
625
626                 case mnt.Kind == "git_tree":
627                         tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree")
628                         if err != nil {
629                                 return nil, fmt.Errorf("creating temp dir: %v", err)
630                         }
631                         err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
632                         if err != nil {
633                                 return nil, err
634                         }
635                         bindmounts[bind] = bindmount{HostPath: tmpdir, ReadOnly: true}
636                 }
637         }
638
639         if runner.HostOutputDir == "" {
640                 return nil, fmt.Errorf("output path does not correspond to a writable mount point")
641         }
642
643         if needCertMount && runner.Container.RuntimeConstraints.API {
644                 for _, certfile := range arvadosclient.CertFiles {
645                         _, err := os.Stat(certfile)
646                         if err == nil {
647                                 bindmounts["/etc/arvados/ca-certificates.crt"] = bindmount{HostPath: certfile, ReadOnly: true}
648                                 break
649                         }
650                 }
651         }
652
653         if pdhOnly {
654                 // If we are only mounting collections by pdh, make
655                 // sure we don't subscribe to websocket events to
656                 // avoid putting undesired load on the API server
657                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id", "--disable-event-listening")
658         } else {
659                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
660         }
661         // the by_uuid mount point is used by singularity when writing
662         // out docker images converted to SIF
663         arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid")
664         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
665
666         runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
667         if err != nil {
668                 return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
669         }
670         if runner.hoststatReporter != nil && runner.ArvMount != nil {
671                 runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
672         }
673
674         for _, p := range collectionPaths {
675                 _, err = os.Stat(p)
676                 if err != nil {
677                         return nil, fmt.Errorf("while checking that input files exist: %v", err)
678                 }
679         }
680
681         for _, cp := range copyFiles {
682                 st, err := os.Stat(cp.src)
683                 if err != nil {
684                         return nil, fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
685                 }
686                 if st.IsDir() {
687                         err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
688                                 if walkerr != nil {
689                                         return walkerr
690                                 }
691                                 target := path.Join(cp.bind, walkpath[len(cp.src):])
692                                 if walkinfo.Mode().IsRegular() {
693                                         copyerr := copyfile(walkpath, target)
694                                         if copyerr != nil {
695                                                 return copyerr
696                                         }
697                                         return os.Chmod(target, walkinfo.Mode()|0777)
698                                 } else if walkinfo.Mode().IsDir() {
699                                         mkerr := os.MkdirAll(target, 0777)
700                                         if mkerr != nil {
701                                                 return mkerr
702                                         }
703                                         return os.Chmod(target, walkinfo.Mode()|os.ModeSetgid|0777)
704                                 } else {
705                                         return fmt.Errorf("source %q is not a regular file or directory", cp.src)
706                                 }
707                         })
708                 } else if st.Mode().IsRegular() {
709                         err = copyfile(cp.src, cp.bind)
710                         if err == nil {
711                                 err = os.Chmod(cp.bind, st.Mode()|0777)
712                         }
713                 }
714                 if err != nil {
715                         return nil, fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
716                 }
717         }
718
719         return bindmounts, nil
720 }
721
722 func (runner *ContainerRunner) stopHoststat() error {
723         if runner.hoststatReporter == nil {
724                 return nil
725         }
726         runner.hoststatReporter.Stop()
727         err := runner.hoststatLogger.Close()
728         if err != nil {
729                 return fmt.Errorf("error closing hoststat logs: %v", err)
730         }
731         return nil
732 }
733
734 func (runner *ContainerRunner) startHoststat() error {
735         w, err := runner.NewLogWriter("hoststat")
736         if err != nil {
737                 return err
738         }
739         runner.hoststatLogger = NewThrottledLogger(w)
740         runner.hoststatReporter = &crunchstat.Reporter{
741                 Logger:     log.New(runner.hoststatLogger, "", 0),
742                 CgroupRoot: runner.cgroupRoot,
743                 PollPeriod: runner.statInterval,
744         }
745         runner.hoststatReporter.Start()
746         runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
747         return nil
748 }
749
750 func (runner *ContainerRunner) startCrunchstat() error {
751         w, err := runner.NewLogWriter("crunchstat")
752         if err != nil {
753                 return err
754         }
755         runner.statLogger = NewThrottledLogger(w)
756         runner.statReporter = &crunchstat.Reporter{
757                 CID:          runner.executor.CgroupID(),
758                 Logger:       log.New(runner.statLogger, "", 0),
759                 CgroupParent: runner.expectCgroupParent,
760                 CgroupRoot:   runner.cgroupRoot,
761                 PollPeriod:   runner.statInterval,
762                 TempDir:      runner.parentTemp,
763         }
764         runner.statReporter.Start()
765         return nil
766 }
767
// infoCommand pairs a human-readable label with the argv of a command
// whose output is captured in the node-info log (see LogHostInfo).
type infoCommand struct {
	label string   // heading written to the log before the command's output
	cmd   []string // command and arguments, run via exec.Command
}
772
773 // LogHostInfo logs info about the current host, for debugging and
774 // accounting purposes. Although it's logged as "node-info", this is
775 // about the environment where crunch-run is actually running, which
776 // might differ from what's described in the node record (see
777 // LogNodeRecord).
778 func (runner *ContainerRunner) LogHostInfo() (err error) {
779         w, err := runner.NewLogWriter("node-info")
780         if err != nil {
781                 return
782         }
783
784         commands := []infoCommand{
785                 {
786                         label: "Host Information",
787                         cmd:   []string{"uname", "-a"},
788                 },
789                 {
790                         label: "CPU Information",
791                         cmd:   []string{"cat", "/proc/cpuinfo"},
792                 },
793                 {
794                         label: "Memory Information",
795                         cmd:   []string{"cat", "/proc/meminfo"},
796                 },
797                 {
798                         label: "Disk Space",
799                         cmd:   []string{"df", "-m", "/", os.TempDir()},
800                 },
801                 {
802                         label: "Disk INodes",
803                         cmd:   []string{"df", "-i", "/", os.TempDir()},
804                 },
805         }
806
807         // Run commands with informational output to be logged.
808         for _, command := range commands {
809                 fmt.Fprintln(w, command.label)
810                 cmd := exec.Command(command.cmd[0], command.cmd[1:]...)
811                 cmd.Stdout = w
812                 cmd.Stderr = w
813                 if err := cmd.Run(); err != nil {
814                         err = fmt.Errorf("While running command %q: %v", command.cmd, err)
815                         fmt.Fprintln(w, err)
816                         return err
817                 }
818                 fmt.Fprintln(w, "")
819         }
820
821         err = w.Close()
822         if err != nil {
823                 return fmt.Errorf("While closing node-info logs: %v", err)
824         }
825         return nil
826 }
827
828 // LogContainerRecord gets and saves the raw JSON container record from the API server
829 func (runner *ContainerRunner) LogContainerRecord() error {
830         logged, err := runner.logAPIResponse("container", "containers", map[string]interface{}{"filters": [][]string{{"uuid", "=", runner.Container.UUID}}}, nil)
831         if !logged && err == nil {
832                 err = fmt.Errorf("error: no container record found for %s", runner.Container.UUID)
833         }
834         return err
835 }
836
// LogNodeRecord logs the current host's InstanceType config entry (or
// the arvados#node record, if running via crunch-dispatch-slurm).
func (runner *ContainerRunner) LogNodeRecord() error {
	if it := os.Getenv("InstanceType"); it != "" {
		// Dispatched via arvados-dispatch-cloud. Save
		// InstanceType config fragment received from
		// dispatcher on stdin.
		w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666)
		if err != nil {
			return err
		}
		// NOTE(review): the deferred Close also runs on the
		// success path, after the explicit w.Close() below, so
		// the file gets closed twice (the second close's error
		// is discarded). Believed harmless; confirm the
		// collection filesystem tolerates a double Close.
		defer w.Close()
		_, err = io.WriteString(w, it)
		if err != nil {
			return err
		}
		// Explicit Close so any flush/commit error is returned
		// to the caller (a deferred Close error would be lost).
		return w.Close()
	}
	// Dispatched via crunch-dispatch-slurm. Look up
	// apiserver's node record corresponding to
	// $SLURMD_NODENAME.
	hostname := os.Getenv("SLURMD_NODENAME")
	if hostname == "" {
		// Fall back to the OS hostname; lookup errors here are
		// deliberately ignored (hostname stays empty).
		hostname, _ = os.Hostname()
	}
	_, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
		// The "info" field has admin-only info when
		// obtained with a privileged token, and
		// should not be logged.
		node, ok := resp.(map[string]interface{})
		if ok {
			delete(node, "info")
		}
	})
	return err
}
873
// logAPIResponse fetches the first record matching params from the
// given API list endpoint (path) and writes it, pretty-printed, to
// "<label>.json" in the log collection. The optional munge callback
// may modify the decoded record before it is written (e.g. to strip
// fields that should not be logged). Returns logged=false with a nil
// error if the query matched no records.
func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
	writer, err := runner.LogCollection.OpenFile(label+".json", os.O_CREATE|os.O_WRONLY, 0666)
	if err != nil {
		return false, err
	}
	// Wrap the collection file in an ArvLogWriter, which
	// (presumably — see ArvLogWriter) also forwards writes to the
	// "<label>" live log stream via the dispatcher client.
	w := &ArvLogWriter{
		ArvClient:     runner.DispatcherArvClient,
		UUID:          runner.Container.UUID,
		loggingStream: label,
		writeCloser:   writer,
	}

	reader, err := runner.DispatcherArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
	if err != nil {
		return false, fmt.Errorf("error getting %s record: %v", label, err)
	}
	defer reader.Close()

	dec := json.NewDecoder(reader)
	// UseNumber keeps numeric fields as json.Number instead of
	// float64, preserving precision when re-encoding below.
	dec.UseNumber()
	var resp map[string]interface{}
	if err = dec.Decode(&resp); err != nil {
		return false, fmt.Errorf("error decoding %s list response: %v", label, err)
	}
	items, ok := resp["items"].([]interface{})
	if !ok {
		return false, fmt.Errorf("error decoding %s list response: no \"items\" key in API list response", label)
	} else if len(items) < 1 {
		// Empty result set: nothing to log, but not an error.
		return false, nil
	}
	if munge != nil {
		munge(items[0])
	}
	// Re-encode it using indentation to improve readability
	enc := json.NewEncoder(w)
	enc.SetIndent("", "    ")
	if err = enc.Encode(items[0]); err != nil {
		return false, fmt.Errorf("error logging %s record: %v", label, err)
	}
	err = w.Close()
	if err != nil {
		return false, fmt.Errorf("error closing %s.json in log collection: %v", label, err)
	}
	return true, nil
}
919
920 func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
921         stdoutPath := mntPath[len(runner.Container.OutputPath):]
922         index := strings.LastIndex(stdoutPath, "/")
923         if index > 0 {
924                 subdirs := stdoutPath[:index]
925                 if subdirs != "" {
926                         st, err := os.Stat(runner.HostOutputDir)
927                         if err != nil {
928                                 return nil, fmt.Errorf("While Stat on temp dir: %v", err)
929                         }
930                         stdoutPath := filepath.Join(runner.HostOutputDir, subdirs)
931                         err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
932                         if err != nil {
933                                 return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
934                         }
935                 }
936         }
937         stdoutFile, err := os.Create(filepath.Join(runner.HostOutputDir, stdoutPath))
938         if err != nil {
939                 return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
940         }
941
942         return stdoutFile, nil
943 }
944
// CreateContainer creates the docker container.
//
// It wires up stdin/stdout/stderr according to the container's
// mounts, augments the environment with API credentials when the API
// runtime constraint is set, and hands the assembled containerSpec to
// the executor.
func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
	// Default stdin is empty; a "stdin" mount can instead supply
	// data from a collection file or inline JSON content.
	var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
	if mnt, ok := runner.Container.Mounts["stdin"]; ok {
		switch mnt.Kind {
		case "collection":
			var collID string
			if mnt.UUID != "" {
				collID = mnt.UUID
			} else {
				collID = mnt.PortableDataHash
			}
			// Read the file through arv-mount's by_id tree.
			path := runner.ArvMountPoint + "/by_id/" + collID + "/" + mnt.Path
			f, err := os.Open(path)
			if err != nil {
				return err
			}
			stdin = f
		case "json":
			j, err := json.Marshal(mnt.Content)
			if err != nil {
				return fmt.Errorf("error encoding stdin json data: %v", err)
			}
			stdin = ioutil.NopCloser(bytes.NewReader(j))
		default:
			return fmt.Errorf("stdin mount has unsupported kind %q", mnt.Kind)
		}
	}

	// stdout/stderr go to a file in the output directory when
	// mounted; otherwise to throttled log streams.
	var stdout, stderr io.WriteCloser
	if mnt, ok := runner.Container.Mounts["stdout"]; ok {
		f, err := runner.getStdoutFile(mnt.Path)
		if err != nil {
			return err
		}
		stdout = f
	} else if w, err := runner.NewLogWriter("stdout"); err != nil {
		return err
	} else {
		stdout = NewThrottledLogger(w)
	}

	if mnt, ok := runner.Container.Mounts["stderr"]; ok {
		f, err := runner.getStdoutFile(mnt.Path)
		if err != nil {
			return err
		}
		stderr = f
	} else if w, err := runner.NewLogWriter("stderr"); err != nil {
		return err
	} else {
		stderr = NewThrottledLogger(w)
	}

	env := runner.Container.Environment
	enableNetwork := runner.enableNetwork == "always"
	if runner.Container.RuntimeConstraints.API {
		// API access implies networking, and the container gets
		// Arvados credentials. Copy the environment map first so
		// the original Container.Environment is not mutated.
		enableNetwork = true
		tok, err := runner.ContainerToken()
		if err != nil {
			return err
		}
		env = map[string]string{}
		for k, v := range runner.Container.Environment {
			env[k] = v
		}
		env["ARVADOS_API_TOKEN"] = tok
		env["ARVADOS_API_HOST"] = os.Getenv("ARVADOS_API_HOST")
		env["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE")
		env["ARVADOS_KEEP_SERVICES"] = os.Getenv("ARVADOS_KEEP_SERVICES")
	}
	workdir := runner.Container.Cwd
	if workdir == "." {
		// both "" and "." mean default
		workdir = ""
	}
	// RAM limit of 0 means unlimited (memory limiting disabled).
	ram := runner.Container.RuntimeConstraints.RAM
	if !runner.enableMemoryLimit {
		ram = 0
	}
	// Keep references so WaitFinish can close these streams.
	runner.executorStdin = stdin
	runner.executorStdout = stdout
	runner.executorStderr = stderr

	if runner.Container.RuntimeConstraints.CUDA.DeviceCount > 0 {
		// Load nvidia kernel modules / set up devices before the
		// container starts (see nvidiaModprobe).
		nvidiaModprobe(runner.CrunchLog)
	}

	return runner.executor.Create(containerSpec{
		Image:           imageID,
		VCPUs:           runner.Container.RuntimeConstraints.VCPUs,
		RAM:             ram,
		WorkingDir:      workdir,
		Env:             env,
		BindMounts:      bindmounts,
		Command:         runner.Container.Command,
		EnableNetwork:   enableNetwork,
		CUDADeviceCount: runner.Container.RuntimeConstraints.CUDA.DeviceCount,
		NetworkMode:     runner.networkMode,
		CgroupParent:    runner.setCgroupParent,
		Stdin:           stdin,
		Stdout:          stdout,
		Stderr:          stderr,
	})
}
1050
1051 // StartContainer starts the docker container created by CreateContainer.
1052 func (runner *ContainerRunner) StartContainer() error {
1053         runner.CrunchLog.Printf("Starting container")
1054         runner.cStateLock.Lock()
1055         defer runner.cStateLock.Unlock()
1056         if runner.cCancelled {
1057                 return ErrCancelled
1058         }
1059         err := runner.executor.Start()
1060         if err != nil {
1061                 var advice string
1062                 if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil {
1063                         advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0])
1064                 }
1065                 return fmt.Errorf("could not start container: %v%s", err, advice)
1066         }
1067         return nil
1068 }
1069
// WaitFinish waits for the container to terminate, capture the exit code, and
// close the stdout/stderr logging.
func (runner *ContainerRunner) WaitFinish() error {
	runner.CrunchLog.Print("Waiting for container to finish")
	// timeout stays nil (its select case never fires) unless a
	// MaxRunTime is configured.
	var timeout <-chan time.Time
	if s := runner.Container.SchedulingParameters.MaxRunTime; s > 0 {
		timeout = time.After(time.Duration(s) * time.Second)
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Watchdog goroutine: stop the container if it exceeds its max
	// run time or if arv-mount dies underneath it. The deferred
	// cancel() above ends this goroutine via ctx.Done().
	go func() {
		select {
		case <-timeout:
			runner.CrunchLog.Printf("maximum run time exceeded. Stopping container.")
			runner.stop(nil)
		case <-runner.ArvMountExit:
			runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.")
			runner.stop(nil)
		case <-ctx.Done():
		}
	}()
	exitcode, err := runner.executor.Wait(ctx)
	if err != nil {
		runner.checkBrokenNode(err)
		return err
	}
	runner.ExitCode = &exitcode

	extra := ""
	if exitcode&0x80 != 0 {
		// Convert raw exit status (0x80 + signal number) to a
		// string to log after the code, like " (signal 101)"
		// or " (signal 9, killed)"
		sig := syscall.WaitStatus(exitcode).Signal()
		if name := unix.SignalName(sig); name != "" {
			extra = fmt.Sprintf(" (signal %d, %s)", sig, name)
		} else {
			extra = fmt.Sprintf(" (signal %d)", sig)
		}
	}
	runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra)
	// Record the exit code on the container record; failure to do
	// so is logged but not fatal (it can be retried later in the
	// state update).
	err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
		"container": arvadosclient.Dict{"exit_code": exitcode},
	}, nil)
	if err != nil {
		runner.CrunchLog.Printf("ignoring error updating exit_code: %s", err)
	}

	// Close all three streams even if some fail; report the first
	// close error encountered.
	var returnErr error
	if err = runner.executorStdin.Close(); err != nil {
		err = fmt.Errorf("error closing container stdin: %s", err)
		runner.CrunchLog.Printf("%s", err)
		returnErr = err
	}
	if err = runner.executorStdout.Close(); err != nil {
		err = fmt.Errorf("error closing container stdout: %s", err)
		runner.CrunchLog.Printf("%s", err)
		if returnErr == nil {
			returnErr = err
		}
	}
	if err = runner.executorStderr.Close(); err != nil {
		err = fmt.Errorf("error closing container stderr: %s", err)
		runner.CrunchLog.Printf("%s", err)
		if returnErr == nil {
			returnErr = err
		}
	}

	// Shut down the per-container stats reporter (started by
	// startCrunchstat), if any.
	if runner.statReporter != nil {
		runner.statReporter.Stop()
		err = runner.statLogger.Close()
		if err != nil {
			runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
		}
	}
	return returnErr
}
1148
// updateLogs periodically saves the log collection and updates the
// container record's "log" field to point at it, so logs are visible
// while the container is still running. It returns once the logs have
// been finalized (LogsPDH set). SIGUSR1 forces an immediate save.
func (runner *ContainerRunner) updateLogs() {
	// Tick much faster than the save period; the saveAtTime /
	// saveAtSize checks below throttle the actual saves.
	ticker := time.NewTicker(crunchLogUpdatePeriod / 360)
	defer ticker.Stop()

	sigusr1 := make(chan os.Signal, 1)
	signal.Notify(sigusr1, syscall.SIGUSR1)
	defer signal.Stop(sigusr1)

	saveAtTime := time.Now().Add(crunchLogUpdatePeriod)
	saveAtSize := crunchLogUpdateSize
	var savedSize int64
	for {
		select {
		case <-ticker.C:
		case <-sigusr1:
			// Make the next check save immediately.
			saveAtTime = time.Now()
		}
		runner.logMtx.Lock()
		done := runner.LogsPDH != nil
		runner.logMtx.Unlock()
		if done {
			return
		}
		// Skip if nothing changed, or neither the time nor the
		// size threshold has been reached yet.
		size := runner.LogCollection.Size()
		if size == savedSize || (time.Now().Before(saveAtTime) && size < saveAtSize) {
			continue
		}
		saveAtTime = time.Now().Add(crunchLogUpdatePeriod)
		saveAtSize = runner.LogCollection.Size() + crunchLogUpdateSize
		saved, err := runner.saveLogCollection(false)
		if err != nil {
			runner.CrunchLog.Printf("error updating log collection: %s", err)
			continue
		}

		err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
			"container": arvadosclient.Dict{"log": saved.PortableDataHash},
		}, nil)
		if err != nil {
			runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
			continue
		}

		savedSize = size
	}
}
1195
1196 func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) {
1197         var updated arvados.Container
1198         err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
1199                 "container": arvadosclient.Dict{
1200                         "runtime_status": arvadosclient.Dict{
1201                                 "warning":       "arv-mount: " + pattern,
1202                                 "warningDetail": text,
1203                         },
1204                 },
1205         }, &updated)
1206         if err != nil {
1207                 runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
1208         }
1209 }
1210
// CaptureOutput saves data from the container's output directory if
// needed, and updates the container output accordingly.
func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error {
	if runner.Container.RuntimeConstraints.API {
		// Output may have been set directly by the container, so
		// refresh the container record to check.
		err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
			nil, &runner.Container)
		if err != nil {
			return err
		}
		if runner.Container.Output != "" {
			// Container output is already set.
			runner.OutputPDH = &runner.Container.Output
			return nil
		}
	}

	// Build a manifest from the host output dir plus mounted
	// collections (see copier for the details of what is copied
	// vs. referenced).
	txt, err := (&copier{
		client:        runner.containerClient,
		arvClient:     runner.ContainerArvClient,
		keepClient:    runner.ContainerKeepClient,
		hostOutputDir: runner.HostOutputDir,
		ctrOutputDir:  runner.Container.OutputPath,
		bindmounts:    bindmounts,
		mounts:        runner.Container.Mounts,
		secretMounts:  runner.SecretMounts,
		logger:        runner.CrunchLog,
	}).Copy()
	if err != nil {
		return err
	}
	// Block locators with a "+R" hint refer to remote clusters. If
	// any are present, round-trip the manifest through a collection
	// filesystem so the blocks get copied to local storage.
	if n := len(regexp.MustCompile(` [0-9a-f]+\+\S*\+R`).FindAllStringIndex(txt, -1)); n > 0 {
		runner.CrunchLog.Printf("Copying %d data blocks from remote input collections...", n)
		fs, err := (&arvados.Collection{ManifestText: txt}).FileSystem(runner.containerClient, runner.ContainerKeepClient)
		if err != nil {
			return err
		}
		txt, err = fs.MarshalManifest(".")
		if err != nil {
			return err
		}
	}
	// NOTE(review): the output collection is created with
	// is_trashed=true — apparently it is referenced from the
	// container record rather than browsed directly; confirm the
	// lifecycle before changing this.
	var resp arvados.Collection
	err = runner.ContainerArvClient.Create("collections", arvadosclient.Dict{
		"ensure_unique_name": true,
		"collection": arvadosclient.Dict{
			"is_trashed":    true,
			"name":          "output for " + runner.Container.UUID,
			"manifest_text": txt,
		},
	}, &resp)
	if err != nil {
		return fmt.Errorf("error creating output collection: %v", err)
	}
	runner.OutputPDH = &resp.PortableDataHash
	return nil
}
1269
// CleanupDirs unmounts arv-mount (killing it if the unmount times
// out), removes the mount point directory, and removes the parent
// temporary directory. All failures are logged rather than returned.
func (runner *ContainerRunner) CleanupDirs() {
	if runner.ArvMount != nil {
		// Ask arv-mount to unmount itself, with its own timeout.
		var delay int64 = 8
		umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint)
		umount.Stdout = runner.CrunchLog
		umount.Stderr = runner.CrunchLog
		runner.CrunchLog.Printf("Running %v", umount.Args)
		umnterr := umount.Start()

		if umnterr != nil {
			runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
			runner.ArvMount.Process.Kill()
		} else {
			// If arv-mount --unmount gets stuck for any reason, we
			// don't want to wait for it forever.  Do Wait() in a goroutine
			// so it doesn't block crunch-run.
			umountExit := make(chan error)
			go func() {
				mnterr := umount.Wait()
				if mnterr != nil {
					runner.CrunchLog.Printf("Error unmounting: %v", mnterr)
				}
				umountExit <- mnterr
			}()

			// Loop until the arv-mount process itself exits
			// (ArvMountExit) or the timeout fires. When only the
			// unmount helper has exited, loop again to keep
			// waiting for arv-mount.
			for again := true; again; {
				again = false
				select {
				case <-umountExit:
					umount = nil
					again = true
				case <-runner.ArvMountExit:
					// NOTE(review): this break only exits the
					// select (a no-op); the loop ends anyway
					// because again is false.
					break
				case <-time.After(time.Duration((delay + 1) * int64(time.Second))):
					runner.CrunchLog.Printf("Timed out waiting for unmount")
					if umount != nil {
						umount.Process.Kill()
					}
					runner.ArvMount.Process.Kill()
				}
			}
		}
		runner.ArvMount = nil
	}

	if runner.ArvMountPoint != "" {
		if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
			runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
		}
		runner.ArvMountPoint = ""
	}

	if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
		runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", runner.parentTemp, rmerr)
	}
}
1326
// CommitLogs posts the collection containing the final container logs.
//
// It closes the current CrunchLog (committing buffered logs to the
// log collection), replaces it with one that writes straight to the
// logs API (so errors during shutdown are still reported), then saves
// the log collection and records its PDH in LogsPDH.
func (runner *ContainerRunner) CommitLogs() error {
	func() {
		// Hold cStateLock to prevent races on CrunchLog (e.g., stop()).
		runner.cStateLock.Lock()
		defer runner.cStateLock.Unlock()

		runner.CrunchLog.Print(runner.finalState)

		if runner.arvMountLog != nil {
			runner.arvMountLog.Close()
		}
		runner.CrunchLog.Close()

		// Closing CrunchLog above allows them to be committed to Keep at this
		// point, but re-open crunch log with ArvClient in case there are any
		// other further errors (such as failing to write the log to Keep!)
		// while shutting down
		runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{
			ArvClient:     runner.DispatcherArvClient,
			UUID:          runner.Container.UUID,
			loggingStream: "crunch-run",
			writeCloser:   nil,
		})
		runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
	}()

	if runner.keepstoreLogger != nil {
		// Flush any buffered logs from our local keepstore
		// process.  Discard anything logged after this point
		// -- it won't end up in the log collection, so
		// there's no point writing it to the collectionfs.
		runner.keepstoreLogbuf.SetWriter(io.Discard)
		runner.keepstoreLogger.Close()
		runner.keepstoreLogger = nil
	}

	if runner.LogsPDH != nil {
		// If we have already assigned something to LogsPDH,
		// we must be closing the re-opened log, which won't
		// end up getting attached to the container record and
		// therefore doesn't need to be saved as a collection
		// -- it exists only to send logs to other channels.
		return nil
	}

	// final=true marks the saved log collection as trashed (see
	// saveLogCollection) since the container record will reference
	// it by PDH.
	saved, err := runner.saveLogCollection(true)
	if err != nil {
		return fmt.Errorf("error saving log collection: %s", err)
	}
	runner.logMtx.Lock()
	defer runner.logMtx.Unlock()
	runner.LogsPDH = &saved.PortableDataHash
	return nil
}
1382
1383 func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) {
1384         runner.logMtx.Lock()
1385         defer runner.logMtx.Unlock()
1386         if runner.LogsPDH != nil {
1387                 // Already finalized.
1388                 return
1389         }
1390         updates := arvadosclient.Dict{
1391                 "name": "logs for " + runner.Container.UUID,
1392         }
1393         mt, err1 := runner.LogCollection.MarshalManifest(".")
1394         if err1 == nil {
1395                 // Only send updated manifest text if there was no
1396                 // error.
1397                 updates["manifest_text"] = mt
1398         }
1399
1400         // Even if flushing the manifest had an error, we still want
1401         // to update the log record, if possible, to push the trash_at
1402         // and delete_at times into the future.  Details on bug
1403         // #17293.
1404         if final {
1405                 updates["is_trashed"] = true
1406         } else {
1407                 exp := time.Now().Add(crunchLogUpdatePeriod * 24)
1408                 updates["trash_at"] = exp
1409                 updates["delete_at"] = exp
1410         }
1411         reqBody := arvadosclient.Dict{"collection": updates}
1412         var err2 error
1413         if runner.logUUID == "" {
1414                 reqBody["ensure_unique_name"] = true
1415                 err2 = runner.DispatcherArvClient.Create("collections", reqBody, &response)
1416         } else {
1417                 err2 = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response)
1418         }
1419         if err2 == nil {
1420                 runner.logUUID = response.UUID
1421         }
1422
1423         if err1 != nil || err2 != nil {
1424                 err = fmt.Errorf("error recording logs: %q, %q", err1, err2)
1425         }
1426         return
1427 }
1428
1429 // UpdateContainerRunning updates the container state to "Running"
1430 func (runner *ContainerRunner) UpdateContainerRunning() error {
1431         runner.cStateLock.Lock()
1432         defer runner.cStateLock.Unlock()
1433         if runner.cCancelled {
1434                 return ErrCancelled
1435         }
1436         return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
1437                 arvadosclient.Dict{"container": arvadosclient.Dict{
1438                         "state":           "Running",
1439                         "gateway_address": runner.gateway.Address,
1440                         "log":             runner.logUUID,
1441                 }}, nil)
1442 }
1443
1444 // ContainerToken returns the api_token the container (and any
1445 // arv-mount processes) are allowed to use.
1446 func (runner *ContainerRunner) ContainerToken() (string, error) {
1447         if runner.token != "" {
1448                 return runner.token, nil
1449         }
1450
1451         var auth arvados.APIClientAuthorization
1452         err := runner.DispatcherArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
1453         if err != nil {
1454                 return "", err
1455         }
1456         runner.token = fmt.Sprintf("v2/%s/%s/%s", auth.UUID, auth.APIToken, runner.Container.UUID)
1457         return runner.token, nil
1458 }
1459
1460 // UpdateContainerFinal updates the container record state on API
1461 // server to "Complete" or "Cancelled"
1462 func (runner *ContainerRunner) UpdateContainerFinal() error {
1463         update := arvadosclient.Dict{}
1464         update["state"] = runner.finalState
1465         if runner.LogsPDH != nil {
1466                 update["log"] = *runner.LogsPDH
1467         }
1468         if runner.ExitCode != nil {
1469                 update["exit_code"] = *runner.ExitCode
1470         } else {
1471                 update["exit_code"] = nil
1472         }
1473         if runner.finalState == "Complete" && runner.OutputPDH != nil {
1474                 update["output"] = *runner.OutputPDH
1475         }
1476         var it arvados.InstanceType
1477         if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
1478                 update["cost"] = it.Price * time.Now().Sub(runner.costStartTime).Seconds() / time.Hour.Seconds()
1479         }
1480         return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
1481 }
1482
1483 // IsCancelled returns the value of Cancelled, with goroutine safety.
1484 func (runner *ContainerRunner) IsCancelled() bool {
1485         runner.cStateLock.Lock()
1486         defer runner.cStateLock.Unlock()
1487         return runner.cCancelled
1488 }
1489
1490 // NewArvLogWriter creates an ArvLogWriter
1491 func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error) {
1492         writer, err := runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
1493         if err != nil {
1494                 return nil, err
1495         }
1496         return &ArvLogWriter{
1497                 ArvClient:     runner.DispatcherArvClient,
1498                 UUID:          runner.Container.UUID,
1499                 loggingStream: name,
1500                 writeCloser:   writer,
1501         }, nil
1502 }
1503
// Run the full container lifecycle: fetch the container record, set
// up mounts and the container image, create and start the container,
// wait for it to finish, then (via the deferred cleanup below)
// capture output, commit logs, and write the final container state.
func (runner *ContainerRunner) Run() (err error) {
	runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
	runner.CrunchLog.Printf("%s", currentUserAndGroups())
	v, _ := exec.Command("arv-mount", "--version").CombinedOutput()
	runner.CrunchLog.Printf("Using FUSE mount: %s", v)
	runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
	runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
	// Used by UpdateContainerFinal to compute the container's cost.
	runner.costStartTime = time.Now()

	hostname, hosterr := os.Hostname()
	if hosterr != nil {
		runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
	} else {
		runner.CrunchLog.Printf("Executing on host '%s'", hostname)
	}

	runner.finalState = "Queued"

	// Outermost deferred cleanup: runs after the finalization
	// closure below, removing temp dirs and closing the crunch log
	// last of all.
	defer func() {
		runner.CleanupDirs()

		runner.CrunchLog.Printf("crunch-run finished")
		runner.CrunchLog.Close()
	}()

	err = runner.fetchContainerRecord()
	if err != nil {
		return
	}
	if runner.Container.State != "Locked" {
		return fmt.Errorf("dispatch error detected: container %q has state %q", runner.Container.UUID, runner.Container.State)
	}

	var bindmounts map[string]bindmount
	defer func() {
		// checkErr prints e (unless it's nil) and sets err to
		// e (unless err is already non-nil). Thus, if err
		// hasn't already been assigned when Run() returns,
		// this cleanup func will cause Run() to return the
		// first non-nil error that is passed to checkErr().
		checkErr := func(errorIn string, e error) {
			if e == nil {
				return
			}
			runner.CrunchLog.Printf("error in %s: %v", errorIn, e)
			if err == nil {
				err = e
			}
			if runner.finalState == "Complete" {
				// There was an error in the finalization.
				runner.finalState = "Cancelled"
			}
		}

		// Log the error encountered in Run(), if any
		checkErr("Run", err)

		if runner.finalState == "Queued" {
			// We never got far enough to start the
			// container, so there is no output or log
			// collection to capture; just record the state.
			runner.UpdateContainerFinal()
			return
		}

		if runner.IsCancelled() {
			runner.finalState = "Cancelled"
			// but don't return yet -- we still want to
			// capture partial output and write logs
		}

		// bindmounts is non-nil only if SetupMounts
		// succeeded, i.e. there may be output to capture.
		if bindmounts != nil {
			checkErr("CaptureOutput", runner.CaptureOutput(bindmounts))
		}
		checkErr("stopHoststat", runner.stopHoststat())
		checkErr("CommitLogs", runner.CommitLogs())
		runner.CleanupDirs()
		checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
	}()

	runner.setupSignals()
	err = runner.startHoststat()
	if err != nil {
		return
	}
	if runner.keepstore != nil {
		runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
	}

	// set up FUSE mount and binds
	bindmounts, err = runner.SetupMounts()
	if err != nil {
		runner.finalState = "Cancelled"
		err = fmt.Errorf("While setting up mounts: %v", err)
		return
	}

	// check for and/or load image
	imageID, err := runner.LoadImage()
	if err != nil {
		if !runner.checkBrokenNode(err) {
			// Failed to load image but not due to a "broken node"
			// condition, probably user error.
			runner.finalState = "Cancelled"
		}
		err = fmt.Errorf("While loading container image: %v", err)
		return
	}

	err = runner.CreateContainer(imageID, bindmounts)
	if err != nil {
		return
	}
	err = runner.LogHostInfo()
	if err != nil {
		return
	}
	err = runner.LogNodeRecord()
	if err != nil {
		return
	}
	err = runner.LogContainerRecord()
	if err != nil {
		return
	}

	if runner.IsCancelled() {
		return
	}

	// Commit an initial (partial) log collection so its UUID can
	// be recorded on the running container.
	_, err = runner.saveLogCollection(false)
	if err != nil {
		runner.CrunchLog.Printf("Error committing initial log collection: %v", err)
	}
	err = runner.UpdateContainerRunning()
	if err != nil {
		return
	}
	// From here on, ending without reaching "Complete" below means
	// the container was cancelled.
	runner.finalState = "Cancelled"

	err = runner.startCrunchstat()
	if err != nil {
		return
	}

	err = runner.StartContainer()
	if err != nil {
		runner.checkBrokenNode(err)
		return
	}

	err = runner.WaitFinish()
	if err == nil && !runner.IsCancelled() {
		runner.finalState = "Complete"
	}
	return
}
1659
1660 // Fetch the current container record (uuid = runner.Container.UUID)
1661 // into runner.Container.
1662 func (runner *ContainerRunner) fetchContainerRecord() error {
1663         reader, err := runner.DispatcherArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
1664         if err != nil {
1665                 return fmt.Errorf("error fetching container record: %v", err)
1666         }
1667         defer reader.Close()
1668
1669         dec := json.NewDecoder(reader)
1670         dec.UseNumber()
1671         err = dec.Decode(&runner.Container)
1672         if err != nil {
1673                 return fmt.Errorf("error decoding container record: %v", err)
1674         }
1675
1676         var sm struct {
1677                 SecretMounts map[string]arvados.Mount `json:"secret_mounts"`
1678         }
1679
1680         containerToken, err := runner.ContainerToken()
1681         if err != nil {
1682                 return fmt.Errorf("error getting container token: %v", err)
1683         }
1684
1685         runner.ContainerArvClient, runner.ContainerKeepClient,
1686                 runner.containerClient, err = runner.MkArvClient(containerToken)
1687         if err != nil {
1688                 return fmt.Errorf("error creating container API client: %v", err)
1689         }
1690
1691         runner.ContainerKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
1692         runner.DispatcherKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
1693
1694         err = runner.ContainerArvClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
1695         if err != nil {
1696                 if apierr, ok := err.(arvadosclient.APIServerError); !ok || apierr.HttpStatusCode != 404 {
1697                         return fmt.Errorf("error fetching secret_mounts: %v", err)
1698                 }
1699                 // ok && apierr.HttpStatusCode == 404, which means
1700                 // secret_mounts isn't supported by this API server.
1701         }
1702         runner.SecretMounts = sm.SecretMounts
1703
1704         return nil
1705 }
1706
1707 // NewContainerRunner creates a new container runner.
1708 func NewContainerRunner(dispatcherClient *arvados.Client,
1709         dispatcherArvClient IArvadosClient,
1710         dispatcherKeepClient IKeepClient,
1711         containerUUID string) (*ContainerRunner, error) {
1712
1713         cr := &ContainerRunner{
1714                 dispatcherClient:     dispatcherClient,
1715                 DispatcherArvClient:  dispatcherArvClient,
1716                 DispatcherKeepClient: dispatcherKeepClient,
1717         }
1718         cr.NewLogWriter = cr.NewArvLogWriter
1719         cr.RunArvMount = cr.ArvMountCmd
1720         cr.MkTempDir = ioutil.TempDir
1721         cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
1722                 cl, err := arvadosclient.MakeArvadosClient()
1723                 if err != nil {
1724                         return nil, nil, nil, err
1725                 }
1726                 cl.ApiToken = token
1727                 kc, err := keepclient.MakeKeepClient(cl)
1728                 if err != nil {
1729                         return nil, nil, nil, err
1730                 }
1731                 c2 := arvados.NewClientFromEnv()
1732                 c2.AuthToken = token
1733                 return cl, kc, c2, nil
1734         }
1735         var err error
1736         cr.LogCollection, err = (&arvados.Collection{}).FileSystem(cr.dispatcherClient, cr.DispatcherKeepClient)
1737         if err != nil {
1738                 return nil, err
1739         }
1740         cr.Container.UUID = containerUUID
1741         w, err := cr.NewLogWriter("crunch-run")
1742         if err != nil {
1743                 return nil, err
1744         }
1745         cr.CrunchLog = NewThrottledLogger(w)
1746         cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
1747
1748         loadLogThrottleParams(dispatcherArvClient)
1749         go cr.updateLogs()
1750
1751         return cr, nil
1752 }
1753
1754 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
1755         log := log.New(stderr, "", 0)
1756         flags := flag.NewFlagSet(prog, flag.ContinueOnError)
1757         statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
1758         cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
1759         cgroupParent := flags.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
1760         cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
1761         caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
1762         detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
1763         stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
1764         configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)")
1765         sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
1766         kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
1767         list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
1768         enableMemoryLimit := flags.Bool("enable-memory-limit", true, "tell container runtime to limit container's memory usage")
1769         enableNetwork := flags.String("container-enable-networking", "default", "enable networking \"always\" (for all containers) or \"default\" (for containers that request it)")
1770         networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
1771         memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
1772         runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
1773         brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
1774         flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
1775         version := flags.Bool("version", false, "Write version information to stdout and exit 0.")
1776
1777         ignoreDetachFlag := false
1778         if len(args) > 0 && args[0] == "-no-detach" {
1779                 // This process was invoked by a parent process, which
1780                 // has passed along its own arguments, including
1781                 // -detach, after the leading -no-detach flag.  Strip
1782                 // the leading -no-detach flag (it's not recognized by
1783                 // flags.Parse()) and ignore the -detach flag that
1784                 // comes later.
1785                 args = args[1:]
1786                 ignoreDetachFlag = true
1787         }
1788
1789         if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
1790                 return code
1791         } else if *version {
1792                 fmt.Fprintln(stdout, prog, cmd.Version.String())
1793                 return 0
1794         } else if !*list && flags.NArg() != 1 {
1795                 fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
1796                 return 2
1797         }
1798
1799         containerUUID := flags.Arg(0)
1800
1801         switch {
1802         case *detach && !ignoreDetachFlag:
1803                 return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr)
1804         case *kill >= 0:
1805                 return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
1806         case *list:
1807                 return ListProcesses(os.Stdout, os.Stderr)
1808         }
1809
1810         if len(containerUUID) != 27 {
1811                 log.Printf("usage: %s [options] UUID", prog)
1812                 return 1
1813         }
1814
1815         var keepstoreLogbuf bufThenWrite
1816         var conf ConfigData
1817         if *stdinConfig {
1818                 err := json.NewDecoder(stdin).Decode(&conf)
1819                 if err != nil {
1820                         log.Printf("decode stdin: %s", err)
1821                         return 1
1822                 }
1823                 for k, v := range conf.Env {
1824                         err = os.Setenv(k, v)
1825                         if err != nil {
1826                                 log.Printf("setenv(%q): %s", k, err)
1827                                 return 1
1828                         }
1829                 }
1830                 if conf.Cluster != nil {
1831                         // ClusterID is missing from the JSON
1832                         // representation, but we need it to generate
1833                         // a valid config file for keepstore, so we
1834                         // fill it using the container UUID prefix.
1835                         conf.Cluster.ClusterID = containerUUID[:5]
1836                 }
1837         } else {
1838                 conf = hpcConfData(containerUUID, *configFile, io.MultiWriter(&keepstoreLogbuf, stderr))
1839         }
1840
1841         log.Printf("crunch-run %s started", cmd.Version.String())
1842         time.Sleep(*sleep)
1843
1844         if *caCertsPath != "" {
1845                 arvadosclient.CertFiles = []string{*caCertsPath}
1846         }
1847
1848         keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
1849         if err != nil {
1850                 log.Print(err)
1851                 return 1
1852         }
1853         if keepstore != nil {
1854                 defer keepstore.Process.Kill()
1855         }
1856
1857         api, err := arvadosclient.MakeArvadosClient()
1858         if err != nil {
1859                 log.Printf("%s: %v", containerUUID, err)
1860                 return 1
1861         }
1862         api.Retries = 8
1863
1864         kc, err := keepclient.MakeKeepClient(api)
1865         if err != nil {
1866                 log.Printf("%s: %v", containerUUID, err)
1867                 return 1
1868         }
1869         kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
1870         kc.Retries = 4
1871
1872         cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, containerUUID)
1873         if err != nil {
1874                 log.Print(err)
1875                 return 1
1876         }
1877
1878         cr.keepstore = keepstore
1879         if keepstore == nil {
1880                 // Log explanation (if any) for why we're not running
1881                 // a local keepstore.
1882                 var buf bytes.Buffer
1883                 keepstoreLogbuf.SetWriter(&buf)
1884                 if buf.Len() > 0 {
1885                         cr.CrunchLog.Printf("%s", strings.TrimSpace(buf.String()))
1886                 }
1887         } else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" {
1888                 cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
1889                 keepstoreLogbuf.SetWriter(io.Discard)
1890         } else {
1891                 cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
1892                 logwriter, err := cr.NewLogWriter("keepstore")
1893                 if err != nil {
1894                         log.Print(err)
1895                         return 1
1896                 }
1897                 cr.keepstoreLogger = NewThrottledLogger(logwriter)
1898
1899                 var writer io.WriteCloser = cr.keepstoreLogger
1900                 if logWhat == "errors" {
1901                         writer = &filterKeepstoreErrorsOnly{WriteCloser: writer}
1902                 } else if logWhat != "all" {
1903                         // should have been caught earlier by
1904                         // dispatcher's config loader
1905                         log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat)
1906                         return 1
1907                 }
1908                 err = keepstoreLogbuf.SetWriter(writer)
1909                 if err != nil {
1910                         log.Print(err)
1911                         return 1
1912                 }
1913                 cr.keepstoreLogbuf = &keepstoreLogbuf
1914         }
1915
1916         switch *runtimeEngine {
1917         case "docker":
1918                 cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval)
1919         case "singularity":
1920                 cr.executor, err = newSingularityExecutor(cr.CrunchLog.Printf)
1921         default:
1922                 cr.CrunchLog.Printf("%s: unsupported RuntimeEngine %q", containerUUID, *runtimeEngine)
1923                 cr.CrunchLog.Close()
1924                 return 1
1925         }
1926         if err != nil {
1927                 cr.CrunchLog.Printf("%s: %v", containerUUID, err)
1928                 cr.checkBrokenNode(err)
1929                 cr.CrunchLog.Close()
1930                 return 1
1931         }
1932         defer cr.executor.Close()
1933
1934         cr.brokenNodeHook = *brokenNodeHook
1935
1936         gwAuthSecret := os.Getenv("GatewayAuthSecret")
1937         os.Unsetenv("GatewayAuthSecret")
1938         if gwAuthSecret == "" {
1939                 // not safe to run a gateway service without an auth
1940                 // secret
1941                 cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
1942         } else {
1943                 gwListen := os.Getenv("GatewayAddress")
1944                 cr.gateway = Gateway{
1945                         Address:       gwListen,
1946                         AuthSecret:    gwAuthSecret,
1947                         ContainerUUID: containerUUID,
1948                         Target:        cr.executor,
1949                         Log:           cr.CrunchLog,
1950                 }
1951                 if gwListen == "" {
1952                         // Direct connection won't work, so we use the
1953                         // gateway_address field to indicate the
1954                         // internalURL of the controller process that
1955                         // has the current tunnel connection.
1956                         cr.gateway.ArvadosClient = cr.dispatcherClient
1957                         cr.gateway.UpdateTunnelURL = func(url string) {
1958                                 cr.gateway.Address = "tunnel " + url
1959                                 cr.DispatcherArvClient.Update("containers", containerUUID,
1960                                         arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil)
1961                         }
1962                 }
1963                 err = cr.gateway.Start()
1964                 if err != nil {
1965                         log.Printf("error starting gateway server: %s", err)
1966                         return 1
1967                 }
1968         }
1969
1970         parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerUUID+".")
1971         if tmperr != nil {
1972                 log.Printf("%s: %v", containerUUID, tmperr)
1973                 return 1
1974         }
1975
1976         cr.parentTemp = parentTemp
1977         cr.statInterval = *statInterval
1978         cr.cgroupRoot = *cgroupRoot
1979         cr.expectCgroupParent = *cgroupParent
1980         cr.enableMemoryLimit = *enableMemoryLimit
1981         cr.enableNetwork = *enableNetwork
1982         cr.networkMode = *networkMode
1983         if *cgroupParentSubsystem != "" {
1984                 p, err := findCgroup(*cgroupParentSubsystem)
1985                 if err != nil {
1986                         log.Printf("fatal: cgroup parent subsystem: %s", err)
1987                         return 1
1988                 }
1989                 cr.setCgroupParent = p
1990                 cr.expectCgroupParent = p
1991         }
1992
1993         runerr := cr.Run()
1994
1995         if *memprofile != "" {
1996                 f, err := os.Create(*memprofile)
1997                 if err != nil {
1998                         log.Printf("could not create memory profile: %s", err)
1999                 }
2000                 runtime.GC() // get up-to-date statistics
2001                 if err := pprof.WriteHeapProfile(f); err != nil {
2002                         log.Printf("could not write memory profile: %s", err)
2003                 }
2004                 closeerr := f.Close()
2005                 if closeerr != nil {
2006                         log.Printf("closing memprofile file: %s", err)
2007                 }
2008         }
2009
2010         if runerr != nil {
2011                 log.Printf("%s: %v", containerUUID, runerr)
2012                 return 1
2013         }
2014         return 0
2015 }
2016
2017 // Try to load ConfigData in hpc (slurm/lsf) environment. This means
2018 // loading the cluster config from the specified file and (if that
2019 // works) getting the runtime_constraints container field from
2020 // controller to determine # VCPUs so we can calculate KeepBuffers.
2021 func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData {
2022         var conf ConfigData
2023         conf.Cluster = loadClusterConfigFile(configFile, stderr)
2024         if conf.Cluster == nil {
2025                 // skip loading the container record -- we won't be
2026                 // able to start local keepstore anyway.
2027                 return conf
2028         }
2029         arv, err := arvadosclient.MakeArvadosClient()
2030         if err != nil {
2031                 fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
2032                 return conf
2033         }
2034         arv.Retries = 8
2035         var ctr arvados.Container
2036         err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr)
2037         if err != nil {
2038                 fmt.Fprintf(stderr, "error getting container record: %s\n", err)
2039                 return conf
2040         }
2041         if ctr.RuntimeConstraints.VCPUs > 0 {
2042                 conf.KeepBuffers = ctr.RuntimeConstraints.VCPUs * conf.Cluster.Containers.LocalKeepBlobBuffersPerVCPU
2043         }
2044         return conf
2045 }
2046
2047 // Load cluster config file from given path. If an error occurs, log
2048 // the error to stderr and return nil.
2049 func loadClusterConfigFile(path string, stderr io.Writer) *arvados.Cluster {
2050         ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(stderr, "plain", "info"))
2051         ldr.Path = path
2052         cfg, err := ldr.Load()
2053         if err != nil {
2054                 fmt.Fprintf(stderr, "could not load config file %s: %s\n", path, err)
2055                 return nil
2056         }
2057         cluster, err := cfg.GetCluster("")
2058         if err != nil {
2059                 fmt.Fprintf(stderr, "could not use config file %s: %s\n", path, err)
2060                 return nil
2061         }
2062         fmt.Fprintf(stderr, "loaded config file %s\n", path)
2063         return cluster
2064 }
2065
2066 func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
2067         if configData.KeepBuffers < 1 {
2068                 fmt.Fprintf(logbuf, "not starting a local keepstore process because KeepBuffers=%v in config\n", configData.KeepBuffers)
2069                 return nil, nil
2070         }
2071         if configData.Cluster == nil {
2072                 fmt.Fprint(logbuf, "not starting a local keepstore process because cluster config file was not loaded\n")
2073                 return nil, nil
2074         }
2075         for uuid, vol := range configData.Cluster.Volumes {
2076                 if len(vol.AccessViaHosts) > 0 {
2077                         fmt.Fprintf(logbuf, "not starting a local keepstore process because a volume (%s) uses AccessViaHosts\n", uuid)
2078                         return nil, nil
2079                 }
2080                 if !vol.ReadOnly && vol.Replication < configData.Cluster.Collections.DefaultReplication {
2081                         fmt.Fprintf(logbuf, "not starting a local keepstore process because a writable volume (%s) has replication less than Collections.DefaultReplication (%d < %d)\n", uuid, vol.Replication, configData.Cluster.Collections.DefaultReplication)
2082                         return nil, nil
2083                 }
2084         }
2085
2086         // Rather than have an alternate way to tell keepstore how
2087         // many buffers to use when starting it this way, we just
2088         // modify the cluster configuration that we feed it on stdin.
2089         configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
2090
2091         localaddr := localKeepstoreAddr()
2092         ln, err := net.Listen("tcp", net.JoinHostPort(localaddr, "0"))
2093         if err != nil {
2094                 return nil, err
2095         }
2096         _, port, err := net.SplitHostPort(ln.Addr().String())
2097         if err != nil {
2098                 ln.Close()
2099                 return nil, err
2100         }
2101         ln.Close()
2102         url := "http://" + net.JoinHostPort(localaddr, port)
2103
2104         fmt.Fprintf(logbuf, "starting keepstore on %s\n", url)
2105
2106         var confJSON bytes.Buffer
2107         err = json.NewEncoder(&confJSON).Encode(arvados.Config{
2108                 Clusters: map[string]arvados.Cluster{
2109                         configData.Cluster.ClusterID: *configData.Cluster,
2110                 },
2111         })
2112         if err != nil {
2113                 return nil, err
2114         }
2115         cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
2116         if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") {
2117                 // If we're a 'go test' process, running
2118                 // /proc/self/exe would start the test suite in a
2119                 // child process, which is not what we want.
2120                 cmd.Path, _ = exec.LookPath("go")
2121                 cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...)
2122                 cmd.Env = os.Environ()
2123         }
2124         cmd.Stdin = &confJSON
2125         cmd.Stdout = logbuf
2126         cmd.Stderr = logbuf
2127         cmd.Env = append(cmd.Env,
2128                 "GOGC=10",
2129                 "ARVADOS_SERVICE_INTERNAL_URL="+url)
2130         err = cmd.Start()
2131         if err != nil {
2132                 return nil, fmt.Errorf("error starting keepstore process: %w", err)
2133         }
2134         cmdExited := false
2135         go func() {
2136                 cmd.Wait()
2137                 cmdExited = true
2138         }()
2139         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
2140         defer cancel()
2141         poll := time.NewTicker(time.Second / 10)
2142         defer poll.Stop()
2143         client := http.Client{}
2144         for range poll.C {
2145                 testReq, err := http.NewRequestWithContext(ctx, "GET", url+"/_health/ping", nil)
2146                 testReq.Header.Set("Authorization", "Bearer "+configData.Cluster.ManagementToken)
2147                 if err != nil {
2148                         return nil, err
2149                 }
2150                 resp, err := client.Do(testReq)
2151                 if err == nil {
2152                         resp.Body.Close()
2153                         if resp.StatusCode == http.StatusOK {
2154                                 break
2155                         }
2156                 }
2157                 if cmdExited {
2158                         return nil, fmt.Errorf("keepstore child process exited")
2159                 }
2160                 if ctx.Err() != nil {
2161                         return nil, fmt.Errorf("timed out waiting for new keepstore process to report healthy")
2162                 }
2163         }
2164         os.Setenv("ARVADOS_KEEP_SERVICES", url)
2165         return cmd, nil
2166 }
2167
// currentUserAndGroups returns the current uid, gid, and groups in a
// format suitable for logging:
// "crunch-run process has uid=1234(arvados) gid=1234(arvados)
// groups=1234(arvados),114(fuse)"
func currentUserAndGroups() string {
	u, err := user.Current()
	if err != nil {
		return fmt.Sprintf("error getting current user ID: %s", err)
	}
	// groupName returns "(name)" for a gid, or "" when the
	// lookup fails.
	groupName := func(gid string) string {
		g, err := user.LookupGroupId(gid)
		if err != nil {
			return ""
		}
		return fmt.Sprintf("(%s)", g.Name)
	}
	var b strings.Builder
	fmt.Fprintf(&b, "crunch-run process has uid=%s(%s) gid=%s%s", u.Uid, u.Username, u.Gid, groupName(u.Gid))
	b.WriteString(" groups=")
	if gids, err := u.GroupIds(); err == nil {
		for i, gid := range gids {
			if i > 0 {
				b.WriteString(",")
			}
			b.WriteString(gid)
			b.WriteString(groupName(gid))
		}
	}
	return b.String()
}
2194
2195 // Return a suitable local interface address for a local keepstore
2196 // service. Currently this is the numerically lowest non-loopback ipv4
2197 // address assigned to a local interface that is not in any of the
2198 // link-local/vpn/loopback ranges 169.254/16, 100.64/10, or 127/8.
2199 func localKeepstoreAddr() string {
2200         var ips []net.IP
2201         // Ignore error (proceed with zero IPs)
2202         addrs, _ := processIPs(os.Getpid())
2203         for addr := range addrs {
2204                 ip := net.ParseIP(addr)
2205                 if ip == nil {
2206                         // invalid
2207                         continue
2208                 }
2209                 if ip.Mask(net.CIDRMask(8, 32)).Equal(net.IPv4(127, 0, 0, 0)) ||
2210                         ip.Mask(net.CIDRMask(10, 32)).Equal(net.IPv4(100, 64, 0, 0)) ||
2211                         ip.Mask(net.CIDRMask(16, 32)).Equal(net.IPv4(169, 254, 0, 0)) {
2212                         // unsuitable
2213                         continue
2214                 }
2215                 ips = append(ips, ip)
2216         }
2217         if len(ips) == 0 {
2218                 return "0.0.0.0"
2219         }
2220         sort.Slice(ips, func(ii, jj int) bool {
2221                 i, j := ips[ii], ips[jj]
2222                 if len(i) != len(j) {
2223                         return len(i) < len(j)
2224                 }
2225                 for x := range i {
2226                         if i[x] != j[x] {
2227                                 return i[x] < j[x]
2228                         }
2229                 }
2230                 return false
2231         })
2232         return ips[0].String()
2233 }