7709: Clear deliveries before each test. Fixes flaky test.
[arvados.git] / services / crunch-run / crunchrun.go
1 package main
2
3 import (
4         "context"
5         "encoding/json"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "log"
12         "os"
13         "os/exec"
14         "os/signal"
15         "path"
16         "path/filepath"
17         "sort"
18         "strings"
19         "sync"
20         "syscall"
21         "time"
22
23         "git.curoverse.com/arvados.git/lib/crunchstat"
24         "git.curoverse.com/arvados.git/sdk/go/arvados"
25         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
26         "git.curoverse.com/arvados.git/sdk/go/keepclient"
27         "git.curoverse.com/arvados.git/sdk/go/manifest"
28
29         dockertypes "github.com/docker/docker/api/types"
30         dockercontainer "github.com/docker/docker/api/types/container"
31         dockernetwork "github.com/docker/docker/api/types/network"
32         dockerclient "github.com/docker/docker/client"
33 )
34
35 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
36 type IArvadosClient interface {
37         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
38         Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
39         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
40         Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error
41         CallRaw(method string, resourceType string, uuid string, action string, parameters arvadosclient.Dict) (reader io.ReadCloser, err error)
42         Discovery(key string) (interface{}, error)
43 }
44
45 // ErrCancelled is the error returned when the container is cancelled.
46 var ErrCancelled = errors.New("Cancelled")
47
48 // IKeepClient is the minimal Keep API methods used by crunch-run.
49 type IKeepClient interface {
50         PutHB(hash string, buf []byte) (string, int, error)
51         ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
52 }
53
54 // NewLogWriter is a factory function to create a new log writer.
55 type NewLogWriter func(name string) io.WriteCloser
56
57 type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
58
59 type MkTempDir func(string, string) (string, error)
60
61 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
62 type ThinDockerClient interface {
63         ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error)
64         ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
65                 networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
66         ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
67         ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
68         ContainerWait(ctx context.Context, container string) (int64, error)
69         ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
70         ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
71         ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
72 }
73
74 // ThinDockerClientProxy is a proxy implementation of ThinDockerClient
75 // that executes the docker requests on dockerclient.Client
76 type ThinDockerClientProxy struct {
77         Docker *dockerclient.Client
78 }
79
80 // ContainerAttach invokes dockerclient.Client.ContainerAttach
81 func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
82         return proxy.Docker.ContainerAttach(ctx, container, options)
83 }
84
85 // ContainerCreate invokes dockerclient.Client.ContainerCreate
86 func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
87         networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
88         return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName)
89 }
90
91 // ContainerStart invokes dockerclient.Client.ContainerStart
92 func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
93         return proxy.Docker.ContainerStart(ctx, container, options)
94 }
95
96 // ContainerStop invokes dockerclient.Client.ContainerStop
97 func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
98         return proxy.Docker.ContainerStop(ctx, container, timeout)
99 }
100
101 // ContainerWait invokes dockerclient.Client.ContainerWait
102 func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string) (int64, error) {
103         return proxy.Docker.ContainerWait(ctx, container)
104 }
105
106 // ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
107 func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
108         return proxy.Docker.ImageInspectWithRaw(ctx, image)
109 }
110
111 // ImageLoad invokes dockerclient.Client.ImageLoad
112 func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
113         return proxy.Docker.ImageLoad(ctx, input, quiet)
114 }
115
116 // ImageRemove invokes dockerclient.Client.ImageRemove
117 func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
118         return proxy.Docker.ImageRemove(ctx, image, options)
119 }
120
121 // ContainerRunner is the main stateful struct used for a single execution of a
122 // container.
123 type ContainerRunner struct {
124         Docker    ThinDockerClient
125         ArvClient IArvadosClient
126         Kc        IKeepClient
127         arvados.Container
128         ContainerConfig dockercontainer.Config
129         dockercontainer.HostConfig
130         token       string
131         ContainerID string
132         ExitCode    *int
133         NewLogWriter
134         loggingDone   chan bool
135         CrunchLog     *ThrottledLogger
136         Stdout        io.WriteCloser
137         Stderr        *ThrottledLogger
138         LogCollection *CollectionWriter
139         LogsPDH       *string
140         RunArvMount
141         MkTempDir
142         ArvMount       *exec.Cmd
143         ArvMountPoint  string
144         HostOutputDir  string
145         CleanupTempDir []string
146         Binds          []string
147         OutputPDH      *string
148         SigChan        chan os.Signal
149         ArvMountExit   chan error
150         finalState     string
151
152         statLogger   io.WriteCloser
153         statReporter *crunchstat.Reporter
154         statInterval time.Duration
155         cgroupRoot   string
156         // What we expect the container's cgroup parent to be.
157         expectCgroupParent string
158         // What we tell docker to use as the container's cgroup
159         // parent. Note: Ideally we would use the same field for both
160         // expectCgroupParent and setCgroupParent, and just make it
161         // default to "docker". However, when using docker < 1.10 with
162         // systemd, specifying a non-empty cgroup parent (even the
163         // default value "docker") hits a docker bug
164         // (https://github.com/docker/docker/issues/17126). Using two
165         // separate fields makes it possible to use the "expect cgroup
166         // parent to be X" feature even on sites where the "specify
167         // cgroup parent" feature breaks.
168         setCgroupParent string
169
170         cStateLock sync.Mutex
171         cStarted   bool // StartContainer() succeeded
172         cCancelled bool // StopContainer() invoked
173
174         enableNetwork string // one of "default" or "always"
175         networkMode   string // passed through to HostConfig.NetworkMode
176 }
177
178 // SetupSignals sets up signal handling to gracefully terminate the underlying
179 // Docker container and update state when receiving a TERM, INT or QUIT signal.
180 func (runner *ContainerRunner) SetupSignals() {
181         runner.SigChan = make(chan os.Signal, 1)
182         signal.Notify(runner.SigChan, syscall.SIGTERM)
183         signal.Notify(runner.SigChan, syscall.SIGINT)
184         signal.Notify(runner.SigChan, syscall.SIGQUIT)
185
186         go func(sig chan os.Signal) {
187                 <-sig
188                 runner.stop()
189                 signal.Stop(sig)
190         }(runner.SigChan)
191 }
192
193 // stop the underlying Docker container.
194 func (runner *ContainerRunner) stop() {
195         runner.cStateLock.Lock()
196         defer runner.cStateLock.Unlock()
197         if runner.cCancelled {
198                 return
199         }
200         runner.cCancelled = true
201         if runner.cStarted {
202                 timeout := time.Duration(10)
203                 err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
204                 if err != nil {
205                         log.Printf("StopContainer failed: %s", err)
206                 }
207         }
208 }
209
210 // LoadImage determines the docker image id from the container record and
211 // checks if it is available in the local Docker image store.  If not, it loads
212 // the image from Keep.
213 func (runner *ContainerRunner) LoadImage() (err error) {
214
215         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
216
217         var collection arvados.Collection
218         err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
219         if err != nil {
220                 return fmt.Errorf("While getting container image collection: %v", err)
221         }
222         manifest := manifest.Manifest{Text: collection.ManifestText}
223         var img, imageID string
224         for ms := range manifest.StreamIter() {
225                 img = ms.FileStreamSegments[0].Name
226                 if !strings.HasSuffix(img, ".tar") {
227                         return fmt.Errorf("First file in the container image collection does not end in .tar")
228                 }
229                 imageID = img[:len(img)-4]
230         }
231
232         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
233
234         _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID)
235         if err != nil {
236                 runner.CrunchLog.Print("Loading Docker image from keep")
237
238                 var readCloser io.ReadCloser
239                 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
240                 if err != nil {
241                         return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
242                 }
243
244                 response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, false)
245                 if err != nil {
246                         return fmt.Errorf("While loading container image into Docker: %v", err)
247                 }
248                 response.Body.Close()
249         } else {
250                 runner.CrunchLog.Print("Docker image is available")
251         }
252
253         runner.ContainerConfig.Image = imageID
254
255         return nil
256 }
257
258 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
259         c = exec.Command("arv-mount", arvMountCmd...)
260
261         // Copy our environment, but override ARVADOS_API_TOKEN with
262         // the container auth token.
263         c.Env = nil
264         for _, s := range os.Environ() {
265                 if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") {
266                         c.Env = append(c.Env, s)
267                 }
268         }
269         c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
270
271         nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
272         c.Stdout = nt
273         c.Stderr = nt
274
275         err = c.Start()
276         if err != nil {
277                 return nil, err
278         }
279
280         statReadme := make(chan bool)
281         runner.ArvMountExit = make(chan error)
282
283         keepStatting := true
284         go func() {
285                 for keepStatting {
286                         time.Sleep(100 * time.Millisecond)
287                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
288                         if err == nil {
289                                 keepStatting = false
290                                 statReadme <- true
291                         }
292                 }
293                 close(statReadme)
294         }()
295
296         go func() {
297                 runner.ArvMountExit <- c.Wait()
298                 close(runner.ArvMountExit)
299         }()
300
301         select {
302         case <-statReadme:
303                 break
304         case err := <-runner.ArvMountExit:
305                 runner.ArvMount = nil
306                 keepStatting = false
307                 return nil, err
308         }
309
310         return c, nil
311 }
312
313 func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
314         if runner.ArvMountPoint == "" {
315                 runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
316         }
317         return
318 }
319
320 func (runner *ContainerRunner) SetupMounts() (err error) {
321         err = runner.SetupArvMountPoint("keep")
322         if err != nil {
323                 return fmt.Errorf("While creating keep mount temp dir: %v", err)
324         }
325
326         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
327
328         pdhOnly := true
329         tmpcount := 0
330         arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
331
332         if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
333                 arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
334         }
335
336         collectionPaths := []string{}
337         runner.Binds = nil
338         needCertMount := true
339
340         var binds []string
341         for bind, _ := range runner.Container.Mounts {
342                 binds = append(binds, bind)
343         }
344         sort.Strings(binds)
345
346         for _, bind := range binds {
347                 mnt := runner.Container.Mounts[bind]
348                 if bind == "stdout" {
349                         // Is it a "file" mount kind?
350                         if mnt.Kind != "file" {
351                                 return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
352                         }
353
354                         // Does path start with OutputPath?
355                         prefix := runner.Container.OutputPath
356                         if !strings.HasSuffix(prefix, "/") {
357                                 prefix += "/"
358                         }
359                         if !strings.HasPrefix(mnt.Path, prefix) {
360                                 return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
361                         }
362                 }
363
364                 if bind == "/etc/arvados/ca-certificates.crt" {
365                         needCertMount = false
366                 }
367
368                 if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" {
369                         if mnt.Kind != "collection" {
370                                 return fmt.Errorf("Only mount points of kind 'collection' are supported underneath the output_path: %v", bind)
371                         }
372                 }
373
374                 switch {
375                 case mnt.Kind == "collection":
376                         var src string
377                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
378                                 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
379                         }
380                         if mnt.UUID != "" {
381                                 if mnt.Writable {
382                                         return fmt.Errorf("Writing to existing collections currently not permitted.")
383                                 }
384                                 pdhOnly = false
385                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
386                         } else if mnt.PortableDataHash != "" {
387                                 if mnt.Writable {
388                                         return fmt.Errorf("Can never write to a collection specified by portable data hash")
389                                 }
390                                 idx := strings.Index(mnt.PortableDataHash, "/")
391                                 if idx > 0 {
392                                         mnt.Path = path.Clean(mnt.PortableDataHash[idx:])
393                                         mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
394                                         runner.Container.Mounts[bind] = mnt
395                                 }
396                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
397                                 if mnt.Path != "" && mnt.Path != "." {
398                                         if strings.HasPrefix(mnt.Path, "./") {
399                                                 mnt.Path = mnt.Path[2:]
400                                         } else if strings.HasPrefix(mnt.Path, "/") {
401                                                 mnt.Path = mnt.Path[1:]
402                                         }
403                                         src += "/" + mnt.Path
404                                 }
405                         } else {
406                                 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
407                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
408                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
409                                 tmpcount += 1
410                         }
411                         if mnt.Writable {
412                                 if bind == runner.Container.OutputPath {
413                                         runner.HostOutputDir = src
414                                 } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
415                                         return fmt.Errorf("Writable mount points are not permitted underneath the output_path: %v", bind)
416                                 }
417                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
418                         } else {
419                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
420                         }
421                         collectionPaths = append(collectionPaths, src)
422
423                 case mnt.Kind == "tmp" && bind == runner.Container.OutputPath:
424                         runner.HostOutputDir, err = runner.MkTempDir("", "")
425                         if err != nil {
426                                 return fmt.Errorf("While creating mount temp dir: %v", err)
427                         }
428                         st, staterr := os.Stat(runner.HostOutputDir)
429                         if staterr != nil {
430                                 return fmt.Errorf("While Stat on temp dir: %v", staterr)
431                         }
432                         err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
433                         if staterr != nil {
434                                 return fmt.Errorf("While Chmod temp dir: %v", err)
435                         }
436                         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
437                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
438
439                 case mnt.Kind == "tmp":
440                         runner.Binds = append(runner.Binds, bind)
441
442                 case mnt.Kind == "json":
443                         jsondata, err := json.Marshal(mnt.Content)
444                         if err != nil {
445                                 return fmt.Errorf("encoding json data: %v", err)
446                         }
447                         // Create a tempdir with a single file
448                         // (instead of just a tempfile): this way we
449                         // can ensure the file is world-readable
450                         // inside the container, without having to
451                         // make it world-readable on the docker host.
452                         tmpdir, err := runner.MkTempDir("", "")
453                         if err != nil {
454                                 return fmt.Errorf("creating temp dir: %v", err)
455                         }
456                         runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
457                         tmpfn := filepath.Join(tmpdir, "mountdata.json")
458                         err = ioutil.WriteFile(tmpfn, jsondata, 0644)
459                         if err != nil {
460                                 return fmt.Errorf("writing temp file: %v", err)
461                         }
462                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
463                 }
464         }
465
466         if runner.HostOutputDir == "" {
467                 return fmt.Errorf("Output path does not correspond to a writable mount point")
468         }
469
470         if wantAPI := runner.Container.RuntimeConstraints.API; needCertMount && wantAPI != nil && *wantAPI {
471                 for _, certfile := range arvadosclient.CertFiles {
472                         _, err := os.Stat(certfile)
473                         if err == nil {
474                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:/etc/arvados/ca-certificates.crt:ro", certfile))
475                                 break
476                         }
477                 }
478         }
479
480         if pdhOnly {
481                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
482         } else {
483                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
484         }
485         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
486
487         token, err := runner.ContainerToken()
488         if err != nil {
489                 return fmt.Errorf("could not get container token: %s", err)
490         }
491
492         runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
493         if err != nil {
494                 return fmt.Errorf("While trying to start arv-mount: %v", err)
495         }
496
497         for _, p := range collectionPaths {
498                 _, err = os.Stat(p)
499                 if err != nil {
500                         return fmt.Errorf("While checking that input files exist: %v", err)
501                 }
502         }
503
504         return nil
505 }
506
507 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
508         // Handle docker log protocol
509         // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
510
511         header := make([]byte, 8)
512         for {
513                 _, readerr := io.ReadAtLeast(containerReader, header, 8)
514
515                 if readerr == nil {
516                         readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
517                         if header[0] == 1 {
518                                 // stdout
519                                 _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
520                         } else {
521                                 // stderr
522                                 _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
523                         }
524                 }
525
526                 if readerr != nil {
527                         if readerr != io.EOF {
528                                 runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
529                         }
530
531                         closeerr := runner.Stdout.Close()
532                         if closeerr != nil {
533                                 runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
534                         }
535
536                         closeerr = runner.Stderr.Close()
537                         if closeerr != nil {
538                                 runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
539                         }
540
541                         if runner.statReporter != nil {
542                                 runner.statReporter.Stop()
543                                 closeerr = runner.statLogger.Close()
544                                 if closeerr != nil {
545                                         runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
546                                 }
547                         }
548
549                         runner.loggingDone <- true
550                         close(runner.loggingDone)
551                         return
552                 }
553         }
554 }
555
556 func (runner *ContainerRunner) StartCrunchstat() {
557         runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
558         runner.statReporter = &crunchstat.Reporter{
559                 CID:          runner.ContainerID,
560                 Logger:       log.New(runner.statLogger, "", 0),
561                 CgroupParent: runner.expectCgroupParent,
562                 CgroupRoot:   runner.cgroupRoot,
563                 PollPeriod:   runner.statInterval,
564         }
565         runner.statReporter.Start()
566 }
567
568 type infoCommand struct {
569         label string
570         cmd   []string
571 }
572
573 // Gather node information and store it on the log for debugging
574 // purposes.
575 func (runner *ContainerRunner) LogNodeInfo() (err error) {
576         w := runner.NewLogWriter("node-info")
577         logger := log.New(w, "node-info", 0)
578
579         commands := []infoCommand{
580                 infoCommand{
581                         label: "Host Information",
582                         cmd:   []string{"uname", "-a"},
583                 },
584                 infoCommand{
585                         label: "CPU Information",
586                         cmd:   []string{"cat", "/proc/cpuinfo"},
587                 },
588                 infoCommand{
589                         label: "Memory Information",
590                         cmd:   []string{"cat", "/proc/meminfo"},
591                 },
592                 infoCommand{
593                         label: "Disk Space",
594                         cmd:   []string{"df", "-m", "/", os.TempDir()},
595                 },
596                 infoCommand{
597                         label: "Disk INodes",
598                         cmd:   []string{"df", "-i", "/", os.TempDir()},
599                 },
600         }
601
602         // Run commands with informational output to be logged.
603         var out []byte
604         for _, command := range commands {
605                 out, err = exec.Command(command.cmd[0], command.cmd[1:]...).CombinedOutput()
606                 if err != nil {
607                         return fmt.Errorf("While running command %q: %v",
608                                 command.cmd, err)
609                 }
610                 logger.Println(command.label)
611                 for _, line := range strings.Split(string(out), "\n") {
612                         logger.Println(" ", line)
613                 }
614         }
615
616         err = w.Close()
617         if err != nil {
618                 return fmt.Errorf("While closing node-info logs: %v", err)
619         }
620         return nil
621 }
622
623 // Get and save the raw JSON container record from the API server
624 func (runner *ContainerRunner) LogContainerRecord() (err error) {
625         w := &ArvLogWriter{
626                 runner.ArvClient,
627                 runner.Container.UUID,
628                 "container",
629                 runner.LogCollection.Open("container.json"),
630         }
631         // Get Container record JSON from the API Server
632         reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
633         if err != nil {
634                 return fmt.Errorf("While retrieving container record from the API server: %v", err)
635         }
636         defer reader.Close()
637         // Read the API server response as []byte
638         json_bytes, err := ioutil.ReadAll(reader)
639         if err != nil {
640                 return fmt.Errorf("While reading container record API server response: %v", err)
641         }
642         // Decode the JSON []byte
643         var cr map[string]interface{}
644         if err = json.Unmarshal(json_bytes, &cr); err != nil {
645                 return fmt.Errorf("While decoding the container record JSON response: %v", err)
646         }
647         // Re-encode it using indentation to improve readability
648         enc := json.NewEncoder(w)
649         enc.SetIndent("", "    ")
650         if err = enc.Encode(cr); err != nil {
651                 return fmt.Errorf("While logging the JSON container record: %v", err)
652         }
653         err = w.Close()
654         if err != nil {
655                 return fmt.Errorf("While closing container.json log: %v", err)
656         }
657         return nil
658 }
659
660 // AttachLogs connects the docker container stdout and stderr logs to the
661 // Arvados logger which logs to Keep and the API server logs table.
662 func (runner *ContainerRunner) AttachStreams() (err error) {
663
664         runner.CrunchLog.Print("Attaching container streams")
665
666         response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
667                 dockertypes.ContainerAttachOptions{Stream: true, Stdout: true, Stderr: true})
668         if err != nil {
669                 return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
670         }
671
672         runner.loggingDone = make(chan bool)
673
674         if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
675                 stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
676                 index := strings.LastIndex(stdoutPath, "/")
677                 if index > 0 {
678                         subdirs := stdoutPath[:index]
679                         if subdirs != "" {
680                                 st, err := os.Stat(runner.HostOutputDir)
681                                 if err != nil {
682                                         return fmt.Errorf("While Stat on temp dir: %v", err)
683                                 }
684                                 stdoutPath := path.Join(runner.HostOutputDir, subdirs)
685                                 err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
686                                 if err != nil {
687                                         return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
688                                 }
689                         }
690                 }
691                 stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
692                 if err != nil {
693                         return fmt.Errorf("While creating stdout file: %v", err)
694                 }
695                 runner.Stdout = stdoutFile
696         } else {
697                 runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
698         }
699         runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
700
701         go runner.ProcessDockerAttach(response.Reader)
702
703         return nil
704 }
705
706 // CreateContainer creates the docker container.
707 func (runner *ContainerRunner) CreateContainer() error {
708         runner.CrunchLog.Print("Creating Docker container")
709
710         runner.ContainerConfig.Cmd = runner.Container.Command
711         if runner.Container.Cwd != "." {
712                 runner.ContainerConfig.WorkingDir = runner.Container.Cwd
713         }
714
715         for k, v := range runner.Container.Environment {
716                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
717         }
718
719         runner.HostConfig = dockercontainer.HostConfig{
720                 Binds:  runner.Binds,
721                 Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent),
722                 LogConfig: dockercontainer.LogConfig{
723                         Type: "none",
724                 },
725         }
726
727         if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
728                 tok, err := runner.ContainerToken()
729                 if err != nil {
730                         return err
731                 }
732                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
733                         "ARVADOS_API_TOKEN="+tok,
734                         "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
735                         "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
736                 )
737                 runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
738         } else {
739                 if runner.enableNetwork == "always" {
740                         runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
741                 } else {
742                         runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none")
743                 }
744         }
745
746         createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
747         if err != nil {
748                 return fmt.Errorf("While creating container: %v", err)
749         }
750
751         runner.ContainerID = createdBody.ID
752
753         return runner.AttachStreams()
754 }
755
756 // StartContainer starts the docker container created by CreateContainer.
757 func (runner *ContainerRunner) StartContainer() error {
758         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
759         runner.cStateLock.Lock()
760         defer runner.cStateLock.Unlock()
761         if runner.cCancelled {
762                 return ErrCancelled
763         }
764         err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
765                 dockertypes.ContainerStartOptions{})
766         if err != nil {
767                 return fmt.Errorf("could not start container: %v", err)
768         }
769         runner.cStarted = true
770         return nil
771 }
772
773 // WaitFinish waits for the container to terminate, capture the exit code, and
774 // close the stdout/stderr logging.
775 func (runner *ContainerRunner) WaitFinish() error {
776         runner.CrunchLog.Print("Waiting for container to finish")
777
778         waitDocker, err := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID)
779         if err != nil {
780                 return fmt.Errorf("container wait: %v", err)
781         }
782
783         runner.CrunchLog.Printf("Container exited with code: %v", waitDocker)
784         code := int(waitDocker)
785         runner.ExitCode = &code
786
787         waitMount := runner.ArvMountExit
788         select {
789         case err := <-waitMount:
790                 runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
791                 waitMount = nil
792                 runner.stop()
793         default:
794         }
795
796         // wait for stdout/stderr to complete
797         <-runner.loggingDone
798
799         return nil
800 }
801
802 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
803 func (runner *ContainerRunner) CaptureOutput() error {
804         if runner.finalState != "Complete" {
805                 return nil
806         }
807
808         if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
809                 // Output may have been set directly by the container, so
810                 // refresh the container record to check.
811                 err := runner.ArvClient.Get("containers", runner.Container.UUID,
812                         nil, &runner.Container)
813                 if err != nil {
814                         return err
815                 }
816                 if runner.Container.Output != "" {
817                         // Container output is already set.
818                         runner.OutputPDH = &runner.Container.Output
819                         return nil
820                 }
821         }
822
823         if runner.HostOutputDir == "" {
824                 return nil
825         }
826
827         _, err := os.Stat(runner.HostOutputDir)
828         if err != nil {
829                 return fmt.Errorf("While checking host output path: %v", err)
830         }
831
832         var manifestText string
833
834         collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
835         _, err = os.Stat(collectionMetafile)
836         if err != nil {
837                 // Regular directory
838                 cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
839                 manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
840                 if err != nil {
841                         return fmt.Errorf("While uploading output files: %v", err)
842                 }
843         } else {
844                 // FUSE mount directory
845                 file, openerr := os.Open(collectionMetafile)
846                 if openerr != nil {
847                         return fmt.Errorf("While opening FUSE metafile: %v", err)
848                 }
849                 defer file.Close()
850
851                 var rec arvados.Collection
852                 err = json.NewDecoder(file).Decode(&rec)
853                 if err != nil {
854                         return fmt.Errorf("While reading FUSE metafile: %v", err)
855                 }
856                 manifestText = rec.ManifestText
857         }
858
859         // Pre-populate output from the configured mount points
860         var binds []string
861         for bind, _ := range runner.Container.Mounts {
862                 binds = append(binds, bind)
863         }
864         sort.Strings(binds)
865
866         for _, bind := range binds {
867                 mnt := runner.Container.Mounts[bind]
868
869                 bindSuffix := strings.TrimPrefix(bind, runner.Container.OutputPath)
870
871                 if bindSuffix == bind || len(bindSuffix) <= 0 {
872                         // either does not start with OutputPath or is OutputPath itself
873                         continue
874                 }
875
876                 if mnt.ExcludeFromOutput == true {
877                         continue
878                 }
879
880                 // append to manifest_text
881                 m, err := runner.getCollectionManifestForPath(mnt, bindSuffix)
882                 if err != nil {
883                         return err
884                 }
885
886                 manifestText = manifestText + m
887         }
888
889         // Save output
890         var response arvados.Collection
891         manifest := manifest.Manifest{Text: manifestText}
892         manifestText = manifest.Extract(".", ".").Text
893         err = runner.ArvClient.Create("collections",
894                 arvadosclient.Dict{
895                         "ensure_unique_name": true,
896                         "collection": arvadosclient.Dict{
897                                 "is_trashed":    true,
898                                 "name":          "output for " + runner.Container.UUID,
899                                 "manifest_text": manifestText}},
900                 &response)
901         if err != nil {
902                 return fmt.Errorf("While creating output collection: %v", err)
903         }
904         runner.OutputPDH = &response.PortableDataHash
905         return nil
906 }
907
908 var outputCollections = make(map[string]arvados.Collection)
909
910 // Fetch the collection for the mnt.PortableDataHash
911 // Return the manifest_text fragment corresponding to the specified mnt.Path
912 //  after making any required updates.
913 //  Ex:
914 //    If mnt.Path is not specified,
915 //      return the entire manifest_text after replacing any "." with bindSuffix
916 //    If mnt.Path corresponds to one stream,
917 //      return the manifest_text for that stream after replacing that stream name with bindSuffix
918 //    Otherwise, check if a filename in any one stream is being sought. Return the manifest_text
919 //      for that stream after replacing stream name with bindSuffix minus the last word
920 //      and the file name with last word of the bindSuffix
921 //  Allowed path examples:
922 //    "path":"/"
923 //    "path":"/subdir1"
924 //    "path":"/subdir1/subdir2"
925 //    "path":"/subdir/filename" etc
926 func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, bindSuffix string) (string, error) {
927         collection := outputCollections[mnt.PortableDataHash]
928         if collection.PortableDataHash == "" {
929                 err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection)
930                 if err != nil {
931                         return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err)
932                 }
933                 outputCollections[mnt.PortableDataHash] = collection
934         }
935
936         if collection.ManifestText == "" {
937                 runner.CrunchLog.Printf("No manifest text for collection %v", collection.PortableDataHash)
938                 return "", nil
939         }
940
941         mft := manifest.Manifest{Text: collection.ManifestText}
942         extracted := mft.Extract(mnt.Path, bindSuffix)
943         if extracted.Err != nil {
944                 return "", fmt.Errorf("Error parsing manifest for %v: %v", mnt.PortableDataHash, extracted.Err.Error())
945         }
946         return extracted.Text, nil
947 }
948
949 func (runner *ContainerRunner) CleanupDirs() {
950         if runner.ArvMount != nil {
951                 umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
952                 umnterr := umount.Run()
953                 if umnterr != nil {
954                         runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
955                 }
956
957                 mnterr := <-runner.ArvMountExit
958                 if mnterr != nil {
959                         runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
960                 }
961         }
962
963         for _, tmpdir := range runner.CleanupTempDir {
964                 rmerr := os.RemoveAll(tmpdir)
965                 if rmerr != nil {
966                         runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
967                 }
968         }
969 }
970
971 // CommitLogs posts the collection containing the final container logs.
972 func (runner *ContainerRunner) CommitLogs() error {
973         runner.CrunchLog.Print(runner.finalState)
974         runner.CrunchLog.Close()
975
976         // Closing CrunchLog above allows it to be committed to Keep at this
977         // point, but re-open crunch log with ArvClient in case there are any
978         // other further (such as failing to write the log to Keep!) while
979         // shutting down
980         runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
981                 "crunch-run", nil})
982
983         if runner.LogsPDH != nil {
984                 // If we have already assigned something to LogsPDH,
985                 // we must be closing the re-opened log, which won't
986                 // end up getting attached to the container record and
987                 // therefore doesn't need to be saved as a collection
988                 // -- it exists only to send logs to other channels.
989                 return nil
990         }
991
992         mt, err := runner.LogCollection.ManifestText()
993         if err != nil {
994                 return fmt.Errorf("While creating log manifest: %v", err)
995         }
996
997         var response arvados.Collection
998         err = runner.ArvClient.Create("collections",
999                 arvadosclient.Dict{
1000                         "ensure_unique_name": true,
1001                         "collection": arvadosclient.Dict{
1002                                 "is_trashed":    true,
1003                                 "name":          "logs for " + runner.Container.UUID,
1004                                 "manifest_text": mt}},
1005                 &response)
1006         if err != nil {
1007                 return fmt.Errorf("While creating log collection: %v", err)
1008         }
1009         runner.LogsPDH = &response.PortableDataHash
1010         return nil
1011 }
1012
1013 // UpdateContainerRunning updates the container state to "Running"
1014 func (runner *ContainerRunner) UpdateContainerRunning() error {
1015         runner.cStateLock.Lock()
1016         defer runner.cStateLock.Unlock()
1017         if runner.cCancelled {
1018                 return ErrCancelled
1019         }
1020         return runner.ArvClient.Update("containers", runner.Container.UUID,
1021                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
1022 }
1023
1024 // ContainerToken returns the api_token the container (and any
1025 // arv-mount processes) are allowed to use.
1026 func (runner *ContainerRunner) ContainerToken() (string, error) {
1027         if runner.token != "" {
1028                 return runner.token, nil
1029         }
1030
1031         var auth arvados.APIClientAuthorization
1032         err := runner.ArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
1033         if err != nil {
1034                 return "", err
1035         }
1036         runner.token = auth.APIToken
1037         return runner.token, nil
1038 }
1039
1040 // UpdateContainerComplete updates the container record state on API
1041 // server to "Complete" or "Cancelled"
1042 func (runner *ContainerRunner) UpdateContainerFinal() error {
1043         update := arvadosclient.Dict{}
1044         update["state"] = runner.finalState
1045         if runner.LogsPDH != nil {
1046                 update["log"] = *runner.LogsPDH
1047         }
1048         if runner.finalState == "Complete" {
1049                 if runner.ExitCode != nil {
1050                         update["exit_code"] = *runner.ExitCode
1051                 }
1052                 if runner.OutputPDH != nil {
1053                         update["output"] = *runner.OutputPDH
1054                 }
1055         }
1056         return runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
1057 }
1058
1059 // IsCancelled returns the value of Cancelled, with goroutine safety.
1060 func (runner *ContainerRunner) IsCancelled() bool {
1061         runner.cStateLock.Lock()
1062         defer runner.cStateLock.Unlock()
1063         return runner.cCancelled
1064 }
1065
1066 // NewArvLogWriter creates an ArvLogWriter
1067 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
1068         return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
1069 }
1070
1071 // Run the full container lifecycle.
1072 func (runner *ContainerRunner) Run() (err error) {
1073         runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
1074
1075         hostname, hosterr := os.Hostname()
1076         if hosterr != nil {
1077                 runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
1078         } else {
1079                 runner.CrunchLog.Printf("Executing on host '%s'", hostname)
1080         }
1081
1082         // Clean up temporary directories _after_ finalizing
1083         // everything (if we've made any by then)
1084         defer runner.CleanupDirs()
1085
1086         runner.finalState = "Queued"
1087
1088         defer func() {
1089                 // checkErr prints e (unless it's nil) and sets err to
1090                 // e (unless err is already non-nil). Thus, if err
1091                 // hasn't already been assigned when Run() returns,
1092                 // this cleanup func will cause Run() to return the
1093                 // first non-nil error that is passed to checkErr().
1094                 checkErr := func(e error) {
1095                         if e == nil {
1096                                 return
1097                         }
1098                         runner.CrunchLog.Print(e)
1099                         if err == nil {
1100                                 err = e
1101                         }
1102                 }
1103
1104                 // Log the error encountered in Run(), if any
1105                 checkErr(err)
1106
1107                 if runner.finalState == "Queued" {
1108                         runner.CrunchLog.Close()
1109                         runner.UpdateContainerFinal()
1110                         return
1111                 }
1112
1113                 if runner.IsCancelled() {
1114                         runner.finalState = "Cancelled"
1115                         // but don't return yet -- we still want to
1116                         // capture partial output and write logs
1117                 }
1118
1119                 checkErr(runner.CaptureOutput())
1120                 checkErr(runner.CommitLogs())
1121                 checkErr(runner.UpdateContainerFinal())
1122
1123                 // The real log is already closed, but then we opened
1124                 // a new one in case we needed to log anything while
1125                 // finalizing.
1126                 runner.CrunchLog.Close()
1127         }()
1128
1129         err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container)
1130         if err != nil {
1131                 err = fmt.Errorf("While getting container record: %v", err)
1132                 return
1133         }
1134
1135         // setup signal handling
1136         runner.SetupSignals()
1137
1138         // check for and/or load image
1139         err = runner.LoadImage()
1140         if err != nil {
1141                 runner.finalState = "Cancelled"
1142                 err = fmt.Errorf("While loading container image: %v", err)
1143                 return
1144         }
1145
1146         // set up FUSE mount and binds
1147         err = runner.SetupMounts()
1148         if err != nil {
1149                 runner.finalState = "Cancelled"
1150                 err = fmt.Errorf("While setting up mounts: %v", err)
1151                 return
1152         }
1153
1154         err = runner.CreateContainer()
1155         if err != nil {
1156                 return
1157         }
1158
1159         // Gather and record node information
1160         err = runner.LogNodeInfo()
1161         if err != nil {
1162                 return
1163         }
1164         // Save container.json record on log collection
1165         err = runner.LogContainerRecord()
1166         if err != nil {
1167                 return
1168         }
1169
1170         runner.StartCrunchstat()
1171
1172         if runner.IsCancelled() {
1173                 return
1174         }
1175
1176         err = runner.UpdateContainerRunning()
1177         if err != nil {
1178                 return
1179         }
1180         runner.finalState = "Cancelled"
1181
1182         err = runner.StartContainer()
1183         if err != nil {
1184                 return
1185         }
1186
1187         err = runner.WaitFinish()
1188         if err == nil {
1189                 runner.finalState = "Complete"
1190         }
1191         return
1192 }
1193
1194 // NewContainerRunner creates a new container runner.
1195 func NewContainerRunner(api IArvadosClient,
1196         kc IKeepClient,
1197         docker ThinDockerClient,
1198         containerUUID string) *ContainerRunner {
1199
1200         cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
1201         cr.NewLogWriter = cr.NewArvLogWriter
1202         cr.RunArvMount = cr.ArvMountCmd
1203         cr.MkTempDir = ioutil.TempDir
1204         cr.LogCollection = &CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
1205         cr.Container.UUID = containerUUID
1206         cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
1207         cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
1208         return cr
1209 }
1210
1211 func main() {
1212         statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
1213         cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
1214         cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
1215         cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
1216         caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
1217         enableNetwork := flag.String("container-enable-networking", "default",
1218                 `Specify if networking should be enabled for container.  One of 'default', 'always':
1219         default: only enable networking if container requests it.
1220         always:  containers always have networking enabled
1221         `)
1222         networkMode := flag.String("container-network-mode", "default",
1223                 `Set networking mode for container.  Corresponds to Docker network mode (--net).
1224         `)
1225         flag.Parse()
1226
1227         containerId := flag.Arg(0)
1228
1229         if *caCertsPath != "" {
1230                 arvadosclient.CertFiles = []string{*caCertsPath}
1231         }
1232
1233         api, err := arvadosclient.MakeArvadosClient()
1234         if err != nil {
1235                 log.Fatalf("%s: %v", containerId, err)
1236         }
1237         api.Retries = 8
1238
1239         var kc *keepclient.KeepClient
1240         kc, err = keepclient.MakeKeepClient(api)
1241         if err != nil {
1242                 log.Fatalf("%s: %v", containerId, err)
1243         }
1244         kc.Retries = 4
1245
1246         var docker *dockerclient.Client
1247         // API version 1.21 corresponds to Docker 1.9, which is currently the
1248         // minimum version we want to support.
1249         docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
1250         if err != nil {
1251                 log.Fatalf("%s: %v", containerId, err)
1252         }
1253
1254         dockerClientProxy := ThinDockerClientProxy{Docker: docker}
1255
1256         cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
1257         cr.statInterval = *statInterval
1258         cr.cgroupRoot = *cgroupRoot
1259         cr.expectCgroupParent = *cgroupParent
1260         cr.enableNetwork = *enableNetwork
1261         cr.networkMode = *networkMode
1262         if *cgroupParentSubsystem != "" {
1263                 p := findCgroup(*cgroupParentSubsystem)
1264                 cr.setCgroupParent = p
1265                 cr.expectCgroupParent = p
1266         }
1267
1268         err = cr.Run()
1269         if err != nil {
1270                 log.Fatalf("%s: %v", containerId, err)
1271         }
1272
1273 }