9595: Support "json" mount type.
[arvados.git] / services / crunch-run / crunchrun.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/lib/crunchstat"
9         "git.curoverse.com/arvados.git/sdk/go/arvados"
10         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11         "git.curoverse.com/arvados.git/sdk/go/keepclient"
12         "git.curoverse.com/arvados.git/sdk/go/manifest"
13         "github.com/curoverse/dockerclient"
14         "io"
15         "io/ioutil"
16         "log"
17         "os"
18         "os/exec"
19         "os/signal"
20         "path"
21         "path/filepath"
22         "strings"
23         "sync"
24         "syscall"
25         "time"
26 )
27
28 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
29 type IArvadosClient interface {
30         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
31         Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
32         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
33         Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) (err error)
34 }
35
36 // ErrCancelled is the error returned when the container is cancelled.
37 var ErrCancelled = errors.New("Cancelled")
38
39 // IKeepClient is the minimal Keep API methods used by crunch-run.
40 type IKeepClient interface {
41         PutHB(hash string, buf []byte) (string, int, error)
42         ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
43 }
44
45 // NewLogWriter is a factory function to create a new log writer.
46 type NewLogWriter func(name string) io.WriteCloser
47
48 type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
49
50 type MkTempDir func(string, string) (string, error)
51
52 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
53 type ThinDockerClient interface {
54         StopContainer(id string, timeout int) error
55         InspectImage(id string) (*dockerclient.ImageInfo, error)
56         LoadImage(reader io.Reader) error
57         CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
58         StartContainer(id string, config *dockerclient.HostConfig) error
59         AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
60         Wait(id string) <-chan dockerclient.WaitResult
61         RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
62 }
63
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
//
// NOTE(review): field order matters here because some types in this package
// are built with positional composite literals; do not reorder fields
// without checking all constructors.
type ContainerRunner struct {
	// Docker is used to inspect/load the image and to create, attach,
	// start, wait for and stop the container.
	Docker ThinDockerClient
	// ArvClient talks to the Arvados API server (collections,
	// containers, auth, logs).
	ArvClient IArvadosClient
	// Kc reads the image tarball from Keep (LoadImage) and backs the
	// CollectionWriter used to upload output (CaptureOutput).
	Kc IKeepClient
	// Embedded container record (accessed as runner.Container).
	arvados.Container
	// Docker create config, filled in by LoadImage and CreateContainer.
	dockerclient.ContainerConfig
	// Docker host config (binds, logging), set by CreateContainer and
	// passed to StartContainer.
	dockerclient.HostConfig
	// token caches the container API token fetched by ContainerToken.
	token string
	// ContainerID is the Docker container id set by CreateContainer.
	ContainerID string
	// ExitCode is set by WaitFinish; nil until the container has exited.
	ExitCode *int
	// NewLogWriter creates a named log writer (e.g. "stdout", "stderr",
	// "crunchstat", "arv-mount").
	NewLogWriter
	// loggingDone is signalled by ProcessDockerAttach once stdout/stderr
	// have been fully copied and closed; WaitFinish waits on it.
	loggingDone chan bool
	// CrunchLog receives crunch-run's own log messages.
	CrunchLog *ThrottledLogger
	// Stdout is either a file under HostOutputDir (when a "stdout" mount
	// exists) or the "stdout" log.
	Stdout io.WriteCloser
	// Stderr is the "stderr" log.
	Stderr *ThrottledLogger
	// LogCollection accumulates log files; committed by CommitLogs.
	LogCollection *CollectionWriter
	// LogsPDH is the portable data hash of the committed log collection,
	// nil until CommitLogs succeeds.
	LogsPDH *string
	// RunArvMount starts arv-mount (see SetupMounts).
	RunArvMount
	// MkTempDir creates temporary directories (see SetupMounts).
	MkTempDir
	// ArvMount is the running arv-mount process, or nil if none.
	ArvMount *exec.Cmd
	// ArvMountPoint is the host directory where arv-mount is mounted.
	ArvMountPoint string
	// HostOutputDir is the host directory bound to the container's
	// OutputPath.
	HostOutputDir string
	// CleanupTempDir lists directories removed by CleanupDirs.
	CleanupTempDir []string
	// Binds are the Docker bind specs built by SetupMounts.
	Binds []string
	// OutputPDH is the output collection's portable data hash, set by
	// CaptureOutput.
	OutputPDH *string
	// CancelLock guards Cancelled (see IsCancelled and
	// UpdateContainerRunning).
	CancelLock sync.Mutex
	// Cancelled is set when a TERM/INT/QUIT signal is received.
	Cancelled bool
	// SigChan receives the signals registered by SetupSignals.
	SigChan chan os.Signal
	// ArvMountExit receives the arv-mount process's exit result.
	ArvMountExit chan error
	// finalState is the container state reported at the end of Run
	// (e.g. "Queued", "Complete", "Cancelled").
	finalState string

	// Crunchstat reporting (see StartCrunchstat).
	statLogger   io.WriteCloser
	statReporter *crunchstat.Reporter
	statInterval time.Duration
	cgroupRoot   string
	cgroupParent string
}
103
104 // SetupSignals sets up signal handling to gracefully terminate the underlying
105 // Docker container and update state when receiving a TERM, INT or QUIT signal.
106 func (runner *ContainerRunner) SetupSignals() {
107         runner.SigChan = make(chan os.Signal, 1)
108         signal.Notify(runner.SigChan, syscall.SIGTERM)
109         signal.Notify(runner.SigChan, syscall.SIGINT)
110         signal.Notify(runner.SigChan, syscall.SIGQUIT)
111
112         go func(sig <-chan os.Signal) {
113                 for range sig {
114                         if !runner.Cancelled {
115                                 runner.CancelLock.Lock()
116                                 runner.Cancelled = true
117                                 if runner.ContainerID != "" {
118                                         runner.Docker.StopContainer(runner.ContainerID, 10)
119                                 }
120                                 runner.CancelLock.Unlock()
121                         }
122                 }
123         }(runner.SigChan)
124 }
125
126 // LoadImage determines the docker image id from the container record and
127 // checks if it is available in the local Docker image store.  If not, it loads
128 // the image from Keep.
129 func (runner *ContainerRunner) LoadImage() (err error) {
130
131         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
132
133         var collection arvados.Collection
134         err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
135         if err != nil {
136                 return fmt.Errorf("While getting container image collection: %v", err)
137         }
138         manifest := manifest.Manifest{Text: collection.ManifestText}
139         var img, imageID string
140         for ms := range manifest.StreamIter() {
141                 img = ms.FileStreamSegments[0].Name
142                 if !strings.HasSuffix(img, ".tar") {
143                         return fmt.Errorf("First file in the container image collection does not end in .tar")
144                 }
145                 imageID = img[:len(img)-4]
146         }
147
148         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
149
150         _, err = runner.Docker.InspectImage(imageID)
151         if err != nil {
152                 runner.CrunchLog.Print("Loading Docker image from keep")
153
154                 var readCloser io.ReadCloser
155                 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
156                 if err != nil {
157                         return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
158                 }
159
160                 err = runner.Docker.LoadImage(readCloser)
161                 if err != nil {
162                         return fmt.Errorf("While loading container image into Docker: %v", err)
163                 }
164         } else {
165                 runner.CrunchLog.Print("Docker image is available")
166         }
167
168         runner.ContainerConfig.Image = imageID
169
170         return nil
171 }
172
173 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
174         c = exec.Command("arv-mount", arvMountCmd...)
175
176         // Copy our environment, but override ARVADOS_API_TOKEN with
177         // the container auth token.
178         c.Env = nil
179         for _, s := range os.Environ() {
180                 if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") {
181                         c.Env = append(c.Env, s)
182                 }
183         }
184         c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
185
186         nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
187         c.Stdout = nt
188         c.Stderr = nt
189
190         err = c.Start()
191         if err != nil {
192                 return nil, err
193         }
194
195         statReadme := make(chan bool)
196         runner.ArvMountExit = make(chan error)
197
198         keepStatting := true
199         go func() {
200                 for keepStatting {
201                         time.Sleep(100 * time.Millisecond)
202                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
203                         if err == nil {
204                                 keepStatting = false
205                                 statReadme <- true
206                         }
207                 }
208                 close(statReadme)
209         }()
210
211         go func() {
212                 runner.ArvMountExit <- c.Wait()
213                 close(runner.ArvMountExit)
214         }()
215
216         select {
217         case <-statReadme:
218                 break
219         case err := <-runner.ArvMountExit:
220                 runner.ArvMount = nil
221                 keepStatting = false
222                 return nil, err
223         }
224
225         return c, nil
226 }
227
228 func (runner *ContainerRunner) SetupMounts() (err error) {
229         runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
230         if err != nil {
231                 return fmt.Errorf("While creating keep mount temp dir: %v", err)
232         }
233
234         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
235
236         pdhOnly := true
237         tmpcount := 0
238         arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
239         collectionPaths := []string{}
240         runner.Binds = nil
241
242         for bind, mnt := range runner.Container.Mounts {
243                 if bind == "stdout" {
244                         // Is it a "file" mount kind?
245                         if mnt.Kind != "file" {
246                                 return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
247                         }
248
249                         // Does path start with OutputPath?
250                         prefix := runner.Container.OutputPath
251                         if !strings.HasSuffix(prefix, "/") {
252                                 prefix += "/"
253                         }
254                         if !strings.HasPrefix(mnt.Path, prefix) {
255                                 return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
256                         }
257                 }
258
259                 switch {
260                 case mnt.Kind == "collection":
261                         var src string
262                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
263                                 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
264                         }
265                         if mnt.UUID != "" {
266                                 if mnt.Writable {
267                                         return fmt.Errorf("Writing to existing collections currently not permitted.")
268                                 }
269                                 pdhOnly = false
270                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
271                         } else if mnt.PortableDataHash != "" {
272                                 if mnt.Writable {
273                                         return fmt.Errorf("Can never write to a collection specified by portable data hash")
274                                 }
275                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
276                         } else {
277                                 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
278                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
279                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
280                                 tmpcount += 1
281                         }
282                         if mnt.Writable {
283                                 if bind == runner.Container.OutputPath {
284                                         runner.HostOutputDir = src
285                                 }
286                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
287                         } else {
288                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
289                         }
290                         collectionPaths = append(collectionPaths, src)
291
292                 case mnt.Kind == "tmp" && bind == runner.Container.OutputPath:
293                         runner.HostOutputDir, err = runner.MkTempDir("", "")
294                         if err != nil {
295                                 return fmt.Errorf("While creating mount temp dir: %v", err)
296                         }
297                         st, staterr := os.Stat(runner.HostOutputDir)
298                         if staterr != nil {
299                                 return fmt.Errorf("While Stat on temp dir: %v", staterr)
300                         }
301                         err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
302                         if staterr != nil {
303                                 return fmt.Errorf("While Chmod temp dir: %v", err)
304                         }
305                         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
306                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
307
308                 case mnt.Kind == "tmp":
309                         runner.Binds = append(runner.Binds, bind)
310
311                 case mnt.Kind == "json":
312                         jsondata, err := json.Marshal(mnt.Content)
313                         if err != nil {
314                                 return fmt.Errorf("encoding json data: %v", err)
315                         }
316                         // Create a tempdir with a single file
317                         // (instead of just a tempfile): this way we
318                         // can ensure the file is world-readable
319                         // inside the container, without having to
320                         // make it world-readable on the docker host.
321                         tmpdir, err := runner.MkTempDir("", "")
322                         if err != nil {
323                                 return fmt.Errorf("creating temp dir: %v", err)
324                         }
325                         runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
326                         tmpfn := filepath.Join(tmpdir, "mountdata.json")
327                         err = ioutil.WriteFile(tmpfn, jsondata, 0644)
328                         if err != nil {
329                                 return fmt.Errorf("writing temp file: %v", err)
330                         }
331                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
332                 }
333         }
334
335         if runner.HostOutputDir == "" {
336                 return fmt.Errorf("Output path does not correspond to a writable mount point")
337         }
338
339         if pdhOnly {
340                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
341         } else {
342                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
343         }
344         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
345
346         token, err := runner.ContainerToken()
347         if err != nil {
348                 return fmt.Errorf("could not get container token: %s", err)
349         }
350
351         runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
352         if err != nil {
353                 return fmt.Errorf("While trying to start arv-mount: %v", err)
354         }
355
356         for _, p := range collectionPaths {
357                 _, err = os.Stat(p)
358                 if err != nil {
359                         return fmt.Errorf("While checking that input files exist: %v", err)
360                 }
361         }
362
363         return nil
364 }
365
366 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
367         // Handle docker log protocol
368         // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
369
370         header := make([]byte, 8)
371         for {
372                 _, readerr := io.ReadAtLeast(containerReader, header, 8)
373
374                 if readerr == nil {
375                         readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
376                         if header[0] == 1 {
377                                 // stdout
378                                 _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
379                         } else {
380                                 // stderr
381                                 _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
382                         }
383                 }
384
385                 if readerr != nil {
386                         if readerr != io.EOF {
387                                 runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
388                         }
389
390                         closeerr := runner.Stdout.Close()
391                         if closeerr != nil {
392                                 runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
393                         }
394
395                         closeerr = runner.Stderr.Close()
396                         if closeerr != nil {
397                                 runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
398                         }
399
400                         if runner.statReporter != nil {
401                                 runner.statReporter.Stop()
402                                 closeerr = runner.statLogger.Close()
403                                 if closeerr != nil {
404                                         runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
405                                 }
406                         }
407
408                         runner.loggingDone <- true
409                         close(runner.loggingDone)
410                         return
411                 }
412         }
413 }
414
415 func (runner *ContainerRunner) StartCrunchstat() {
416         runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
417         runner.statReporter = &crunchstat.Reporter{
418                 CID:          runner.ContainerID,
419                 Logger:       log.New(runner.statLogger, "", 0),
420                 CgroupParent: runner.cgroupParent,
421                 CgroupRoot:   runner.cgroupRoot,
422                 PollPeriod:   runner.statInterval,
423         }
424         runner.statReporter.Start()
425 }
426
427 // AttachLogs connects the docker container stdout and stderr logs to the
428 // Arvados logger which logs to Keep and the API server logs table.
429 func (runner *ContainerRunner) AttachStreams() (err error) {
430
431         runner.CrunchLog.Print("Attaching container streams")
432
433         var containerReader io.Reader
434         containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
435                 &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
436         if err != nil {
437                 return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
438         }
439
440         runner.loggingDone = make(chan bool)
441
442         if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
443                 stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
444                 index := strings.LastIndex(stdoutPath, "/")
445                 if index > 0 {
446                         subdirs := stdoutPath[:index]
447                         if subdirs != "" {
448                                 st, err := os.Stat(runner.HostOutputDir)
449                                 if err != nil {
450                                         return fmt.Errorf("While Stat on temp dir: %v", err)
451                                 }
452                                 stdoutPath := path.Join(runner.HostOutputDir, subdirs)
453                                 err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
454                                 if err != nil {
455                                         return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
456                                 }
457                         }
458                 }
459                 stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
460                 if err != nil {
461                         return fmt.Errorf("While creating stdout file: %v", err)
462                 }
463                 runner.Stdout = stdoutFile
464         } else {
465                 runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
466         }
467         runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
468
469         go runner.ProcessDockerAttach(containerReader)
470
471         return nil
472 }
473
474 // CreateContainer creates the docker container.
475 func (runner *ContainerRunner) CreateContainer() error {
476         runner.CrunchLog.Print("Creating Docker container")
477
478         runner.ContainerConfig.Cmd = runner.Container.Command
479         if runner.Container.Cwd != "." {
480                 runner.ContainerConfig.WorkingDir = runner.Container.Cwd
481         }
482
483         for k, v := range runner.Container.Environment {
484                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
485         }
486         if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
487                 tok, err := runner.ContainerToken()
488                 if err != nil {
489                         return err
490                 }
491                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
492                         "ARVADOS_API_TOKEN="+tok,
493                         "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
494                         "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
495                 )
496                 runner.ContainerConfig.NetworkDisabled = false
497         } else {
498                 runner.ContainerConfig.NetworkDisabled = true
499         }
500
501         var err error
502         runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
503         if err != nil {
504                 return fmt.Errorf("While creating container: %v", err)
505         }
506
507         runner.HostConfig = dockerclient.HostConfig{Binds: runner.Binds,
508                 LogConfig: dockerclient.LogConfig{Type: "none"}}
509
510         return runner.AttachStreams()
511 }
512
513 // StartContainer starts the docker container created by CreateContainer.
514 func (runner *ContainerRunner) StartContainer() error {
515         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
516         err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
517         if err != nil {
518                 return fmt.Errorf("could not start container: %v", err)
519         }
520         return nil
521 }
522
523 // WaitFinish waits for the container to terminate, capture the exit code, and
524 // close the stdout/stderr logging.
525 func (runner *ContainerRunner) WaitFinish() error {
526         runner.CrunchLog.Print("Waiting for container to finish")
527
528         result := runner.Docker.Wait(runner.ContainerID)
529         wr := <-result
530         if wr.Error != nil {
531                 return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
532         }
533         runner.ExitCode = &wr.ExitCode
534
535         // wait for stdout/stderr to complete
536         <-runner.loggingDone
537
538         return nil
539 }
540
541 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
542 func (runner *ContainerRunner) CaptureOutput() error {
543         if runner.finalState != "Complete" {
544                 return nil
545         }
546
547         if runner.HostOutputDir == "" {
548                 return nil
549         }
550
551         _, err := os.Stat(runner.HostOutputDir)
552         if err != nil {
553                 return fmt.Errorf("While checking host output path: %v", err)
554         }
555
556         var manifestText string
557
558         collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
559         _, err = os.Stat(collectionMetafile)
560         if err != nil {
561                 // Regular directory
562                 cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
563                 manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
564                 if err != nil {
565                         return fmt.Errorf("While uploading output files: %v", err)
566                 }
567         } else {
568                 // FUSE mount directory
569                 file, openerr := os.Open(collectionMetafile)
570                 if openerr != nil {
571                         return fmt.Errorf("While opening FUSE metafile: %v", err)
572                 }
573                 defer file.Close()
574
575                 var rec arvados.Collection
576                 err = json.NewDecoder(file).Decode(&rec)
577                 if err != nil {
578                         return fmt.Errorf("While reading FUSE metafile: %v", err)
579                 }
580                 manifestText = rec.ManifestText
581         }
582
583         var response arvados.Collection
584         err = runner.ArvClient.Create("collections",
585                 arvadosclient.Dict{
586                         "collection": arvadosclient.Dict{
587                                 "manifest_text": manifestText}},
588                 &response)
589         if err != nil {
590                 return fmt.Errorf("While creating output collection: %v", err)
591         }
592
593         runner.OutputPDH = new(string)
594         *runner.OutputPDH = response.PortableDataHash
595
596         return nil
597 }
598
599 func (runner *ContainerRunner) CleanupDirs() {
600         if runner.ArvMount != nil {
601                 umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
602                 umnterr := umount.Run()
603                 if umnterr != nil {
604                         runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
605                 }
606
607                 mnterr := <-runner.ArvMountExit
608                 if mnterr != nil {
609                         runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
610                 }
611         }
612
613         for _, tmpdir := range runner.CleanupTempDir {
614                 rmerr := os.RemoveAll(tmpdir)
615                 if rmerr != nil {
616                         runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
617                 }
618         }
619 }
620
621 // CommitLogs posts the collection containing the final container logs.
622 func (runner *ContainerRunner) CommitLogs() error {
623         runner.CrunchLog.Print(runner.finalState)
624         runner.CrunchLog.Close()
625
626         // Closing CrunchLog above allows it to be committed to Keep at this
627         // point, but re-open crunch log with ArvClient in case there are any
628         // other further (such as failing to write the log to Keep!) while
629         // shutting down
630         runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
631                 "crunch-run", nil})
632
633         if runner.LogsPDH != nil {
634                 // If we have already assigned something to LogsPDH,
635                 // we must be closing the re-opened log, which won't
636                 // end up getting attached to the container record and
637                 // therefore doesn't need to be saved as a collection
638                 // -- it exists only to send logs to other channels.
639                 return nil
640         }
641
642         mt, err := runner.LogCollection.ManifestText()
643         if err != nil {
644                 return fmt.Errorf("While creating log manifest: %v", err)
645         }
646
647         var response arvados.Collection
648         err = runner.ArvClient.Create("collections",
649                 arvadosclient.Dict{
650                         "collection": arvadosclient.Dict{
651                                 "name":          "logs for " + runner.Container.UUID,
652                                 "manifest_text": mt}},
653                 &response)
654         if err != nil {
655                 return fmt.Errorf("While creating log collection: %v", err)
656         }
657
658         runner.LogsPDH = &response.PortableDataHash
659
660         return nil
661 }
662
663 // UpdateContainerRunning updates the container state to "Running"
664 func (runner *ContainerRunner) UpdateContainerRunning() error {
665         runner.CancelLock.Lock()
666         defer runner.CancelLock.Unlock()
667         if runner.Cancelled {
668                 return ErrCancelled
669         }
670         return runner.ArvClient.Update("containers", runner.Container.UUID,
671                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
672 }
673
674 // ContainerToken returns the api_token the container (and any
675 // arv-mount processes) are allowed to use.
676 func (runner *ContainerRunner) ContainerToken() (string, error) {
677         if runner.token != "" {
678                 return runner.token, nil
679         }
680
681         var auth arvados.APIClientAuthorization
682         err := runner.ArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
683         if err != nil {
684                 return "", err
685         }
686         runner.token = auth.APIToken
687         return runner.token, nil
688 }
689
690 // UpdateContainerComplete updates the container record state on API
691 // server to "Complete" or "Cancelled"
692 func (runner *ContainerRunner) UpdateContainerFinal() error {
693         update := arvadosclient.Dict{}
694         update["state"] = runner.finalState
695         if runner.finalState == "Complete" {
696                 if runner.LogsPDH != nil {
697                         update["log"] = *runner.LogsPDH
698                 }
699                 if runner.ExitCode != nil {
700                         update["exit_code"] = *runner.ExitCode
701                 }
702                 if runner.OutputPDH != nil {
703                         update["output"] = *runner.OutputPDH
704                 }
705         }
706         return runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
707 }
708
709 // IsCancelled returns the value of Cancelled, with goroutine safety.
710 func (runner *ContainerRunner) IsCancelled() bool {
711         runner.CancelLock.Lock()
712         defer runner.CancelLock.Unlock()
713         return runner.Cancelled
714 }
715
716 // NewArvLogWriter creates an ArvLogWriter
717 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
718         return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
719 }
720
721 // Run the full container lifecycle.
722 func (runner *ContainerRunner) Run() (err error) {
723         runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
724
725         hostname, hosterr := os.Hostname()
726         if hosterr != nil {
727                 runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
728         } else {
729                 runner.CrunchLog.Printf("Executing on host '%s'", hostname)
730         }
731
732         // Clean up temporary directories _after_ finalizing
733         // everything (if we've made any by then)
734         defer runner.CleanupDirs()
735
736         runner.finalState = "Queued"
737
738         defer func() {
739                 // checkErr prints e (unless it's nil) and sets err to
740                 // e (unless err is already non-nil). Thus, if err
741                 // hasn't already been assigned when Run() returns,
742                 // this cleanup func will cause Run() to return the
743                 // first non-nil error that is passed to checkErr().
744                 checkErr := func(e error) {
745                         if e == nil {
746                                 return
747                         }
748                         runner.CrunchLog.Print(e)
749                         if err == nil {
750                                 err = e
751                         }
752                 }
753
754                 // Log the error encountered in Run(), if any
755                 checkErr(err)
756
757                 if runner.finalState == "Queued" {
758                         runner.UpdateContainerFinal()
759                         return
760                 }
761
762                 if runner.IsCancelled() {
763                         runner.finalState = "Cancelled"
764                         // but don't return yet -- we still want to
765                         // capture partial output and write logs
766                 }
767
768                 checkErr(runner.CaptureOutput())
769                 checkErr(runner.CommitLogs())
770                 checkErr(runner.UpdateContainerFinal())
771
772                 // The real log is already closed, but then we opened
773                 // a new one in case we needed to log anything while
774                 // finalizing.
775                 runner.CrunchLog.Close()
776         }()
777
778         err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container)
779         if err != nil {
780                 err = fmt.Errorf("While getting container record: %v", err)
781                 return
782         }
783
784         // setup signal handling
785         runner.SetupSignals()
786
787         // check for and/or load image
788         err = runner.LoadImage()
789         if err != nil {
790                 err = fmt.Errorf("While loading container image: %v", err)
791                 return
792         }
793
794         // set up FUSE mount and binds
795         err = runner.SetupMounts()
796         if err != nil {
797                 err = fmt.Errorf("While setting up mounts: %v", err)
798                 return
799         }
800
801         err = runner.CreateContainer()
802         if err != nil {
803                 return
804         }
805
806         runner.StartCrunchstat()
807
808         if runner.IsCancelled() {
809                 return
810         }
811
812         err = runner.UpdateContainerRunning()
813         if err != nil {
814                 return
815         }
816         runner.finalState = "Cancelled"
817
818         err = runner.StartContainer()
819         if err != nil {
820                 return
821         }
822
823         err = runner.WaitFinish()
824         if err == nil {
825                 runner.finalState = "Complete"
826         }
827         return
828 }
829
830 // NewContainerRunner creates a new container runner.
831 func NewContainerRunner(api IArvadosClient,
832         kc IKeepClient,
833         docker ThinDockerClient,
834         containerUUID string) *ContainerRunner {
835
836         cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
837         cr.NewLogWriter = cr.NewArvLogWriter
838         cr.RunArvMount = cr.ArvMountCmd
839         cr.MkTempDir = ioutil.TempDir
840         cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
841         cr.Container.UUID = containerUUID
842         cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
843         cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
844         return cr
845 }
846
847 func main() {
848         statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
849         cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
850         cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup")
851         flag.Parse()
852
853         containerId := flag.Arg(0)
854
855         api, err := arvadosclient.MakeArvadosClient()
856         if err != nil {
857                 log.Fatalf("%s: %v", containerId, err)
858         }
859         api.Retries = 8
860
861         var kc *keepclient.KeepClient
862         kc, err = keepclient.MakeKeepClient(&api)
863         if err != nil {
864                 log.Fatalf("%s: %v", containerId, err)
865         }
866         kc.Retries = 4
867
868         var docker *dockerclient.DockerClient
869         docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
870         if err != nil {
871                 log.Fatalf("%s: %v", containerId, err)
872         }
873
874         cr := NewContainerRunner(api, kc, docker, containerId)
875         cr.statInterval = *statInterval
876         cr.cgroupRoot = *cgroupRoot
877         cr.cgroupParent = *cgroupParent
878
879         err = cr.Run()
880         if err != nil {
881                 log.Fatalf("%s: %v", containerId, err)
882         }
883
884 }