Merge branch '12690-12748-crunchstat-summary'
[arvados.git] / services / crunch-run / crunchrun.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "encoding/json"
10         "errors"
11         "flag"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "log"
16         "os"
17         "os/exec"
18         "os/signal"
19         "path"
20         "path/filepath"
21         "regexp"
22         "runtime"
23         "runtime/pprof"
24         "sort"
25         "strings"
26         "sync"
27         "syscall"
28         "time"
29
30         "git.curoverse.com/arvados.git/lib/crunchstat"
31         "git.curoverse.com/arvados.git/sdk/go/arvados"
32         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
33         "git.curoverse.com/arvados.git/sdk/go/keepclient"
34         "git.curoverse.com/arvados.git/sdk/go/manifest"
35         "github.com/shirou/gopsutil/process"
36         "golang.org/x/net/context"
37
38         dockertypes "github.com/docker/docker/api/types"
39         dockercontainer "github.com/docker/docker/api/types/container"
40         dockernetwork "github.com/docker/docker/api/types/network"
41         dockerclient "github.com/docker/docker/client"
42 )
43
// version identifies this build of crunch-run. It defaults to "dev"
// (presumably overridden at build time, e.g. via linker flags --
// confirm against the build scripts).
var version = "dev"
45
// IArvadosClient declares the minimal set of Arvados API client
// methods used by crunch-run. Keeping the interface this small lets
// ContainerRunner work with any implementation that provides these
// calls.
type IArvadosClient interface {
	Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
	Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
	Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
	Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error
	CallRaw(method string, resourceType string, uuid string, action string, parameters arvadosclient.Dict) (reader io.ReadCloser, err error)
	Discovery(key string) (interface{}, error)
}
55
// ErrCancelled is the sentinel error returned when the container is
// cancelled. Compare against it by identity rather than by message.
var ErrCancelled = errors.New("Cancelled")
58
// IKeepClient declares the minimal set of Keep client methods used by
// crunch-run. Within this file, ManifestFileReader is used to stream
// the Docker image tarball out of a collection and ClearBlockCache is
// called once the image has been handled.
type IKeepClient interface {
	PutB(buf []byte) (string, int, error)
	ReadAt(locator string, p []byte, off int) (int, error)
	ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
	ClearBlockCache()
}
66
// NewLogWriter is a factory function to create a new log writer.
// The name argument selects the log stream (for example "arv-mount"
// or "hoststat").
type NewLogWriter func(name string) (io.WriteCloser, error)

// RunArvMount is a function that starts arv-mount with the given
// command line arguments and API token and returns the running
// command. ContainerRunner.ArvMountCmd has this signature.
type RunArvMount func(args []string, tok string) (*exec.Cmd, error)

// MkTempDir is a function that creates a temporary directory and
// returns its path. It is called with a parent directory and a name
// prefix (presumably with the same semantics as ioutil.TempDir --
// confirm against the caller that installs it).
type MkTempDir func(string, string) (string, error)
73
// ThinDockerClient is the minimal Docker client interface used by crunch-run.
// The method set mirrors the corresponding calls of the Docker client
// imported as dockerclient, so that an alternative implementation can
// be supplied through ContainerRunner.Docker.
type ThinDockerClient interface {
	ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error)
	ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
		networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
	ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
	ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
	ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
	ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
	ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
	ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
}
86
// PsProcess is the view of a running process that crunch-run needs;
// values of this type are produced by ContainerRunner.ListProcesses.
// CmdlineSlice returns the process command line as a slice of
// strings (presumably one element per argument, as gopsutil's
// process.Process does -- confirm).
type PsProcess interface {
	CmdlineSlice() ([]string, error)
}
90
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
//
// Notes on fields, as used by the methods in this file:
//
//   - Docker, ArvClient, Kc, NewLogWriter, RunArvMount, MkTempDir and
//     ListProcesses are dependencies expressed as interfaces or function
//     values, so alternative implementations can be plugged in.
//   - SigChan is created by setupSignals.
//   - ArvMountPoint is filled in by SetupArvMountPoint; ArvMount, Binds,
//     Volumes and HostOutputDir are filled in by SetupMounts;
//     ArvMountExit and arvMountLog are set by ArvMountCmd.
//   - cStateLock is held by stop() while it reads ContainerID and sets
//     cCancelled.
//   - statInterval is passed to arv-mount as its crunchstat interval and
//     used as the poll period of the hoststat reporter.
type ContainerRunner struct {
	Docker          ThinDockerClient
	client          *arvados.Client
	ArvClient       IArvadosClient
	Kc              IKeepClient
	Container       arvados.Container
	ContainerConfig dockercontainer.Config
	HostConfig      dockercontainer.HostConfig
	token           string
	ContainerID     string
	ExitCode        *int
	NewLogWriter    NewLogWriter
	loggingDone     chan bool
	CrunchLog       *ThrottledLogger
	Stdout          io.WriteCloser
	Stderr          io.WriteCloser
	logUUID         string
	logMtx          sync.Mutex
	LogCollection   arvados.CollectionFileSystem
	LogsPDH         *string
	RunArvMount     RunArvMount
	MkTempDir       MkTempDir
	ArvMount        *exec.Cmd
	ArvMountPoint   string
	HostOutputDir   string
	Binds           []string
	Volumes         map[string]struct{}
	OutputPDH       *string
	SigChan         chan os.Signal
	ArvMountExit    chan error
	SecretMounts    map[string]arvados.Mount
	MkArvClient     func(token string) (IArvadosClient, error)
	finalState      string
	parentTemp      string

	ListProcesses func() ([]PsProcess, error)

	statLogger       io.WriteCloser
	statReporter     *crunchstat.Reporter
	hoststatLogger   io.WriteCloser
	hoststatReporter *crunchstat.Reporter
	statInterval     time.Duration
	cgroupRoot       string
	// What we expect the container's cgroup parent to be.
	expectCgroupParent string
	// What we tell docker to use as the container's cgroup
	// parent. Note: Ideally we would use the same field for both
	// expectCgroupParent and setCgroupParent, and just make it
	// default to "docker". However, when using docker < 1.10 with
	// systemd, specifying a non-empty cgroup parent (even the
	// default value "docker") hits a docker bug
	// (https://github.com/docker/docker/issues/17126). Using two
	// separate fields makes it possible to use the "expect cgroup
	// parent to be X" feature even on sites where the "specify
	// cgroup parent" feature breaks.
	setCgroupParent string

	cStateLock sync.Mutex
	cCancelled bool // StopContainer() invoked

	enableNetwork   string // one of "default" or "always"
	networkMode     string // passed through to HostConfig.NetworkMode
	arvMountLog     *ThrottledLogger
	checkContainerd time.Duration
}
158
159 // setupSignals sets up signal handling to gracefully terminate the underlying
160 // Docker container and update state when receiving a TERM, INT or QUIT signal.
161 func (runner *ContainerRunner) setupSignals() {
162         runner.SigChan = make(chan os.Signal, 1)
163         signal.Notify(runner.SigChan, syscall.SIGTERM)
164         signal.Notify(runner.SigChan, syscall.SIGINT)
165         signal.Notify(runner.SigChan, syscall.SIGQUIT)
166
167         go func(sig chan os.Signal) {
168                 for s := range sig {
169                         runner.stop(s)
170                 }
171         }(runner.SigChan)
172 }
173
174 // stop the underlying Docker container.
175 func (runner *ContainerRunner) stop(sig os.Signal) {
176         runner.cStateLock.Lock()
177         defer runner.cStateLock.Unlock()
178         if sig != nil {
179                 runner.CrunchLog.Printf("caught signal: %v", sig)
180         }
181         if runner.ContainerID == "" {
182                 return
183         }
184         runner.cCancelled = true
185         runner.CrunchLog.Printf("removing container")
186         err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
187         if err != nil {
188                 runner.CrunchLog.Printf("error removing container: %s", err)
189         }
190 }
191
var (
	// errorBlacklist lists regular expressions matched against error
	// messages; a match is taken to mean that this node is unable to
	// run containers.
	errorBlacklist = []string{
		"(?ms).*[Cc]annot connect to the Docker daemon.*",
		"(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
		"(?ms).*grpc: the connection is unavailable.*",
	}

	// brokenNodeHook is the -broken-node-hook command line flag: the
	// path of a script to run when the node is detected to be broken.
	brokenNodeHook = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
)
198
199 func (runner *ContainerRunner) runBrokenNodeHook() {
200         if *brokenNodeHook == "" {
201                 runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
202         } else {
203                 runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
204                 // run killme script
205                 c := exec.Command(*brokenNodeHook)
206                 c.Stdout = runner.CrunchLog
207                 c.Stderr = runner.CrunchLog
208                 err := c.Run()
209                 if err != nil {
210                         runner.CrunchLog.Printf("Error running broken node hook: %v", err)
211                 }
212         }
213 }
214
215 func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
216         for _, d := range errorBlacklist {
217                 if m, e := regexp.MatchString(d, goterr.Error()); m && e == nil {
218                         runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
219                         runner.runBrokenNodeHook()
220                         return true
221                 }
222         }
223         return false
224 }
225
226 // LoadImage determines the docker image id from the container record and
227 // checks if it is available in the local Docker image store.  If not, it loads
228 // the image from Keep.
229 func (runner *ContainerRunner) LoadImage() (err error) {
230
231         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
232
233         var collection arvados.Collection
234         err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
235         if err != nil {
236                 return fmt.Errorf("While getting container image collection: %v", err)
237         }
238         manifest := manifest.Manifest{Text: collection.ManifestText}
239         var img, imageID string
240         for ms := range manifest.StreamIter() {
241                 img = ms.FileStreamSegments[0].Name
242                 if !strings.HasSuffix(img, ".tar") {
243                         return fmt.Errorf("First file in the container image collection does not end in .tar")
244                 }
245                 imageID = img[:len(img)-4]
246         }
247
248         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
249
250         _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID)
251         if err != nil {
252                 runner.CrunchLog.Print("Loading Docker image from keep")
253
254                 var readCloser io.ReadCloser
255                 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
256                 if err != nil {
257                         return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
258                 }
259
260                 response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true)
261                 if err != nil {
262                         return fmt.Errorf("While loading container image into Docker: %v", err)
263                 }
264
265                 defer response.Body.Close()
266                 rbody, err := ioutil.ReadAll(response.Body)
267                 if err != nil {
268                         return fmt.Errorf("Reading response to image load: %v", err)
269                 }
270                 runner.CrunchLog.Printf("Docker response: %s", rbody)
271         } else {
272                 runner.CrunchLog.Print("Docker image is available")
273         }
274
275         runner.ContainerConfig.Image = imageID
276
277         runner.Kc.ClearBlockCache()
278
279         return nil
280 }
281
282 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
283         c = exec.Command("arv-mount", arvMountCmd...)
284
285         // Copy our environment, but override ARVADOS_API_TOKEN with
286         // the container auth token.
287         c.Env = nil
288         for _, s := range os.Environ() {
289                 if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") {
290                         c.Env = append(c.Env, s)
291                 }
292         }
293         c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
294
295         w, err := runner.NewLogWriter("arv-mount")
296         if err != nil {
297                 return nil, err
298         }
299         runner.arvMountLog = NewThrottledLogger(w)
300         c.Stdout = runner.arvMountLog
301         c.Stderr = runner.arvMountLog
302
303         runner.CrunchLog.Printf("Running %v", c.Args)
304
305         err = c.Start()
306         if err != nil {
307                 return nil, err
308         }
309
310         statReadme := make(chan bool)
311         runner.ArvMountExit = make(chan error)
312
313         keepStatting := true
314         go func() {
315                 for keepStatting {
316                         time.Sleep(100 * time.Millisecond)
317                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
318                         if err == nil {
319                                 keepStatting = false
320                                 statReadme <- true
321                         }
322                 }
323                 close(statReadme)
324         }()
325
326         go func() {
327                 mnterr := c.Wait()
328                 if mnterr != nil {
329                         runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
330                 }
331                 runner.ArvMountExit <- mnterr
332                 close(runner.ArvMountExit)
333         }()
334
335         select {
336         case <-statReadme:
337                 break
338         case err := <-runner.ArvMountExit:
339                 runner.ArvMount = nil
340                 keepStatting = false
341                 return nil, err
342         }
343
344         return c, nil
345 }
346
347 func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
348         if runner.ArvMountPoint == "" {
349                 runner.ArvMountPoint, err = runner.MkTempDir(runner.parentTemp, prefix)
350         }
351         return
352 }
353
// copyfile copies the contents of the file src to dst. Missing parent
// directories of dst are created (mode 0777 before umask), and dst is
// created or truncated.
//
// It returns the first error encountered while opening src, creating
// the parent directories, creating dst, copying the data, or closing
// dst. Both files are closed on every return path.
func copyfile(src string, dst string) (err error) {
	srcfile, err := os.Open(src)
	if err != nil {
		return err
	}
	// src is only read, so an error from closing it carries no
	// information about the copy and is deliberately ignored.
	defer srcfile.Close()

	if err = os.MkdirAll(filepath.Dir(dst), 0777); err != nil {
		return err
	}

	dstfile, err := os.Create(dst)
	if err != nil {
		return err
	}
	if _, err = io.Copy(dstfile, srcfile); err != nil {
		dstfile.Close()
		return err
	}
	// A close error on the written file can indicate lost data, so
	// it is reported to the caller.
	return dstfile.Close()
}
384
385 func (runner *ContainerRunner) SetupMounts() (err error) {
386         err = runner.SetupArvMountPoint("keep")
387         if err != nil {
388                 return fmt.Errorf("While creating keep mount temp dir: %v", err)
389         }
390
391         token, err := runner.ContainerToken()
392         if err != nil {
393                 return fmt.Errorf("could not get container token: %s", err)
394         }
395
396         pdhOnly := true
397         tmpcount := 0
398         arvMountCmd := []string{
399                 "--foreground",
400                 "--allow-other",
401                 "--read-write",
402                 fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
403
404         if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
405                 arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
406         }
407
408         collectionPaths := []string{}
409         runner.Binds = nil
410         runner.Volumes = make(map[string]struct{})
411         needCertMount := true
412         type copyFile struct {
413                 src  string
414                 bind string
415         }
416         var copyFiles []copyFile
417
418         var binds []string
419         for bind := range runner.Container.Mounts {
420                 binds = append(binds, bind)
421         }
422         for bind := range runner.SecretMounts {
423                 if _, ok := runner.Container.Mounts[bind]; ok {
424                         return fmt.Errorf("Secret mount %q conflicts with regular mount", bind)
425                 }
426                 if runner.SecretMounts[bind].Kind != "json" &&
427                         runner.SecretMounts[bind].Kind != "text" {
428                         return fmt.Errorf("Secret mount %q type is %q but only 'json' and 'text' are permitted.",
429                                 bind, runner.SecretMounts[bind].Kind)
430                 }
431                 binds = append(binds, bind)
432         }
433         sort.Strings(binds)
434
435         for _, bind := range binds {
436                 mnt, ok := runner.Container.Mounts[bind]
437                 if !ok {
438                         mnt = runner.SecretMounts[bind]
439                 }
440                 if bind == "stdout" || bind == "stderr" {
441                         // Is it a "file" mount kind?
442                         if mnt.Kind != "file" {
443                                 return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind)
444                         }
445
446                         // Does path start with OutputPath?
447                         prefix := runner.Container.OutputPath
448                         if !strings.HasSuffix(prefix, "/") {
449                                 prefix += "/"
450                         }
451                         if !strings.HasPrefix(mnt.Path, prefix) {
452                                 return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
453                         }
454                 }
455
456                 if bind == "stdin" {
457                         // Is it a "collection" mount kind?
458                         if mnt.Kind != "collection" && mnt.Kind != "json" {
459                                 return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
460                         }
461                 }
462
463                 if bind == "/etc/arvados/ca-certificates.crt" {
464                         needCertMount = false
465                 }
466
467                 if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" {
468                         if mnt.Kind != "collection" && mnt.Kind != "text" && mnt.Kind != "json" {
469                                 return fmt.Errorf("Only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind)
470                         }
471                 }
472
473                 switch {
474                 case mnt.Kind == "collection" && bind != "stdin":
475                         var src string
476                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
477                                 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
478                         }
479                         if mnt.UUID != "" {
480                                 if mnt.Writable {
481                                         return fmt.Errorf("Writing to existing collections currently not permitted.")
482                                 }
483                                 pdhOnly = false
484                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
485                         } else if mnt.PortableDataHash != "" {
486                                 if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
487                                         return fmt.Errorf("Can never write to a collection specified by portable data hash")
488                                 }
489                                 idx := strings.Index(mnt.PortableDataHash, "/")
490                                 if idx > 0 {
491                                         mnt.Path = path.Clean(mnt.PortableDataHash[idx:])
492                                         mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
493                                         runner.Container.Mounts[bind] = mnt
494                                 }
495                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
496                                 if mnt.Path != "" && mnt.Path != "." {
497                                         if strings.HasPrefix(mnt.Path, "./") {
498                                                 mnt.Path = mnt.Path[2:]
499                                         } else if strings.HasPrefix(mnt.Path, "/") {
500                                                 mnt.Path = mnt.Path[1:]
501                                         }
502                                         src += "/" + mnt.Path
503                                 }
504                         } else {
505                                 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
506                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
507                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
508                                 tmpcount += 1
509                         }
510                         if mnt.Writable {
511                                 if bind == runner.Container.OutputPath {
512                                         runner.HostOutputDir = src
513                                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
514                                 } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
515                                         copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
516                                 } else {
517                                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
518                                 }
519                         } else {
520                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
521                         }
522                         collectionPaths = append(collectionPaths, src)
523
524                 case mnt.Kind == "tmp":
525                         var tmpdir string
526                         tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp")
527                         if err != nil {
528                                 return fmt.Errorf("While creating mount temp dir: %v", err)
529                         }
530                         st, staterr := os.Stat(tmpdir)
531                         if staterr != nil {
532                                 return fmt.Errorf("While Stat on temp dir: %v", staterr)
533                         }
534                         err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777)
535                         if staterr != nil {
536                                 return fmt.Errorf("While Chmod temp dir: %v", err)
537                         }
538                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
539                         if bind == runner.Container.OutputPath {
540                                 runner.HostOutputDir = tmpdir
541                         }
542
543                 case mnt.Kind == "json" || mnt.Kind == "text":
544                         var filedata []byte
545                         if mnt.Kind == "json" {
546                                 filedata, err = json.Marshal(mnt.Content)
547                                 if err != nil {
548                                         return fmt.Errorf("encoding json data: %v", err)
549                                 }
550                         } else {
551                                 text, ok := mnt.Content.(string)
552                                 if !ok {
553                                         return fmt.Errorf("content for mount %q must be a string", bind)
554                                 }
555                                 filedata = []byte(text)
556                         }
557
558                         tmpdir, err := runner.MkTempDir(runner.parentTemp, mnt.Kind)
559                         if err != nil {
560                                 return fmt.Errorf("creating temp dir: %v", err)
561                         }
562                         tmpfn := filepath.Join(tmpdir, "mountdata."+mnt.Kind)
563                         err = ioutil.WriteFile(tmpfn, filedata, 0444)
564                         if err != nil {
565                                 return fmt.Errorf("writing temp file: %v", err)
566                         }
567                         if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
568                                 copyFiles = append(copyFiles, copyFile{tmpfn, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
569                         } else {
570                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
571                         }
572
573                 case mnt.Kind == "git_tree":
574                         tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree")
575                         if err != nil {
576                                 return fmt.Errorf("creating temp dir: %v", err)
577                         }
578                         err = gitMount(mnt).extractTree(runner.ArvClient, tmpdir, token)
579                         if err != nil {
580                                 return err
581                         }
582                         runner.Binds = append(runner.Binds, tmpdir+":"+bind+":ro")
583                 }
584         }
585
586         if runner.HostOutputDir == "" {
587                 return fmt.Errorf("Output path does not correspond to a writable mount point")
588         }
589
590         if wantAPI := runner.Container.RuntimeConstraints.API; needCertMount && wantAPI != nil && *wantAPI {
591                 for _, certfile := range arvadosclient.CertFiles {
592                         _, err := os.Stat(certfile)
593                         if err == nil {
594                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:/etc/arvados/ca-certificates.crt:ro", certfile))
595                                 break
596                         }
597                 }
598         }
599
600         if pdhOnly {
601                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
602         } else {
603                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
604         }
605         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
606
607         runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
608         if err != nil {
609                 return fmt.Errorf("While trying to start arv-mount: %v", err)
610         }
611
612         for _, p := range collectionPaths {
613                 _, err = os.Stat(p)
614                 if err != nil {
615                         return fmt.Errorf("While checking that input files exist: %v", err)
616                 }
617         }
618
619         for _, cp := range copyFiles {
620                 st, err := os.Stat(cp.src)
621                 if err != nil {
622                         return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
623                 }
624                 if st.IsDir() {
625                         err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
626                                 if walkerr != nil {
627                                         return walkerr
628                                 }
629                                 target := path.Join(cp.bind, walkpath[len(cp.src):])
630                                 if walkinfo.Mode().IsRegular() {
631                                         copyerr := copyfile(walkpath, target)
632                                         if copyerr != nil {
633                                                 return copyerr
634                                         }
635                                         return os.Chmod(target, walkinfo.Mode()|0777)
636                                 } else if walkinfo.Mode().IsDir() {
637                                         mkerr := os.MkdirAll(target, 0777)
638                                         if mkerr != nil {
639                                                 return mkerr
640                                         }
641                                         return os.Chmod(target, walkinfo.Mode()|os.ModeSetgid|0777)
642                                 } else {
643                                         return fmt.Errorf("Source %q is not a regular file or directory", cp.src)
644                                 }
645                         })
646                 } else if st.Mode().IsRegular() {
647                         err = copyfile(cp.src, cp.bind)
648                         if err == nil {
649                                 err = os.Chmod(cp.bind, st.Mode()|0777)
650                         }
651                 }
652                 if err != nil {
653                         return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
654                 }
655         }
656
657         return nil
658 }
659
660 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
661         // Handle docker log protocol
662         // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
663         defer close(runner.loggingDone)
664
665         header := make([]byte, 8)
666         var err error
667         for err == nil {
668                 _, err = io.ReadAtLeast(containerReader, header, 8)
669                 if err != nil {
670                         if err == io.EOF {
671                                 err = nil
672                         }
673                         break
674                 }
675                 readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
676                 if header[0] == 1 {
677                         // stdout
678                         _, err = io.CopyN(runner.Stdout, containerReader, readsize)
679                 } else {
680                         // stderr
681                         _, err = io.CopyN(runner.Stderr, containerReader, readsize)
682                 }
683         }
684
685         if err != nil {
686                 runner.CrunchLog.Printf("error reading docker logs: %v", err)
687         }
688
689         err = runner.Stdout.Close()
690         if err != nil {
691                 runner.CrunchLog.Printf("error closing stdout logs: %v", err)
692         }
693
694         err = runner.Stderr.Close()
695         if err != nil {
696                 runner.CrunchLog.Printf("error closing stderr logs: %v", err)
697         }
698
699         if runner.statReporter != nil {
700                 runner.statReporter.Stop()
701                 err = runner.statLogger.Close()
702                 if err != nil {
703                         runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
704                 }
705         }
706 }
707
708 func (runner *ContainerRunner) stopHoststat() error {
709         if runner.hoststatReporter == nil {
710                 return nil
711         }
712         runner.hoststatReporter.Stop()
713         err := runner.hoststatLogger.Close()
714         if err != nil {
715                 return fmt.Errorf("error closing hoststat logs: %v", err)
716         }
717         return nil
718 }
719
720 func (runner *ContainerRunner) startHoststat() error {
721         w, err := runner.NewLogWriter("hoststat")
722         if err != nil {
723                 return err
724         }
725         runner.hoststatLogger = NewThrottledLogger(w)
726         runner.hoststatReporter = &crunchstat.Reporter{
727                 Logger:     log.New(runner.hoststatLogger, "", 0),
728                 CgroupRoot: runner.cgroupRoot,
729                 PollPeriod: runner.statInterval,
730         }
731         runner.hoststatReporter.Start()
732         return nil
733 }
734
735 func (runner *ContainerRunner) startCrunchstat() error {
736         w, err := runner.NewLogWriter("crunchstat")
737         if err != nil {
738                 return err
739         }
740         runner.statLogger = NewThrottledLogger(w)
741         runner.statReporter = &crunchstat.Reporter{
742                 CID:          runner.ContainerID,
743                 Logger:       log.New(runner.statLogger, "", 0),
744                 CgroupParent: runner.expectCgroupParent,
745                 CgroupRoot:   runner.cgroupRoot,
746                 PollPeriod:   runner.statInterval,
747                 TempDir:      runner.parentTemp,
748         }
749         runner.statReporter.Start()
750         return nil
751 }
752
// infoCommand pairs a human-readable heading with the command line
// whose output is logged under that heading (see LogHostInfo).
type infoCommand struct {
	// label is printed on its own line before the command output.
	label string
	// cmd is the program name followed by its arguments.
	cmd []string
}
757
758 // LogHostInfo logs info about the current host, for debugging and
759 // accounting purposes. Although it's logged as "node-info", this is
760 // about the environment where crunch-run is actually running, which
761 // might differ from what's described in the node record (see
762 // LogNodeRecord).
763 func (runner *ContainerRunner) LogHostInfo() (err error) {
764         w, err := runner.NewLogWriter("node-info")
765         if err != nil {
766                 return
767         }
768
769         commands := []infoCommand{
770                 {
771                         label: "Host Information",
772                         cmd:   []string{"uname", "-a"},
773                 },
774                 {
775                         label: "CPU Information",
776                         cmd:   []string{"cat", "/proc/cpuinfo"},
777                 },
778                 {
779                         label: "Memory Information",
780                         cmd:   []string{"cat", "/proc/meminfo"},
781                 },
782                 {
783                         label: "Disk Space",
784                         cmd:   []string{"df", "-m", "/", os.TempDir()},
785                 },
786                 {
787                         label: "Disk INodes",
788                         cmd:   []string{"df", "-i", "/", os.TempDir()},
789                 },
790         }
791
792         // Run commands with informational output to be logged.
793         for _, command := range commands {
794                 fmt.Fprintln(w, command.label)
795                 cmd := exec.Command(command.cmd[0], command.cmd[1:]...)
796                 cmd.Stdout = w
797                 cmd.Stderr = w
798                 if err := cmd.Run(); err != nil {
799                         err = fmt.Errorf("While running command %q: %v", command.cmd, err)
800                         fmt.Fprintln(w, err)
801                         return err
802                 }
803                 fmt.Fprintln(w, "")
804         }
805
806         err = w.Close()
807         if err != nil {
808                 return fmt.Errorf("While closing node-info logs: %v", err)
809         }
810         return nil
811 }
812
813 // LogContainerRecord gets and saves the raw JSON container record from the API server
814 func (runner *ContainerRunner) LogContainerRecord() error {
815         logged, err := runner.logAPIResponse("container", "containers", map[string]interface{}{"filters": [][]string{{"uuid", "=", runner.Container.UUID}}}, nil)
816         if !logged && err == nil {
817                 err = fmt.Errorf("error: no container record found for %s", runner.Container.UUID)
818         }
819         return err
820 }
821
822 // LogNodeRecord logs arvados#node record corresponding to the current host.
823 func (runner *ContainerRunner) LogNodeRecord() error {
824         hostname := os.Getenv("SLURMD_NODENAME")
825         if hostname == "" {
826                 hostname, _ = os.Hostname()
827         }
828         _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
829                 // The "info" field has admin-only info when obtained
830                 // with a privileged token, and should not be logged.
831                 node, ok := resp.(map[string]interface{})
832                 if ok {
833                         delete(node, "info")
834                 }
835         })
836         return err
837 }
838
839 func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
840         writer, err := runner.LogCollection.OpenFile(label+".json", os.O_CREATE|os.O_WRONLY, 0666)
841         if err != nil {
842                 return false, err
843         }
844         w := &ArvLogWriter{
845                 ArvClient:     runner.ArvClient,
846                 UUID:          runner.Container.UUID,
847                 loggingStream: label,
848                 writeCloser:   writer,
849         }
850
851         reader, err := runner.ArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
852         if err != nil {
853                 return false, fmt.Errorf("error getting %s record: %v", label, err)
854         }
855         defer reader.Close()
856
857         dec := json.NewDecoder(reader)
858         dec.UseNumber()
859         var resp map[string]interface{}
860         if err = dec.Decode(&resp); err != nil {
861                 return false, fmt.Errorf("error decoding %s list response: %v", label, err)
862         }
863         items, ok := resp["items"].([]interface{})
864         if !ok {
865                 return false, fmt.Errorf("error decoding %s list response: no \"items\" key in API list response", label)
866         } else if len(items) < 1 {
867                 return false, nil
868         }
869         if munge != nil {
870                 munge(items[0])
871         }
872         // Re-encode it using indentation to improve readability
873         enc := json.NewEncoder(w)
874         enc.SetIndent("", "    ")
875         if err = enc.Encode(items[0]); err != nil {
876                 return false, fmt.Errorf("error logging %s record: %v", label, err)
877         }
878         err = w.Close()
879         if err != nil {
880                 return false, fmt.Errorf("error closing %s.json in log collection: %v", label, err)
881         }
882         return true, nil
883 }
884
885 // AttachStreams connects the docker container stdin, stdout and stderr logs
886 // to the Arvados logger which logs to Keep and the API server logs table.
887 func (runner *ContainerRunner) AttachStreams() (err error) {
888
889         runner.CrunchLog.Print("Attaching container streams")
890
891         // If stdin mount is provided, attach it to the docker container
892         var stdinRdr arvados.File
893         var stdinJson []byte
894         if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
895                 if stdinMnt.Kind == "collection" {
896                         var stdinColl arvados.Collection
897                         collId := stdinMnt.UUID
898                         if collId == "" {
899                                 collId = stdinMnt.PortableDataHash
900                         }
901                         err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
902                         if err != nil {
903                                 return fmt.Errorf("While getting stding collection: %v", err)
904                         }
905
906                         stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path)
907                         if os.IsNotExist(err) {
908                                 return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
909                         } else if err != nil {
910                                 return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
911                         }
912                 } else if stdinMnt.Kind == "json" {
913                         stdinJson, err = json.Marshal(stdinMnt.Content)
914                         if err != nil {
915                                 return fmt.Errorf("While encoding stdin json data: %v", err)
916                         }
917                 }
918         }
919
920         stdinUsed := stdinRdr != nil || len(stdinJson) != 0
921         response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
922                 dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
923         if err != nil {
924                 return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
925         }
926
927         runner.loggingDone = make(chan bool)
928
929         if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
930                 stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
931                 if err != nil {
932                         return err
933                 }
934                 runner.Stdout = stdoutFile
935         } else if w, err := runner.NewLogWriter("stdout"); err != nil {
936                 return err
937         } else {
938                 runner.Stdout = NewThrottledLogger(w)
939         }
940
941         if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
942                 stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
943                 if err != nil {
944                         return err
945                 }
946                 runner.Stderr = stderrFile
947         } else if w, err := runner.NewLogWriter("stderr"); err != nil {
948                 return err
949         } else {
950                 runner.Stderr = NewThrottledLogger(w)
951         }
952
953         if stdinRdr != nil {
954                 go func() {
955                         _, err := io.Copy(response.Conn, stdinRdr)
956                         if err != nil {
957                                 runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
958                                 runner.stop(nil)
959                         }
960                         stdinRdr.Close()
961                         response.CloseWrite()
962                 }()
963         } else if len(stdinJson) != 0 {
964                 go func() {
965                         _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
966                         if err != nil {
967                                 runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
968                                 runner.stop(nil)
969                         }
970                         response.CloseWrite()
971                 }()
972         }
973
974         go runner.ProcessDockerAttach(response.Reader)
975
976         return nil
977 }
978
979 func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
980         stdoutPath := mntPath[len(runner.Container.OutputPath):]
981         index := strings.LastIndex(stdoutPath, "/")
982         if index > 0 {
983                 subdirs := stdoutPath[:index]
984                 if subdirs != "" {
985                         st, err := os.Stat(runner.HostOutputDir)
986                         if err != nil {
987                                 return nil, fmt.Errorf("While Stat on temp dir: %v", err)
988                         }
989                         stdoutPath := filepath.Join(runner.HostOutputDir, subdirs)
990                         err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
991                         if err != nil {
992                                 return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
993                         }
994                 }
995         }
996         stdoutFile, err := os.Create(filepath.Join(runner.HostOutputDir, stdoutPath))
997         if err != nil {
998                 return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
999         }
1000
1001         return stdoutFile, nil
1002 }
1003
1004 // CreateContainer creates the docker container.
1005 func (runner *ContainerRunner) CreateContainer() error {
1006         runner.CrunchLog.Print("Creating Docker container")
1007
1008         runner.ContainerConfig.Cmd = runner.Container.Command
1009         if runner.Container.Cwd != "." {
1010                 runner.ContainerConfig.WorkingDir = runner.Container.Cwd
1011         }
1012
1013         for k, v := range runner.Container.Environment {
1014                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
1015         }
1016
1017         runner.ContainerConfig.Volumes = runner.Volumes
1018
1019         maxRAM := int64(runner.Container.RuntimeConstraints.RAM)
1020         if maxRAM < 4*1024*1024 {
1021                 // Docker daemon won't let you set a limit less than 4 MiB
1022                 maxRAM = 4 * 1024 * 1024
1023         }
1024         runner.HostConfig = dockercontainer.HostConfig{
1025                 Binds: runner.Binds,
1026                 LogConfig: dockercontainer.LogConfig{
1027                         Type: "none",
1028                 },
1029                 Resources: dockercontainer.Resources{
1030                         CgroupParent: runner.setCgroupParent,
1031                         NanoCPUs:     int64(runner.Container.RuntimeConstraints.VCPUs) * 1000000000,
1032                         Memory:       maxRAM, // RAM
1033                         MemorySwap:   maxRAM, // RAM+swap
1034                         KernelMemory: maxRAM, // kernel portion
1035                 },
1036         }
1037
1038         if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
1039                 tok, err := runner.ContainerToken()
1040                 if err != nil {
1041                         return err
1042                 }
1043                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
1044                         "ARVADOS_API_TOKEN="+tok,
1045                         "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
1046                         "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
1047                 )
1048                 runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
1049         } else {
1050                 if runner.enableNetwork == "always" {
1051                         runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
1052                 } else {
1053                         runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none")
1054                 }
1055         }
1056
1057         _, stdinUsed := runner.Container.Mounts["stdin"]
1058         runner.ContainerConfig.OpenStdin = stdinUsed
1059         runner.ContainerConfig.StdinOnce = stdinUsed
1060         runner.ContainerConfig.AttachStdin = stdinUsed
1061         runner.ContainerConfig.AttachStdout = true
1062         runner.ContainerConfig.AttachStderr = true
1063
1064         createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
1065         if err != nil {
1066                 return fmt.Errorf("While creating container: %v", err)
1067         }
1068
1069         runner.ContainerID = createdBody.ID
1070
1071         return runner.AttachStreams()
1072 }
1073
1074 // StartContainer starts the docker container created by CreateContainer.
1075 func (runner *ContainerRunner) StartContainer() error {
1076         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
1077         runner.cStateLock.Lock()
1078         defer runner.cStateLock.Unlock()
1079         if runner.cCancelled {
1080                 return ErrCancelled
1081         }
1082         err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
1083                 dockertypes.ContainerStartOptions{})
1084         if err != nil {
1085                 var advice string
1086                 if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil {
1087                         advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0])
1088                 }
1089                 return fmt.Errorf("could not start container: %v%s", err, advice)
1090         }
1091         return nil
1092 }
1093
1094 // checkContainerd checks if "containerd" is present in the process list.
1095 func (runner *ContainerRunner) CheckContainerd() error {
1096         if runner.checkContainerd == 0 {
1097                 return nil
1098         }
1099         p, _ := runner.ListProcesses()
1100         for _, i := range p {
1101                 e, _ := i.CmdlineSlice()
1102                 if len(e) > 0 {
1103                         if strings.Index(e[0], "containerd") > -1 {
1104                                 return nil
1105                         }
1106                 }
1107         }
1108
1109         // Not found
1110         runner.runBrokenNodeHook()
1111         runner.stop(nil)
1112         return fmt.Errorf("'containerd' not found in process list.")
1113 }
1114
1115 // WaitFinish waits for the container to terminate, capture the exit code, and
1116 // close the stdout/stderr logging.
1117 func (runner *ContainerRunner) WaitFinish() error {
1118         var runTimeExceeded <-chan time.Time
1119         runner.CrunchLog.Print("Waiting for container to finish")
1120
1121         waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
1122         arvMountExit := runner.ArvMountExit
1123         if timeout := runner.Container.SchedulingParameters.MaxRunTime; timeout > 0 {
1124                 runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
1125         }
1126
1127         containerdGone := make(chan error)
1128         defer close(containerdGone)
1129         if runner.checkContainerd > 0 {
1130                 go func() {
1131                         ticker := time.NewTicker(time.Duration(runner.checkContainerd))
1132                         defer ticker.Stop()
1133                         for {
1134                                 select {
1135                                 case <-ticker.C:
1136                                         if ck := runner.CheckContainerd(); ck != nil {
1137                                                 containerdGone <- ck
1138                                                 return
1139                                         }
1140                                 case <-containerdGone:
1141                                         // Channel closed, quit goroutine
1142                                         return
1143                                 }
1144                         }
1145                 }()
1146         }
1147
1148         for {
1149                 select {
1150                 case waitBody := <-waitOk:
1151                         runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
1152                         code := int(waitBody.StatusCode)
1153                         runner.ExitCode = &code
1154
1155                         // wait for stdout/stderr to complete
1156                         <-runner.loggingDone
1157                         return nil
1158
1159                 case err := <-waitErr:
1160                         return fmt.Errorf("container wait: %v", err)
1161
1162                 case <-arvMountExit:
1163                         runner.CrunchLog.Printf("arv-mount exited while container is still running.  Stopping container.")
1164                         runner.stop(nil)
1165                         // arvMountExit will always be ready now that
1166                         // it's closed, but that doesn't interest us.
1167                         arvMountExit = nil
1168
1169                 case <-runTimeExceeded:
1170                         runner.CrunchLog.Printf("maximum run time exceeded. Stopping container.")
1171                         runner.stop(nil)
1172                         runTimeExceeded = nil
1173
1174                 case err := <-containerdGone:
1175                         return err
1176                 }
1177         }
1178 }
1179
1180 func (runner *ContainerRunner) updateLogs() {
1181         ticker := time.NewTicker(crunchLogUpdatePeriod / 360)
1182         defer ticker.Stop()
1183
1184         sigusr1 := make(chan os.Signal, 1)
1185         signal.Notify(sigusr1, syscall.SIGUSR1)
1186         defer signal.Stop(sigusr1)
1187
1188         saveAtTime := time.Now().Add(crunchLogUpdatePeriod)
1189         saveAtSize := crunchLogUpdateSize
1190         var savedSize int64
1191         for {
1192                 select {
1193                 case <-ticker.C:
1194                 case <-sigusr1:
1195                         saveAtTime = time.Now()
1196                 }
1197                 runner.logMtx.Lock()
1198                 done := runner.LogsPDH != nil
1199                 runner.logMtx.Unlock()
1200                 if done {
1201                         return
1202                 }
1203                 size := runner.LogCollection.Size()
1204                 if size == savedSize || (time.Now().Before(saveAtTime) && size < saveAtSize) {
1205                         continue
1206                 }
1207                 saveAtTime = time.Now().Add(crunchLogUpdatePeriod)
1208                 saveAtSize = runner.LogCollection.Size() + crunchLogUpdateSize
1209                 saved, err := runner.saveLogCollection(false)
1210                 if err != nil {
1211                         runner.CrunchLog.Printf("error updating log collection: %s", err)
1212                         continue
1213                 }
1214
1215                 var updated arvados.Container
1216                 err = runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
1217                         "container": arvadosclient.Dict{"log": saved.PortableDataHash},
1218                 }, &updated)
1219                 if err != nil {
1220                         runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
1221                         continue
1222                 }
1223
1224                 savedSize = size
1225         }
1226 }
1227
1228 // CaptureOutput saves data from the container's output directory if
1229 // needed, and updates the container output accordingly.
1230 func (runner *ContainerRunner) CaptureOutput() error {
1231         if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
1232                 // Output may have been set directly by the container, so
1233                 // refresh the container record to check.
1234                 err := runner.ArvClient.Get("containers", runner.Container.UUID,
1235                         nil, &runner.Container)
1236                 if err != nil {
1237                         return err
1238                 }
1239                 if runner.Container.Output != "" {
1240                         // Container output is already set.
1241                         runner.OutputPDH = &runner.Container.Output
1242                         return nil
1243                 }
1244         }
1245
1246         txt, err := (&copier{
1247                 client:        runner.client,
1248                 arvClient:     runner.ArvClient,
1249                 keepClient:    runner.Kc,
1250                 hostOutputDir: runner.HostOutputDir,
1251                 ctrOutputDir:  runner.Container.OutputPath,
1252                 binds:         runner.Binds,
1253                 mounts:        runner.Container.Mounts,
1254                 secretMounts:  runner.SecretMounts,
1255                 logger:        runner.CrunchLog,
1256         }).Copy()
1257         if err != nil {
1258                 return err
1259         }
1260         var resp arvados.Collection
1261         err = runner.ArvClient.Create("collections", arvadosclient.Dict{
1262                 "ensure_unique_name": true,
1263                 "collection": arvadosclient.Dict{
1264                         "is_trashed":    true,
1265                         "name":          "output for " + runner.Container.UUID,
1266                         "manifest_text": txt,
1267                 },
1268         }, &resp)
1269         if err != nil {
1270                 return fmt.Errorf("error creating output collection: %v", err)
1271         }
1272         runner.OutputPDH = &resp.PortableDataHash
1273         return nil
1274 }
1275
1276 func (runner *ContainerRunner) CleanupDirs() {
1277         if runner.ArvMount != nil {
1278                 var delay int64 = 8
1279                 umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint)
1280                 umount.Stdout = runner.CrunchLog
1281                 umount.Stderr = runner.CrunchLog
1282                 runner.CrunchLog.Printf("Running %v", umount.Args)
1283                 umnterr := umount.Start()
1284
1285                 if umnterr != nil {
1286                         runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
1287                 } else {
1288                         // If arv-mount --unmount gets stuck for any reason, we
1289                         // don't want to wait for it forever.  Do Wait() in a goroutine
1290                         // so it doesn't block crunch-run.
1291                         umountExit := make(chan error)
1292                         go func() {
1293                                 mnterr := umount.Wait()
1294                                 if mnterr != nil {
1295                                         runner.CrunchLog.Printf("Error unmounting: %v", mnterr)
1296                                 }
1297                                 umountExit <- mnterr
1298                         }()
1299
1300                         for again := true; again; {
1301                                 again = false
1302                                 select {
1303                                 case <-umountExit:
1304                                         umount = nil
1305                                         again = true
1306                                 case <-runner.ArvMountExit:
1307                                         break
1308                                 case <-time.After(time.Duration((delay + 1) * int64(time.Second))):
1309                                         runner.CrunchLog.Printf("Timed out waiting for unmount")
1310                                         if umount != nil {
1311                                                 umount.Process.Kill()
1312                                         }
1313                                         runner.ArvMount.Process.Kill()
1314                                 }
1315                         }
1316                 }
1317         }
1318
1319         if runner.ArvMountPoint != "" {
1320                 if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
1321                         runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
1322                 }
1323         }
1324
1325         if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
1326                 runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", runner.parentTemp, rmerr)
1327         }
1328 }
1329
1330 // CommitLogs posts the collection containing the final container logs.
1331 func (runner *ContainerRunner) CommitLogs() error {
1332         func() {
1333                 // Hold cStateLock to prevent races on CrunchLog (e.g., stop()).
1334                 runner.cStateLock.Lock()
1335                 defer runner.cStateLock.Unlock()
1336
1337                 runner.CrunchLog.Print(runner.finalState)
1338
1339                 if runner.arvMountLog != nil {
1340                         runner.arvMountLog.Close()
1341                 }
1342                 runner.CrunchLog.Close()
1343
1344                 // Closing CrunchLog above allows them to be committed to Keep at this
1345                 // point, but re-open crunch log with ArvClient in case there are any
1346                 // other further errors (such as failing to write the log to Keep!)
1347                 // while shutting down
1348                 runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{
1349                         ArvClient:     runner.ArvClient,
1350                         UUID:          runner.Container.UUID,
1351                         loggingStream: "crunch-run",
1352                         writeCloser:   nil,
1353                 })
1354                 runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
1355         }()
1356
1357         if runner.LogsPDH != nil {
1358                 // If we have already assigned something to LogsPDH,
1359                 // we must be closing the re-opened log, which won't
1360                 // end up getting attached to the container record and
1361                 // therefore doesn't need to be saved as a collection
1362                 // -- it exists only to send logs to other channels.
1363                 return nil
1364         }
1365         saved, err := runner.saveLogCollection(true)
1366         if err != nil {
1367                 return fmt.Errorf("error saving log collection: %s", err)
1368         }
1369         runner.logMtx.Lock()
1370         defer runner.logMtx.Unlock()
1371         runner.LogsPDH = &saved.PortableDataHash
1372         return nil
1373 }
1374
1375 func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) {
1376         runner.logMtx.Lock()
1377         defer runner.logMtx.Unlock()
1378         if runner.LogsPDH != nil {
1379                 // Already finalized.
1380                 return
1381         }
1382         mt, err := runner.LogCollection.MarshalManifest(".")
1383         if err != nil {
1384                 err = fmt.Errorf("error creating log manifest: %v", err)
1385                 return
1386         }
1387         updates := arvadosclient.Dict{
1388                 "name":          "logs for " + runner.Container.UUID,
1389                 "manifest_text": mt,
1390         }
1391         if final {
1392                 updates["is_trashed"] = true
1393         } else {
1394                 exp := time.Now().Add(crunchLogUpdatePeriod * 24)
1395                 updates["trash_at"] = exp
1396                 updates["delete_at"] = exp
1397         }
1398         reqBody := arvadosclient.Dict{"collection": updates}
1399         if runner.logUUID == "" {
1400                 reqBody["ensure_unique_name"] = true
1401                 err = runner.ArvClient.Create("collections", reqBody, &response)
1402         } else {
1403                 err = runner.ArvClient.Update("collections", runner.logUUID, reqBody, &response)
1404         }
1405         if err != nil {
1406                 return
1407         }
1408         runner.logUUID = response.UUID
1409         return
1410 }
1411
1412 // UpdateContainerRunning updates the container state to "Running"
1413 func (runner *ContainerRunner) UpdateContainerRunning() error {
1414         runner.cStateLock.Lock()
1415         defer runner.cStateLock.Unlock()
1416         if runner.cCancelled {
1417                 return ErrCancelled
1418         }
1419         return runner.ArvClient.Update("containers", runner.Container.UUID,
1420                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
1421 }
1422
1423 // ContainerToken returns the api_token the container (and any
1424 // arv-mount processes) are allowed to use.
1425 func (runner *ContainerRunner) ContainerToken() (string, error) {
1426         if runner.token != "" {
1427                 return runner.token, nil
1428         }
1429
1430         var auth arvados.APIClientAuthorization
1431         err := runner.ArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
1432         if err != nil {
1433                 return "", err
1434         }
1435         runner.token = auth.APIToken
1436         return runner.token, nil
1437 }
1438
1439 // UpdateContainerComplete updates the container record state on API
1440 // server to "Complete" or "Cancelled"
1441 func (runner *ContainerRunner) UpdateContainerFinal() error {
1442         update := arvadosclient.Dict{}
1443         update["state"] = runner.finalState
1444         if runner.LogsPDH != nil {
1445                 update["log"] = *runner.LogsPDH
1446         }
1447         if runner.finalState == "Complete" {
1448                 if runner.ExitCode != nil {
1449                         update["exit_code"] = *runner.ExitCode
1450                 }
1451                 if runner.OutputPDH != nil {
1452                         update["output"] = *runner.OutputPDH
1453                 }
1454         }
1455         return runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
1456 }
1457
1458 // IsCancelled returns the value of Cancelled, with goroutine safety.
1459 func (runner *ContainerRunner) IsCancelled() bool {
1460         runner.cStateLock.Lock()
1461         defer runner.cStateLock.Unlock()
1462         return runner.cCancelled
1463 }
1464
1465 // NewArvLogWriter creates an ArvLogWriter
1466 func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error) {
1467         writer, err := runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
1468         if err != nil {
1469                 return nil, err
1470         }
1471         return &ArvLogWriter{
1472                 ArvClient:     runner.ArvClient,
1473                 UUID:          runner.Container.UUID,
1474                 loggingStream: name,
1475                 writeCloser:   writer,
1476         }, nil
1477 }
1478
1479 // Run the full container lifecycle.
1480 func (runner *ContainerRunner) Run() (err error) {
1481         runner.CrunchLog.Printf("crunch-run %s started", version)
1482         runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
1483
1484         hostname, hosterr := os.Hostname()
1485         if hosterr != nil {
1486                 runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
1487         } else {
1488                 runner.CrunchLog.Printf("Executing on host '%s'", hostname)
1489         }
1490
1491         runner.finalState = "Queued"
1492
1493         defer func() {
1494                 runner.CleanupDirs()
1495
1496                 runner.CrunchLog.Printf("crunch-run finished")
1497                 runner.CrunchLog.Close()
1498         }()
1499
1500         defer func() {
1501                 // checkErr prints e (unless it's nil) and sets err to
1502                 // e (unless err is already non-nil). Thus, if err
1503                 // hasn't already been assigned when Run() returns,
1504                 // this cleanup func will cause Run() to return the
1505                 // first non-nil error that is passed to checkErr().
1506                 checkErr := func(errorIn string, e error) {
1507                         if e == nil {
1508                                 return
1509                         }
1510                         runner.CrunchLog.Printf("error in %s: %v", errorIn, e)
1511                         if err == nil {
1512                                 err = e
1513                         }
1514                         if runner.finalState == "Complete" {
1515                                 // There was an error in the finalization.
1516                                 runner.finalState = "Cancelled"
1517                         }
1518                 }
1519
1520                 // Log the error encountered in Run(), if any
1521                 checkErr("Run", err)
1522
1523                 if runner.finalState == "Queued" {
1524                         runner.UpdateContainerFinal()
1525                         return
1526                 }
1527
1528                 if runner.IsCancelled() {
1529                         runner.finalState = "Cancelled"
1530                         // but don't return yet -- we still want to
1531                         // capture partial output and write logs
1532                 }
1533
1534                 checkErr("CaptureOutput", runner.CaptureOutput())
1535                 checkErr("stopHoststat", runner.stopHoststat())
1536                 checkErr("CommitLogs", runner.CommitLogs())
1537                 checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
1538         }()
1539
1540         err = runner.fetchContainerRecord()
1541         if err != nil {
1542                 return
1543         }
1544         runner.setupSignals()
1545         err = runner.startHoststat()
1546         if err != nil {
1547                 return
1548         }
1549
1550         // Sanity check that containerd is running.
1551         err = runner.CheckContainerd()
1552         if err != nil {
1553                 return
1554         }
1555
1556         // check for and/or load image
1557         err = runner.LoadImage()
1558         if err != nil {
1559                 if !runner.checkBrokenNode(err) {
1560                         // Failed to load image but not due to a "broken node"
1561                         // condition, probably user error.
1562                         runner.finalState = "Cancelled"
1563                 }
1564                 err = fmt.Errorf("While loading container image: %v", err)
1565                 return
1566         }
1567
1568         // set up FUSE mount and binds
1569         err = runner.SetupMounts()
1570         if err != nil {
1571                 runner.finalState = "Cancelled"
1572                 err = fmt.Errorf("While setting up mounts: %v", err)
1573                 return
1574         }
1575
1576         err = runner.CreateContainer()
1577         if err != nil {
1578                 return
1579         }
1580         err = runner.LogHostInfo()
1581         if err != nil {
1582                 return
1583         }
1584         err = runner.LogNodeRecord()
1585         if err != nil {
1586                 return
1587         }
1588         err = runner.LogContainerRecord()
1589         if err != nil {
1590                 return
1591         }
1592
1593         if runner.IsCancelled() {
1594                 return
1595         }
1596
1597         err = runner.UpdateContainerRunning()
1598         if err != nil {
1599                 return
1600         }
1601         runner.finalState = "Cancelled"
1602
1603         err = runner.startCrunchstat()
1604         if err != nil {
1605                 return
1606         }
1607
1608         err = runner.StartContainer()
1609         if err != nil {
1610                 runner.checkBrokenNode(err)
1611                 return
1612         }
1613
1614         err = runner.WaitFinish()
1615         if err == nil && !runner.IsCancelled() {
1616                 runner.finalState = "Complete"
1617         }
1618         return
1619 }
1620
1621 // Fetch the current container record (uuid = runner.Container.UUID)
1622 // into runner.Container.
1623 func (runner *ContainerRunner) fetchContainerRecord() error {
1624         reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
1625         if err != nil {
1626                 return fmt.Errorf("error fetching container record: %v", err)
1627         }
1628         defer reader.Close()
1629
1630         dec := json.NewDecoder(reader)
1631         dec.UseNumber()
1632         err = dec.Decode(&runner.Container)
1633         if err != nil {
1634                 return fmt.Errorf("error decoding container record: %v", err)
1635         }
1636
1637         var sm struct {
1638                 SecretMounts map[string]arvados.Mount `json:"secret_mounts"`
1639         }
1640
1641         containerToken, err := runner.ContainerToken()
1642         if err != nil {
1643                 return fmt.Errorf("error getting container token: %v", err)
1644         }
1645
1646         containerClient, err := runner.MkArvClient(containerToken)
1647         if err != nil {
1648                 return fmt.Errorf("error creating container API client: %v", err)
1649         }
1650
1651         err = containerClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
1652         if err != nil {
1653                 if apierr, ok := err.(arvadosclient.APIServerError); !ok || apierr.HttpStatusCode != 404 {
1654                         return fmt.Errorf("error fetching secret_mounts: %v", err)
1655                 }
1656                 // ok && apierr.HttpStatusCode == 404, which means
1657                 // secret_mounts isn't supported by this API server.
1658         }
1659         runner.SecretMounts = sm.SecretMounts
1660
1661         return nil
1662 }
1663
1664 // NewContainerRunner creates a new container runner.
1665 func NewContainerRunner(client *arvados.Client, api IArvadosClient, kc IKeepClient, docker ThinDockerClient, containerUUID string) (*ContainerRunner, error) {
1666         cr := &ContainerRunner{
1667                 client:    client,
1668                 ArvClient: api,
1669                 Kc:        kc,
1670                 Docker:    docker,
1671         }
1672         cr.NewLogWriter = cr.NewArvLogWriter
1673         cr.RunArvMount = cr.ArvMountCmd
1674         cr.MkTempDir = ioutil.TempDir
1675         cr.ListProcesses = func() ([]PsProcess, error) {
1676                 pr, err := process.Processes()
1677                 if err != nil {
1678                         return nil, err
1679                 }
1680                 ps := make([]PsProcess, len(pr))
1681                 for i, j := range pr {
1682                         ps[i] = j
1683                 }
1684                 return ps, nil
1685         }
1686         cr.MkArvClient = func(token string) (IArvadosClient, error) {
1687                 cl, err := arvadosclient.MakeArvadosClient()
1688                 if err != nil {
1689                         return nil, err
1690                 }
1691                 cl.ApiToken = token
1692                 return cl, nil
1693         }
1694         var err error
1695         cr.LogCollection, err = (&arvados.Collection{}).FileSystem(cr.client, cr.Kc)
1696         if err != nil {
1697                 return nil, err
1698         }
1699         cr.Container.UUID = containerUUID
1700         w, err := cr.NewLogWriter("crunch-run")
1701         if err != nil {
1702                 return nil, err
1703         }
1704         cr.CrunchLog = NewThrottledLogger(w)
1705         cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
1706
1707         loadLogThrottleParams(api)
1708         go cr.updateLogs()
1709
1710         return cr, nil
1711 }
1712
1713 func main() {
1714         statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
1715         cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
1716         cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
1717         cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
1718         caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
1719         enableNetwork := flag.String("container-enable-networking", "default",
1720                 `Specify if networking should be enabled for container.  One of 'default', 'always':
1721         default: only enable networking if container requests it.
1722         always:  containers always have networking enabled
1723         `)
1724         networkMode := flag.String("container-network-mode", "default",
1725                 `Set networking mode for container.  Corresponds to Docker network mode (--net).
1726         `)
1727         memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
1728         getVersion := flag.Bool("version", false, "Print version information and exit.")
1729         checkContainerd := flag.Duration("check-containerd", 60*time.Second, "Periodic check if (docker-)containerd is running (use 0s to disable).")
1730         flag.Parse()
1731
1732         // Print version information if requested
1733         if *getVersion {
1734                 fmt.Printf("crunch-run %s\n", version)
1735                 return
1736         }
1737
1738         log.Printf("crunch-run %s started", version)
1739
1740         containerId := flag.Arg(0)
1741
1742         if *caCertsPath != "" {
1743                 arvadosclient.CertFiles = []string{*caCertsPath}
1744         }
1745
1746         api, err := arvadosclient.MakeArvadosClient()
1747         if err != nil {
1748                 log.Fatalf("%s: %v", containerId, err)
1749         }
1750         api.Retries = 8
1751
1752         kc, kcerr := keepclient.MakeKeepClient(api)
1753         if kcerr != nil {
1754                 log.Fatalf("%s: %v", containerId, kcerr)
1755         }
1756         kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
1757         kc.Retries = 4
1758
1759         // API version 1.21 corresponds to Docker 1.9, which is currently the
1760         // minimum version we want to support.
1761         docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
1762
1763         cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerId)
1764         if err != nil {
1765                 log.Fatal(err)
1766         }
1767         if dockererr != nil {
1768                 cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
1769                 cr.checkBrokenNode(dockererr)
1770                 cr.CrunchLog.Close()
1771                 os.Exit(1)
1772         }
1773
1774         parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".")
1775         if tmperr != nil {
1776                 log.Fatalf("%s: %v", containerId, tmperr)
1777         }
1778
1779         cr.parentTemp = parentTemp
1780         cr.statInterval = *statInterval
1781         cr.cgroupRoot = *cgroupRoot
1782         cr.expectCgroupParent = *cgroupParent
1783         cr.enableNetwork = *enableNetwork
1784         cr.networkMode = *networkMode
1785         cr.checkContainerd = *checkContainerd
1786         if *cgroupParentSubsystem != "" {
1787                 p := findCgroup(*cgroupParentSubsystem)
1788                 cr.setCgroupParent = p
1789                 cr.expectCgroupParent = p
1790         }
1791
1792         runerr := cr.Run()
1793
1794         if *memprofile != "" {
1795                 f, err := os.Create(*memprofile)
1796                 if err != nil {
1797                         log.Printf("could not create memory profile: %s", err)
1798                 }
1799                 runtime.GC() // get up-to-date statistics
1800                 if err := pprof.WriteHeapProfile(f); err != nil {
1801                         log.Printf("could not write memory profile: %s", err)
1802                 }
1803                 closeerr := f.Close()
1804                 if closeerr != nil {
1805                         log.Printf("closing memprofile file: %s", err)
1806                 }
1807         }
1808
1809         if runerr != nil {
1810                 log.Fatalf("%s: %v", containerId, runerr)
1811         }
1812 }