8015: Fix typos in error reporting. Fix tests.
[arvados.git] / services / crunch-run / crunchrun.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "git.curoverse.com/arvados.git/sdk/go/manifest"
11         "github.com/curoverse/dockerclient"
12         "io"
13         "io/ioutil"
14         "log"
15         "os"
16         "os/exec"
17         "os/signal"
18         "strings"
19         "sync"
20         "syscall"
21         "time"
22 )
23
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
//
// It exists so that ContainerRunner can be driven by something other than a
// real arvadosclient (the runner only ever talks to the API server through
// these three calls).
type IArvadosClient interface {
	// Create makes a new record of the given resource type and decodes the
	// server response into output.
	Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
	// Get fetches the record identified by uuid and decodes it into output.
	Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
	// Update modifies the record identified by uuid and decodes the server
	// response into output.
	Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
}
30
// ErrCancelled is the error returned when the container is cancelled.
// StartContainer returns it when the runner was cancelled before the
// container could be created; Run treats it as a normal (non-error) outcome.
var ErrCancelled = errors.New("Cancelled")
33
// IKeepClient is the minimal Keep API methods used by crunch-run.
type IKeepClient interface {
	// PutHB stores buf in Keep under the given hash.
	// NOTE(review): the meaning of the string and int results is defined by
	// keepclient and is not visible in this file; confirm against keepclient.
	PutHB(hash string, buf []byte) (string, int, error)
	// ManifestFileReader opens a reader for the file named filename within
	// manifest m. LoadImage uses it to stream the Docker image tarball.
	ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
}
39
// Mount describes the mount points to create inside the container.
//
// SetupMounts understands two kinds: "collection" (backed by the arv-mount
// FUSE mount, selected by UUID or PortableDataHash, or a fresh temporary
// collection when neither is set) and "tmp" (a host temporary directory).
type Mount struct {
	Kind             string `json:"kind"`
	Writable         bool   `json:"writable"`
	PortableDataHash string `json:"portable_data_hash"`
	UUID             string `json:"uuid"`
	DeviceType       string `json:"device_type"`
}
48
// CollectionRecord is the collection record returned by the API server.
// Only the fields crunch-run reads are declared.
type CollectionRecord struct {
	ManifestText     string `json:"manifest_text"`
	PortableDataHash string `json:"portable_data_hash"`
}
54
// ContainerRecord is the container record returned by the API server.
//
// Run populates it with a "containers" Get call using the UUID supplied to
// NewContainerRunner; the remaining methods read the command, image, working
// directory, environment, mounts and output path from it.
type ContainerRecord struct {
	UUID               string                 `json:"uuid"`
	Command            []string               `json:"command"`
	ContainerImage     string                 `json:"container_image"`
	Cwd                string                 `json:"cwd"`
	Environment        map[string]string      `json:"environment"`
	Mounts             map[string]Mount       `json:"mounts"`
	OutputPath         string                 `json:"output_path"`
	Priority           int                    `json:"priority"`
	RuntimeConstraints map[string]interface{} `json:"runtime_constraints"`
	State              string                 `json:"state"`
	Output             string                 `json:"output"`
}
69
// NewLogWriter is a factory function to create a new log writer.
// It is called with the name of the log stream (for example "stdout",
// "stderr", "arv-mount" or "crunch-run") and returns the writer for it.
type NewLogWriter func(name string) io.WriteCloser
72
// RunArvMount is a function that starts arv-mount with the given command line
// arguments and returns the running command. NewContainerRunner installs
// ContainerRunner.ArvMountCmd as the default implementation.
type RunArvMount func([]string) (*exec.Cmd, error)
74
// MkTempDir is a function that creates a temporary directory and returns its
// path. It has the same signature as ioutil.TempDir, which NewContainerRunner
// installs as the default implementation.
type MkTempDir func(string, string) (string, error)
76
// ThinDockerClient is the minimal Docker client interface used by crunch-run.
//
// The method set is a subset of what main passes in as
// *dockerclient.DockerClient, so the runner can be given any other
// implementation of these calls.
type ThinDockerClient interface {
	StopContainer(id string, timeout int) error
	InspectImage(id string) (*dockerclient.ImageInfo, error)
	LoadImage(reader io.Reader) error
	CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
	StartContainer(id string, config *dockerclient.HostConfig) error
	AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
	Wait(id string) <-chan dockerclient.WaitResult
	RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
}
88
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
//
// It embeds the ContainerRecord fetched from the API server and the
// dockerclient.ContainerConfig that is handed to Docker, and carries the
// intermediate state (mount points, log writers, exit code, output and log
// portable data hashes) produced by the individual lifecycle steps in Run.
type ContainerRunner struct {
	Docker    ThinDockerClient
	ArvClient IArvadosClient
	Kc        IKeepClient
	ContainerRecord
	dockerclient.ContainerConfig
	// ContainerID is the Docker container id, set by StartContainer.
	ContainerID string
	// ExitCode is nil until WaitFinish has observed the container exiting.
	ExitCode    *int
	NewLogWriter
	// loggingDone is signalled and closed by ProcessDockerAttach once the
	// container's stdout/stderr stream has ended.
	loggingDone   chan bool
	CrunchLog     *ThrottledLogger
	Stdout        *ThrottledLogger
	Stderr        *ThrottledLogger
	LogCollection *CollectionWriter
	// LogsPDH is set by CommitLogs to the log collection's portable data hash.
	LogsPDH       *string
	RunArvMount
	MkTempDir
	// ArvMount is the running arv-mount process, or nil if none was started.
	ArvMount       *exec.Cmd
	// ArvMountPoint is the temporary directory arv-mount is mounted on.
	ArvMountPoint  string
	// HostOutputDir is the host-side directory backing the container's
	// output path; it is chosen by SetupMounts.
	HostOutputDir  string
	// CleanupTempDir lists directories removed by CleanupDirs.
	CleanupTempDir []string
	// Binds is the list of Docker bind specifications built by SetupMounts.
	Binds          []string
	// OutputPDH is set by CaptureOutput to the output collection's portable
	// data hash.
	OutputPDH      *string
	// CancelLock is held while the signal handler marks the runner cancelled
	// and while StartContainer creates and starts the container.
	CancelLock     sync.Mutex
	Cancelled      bool
	SigChan        chan os.Signal
	// ArvMountExit delivers the result of waiting for the arv-mount process.
	ArvMountExit   chan error
	// finalState is "Complete" or "Cancelled"; it is decided at the end of Run.
	finalState     string
}
120
121 // SetupSignals sets up signal handling to gracefully terminate the underlying
122 // Docker container and update state when receiving a TERM, INT or QUIT signal.
123 func (runner *ContainerRunner) SetupSignals() {
124         runner.SigChan = make(chan os.Signal, 1)
125         signal.Notify(runner.SigChan, syscall.SIGTERM)
126         signal.Notify(runner.SigChan, syscall.SIGINT)
127         signal.Notify(runner.SigChan, syscall.SIGQUIT)
128
129         go func(sig <-chan os.Signal) {
130                 for _ = range sig {
131                         if !runner.Cancelled {
132                                 runner.CancelLock.Lock()
133                                 runner.Cancelled = true
134                                 if runner.ContainerID != "" {
135                                         runner.Docker.StopContainer(runner.ContainerID, 10)
136                                 }
137                                 runner.CancelLock.Unlock()
138                         }
139                 }
140         }(runner.SigChan)
141 }
142
143 // LoadImage determines the docker image id from the container record and
144 // checks if it is available in the local Docker image store.  If not, it loads
145 // the image from Keep.
146 func (runner *ContainerRunner) LoadImage() (err error) {
147
148         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
149
150         var collection CollectionRecord
151         err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
152         if err != nil {
153                 return fmt.Errorf("While getting container image collection: %v", err)
154         }
155         manifest := manifest.Manifest{Text: collection.ManifestText}
156         var img, imageID string
157         for ms := range manifest.StreamIter() {
158                 img = ms.FileStreamSegments[0].Name
159                 if !strings.HasSuffix(img, ".tar") {
160                         return fmt.Errorf("First file in the container image collection does not end in .tar")
161                 }
162                 imageID = img[:len(img)-4]
163         }
164
165         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
166
167         _, err = runner.Docker.InspectImage(imageID)
168         if err != nil {
169                 runner.CrunchLog.Print("Loading Docker image from keep")
170
171                 var readCloser io.ReadCloser
172                 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
173                 if err != nil {
174                         return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
175                 }
176
177                 err = runner.Docker.LoadImage(readCloser)
178                 if err != nil {
179                         return fmt.Errorf("While loading container image into Docker: %v", err)
180                 }
181         } else {
182                 runner.CrunchLog.Print("Docker image is available")
183         }
184
185         runner.ContainerConfig.Image = imageID
186
187         return nil
188 }
189
190 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string) (c *exec.Cmd, err error) {
191         c = exec.Command("arv-mount", arvMountCmd...)
192         nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
193         c.Stdout = nt
194         c.Stderr = nt
195
196         err = c.Start()
197         if err != nil {
198                 return nil, err
199         }
200
201         statReadme := make(chan bool)
202         runner.ArvMountExit = make(chan error)
203
204         keepStatting := true
205         go func() {
206                 for keepStatting {
207                         time.Sleep(100 * time.Millisecond)
208                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
209                         if err == nil {
210                                 keepStatting = false
211                                 statReadme <- true
212                         }
213                 }
214                 close(statReadme)
215         }()
216
217         go func() {
218                 runner.ArvMountExit <- c.Wait()
219                 close(runner.ArvMountExit)
220         }()
221
222         select {
223         case <-statReadme:
224                 break
225         case err := <-runner.ArvMountExit:
226                 runner.ArvMount = nil
227                 keepStatting = false
228                 return nil, err
229         }
230
231         return c, nil
232 }
233
234 func (runner *ContainerRunner) SetupMounts() (err error) {
235         runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
236         if err != nil {
237                 return fmt.Errorf("While creating keep mount temp dir: %v", err)
238         }
239
240         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
241
242         pdhOnly := true
243         tmpcount := 0
244         arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
245         collectionPaths := []string{}
246         runner.Binds = nil
247
248         for bind, mnt := range runner.ContainerRecord.Mounts {
249                 if mnt.Kind == "collection" {
250                         var src string
251                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
252                                 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
253                         }
254                         if mnt.UUID != "" {
255                                 if mnt.Writable {
256                                         return fmt.Errorf("Writing to existing collections currently not permitted.")
257                                 }
258                                 pdhOnly = false
259                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
260                         } else if mnt.PortableDataHash != "" {
261                                 if mnt.Writable {
262                                         return fmt.Errorf("Can never write to a collection specified by portable data hash")
263                                 }
264                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
265                         } else {
266                                 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
267                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
268                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
269                                 tmpcount += 1
270                         }
271                         if mnt.Writable {
272                                 if bind == runner.ContainerRecord.OutputPath {
273                                         runner.HostOutputDir = src
274                                 }
275                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
276                         } else {
277                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
278                         }
279                         collectionPaths = append(collectionPaths, src)
280                 } else if mnt.Kind == "tmp" {
281                         if bind == runner.ContainerRecord.OutputPath {
282                                 runner.HostOutputDir, err = runner.MkTempDir("", "")
283                                 if err != nil {
284                                         return fmt.Errorf("While creating mount temp dir: %v", err)
285                                 }
286                                 st, staterr := os.Stat(runner.HostOutputDir)
287                                 if staterr != nil {
288                                         return fmt.Errorf("While Stat on temp dir: %v", staterr)
289                                 }
290                                 err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
291                                 if staterr != nil {
292                                         return fmt.Errorf("While Chmod temp dir: %v", err)
293                                 }
294                                 runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
295                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
296                         } else {
297                                 runner.Binds = append(runner.Binds, bind)
298                         }
299                 } else {
300                         return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind)
301                 }
302         }
303
304         if runner.HostOutputDir == "" {
305                 return fmt.Errorf("Output path does not correspond to a writable mount point")
306         }
307
308         if pdhOnly {
309                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
310         } else {
311                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
312         }
313         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
314
315         runner.ArvMount, err = runner.RunArvMount(arvMountCmd)
316         if err != nil {
317                 return fmt.Errorf("While trying to start arv-mount: %v", err)
318         }
319
320         for _, p := range collectionPaths {
321                 _, err = os.Stat(p)
322                 if err != nil {
323                         return fmt.Errorf("While checking that input files exist: %v", err)
324                 }
325         }
326
327         return nil
328 }
329
330 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
331         // Handle docker log protocol
332         // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
333
334         header := make([]byte, 8)
335         for {
336                 _, readerr := io.ReadAtLeast(containerReader, header, 8)
337
338                 if readerr == nil {
339                         readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
340                         if header[0] == 1 {
341                                 // stdout
342                                 _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
343                         } else {
344                                 // stderr
345                                 _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
346                         }
347                 }
348
349                 if readerr != nil {
350                         if readerr != io.EOF {
351                                 runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
352                         }
353
354                         closeerr := runner.Stdout.Close()
355                         if closeerr != nil {
356                                 runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
357                         }
358
359                         closeerr = runner.Stderr.Close()
360                         if closeerr != nil {
361                                 runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
362                         }
363
364                         runner.loggingDone <- true
365                         close(runner.loggingDone)
366                         return
367                 }
368         }
369 }
370
371 // AttachLogs connects the docker container stdout and stderr logs to the
372 // Arvados logger which logs to Keep and the API server logs table.
373 func (runner *ContainerRunner) AttachStreams() (err error) {
374
375         runner.CrunchLog.Print("Attaching container streams")
376
377         var containerReader io.Reader
378         containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
379                 &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
380         if err != nil {
381                 return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
382         }
383
384         runner.loggingDone = make(chan bool)
385
386         runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
387         runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
388
389         go runner.ProcessDockerAttach(containerReader)
390
391         return nil
392 }
393
394 // StartContainer creates the container and runs it.
395 func (runner *ContainerRunner) StartContainer() (err error) {
396         runner.CrunchLog.Print("Creating Docker container")
397
398         runner.CancelLock.Lock()
399         defer runner.CancelLock.Unlock()
400
401         if runner.Cancelled {
402                 return ErrCancelled
403         }
404
405         runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
406         if runner.ContainerRecord.Cwd != "." {
407                 runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
408         }
409         for k, v := range runner.ContainerRecord.Environment {
410                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
411         }
412         runner.ContainerConfig.NetworkDisabled = true
413         runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
414         if err != nil {
415                 return fmt.Errorf("While creating container: %v", err)
416         }
417         hostConfig := &dockerclient.HostConfig{Binds: runner.Binds,
418                 LogConfig: dockerclient.LogConfig{Type: "none"}}
419
420         err = runner.AttachStreams()
421         if err != nil {
422                 return err
423         }
424
425         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
426         err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
427         if err != nil {
428                 return fmt.Errorf("While starting container: %v", err)
429         }
430
431         return nil
432 }
433
434 // WaitFinish waits for the container to terminate, capture the exit code, and
435 // close the stdout/stderr logging.
436 func (runner *ContainerRunner) WaitFinish() error {
437         runner.CrunchLog.Print("Waiting for container to finish")
438
439         result := runner.Docker.Wait(runner.ContainerID)
440         wr := <-result
441         if wr.Error != nil {
442                 return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
443         }
444         runner.ExitCode = &wr.ExitCode
445
446         // wait for stdout/stderr to complete
447         <-runner.loggingDone
448
449         return nil
450 }
451
452 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
453 func (runner *ContainerRunner) CaptureOutput() error {
454         if runner.finalState != "Complete" {
455                 return nil
456         }
457
458         if runner.HostOutputDir == "" {
459                 return nil
460         }
461
462         _, err := os.Stat(runner.HostOutputDir)
463         if err != nil {
464                 return fmt.Errorf("While checking host output path: %v", err)
465         }
466
467         var manifestText string
468
469         collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
470         _, err = os.Stat(collectionMetafile)
471         if err != nil {
472                 // Regular directory
473                 cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
474                 manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
475                 if err != nil {
476                         return fmt.Errorf("While uploading output files: %v", err)
477                 }
478         } else {
479                 // FUSE mount directory
480                 file, openerr := os.Open(collectionMetafile)
481                 if openerr != nil {
482                         return fmt.Errorf("While opening FUSE metafile: %v", err)
483                 }
484                 defer file.Close()
485
486                 rec := CollectionRecord{}
487                 err = json.NewDecoder(file).Decode(&rec)
488                 if err != nil {
489                         return fmt.Errorf("While reading FUSE metafile: %v", err)
490                 }
491                 manifestText = rec.ManifestText
492         }
493
494         var response CollectionRecord
495         err = runner.ArvClient.Create("collections",
496                 arvadosclient.Dict{
497                         "collection": arvadosclient.Dict{
498                                 "manifest_text": manifestText}},
499                 &response)
500         if err != nil {
501                 return fmt.Errorf("While creating output collection: %v", err)
502         }
503
504         runner.OutputPDH = new(string)
505         *runner.OutputPDH = response.PortableDataHash
506
507         return nil
508 }
509
510 func (runner *ContainerRunner) CleanupDirs() {
511         if runner.ArvMount != nil {
512                 umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
513                 umnterr := umount.Run()
514                 if umnterr != nil {
515                         runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
516                 }
517
518                 mnterr := <-runner.ArvMountExit
519                 if mnterr != nil {
520                         runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
521                 }
522         }
523
524         for _, tmpdir := range runner.CleanupTempDir {
525                 rmerr := os.RemoveAll(tmpdir)
526                 if rmerr != nil {
527                         runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
528                 }
529         }
530 }
531
532 // CommitLogs posts the collection containing the final container logs.
533 func (runner *ContainerRunner) CommitLogs() error {
534         runner.CrunchLog.Print(runner.finalState)
535         runner.CrunchLog.Close()
536
537         // Closing CrunchLog above allows it to be committed to Keep at this
538         // point, but re-open crunch log with ArvClient in case there are any
539         // other further (such as failing to write the log to Keep!) while
540         // shutting down
541         runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
542                 "crunch-run", nil})
543
544         mt, err := runner.LogCollection.ManifestText()
545         if err != nil {
546                 return fmt.Errorf("While creating log manifest: %v", err)
547         }
548
549         var response CollectionRecord
550         err = runner.ArvClient.Create("collections",
551                 arvadosclient.Dict{
552                         "collection": arvadosclient.Dict{
553                                 "name":          "logs for " + runner.ContainerRecord.UUID,
554                                 "manifest_text": mt}},
555                 &response)
556         if err != nil {
557                 return fmt.Errorf("While creating log collection: %v", err)
558         }
559
560         runner.LogsPDH = new(string)
561         *runner.LogsPDH = response.PortableDataHash
562
563         return nil
564 }
565
566 // UpdateContainerRecordRunning updates the container state to "Running"
567 func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
568         return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID,
569                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
570 }
571
572 // UpdateContainerRecordComplete updates the container record state on API
573 // server to "Complete" or "Cancelled"
574 func (runner *ContainerRunner) UpdateContainerRecordComplete() error {
575         update := arvadosclient.Dict{}
576         if runner.LogsPDH != nil {
577                 update["log"] = *runner.LogsPDH
578         }
579         if runner.ExitCode != nil {
580                 update["exit_code"] = *runner.ExitCode
581         }
582         if runner.OutputPDH != nil {
583                 update["output"] = runner.OutputPDH
584         }
585
586         update["state"] = runner.finalState
587
588         return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil)
589 }
590
591 // NewArvLogWriter creates an ArvLogWriter
592 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
593         return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
594 }
595
596 // Run the full container lifecycle.
597 func (runner *ContainerRunner) Run() (err error) {
598         runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID)
599
600         hostname, hosterr := os.Hostname()
601         if hosterr != nil {
602                 runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
603         } else {
604                 runner.CrunchLog.Printf("Executing on host '%s'", hostname)
605         }
606
607         var runerr, waiterr error
608
609         defer func() {
610                 if err != nil {
611                         runner.CrunchLog.Print(err)
612                 }
613
614                 if runner.Cancelled {
615                         runner.finalState = "Cancelled"
616                 } else {
617                         runner.finalState = "Complete"
618                 }
619
620                 // (6) capture output
621                 outputerr := runner.CaptureOutput()
622                 if outputerr != nil {
623                         runner.CrunchLog.Print(outputerr)
624                 }
625
626                 // (7) clean up temporary directories
627                 runner.CleanupDirs()
628
629                 // (8) write logs
630                 logerr := runner.CommitLogs()
631                 if logerr != nil {
632                         runner.CrunchLog.Print(logerr)
633                 }
634
635                 // (9) update container record with results
636                 updateerr := runner.UpdateContainerRecordComplete()
637                 if updateerr != nil {
638                         runner.CrunchLog.Print(updateerr)
639                 }
640
641                 runner.CrunchLog.Close()
642
643                 if err == nil {
644                         if runerr != nil {
645                                 err = runerr
646                         } else if waiterr != nil {
647                                 err = waiterr
648                         } else if logerr != nil {
649                                 err = logerr
650                         } else if updateerr != nil {
651                                 err = updateerr
652                         }
653                 }
654         }()
655
656         err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord)
657         if err != nil {
658                 return fmt.Errorf("While getting container record: %v", err)
659         }
660
661         // (1) setup signal handling
662         runner.SetupSignals()
663
664         // (2) check for and/or load image
665         err = runner.LoadImage()
666         if err != nil {
667                 return fmt.Errorf("While loading container image: %v", err)
668         }
669
670         // (3) set up FUSE mount and binds
671         err = runner.SetupMounts()
672         if err != nil {
673                 return fmt.Errorf("While setting up mounts: %v", err)
674         }
675
676         // (3) create and start container
677         err = runner.StartContainer()
678         if err != nil {
679                 if err == ErrCancelled {
680                         err = nil
681                 }
682                 return
683         }
684
685         // (4) update container record state
686         err = runner.UpdateContainerRecordRunning()
687         if err != nil {
688                 runner.CrunchLog.Print(err)
689         }
690
691         // (5) wait for container to finish
692         waiterr = runner.WaitFinish()
693
694         return
695 }
696
697 // NewContainerRunner creates a new container runner.
698 func NewContainerRunner(api IArvadosClient,
699         kc IKeepClient,
700         docker ThinDockerClient,
701         containerUUID string) *ContainerRunner {
702
703         cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
704         cr.NewLogWriter = cr.NewArvLogWriter
705         cr.RunArvMount = cr.ArvMountCmd
706         cr.MkTempDir = ioutil.TempDir
707         cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
708         cr.ContainerRecord.UUID = containerUUID
709         cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
710         cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
711         return cr
712 }
713
714 func main() {
715         flag.Parse()
716
717         containerId := flag.Arg(0)
718
719         api, err := arvadosclient.MakeArvadosClient()
720         if err != nil {
721                 log.Fatalf("%s: %v", containerId, err)
722         }
723         api.Retries = 8
724
725         var kc *keepclient.KeepClient
726         kc, err = keepclient.MakeKeepClient(&api)
727         if err != nil {
728                 log.Fatalf("%s: %v", containerId, err)
729         }
730         kc.Retries = 4
731
732         var docker *dockerclient.DockerClient
733         docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
734         if err != nil {
735                 log.Fatalf("%s: %v", containerId, err)
736         }
737
738         cr := NewContainerRunner(api, kc, docker, containerId)
739
740         err = cr.Run()
741         if err != nil {
742                 log.Fatalf("%s: %v", containerId, err)
743         }
744
745 }