Merge branch 'master' into 11071-fts-perf-test
[arvados.git] / services / crunch-run / crunchrun.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "os"
12         "os/exec"
13         "os/signal"
14         "path"
15         "path/filepath"
16         "sort"
17         "strings"
18         "sync"
19         "syscall"
20         "time"
21
22         "git.curoverse.com/arvados.git/lib/crunchstat"
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
25         "git.curoverse.com/arvados.git/sdk/go/keepclient"
26         "git.curoverse.com/arvados.git/sdk/go/manifest"
27         "github.com/curoverse/dockerclient"
28 )
29
30 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
31 type IArvadosClient interface {
32         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
33         Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
34         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
35         Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error
36         Discovery(key string) (interface{}, error)
37 }
38
var (
	// ErrCancelled is the error returned when the container run has been
	// cancelled.
	ErrCancelled = errors.New("Cancelled")
)
41
42 // IKeepClient is the minimal Keep API methods used by crunch-run.
43 type IKeepClient interface {
44         PutHB(hash string, buf []byte) (string, int, error)
45         ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
46 }
47
type (
	// NewLogWriter is a factory function that creates a new log writer
	// for the log stream with the given name (e.g. "stdout", "stderr",
	// "arv-mount", "crunchstat").
	NewLogWriter func(name string) io.WriteCloser

	// RunArvMount starts arv-mount with the given command-line arguments
	// and API token, and returns the running command (see
	// ContainerRunner.ArvMountCmd for the default implementation).
	RunArvMount func(args []string, tok string) (*exec.Cmd, error)

	// MkTempDir creates a temporary directory and returns its path. Its
	// signature matches ioutil.TempDir(dir, prefix).
	MkTempDir func(dir, prefix string) (string, error)
)
54
55 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
56 type ThinDockerClient interface {
57         StopContainer(id string, timeout int) error
58         InspectImage(id string) (*dockerclient.ImageInfo, error)
59         LoadImage(reader io.Reader) error
60         CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
61         StartContainer(id string, config *dockerclient.HostConfig) error
62         AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
63         Wait(id string) <-chan dockerclient.WaitResult
64         RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
65 }
66
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
	// Clients for Docker, the Arvados API server, and Keep.
	Docker    ThinDockerClient
	ArvClient IArvadosClient
	Kc        IKeepClient
	// The container record being run (accessed as runner.Container).
	arvados.Container
	// Docker container configuration: Image is set by LoadImage; Cmd,
	// WorkingDir, Env and NetworkDisabled are set by CreateContainer.
	dockerclient.ContainerConfig
	// Docker host configuration (binds, cgroup parent, log driver), set by
	// CreateContainer and passed to Docker by StartContainer.
	dockerclient.HostConfig
	// NOTE(review): not used in this view; presumably caches the container
	// auth token returned by ContainerToken — confirm.
	token       string
	// Docker's ID for the container, assigned by CreateContainer. stop()
	// skips StopContainer while it is empty.
	ContainerID string
	// Container exit code, set by WaitFinish.
	ExitCode    *int
	// Factory for named log writers ("stdout", "stderr", "arv-mount",
	// "crunchstat", ...).
	NewLogWriter
	// Created by AttachStreams; ProcessDockerAttach sends on it and closes
	// it once the output streams are drained. WaitFinish waits on it.
	loggingDone   chan bool
	// crunch-run's own log.
	CrunchLog     *ThrottledLogger
	// Destinations for the container's stdout (a file when a "stdout" mount
	// is configured, otherwise the "stdout" log) and stderr.
	Stdout        io.WriteCloser
	Stderr        *ThrottledLogger
	// NOTE(review): not used in this view; presumably the writer that
	// collects log files and the resulting log collection's portable data
	// hash — confirm.
	LogCollection *CollectionWriter
	LogsPDH       *string
	// Function used by SetupMounts to start arv-mount.
	RunArvMount
	// Function used to create temporary directories.
	MkTempDir
	// The running arv-mount process and the directory it is mounted on.
	ArvMount       *exec.Cmd
	ArvMountPoint  string
	// Host directory bound to the container's output path.
	HostOutputDir  string
	// Temporary directories created during setup; presumably removed by
	// cleanup code outside this view.
	CleanupTempDir []string
	// Docker bind specifications: "host:container", "host:container:ro",
	// or a bare container path for non-output "tmp" mounts.
	Binds          []string
	// Portable data hash of the output collection, set by CaptureOutput.
	OutputPDH      *string
	// Cancelled is set by stop(), which holds CancelLock while reading and
	// writing it.
	CancelLock     sync.Mutex
	Cancelled      bool
	// Receives TERM/INT/QUIT signals (see SetupSignals).
	SigChan        chan os.Signal
	// Created by ArvMountCmd: receives arv-mount's exit status and is then
	// closed. WaitFinish watches it.
	ArvMountExit   chan error
	// Final container state; CaptureOutput only saves output when it is
	// "Complete".
	finalState     string

	// crunchstat reporting: the log writer and reporter created by
	// StartCrunchstat, plus the reporter's poll period and cgroup root.
	statLogger   io.WriteCloser
	statReporter *crunchstat.Reporter
	statInterval time.Duration
	cgroupRoot   string
	// What we expect the container's cgroup parent to be.
	expectCgroupParent string
	// What we tell docker to use as the container's cgroup
	// parent. Note: Ideally we would use the same field for both
	// expectCgroupParent and setCgroupParent, and just make it
	// default to "docker". However, when using docker < 1.10 with
	// systemd, specifying a non-empty cgroup parent (even the
	// default value "docker") hits a docker bug
	// (https://github.com/docker/docker/issues/17126). Using two
	// separate fields makes it possible to use the "expect cgroup
	// parent to be X" feature even on sites where the "specify
	// cgroup parent" feature breaks.
	setCgroupParent string
}
118
119 // SetupSignals sets up signal handling to gracefully terminate the underlying
120 // Docker container and update state when receiving a TERM, INT or QUIT signal.
121 func (runner *ContainerRunner) SetupSignals() {
122         runner.SigChan = make(chan os.Signal, 1)
123         signal.Notify(runner.SigChan, syscall.SIGTERM)
124         signal.Notify(runner.SigChan, syscall.SIGINT)
125         signal.Notify(runner.SigChan, syscall.SIGQUIT)
126
127         go func(sig chan os.Signal) {
128                 <-sig
129                 runner.stop()
130                 signal.Stop(sig)
131         }(runner.SigChan)
132 }
133
134 // stop the underlying Docker container.
135 func (runner *ContainerRunner) stop() {
136         runner.CancelLock.Lock()
137         defer runner.CancelLock.Unlock()
138         if runner.Cancelled {
139                 return
140         }
141         runner.Cancelled = true
142         if runner.ContainerID != "" {
143                 err := runner.Docker.StopContainer(runner.ContainerID, 10)
144                 if err != nil {
145                         log.Printf("StopContainer failed: %s", err)
146                 }
147         }
148 }
149
150 // LoadImage determines the docker image id from the container record and
151 // checks if it is available in the local Docker image store.  If not, it loads
152 // the image from Keep.
153 func (runner *ContainerRunner) LoadImage() (err error) {
154
155         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
156
157         var collection arvados.Collection
158         err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
159         if err != nil {
160                 return fmt.Errorf("While getting container image collection: %v", err)
161         }
162         manifest := manifest.Manifest{Text: collection.ManifestText}
163         var img, imageID string
164         for ms := range manifest.StreamIter() {
165                 img = ms.FileStreamSegments[0].Name
166                 if !strings.HasSuffix(img, ".tar") {
167                         return fmt.Errorf("First file in the container image collection does not end in .tar")
168                 }
169                 imageID = img[:len(img)-4]
170         }
171
172         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
173
174         _, err = runner.Docker.InspectImage(imageID)
175         if err != nil {
176                 runner.CrunchLog.Print("Loading Docker image from keep")
177
178                 var readCloser io.ReadCloser
179                 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
180                 if err != nil {
181                         return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
182                 }
183
184                 err = runner.Docker.LoadImage(readCloser)
185                 if err != nil {
186                         return fmt.Errorf("While loading container image into Docker: %v", err)
187                 }
188         } else {
189                 runner.CrunchLog.Print("Docker image is available")
190         }
191
192         runner.ContainerConfig.Image = imageID
193
194         return nil
195 }
196
197 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
198         c = exec.Command("arv-mount", arvMountCmd...)
199
200         // Copy our environment, but override ARVADOS_API_TOKEN with
201         // the container auth token.
202         c.Env = nil
203         for _, s := range os.Environ() {
204                 if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") {
205                         c.Env = append(c.Env, s)
206                 }
207         }
208         c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
209
210         nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
211         c.Stdout = nt
212         c.Stderr = nt
213
214         err = c.Start()
215         if err != nil {
216                 return nil, err
217         }
218
219         statReadme := make(chan bool)
220         runner.ArvMountExit = make(chan error)
221
222         keepStatting := true
223         go func() {
224                 for keepStatting {
225                         time.Sleep(100 * time.Millisecond)
226                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
227                         if err == nil {
228                                 keepStatting = false
229                                 statReadme <- true
230                         }
231                 }
232                 close(statReadme)
233         }()
234
235         go func() {
236                 runner.ArvMountExit <- c.Wait()
237                 close(runner.ArvMountExit)
238         }()
239
240         select {
241         case <-statReadme:
242                 break
243         case err := <-runner.ArvMountExit:
244                 runner.ArvMount = nil
245                 keepStatting = false
246                 return nil, err
247         }
248
249         return c, nil
250 }
251
252 func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
253         if runner.ArvMountPoint == "" {
254                 runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
255         }
256         return
257 }
258
259 func (runner *ContainerRunner) SetupMounts() (err error) {
260         err = runner.SetupArvMountPoint("keep")
261         if err != nil {
262                 return fmt.Errorf("While creating keep mount temp dir: %v", err)
263         }
264
265         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
266
267         pdhOnly := true
268         tmpcount := 0
269         arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
270
271         if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
272                 arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
273         }
274
275         collectionPaths := []string{}
276         runner.Binds = nil
277         needCertMount := true
278
279         var binds []string
280         for bind, _ := range runner.Container.Mounts {
281                 binds = append(binds, bind)
282         }
283         sort.Strings(binds)
284
285         for _, bind := range binds {
286                 mnt := runner.Container.Mounts[bind]
287                 if bind == "stdout" {
288                         // Is it a "file" mount kind?
289                         if mnt.Kind != "file" {
290                                 return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
291                         }
292
293                         // Does path start with OutputPath?
294                         prefix := runner.Container.OutputPath
295                         if !strings.HasSuffix(prefix, "/") {
296                                 prefix += "/"
297                         }
298                         if !strings.HasPrefix(mnt.Path, prefix) {
299                                 return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
300                         }
301                 }
302
303                 if bind == "/etc/arvados/ca-certificates.crt" {
304                         needCertMount = false
305                 }
306
307                 if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" {
308                         if mnt.Kind != "collection" {
309                                 return fmt.Errorf("Only mount points of kind 'collection' are supported underneath the output_path: %v", bind)
310                         }
311                 }
312
313                 switch {
314                 case mnt.Kind == "collection":
315                         var src string
316                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
317                                 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
318                         }
319                         if mnt.UUID != "" {
320                                 if mnt.Writable {
321                                         return fmt.Errorf("Writing to existing collections currently not permitted.")
322                                 }
323                                 pdhOnly = false
324                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
325                         } else if mnt.PortableDataHash != "" {
326                                 if mnt.Writable {
327                                         return fmt.Errorf("Can never write to a collection specified by portable data hash")
328                                 }
329                                 idx := strings.Index(mnt.PortableDataHash, "/")
330                                 if idx > 0 {
331                                         mnt.Path = path.Clean(mnt.PortableDataHash[idx:])
332                                         mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
333                                         runner.Container.Mounts[bind] = mnt
334                                 }
335                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
336                                 if mnt.Path != "" && mnt.Path != "." {
337                                         if strings.HasPrefix(mnt.Path, "./") {
338                                                 mnt.Path = mnt.Path[2:]
339                                         } else if strings.HasPrefix(mnt.Path, "/") {
340                                                 mnt.Path = mnt.Path[1:]
341                                         }
342                                         src += "/" + mnt.Path
343                                 }
344                         } else {
345                                 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
346                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
347                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
348                                 tmpcount += 1
349                         }
350                         if mnt.Writable {
351                                 if bind == runner.Container.OutputPath {
352                                         runner.HostOutputDir = src
353                                 } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
354                                         return fmt.Errorf("Writable mount points are not permitted underneath the output_path: %v", bind)
355                                 }
356                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
357                         } else {
358                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
359                         }
360                         collectionPaths = append(collectionPaths, src)
361
362                 case mnt.Kind == "tmp" && bind == runner.Container.OutputPath:
363                         runner.HostOutputDir, err = runner.MkTempDir("", "")
364                         if err != nil {
365                                 return fmt.Errorf("While creating mount temp dir: %v", err)
366                         }
367                         st, staterr := os.Stat(runner.HostOutputDir)
368                         if staterr != nil {
369                                 return fmt.Errorf("While Stat on temp dir: %v", staterr)
370                         }
371                         err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
372                         if staterr != nil {
373                                 return fmt.Errorf("While Chmod temp dir: %v", err)
374                         }
375                         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
376                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
377
378                 case mnt.Kind == "tmp":
379                         runner.Binds = append(runner.Binds, bind)
380
381                 case mnt.Kind == "json":
382                         jsondata, err := json.Marshal(mnt.Content)
383                         if err != nil {
384                                 return fmt.Errorf("encoding json data: %v", err)
385                         }
386                         // Create a tempdir with a single file
387                         // (instead of just a tempfile): this way we
388                         // can ensure the file is world-readable
389                         // inside the container, without having to
390                         // make it world-readable on the docker host.
391                         tmpdir, err := runner.MkTempDir("", "")
392                         if err != nil {
393                                 return fmt.Errorf("creating temp dir: %v", err)
394                         }
395                         runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
396                         tmpfn := filepath.Join(tmpdir, "mountdata.json")
397                         err = ioutil.WriteFile(tmpfn, jsondata, 0644)
398                         if err != nil {
399                                 return fmt.Errorf("writing temp file: %v", err)
400                         }
401                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
402                 }
403         }
404
405         if runner.HostOutputDir == "" {
406                 return fmt.Errorf("Output path does not correspond to a writable mount point")
407         }
408
409         if wantAPI := runner.Container.RuntimeConstraints.API; needCertMount && wantAPI != nil && *wantAPI {
410                 for _, certfile := range arvadosclient.CertFiles {
411                         _, err := os.Stat(certfile)
412                         if err == nil {
413                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:/etc/arvados/ca-certificates.crt:ro", certfile))
414                                 break
415                         }
416                 }
417         }
418
419         if pdhOnly {
420                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
421         } else {
422                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
423         }
424         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
425
426         token, err := runner.ContainerToken()
427         if err != nil {
428                 return fmt.Errorf("could not get container token: %s", err)
429         }
430
431         runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
432         if err != nil {
433                 return fmt.Errorf("While trying to start arv-mount: %v", err)
434         }
435
436         for _, p := range collectionPaths {
437                 _, err = os.Stat(p)
438                 if err != nil {
439                         return fmt.Errorf("While checking that input files exist: %v", err)
440                 }
441         }
442
443         return nil
444 }
445
446 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
447         // Handle docker log protocol
448         // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
449
450         header := make([]byte, 8)
451         for {
452                 _, readerr := io.ReadAtLeast(containerReader, header, 8)
453
454                 if readerr == nil {
455                         readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
456                         if header[0] == 1 {
457                                 // stdout
458                                 _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
459                         } else {
460                                 // stderr
461                                 _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
462                         }
463                 }
464
465                 if readerr != nil {
466                         if readerr != io.EOF {
467                                 runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
468                         }
469
470                         closeerr := runner.Stdout.Close()
471                         if closeerr != nil {
472                                 runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
473                         }
474
475                         closeerr = runner.Stderr.Close()
476                         if closeerr != nil {
477                                 runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
478                         }
479
480                         if runner.statReporter != nil {
481                                 runner.statReporter.Stop()
482                                 closeerr = runner.statLogger.Close()
483                                 if closeerr != nil {
484                                         runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
485                                 }
486                         }
487
488                         runner.loggingDone <- true
489                         close(runner.loggingDone)
490                         return
491                 }
492         }
493 }
494
495 func (runner *ContainerRunner) StartCrunchstat() {
496         runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
497         runner.statReporter = &crunchstat.Reporter{
498                 CID:          runner.ContainerID,
499                 Logger:       log.New(runner.statLogger, "", 0),
500                 CgroupParent: runner.expectCgroupParent,
501                 CgroupRoot:   runner.cgroupRoot,
502                 PollPeriod:   runner.statInterval,
503         }
504         runner.statReporter.Start()
505 }
506
507 // AttachLogs connects the docker container stdout and stderr logs to the
508 // Arvados logger which logs to Keep and the API server logs table.
509 func (runner *ContainerRunner) AttachStreams() (err error) {
510
511         runner.CrunchLog.Print("Attaching container streams")
512
513         var containerReader io.Reader
514         containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
515                 &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
516         if err != nil {
517                 return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
518         }
519
520         runner.loggingDone = make(chan bool)
521
522         if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
523                 stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
524                 index := strings.LastIndex(stdoutPath, "/")
525                 if index > 0 {
526                         subdirs := stdoutPath[:index]
527                         if subdirs != "" {
528                                 st, err := os.Stat(runner.HostOutputDir)
529                                 if err != nil {
530                                         return fmt.Errorf("While Stat on temp dir: %v", err)
531                                 }
532                                 stdoutPath := path.Join(runner.HostOutputDir, subdirs)
533                                 err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
534                                 if err != nil {
535                                         return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
536                                 }
537                         }
538                 }
539                 stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
540                 if err != nil {
541                         return fmt.Errorf("While creating stdout file: %v", err)
542                 }
543                 runner.Stdout = stdoutFile
544         } else {
545                 runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
546         }
547         runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
548
549         go runner.ProcessDockerAttach(containerReader)
550
551         return nil
552 }
553
554 // CreateContainer creates the docker container.
555 func (runner *ContainerRunner) CreateContainer() error {
556         runner.CrunchLog.Print("Creating Docker container")
557
558         runner.ContainerConfig.Cmd = runner.Container.Command
559         if runner.Container.Cwd != "." {
560                 runner.ContainerConfig.WorkingDir = runner.Container.Cwd
561         }
562
563         for k, v := range runner.Container.Environment {
564                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
565         }
566         if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
567                 tok, err := runner.ContainerToken()
568                 if err != nil {
569                         return err
570                 }
571                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
572                         "ARVADOS_API_TOKEN="+tok,
573                         "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
574                         "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
575                 )
576                 runner.ContainerConfig.NetworkDisabled = false
577         } else {
578                 runner.ContainerConfig.NetworkDisabled = true
579         }
580
581         var err error
582         runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
583         if err != nil {
584                 return fmt.Errorf("While creating container: %v", err)
585         }
586
587         runner.HostConfig = dockerclient.HostConfig{
588                 Binds:        runner.Binds,
589                 CgroupParent: runner.setCgroupParent,
590                 LogConfig: dockerclient.LogConfig{
591                         Type: "none",
592                 },
593         }
594
595         return runner.AttachStreams()
596 }
597
598 // StartContainer starts the docker container created by CreateContainer.
599 func (runner *ContainerRunner) StartContainer() error {
600         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
601         err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
602         if err != nil {
603                 return fmt.Errorf("could not start container: %v", err)
604         }
605         return nil
606 }
607
608 // WaitFinish waits for the container to terminate, capture the exit code, and
609 // close the stdout/stderr logging.
610 func (runner *ContainerRunner) WaitFinish() error {
611         runner.CrunchLog.Print("Waiting for container to finish")
612
613         waitDocker := runner.Docker.Wait(runner.ContainerID)
614         waitMount := runner.ArvMountExit
615         for waitDocker != nil {
616                 select {
617                 case err := <-waitMount:
618                         runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
619                         waitMount = nil
620                         runner.stop()
621                 case wr := <-waitDocker:
622                         if wr.Error != nil {
623                                 return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
624                         }
625                         runner.ExitCode = &wr.ExitCode
626                         waitDocker = nil
627                 }
628         }
629
630         // wait for stdout/stderr to complete
631         <-runner.loggingDone
632
633         return nil
634 }
635
636 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
637 func (runner *ContainerRunner) CaptureOutput() error {
638         if runner.finalState != "Complete" {
639                 return nil
640         }
641
642         if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
643                 // Output may have been set directly by the container, so
644                 // refresh the container record to check.
645                 err := runner.ArvClient.Get("containers", runner.Container.UUID,
646                         nil, &runner.Container)
647                 if err != nil {
648                         return err
649                 }
650                 if runner.Container.Output != "" {
651                         // Container output is already set.
652                         runner.OutputPDH = &runner.Container.Output
653                         return nil
654                 }
655         }
656
657         if runner.HostOutputDir == "" {
658                 return nil
659         }
660
661         _, err := os.Stat(runner.HostOutputDir)
662         if err != nil {
663                 return fmt.Errorf("While checking host output path: %v", err)
664         }
665
666         var manifestText string
667
668         collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
669         _, err = os.Stat(collectionMetafile)
670         if err != nil {
671                 // Regular directory
672                 cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
673                 manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
674                 if err != nil {
675                         return fmt.Errorf("While uploading output files: %v", err)
676                 }
677         } else {
678                 // FUSE mount directory
679                 file, openerr := os.Open(collectionMetafile)
680                 if openerr != nil {
681                         return fmt.Errorf("While opening FUSE metafile: %v", err)
682                 }
683                 defer file.Close()
684
685                 var rec arvados.Collection
686                 err = json.NewDecoder(file).Decode(&rec)
687                 if err != nil {
688                         return fmt.Errorf("While reading FUSE metafile: %v", err)
689                 }
690                 manifestText = rec.ManifestText
691         }
692
693         // Pre-populate output from the configured mount points
694         var binds []string
695         for bind, _ := range runner.Container.Mounts {
696                 binds = append(binds, bind)
697         }
698         sort.Strings(binds)
699
700         for _, bind := range binds {
701                 mnt := runner.Container.Mounts[bind]
702
703                 bindSuffix := strings.TrimPrefix(bind, runner.Container.OutputPath)
704
705                 if bindSuffix == bind || len(bindSuffix) <= 0 {
706                         // either does not start with OutputPath or is OutputPath itself
707                         continue
708                 }
709
710                 if mnt.ExcludeFromOutput == true {
711                         continue
712                 }
713
714                 // append to manifest_text
715                 m, err := runner.getCollectionManifestForPath(mnt, bindSuffix)
716                 if err != nil {
717                         return err
718                 }
719
720                 manifestText = manifestText + m
721         }
722
723         // Save output
724         var response arvados.Collection
725         manifest := manifest.Manifest{Text: manifestText}
726         manifestText = manifest.Extract(".", ".").Text
727         err = runner.ArvClient.Create("collections",
728                 arvadosclient.Dict{
729                         "ensure_unique_name": true,
730                         "collection": arvadosclient.Dict{
731                                 "is_trashed":    true,
732                                 "name":          "output for " + runner.Container.UUID,
733                                 "manifest_text": manifestText}},
734                 &response)
735         if err != nil {
736                 return fmt.Errorf("While creating output collection: %v", err)
737         }
738         runner.OutputPDH = &response.PortableDataHash
739         return nil
740 }
741
742 var outputCollections = make(map[string]arvados.Collection)
743
744 // Fetch the collection for the mnt.PortableDataHash
745 // Return the manifest_text fragment corresponding to the specified mnt.Path
746 //  after making any required updates.
747 //  Ex:
748 //    If mnt.Path is not specified,
749 //      return the entire manifest_text after replacing any "." with bindSuffix
750 //    If mnt.Path corresponds to one stream,
751 //      return the manifest_text for that stream after replacing that stream name with bindSuffix
752 //    Otherwise, check if a filename in any one stream is being sought. Return the manifest_text
753 //      for that stream after replacing stream name with bindSuffix minus the last word
754 //      and the file name with last word of the bindSuffix
755 //  Allowed path examples:
756 //    "path":"/"
757 //    "path":"/subdir1"
758 //    "path":"/subdir1/subdir2"
759 //    "path":"/subdir/filename" etc
760 func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, bindSuffix string) (string, error) {
761         collection := outputCollections[mnt.PortableDataHash]
762         if collection.PortableDataHash == "" {
763                 err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection)
764                 if err != nil {
765                         return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err)
766                 }
767                 outputCollections[mnt.PortableDataHash] = collection
768         }
769
770         if collection.ManifestText == "" {
771                 runner.CrunchLog.Printf("No manifest text for collection %v", collection.PortableDataHash)
772                 return "", nil
773         }
774
775         mft := manifest.Manifest{Text: collection.ManifestText}
776         extracted := mft.Extract(mnt.Path, bindSuffix)
777         if extracted.Err != nil {
778                 return "", fmt.Errorf("Error parsing manifest for %v: %v", mnt.PortableDataHash, extracted.Err.Error())
779         }
780         return extracted.Text, nil
781 }
782
783 func (runner *ContainerRunner) CleanupDirs() {
784         if runner.ArvMount != nil {
785                 umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
786                 umnterr := umount.Run()
787                 if umnterr != nil {
788                         runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
789                 }
790
791                 mnterr := <-runner.ArvMountExit
792                 if mnterr != nil {
793                         runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
794                 }
795         }
796
797         for _, tmpdir := range runner.CleanupTempDir {
798                 rmerr := os.RemoveAll(tmpdir)
799                 if rmerr != nil {
800                         runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
801                 }
802         }
803 }
804
805 // CommitLogs posts the collection containing the final container logs.
806 func (runner *ContainerRunner) CommitLogs() error {
807         runner.CrunchLog.Print(runner.finalState)
808         runner.CrunchLog.Close()
809
810         // Closing CrunchLog above allows it to be committed to Keep at this
811         // point, but re-open crunch log with ArvClient in case there are any
812         // other further (such as failing to write the log to Keep!) while
813         // shutting down
814         runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
815                 "crunch-run", nil})
816
817         if runner.LogsPDH != nil {
818                 // If we have already assigned something to LogsPDH,
819                 // we must be closing the re-opened log, which won't
820                 // end up getting attached to the container record and
821                 // therefore doesn't need to be saved as a collection
822                 // -- it exists only to send logs to other channels.
823                 return nil
824         }
825
826         mt, err := runner.LogCollection.ManifestText()
827         if err != nil {
828                 return fmt.Errorf("While creating log manifest: %v", err)
829         }
830
831         var response arvados.Collection
832         err = runner.ArvClient.Create("collections",
833                 arvadosclient.Dict{
834                         "ensure_unique_name": true,
835                         "collection": arvadosclient.Dict{
836                                 "is_trashed":    true,
837                                 "name":          "logs for " + runner.Container.UUID,
838                                 "manifest_text": mt}},
839                 &response)
840         if err != nil {
841                 return fmt.Errorf("While creating log collection: %v", err)
842         }
843         runner.LogsPDH = &response.PortableDataHash
844         return nil
845 }
846
847 // UpdateContainerRunning updates the container state to "Running"
848 func (runner *ContainerRunner) UpdateContainerRunning() error {
849         runner.CancelLock.Lock()
850         defer runner.CancelLock.Unlock()
851         if runner.Cancelled {
852                 return ErrCancelled
853         }
854         return runner.ArvClient.Update("containers", runner.Container.UUID,
855                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
856 }
857
858 // ContainerToken returns the api_token the container (and any
859 // arv-mount processes) are allowed to use.
860 func (runner *ContainerRunner) ContainerToken() (string, error) {
861         if runner.token != "" {
862                 return runner.token, nil
863         }
864
865         var auth arvados.APIClientAuthorization
866         err := runner.ArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
867         if err != nil {
868                 return "", err
869         }
870         runner.token = auth.APIToken
871         return runner.token, nil
872 }
873
874 // UpdateContainerComplete updates the container record state on API
875 // server to "Complete" or "Cancelled"
876 func (runner *ContainerRunner) UpdateContainerFinal() error {
877         update := arvadosclient.Dict{}
878         update["state"] = runner.finalState
879         if runner.LogsPDH != nil {
880                 update["log"] = *runner.LogsPDH
881         }
882         if runner.finalState == "Complete" {
883                 if runner.ExitCode != nil {
884                         update["exit_code"] = *runner.ExitCode
885                 }
886                 if runner.OutputPDH != nil {
887                         update["output"] = *runner.OutputPDH
888                 }
889         }
890         return runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
891 }
892
893 // IsCancelled returns the value of Cancelled, with goroutine safety.
894 func (runner *ContainerRunner) IsCancelled() bool {
895         runner.CancelLock.Lock()
896         defer runner.CancelLock.Unlock()
897         return runner.Cancelled
898 }
899
900 // NewArvLogWriter creates an ArvLogWriter
901 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
902         return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
903 }
904
// Run runs the full container lifecycle:
//   - fetch the container record
//   - install signal handlers
//   - load the image
//   - set up mounts
//   - create the Docker container and start crunchstat
//   - mark the record Running
//   - start the container and wait for it to finish
//
// runner.finalState decides how the deferred finalizer treats the record:
//   - It starts as "Queued".
//   - It is set to "Cancelled" if loading the image or setting up mounts
//     fails, or once UpdateContainerRunning has succeeded.
//   - It becomes "Complete" only when WaitFinish returns nil.
//
// The deferred finalizer always runs:
//   - If finalState is still "Queued", it only closes the log and calls
//     UpdateContainerFinal, ignoring that call's error.
//   - Otherwise it captures output, commits logs and updates the container
//     record.
//
// The returned error is the first non-nil error from the main sequence or
// from the finalizer's checked calls. Temporary directories are cleaned up
// after the finalizer, because that defer was registered first.
func (runner *ContainerRunner) Run() (err error) {
	runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)

	hostname, hosterr := os.Hostname()
	if hosterr != nil {
		runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
	} else {
		runner.CrunchLog.Printf("Executing on host '%s'", hostname)
	}

	// Clean up temporary directories _after_ finalizing
	// everything (if we've made any by then)
	defer runner.CleanupDirs()

	runner.finalState = "Queued"

	defer func() {
		// checkErr prints e (unless it's nil) and sets err to
		// e (unless err is already non-nil). Thus, if err
		// hasn't already been assigned when Run() returns,
		// this cleanup func will cause Run() to return the
		// first non-nil error that is passed to checkErr().
		checkErr := func(e error) {
			if e == nil {
				return
			}
			runner.CrunchLog.Print(e)
			if err == nil {
				err = e
			}
		}

		// Log the error encountered in Run(), if any
		checkErr(err)

		if runner.finalState == "Queued" {
			// The record was never marked Running. Close the log and
			// write state "Queued" back. Output and logs are not saved
			// on this path, and UpdateContainerFinal's error is not
			// checked.
			runner.CrunchLog.Close()
			runner.UpdateContainerFinal()
			return
		}

		if runner.IsCancelled() {
			runner.finalState = "Cancelled"
			// but don't return yet -- we still want to
			// capture partial output and write logs
		}

		checkErr(runner.CaptureOutput())
		checkErr(runner.CommitLogs())
		checkErr(runner.UpdateContainerFinal())

		// The real log is already closed, but then we opened
		// a new one in case we needed to log anything while
		// finalizing.
		runner.CrunchLog.Close()
	}()

	err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container)
	if err != nil {
		err = fmt.Errorf("While getting container record: %v", err)
		return
	}

	// setup signal handling
	runner.SetupSignals()

	// check for and/or load image
	// On failure finalState becomes "Cancelled", so the finalizer takes the
	// full capture/commit/update path instead of the "Queued" path.
	err = runner.LoadImage()
	if err != nil {
		runner.finalState = "Cancelled"
		err = fmt.Errorf("While loading container image: %v", err)
		return
	}

	// set up FUSE mount and binds
	err = runner.SetupMounts()
	if err != nil {
		runner.finalState = "Cancelled"
		err = fmt.Errorf("While setting up mounts: %v", err)
		return
	}

	// A failure here leaves finalState as "Queued".
	err = runner.CreateContainer()
	if err != nil {
		return
	}

	runner.StartCrunchstat()

	// Returning here leaves finalState "Queued" and err nil.
	if runner.IsCancelled() {
		return
	}

	err = runner.UpdateContainerRunning()
	if err != nil {
		return
	}
	// The record now says Running. From here on any early return is
	// finalized as "Cancelled" unless WaitFinish succeeds below.
	runner.finalState = "Cancelled"

	err = runner.StartContainer()
	if err != nil {
		return
	}

	err = runner.WaitFinish()
	if err == nil {
		runner.finalState = "Complete"
	}
	return
}
1016
1017 // NewContainerRunner creates a new container runner.
1018 func NewContainerRunner(api IArvadosClient,
1019         kc IKeepClient,
1020         docker ThinDockerClient,
1021         containerUUID string) *ContainerRunner {
1022
1023         cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
1024         cr.NewLogWriter = cr.NewArvLogWriter
1025         cr.RunArvMount = cr.ArvMountCmd
1026         cr.MkTempDir = ioutil.TempDir
1027         cr.LogCollection = &CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
1028         cr.Container.UUID = containerUUID
1029         cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
1030         cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
1031         return cr
1032 }
1033
1034 func main() {
1035         statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
1036         cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
1037         cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
1038         cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
1039         caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
1040         flag.Parse()
1041
1042         containerId := flag.Arg(0)
1043
1044         if *caCertsPath != "" {
1045                 arvadosclient.CertFiles = []string{*caCertsPath}
1046         }
1047
1048         api, err := arvadosclient.MakeArvadosClient()
1049         if err != nil {
1050                 log.Fatalf("%s: %v", containerId, err)
1051         }
1052         api.Retries = 8
1053
1054         var kc *keepclient.KeepClient
1055         kc, err = keepclient.MakeKeepClient(api)
1056         if err != nil {
1057                 log.Fatalf("%s: %v", containerId, err)
1058         }
1059         kc.Retries = 4
1060
1061         var docker *dockerclient.DockerClient
1062         docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
1063         if err != nil {
1064                 log.Fatalf("%s: %v", containerId, err)
1065         }
1066
1067         cr := NewContainerRunner(api, kc, docker, containerId)
1068         cr.statInterval = *statInterval
1069         cr.cgroupRoot = *cgroupRoot
1070         cr.expectCgroupParent = *cgroupParent
1071         if *cgroupParentSubsystem != "" {
1072                 p := findCgroup(*cgroupParentSubsystem)
1073                 cr.setCgroupParent = p
1074                 cr.expectCgroupParent = p
1075         }
1076
1077         err = cr.Run()
1078         if err != nil {
1079                 log.Fatalf("%s: %v", containerId, err)
1080         }
1081
1082 }