17170: Add "arvados-client shell" subcommand and backend support.
[arvados.git] / lib / crunchrun / crunchrun.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "bytes"
9         "encoding/json"
10         "errors"
11         "flag"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "log"
16         "os"
17         "os/exec"
18         "os/signal"
19         "path"
20         "path/filepath"
21         "regexp"
22         "runtime"
23         "runtime/pprof"
24         "sort"
25         "strings"
26         "sync"
27         "syscall"
28         "time"
29
30         "git.arvados.org/arvados.git/lib/cmd"
31         "git.arvados.org/arvados.git/lib/crunchstat"
32         "git.arvados.org/arvados.git/sdk/go/arvados"
33         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
34         "git.arvados.org/arvados.git/sdk/go/keepclient"
35         "git.arvados.org/arvados.git/sdk/go/manifest"
36         "golang.org/x/crypto/ssh"
37         "golang.org/x/net/context"
38
39         dockertypes "github.com/docker/docker/api/types"
40         dockercontainer "github.com/docker/docker/api/types/container"
41         dockernetwork "github.com/docker/docker/api/types/network"
42         dockerclient "github.com/docker/docker/client"
43 )
44
// command is an empty struct type with no fields.
// NOTE(review): presumably its methods (defined outside this part of the
// file) implement the crunch-run subcommand -- confirm.
type command struct{}

// Command is the package-level value of type command.
var Command = command{}
48
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
//
// ContainerRunner holds two values of this type, one per API token:
// DispatcherArvClient and ContainerArvClient.
type IArvadosClient interface {
	Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
	Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
	Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
	Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error
	CallRaw(method string, resourceType string, uuid string, action string, parameters arvadosclient.Dict) (reader io.ReadCloser, err error)
	Discovery(key string) (interface{}, error)
}
58
// ErrCancelled is the error returned when the container is cancelled.
// NOTE(review): presumably callers compare against this sentinel value
// rather than matching on the message text -- confirm before changing it.
var ErrCancelled = errors.New("Cancelled")
61
// IKeepClient is the minimal Keep API methods used by crunch-run.
//
// ContainerRunner holds two values of this type, one per API token:
// DispatcherKeepClient and ContainerKeepClient.
type IKeepClient interface {
	PutB(buf []byte) (string, int, error)
	ReadAt(locator string, p []byte, off int) (int, error)
	ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
	LocalLocator(locator string) (string, error)
	ClearBlockCache()
}
70
// NewLogWriter is a factory function to create a new log writer.
// The name argument identifies the log stream; for example, ArvMountCmd
// passes "arv-mount".
type NewLogWriter func(name string) (io.WriteCloser, error)

// RunArvMount is the signature of a function that runs arv-mount with
// the given command line arguments and API token and returns the
// resulting command. ContainerRunner.ArvMountCmd has this signature.
type RunArvMount func(args []string, tok string) (*exec.Cmd, error)

// MkTempDir is the signature of a function that creates a temporary
// directory and returns its path.
// NOTE(review): the two string arguments are presumably (parent
// directory, name prefix) as in ioutil.TempDir; SetupMounts calls it as
// MkTempDir(runner.parentTemp, "tmp") -- confirm.
type MkTempDir func(string, string) (string, error)
77
// ThinDockerClient is the minimal Docker client interface used by crunch-run.
//
// NOTE(review): the method set appears to mirror a subset of the Docker
// SDK client (imported in this file as dockerclient) so that tests can
// substitute a stub -- confirm.
type ThinDockerClient interface {
	ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error)
	ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
		networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
	ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
	ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
	ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
	ContainerInspect(ctx context.Context, id string) (dockertypes.ContainerJSON, error)
	ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
	ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
	ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
}
91
// PsProcess abstracts a process whose command line can be inspected.
// NOTE(review): CmdlineSlice presumably returns the process's command
// line split into individual arguments -- confirm against the
// implementation used by callers.
type PsProcess interface {
	CmdlineSlice() ([]string, error)
}
95
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
	Docker ThinDockerClient

	// Dispatcher client is initialized with the Dispatcher token.
	// This is a privileged token used to manage container status
	// and logs.
	//
	// We have both dispatcherClient and DispatcherArvClient
	// because there are two different incompatible Arvados Go
	// SDKs and we have to use both (hopefully this gets fixed in
	// #14467)
	dispatcherClient     *arvados.Client
	DispatcherArvClient  IArvadosClient
	DispatcherKeepClient IKeepClient

	// Container client is initialized with the Container token
	// This token controls the permissions of the container, and
	// must be used for operations such as reading collections.
	//
	// Same comment as above applies to
	// containerClient/ContainerArvClient.
	containerClient     *arvados.Client
	ContainerArvClient  IArvadosClient
	ContainerKeepClient IKeepClient

	Container       arvados.Container
	ContainerConfig dockercontainer.Config
	HostConfig      dockercontainer.HostConfig
	token           string
	// ContainerID is the Docker container ID; stop() does nothing
	// while it is empty.
	ContainerID   string
	ExitCode      *int
	NewLogWriter  NewLogWriter
	loggingDone   chan bool
	CrunchLog     *ThrottledLogger
	Stdout        io.WriteCloser
	Stderr        io.WriteCloser
	logUUID       string
	logMtx        sync.Mutex
	LogCollection arvados.CollectionFileSystem
	LogsPDH       *string
	RunArvMount   RunArvMount
	MkTempDir     MkTempDir
	ArvMount      *exec.Cmd
	// ArvMountPoint is the host directory passed to arv-mount as
	// its mount point; SetupArvMountPoint creates it if it is empty.
	ArvMountPoint string
	// HostOutputDir is the host-side directory backing
	// Container.OutputPath; it is set by SetupMounts.
	HostOutputDir string
	// Binds holds the "src:dst" / "src:dst:ro" bind specifications
	// assembled by SetupMounts.
	Binds     []string
	Volumes   map[string]struct{}
	OutputPDH *string
	// SigChan receives SIGTERM, SIGINT and SIGQUIT once
	// setupSignals has been called.
	SigChan chan os.Signal
	// ArvMountExit receives the result of waiting for arv-mount
	// and is then closed (see ArvMountCmd).
	ArvMountExit chan error
	SecretMounts map[string]arvados.Mount
	MkArvClient  func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
	finalState   string
	// parentTemp is passed to MkTempDir as the first argument for
	// every temporary directory created by SetupMounts.
	parentTemp string

	statLogger       io.WriteCloser
	statReporter     *crunchstat.Reporter
	hoststatLogger   io.WriteCloser
	hoststatReporter *crunchstat.Reporter
	statInterval     time.Duration
	cgroupRoot       string
	// What we expect the container's cgroup parent to be.
	expectCgroupParent string
	// What we tell docker to use as the container's cgroup
	// parent. Note: Ideally we would use the same field for both
	// expectCgroupParent and setCgroupParent, and just make it
	// default to "docker". However, when using docker < 1.10 with
	// systemd, specifying a non-empty cgroup parent (even the
	// default value "docker") hits a docker bug
	// (https://github.com/docker/docker/issues/17126). Using two
	// separate fields makes it possible to use the "expect cgroup
	// parent to be X" feature even on sites where the "specify
	// cgroup parent" feature breaks.
	setCgroupParent string

	// cStateLock is held by stop() while it updates cCancelled
	// and cRemoved.
	cStateLock sync.Mutex
	cCancelled bool // StopContainer() invoked
	cRemoved   bool // docker confirmed the container no longer exists

	enableNetwork string // one of "default" or "always"
	networkMode   string // passed through to HostConfig.NetworkMode
	// arvMountLog receives arv-mount's stdout and stderr (see
	// ArvMountCmd).
	arvMountLog *ThrottledLogger

	containerWatchdogInterval time.Duration

	// NOTE(review): the gateway* fields presumably configure the
	// SSH gateway backing "arvados-client shell" -- their use is
	// not visible in this part of the file.
	gatewayAddress    string
	gatewaySSHConfig  *ssh.ServerConfig
	gatewayAuthSecret string
}
187
188 // setupSignals sets up signal handling to gracefully terminate the underlying
189 // Docker container and update state when receiving a TERM, INT or QUIT signal.
190 func (runner *ContainerRunner) setupSignals() {
191         runner.SigChan = make(chan os.Signal, 1)
192         signal.Notify(runner.SigChan, syscall.SIGTERM)
193         signal.Notify(runner.SigChan, syscall.SIGINT)
194         signal.Notify(runner.SigChan, syscall.SIGQUIT)
195
196         go func(sig chan os.Signal) {
197                 for s := range sig {
198                         runner.stop(s)
199                 }
200         }(runner.SigChan)
201 }
202
203 // stop the underlying Docker container.
204 func (runner *ContainerRunner) stop(sig os.Signal) {
205         runner.cStateLock.Lock()
206         defer runner.cStateLock.Unlock()
207         if sig != nil {
208                 runner.CrunchLog.Printf("caught signal: %v", sig)
209         }
210         if runner.ContainerID == "" {
211                 return
212         }
213         runner.cCancelled = true
214         runner.CrunchLog.Printf("removing container")
215         err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
216         if err != nil {
217                 runner.CrunchLog.Printf("error removing container: %s", err)
218         }
219         if err == nil || strings.Contains(err.Error(), "No such container: "+runner.ContainerID) {
220                 runner.cRemoved = true
221         }
222 }
223
// errorBlacklist lists regular expressions that checkBrokenNode matches
// against error messages. A match is taken to mean that this node is
// unable to run containers. The (?ms) flags let "." match newlines so a
// pattern can span a multi-line message.
var errorBlacklist = []string{
	"(?ms).*[Cc]annot connect to the Docker daemon.*",
	"(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
	"(?ms).*grpc: the connection is unavailable.*",
}

// brokenNodeHook is the "broken-node-hook" command line flag: the path
// of a script that runBrokenNodeHook executes. When it is empty,
// runBrokenNodeHook writes a marker file instead.
var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
230
231 func (runner *ContainerRunner) runBrokenNodeHook() {
232         if *brokenNodeHook == "" {
233                 path := filepath.Join(lockdir, brokenfile)
234                 runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
235                 f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
236                 if err != nil {
237                         runner.CrunchLog.Printf("Error writing %s: %s", path, err)
238                         return
239                 }
240                 f.Close()
241         } else {
242                 runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
243                 // run killme script
244                 c := exec.Command(*brokenNodeHook)
245                 c.Stdout = runner.CrunchLog
246                 c.Stderr = runner.CrunchLog
247                 err := c.Run()
248                 if err != nil {
249                         runner.CrunchLog.Printf("Error running broken node hook: %v", err)
250                 }
251         }
252 }
253
254 func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
255         for _, d := range errorBlacklist {
256                 if m, e := regexp.MatchString(d, goterr.Error()); m && e == nil {
257                         runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
258                         runner.runBrokenNodeHook()
259                         return true
260                 }
261         }
262         return false
263 }
264
265 // LoadImage determines the docker image id from the container record and
266 // checks if it is available in the local Docker image store.  If not, it loads
267 // the image from Keep.
268 func (runner *ContainerRunner) LoadImage() (err error) {
269
270         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
271
272         var collection arvados.Collection
273         err = runner.ContainerArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
274         if err != nil {
275                 return fmt.Errorf("While getting container image collection: %v", err)
276         }
277         manifest := manifest.Manifest{Text: collection.ManifestText}
278         var img, imageID string
279         for ms := range manifest.StreamIter() {
280                 img = ms.FileStreamSegments[0].Name
281                 if !strings.HasSuffix(img, ".tar") {
282                         return fmt.Errorf("First file in the container image collection does not end in .tar")
283                 }
284                 imageID = img[:len(img)-4]
285         }
286
287         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
288
289         _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID)
290         if err != nil {
291                 runner.CrunchLog.Print("Loading Docker image from keep")
292
293                 var readCloser io.ReadCloser
294                 readCloser, err = runner.ContainerKeepClient.ManifestFileReader(manifest, img)
295                 if err != nil {
296                         return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
297                 }
298
299                 response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true)
300                 if err != nil {
301                         return fmt.Errorf("While loading container image into Docker: %v", err)
302                 }
303
304                 defer response.Body.Close()
305                 rbody, err := ioutil.ReadAll(response.Body)
306                 if err != nil {
307                         return fmt.Errorf("Reading response to image load: %v", err)
308                 }
309                 runner.CrunchLog.Printf("Docker response: %s", rbody)
310         } else {
311                 runner.CrunchLog.Print("Docker image is available")
312         }
313
314         runner.ContainerConfig.Image = imageID
315
316         runner.ContainerKeepClient.ClearBlockCache()
317
318         return nil
319 }
320
321 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
322         c = exec.Command("arv-mount", arvMountCmd...)
323
324         // Copy our environment, but override ARVADOS_API_TOKEN with
325         // the container auth token.
326         c.Env = nil
327         for _, s := range os.Environ() {
328                 if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") {
329                         c.Env = append(c.Env, s)
330                 }
331         }
332         c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
333
334         w, err := runner.NewLogWriter("arv-mount")
335         if err != nil {
336                 return nil, err
337         }
338         runner.arvMountLog = NewThrottledLogger(w)
339         c.Stdout = runner.arvMountLog
340         c.Stderr = runner.arvMountLog
341
342         runner.CrunchLog.Printf("Running %v", c.Args)
343
344         err = c.Start()
345         if err != nil {
346                 return nil, err
347         }
348
349         statReadme := make(chan bool)
350         runner.ArvMountExit = make(chan error)
351
352         keepStatting := true
353         go func() {
354                 for keepStatting {
355                         time.Sleep(100 * time.Millisecond)
356                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
357                         if err == nil {
358                                 keepStatting = false
359                                 statReadme <- true
360                         }
361                 }
362                 close(statReadme)
363         }()
364
365         go func() {
366                 mnterr := c.Wait()
367                 if mnterr != nil {
368                         runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
369                 }
370                 runner.ArvMountExit <- mnterr
371                 close(runner.ArvMountExit)
372         }()
373
374         select {
375         case <-statReadme:
376                 break
377         case err := <-runner.ArvMountExit:
378                 runner.ArvMount = nil
379                 keepStatting = false
380                 return nil, err
381         }
382
383         return c, nil
384 }
385
386 func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
387         if runner.ArvMountPoint == "" {
388                 runner.ArvMountPoint, err = runner.MkTempDir(runner.parentTemp, prefix)
389         }
390         return
391 }
392
// copyfile copies the contents of the file at src to a file at dst,
// creating dst's parent directories (mode 0777 before umask) as needed
// and truncating dst if it already exists.
//
// Both files are closed on every return path. The returned error is the
// first failure from opening src, creating the parent directories,
// creating dst, copying, or closing dst.
func copyfile(src string, dst string) (err error) {
	srcfile, err := os.Open(src)
	if err != nil {
		return err
	}
	// src is only read, so an error from closing it cannot lose data.
	defer srcfile.Close()

	if err = os.MkdirAll(filepath.Dir(dst), 0777); err != nil {
		return err
	}

	dstfile, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer func() {
		// A failed close can mean the data was not fully written,
		// so report it unless an earlier error already explains the
		// failure.
		if cerr := dstfile.Close(); err == nil {
			err = cerr
		}
	}()

	_, err = io.Copy(dstfile, srcfile)
	return err
}
423
424 func (runner *ContainerRunner) SetupMounts() (err error) {
425         err = runner.SetupArvMountPoint("keep")
426         if err != nil {
427                 return fmt.Errorf("While creating keep mount temp dir: %v", err)
428         }
429
430         token, err := runner.ContainerToken()
431         if err != nil {
432                 return fmt.Errorf("could not get container token: %s", err)
433         }
434
435         pdhOnly := true
436         tmpcount := 0
437         arvMountCmd := []string{
438                 "--foreground",
439                 "--allow-other",
440                 "--read-write",
441                 fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
442
443         if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
444                 arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
445         }
446
447         collectionPaths := []string{}
448         runner.Binds = nil
449         runner.Volumes = make(map[string]struct{})
450         needCertMount := true
451         type copyFile struct {
452                 src  string
453                 bind string
454         }
455         var copyFiles []copyFile
456
457         var binds []string
458         for bind := range runner.Container.Mounts {
459                 binds = append(binds, bind)
460         }
461         for bind := range runner.SecretMounts {
462                 if _, ok := runner.Container.Mounts[bind]; ok {
463                         return fmt.Errorf("secret mount %q conflicts with regular mount", bind)
464                 }
465                 if runner.SecretMounts[bind].Kind != "json" &&
466                         runner.SecretMounts[bind].Kind != "text" {
467                         return fmt.Errorf("secret mount %q type is %q but only 'json' and 'text' are permitted",
468                                 bind, runner.SecretMounts[bind].Kind)
469                 }
470                 binds = append(binds, bind)
471         }
472         sort.Strings(binds)
473
474         for _, bind := range binds {
475                 mnt, ok := runner.Container.Mounts[bind]
476                 if !ok {
477                         mnt = runner.SecretMounts[bind]
478                 }
479                 if bind == "stdout" || bind == "stderr" {
480                         // Is it a "file" mount kind?
481                         if mnt.Kind != "file" {
482                                 return fmt.Errorf("unsupported mount kind '%s' for %s: only 'file' is supported", mnt.Kind, bind)
483                         }
484
485                         // Does path start with OutputPath?
486                         prefix := runner.Container.OutputPath
487                         if !strings.HasSuffix(prefix, "/") {
488                                 prefix += "/"
489                         }
490                         if !strings.HasPrefix(mnt.Path, prefix) {
491                                 return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
492                         }
493                 }
494
495                 if bind == "stdin" {
496                         // Is it a "collection" mount kind?
497                         if mnt.Kind != "collection" && mnt.Kind != "json" {
498                                 return fmt.Errorf("unsupported mount kind '%s' for stdin: only 'collection' and 'json' are supported", mnt.Kind)
499                         }
500                 }
501
502                 if bind == "/etc/arvados/ca-certificates.crt" {
503                         needCertMount = false
504                 }
505
506                 if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" {
507                         if mnt.Kind != "collection" && mnt.Kind != "text" && mnt.Kind != "json" {
508                                 return fmt.Errorf("only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind)
509                         }
510                 }
511
512                 switch {
513                 case mnt.Kind == "collection" && bind != "stdin":
514                         var src string
515                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
516                                 return fmt.Errorf("cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
517                         }
518                         if mnt.UUID != "" {
519                                 if mnt.Writable {
520                                         return fmt.Errorf("writing to existing collections currently not permitted")
521                                 }
522                                 pdhOnly = false
523                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
524                         } else if mnt.PortableDataHash != "" {
525                                 if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
526                                         return fmt.Errorf("can never write to a collection specified by portable data hash")
527                                 }
528                                 idx := strings.Index(mnt.PortableDataHash, "/")
529                                 if idx > 0 {
530                                         mnt.Path = path.Clean(mnt.PortableDataHash[idx:])
531                                         mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
532                                         runner.Container.Mounts[bind] = mnt
533                                 }
534                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
535                                 if mnt.Path != "" && mnt.Path != "." {
536                                         if strings.HasPrefix(mnt.Path, "./") {
537                                                 mnt.Path = mnt.Path[2:]
538                                         } else if strings.HasPrefix(mnt.Path, "/") {
539                                                 mnt.Path = mnt.Path[1:]
540                                         }
541                                         src += "/" + mnt.Path
542                                 }
543                         } else {
544                                 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
545                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
546                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
547                                 tmpcount++
548                         }
549                         if mnt.Writable {
550                                 if bind == runner.Container.OutputPath {
551                                         runner.HostOutputDir = src
552                                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
553                                 } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
554                                         copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
555                                 } else {
556                                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
557                                 }
558                         } else {
559                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
560                         }
561                         collectionPaths = append(collectionPaths, src)
562
563                 case mnt.Kind == "tmp":
564                         var tmpdir string
565                         tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp")
566                         if err != nil {
567                                 return fmt.Errorf("while creating mount temp dir: %v", err)
568                         }
569                         st, staterr := os.Stat(tmpdir)
570                         if staterr != nil {
571                                 return fmt.Errorf("while Stat on temp dir: %v", staterr)
572                         }
573                         err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777)
574                         if staterr != nil {
575                                 return fmt.Errorf("while Chmod temp dir: %v", err)
576                         }
577                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
578                         if bind == runner.Container.OutputPath {
579                                 runner.HostOutputDir = tmpdir
580                         }
581
582                 case mnt.Kind == "json" || mnt.Kind == "text":
583                         var filedata []byte
584                         if mnt.Kind == "json" {
585                                 filedata, err = json.Marshal(mnt.Content)
586                                 if err != nil {
587                                         return fmt.Errorf("encoding json data: %v", err)
588                                 }
589                         } else {
590                                 text, ok := mnt.Content.(string)
591                                 if !ok {
592                                         return fmt.Errorf("content for mount %q must be a string", bind)
593                                 }
594                                 filedata = []byte(text)
595                         }
596
597                         tmpdir, err := runner.MkTempDir(runner.parentTemp, mnt.Kind)
598                         if err != nil {
599                                 return fmt.Errorf("creating temp dir: %v", err)
600                         }
601                         tmpfn := filepath.Join(tmpdir, "mountdata."+mnt.Kind)
602                         err = ioutil.WriteFile(tmpfn, filedata, 0444)
603                         if err != nil {
604                                 return fmt.Errorf("writing temp file: %v", err)
605                         }
606                         if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
607                                 copyFiles = append(copyFiles, copyFile{tmpfn, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
608                         } else {
609                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
610                         }
611
612                 case mnt.Kind == "git_tree":
613                         tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree")
614                         if err != nil {
615                                 return fmt.Errorf("creating temp dir: %v", err)
616                         }
617                         err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
618                         if err != nil {
619                                 return err
620                         }
621                         runner.Binds = append(runner.Binds, tmpdir+":"+bind+":ro")
622                 }
623         }
624
625         if runner.HostOutputDir == "" {
626                 return fmt.Errorf("output path does not correspond to a writable mount point")
627         }
628
629         if wantAPI := runner.Container.RuntimeConstraints.API; needCertMount && wantAPI != nil && *wantAPI {
630                 for _, certfile := range arvadosclient.CertFiles {
631                         _, err := os.Stat(certfile)
632                         if err == nil {
633                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:/etc/arvados/ca-certificates.crt:ro", certfile))
634                                 break
635                         }
636                 }
637         }
638
639         if pdhOnly {
640                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
641         } else {
642                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
643         }
644         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
645
646         runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
647         if err != nil {
648                 return fmt.Errorf("while trying to start arv-mount: %v", err)
649         }
650
651         for _, p := range collectionPaths {
652                 _, err = os.Stat(p)
653                 if err != nil {
654                         return fmt.Errorf("while checking that input files exist: %v", err)
655                 }
656         }
657
658         for _, cp := range copyFiles {
659                 st, err := os.Stat(cp.src)
660                 if err != nil {
661                         return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
662                 }
663                 if st.IsDir() {
664                         err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
665                                 if walkerr != nil {
666                                         return walkerr
667                                 }
668                                 target := path.Join(cp.bind, walkpath[len(cp.src):])
669                                 if walkinfo.Mode().IsRegular() {
670                                         copyerr := copyfile(walkpath, target)
671                                         if copyerr != nil {
672                                                 return copyerr
673                                         }
674                                         return os.Chmod(target, walkinfo.Mode()|0777)
675                                 } else if walkinfo.Mode().IsDir() {
676                                         mkerr := os.MkdirAll(target, 0777)
677                                         if mkerr != nil {
678                                                 return mkerr
679                                         }
680                                         return os.Chmod(target, walkinfo.Mode()|os.ModeSetgid|0777)
681                                 } else {
682                                         return fmt.Errorf("source %q is not a regular file or directory", cp.src)
683                                 }
684                         })
685                 } else if st.Mode().IsRegular() {
686                         err = copyfile(cp.src, cp.bind)
687                         if err == nil {
688                                 err = os.Chmod(cp.bind, st.Mode()|0777)
689                         }
690                 }
691                 if err != nil {
692                         return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
693                 }
694         }
695
696         return nil
697 }
698
699 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
700         // Handle docker log protocol
701         // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
702         defer close(runner.loggingDone)
703
704         header := make([]byte, 8)
705         var err error
706         for err == nil {
707                 _, err = io.ReadAtLeast(containerReader, header, 8)
708                 if err != nil {
709                         if err == io.EOF {
710                                 err = nil
711                         }
712                         break
713                 }
714                 readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
715                 if header[0] == 1 {
716                         // stdout
717                         _, err = io.CopyN(runner.Stdout, containerReader, readsize)
718                 } else {
719                         // stderr
720                         _, err = io.CopyN(runner.Stderr, containerReader, readsize)
721                 }
722         }
723
724         if err != nil {
725                 runner.CrunchLog.Printf("error reading docker logs: %v", err)
726         }
727
728         err = runner.Stdout.Close()
729         if err != nil {
730                 runner.CrunchLog.Printf("error closing stdout logs: %v", err)
731         }
732
733         err = runner.Stderr.Close()
734         if err != nil {
735                 runner.CrunchLog.Printf("error closing stderr logs: %v", err)
736         }
737
738         if runner.statReporter != nil {
739                 runner.statReporter.Stop()
740                 err = runner.statLogger.Close()
741                 if err != nil {
742                         runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
743                 }
744         }
745 }
746
747 func (runner *ContainerRunner) stopHoststat() error {
748         if runner.hoststatReporter == nil {
749                 return nil
750         }
751         runner.hoststatReporter.Stop()
752         err := runner.hoststatLogger.Close()
753         if err != nil {
754                 return fmt.Errorf("error closing hoststat logs: %v", err)
755         }
756         return nil
757 }
758
759 func (runner *ContainerRunner) startHoststat() error {
760         w, err := runner.NewLogWriter("hoststat")
761         if err != nil {
762                 return err
763         }
764         runner.hoststatLogger = NewThrottledLogger(w)
765         runner.hoststatReporter = &crunchstat.Reporter{
766                 Logger:     log.New(runner.hoststatLogger, "", 0),
767                 CgroupRoot: runner.cgroupRoot,
768                 PollPeriod: runner.statInterval,
769         }
770         runner.hoststatReporter.Start()
771         return nil
772 }
773
774 func (runner *ContainerRunner) startCrunchstat() error {
775         w, err := runner.NewLogWriter("crunchstat")
776         if err != nil {
777                 return err
778         }
779         runner.statLogger = NewThrottledLogger(w)
780         runner.statReporter = &crunchstat.Reporter{
781                 CID:          runner.ContainerID,
782                 Logger:       log.New(runner.statLogger, "", 0),
783                 CgroupParent: runner.expectCgroupParent,
784                 CgroupRoot:   runner.cgroupRoot,
785                 PollPeriod:   runner.statInterval,
786                 TempDir:      runner.parentTemp,
787         }
788         runner.statReporter.Start()
789         return nil
790 }
791
// infoCommand describes one diagnostic command whose output is
// written to the node-info log by LogHostInfo.
type infoCommand struct {
	// label is the heading printed before the command's output.
	label string
	// cmd is the argv to run: the program name followed by its
	// arguments.
	cmd []string
}
796
797 // LogHostInfo logs info about the current host, for debugging and
798 // accounting purposes. Although it's logged as "node-info", this is
799 // about the environment where crunch-run is actually running, which
800 // might differ from what's described in the node record (see
801 // LogNodeRecord).
802 func (runner *ContainerRunner) LogHostInfo() (err error) {
803         w, err := runner.NewLogWriter("node-info")
804         if err != nil {
805                 return
806         }
807
808         commands := []infoCommand{
809                 {
810                         label: "Host Information",
811                         cmd:   []string{"uname", "-a"},
812                 },
813                 {
814                         label: "CPU Information",
815                         cmd:   []string{"cat", "/proc/cpuinfo"},
816                 },
817                 {
818                         label: "Memory Information",
819                         cmd:   []string{"cat", "/proc/meminfo"},
820                 },
821                 {
822                         label: "Disk Space",
823                         cmd:   []string{"df", "-m", "/", os.TempDir()},
824                 },
825                 {
826                         label: "Disk INodes",
827                         cmd:   []string{"df", "-i", "/", os.TempDir()},
828                 },
829         }
830
831         // Run commands with informational output to be logged.
832         for _, command := range commands {
833                 fmt.Fprintln(w, command.label)
834                 cmd := exec.Command(command.cmd[0], command.cmd[1:]...)
835                 cmd.Stdout = w
836                 cmd.Stderr = w
837                 if err := cmd.Run(); err != nil {
838                         err = fmt.Errorf("While running command %q: %v", command.cmd, err)
839                         fmt.Fprintln(w, err)
840                         return err
841                 }
842                 fmt.Fprintln(w, "")
843         }
844
845         err = w.Close()
846         if err != nil {
847                 return fmt.Errorf("While closing node-info logs: %v", err)
848         }
849         return nil
850 }
851
852 // LogContainerRecord gets and saves the raw JSON container record from the API server
853 func (runner *ContainerRunner) LogContainerRecord() error {
854         logged, err := runner.logAPIResponse("container", "containers", map[string]interface{}{"filters": [][]string{{"uuid", "=", runner.Container.UUID}}}, nil)
855         if !logged && err == nil {
856                 err = fmt.Errorf("error: no container record found for %s", runner.Container.UUID)
857         }
858         return err
859 }
860
861 // LogNodeRecord logs the current host's InstanceType config entry (or
862 // the arvados#node record, if running via crunch-dispatch-slurm).
863 func (runner *ContainerRunner) LogNodeRecord() error {
864         if it := os.Getenv("InstanceType"); it != "" {
865                 // Dispatched via arvados-dispatch-cloud. Save
866                 // InstanceType config fragment received from
867                 // dispatcher on stdin.
868                 w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666)
869                 if err != nil {
870                         return err
871                 }
872                 defer w.Close()
873                 _, err = io.WriteString(w, it)
874                 if err != nil {
875                         return err
876                 }
877                 return w.Close()
878         }
879         // Dispatched via crunch-dispatch-slurm. Look up
880         // apiserver's node record corresponding to
881         // $SLURMD_NODENAME.
882         hostname := os.Getenv("SLURMD_NODENAME")
883         if hostname == "" {
884                 hostname, _ = os.Hostname()
885         }
886         _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
887                 // The "info" field has admin-only info when
888                 // obtained with a privileged token, and
889                 // should not be logged.
890                 node, ok := resp.(map[string]interface{})
891                 if ok {
892                         delete(node, "info")
893                 }
894         })
895         return err
896 }
897
898 func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
899         writer, err := runner.LogCollection.OpenFile(label+".json", os.O_CREATE|os.O_WRONLY, 0666)
900         if err != nil {
901                 return false, err
902         }
903         w := &ArvLogWriter{
904                 ArvClient:     runner.DispatcherArvClient,
905                 UUID:          runner.Container.UUID,
906                 loggingStream: label,
907                 writeCloser:   writer,
908         }
909
910         reader, err := runner.DispatcherArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
911         if err != nil {
912                 return false, fmt.Errorf("error getting %s record: %v", label, err)
913         }
914         defer reader.Close()
915
916         dec := json.NewDecoder(reader)
917         dec.UseNumber()
918         var resp map[string]interface{}
919         if err = dec.Decode(&resp); err != nil {
920                 return false, fmt.Errorf("error decoding %s list response: %v", label, err)
921         }
922         items, ok := resp["items"].([]interface{})
923         if !ok {
924                 return false, fmt.Errorf("error decoding %s list response: no \"items\" key in API list response", label)
925         } else if len(items) < 1 {
926                 return false, nil
927         }
928         if munge != nil {
929                 munge(items[0])
930         }
931         // Re-encode it using indentation to improve readability
932         enc := json.NewEncoder(w)
933         enc.SetIndent("", "    ")
934         if err = enc.Encode(items[0]); err != nil {
935                 return false, fmt.Errorf("error logging %s record: %v", label, err)
936         }
937         err = w.Close()
938         if err != nil {
939                 return false, fmt.Errorf("error closing %s.json in log collection: %v", label, err)
940         }
941         return true, nil
942 }
943
944 // AttachStreams connects the docker container stdin, stdout and stderr logs
945 // to the Arvados logger which logs to Keep and the API server logs table.
946 func (runner *ContainerRunner) AttachStreams() (err error) {
947
948         runner.CrunchLog.Print("Attaching container streams")
949
950         // If stdin mount is provided, attach it to the docker container
951         var stdinRdr arvados.File
952         var stdinJSON []byte
953         if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
954                 if stdinMnt.Kind == "collection" {
955                         var stdinColl arvados.Collection
956                         collID := stdinMnt.UUID
957                         if collID == "" {
958                                 collID = stdinMnt.PortableDataHash
959                         }
960                         err = runner.ContainerArvClient.Get("collections", collID, nil, &stdinColl)
961                         if err != nil {
962                                 return fmt.Errorf("While getting stdin collection: %v", err)
963                         }
964
965                         stdinRdr, err = runner.ContainerKeepClient.ManifestFileReader(
966                                 manifest.Manifest{Text: stdinColl.ManifestText},
967                                 stdinMnt.Path)
968                         if os.IsNotExist(err) {
969                                 return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
970                         } else if err != nil {
971                                 return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
972                         }
973                 } else if stdinMnt.Kind == "json" {
974                         stdinJSON, err = json.Marshal(stdinMnt.Content)
975                         if err != nil {
976                                 return fmt.Errorf("While encoding stdin json data: %v", err)
977                         }
978                 }
979         }
980
981         stdinUsed := stdinRdr != nil || len(stdinJSON) != 0
982         response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
983                 dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
984         if err != nil {
985                 return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
986         }
987
988         runner.loggingDone = make(chan bool)
989
990         if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
991                 stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
992                 if err != nil {
993                         return err
994                 }
995                 runner.Stdout = stdoutFile
996         } else if w, err := runner.NewLogWriter("stdout"); err != nil {
997                 return err
998         } else {
999                 runner.Stdout = NewThrottledLogger(w)
1000         }
1001
1002         if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
1003                 stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
1004                 if err != nil {
1005                         return err
1006                 }
1007                 runner.Stderr = stderrFile
1008         } else if w, err := runner.NewLogWriter("stderr"); err != nil {
1009                 return err
1010         } else {
1011                 runner.Stderr = NewThrottledLogger(w)
1012         }
1013
1014         if stdinRdr != nil {
1015                 go func() {
1016                         _, err := io.Copy(response.Conn, stdinRdr)
1017                         if err != nil {
1018                                 runner.CrunchLog.Printf("While writing stdin collection to docker container: %v", err)
1019                                 runner.stop(nil)
1020                         }
1021                         stdinRdr.Close()
1022                         response.CloseWrite()
1023                 }()
1024         } else if len(stdinJSON) != 0 {
1025                 go func() {
1026                         _, err := io.Copy(response.Conn, bytes.NewReader(stdinJSON))
1027                         if err != nil {
1028                                 runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err)
1029                                 runner.stop(nil)
1030                         }
1031                         response.CloseWrite()
1032                 }()
1033         }
1034
1035         go runner.ProcessDockerAttach(response.Reader)
1036
1037         return nil
1038 }
1039
1040 func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
1041         stdoutPath := mntPath[len(runner.Container.OutputPath):]
1042         index := strings.LastIndex(stdoutPath, "/")
1043         if index > 0 {
1044                 subdirs := stdoutPath[:index]
1045                 if subdirs != "" {
1046                         st, err := os.Stat(runner.HostOutputDir)
1047                         if err != nil {
1048                                 return nil, fmt.Errorf("While Stat on temp dir: %v", err)
1049                         }
1050                         stdoutPath := filepath.Join(runner.HostOutputDir, subdirs)
1051                         err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
1052                         if err != nil {
1053                                 return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
1054                         }
1055                 }
1056         }
1057         stdoutFile, err := os.Create(filepath.Join(runner.HostOutputDir, stdoutPath))
1058         if err != nil {
1059                 return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
1060         }
1061
1062         return stdoutFile, nil
1063 }
1064
1065 // CreateContainer creates the docker container.
1066 func (runner *ContainerRunner) CreateContainer() error {
1067         runner.CrunchLog.Print("Creating Docker container")
1068
1069         runner.ContainerConfig.Cmd = runner.Container.Command
1070         if runner.Container.Cwd != "." {
1071                 runner.ContainerConfig.WorkingDir = runner.Container.Cwd
1072         }
1073
1074         for k, v := range runner.Container.Environment {
1075                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
1076         }
1077
1078         runner.ContainerConfig.Volumes = runner.Volumes
1079
1080         maxRAM := int64(runner.Container.RuntimeConstraints.RAM)
1081         minDockerRAM := int64(16)
1082         if maxRAM < minDockerRAM*1024*1024 {
1083                 // Docker daemon won't let you set a limit less than ~10 MiB
1084                 maxRAM = minDockerRAM * 1024 * 1024
1085         }
1086         runner.HostConfig = dockercontainer.HostConfig{
1087                 Binds: runner.Binds,
1088                 LogConfig: dockercontainer.LogConfig{
1089                         Type: "none",
1090                 },
1091                 Resources: dockercontainer.Resources{
1092                         CgroupParent: runner.setCgroupParent,
1093                         NanoCPUs:     int64(runner.Container.RuntimeConstraints.VCPUs) * 1000000000,
1094                         Memory:       maxRAM, // RAM
1095                         MemorySwap:   maxRAM, // RAM+swap
1096                         KernelMemory: maxRAM, // kernel portion
1097                 },
1098         }
1099
1100         if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
1101                 tok, err := runner.ContainerToken()
1102                 if err != nil {
1103                         return err
1104                 }
1105                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
1106                         "ARVADOS_API_TOKEN="+tok,
1107                         "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
1108                         "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
1109                 )
1110                 runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
1111         } else {
1112                 if runner.enableNetwork == "always" {
1113                         runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
1114                 } else {
1115                         runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none")
1116                 }
1117         }
1118
1119         _, stdinUsed := runner.Container.Mounts["stdin"]
1120         runner.ContainerConfig.OpenStdin = stdinUsed
1121         runner.ContainerConfig.StdinOnce = stdinUsed
1122         runner.ContainerConfig.AttachStdin = stdinUsed
1123         runner.ContainerConfig.AttachStdout = true
1124         runner.ContainerConfig.AttachStderr = true
1125
1126         createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
1127         if err != nil {
1128                 return fmt.Errorf("While creating container: %v", err)
1129         }
1130
1131         runner.ContainerID = createdBody.ID
1132
1133         return runner.AttachStreams()
1134 }
1135
1136 // StartContainer starts the docker container created by CreateContainer.
1137 func (runner *ContainerRunner) StartContainer() error {
1138         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
1139         runner.cStateLock.Lock()
1140         defer runner.cStateLock.Unlock()
1141         if runner.cCancelled {
1142                 return ErrCancelled
1143         }
1144         err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
1145                 dockertypes.ContainerStartOptions{})
1146         if err != nil {
1147                 var advice string
1148                 if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil {
1149                         advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0])
1150                 }
1151                 return fmt.Errorf("could not start container: %v%s", err, advice)
1152         }
1153         return nil
1154 }
1155
1156 // WaitFinish waits for the container to terminate, capture the exit code, and
1157 // close the stdout/stderr logging.
1158 func (runner *ContainerRunner) WaitFinish() error {
1159         var runTimeExceeded <-chan time.Time
1160         runner.CrunchLog.Print("Waiting for container to finish")
1161
1162         waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
1163         arvMountExit := runner.ArvMountExit
1164         if timeout := runner.Container.SchedulingParameters.MaxRunTime; timeout > 0 {
1165                 runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
1166         }
1167
1168         containerGone := make(chan struct{})
1169         go func() {
1170                 defer close(containerGone)
1171                 if runner.containerWatchdogInterval < 1 {
1172                         runner.containerWatchdogInterval = time.Minute
1173                 }
1174                 for range time.NewTicker(runner.containerWatchdogInterval).C {
1175                         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval))
1176                         ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID)
1177                         cancel()
1178                         runner.cStateLock.Lock()
1179                         done := runner.cRemoved || runner.ExitCode != nil
1180                         runner.cStateLock.Unlock()
1181                         if done {
1182                                 return
1183                         } else if err != nil {
1184                                 runner.CrunchLog.Printf("Error inspecting container: %s", err)
1185                                 runner.checkBrokenNode(err)
1186                                 return
1187                         } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
1188                                 runner.CrunchLog.Printf("Container is not running: State=%v", ctr.State)
1189                                 return
1190                         }
1191                 }
1192         }()
1193
1194         for {
1195                 select {
1196                 case waitBody := <-waitOk:
1197                         runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
1198                         code := int(waitBody.StatusCode)
1199                         runner.ExitCode = &code
1200
1201                         // wait for stdout/stderr to complete
1202                         <-runner.loggingDone
1203                         return nil
1204
1205                 case err := <-waitErr:
1206                         return fmt.Errorf("container wait: %v", err)
1207
1208                 case <-arvMountExit:
1209                         runner.CrunchLog.Printf("arv-mount exited while container is still running.  Stopping container.")
1210                         runner.stop(nil)
1211                         // arvMountExit will always be ready now that
1212                         // it's closed, but that doesn't interest us.
1213                         arvMountExit = nil
1214
1215                 case <-runTimeExceeded:
1216                         runner.CrunchLog.Printf("maximum run time exceeded. Stopping container.")
1217                         runner.stop(nil)
1218                         runTimeExceeded = nil
1219
1220                 case <-containerGone:
1221                         return errors.New("docker client never returned status")
1222                 }
1223         }
1224 }
1225
1226 func (runner *ContainerRunner) updateLogs() {
1227         ticker := time.NewTicker(crunchLogUpdatePeriod / 360)
1228         defer ticker.Stop()
1229
1230         sigusr1 := make(chan os.Signal, 1)
1231         signal.Notify(sigusr1, syscall.SIGUSR1)
1232         defer signal.Stop(sigusr1)
1233
1234         saveAtTime := time.Now().Add(crunchLogUpdatePeriod)
1235         saveAtSize := crunchLogUpdateSize
1236         var savedSize int64
1237         for {
1238                 select {
1239                 case <-ticker.C:
1240                 case <-sigusr1:
1241                         saveAtTime = time.Now()
1242                 }
1243                 runner.logMtx.Lock()
1244                 done := runner.LogsPDH != nil
1245                 runner.logMtx.Unlock()
1246                 if done {
1247                         return
1248                 }
1249                 size := runner.LogCollection.Size()
1250                 if size == savedSize || (time.Now().Before(saveAtTime) && size < saveAtSize) {
1251                         continue
1252                 }
1253                 saveAtTime = time.Now().Add(crunchLogUpdatePeriod)
1254                 saveAtSize = runner.LogCollection.Size() + crunchLogUpdateSize
1255                 saved, err := runner.saveLogCollection(false)
1256                 if err != nil {
1257                         runner.CrunchLog.Printf("error updating log collection: %s", err)
1258                         continue
1259                 }
1260
1261                 var updated arvados.Container
1262                 err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
1263                         "container": arvadosclient.Dict{"log": saved.PortableDataHash},
1264                 }, &updated)
1265                 if err != nil {
1266                         runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
1267                         continue
1268                 }
1269
1270                 savedSize = size
1271         }
1272 }
1273
1274 // CaptureOutput saves data from the container's output directory if
1275 // needed, and updates the container output accordingly.
1276 func (runner *ContainerRunner) CaptureOutput() error {
1277         if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
1278                 // Output may have been set directly by the container, so
1279                 // refresh the container record to check.
1280                 err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
1281                         nil, &runner.Container)
1282                 if err != nil {
1283                         return err
1284                 }
1285                 if runner.Container.Output != "" {
1286                         // Container output is already set.
1287                         runner.OutputPDH = &runner.Container.Output
1288                         return nil
1289                 }
1290         }
1291
1292         txt, err := (&copier{
1293                 client:        runner.containerClient,
1294                 arvClient:     runner.ContainerArvClient,
1295                 keepClient:    runner.ContainerKeepClient,
1296                 hostOutputDir: runner.HostOutputDir,
1297                 ctrOutputDir:  runner.Container.OutputPath,
1298                 binds:         runner.Binds,
1299                 mounts:        runner.Container.Mounts,
1300                 secretMounts:  runner.SecretMounts,
1301                 logger:        runner.CrunchLog,
1302         }).Copy()
1303         if err != nil {
1304                 return err
1305         }
1306         if n := len(regexp.MustCompile(` [0-9a-f]+\+\S*\+R`).FindAllStringIndex(txt, -1)); n > 0 {
1307                 runner.CrunchLog.Printf("Copying %d data blocks from remote input collections...", n)
1308                 fs, err := (&arvados.Collection{ManifestText: txt}).FileSystem(runner.containerClient, runner.ContainerKeepClient)
1309                 if err != nil {
1310                         return err
1311                 }
1312                 txt, err = fs.MarshalManifest(".")
1313                 if err != nil {
1314                         return err
1315                 }
1316         }
1317         var resp arvados.Collection
1318         err = runner.ContainerArvClient.Create("collections", arvadosclient.Dict{
1319                 "ensure_unique_name": true,
1320                 "collection": arvadosclient.Dict{
1321                         "is_trashed":    true,
1322                         "name":          "output for " + runner.Container.UUID,
1323                         "manifest_text": txt,
1324                 },
1325         }, &resp)
1326         if err != nil {
1327                 return fmt.Errorf("error creating output collection: %v", err)
1328         }
1329         runner.OutputPDH = &resp.PortableDataHash
1330         return nil
1331 }
1332
1333 func (runner *ContainerRunner) CleanupDirs() {
1334         if runner.ArvMount != nil {
1335                 var delay int64 = 8
1336                 umount := exec.Command("arv-mount", fmt.Sprintf("--unmount-timeout=%d", delay), "--unmount", runner.ArvMountPoint)
1337                 umount.Stdout = runner.CrunchLog
1338                 umount.Stderr = runner.CrunchLog
1339                 runner.CrunchLog.Printf("Running %v", umount.Args)
1340                 umnterr := umount.Start()
1341
1342                 if umnterr != nil {
1343                         runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
1344                 } else {
1345                         // If arv-mount --unmount gets stuck for any reason, we
1346                         // don't want to wait for it forever.  Do Wait() in a goroutine
1347                         // so it doesn't block crunch-run.
1348                         umountExit := make(chan error)
1349                         go func() {
1350                                 mnterr := umount.Wait()
1351                                 if mnterr != nil {
1352                                         runner.CrunchLog.Printf("Error unmounting: %v", mnterr)
1353                                 }
1354                                 umountExit <- mnterr
1355                         }()
1356
1357                         for again := true; again; {
1358                                 again = false
1359                                 select {
1360                                 case <-umountExit:
1361                                         umount = nil
1362                                         again = true
1363                                 case <-runner.ArvMountExit:
1364                                         break
1365                                 case <-time.After(time.Duration((delay + 1) * int64(time.Second))):
1366                                         runner.CrunchLog.Printf("Timed out waiting for unmount")
1367                                         if umount != nil {
1368                                                 umount.Process.Kill()
1369                                         }
1370                                         runner.ArvMount.Process.Kill()
1371                                 }
1372                         }
1373                 }
1374         }
1375
1376         if runner.ArvMountPoint != "" {
1377                 if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
1378                         runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
1379                 }
1380         }
1381
1382         if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
1383                 runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", runner.parentTemp, rmerr)
1384         }
1385 }
1386
1387 // CommitLogs posts the collection containing the final container logs.
1388 func (runner *ContainerRunner) CommitLogs() error {
1389         func() {
1390                 // Hold cStateLock to prevent races on CrunchLog (e.g., stop()).
1391                 runner.cStateLock.Lock()
1392                 defer runner.cStateLock.Unlock()
1393
1394                 runner.CrunchLog.Print(runner.finalState)
1395
1396                 if runner.arvMountLog != nil {
1397                         runner.arvMountLog.Close()
1398                 }
1399                 runner.CrunchLog.Close()
1400
1401                 // Closing CrunchLog above allows them to be committed to Keep at this
1402                 // point, but re-open crunch log with ArvClient in case there are any
1403                 // other further errors (such as failing to write the log to Keep!)
1404                 // while shutting down
1405                 runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{
1406                         ArvClient:     runner.DispatcherArvClient,
1407                         UUID:          runner.Container.UUID,
1408                         loggingStream: "crunch-run",
1409                         writeCloser:   nil,
1410                 })
1411                 runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
1412         }()
1413
1414         if runner.LogsPDH != nil {
1415                 // If we have already assigned something to LogsPDH,
1416                 // we must be closing the re-opened log, which won't
1417                 // end up getting attached to the container record and
1418                 // therefore doesn't need to be saved as a collection
1419                 // -- it exists only to send logs to other channels.
1420                 return nil
1421         }
1422         saved, err := runner.saveLogCollection(true)
1423         if err != nil {
1424                 return fmt.Errorf("error saving log collection: %s", err)
1425         }
1426         runner.logMtx.Lock()
1427         defer runner.logMtx.Unlock()
1428         runner.LogsPDH = &saved.PortableDataHash
1429         return nil
1430 }
1431
1432 func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) {
1433         runner.logMtx.Lock()
1434         defer runner.logMtx.Unlock()
1435         if runner.LogsPDH != nil {
1436                 // Already finalized.
1437                 return
1438         }
1439         mt, err := runner.LogCollection.MarshalManifest(".")
1440         if err != nil {
1441                 err = fmt.Errorf("error creating log manifest: %v", err)
1442                 return
1443         }
1444         updates := arvadosclient.Dict{
1445                 "name":          "logs for " + runner.Container.UUID,
1446                 "manifest_text": mt,
1447         }
1448         if final {
1449                 updates["is_trashed"] = true
1450         } else {
1451                 exp := time.Now().Add(crunchLogUpdatePeriod * 24)
1452                 updates["trash_at"] = exp
1453                 updates["delete_at"] = exp
1454         }
1455         reqBody := arvadosclient.Dict{"collection": updates}
1456         if runner.logUUID == "" {
1457                 reqBody["ensure_unique_name"] = true
1458                 err = runner.DispatcherArvClient.Create("collections", reqBody, &response)
1459         } else {
1460                 err = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response)
1461         }
1462         if err != nil {
1463                 return
1464         }
1465         runner.logUUID = response.UUID
1466         return
1467 }
1468
1469 // UpdateContainerRunning updates the container state to "Running"
1470 func (runner *ContainerRunner) UpdateContainerRunning() error {
1471         runner.cStateLock.Lock()
1472         defer runner.cStateLock.Unlock()
1473         if runner.cCancelled {
1474                 return ErrCancelled
1475         }
1476         return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
1477                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gatewayAddress}}, nil)
1478 }
1479
1480 // ContainerToken returns the api_token the container (and any
1481 // arv-mount processes) are allowed to use.
1482 func (runner *ContainerRunner) ContainerToken() (string, error) {
1483         if runner.token != "" {
1484                 return runner.token, nil
1485         }
1486
1487         var auth arvados.APIClientAuthorization
1488         err := runner.DispatcherArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
1489         if err != nil {
1490                 return "", err
1491         }
1492         runner.token = fmt.Sprintf("v2/%s/%s/%s", auth.UUID, auth.APIToken, runner.Container.UUID)
1493         return runner.token, nil
1494 }
1495
1496 // UpdateContainerFinal updates the container record state on API
1497 // server to "Complete" or "Cancelled"
1498 func (runner *ContainerRunner) UpdateContainerFinal() error {
1499         update := arvadosclient.Dict{}
1500         update["state"] = runner.finalState
1501         if runner.LogsPDH != nil {
1502                 update["log"] = *runner.LogsPDH
1503         }
1504         if runner.finalState == "Complete" {
1505                 if runner.ExitCode != nil {
1506                         update["exit_code"] = *runner.ExitCode
1507                 }
1508                 if runner.OutputPDH != nil {
1509                         update["output"] = *runner.OutputPDH
1510                 }
1511         }
1512         return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
1513 }
1514
1515 // IsCancelled returns the value of Cancelled, with goroutine safety.
1516 func (runner *ContainerRunner) IsCancelled() bool {
1517         runner.cStateLock.Lock()
1518         defer runner.cStateLock.Unlock()
1519         return runner.cCancelled
1520 }
1521
1522 // NewArvLogWriter creates an ArvLogWriter
1523 func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error) {
1524         writer, err := runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
1525         if err != nil {
1526                 return nil, err
1527         }
1528         return &ArvLogWriter{
1529                 ArvClient:     runner.DispatcherArvClient,
1530                 UUID:          runner.Container.UUID,
1531                 loggingStream: name,
1532                 writeCloser:   writer,
1533         }, nil
1534 }
1535
1536 // Run the full container lifecycle.
1537 func (runner *ContainerRunner) Run() (err error) {
1538         runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
1539         runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
1540
1541         hostname, hosterr := os.Hostname()
1542         if hosterr != nil {
1543                 runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
1544         } else {
1545                 runner.CrunchLog.Printf("Executing on host '%s'", hostname)
1546         }
1547
1548         runner.finalState = "Queued"
1549
1550         defer func() {
1551                 runner.CleanupDirs()
1552
1553                 runner.CrunchLog.Printf("crunch-run finished")
1554                 runner.CrunchLog.Close()
1555         }()
1556
1557         err = runner.fetchContainerRecord()
1558         if err != nil {
1559                 return
1560         }
1561         if runner.Container.State != "Locked" {
1562                 return fmt.Errorf("dispatch error detected: container %q has state %q", runner.Container.UUID, runner.Container.State)
1563         }
1564
1565         defer func() {
1566                 // checkErr prints e (unless it's nil) and sets err to
1567                 // e (unless err is already non-nil). Thus, if err
1568                 // hasn't already been assigned when Run() returns,
1569                 // this cleanup func will cause Run() to return the
1570                 // first non-nil error that is passed to checkErr().
1571                 checkErr := func(errorIn string, e error) {
1572                         if e == nil {
1573                                 return
1574                         }
1575                         runner.CrunchLog.Printf("error in %s: %v", errorIn, e)
1576                         if err == nil {
1577                                 err = e
1578                         }
1579                         if runner.finalState == "Complete" {
1580                                 // There was an error in the finalization.
1581                                 runner.finalState = "Cancelled"
1582                         }
1583                 }
1584
1585                 // Log the error encountered in Run(), if any
1586                 checkErr("Run", err)
1587
1588                 if runner.finalState == "Queued" {
1589                         runner.UpdateContainerFinal()
1590                         return
1591                 }
1592
1593                 if runner.IsCancelled() {
1594                         runner.finalState = "Cancelled"
1595                         // but don't return yet -- we still want to
1596                         // capture partial output and write logs
1597                 }
1598
1599                 checkErr("CaptureOutput", runner.CaptureOutput())
1600                 checkErr("stopHoststat", runner.stopHoststat())
1601                 checkErr("CommitLogs", runner.CommitLogs())
1602                 checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
1603         }()
1604
1605         runner.setupSignals()
1606         err = runner.startHoststat()
1607         if err != nil {
1608                 return
1609         }
1610
1611         // check for and/or load image
1612         err = runner.LoadImage()
1613         if err != nil {
1614                 if !runner.checkBrokenNode(err) {
1615                         // Failed to load image but not due to a "broken node"
1616                         // condition, probably user error.
1617                         runner.finalState = "Cancelled"
1618                 }
1619                 err = fmt.Errorf("While loading container image: %v", err)
1620                 return
1621         }
1622
1623         // set up FUSE mount and binds
1624         err = runner.SetupMounts()
1625         if err != nil {
1626                 runner.finalState = "Cancelled"
1627                 err = fmt.Errorf("While setting up mounts: %v", err)
1628                 return
1629         }
1630
1631         err = runner.CreateContainer()
1632         if err != nil {
1633                 return
1634         }
1635         err = runner.LogHostInfo()
1636         if err != nil {
1637                 return
1638         }
1639         err = runner.LogNodeRecord()
1640         if err != nil {
1641                 return
1642         }
1643         err = runner.LogContainerRecord()
1644         if err != nil {
1645                 return
1646         }
1647
1648         if runner.IsCancelled() {
1649                 return
1650         }
1651
1652         err = runner.UpdateContainerRunning()
1653         if err != nil {
1654                 return
1655         }
1656         runner.finalState = "Cancelled"
1657
1658         err = runner.startCrunchstat()
1659         if err != nil {
1660                 return
1661         }
1662
1663         err = runner.StartContainer()
1664         if err != nil {
1665                 runner.checkBrokenNode(err)
1666                 return
1667         }
1668
1669         err = runner.WaitFinish()
1670         if err == nil && !runner.IsCancelled() {
1671                 runner.finalState = "Complete"
1672         }
1673         return
1674 }
1675
1676 // Fetch the current container record (uuid = runner.Container.UUID)
1677 // into runner.Container.
1678 func (runner *ContainerRunner) fetchContainerRecord() error {
1679         reader, err := runner.DispatcherArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
1680         if err != nil {
1681                 return fmt.Errorf("error fetching container record: %v", err)
1682         }
1683         defer reader.Close()
1684
1685         dec := json.NewDecoder(reader)
1686         dec.UseNumber()
1687         err = dec.Decode(&runner.Container)
1688         if err != nil {
1689                 return fmt.Errorf("error decoding container record: %v", err)
1690         }
1691
1692         var sm struct {
1693                 SecretMounts map[string]arvados.Mount `json:"secret_mounts"`
1694         }
1695
1696         containerToken, err := runner.ContainerToken()
1697         if err != nil {
1698                 return fmt.Errorf("error getting container token: %v", err)
1699         }
1700
1701         runner.ContainerArvClient, runner.ContainerKeepClient,
1702                 runner.containerClient, err = runner.MkArvClient(containerToken)
1703         if err != nil {
1704                 return fmt.Errorf("error creating container API client: %v", err)
1705         }
1706
1707         err = runner.ContainerArvClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
1708         if err != nil {
1709                 if apierr, ok := err.(arvadosclient.APIServerError); !ok || apierr.HttpStatusCode != 404 {
1710                         return fmt.Errorf("error fetching secret_mounts: %v", err)
1711                 }
1712                 // ok && apierr.HttpStatusCode == 404, which means
1713                 // secret_mounts isn't supported by this API server.
1714         }
1715         runner.SecretMounts = sm.SecretMounts
1716
1717         return nil
1718 }
1719
1720 // NewContainerRunner creates a new container runner.
1721 func NewContainerRunner(dispatcherClient *arvados.Client,
1722         dispatcherArvClient IArvadosClient,
1723         dispatcherKeepClient IKeepClient,
1724         docker ThinDockerClient,
1725         containerUUID string) (*ContainerRunner, error) {
1726
1727         cr := &ContainerRunner{
1728                 dispatcherClient:     dispatcherClient,
1729                 DispatcherArvClient:  dispatcherArvClient,
1730                 DispatcherKeepClient: dispatcherKeepClient,
1731                 Docker:               docker,
1732         }
1733         cr.NewLogWriter = cr.NewArvLogWriter
1734         cr.RunArvMount = cr.ArvMountCmd
1735         cr.MkTempDir = ioutil.TempDir
1736         cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
1737                 cl, err := arvadosclient.MakeArvadosClient()
1738                 if err != nil {
1739                         return nil, nil, nil, err
1740                 }
1741                 cl.ApiToken = token
1742                 kc, err := keepclient.MakeKeepClient(cl)
1743                 if err != nil {
1744                         return nil, nil, nil, err
1745                 }
1746                 c2 := arvados.NewClientFromEnv()
1747                 c2.AuthToken = token
1748                 return cl, kc, c2, nil
1749         }
1750         var err error
1751         cr.LogCollection, err = (&arvados.Collection{}).FileSystem(cr.dispatcherClient, cr.DispatcherKeepClient)
1752         if err != nil {
1753                 return nil, err
1754         }
1755         cr.Container.UUID = containerUUID
1756         w, err := cr.NewLogWriter("crunch-run")
1757         if err != nil {
1758                 return nil, err
1759         }
1760         cr.CrunchLog = NewThrottledLogger(w)
1761         cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
1762
1763         loadLogThrottleParams(dispatcherArvClient)
1764         go cr.updateLogs()
1765
1766         return cr, nil
1767 }
1768
1769 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
1770         flags := flag.NewFlagSet(prog, flag.ContinueOnError)
1771         statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
1772         cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
1773         cgroupParent := flags.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
1774         cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
1775         caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
1776         detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
1777         stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
1778         sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
1779         kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
1780         list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
1781         enableNetwork := flags.String("container-enable-networking", "default",
1782                 `Specify if networking should be enabled for container.  One of 'default', 'always':
1783         default: only enable networking if container requests it.
1784         always:  containers always have networking enabled
1785         `)
1786         networkMode := flags.String("container-network-mode", "default",
1787                 `Set networking mode for container.  Corresponds to Docker network mode (--net).
1788         `)
1789         memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
1790         flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
1791
1792         ignoreDetachFlag := false
1793         if len(args) > 0 && args[0] == "-no-detach" {
1794                 // This process was invoked by a parent process, which
1795                 // has passed along its own arguments, including
1796                 // -detach, after the leading -no-detach flag.  Strip
1797                 // the leading -no-detach flag (it's not recognized by
1798                 // flags.Parse()) and ignore the -detach flag that
1799                 // comes later.
1800                 args = args[1:]
1801                 ignoreDetachFlag = true
1802         }
1803
1804         if err := flags.Parse(args); err == flag.ErrHelp {
1805                 return 0
1806         } else if err != nil {
1807                 log.Print(err)
1808                 return 1
1809         }
1810
1811         if *stdinEnv && !ignoreDetachFlag {
1812                 // Load env vars on stdin if asked (but not in a
1813                 // detached child process, in which case stdin is
1814                 // /dev/null).
1815                 err := loadEnv(os.Stdin)
1816                 if err != nil {
1817                         log.Print(err)
1818                         return 1
1819                 }
1820         }
1821
1822         containerID := flags.Arg(0)
1823
1824         switch {
1825         case *detach && !ignoreDetachFlag:
1826                 return Detach(containerID, prog, args, os.Stdout, os.Stderr)
1827         case *kill >= 0:
1828                 return KillProcess(containerID, syscall.Signal(*kill), os.Stdout, os.Stderr)
1829         case *list:
1830                 return ListProcesses(os.Stdout, os.Stderr)
1831         }
1832
1833         if containerID == "" {
1834                 log.Printf("usage: %s [options] UUID", prog)
1835                 return 1
1836         }
1837
1838         log.Printf("crunch-run %s started", cmd.Version.String())
1839         time.Sleep(*sleep)
1840
1841         if *caCertsPath != "" {
1842                 arvadosclient.CertFiles = []string{*caCertsPath}
1843         }
1844
1845         api, err := arvadosclient.MakeArvadosClient()
1846         if err != nil {
1847                 log.Printf("%s: %v", containerID, err)
1848                 return 1
1849         }
1850         api.Retries = 8
1851
1852         kc, kcerr := keepclient.MakeKeepClient(api)
1853         if kcerr != nil {
1854                 log.Printf("%s: %v", containerID, kcerr)
1855                 return 1
1856         }
1857         kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
1858         kc.Retries = 4
1859
1860         // API version 1.21 corresponds to Docker 1.9, which is currently the
1861         // minimum version we want to support.
1862         docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
1863
1864         cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerID)
1865         if err != nil {
1866                 log.Print(err)
1867                 return 1
1868         }
1869         if dockererr != nil {
1870                 cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
1871                 cr.checkBrokenNode(dockererr)
1872                 cr.CrunchLog.Close()
1873                 return 1
1874         }
1875
1876         cr.gatewayAuthSecret = os.Getenv("GatewayAuthSecret")
1877         os.Unsetenv("GatewayAuthSecret")
1878         err = cr.startGatewayServer()
1879         if err != nil {
1880                 log.Printf("error starting gateway server: %s", err)
1881                 return 1
1882         }
1883
1884         parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerID+".")
1885         if tmperr != nil {
1886                 log.Printf("%s: %v", containerID, tmperr)
1887                 return 1
1888         }
1889
1890         cr.parentTemp = parentTemp
1891         cr.statInterval = *statInterval
1892         cr.cgroupRoot = *cgroupRoot
1893         cr.expectCgroupParent = *cgroupParent
1894         cr.enableNetwork = *enableNetwork
1895         cr.networkMode = *networkMode
1896         if *cgroupParentSubsystem != "" {
1897                 p := findCgroup(*cgroupParentSubsystem)
1898                 cr.setCgroupParent = p
1899                 cr.expectCgroupParent = p
1900         }
1901
1902         runerr := cr.Run()
1903
1904         if *memprofile != "" {
1905                 f, err := os.Create(*memprofile)
1906                 if err != nil {
1907                         log.Printf("could not create memory profile: %s", err)
1908                 }
1909                 runtime.GC() // get up-to-date statistics
1910                 if err := pprof.WriteHeapProfile(f); err != nil {
1911                         log.Printf("could not write memory profile: %s", err)
1912                 }
1913                 closeerr := f.Close()
1914                 if closeerr != nil {
1915                         log.Printf("closing memprofile file: %s", err)
1916                 }
1917         }
1918
1919         if runerr != nil {
1920                 log.Printf("%s: %v", containerID, runerr)
1921                 return 1
1922         }
1923         return 0
1924 }
1925
// loadEnv reads all of rdr, decodes it as a JSON object mapping
// variable names to string values, and sets each pair in the process
// environment. It returns an error if reading, decoding, or setting
// any variable fails.
func loadEnv(rdr io.Reader) error {
	raw, err := ioutil.ReadAll(rdr)
	if err != nil {
		return fmt.Errorf("read stdin: %s", err)
	}

	var vars map[string]string
	if err := json.Unmarshal(raw, &vars); err != nil {
		return fmt.Errorf("decode stdin: %s", err)
	}

	for name, value := range vars {
		if err := os.Setenv(name, value); err != nil {
			return fmt.Errorf("setenv(%q): %s", name, err)
		}
	}
	return nil
}