10218: Added calls to "df -i" to record free i-nodes. Combined stdout and stderr...
[arvados.git] / services / crunch-run / crunchrun.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "os"
12         "os/exec"
13         "os/signal"
14         "path"
15         "path/filepath"
16         "sort"
17         "strings"
18         "sync"
19         "syscall"
20         "time"
21
22         "git.curoverse.com/arvados.git/lib/crunchstat"
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
25         "git.curoverse.com/arvados.git/sdk/go/keepclient"
26         "git.curoverse.com/arvados.git/sdk/go/manifest"
27         "github.com/curoverse/dockerclient"
28 )
29
30 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
31 type IArvadosClient interface {
32         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
33         Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
34         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
35         Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error
36         CallRaw(method string, resourceType string, uuid string, action string, parameters arvadosclient.Dict) (reader io.ReadCloser, err error)
37         Discovery(key string) (interface{}, error)
38 }
39
40 // ErrCancelled is the error returned when the container is cancelled.
41 var ErrCancelled = errors.New("Cancelled")
42
43 // IKeepClient is the minimal Keep API methods used by crunch-run.
44 type IKeepClient interface {
45         PutHB(hash string, buf []byte) (string, int, error)
46         ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
47 }
48
// NewLogWriter is a factory function to create a new log writer; name
// identifies the log (e.g. "stdout", "stderr", "crunchstat", "node-info").
type NewLogWriter func(name string) io.WriteCloser

// RunArvMount is a factory function that starts arv-mount with the given
// command-line arguments, using tok as its API token, and returns the
// running command.
type RunArvMount func(args []string, tok string) (*exec.Cmd, error)

// MkTempDir is a factory function that creates a new temporary directory
// inside dir whose name starts with prefix, returning the new directory's
// path. It has the same signature as ioutil.TempDir.
type MkTempDir func(dir, prefix string) (string, error)
55
56 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
57 type ThinDockerClient interface {
58         StopContainer(id string, timeout int) error
59         InspectImage(id string) (*dockerclient.ImageInfo, error)
60         LoadImage(reader io.Reader) error
61         CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
62         StartContainer(id string, config *dockerclient.HostConfig) error
63         AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
64         Wait(id string) <-chan dockerclient.WaitResult
65         RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
66 }
67
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
	// Clients for Docker, the Arvados API server and Keep (LoadImage reads
	// the image tarball through Kc).
	Docker    ThinDockerClient
	ArvClient IArvadosClient
	Kc        IKeepClient
	// The container record being run (accessed as runner.Container).
	arvados.Container
	// Docker configuration: ContainerConfig is passed to Docker's
	// CreateContainer; HostConfig is filled in by CreateContainer and
	// passed to Docker's StartContainer.
	dockerclient.ContainerConfig
	dockerclient.HostConfig
	// NOTE(review): not used in the code shown here; presumably caches the
	// container auth token returned by ContainerToken — confirm.
	token       string
	// Docker container ID, set by CreateContainer.
	ContainerID string
	// Container exit code, set by WaitFinish once Docker reports it.
	ExitCode    *int
	// Factory for the named logs ("stdout", "stderr", "crunchstat", ...).
	NewLogWriter
	// ProcessDockerAttach sends on, then closes, this channel after closing
	// the stdout/stderr destinations; WaitFinish waits on it.
	loggingDone   chan bool
	// Log for crunch-run's own progress messages.
	CrunchLog     *ThrottledLogger
	// Destination of container stdout: a file in the output directory when
	// a "stdout" mount exists, otherwise the "stdout" log.
	Stdout        io.WriteCloser
	// Destination of container stderr (the "stderr" log).
	Stderr        *ThrottledLogger
	// Collection the logs are written into (LogContainerRecord adds
	// container.json to it).
	LogCollection *CollectionWriter
	// NOTE(review): not used in the code shown; presumably the portable
	// data hash of the saved log collection — confirm.
	LogsPDH       *string
	// Starts arv-mount; called by SetupMounts.
	RunArvMount
	// Creates temporary directories (same signature as ioutil.TempDir).
	MkTempDir
	// The running arv-mount process and the host directory it mounts on.
	ArvMount       *exec.Cmd
	ArvMountPoint  string
	// Host directory bound to the container's output path.
	HostOutputDir  string
	// Temporary directories created during setup. NOTE(review): their
	// removal happens outside the code shown — confirm.
	CleanupTempDir []string
	// Docker bind specs built by SetupMounts.
	Binds          []string
	// NOTE(review): not used in the code shown; presumably the portable
	// data hash of the output collection — confirm.
	OutputPDH      *string
	// Receives SIGTERM/SIGINT/SIGQUIT once SetupSignals has run.
	SigChan        chan os.Signal
	// Receives the result of waiting for the arv-mount process, then is
	// closed (see ArvMountCmd); WaitFinish watches it.
	ArvMountExit   chan error
	// Final container state; CaptureOutput does nothing unless it is
	// "Complete".
	finalState     string

	// crunchstat resource-usage reporting, set up by StartCrunchstat and
	// shut down by ProcessDockerAttach.
	statLogger   io.WriteCloser
	statReporter *crunchstat.Reporter
	statInterval time.Duration
	cgroupRoot   string
	// What we expect the container's cgroup parent to be.
	expectCgroupParent string
	// What we tell docker to use as the container's cgroup
	// parent. Note: Ideally we would use the same field for both
	// expectCgroupParent and setCgroupParent, and just make it
	// default to "docker". However, when using docker < 1.10 with
	// systemd, specifying a non-empty cgroup parent (even the
	// default value "docker") hits a docker bug
	// (https://github.com/docker/docker/issues/17126). Using two
	// separate fields makes it possible to use the "expect cgroup
	// parent to be X" feature even on sites where the "specify
	// cgroup parent" feature breaks.
	setCgroupParent string

	// cStateLock guards cStarted and cCancelled so that StartContainer
	// and stop cannot interleave.
	cStateLock sync.Mutex
	cStarted   bool // StartContainer() succeeded
	cCancelled bool // stop() has been called (StopContainer invoked if started)
}
121
122 // SetupSignals sets up signal handling to gracefully terminate the underlying
123 // Docker container and update state when receiving a TERM, INT or QUIT signal.
124 func (runner *ContainerRunner) SetupSignals() {
125         runner.SigChan = make(chan os.Signal, 1)
126         signal.Notify(runner.SigChan, syscall.SIGTERM)
127         signal.Notify(runner.SigChan, syscall.SIGINT)
128         signal.Notify(runner.SigChan, syscall.SIGQUIT)
129
130         go func(sig chan os.Signal) {
131                 <-sig
132                 runner.stop()
133                 signal.Stop(sig)
134         }(runner.SigChan)
135 }
136
137 // stop the underlying Docker container.
138 func (runner *ContainerRunner) stop() {
139         runner.cStateLock.Lock()
140         defer runner.cStateLock.Unlock()
141         if runner.cCancelled {
142                 return
143         }
144         runner.cCancelled = true
145         if runner.cStarted {
146                 err := runner.Docker.StopContainer(runner.ContainerID, 10)
147                 if err != nil {
148                         log.Printf("StopContainer failed: %s", err)
149                 }
150         }
151 }
152
153 // LoadImage determines the docker image id from the container record and
154 // checks if it is available in the local Docker image store.  If not, it loads
155 // the image from Keep.
156 func (runner *ContainerRunner) LoadImage() (err error) {
157
158         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
159
160         var collection arvados.Collection
161         err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
162         if err != nil {
163                 return fmt.Errorf("While getting container image collection: %v", err)
164         }
165         manifest := manifest.Manifest{Text: collection.ManifestText}
166         var img, imageID string
167         for ms := range manifest.StreamIter() {
168                 img = ms.FileStreamSegments[0].Name
169                 if !strings.HasSuffix(img, ".tar") {
170                         return fmt.Errorf("First file in the container image collection does not end in .tar")
171                 }
172                 imageID = img[:len(img)-4]
173         }
174
175         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
176
177         _, err = runner.Docker.InspectImage(imageID)
178         if err != nil {
179                 runner.CrunchLog.Print("Loading Docker image from keep")
180
181                 var readCloser io.ReadCloser
182                 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
183                 if err != nil {
184                         return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
185                 }
186
187                 err = runner.Docker.LoadImage(readCloser)
188                 if err != nil {
189                         return fmt.Errorf("While loading container image into Docker: %v", err)
190                 }
191         } else {
192                 runner.CrunchLog.Print("Docker image is available")
193         }
194
195         runner.ContainerConfig.Image = imageID
196
197         return nil
198 }
199
200 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
201         c = exec.Command("arv-mount", arvMountCmd...)
202
203         // Copy our environment, but override ARVADOS_API_TOKEN with
204         // the container auth token.
205         c.Env = nil
206         for _, s := range os.Environ() {
207                 if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") {
208                         c.Env = append(c.Env, s)
209                 }
210         }
211         c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
212
213         nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
214         c.Stdout = nt
215         c.Stderr = nt
216
217         err = c.Start()
218         if err != nil {
219                 return nil, err
220         }
221
222         statReadme := make(chan bool)
223         runner.ArvMountExit = make(chan error)
224
225         keepStatting := true
226         go func() {
227                 for keepStatting {
228                         time.Sleep(100 * time.Millisecond)
229                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
230                         if err == nil {
231                                 keepStatting = false
232                                 statReadme <- true
233                         }
234                 }
235                 close(statReadme)
236         }()
237
238         go func() {
239                 runner.ArvMountExit <- c.Wait()
240                 close(runner.ArvMountExit)
241         }()
242
243         select {
244         case <-statReadme:
245                 break
246         case err := <-runner.ArvMountExit:
247                 runner.ArvMount = nil
248                 keepStatting = false
249                 return nil, err
250         }
251
252         return c, nil
253 }
254
255 func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
256         if runner.ArvMountPoint == "" {
257                 runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
258         }
259         return
260 }
261
262 func (runner *ContainerRunner) SetupMounts() (err error) {
263         err = runner.SetupArvMountPoint("keep")
264         if err != nil {
265                 return fmt.Errorf("While creating keep mount temp dir: %v", err)
266         }
267
268         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
269
270         pdhOnly := true
271         tmpcount := 0
272         arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
273
274         if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
275                 arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
276         }
277
278         collectionPaths := []string{}
279         runner.Binds = nil
280         needCertMount := true
281
282         var binds []string
283         for bind, _ := range runner.Container.Mounts {
284                 binds = append(binds, bind)
285         }
286         sort.Strings(binds)
287
288         for _, bind := range binds {
289                 mnt := runner.Container.Mounts[bind]
290                 if bind == "stdout" {
291                         // Is it a "file" mount kind?
292                         if mnt.Kind != "file" {
293                                 return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
294                         }
295
296                         // Does path start with OutputPath?
297                         prefix := runner.Container.OutputPath
298                         if !strings.HasSuffix(prefix, "/") {
299                                 prefix += "/"
300                         }
301                         if !strings.HasPrefix(mnt.Path, prefix) {
302                                 return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
303                         }
304                 }
305
306                 if bind == "/etc/arvados/ca-certificates.crt" {
307                         needCertMount = false
308                 }
309
310                 if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" {
311                         if mnt.Kind != "collection" {
312                                 return fmt.Errorf("Only mount points of kind 'collection' are supported underneath the output_path: %v", bind)
313                         }
314                 }
315
316                 switch {
317                 case mnt.Kind == "collection":
318                         var src string
319                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
320                                 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
321                         }
322                         if mnt.UUID != "" {
323                                 if mnt.Writable {
324                                         return fmt.Errorf("Writing to existing collections currently not permitted.")
325                                 }
326                                 pdhOnly = false
327                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
328                         } else if mnt.PortableDataHash != "" {
329                                 if mnt.Writable {
330                                         return fmt.Errorf("Can never write to a collection specified by portable data hash")
331                                 }
332                                 idx := strings.Index(mnt.PortableDataHash, "/")
333                                 if idx > 0 {
334                                         mnt.Path = path.Clean(mnt.PortableDataHash[idx:])
335                                         mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
336                                         runner.Container.Mounts[bind] = mnt
337                                 }
338                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
339                                 if mnt.Path != "" && mnt.Path != "." {
340                                         if strings.HasPrefix(mnt.Path, "./") {
341                                                 mnt.Path = mnt.Path[2:]
342                                         } else if strings.HasPrefix(mnt.Path, "/") {
343                                                 mnt.Path = mnt.Path[1:]
344                                         }
345                                         src += "/" + mnt.Path
346                                 }
347                         } else {
348                                 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
349                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
350                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
351                                 tmpcount += 1
352                         }
353                         if mnt.Writable {
354                                 if bind == runner.Container.OutputPath {
355                                         runner.HostOutputDir = src
356                                 } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
357                                         return fmt.Errorf("Writable mount points are not permitted underneath the output_path: %v", bind)
358                                 }
359                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
360                         } else {
361                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
362                         }
363                         collectionPaths = append(collectionPaths, src)
364
365                 case mnt.Kind == "tmp" && bind == runner.Container.OutputPath:
366                         runner.HostOutputDir, err = runner.MkTempDir("", "")
367                         if err != nil {
368                                 return fmt.Errorf("While creating mount temp dir: %v", err)
369                         }
370                         st, staterr := os.Stat(runner.HostOutputDir)
371                         if staterr != nil {
372                                 return fmt.Errorf("While Stat on temp dir: %v", staterr)
373                         }
374                         err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
375                         if staterr != nil {
376                                 return fmt.Errorf("While Chmod temp dir: %v", err)
377                         }
378                         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
379                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
380
381                 case mnt.Kind == "tmp":
382                         runner.Binds = append(runner.Binds, bind)
383
384                 case mnt.Kind == "json":
385                         jsondata, err := json.Marshal(mnt.Content)
386                         if err != nil {
387                                 return fmt.Errorf("encoding json data: %v", err)
388                         }
389                         // Create a tempdir with a single file
390                         // (instead of just a tempfile): this way we
391                         // can ensure the file is world-readable
392                         // inside the container, without having to
393                         // make it world-readable on the docker host.
394                         tmpdir, err := runner.MkTempDir("", "")
395                         if err != nil {
396                                 return fmt.Errorf("creating temp dir: %v", err)
397                         }
398                         runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
399                         tmpfn := filepath.Join(tmpdir, "mountdata.json")
400                         err = ioutil.WriteFile(tmpfn, jsondata, 0644)
401                         if err != nil {
402                                 return fmt.Errorf("writing temp file: %v", err)
403                         }
404                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
405                 }
406         }
407
408         if runner.HostOutputDir == "" {
409                 return fmt.Errorf("Output path does not correspond to a writable mount point")
410         }
411
412         if wantAPI := runner.Container.RuntimeConstraints.API; needCertMount && wantAPI != nil && *wantAPI {
413                 for _, certfile := range arvadosclient.CertFiles {
414                         _, err := os.Stat(certfile)
415                         if err == nil {
416                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:/etc/arvados/ca-certificates.crt:ro", certfile))
417                                 break
418                         }
419                 }
420         }
421
422         if pdhOnly {
423                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
424         } else {
425                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
426         }
427         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
428
429         token, err := runner.ContainerToken()
430         if err != nil {
431                 return fmt.Errorf("could not get container token: %s", err)
432         }
433
434         runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
435         if err != nil {
436                 return fmt.Errorf("While trying to start arv-mount: %v", err)
437         }
438
439         for _, p := range collectionPaths {
440                 _, err = os.Stat(p)
441                 if err != nil {
442                         return fmt.Errorf("While checking that input files exist: %v", err)
443                 }
444         }
445
446         return nil
447 }
448
449 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
450         // Handle docker log protocol
451         // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
452
453         header := make([]byte, 8)
454         for {
455                 _, readerr := io.ReadAtLeast(containerReader, header, 8)
456
457                 if readerr == nil {
458                         readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
459                         if header[0] == 1 {
460                                 // stdout
461                                 _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
462                         } else {
463                                 // stderr
464                                 _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
465                         }
466                 }
467
468                 if readerr != nil {
469                         if readerr != io.EOF {
470                                 runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
471                         }
472
473                         closeerr := runner.Stdout.Close()
474                         if closeerr != nil {
475                                 runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
476                         }
477
478                         closeerr = runner.Stderr.Close()
479                         if closeerr != nil {
480                                 runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
481                         }
482
483                         if runner.statReporter != nil {
484                                 runner.statReporter.Stop()
485                                 closeerr = runner.statLogger.Close()
486                                 if closeerr != nil {
487                                         runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
488                                 }
489                         }
490
491                         runner.loggingDone <- true
492                         close(runner.loggingDone)
493                         return
494                 }
495         }
496 }
497
498 func (runner *ContainerRunner) StartCrunchstat() {
499         runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
500         runner.statReporter = &crunchstat.Reporter{
501                 CID:          runner.ContainerID,
502                 Logger:       log.New(runner.statLogger, "", 0),
503                 CgroupParent: runner.expectCgroupParent,
504                 CgroupRoot:   runner.cgroupRoot,
505                 PollPeriod:   runner.statInterval,
506         }
507         runner.statReporter.Start()
508 }
509
510 type infoCommand struct {
511         label string
512         cmd   []string
513 }
514
515 // Gather node information and store it on the log for debugging
516 // purposes.
517 func (runner *ContainerRunner) LogNodeInfo() (err error) {
518         w := runner.NewLogWriter("node-info")
519         logger := log.New(w, "node-info", 0)
520
521         commands := []infoCommand{
522                 infoCommand{
523                         label: "Host Information",
524                         cmd:   []string{"uname", "-a"},
525                 },
526                 infoCommand{
527                         label: "CPU Information",
528                         cmd:   []string{"cat", "/proc/cpuinfo"},
529                 },
530                 infoCommand{
531                         label: "Memory Information",
532                         cmd:   []string{"cat", "/proc/meminfo"},
533                 },
534                 infoCommand{
535                         label: "Disk Space",
536                         cmd:   []string{"df", "-m", "/"},
537                 },
538                 infoCommand{
539                         label: "Disk Space",
540                         cmd:   []string{"df", "-m", os.TempDir()},
541                 },
542                 infoCommand{
543                         label: "Disk INodes",
544                         cmd:   []string{"df", "-i", "/"},
545                 },
546                 infoCommand{
547                         label: "Disk INodes",
548                         cmd:   []string{"df", "-i", os.TempDir()},
549                 },
550         }
551
552         // Run commands with informational output to be logged.
553         var out []byte
554         for _, command := range commands {
555                 out, err = exec.Command(command.cmd[0], command.cmd[1:]...).CombinedOutput()
556                 if err != nil {
557                         return fmt.Errorf("While running command %q: %v",
558                                 command.cmd, err)
559                 }
560                 logger.Println(command.label)
561                 for _, line := range strings.Split(string(out), "\n") {
562                         logger.Println(" ", line)
563                 }
564         }
565
566         err = w.Close()
567         if err != nil {
568                 return fmt.Errorf("While closing node-info logs: %v", err)
569         }
570         return nil
571 }
572
573 // Get and save the raw JSON container record from the API server
574 func (runner *ContainerRunner) LogContainerRecord() (err error) {
575         w := &ArvLogWriter{
576                 runner.ArvClient,
577                 runner.Container.UUID,
578                 "container",
579                 runner.LogCollection.Open("container.json"),
580         }
581         // Get Container record JSON from the API Server
582         reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
583         if err != nil {
584                 return fmt.Errorf("While retrieving container record from the API server: %v", err)
585         }
586         // Read the API server response as []byte
587         json_bytes, err := ioutil.ReadAll(reader)
588         if err != nil {
589                 return fmt.Errorf("While reading container record API server response: %v", err)
590         }
591         // Decode the JSON []byte
592         var cr map[string]interface{}
593         if err = json.Unmarshal(json_bytes, &cr); err != nil {
594                 return fmt.Errorf("While decoding the container record JSON response: %v", err)
595         }
596         // Re-encode it using indentation to improve readability
597         enc := json.NewEncoder(w)
598         enc.SetIndent("", "    ")
599         if err = enc.Encode(cr); err != nil {
600                 return fmt.Errorf("While logging the JSON container record: %v", err)
601         }
602         err = w.Close()
603         if err != nil {
604                 return fmt.Errorf("While closing container.json log: %v", err)
605         }
606         return nil
607 }
608
609 // AttachLogs connects the docker container stdout and stderr logs to the
610 // Arvados logger which logs to Keep and the API server logs table.
611 func (runner *ContainerRunner) AttachStreams() (err error) {
612
613         runner.CrunchLog.Print("Attaching container streams")
614
615         var containerReader io.Reader
616         containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
617                 &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
618         if err != nil {
619                 return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
620         }
621
622         runner.loggingDone = make(chan bool)
623
624         if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
625                 stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
626                 index := strings.LastIndex(stdoutPath, "/")
627                 if index > 0 {
628                         subdirs := stdoutPath[:index]
629                         if subdirs != "" {
630                                 st, err := os.Stat(runner.HostOutputDir)
631                                 if err != nil {
632                                         return fmt.Errorf("While Stat on temp dir: %v", err)
633                                 }
634                                 stdoutPath := path.Join(runner.HostOutputDir, subdirs)
635                                 err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
636                                 if err != nil {
637                                         return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
638                                 }
639                         }
640                 }
641                 stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
642                 if err != nil {
643                         return fmt.Errorf("While creating stdout file: %v", err)
644                 }
645                 runner.Stdout = stdoutFile
646         } else {
647                 runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
648         }
649         runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
650
651         go runner.ProcessDockerAttach(containerReader)
652
653         return nil
654 }
655
656 // CreateContainer creates the docker container.
657 func (runner *ContainerRunner) CreateContainer() error {
658         runner.CrunchLog.Print("Creating Docker container")
659
660         runner.ContainerConfig.Cmd = runner.Container.Command
661         if runner.Container.Cwd != "." {
662                 runner.ContainerConfig.WorkingDir = runner.Container.Cwd
663         }
664
665         for k, v := range runner.Container.Environment {
666                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
667         }
668         if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
669                 tok, err := runner.ContainerToken()
670                 if err != nil {
671                         return err
672                 }
673                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
674                         "ARVADOS_API_TOKEN="+tok,
675                         "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
676                         "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
677                 )
678                 runner.ContainerConfig.NetworkDisabled = false
679         } else {
680                 runner.ContainerConfig.NetworkDisabled = true
681         }
682
683         var err error
684         runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
685         if err != nil {
686                 return fmt.Errorf("While creating container: %v", err)
687         }
688
689         runner.HostConfig = dockerclient.HostConfig{
690                 Binds:        runner.Binds,
691                 CgroupParent: runner.setCgroupParent,
692                 LogConfig: dockerclient.LogConfig{
693                         Type: "none",
694                 },
695         }
696
697         return runner.AttachStreams()
698 }
699
700 // StartContainer starts the docker container created by CreateContainer.
701 func (runner *ContainerRunner) StartContainer() error {
702         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
703         runner.cStateLock.Lock()
704         defer runner.cStateLock.Unlock()
705         if runner.cCancelled {
706                 return ErrCancelled
707         }
708         err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
709         if err != nil {
710                 return fmt.Errorf("could not start container: %v", err)
711         }
712         runner.cStarted = true
713         return nil
714 }
715
716 // WaitFinish waits for the container to terminate, capture the exit code, and
717 // close the stdout/stderr logging.
718 func (runner *ContainerRunner) WaitFinish() error {
719         runner.CrunchLog.Print("Waiting for container to finish")
720
721         waitDocker := runner.Docker.Wait(runner.ContainerID)
722         waitMount := runner.ArvMountExit
723         for waitDocker != nil {
724                 select {
725                 case err := <-waitMount:
726                         runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
727                         waitMount = nil
728                         runner.stop()
729                 case wr := <-waitDocker:
730                         if wr.Error != nil {
731                                 return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
732                         }
733                         runner.ExitCode = &wr.ExitCode
734                         waitDocker = nil
735                 }
736         }
737
738         // wait for stdout/stderr to complete
739         <-runner.loggingDone
740
741         return nil
742 }
743
744 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
745 func (runner *ContainerRunner) CaptureOutput() error {
746         if runner.finalState != "Complete" {
747                 return nil
748         }
749
750         if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
751                 // Output may have been set directly by the container, so
752                 // refresh the container record to check.
753                 err := runner.ArvClient.Get("containers", runner.Container.UUID,
754                         nil, &runner.Container)
755                 if err != nil {
756                         return err
757                 }
758                 if runner.Container.Output != "" {
759                         // Container output is already set.
760                         runner.OutputPDH = &runner.Container.Output
761                         return nil
762                 }
763         }
764
765         if runner.HostOutputDir == "" {
766                 return nil
767         }
768
769         _, err := os.Stat(runner.HostOutputDir)
770         if err != nil {
771                 return fmt.Errorf("While checking host output path: %v", err)
772         }
773
774         var manifestText string
775
776         collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
777         _, err = os.Stat(collectionMetafile)
778         if err != nil {
779                 // Regular directory
780                 cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
781                 manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
782                 if err != nil {
783                         return fmt.Errorf("While uploading output files: %v", err)
784                 }
785         } else {
786                 // FUSE mount directory
787                 file, openerr := os.Open(collectionMetafile)
788                 if openerr != nil {
789                         return fmt.Errorf("While opening FUSE metafile: %v", err)
790                 }
791                 defer file.Close()
792
793                 var rec arvados.Collection
794                 err = json.NewDecoder(file).Decode(&rec)
795                 if err != nil {
796                         return fmt.Errorf("While reading FUSE metafile: %v", err)
797                 }
798                 manifestText = rec.ManifestText
799         }
800
801         // Pre-populate output from the configured mount points
802         var binds []string
803         for bind, _ := range runner.Container.Mounts {
804                 binds = append(binds, bind)
805         }
806         sort.Strings(binds)
807
808         for _, bind := range binds {
809                 mnt := runner.Container.Mounts[bind]
810
811                 bindSuffix := strings.TrimPrefix(bind, runner.Container.OutputPath)
812
813                 if bindSuffix == bind || len(bindSuffix) <= 0 {
814                         // either does not start with OutputPath or is OutputPath itself
815                         continue
816                 }
817
818                 if mnt.ExcludeFromOutput == true {
819                         continue
820                 }
821
822                 // append to manifest_text
823                 m, err := runner.getCollectionManifestForPath(mnt, bindSuffix)
824                 if err != nil {
825                         return err
826                 }
827
828                 manifestText = manifestText + m
829         }
830
831         // Save output
832         var response arvados.Collection
833         manifest := manifest.Manifest{Text: manifestText}
834         manifestText = manifest.Extract(".", ".").Text
835         err = runner.ArvClient.Create("collections",
836                 arvadosclient.Dict{
837                         "ensure_unique_name": true,
838                         "collection": arvadosclient.Dict{
839                                 "is_trashed":    true,
840                                 "name":          "output for " + runner.Container.UUID,
841                                 "manifest_text": manifestText}},
842                 &response)
843         if err != nil {
844                 return fmt.Errorf("While creating output collection: %v", err)
845         }
846         runner.OutputPDH = &response.PortableDataHash
847         return nil
848 }
849
850 var outputCollections = make(map[string]arvados.Collection)
851
852 // Fetch the collection for the mnt.PortableDataHash
853 // Return the manifest_text fragment corresponding to the specified mnt.Path
854 //  after making any required updates.
855 //  Ex:
856 //    If mnt.Path is not specified,
857 //      return the entire manifest_text after replacing any "." with bindSuffix
858 //    If mnt.Path corresponds to one stream,
859 //      return the manifest_text for that stream after replacing that stream name with bindSuffix
860 //    Otherwise, check if a filename in any one stream is being sought. Return the manifest_text
861 //      for that stream after replacing stream name with bindSuffix minus the last word
862 //      and the file name with last word of the bindSuffix
863 //  Allowed path examples:
864 //    "path":"/"
865 //    "path":"/subdir1"
866 //    "path":"/subdir1/subdir2"
867 //    "path":"/subdir/filename" etc
868 func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, bindSuffix string) (string, error) {
869         collection := outputCollections[mnt.PortableDataHash]
870         if collection.PortableDataHash == "" {
871                 err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection)
872                 if err != nil {
873                         return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err)
874                 }
875                 outputCollections[mnt.PortableDataHash] = collection
876         }
877
878         if collection.ManifestText == "" {
879                 runner.CrunchLog.Printf("No manifest text for collection %v", collection.PortableDataHash)
880                 return "", nil
881         }
882
883         mft := manifest.Manifest{Text: collection.ManifestText}
884         extracted := mft.Extract(mnt.Path, bindSuffix)
885         if extracted.Err != nil {
886                 return "", fmt.Errorf("Error parsing manifest for %v: %v", mnt.PortableDataHash, extracted.Err.Error())
887         }
888         return extracted.Text, nil
889 }
890
891 func (runner *ContainerRunner) CleanupDirs() {
892         if runner.ArvMount != nil {
893                 umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
894                 umnterr := umount.Run()
895                 if umnterr != nil {
896                         runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
897                 }
898
899                 mnterr := <-runner.ArvMountExit
900                 if mnterr != nil {
901                         runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
902                 }
903         }
904
905         for _, tmpdir := range runner.CleanupTempDir {
906                 rmerr := os.RemoveAll(tmpdir)
907                 if rmerr != nil {
908                         runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
909                 }
910         }
911 }
912
913 // CommitLogs posts the collection containing the final container logs.
914 func (runner *ContainerRunner) CommitLogs() error {
915         runner.CrunchLog.Print(runner.finalState)
916         runner.CrunchLog.Close()
917
918         // Closing CrunchLog above allows it to be committed to Keep at this
919         // point, but re-open crunch log with ArvClient in case there are any
920         // other further (such as failing to write the log to Keep!) while
921         // shutting down
922         runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
923                 "crunch-run", nil})
924
925         if runner.LogsPDH != nil {
926                 // If we have already assigned something to LogsPDH,
927                 // we must be closing the re-opened log, which won't
928                 // end up getting attached to the container record and
929                 // therefore doesn't need to be saved as a collection
930                 // -- it exists only to send logs to other channels.
931                 return nil
932         }
933
934         mt, err := runner.LogCollection.ManifestText()
935         if err != nil {
936                 return fmt.Errorf("While creating log manifest: %v", err)
937         }
938
939         var response arvados.Collection
940         err = runner.ArvClient.Create("collections",
941                 arvadosclient.Dict{
942                         "ensure_unique_name": true,
943                         "collection": arvadosclient.Dict{
944                                 "is_trashed":    true,
945                                 "name":          "logs for " + runner.Container.UUID,
946                                 "manifest_text": mt}},
947                 &response)
948         if err != nil {
949                 return fmt.Errorf("While creating log collection: %v", err)
950         }
951         runner.LogsPDH = &response.PortableDataHash
952         return nil
953 }
954
955 // UpdateContainerRunning updates the container state to "Running"
956 func (runner *ContainerRunner) UpdateContainerRunning() error {
957         runner.cStateLock.Lock()
958         defer runner.cStateLock.Unlock()
959         if runner.cCancelled {
960                 return ErrCancelled
961         }
962         return runner.ArvClient.Update("containers", runner.Container.UUID,
963                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
964 }
965
966 // ContainerToken returns the api_token the container (and any
967 // arv-mount processes) are allowed to use.
968 func (runner *ContainerRunner) ContainerToken() (string, error) {
969         if runner.token != "" {
970                 return runner.token, nil
971         }
972
973         var auth arvados.APIClientAuthorization
974         err := runner.ArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
975         if err != nil {
976                 return "", err
977         }
978         runner.token = auth.APIToken
979         return runner.token, nil
980 }
981
982 // UpdateContainerComplete updates the container record state on API
983 // server to "Complete" or "Cancelled"
984 func (runner *ContainerRunner) UpdateContainerFinal() error {
985         update := arvadosclient.Dict{}
986         update["state"] = runner.finalState
987         if runner.LogsPDH != nil {
988                 update["log"] = *runner.LogsPDH
989         }
990         if runner.finalState == "Complete" {
991                 if runner.ExitCode != nil {
992                         update["exit_code"] = *runner.ExitCode
993                 }
994                 if runner.OutputPDH != nil {
995                         update["output"] = *runner.OutputPDH
996                 }
997         }
998         return runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
999 }
1000
1001 // IsCancelled returns the value of Cancelled, with goroutine safety.
1002 func (runner *ContainerRunner) IsCancelled() bool {
1003         runner.cStateLock.Lock()
1004         defer runner.cStateLock.Unlock()
1005         return runner.cCancelled
1006 }
1007
1008 // NewArvLogWriter creates an ArvLogWriter
1009 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
1010         return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
1011 }
1012
// Run the full container lifecycle.
//
// In order, Run:
//   - fetches the container record and sets up signal handling;
//   - loads the image, sets up mounts and creates the Docker container;
//   - records node info and the container record in the log collection;
//   - starts crunchstat, marks the container Running, starts it, and
//     waits for it to finish.
//
// runner.finalState records how far the lifecycle got. The deferred
// finalizer uses it to decide what to do:
//   - "Queued": the container was never marked Running. The log is closed
//     and the record's state is set to Queued, without capturing output
//     or committing logs.
//   - "Cancelled": set when image loading or mount setup fails, and again
//     once the container has been marked Running. Logs are committed and
//     the final state is recorded.
//   - "Complete": set only when WaitFinish returns without error.
//
// The named result err is also assigned by the deferred finalizer, so Run
// returns the first error seen either in Run itself or during
// finalization.
func (runner *ContainerRunner) Run() (err error) {
	runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)

	hostname, hosterr := os.Hostname()
	if hosterr != nil {
		runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
	} else {
		runner.CrunchLog.Printf("Executing on host '%s'", hostname)
	}

	// Clean up temporary directories _after_ finalizing
	// everything (if we've made any by then)
	defer runner.CleanupDirs()

	// Until UpdateContainerRunning succeeds, an early return leaves the
	// record's state updated back to "Queued" by the finalizer below.
	runner.finalState = "Queued"

	defer func() {
		// checkErr prints e (unless it's nil) and sets err to
		// e (unless err is already non-nil). Thus, if err
		// hasn't already been assigned when Run() returns,
		// this cleanup func will cause Run() to return the
		// first non-nil error that is passed to checkErr().
		checkErr := func(e error) {
			if e == nil {
				return
			}
			runner.CrunchLog.Print(e)
			if err == nil {
				err = e
			}
		}

		// Log the error encountered in Run(), if any
		checkErr(err)

		if runner.finalState == "Queued" {
			// The container was never marked Running: skip output
			// capture and the log commit.
			// NOTE(review): the error returned by UpdateContainerFinal
			// is discarded here, unlike in the path below.
			runner.CrunchLog.Close()
			runner.UpdateContainerFinal()
			return
		}

		if runner.IsCancelled() {
			runner.finalState = "Cancelled"
			// but don't return yet -- we still want to
			// capture partial output and write logs
			// NOTE(review): CaptureOutput returns early unless
			// finalState is "Complete", so no output is actually
			// captured on this path.
		}

		checkErr(runner.CaptureOutput())
		checkErr(runner.CommitLogs())
		checkErr(runner.UpdateContainerFinal())

		// The real log is already closed, but then we opened
		// a new one in case we needed to log anything while
		// finalizing.
		runner.CrunchLog.Close()
	}()

	err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container)
	if err != nil {
		err = fmt.Errorf("While getting container record: %v", err)
		return
	}

	// setup signal handling
	runner.SetupSignals()

	// check for and/or load image
	err = runner.LoadImage()
	if err != nil {
		runner.finalState = "Cancelled"
		err = fmt.Errorf("While loading container image: %v", err)
		return
	}

	// set up FUSE mount and binds
	err = runner.SetupMounts()
	if err != nil {
		runner.finalState = "Cancelled"
		err = fmt.Errorf("While setting up mounts: %v", err)
		return
	}

	err = runner.CreateContainer()
	if err != nil {
		return
	}

	// Gather and record node information
	err = runner.LogNodeInfo()
	if err != nil {
		return
	}
	// Save container.json record on log collection
	err = runner.LogContainerRecord()
	if err != nil {
		return
	}

	runner.StartCrunchstat()

	if runner.IsCancelled() {
		return
	}

	err = runner.UpdateContainerRunning()
	if err != nil {
		return
	}
	// From here on, an early return finalizes as "Cancelled". Only a
	// clean WaitFinish upgrades the state to "Complete".
	runner.finalState = "Cancelled"

	err = runner.StartContainer()
	if err != nil {
		return
	}

	err = runner.WaitFinish()
	if err == nil {
		runner.finalState = "Complete"
	}
	return
}
1135
1136 // NewContainerRunner creates a new container runner.
1137 func NewContainerRunner(api IArvadosClient,
1138         kc IKeepClient,
1139         docker ThinDockerClient,
1140         containerUUID string) *ContainerRunner {
1141
1142         cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
1143         cr.NewLogWriter = cr.NewArvLogWriter
1144         cr.RunArvMount = cr.ArvMountCmd
1145         cr.MkTempDir = ioutil.TempDir
1146         cr.LogCollection = &CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
1147         cr.Container.UUID = containerUUID
1148         cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
1149         cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
1150         return cr
1151 }
1152
1153 func main() {
1154         statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
1155         cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
1156         cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
1157         cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
1158         caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
1159         flag.Parse()
1160
1161         containerId := flag.Arg(0)
1162
1163         if *caCertsPath != "" {
1164                 arvadosclient.CertFiles = []string{*caCertsPath}
1165         }
1166
1167         api, err := arvadosclient.MakeArvadosClient()
1168         if err != nil {
1169                 log.Fatalf("%s: %v", containerId, err)
1170         }
1171         api.Retries = 8
1172
1173         var kc *keepclient.KeepClient
1174         kc, err = keepclient.MakeKeepClient(api)
1175         if err != nil {
1176                 log.Fatalf("%s: %v", containerId, err)
1177         }
1178         kc.Retries = 4
1179
1180         var docker *dockerclient.DockerClient
1181         docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
1182         if err != nil {
1183                 log.Fatalf("%s: %v", containerId, err)
1184         }
1185
1186         cr := NewContainerRunner(api, kc, docker, containerId)
1187         cr.statInterval = *statInterval
1188         cr.cgroupRoot = *cgroupRoot
1189         cr.expectCgroupParent = *cgroupParent
1190         if *cgroupParentSubsystem != "" {
1191                 p := findCgroup(*cgroupParentSubsystem)
1192                 cr.setCgroupParent = p
1193                 cr.expectCgroupParent = p
1194         }
1195
1196         err = cr.Run()
1197         if err != nil {
1198                 log.Fatalf("%s: %v", containerId, err)
1199         }
1200
1201 }