9397: more testing
[arvados.git] / services / crunch-run / crunchrun.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/lib/crunchstat"
9         "git.curoverse.com/arvados.git/sdk/go/arvados"
10         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11         "git.curoverse.com/arvados.git/sdk/go/keepclient"
12         "git.curoverse.com/arvados.git/sdk/go/manifest"
13         "github.com/curoverse/dockerclient"
14         "io"
15         "io/ioutil"
16         "log"
17         "os"
18         "os/exec"
19         "os/signal"
20         "path"
21         "path/filepath"
22         "sort"
23         "strings"
24         "sync"
25         "syscall"
26         "time"
27 )
28
29 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
30 type IArvadosClient interface {
31         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
32         Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
33         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
34         Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error
35         Discovery(key string) (interface{}, error)
36 }
37
// ErrCancelled is the sentinel error reported when the container run was
// cancelled.
var (
	ErrCancelled = errors.New("Cancelled")
)
40
41 // IKeepClient is the minimal Keep API methods used by crunch-run.
42 type IKeepClient interface {
43         PutHB(hash string, buf []byte) (string, int, error)
44         ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
45 }
46
// NewLogWriter is the signature of a factory that returns a fresh log
// writer for the named log stream (e.g. "stdout", "stderr", "crunchstat").
type NewLogWriter func(streamName string) io.WriteCloser
49
// RunArvMount is the signature of the function used to launch arv-mount
// with the given command line arguments and API token. It returns the
// command it started, or an error.
type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
51
// MkTempDir is the signature of the function used to create temporary
// directories. Callers in this file pass "" as the first argument and a
// name prefix as the second.
// NOTE(review): the parameter names assume ioutil.TempDir(dir, prefix)
// semantics -- confirm against the function installed by the constructor.
type MkTempDir func(dir, prefix string) (string, error)
53
54 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
55 type ThinDockerClient interface {
56         StopContainer(id string, timeout int) error
57         InspectImage(id string) (*dockerclient.ImageInfo, error)
58         LoadImage(reader io.Reader) error
59         CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
60         StartContainer(id string, config *dockerclient.HostConfig) error
61         AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
62         Wait(id string) <-chan dockerclient.WaitResult
63         RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
64 }
65
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
//
// It embeds the Arvados container record together with the Docker container
// and host configurations, and carries the API/Keep/Docker clients, the log
// writers, the mount bookkeeping and the cancellation state shared by the
// methods that set up, run and finalize the container.
type ContainerRunner struct {
	Docker    ThinDockerClient
	ArvClient IArvadosClient
	Kc        IKeepClient
	// Embedded: the Arvados container record being run, and the Docker
	// configurations handed to Docker.CreateContainer and
	// Docker.StartContainer respectively.
	arvados.Container
	dockerclient.ContainerConfig
	dockerclient.HostConfig
	// NOTE(review): token is presumably the cached container auth token;
	// it is not referenced in the visible part of the file -- confirm.
	// ContainerID is assigned by CreateContainer and ExitCode by WaitFinish.
	token       string
	ContainerID string
	ExitCode    *int
	// Embedded factory used to obtain the "arv-mount", "crunchstat",
	// "stdout" and "stderr" log writers.
	NewLogWriter
	// loggingDone receives one value and is then closed by
	// ProcessDockerAttach once the container streams have ended. Stdout
	// and Stderr are assigned by AttachStreams.
	loggingDone   chan bool
	CrunchLog     *ThrottledLogger
	Stdout        io.WriteCloser
	Stderr        *ThrottledLogger
	LogCollection *CollectionWriter
	LogsPDH       *string
	// Embedded hooks used to start arv-mount and to create temp dirs.
	RunArvMount
	MkTempDir
	// Mount and output state. SetupMounts fills in ArvMount,
	// ArvMountPoint, HostOutputDir, CleanupTempDir and Binds;
	// CaptureOutput sets OutputPDH and uses trashLifetime to compute the
	// output collection's trash_at time. Cancelled is set by the signal
	// handler installed by SetupSignals while holding CancelLock.
	// ArvMountExit receives the result of waiting for arv-mount to exit.
	ArvMount       *exec.Cmd
	ArvMountPoint  string
	HostOutputDir  string
	CleanupTempDir []string
	Binds          []string
	OutputPDH      *string
	CancelLock     sync.Mutex
	Cancelled      bool
	SigChan        chan os.Signal
	ArvMountExit   chan error
	finalState     string
	trashLifetime  time.Duration

	// crunchstat reporting state, set up by StartCrunchstat and torn down
	// by ProcessDockerAttach.
	statLogger   io.WriteCloser
	statReporter *crunchstat.Reporter
	statInterval time.Duration
	cgroupRoot   string
	// What we expect the container's cgroup parent to be.
	expectCgroupParent string
	// What we tell docker to use as the container's cgroup
	// parent. Note: Ideally we would use the same field for both
	// expectCgroupParent and setCgroupParent, and just make it
	// default to "docker". However, when using docker < 1.10 with
	// systemd, specifying a non-empty cgroup parent (even the
	// default value "docker") hits a docker bug
	// (https://github.com/docker/docker/issues/17126). Using two
	// separate fields makes it possible to use the "expect cgroup
	// parent to be X" feature even on sites where the "specify
	// cgroup parent" feature breaks.
	setCgroupParent string
}
118
119 // SetupSignals sets up signal handling to gracefully terminate the underlying
120 // Docker container and update state when receiving a TERM, INT or QUIT signal.
121 func (runner *ContainerRunner) SetupSignals() {
122         runner.SigChan = make(chan os.Signal, 1)
123         signal.Notify(runner.SigChan, syscall.SIGTERM)
124         signal.Notify(runner.SigChan, syscall.SIGINT)
125         signal.Notify(runner.SigChan, syscall.SIGQUIT)
126
127         go func(sig <-chan os.Signal) {
128                 for range sig {
129                         if !runner.Cancelled {
130                                 runner.CancelLock.Lock()
131                                 runner.Cancelled = true
132                                 if runner.ContainerID != "" {
133                                         runner.Docker.StopContainer(runner.ContainerID, 10)
134                                 }
135                                 runner.CancelLock.Unlock()
136                         }
137                 }
138         }(runner.SigChan)
139 }
140
141 // LoadImage determines the docker image id from the container record and
142 // checks if it is available in the local Docker image store.  If not, it loads
143 // the image from Keep.
144 func (runner *ContainerRunner) LoadImage() (err error) {
145
146         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
147
148         var collection arvados.Collection
149         err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
150         if err != nil {
151                 return fmt.Errorf("While getting container image collection: %v", err)
152         }
153         manifest := manifest.Manifest{Text: collection.ManifestText}
154         var img, imageID string
155         for ms := range manifest.StreamIter() {
156                 img = ms.FileStreamSegments[0].Name
157                 if !strings.HasSuffix(img, ".tar") {
158                         return fmt.Errorf("First file in the container image collection does not end in .tar")
159                 }
160                 imageID = img[:len(img)-4]
161         }
162
163         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
164
165         _, err = runner.Docker.InspectImage(imageID)
166         if err != nil {
167                 runner.CrunchLog.Print("Loading Docker image from keep")
168
169                 var readCloser io.ReadCloser
170                 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
171                 if err != nil {
172                         return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
173                 }
174
175                 err = runner.Docker.LoadImage(readCloser)
176                 if err != nil {
177                         return fmt.Errorf("While loading container image into Docker: %v", err)
178                 }
179         } else {
180                 runner.CrunchLog.Print("Docker image is available")
181         }
182
183         runner.ContainerConfig.Image = imageID
184
185         return nil
186 }
187
188 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
189         c = exec.Command("arv-mount", arvMountCmd...)
190
191         // Copy our environment, but override ARVADOS_API_TOKEN with
192         // the container auth token.
193         c.Env = nil
194         for _, s := range os.Environ() {
195                 if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") {
196                         c.Env = append(c.Env, s)
197                 }
198         }
199         c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
200
201         nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
202         c.Stdout = nt
203         c.Stderr = nt
204
205         err = c.Start()
206         if err != nil {
207                 return nil, err
208         }
209
210         statReadme := make(chan bool)
211         runner.ArvMountExit = make(chan error)
212
213         keepStatting := true
214         go func() {
215                 for keepStatting {
216                         time.Sleep(100 * time.Millisecond)
217                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
218                         if err == nil {
219                                 keepStatting = false
220                                 statReadme <- true
221                         }
222                 }
223                 close(statReadme)
224         }()
225
226         go func() {
227                 runner.ArvMountExit <- c.Wait()
228                 close(runner.ArvMountExit)
229         }()
230
231         select {
232         case <-statReadme:
233                 break
234         case err := <-runner.ArvMountExit:
235                 runner.ArvMount = nil
236                 keepStatting = false
237                 return nil, err
238         }
239
240         return c, nil
241 }
242
// tmpBackedOutputDir records that the output directory is a host temp
// directory: SetupMounts sets it to true when the mount at the container's
// output path is of kind "tmp".
var tmpBackedOutputDir bool
244
245 func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
246         if runner.ArvMountPoint == "" {
247                 runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
248         }
249         return
250 }
251
252 func (runner *ContainerRunner) SetupMounts() (err error) {
253         err = runner.SetupArvMountPoint("keep")
254         if err != nil {
255                 return fmt.Errorf("While creating keep mount temp dir: %v", err)
256         }
257
258         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
259
260         pdhOnly := true
261         tmpcount := 0
262         arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
263
264         if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
265                 arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
266         }
267
268         collectionPaths := []string{}
269         runner.Binds = nil
270         needCertMount := true
271
272         var binds []string
273         for bind, _ := range runner.Container.Mounts {
274                 binds = append(binds, bind)
275         }
276         sort.Strings(binds)
277
278         for _, bind := range binds {
279                 mnt := runner.Container.Mounts[bind]
280                 if bind == "stdout" {
281                         // Is it a "file" mount kind?
282                         if mnt.Kind != "file" {
283                                 return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
284                         }
285
286                         // Does path start with OutputPath?
287                         prefix := runner.Container.OutputPath
288                         if !strings.HasSuffix(prefix, "/") {
289                                 prefix += "/"
290                         }
291                         if !strings.HasPrefix(mnt.Path, prefix) {
292                                 return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
293                         }
294                 }
295
296                 if bind == "/etc/arvados/ca-certificates.crt" {
297                         needCertMount = false
298                 }
299
300                 if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" {
301                         if mnt.Kind != "collection" {
302                                 return fmt.Errorf("Only mount points of kind 'collection' are supported underneath the output_path: %v", bind)
303                         }
304                 }
305
306                 switch {
307                 case mnt.Kind == "collection":
308                         var src string
309                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
310                                 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
311                         }
312                         if mnt.UUID != "" {
313                                 if mnt.Writable {
314                                         return fmt.Errorf("Writing to existing collections currently not permitted.")
315                                 }
316                                 pdhOnly = false
317                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
318                         } else if mnt.PortableDataHash != "" {
319                                 if mnt.Writable {
320                                         return fmt.Errorf("Can never write to a collection specified by portable data hash")
321                                 }
322                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
323                         } else {
324                                 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
325                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
326                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
327                                 tmpcount += 1
328                         }
329                         if mnt.Writable {
330                                 if bind == runner.Container.OutputPath {
331                                         runner.HostOutputDir = src
332                                 } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
333                                         return fmt.Errorf("Writable mount points are not permitted underneath the output_path: %v", bind)
334                                 }
335                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
336                         } else {
337                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
338                         }
339                         collectionPaths = append(collectionPaths, src)
340
341                 case mnt.Kind == "tmp" && bind == runner.Container.OutputPath:
342                         runner.HostOutputDir, err = runner.MkTempDir("", "")
343                         if err != nil {
344                                 return fmt.Errorf("While creating mount temp dir: %v", err)
345                         }
346                         st, staterr := os.Stat(runner.HostOutputDir)
347                         if staterr != nil {
348                                 return fmt.Errorf("While Stat on temp dir: %v", staterr)
349                         }
350                         err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
351                         if staterr != nil {
352                                 return fmt.Errorf("While Chmod temp dir: %v", err)
353                         }
354                         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
355                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
356                         tmpBackedOutputDir = true
357
358                 case mnt.Kind == "tmp":
359                         runner.Binds = append(runner.Binds, bind)
360
361                 case mnt.Kind == "json":
362                         jsondata, err := json.Marshal(mnt.Content)
363                         if err != nil {
364                                 return fmt.Errorf("encoding json data: %v", err)
365                         }
366                         // Create a tempdir with a single file
367                         // (instead of just a tempfile): this way we
368                         // can ensure the file is world-readable
369                         // inside the container, without having to
370                         // make it world-readable on the docker host.
371                         tmpdir, err := runner.MkTempDir("", "")
372                         if err != nil {
373                                 return fmt.Errorf("creating temp dir: %v", err)
374                         }
375                         runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
376                         tmpfn := filepath.Join(tmpdir, "mountdata.json")
377                         err = ioutil.WriteFile(tmpfn, jsondata, 0644)
378                         if err != nil {
379                                 return fmt.Errorf("writing temp file: %v", err)
380                         }
381                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
382                 }
383         }
384
385         if runner.HostOutputDir == "" {
386                 return fmt.Errorf("Output path does not correspond to a writable mount point")
387         }
388
389         if wantAPI := runner.Container.RuntimeConstraints.API; needCertMount && wantAPI != nil && *wantAPI {
390                 for _, certfile := range arvadosclient.CertFiles {
391                         _, err := os.Stat(certfile)
392                         if err == nil {
393                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:/etc/arvados/ca-certificates.crt:ro", certfile))
394                                 break
395                         }
396                 }
397         }
398
399         if pdhOnly {
400                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
401         } else {
402                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
403         }
404         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
405
406         token, err := runner.ContainerToken()
407         if err != nil {
408                 return fmt.Errorf("could not get container token: %s", err)
409         }
410
411         runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
412         if err != nil {
413                 return fmt.Errorf("While trying to start arv-mount: %v", err)
414         }
415
416         for _, p := range collectionPaths {
417                 _, err = os.Stat(p)
418                 if err != nil {
419                         return fmt.Errorf("While checking that input files exist: %v", err)
420                 }
421         }
422
423         return nil
424 }
425
426 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
427         // Handle docker log protocol
428         // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
429
430         header := make([]byte, 8)
431         for {
432                 _, readerr := io.ReadAtLeast(containerReader, header, 8)
433
434                 if readerr == nil {
435                         readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
436                         if header[0] == 1 {
437                                 // stdout
438                                 _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
439                         } else {
440                                 // stderr
441                                 _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
442                         }
443                 }
444
445                 if readerr != nil {
446                         if readerr != io.EOF {
447                                 runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
448                         }
449
450                         closeerr := runner.Stdout.Close()
451                         if closeerr != nil {
452                                 runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
453                         }
454
455                         closeerr = runner.Stderr.Close()
456                         if closeerr != nil {
457                                 runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
458                         }
459
460                         if runner.statReporter != nil {
461                                 runner.statReporter.Stop()
462                                 closeerr = runner.statLogger.Close()
463                                 if closeerr != nil {
464                                         runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
465                                 }
466                         }
467
468                         runner.loggingDone <- true
469                         close(runner.loggingDone)
470                         return
471                 }
472         }
473 }
474
475 func (runner *ContainerRunner) StartCrunchstat() {
476         runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
477         runner.statReporter = &crunchstat.Reporter{
478                 CID:          runner.ContainerID,
479                 Logger:       log.New(runner.statLogger, "", 0),
480                 CgroupParent: runner.expectCgroupParent,
481                 CgroupRoot:   runner.cgroupRoot,
482                 PollPeriod:   runner.statInterval,
483         }
484         runner.statReporter.Start()
485 }
486
487 // AttachLogs connects the docker container stdout and stderr logs to the
488 // Arvados logger which logs to Keep and the API server logs table.
489 func (runner *ContainerRunner) AttachStreams() (err error) {
490
491         runner.CrunchLog.Print("Attaching container streams")
492
493         var containerReader io.Reader
494         containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
495                 &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
496         if err != nil {
497                 return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
498         }
499
500         runner.loggingDone = make(chan bool)
501
502         if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
503                 stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
504                 index := strings.LastIndex(stdoutPath, "/")
505                 if index > 0 {
506                         subdirs := stdoutPath[:index]
507                         if subdirs != "" {
508                                 st, err := os.Stat(runner.HostOutputDir)
509                                 if err != nil {
510                                         return fmt.Errorf("While Stat on temp dir: %v", err)
511                                 }
512                                 stdoutPath := path.Join(runner.HostOutputDir, subdirs)
513                                 err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
514                                 if err != nil {
515                                         return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
516                                 }
517                         }
518                 }
519                 stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
520                 if err != nil {
521                         return fmt.Errorf("While creating stdout file: %v", err)
522                 }
523                 runner.Stdout = stdoutFile
524         } else {
525                 runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
526         }
527         runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
528
529         go runner.ProcessDockerAttach(containerReader)
530
531         return nil
532 }
533
534 // CreateContainer creates the docker container.
535 func (runner *ContainerRunner) CreateContainer() error {
536         runner.CrunchLog.Print("Creating Docker container")
537
538         runner.ContainerConfig.Cmd = runner.Container.Command
539         if runner.Container.Cwd != "." {
540                 runner.ContainerConfig.WorkingDir = runner.Container.Cwd
541         }
542
543         for k, v := range runner.Container.Environment {
544                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
545         }
546         if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
547                 tok, err := runner.ContainerToken()
548                 if err != nil {
549                         return err
550                 }
551                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
552                         "ARVADOS_API_TOKEN="+tok,
553                         "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
554                         "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
555                 )
556                 runner.ContainerConfig.NetworkDisabled = false
557         } else {
558                 runner.ContainerConfig.NetworkDisabled = true
559         }
560
561         var err error
562         runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
563         if err != nil {
564                 return fmt.Errorf("While creating container: %v", err)
565         }
566
567         runner.HostConfig = dockerclient.HostConfig{
568                 Binds:        runner.Binds,
569                 CgroupParent: runner.setCgroupParent,
570                 LogConfig: dockerclient.LogConfig{
571                         Type: "none",
572                 },
573         }
574
575         return runner.AttachStreams()
576 }
577
578 // StartContainer starts the docker container created by CreateContainer.
579 func (runner *ContainerRunner) StartContainer() error {
580         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
581         err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
582         if err != nil {
583                 return fmt.Errorf("could not start container: %v", err)
584         }
585         return nil
586 }
587
588 // WaitFinish waits for the container to terminate, capture the exit code, and
589 // close the stdout/stderr logging.
590 func (runner *ContainerRunner) WaitFinish() error {
591         runner.CrunchLog.Print("Waiting for container to finish")
592
593         result := runner.Docker.Wait(runner.ContainerID)
594         wr := <-result
595         if wr.Error != nil {
596                 return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
597         }
598         runner.ExitCode = &wr.ExitCode
599
600         // wait for stdout/stderr to complete
601         <-runner.loggingDone
602
603         return nil
604 }
605
606 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
607 func (runner *ContainerRunner) CaptureOutput() error {
608         if runner.finalState != "Complete" {
609                 return nil
610         }
611
612         if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
613                 // Output may have been set directly by the container, so
614                 // refresh the container record to check.
615                 err := runner.ArvClient.Get("containers", runner.Container.UUID,
616                         nil, &runner.Container)
617                 if err != nil {
618                         return err
619                 }
620                 if runner.Container.Output != "" {
621                         // Container output is already set.
622                         runner.OutputPDH = &runner.Container.Output
623                         return nil
624                 }
625         }
626
627         if runner.HostOutputDir == "" {
628                 return nil
629         }
630
631         _, err := os.Stat(runner.HostOutputDir)
632         if err != nil {
633                 return fmt.Errorf("While checking host output path: %v", err)
634         }
635
636         var manifestText string
637
638         collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
639         _, err = os.Stat(collectionMetafile)
640         if err != nil {
641                 // Regular directory
642                 cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
643                 manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
644                 if err != nil {
645                         return fmt.Errorf("While uploading output files: %v", err)
646                 }
647         } else {
648                 // FUSE mount directory
649                 file, openerr := os.Open(collectionMetafile)
650                 if openerr != nil {
651                         return fmt.Errorf("While opening FUSE metafile: %v", err)
652                 }
653                 defer file.Close()
654
655                 var rec arvados.Collection
656                 err = json.NewDecoder(file).Decode(&rec)
657                 if err != nil {
658                         return fmt.Errorf("While reading FUSE metafile: %v", err)
659                 }
660                 manifestText = rec.ManifestText
661         }
662
663         // Pre-populate output from the configured mount points
664         var binds []string
665         for bind, _ := range runner.Container.Mounts {
666                 binds = append(binds, bind)
667         }
668         sort.Strings(binds)
669
670         for _, bind := range binds {
671                 mnt := runner.Container.Mounts[bind]
672
673                 bindSuffix := strings.TrimPrefix(bind, runner.Container.OutputPath)
674
675                 if bindSuffix == bind || len(bindSuffix) <= 0 {
676                         // either does not start with OutputPath or is OutputPath itself
677                         continue
678                 }
679
680                 if strings.Index(bindSuffix, "/") != 0 {
681                         return fmt.Errorf("Expected bind to be of the format '%v/*' but found: %v", runner.Container.OutputPath, bind)
682                 }
683
684                 jsondata, err := json.Marshal(mnt.Content)
685                 if err != nil {
686                         return fmt.Errorf("While marshal of mount content: %v", err)
687                 }
688                 var content map[string]interface{}
689                 err = json.Unmarshal(jsondata, &content)
690                 if err != nil {
691                         return fmt.Errorf("While unmarshal of mount content: %v", err)
692                 }
693
694                 if content["exclude_from_output"] == true {
695                         continue
696                 }
697
698                 idx := strings.Index(mnt.PortableDataHash, "/")
699                 if idx > 0 {
700                         mnt.Path = mnt.PortableDataHash[idx:]
701                         mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
702                 }
703
704                 // append to manifest_text
705                 m, err := runner.getCollectionManifestForPath(mnt, bindSuffix)
706                 if err != nil {
707                         return err
708                 }
709
710                 manifestText = manifestText + m
711         }
712
713         // Save output
714         var response arvados.Collection
715         err = runner.ArvClient.Create("collections",
716                 arvadosclient.Dict{
717                         "collection": arvadosclient.Dict{
718                                 "trash_at":      time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
719                                 "name":          "output for " + runner.Container.UUID,
720                                 "manifest_text": manifestText}},
721                 &response)
722         if err != nil {
723                 return fmt.Errorf("While creating output collection: %v", err)
724         }
725         runner.OutputPDH = &response.PortableDataHash
726         return nil
727 }
728
729 // Fetch the collection for the mnt.PortableDataHash
730 // Return the manifest_text fragment corresponding to the specified mnt.Path
731 //  after making any required updates.
732 //  Ex:
733 //    If mnt.Path is not specified,
734 //      return the entire manifest_text after replacing any "." with bindSuffix
735 //    If mnt.Path corresponds to one stream,
736 //      return the manifest_text for that stream after replacing that stream name with bindSuffix
737 //    Otherwise, check if a filename in any one stream is being sought. Return the manifest_text
738 //      for that stream after replacing stream name with bindSuffix minus the last word
739 //      and the file name with last word of the bindSuffix
740 //  Allowed path examples:
741 //    "path":"/"
742 //    "path":"/subdir1"
743 //    "path":"/subdir1/subdir2"
744 //    "path":"/subdir/filename" etc
745 func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, bindSuffix string) (string, error) {
746         var collection arvados.Collection
747         err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection)
748         if err != nil {
749                 return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err)
750         }
751
752         manifestText := ""
753         if mnt.Path == "" || mnt.Path == "/" {
754                 // no path specified; return the entire manifest text
755                 manifestText = collection.ManifestText
756                 manifestText = strings.Replace(manifestText, "./", "."+bindSuffix+"/", -1)
757                 manifestText = strings.Replace(manifestText, ". ", "."+bindSuffix+" ", -1)
758         } else {
759                 // either a single stream or file from a stream is being sought
760                 bindIdx := strings.LastIndex(bindSuffix, "/")
761                 var bindSubdir, bindFileName string
762                 if bindIdx >= 0 {
763                         bindSubdir = "." + bindSuffix[0:bindIdx]
764                         bindFileName = bindSuffix[bindIdx+1:]
765                 }
766                 pathIdx := strings.LastIndex(mnt.Path, "/")
767                 var pathSubdir, pathFileName string
768                 if pathIdx >= 0 {
769                         pathSubdir = "." + mnt.Path[0:pathIdx]
770                         pathFileName = mnt.Path[pathIdx+1:]
771                 }
772                 streams := strings.Split(collection.ManifestText, "\n")
773                 for _, stream := range streams {
774                         tokens := strings.Split(stream, " ")
775                         if tokens[0] == "."+mnt.Path {
776                                 // path refers to this complete stream
777                                 adjustedStream := strings.Replace(stream, "."+mnt.Path, "."+bindSuffix, -1)
778                                 manifestText = adjustedStream + "\n"
779                                 break
780                         } else {
781                                 // look for a matching file in this stream
782                                 if tokens[0] == pathSubdir {
783                                         // path refers to a file in this stream
784                                         for _, token := range tokens {
785                                                 if strings.Index(token, ":"+pathFileName) > 0 {
786                                                         // found the file in the stream; discard all other file tokens
787                                                         for _, t := range tokens {
788                                                                 if strings.Index(t, ":") == -1 {
789                                                                         manifestText += (" " + t)
790                                                                 } else {
791                                                                         break // done reading all non-file tokens
792                                                                 }
793                                                         }
794                                                         manifestText = strings.Trim(manifestText, " ")
795                                                         token = strings.Replace(token, ":"+pathFileName, ":"+bindFileName, -1)
796                                                         manifestText += (" " + token + "\n")
797                                                         manifestText = strings.Replace(manifestText, pathSubdir, bindSubdir, -1)
798                                                         break
799                                                 }
800                                         }
801                                 }
802                         }
803                 }
804         }
805
806         return manifestText, nil
807 }
808
809 func (runner *ContainerRunner) loadDiscoveryVars() {
810         tl, err := runner.ArvClient.Discovery("defaultTrashLifetime")
811         if err != nil {
812                 log.Fatalf("getting defaultTrashLifetime from discovery document: %s", err)
813         }
814         runner.trashLifetime = time.Duration(tl.(float64)) * time.Second
815 }
816
817 func (runner *ContainerRunner) CleanupDirs() {
818         if runner.ArvMount != nil {
819                 umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
820                 umnterr := umount.Run()
821                 if umnterr != nil {
822                         runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
823                 }
824
825                 mnterr := <-runner.ArvMountExit
826                 if mnterr != nil {
827                         runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
828                 }
829         }
830
831         for _, tmpdir := range runner.CleanupTempDir {
832                 rmerr := os.RemoveAll(tmpdir)
833                 if rmerr != nil {
834                         runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
835                 }
836         }
837 }
838
839 // CommitLogs posts the collection containing the final container logs.
840 func (runner *ContainerRunner) CommitLogs() error {
841         runner.CrunchLog.Print(runner.finalState)
842         runner.CrunchLog.Close()
843
844         // Closing CrunchLog above allows it to be committed to Keep at this
845         // point, but re-open crunch log with ArvClient in case there are any
846         // other further (such as failing to write the log to Keep!) while
847         // shutting down
848         runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
849                 "crunch-run", nil})
850
851         if runner.LogsPDH != nil {
852                 // If we have already assigned something to LogsPDH,
853                 // we must be closing the re-opened log, which won't
854                 // end up getting attached to the container record and
855                 // therefore doesn't need to be saved as a collection
856                 // -- it exists only to send logs to other channels.
857                 return nil
858         }
859
860         mt, err := runner.LogCollection.ManifestText()
861         if err != nil {
862                 return fmt.Errorf("While creating log manifest: %v", err)
863         }
864
865         var response arvados.Collection
866         err = runner.ArvClient.Create("collections",
867                 arvadosclient.Dict{
868                         "collection": arvadosclient.Dict{
869                                 "trash_at":      time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
870                                 "name":          "logs for " + runner.Container.UUID,
871                                 "manifest_text": mt}},
872                 &response)
873         if err != nil {
874                 return fmt.Errorf("While creating log collection: %v", err)
875         }
876         runner.LogsPDH = &response.PortableDataHash
877         return nil
878 }
879
880 // UpdateContainerRunning updates the container state to "Running"
881 func (runner *ContainerRunner) UpdateContainerRunning() error {
882         runner.CancelLock.Lock()
883         defer runner.CancelLock.Unlock()
884         if runner.Cancelled {
885                 return ErrCancelled
886         }
887         return runner.ArvClient.Update("containers", runner.Container.UUID,
888                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
889 }
890
891 // ContainerToken returns the api_token the container (and any
892 // arv-mount processes) are allowed to use.
893 func (runner *ContainerRunner) ContainerToken() (string, error) {
894         if runner.token != "" {
895                 return runner.token, nil
896         }
897
898         var auth arvados.APIClientAuthorization
899         err := runner.ArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
900         if err != nil {
901                 return "", err
902         }
903         runner.token = auth.APIToken
904         return runner.token, nil
905 }
906
907 // UpdateContainerComplete updates the container record state on API
908 // server to "Complete" or "Cancelled"
909 func (runner *ContainerRunner) UpdateContainerFinal() error {
910         update := arvadosclient.Dict{}
911         update["state"] = runner.finalState
912         if runner.LogsPDH != nil {
913                 update["log"] = *runner.LogsPDH
914         }
915         if runner.finalState == "Complete" {
916                 if runner.ExitCode != nil {
917                         update["exit_code"] = *runner.ExitCode
918                 }
919                 if runner.OutputPDH != nil {
920                         update["output"] = *runner.OutputPDH
921                 }
922         }
923         return runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
924 }
925
926 // IsCancelled returns the value of Cancelled, with goroutine safety.
927 func (runner *ContainerRunner) IsCancelled() bool {
928         runner.CancelLock.Lock()
929         defer runner.CancelLock.Unlock()
930         return runner.Cancelled
931 }
932
933 // NewArvLogWriter creates an ArvLogWriter
934 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
935         return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
936 }
937
938 // Run the full container lifecycle.
939 func (runner *ContainerRunner) Run() (err error) {
940         runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
941
942         hostname, hosterr := os.Hostname()
943         if hosterr != nil {
944                 runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
945         } else {
946                 runner.CrunchLog.Printf("Executing on host '%s'", hostname)
947         }
948
949         // Clean up temporary directories _after_ finalizing
950         // everything (if we've made any by then)
951         defer runner.CleanupDirs()
952
953         runner.finalState = "Queued"
954
955         defer func() {
956                 // checkErr prints e (unless it's nil) and sets err to
957                 // e (unless err is already non-nil). Thus, if err
958                 // hasn't already been assigned when Run() returns,
959                 // this cleanup func will cause Run() to return the
960                 // first non-nil error that is passed to checkErr().
961                 checkErr := func(e error) {
962                         if e == nil {
963                                 return
964                         }
965                         runner.CrunchLog.Print(e)
966                         if err == nil {
967                                 err = e
968                         }
969                 }
970
971                 // Log the error encountered in Run(), if any
972                 checkErr(err)
973
974                 if runner.finalState == "Queued" {
975                         runner.CrunchLog.Close()
976                         runner.UpdateContainerFinal()
977                         return
978                 }
979
980                 if runner.IsCancelled() {
981                         runner.finalState = "Cancelled"
982                         // but don't return yet -- we still want to
983                         // capture partial output and write logs
984                 }
985
986                 checkErr(runner.CaptureOutput())
987                 checkErr(runner.CommitLogs())
988                 checkErr(runner.UpdateContainerFinal())
989
990                 // The real log is already closed, but then we opened
991                 // a new one in case we needed to log anything while
992                 // finalizing.
993                 runner.CrunchLog.Close()
994         }()
995
996         err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container)
997         if err != nil {
998                 err = fmt.Errorf("While getting container record: %v", err)
999                 return
1000         }
1001
1002         // setup signal handling
1003         runner.SetupSignals()
1004
1005         // check for and/or load image
1006         err = runner.LoadImage()
1007         if err != nil {
1008                 runner.finalState = "Cancelled"
1009                 err = fmt.Errorf("While loading container image: %v", err)
1010                 return
1011         }
1012
1013         // set up FUSE mount and binds
1014         err = runner.SetupMounts()
1015         if err != nil {
1016                 runner.finalState = "Cancelled"
1017                 err = fmt.Errorf("While setting up mounts: %v", err)
1018                 return
1019         }
1020
1021         err = runner.CreateContainer()
1022         if err != nil {
1023                 return
1024         }
1025
1026         runner.StartCrunchstat()
1027
1028         if runner.IsCancelled() {
1029                 return
1030         }
1031
1032         err = runner.UpdateContainerRunning()
1033         if err != nil {
1034                 return
1035         }
1036         runner.finalState = "Cancelled"
1037
1038         err = runner.StartContainer()
1039         if err != nil {
1040                 return
1041         }
1042
1043         err = runner.WaitFinish()
1044         if err == nil {
1045                 runner.finalState = "Complete"
1046         }
1047         return
1048 }
1049
1050 // NewContainerRunner creates a new container runner.
1051 func NewContainerRunner(api IArvadosClient,
1052         kc IKeepClient,
1053         docker ThinDockerClient,
1054         containerUUID string) *ContainerRunner {
1055
1056         cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
1057         cr.NewLogWriter = cr.NewArvLogWriter
1058         cr.RunArvMount = cr.ArvMountCmd
1059         cr.MkTempDir = ioutil.TempDir
1060         cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
1061         cr.Container.UUID = containerUUID
1062         cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
1063         cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
1064         cr.loadDiscoveryVars()
1065         return cr
1066 }
1067
1068 func main() {
1069         statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
1070         cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
1071         cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
1072         cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
1073         caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
1074         flag.Parse()
1075
1076         containerId := flag.Arg(0)
1077
1078         if *caCertsPath != "" {
1079                 arvadosclient.CertFiles = []string{*caCertsPath}
1080         }
1081
1082         api, err := arvadosclient.MakeArvadosClient()
1083         if err != nil {
1084                 log.Fatalf("%s: %v", containerId, err)
1085         }
1086         api.Retries = 8
1087
1088         var kc *keepclient.KeepClient
1089         kc, err = keepclient.MakeKeepClient(api)
1090         if err != nil {
1091                 log.Fatalf("%s: %v", containerId, err)
1092         }
1093         kc.Retries = 4
1094
1095         var docker *dockerclient.DockerClient
1096         docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
1097         if err != nil {
1098                 log.Fatalf("%s: %v", containerId, err)
1099         }
1100
1101         cr := NewContainerRunner(api, kc, docker, containerId)
1102         cr.statInterval = *statInterval
1103         cr.cgroupRoot = *cgroupRoot
1104         cr.expectCgroupParent = *cgroupParent
1105         if *cgroupParentSubsystem != "" {
1106                 p := findCgroup(*cgroupParentSubsystem)
1107                 cr.setCgroupParent = p
1108                 cr.expectCgroupParent = p
1109         }
1110
1111         err = cr.Run()
1112         if err != nil {
1113                 log.Fatalf("%s: %v", containerId, err)
1114         }
1115
1116 }