Merge branch '8876-work-unit' into 8650-container-work-unit
[arvados.git] / services / crunch-run / crunchrun.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "git.curoverse.com/arvados.git/sdk/go/manifest"
11         "github.com/curoverse/dockerclient"
12         "io"
13         "io/ioutil"
14         "log"
15         "os"
16         "os/exec"
17         "os/signal"
18         "path"
19         "strings"
20         "sync"
21         "syscall"
22         "time"
23 )
24
25 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
26 type IArvadosClient interface {
27         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
28         Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
29         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
30 }
31
32 // ErrCancelled is the error returned when the container is cancelled.
33 var ErrCancelled = errors.New("Cancelled")
34
35 // IKeepClient is the minimal Keep API methods used by crunch-run.
36 type IKeepClient interface {
37         PutHB(hash string, buf []byte) (string, int, error)
38         ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
39 }
40
type (
	// Mount describes one entry of a container record's "mounts" map: the
	// kind of mount to create and where its content comes from. SetupMounts
	// handles the "collection" and "tmp" kinds, and accepts "file" for the
	// special "stdout" entry, where Path names the stdout file.
	Mount struct {
		Kind             string `json:"kind"`
		Writable         bool   `json:"writable"`
		PortableDataHash string `json:"portable_data_hash"`
		UUID             string `json:"uuid"`
		DeviceType       string `json:"device_type"`
		Path             string `json:"path"`
	}

	// CollectionRecord holds the collection fields that crunch-run reads back
	// from the API server.
	CollectionRecord struct {
		ManifestText     string `json:"manifest_text"`
		PortableDataHash string `json:"portable_data_hash"`
	}

	// ContainerRecord is the container record returned by the API server.
	// Mounts is keyed by the path inside the container, or by the special
	// key "stdout". A Cwd of "." leaves the Docker working directory unset.
	ContainerRecord struct {
		UUID               string                 `json:"uuid"`
		Command            []string               `json:"command"`
		ContainerImage     string                 `json:"container_image"`
		Cwd                string                 `json:"cwd"`
		Environment        map[string]string      `json:"environment"`
		Mounts             map[string]Mount       `json:"mounts"`
		OutputPath         string                 `json:"output_path"`
		Priority           int                    `json:"priority"`
		RuntimeConstraints map[string]interface{} `json:"runtime_constraints"`
		State              string                 `json:"state"`
		Output             string                 `json:"output"`
	}
)
71
72 // NewLogWriter is a factory function to create a new log writer.
73 type NewLogWriter func(name string) io.WriteCloser
74
75 type RunArvMount func([]string) (*exec.Cmd, error)
76
77 type MkTempDir func(string, string) (string, error)
78
79 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
80 type ThinDockerClient interface {
81         StopContainer(id string, timeout int) error
82         InspectImage(id string) (*dockerclient.ImageInfo, error)
83         LoadImage(reader io.Reader) error
84         CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
85         StartContainer(id string, config *dockerclient.HostConfig) error
86         AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
87         Wait(id string) <-chan dockerclient.WaitResult
88         RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
89 }
90
91 // ContainerRunner is the main stateful struct used for a single execution of a
92 // container.
93 type ContainerRunner struct {
94         Docker    ThinDockerClient
95         ArvClient IArvadosClient
96         Kc        IKeepClient
97         ContainerRecord
98         dockerclient.ContainerConfig
99         ContainerID string
100         ExitCode    *int
101         NewLogWriter
102         loggingDone   chan bool
103         CrunchLog     *ThrottledLogger
104         Stdout        io.WriteCloser
105         Stderr        *ThrottledLogger
106         LogCollection *CollectionWriter
107         LogsPDH       *string
108         RunArvMount
109         MkTempDir
110         ArvMount       *exec.Cmd
111         ArvMountPoint  string
112         HostOutputDir  string
113         CleanupTempDir []string
114         Binds          []string
115         OutputPDH      *string
116         CancelLock     sync.Mutex
117         Cancelled      bool
118         SigChan        chan os.Signal
119         ArvMountExit   chan error
120         finalState     string
121 }
122
123 // SetupSignals sets up signal handling to gracefully terminate the underlying
124 // Docker container and update state when receiving a TERM, INT or QUIT signal.
125 func (runner *ContainerRunner) SetupSignals() {
126         runner.SigChan = make(chan os.Signal, 1)
127         signal.Notify(runner.SigChan, syscall.SIGTERM)
128         signal.Notify(runner.SigChan, syscall.SIGINT)
129         signal.Notify(runner.SigChan, syscall.SIGQUIT)
130
131         go func(sig <-chan os.Signal) {
132                 for _ = range sig {
133                         if !runner.Cancelled {
134                                 runner.CancelLock.Lock()
135                                 runner.Cancelled = true
136                                 if runner.ContainerID != "" {
137                                         runner.Docker.StopContainer(runner.ContainerID, 10)
138                                 }
139                                 runner.CancelLock.Unlock()
140                         }
141                 }
142         }(runner.SigChan)
143 }
144
145 // LoadImage determines the docker image id from the container record and
146 // checks if it is available in the local Docker image store.  If not, it loads
147 // the image from Keep.
148 func (runner *ContainerRunner) LoadImage() (err error) {
149
150         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
151
152         var collection CollectionRecord
153         err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
154         if err != nil {
155                 return fmt.Errorf("While getting container image collection: %v", err)
156         }
157         manifest := manifest.Manifest{Text: collection.ManifestText}
158         var img, imageID string
159         for ms := range manifest.StreamIter() {
160                 img = ms.FileStreamSegments[0].Name
161                 if !strings.HasSuffix(img, ".tar") {
162                         return fmt.Errorf("First file in the container image collection does not end in .tar")
163                 }
164                 imageID = img[:len(img)-4]
165         }
166
167         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
168
169         _, err = runner.Docker.InspectImage(imageID)
170         if err != nil {
171                 runner.CrunchLog.Print("Loading Docker image from keep")
172
173                 var readCloser io.ReadCloser
174                 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
175                 if err != nil {
176                         return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
177                 }
178
179                 err = runner.Docker.LoadImage(readCloser)
180                 if err != nil {
181                         return fmt.Errorf("While loading container image into Docker: %v", err)
182                 }
183         } else {
184                 runner.CrunchLog.Print("Docker image is available")
185         }
186
187         runner.ContainerConfig.Image = imageID
188
189         return nil
190 }
191
192 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string) (c *exec.Cmd, err error) {
193         c = exec.Command("arv-mount", arvMountCmd...)
194         nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
195         c.Stdout = nt
196         c.Stderr = nt
197
198         err = c.Start()
199         if err != nil {
200                 return nil, err
201         }
202
203         statReadme := make(chan bool)
204         runner.ArvMountExit = make(chan error)
205
206         keepStatting := true
207         go func() {
208                 for keepStatting {
209                         time.Sleep(100 * time.Millisecond)
210                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
211                         if err == nil {
212                                 keepStatting = false
213                                 statReadme <- true
214                         }
215                 }
216                 close(statReadme)
217         }()
218
219         go func() {
220                 runner.ArvMountExit <- c.Wait()
221                 close(runner.ArvMountExit)
222         }()
223
224         select {
225         case <-statReadme:
226                 break
227         case err := <-runner.ArvMountExit:
228                 runner.ArvMount = nil
229                 keepStatting = false
230                 return nil, err
231         }
232
233         return c, nil
234 }
235
236 func (runner *ContainerRunner) SetupMounts() (err error) {
237         runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
238         if err != nil {
239                 return fmt.Errorf("While creating keep mount temp dir: %v", err)
240         }
241
242         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
243
244         pdhOnly := true
245         tmpcount := 0
246         arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
247         collectionPaths := []string{}
248         runner.Binds = nil
249
250         for bind, mnt := range runner.ContainerRecord.Mounts {
251                 if bind == "stdout" {
252                         // Is it a "file" mount kind?
253                         if mnt.Kind != "file" {
254                                 return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
255                         }
256
257                         // Does path start with OutputPath?
258                         prefix := runner.ContainerRecord.OutputPath
259                         if !strings.HasSuffix(prefix, "/") {
260                                 prefix += "/"
261                         }
262                         if !strings.HasPrefix(mnt.Path, prefix) {
263                                 return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
264                         }
265                 }
266
267                 if mnt.Kind == "collection" {
268                         var src string
269                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
270                                 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
271                         }
272                         if mnt.UUID != "" {
273                                 if mnt.Writable {
274                                         return fmt.Errorf("Writing to existing collections currently not permitted.")
275                                 }
276                                 pdhOnly = false
277                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
278                         } else if mnt.PortableDataHash != "" {
279                                 if mnt.Writable {
280                                         return fmt.Errorf("Can never write to a collection specified by portable data hash")
281                                 }
282                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
283                         } else {
284                                 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
285                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
286                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
287                                 tmpcount += 1
288                         }
289                         if mnt.Writable {
290                                 if bind == runner.ContainerRecord.OutputPath {
291                                         runner.HostOutputDir = src
292                                 }
293                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
294                         } else {
295                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
296                         }
297                         collectionPaths = append(collectionPaths, src)
298                 } else if mnt.Kind == "tmp" {
299                         if bind == runner.ContainerRecord.OutputPath {
300                                 runner.HostOutputDir, err = runner.MkTempDir("", "")
301                                 if err != nil {
302                                         return fmt.Errorf("While creating mount temp dir: %v", err)
303                                 }
304                                 st, staterr := os.Stat(runner.HostOutputDir)
305                                 if staterr != nil {
306                                         return fmt.Errorf("While Stat on temp dir: %v", staterr)
307                                 }
308                                 err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
309                                 if staterr != nil {
310                                         return fmt.Errorf("While Chmod temp dir: %v", err)
311                                 }
312                                 runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
313                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
314                         } else {
315                                 runner.Binds = append(runner.Binds, bind)
316                         }
317                 }
318         }
319
320         if runner.HostOutputDir == "" {
321                 return fmt.Errorf("Output path does not correspond to a writable mount point")
322         }
323
324         if pdhOnly {
325                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
326         } else {
327                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
328         }
329         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
330
331         runner.ArvMount, err = runner.RunArvMount(arvMountCmd)
332         if err != nil {
333                 return fmt.Errorf("While trying to start arv-mount: %v", err)
334         }
335
336         for _, p := range collectionPaths {
337                 _, err = os.Stat(p)
338                 if err != nil {
339                         return fmt.Errorf("While checking that input files exist: %v", err)
340                 }
341         }
342
343         return nil
344 }
345
346 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
347         // Handle docker log protocol
348         // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
349
350         header := make([]byte, 8)
351         for {
352                 _, readerr := io.ReadAtLeast(containerReader, header, 8)
353
354                 if readerr == nil {
355                         readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
356                         if header[0] == 1 {
357                                 // stdout
358                                 _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
359                         } else {
360                                 // stderr
361                                 _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
362                         }
363                 }
364
365                 if readerr != nil {
366                         if readerr != io.EOF {
367                                 runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
368                         }
369
370                         closeerr := runner.Stdout.Close()
371                         if closeerr != nil {
372                                 runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
373                         }
374
375                         closeerr = runner.Stderr.Close()
376                         if closeerr != nil {
377                                 runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
378                         }
379
380                         runner.loggingDone <- true
381                         close(runner.loggingDone)
382                         return
383                 }
384         }
385 }
386
387 // AttachLogs connects the docker container stdout and stderr logs to the
388 // Arvados logger which logs to Keep and the API server logs table.
389 func (runner *ContainerRunner) AttachStreams() (err error) {
390
391         runner.CrunchLog.Print("Attaching container streams")
392
393         var containerReader io.Reader
394         containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
395                 &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
396         if err != nil {
397                 return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
398         }
399
400         runner.loggingDone = make(chan bool)
401
402         if stdoutMnt, ok := runner.ContainerRecord.Mounts["stdout"]; ok {
403                 stdoutPath := stdoutMnt.Path[len(runner.ContainerRecord.OutputPath):]
404                 index := strings.LastIndex(stdoutPath, "/")
405                 if index > 0 {
406                         subdirs := stdoutPath[:index]
407                         if subdirs != "" {
408                                 st, err := os.Stat(runner.HostOutputDir)
409                                 if err != nil {
410                                         return fmt.Errorf("While Stat on temp dir: %v", err)
411                                 }
412                                 stdoutPath := path.Join(runner.HostOutputDir, subdirs)
413                                 err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
414                                 if err != nil {
415                                         return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
416                                 }
417                         }
418                 }
419                 stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
420                 if err != nil {
421                         return fmt.Errorf("While creating stdout file: %v", err)
422                 }
423                 runner.Stdout = stdoutFile
424         } else {
425                 runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
426         }
427         runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
428
429         go runner.ProcessDockerAttach(containerReader)
430
431         return nil
432 }
433
434 // StartContainer creates the container and runs it.
435 func (runner *ContainerRunner) StartContainer() (err error) {
436         runner.CrunchLog.Print("Creating Docker container")
437
438         runner.CancelLock.Lock()
439         defer runner.CancelLock.Unlock()
440
441         if runner.Cancelled {
442                 return ErrCancelled
443         }
444
445         runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
446         if runner.ContainerRecord.Cwd != "." {
447                 runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
448         }
449         for k, v := range runner.ContainerRecord.Environment {
450                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
451         }
452         runner.ContainerConfig.NetworkDisabled = true
453         runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
454         if err != nil {
455                 return fmt.Errorf("While creating container: %v", err)
456         }
457         hostConfig := &dockerclient.HostConfig{Binds: runner.Binds,
458                 LogConfig: dockerclient.LogConfig{Type: "none"}}
459
460         err = runner.AttachStreams()
461         if err != nil {
462                 return err
463         }
464
465         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
466         err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
467         if err != nil {
468                 return fmt.Errorf("While starting container: %v", err)
469         }
470
471         return nil
472 }
473
474 // WaitFinish waits for the container to terminate, capture the exit code, and
475 // close the stdout/stderr logging.
476 func (runner *ContainerRunner) WaitFinish() error {
477         runner.CrunchLog.Print("Waiting for container to finish")
478
479         result := runner.Docker.Wait(runner.ContainerID)
480         wr := <-result
481         if wr.Error != nil {
482                 return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
483         }
484         runner.ExitCode = &wr.ExitCode
485
486         // wait for stdout/stderr to complete
487         <-runner.loggingDone
488
489         return nil
490 }
491
492 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
493 func (runner *ContainerRunner) CaptureOutput() error {
494         if runner.finalState != "Complete" {
495                 return nil
496         }
497
498         if runner.HostOutputDir == "" {
499                 return nil
500         }
501
502         _, err := os.Stat(runner.HostOutputDir)
503         if err != nil {
504                 return fmt.Errorf("While checking host output path: %v", err)
505         }
506
507         var manifestText string
508
509         collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
510         _, err = os.Stat(collectionMetafile)
511         if err != nil {
512                 // Regular directory
513                 cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
514                 manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
515                 if err != nil {
516                         return fmt.Errorf("While uploading output files: %v", err)
517                 }
518         } else {
519                 // FUSE mount directory
520                 file, openerr := os.Open(collectionMetafile)
521                 if openerr != nil {
522                         return fmt.Errorf("While opening FUSE metafile: %v", err)
523                 }
524                 defer file.Close()
525
526                 rec := CollectionRecord{}
527                 err = json.NewDecoder(file).Decode(&rec)
528                 if err != nil {
529                         return fmt.Errorf("While reading FUSE metafile: %v", err)
530                 }
531                 manifestText = rec.ManifestText
532         }
533
534         var response CollectionRecord
535         err = runner.ArvClient.Create("collections",
536                 arvadosclient.Dict{
537                         "collection": arvadosclient.Dict{
538                                 "manifest_text": manifestText}},
539                 &response)
540         if err != nil {
541                 return fmt.Errorf("While creating output collection: %v", err)
542         }
543
544         runner.OutputPDH = new(string)
545         *runner.OutputPDH = response.PortableDataHash
546
547         return nil
548 }
549
550 func (runner *ContainerRunner) CleanupDirs() {
551         if runner.ArvMount != nil {
552                 umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
553                 umnterr := umount.Run()
554                 if umnterr != nil {
555                         runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
556                 }
557
558                 mnterr := <-runner.ArvMountExit
559                 if mnterr != nil {
560                         runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
561                 }
562         }
563
564         for _, tmpdir := range runner.CleanupTempDir {
565                 rmerr := os.RemoveAll(tmpdir)
566                 if rmerr != nil {
567                         runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
568                 }
569         }
570 }
571
572 // CommitLogs posts the collection containing the final container logs.
573 func (runner *ContainerRunner) CommitLogs() error {
574         runner.CrunchLog.Print(runner.finalState)
575         runner.CrunchLog.Close()
576
577         // Closing CrunchLog above allows it to be committed to Keep at this
578         // point, but re-open crunch log with ArvClient in case there are any
579         // other further (such as failing to write the log to Keep!) while
580         // shutting down
581         runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
582                 "crunch-run", nil})
583
584         mt, err := runner.LogCollection.ManifestText()
585         if err != nil {
586                 return fmt.Errorf("While creating log manifest: %v", err)
587         }
588
589         var response CollectionRecord
590         err = runner.ArvClient.Create("collections",
591                 arvadosclient.Dict{
592                         "collection": arvadosclient.Dict{
593                                 "name":          "logs for " + runner.ContainerRecord.UUID,
594                                 "manifest_text": mt}},
595                 &response)
596         if err != nil {
597                 return fmt.Errorf("While creating log collection: %v", err)
598         }
599
600         runner.LogsPDH = new(string)
601         *runner.LogsPDH = response.PortableDataHash
602
603         return nil
604 }
605
606 // UpdateContainerRecordRunning updates the container state to "Running"
607 func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
608         return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID,
609                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
610 }
611
612 // UpdateContainerRecordComplete updates the container record state on API
613 // server to "Complete" or "Cancelled"
614 func (runner *ContainerRunner) UpdateContainerRecordComplete() error {
615         update := arvadosclient.Dict{}
616         if runner.LogsPDH != nil {
617                 update["log"] = *runner.LogsPDH
618         }
619         if runner.ExitCode != nil {
620                 update["exit_code"] = *runner.ExitCode
621         }
622         if runner.OutputPDH != nil {
623                 update["output"] = runner.OutputPDH
624         }
625
626         update["state"] = runner.finalState
627
628         return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil)
629 }
630
631 // NewArvLogWriter creates an ArvLogWriter
632 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
633         return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
634 }
635
636 // Run the full container lifecycle.
637 func (runner *ContainerRunner) Run() (err error) {
638         runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID)
639
640         hostname, hosterr := os.Hostname()
641         if hosterr != nil {
642                 runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
643         } else {
644                 runner.CrunchLog.Printf("Executing on host '%s'", hostname)
645         }
646
647         var runerr, waiterr error
648
649         defer func() {
650                 if err != nil {
651                         runner.CrunchLog.Print(err)
652                 }
653
654                 if runner.Cancelled {
655                         runner.finalState = "Cancelled"
656                 } else {
657                         runner.finalState = "Complete"
658                 }
659
660                 // (6) capture output
661                 outputerr := runner.CaptureOutput()
662                 if outputerr != nil {
663                         runner.CrunchLog.Print(outputerr)
664                 }
665
666                 // (7) clean up temporary directories
667                 runner.CleanupDirs()
668
669                 // (8) write logs
670                 logerr := runner.CommitLogs()
671                 if logerr != nil {
672                         runner.CrunchLog.Print(logerr)
673                 }
674
675                 // (9) update container record with results
676                 updateerr := runner.UpdateContainerRecordComplete()
677                 if updateerr != nil {
678                         runner.CrunchLog.Print(updateerr)
679                 }
680
681                 runner.CrunchLog.Close()
682
683                 if err == nil {
684                         if runerr != nil {
685                                 err = runerr
686                         } else if waiterr != nil {
687                                 err = waiterr
688                         } else if logerr != nil {
689                                 err = logerr
690                         } else if updateerr != nil {
691                                 err = updateerr
692                         }
693                 }
694         }()
695
696         err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord)
697         if err != nil {
698                 return fmt.Errorf("While getting container record: %v", err)
699         }
700
701         // (1) setup signal handling
702         runner.SetupSignals()
703
704         // (2) check for and/or load image
705         err = runner.LoadImage()
706         if err != nil {
707                 return fmt.Errorf("While loading container image: %v", err)
708         }
709
710         // (3) set up FUSE mount and binds
711         err = runner.SetupMounts()
712         if err != nil {
713                 return fmt.Errorf("While setting up mounts: %v", err)
714         }
715
716         // (3) create and start container
717         err = runner.StartContainer()
718         if err != nil {
719                 if err == ErrCancelled {
720                         err = nil
721                 }
722                 return
723         }
724
725         // (4) update container record state
726         err = runner.UpdateContainerRecordRunning()
727         if err != nil {
728                 runner.CrunchLog.Print(err)
729         }
730
731         // (5) wait for container to finish
732         waiterr = runner.WaitFinish()
733
734         return
735 }
736
737 // NewContainerRunner creates a new container runner.
738 func NewContainerRunner(api IArvadosClient,
739         kc IKeepClient,
740         docker ThinDockerClient,
741         containerUUID string) *ContainerRunner {
742
743         cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
744         cr.NewLogWriter = cr.NewArvLogWriter
745         cr.RunArvMount = cr.ArvMountCmd
746         cr.MkTempDir = ioutil.TempDir
747         cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
748         cr.ContainerRecord.UUID = containerUUID
749         cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
750         cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
751         return cr
752 }
753
754 func main() {
755         flag.Parse()
756
757         containerId := flag.Arg(0)
758
759         api, err := arvadosclient.MakeArvadosClient()
760         if err != nil {
761                 log.Fatalf("%s: %v", containerId, err)
762         }
763         api.Retries = 8
764
765         var kc *keepclient.KeepClient
766         kc, err = keepclient.MakeKeepClient(&api)
767         if err != nil {
768                 log.Fatalf("%s: %v", containerId, err)
769         }
770         kc.Retries = 4
771
772         var docker *dockerclient.DockerClient
773         docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
774         if err != nil {
775                 log.Fatalf("%s: %v", containerId, err)
776         }
777
778         cr := NewContainerRunner(api, kc, docker, containerId)
779
780         err = cr.Run()
781         if err != nil {
782                 log.Fatalf("%s: %v", containerId, err)
783         }
784
785 }