775b06d78e167110fc77a5628c79f9e2dd30f271
[arvados.git] / services / crunch-run / crunchrun.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "git.curoverse.com/arvados.git/sdk/go/manifest"
11         "github.com/curoverse/dockerclient"
12         "io"
13         "io/ioutil"
14         "log"
15         "os"
16         "os/exec"
17         "os/signal"
18         "strings"
19         "sync"
20         "syscall"
21         "time"
22 )
23
24 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
25 type IArvadosClient interface {
26         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
27         Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
28         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
29 }
30
// ErrCancelled is the sentinel error StartContainer returns when the runner
// was already cancelled before the container could be created.
var ErrCancelled = errors.New("Cancelled")
33
34 // IKeepClient is the minimal Keep API methods used by crunch-run.
35 type IKeepClient interface {
36         PutHB(hash string, buf []byte) (string, int, error)
37         ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
38 }
39
// Mount is one entry of a container record's "mounts" map; the map key is the
// path inside the container at which the mount should appear.
type Mount struct {
	// Kind selects the mount type.  SetupMounts accepts "collection" and
	// "tmp" and rejects anything else.
	Kind string `json:"kind"`
	// Writable requests a read-write bind instead of a read-only one.
	Writable bool `json:"writable"`
	// PortableDataHash identifies a collection mount by content address.
	// It may not be combined with UUID.
	PortableDataHash string `json:"portable_data_hash"`
	// UUID identifies a collection mount by collection uuid.
	UUID string `json:"uuid"`
	// DeviceType is decoded from the record but not used in this file.
	DeviceType string `json:"device_type"`
}
48
// CollectionRecord holds the fields of an API server collection record that
// crunch-run reads: the manifest text and the portable data hash.
type CollectionRecord struct {
	// ManifestText is the collection's manifest.
	ManifestText string `json:"manifest_text"`
	// PortableDataHash is the collection's portable data hash.
	PortableDataHash string `json:"portable_data_hash"`
}
54
55 // ContainerRecord is the container record returned by the API server.
56 type ContainerRecord struct {
57         UUID               string                 `json:"uuid"`
58         Command            []string               `json:"command"`
59         ContainerImage     string                 `json:"container_image"`
60         Cwd                string                 `json:"cwd"`
61         Environment        map[string]string      `json:"environment"`
62         Mounts             map[string]Mount       `json:"mounts"`
63         OutputPath         string                 `json:"output_path"`
64         Priority           int                    `json:"priority"`
65         RuntimeConstraints map[string]interface{} `json:"runtime_constraints"`
66         State              string                 `json:"state"`
67         Output             string                 `json:"output"`
68 }
69
// Function types for the ContainerRunner's replaceable collaborators.
type (
	// NewLogWriter is a factory that returns the writer for the named log
	// stream.
	NewLogWriter func(name string) io.WriteCloser

	// RunArvMount starts arv-mount with the given command line arguments and
	// returns the running command.
	RunArvMount func([]string) (*exec.Cmd, error)

	// MkTempDir creates a temporary directory; NewContainerRunner installs
	// ioutil.TempDir, whose signature it shares.
	MkTempDir func(string, string) (string, error)
)
76
77 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
78 type ThinDockerClient interface {
79         StopContainer(id string, timeout int) error
80         InspectImage(id string) (*dockerclient.ImageInfo, error)
81         LoadImage(reader io.Reader) error
82         CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
83         StartContainer(id string, config *dockerclient.HostConfig) error
84         ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error)
85         Wait(id string) <-chan dockerclient.WaitResult
86         RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
87 }
88
89 // ContainerRunner is the main stateful struct used for a single execution of a
90 // container.
91 type ContainerRunner struct {
92         Docker    ThinDockerClient
93         ArvClient IArvadosClient
94         Kc        IKeepClient
95         ContainerRecord
96         dockerclient.ContainerConfig
97         ContainerID string
98         ExitCode    *int
99         NewLogWriter
100         loggingDone   chan bool
101         CrunchLog     *ThrottledLogger
102         Stdout        *ThrottledLogger
103         Stderr        *ThrottledLogger
104         LogCollection *CollectionWriter
105         LogsPDH       *string
106         RunArvMount
107         MkTempDir
108         ArvMount       *exec.Cmd
109         ArvMountPoint  string
110         HostOutputDir  string
111         CleanupTempDir []string
112         Binds          []string
113         OutputPDH      *string
114         CancelLock     sync.Mutex
115         Cancelled      bool
116         SigChan        chan os.Signal
117         ArvMountExit   chan error
118         finalState     string
119 }
120
121 // SetupSignals sets up signal handling to gracefully terminate the underlying
122 // Docker container and update state when receiving a TERM, INT or QUIT signal.
123 func (runner *ContainerRunner) SetupSignals() error {
124         runner.SigChan = make(chan os.Signal, 1)
125         signal.Notify(runner.SigChan, syscall.SIGTERM)
126         signal.Notify(runner.SigChan, syscall.SIGINT)
127         signal.Notify(runner.SigChan, syscall.SIGQUIT)
128
129         go func(sig <-chan os.Signal) {
130                 for _ = range sig {
131                         if !runner.Cancelled {
132                                 runner.CancelLock.Lock()
133                                 runner.Cancelled = true
134                                 if runner.ContainerID != "" {
135                                         runner.Docker.StopContainer(runner.ContainerID, 10)
136                                 }
137                                 runner.CancelLock.Unlock()
138                         }
139                 }
140         }(runner.SigChan)
141
142         return nil
143 }
144
145 // LoadImage determines the docker image id from the container record and
146 // checks if it is available in the local Docker image store.  If not, it loads
147 // the image from Keep.
148 func (runner *ContainerRunner) LoadImage() (err error) {
149
150         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
151
152         var collection CollectionRecord
153         err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
154         if err != nil {
155                 return fmt.Errorf("While getting container image collection: %v", err)
156         }
157         manifest := manifest.Manifest{Text: collection.ManifestText}
158         var img, imageID string
159         for ms := range manifest.StreamIter() {
160                 img = ms.FileStreamSegments[0].Name
161                 if !strings.HasSuffix(img, ".tar") {
162                         return fmt.Errorf("First file in the container image collection does not end in .tar")
163                 }
164                 imageID = img[:len(img)-4]
165         }
166
167         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
168
169         _, err = runner.Docker.InspectImage(imageID)
170         if err != nil {
171                 runner.CrunchLog.Print("Loading Docker image from keep")
172
173                 var readCloser io.ReadCloser
174                 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
175                 if err != nil {
176                         return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
177                 }
178
179                 err = runner.Docker.LoadImage(readCloser)
180                 if err != nil {
181                         return fmt.Errorf("While loading container image into Docker: %v", err)
182                 }
183         } else {
184                 runner.CrunchLog.Print("Docker image is available")
185         }
186
187         runner.ContainerConfig.Image = imageID
188
189         return nil
190 }
191
192 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string) (c *exec.Cmd, err error) {
193         c = exec.Command("arv-mount", arvMountCmd...)
194         nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
195         c.Stdout = nt
196         c.Stderr = nt
197
198         err = c.Start()
199         if err != nil {
200                 return nil, err
201         }
202
203         statReadme := make(chan bool)
204         runner.ArvMountExit = make(chan error)
205
206         keepStatting := true
207         go func() {
208                 for keepStatting {
209                         time.Sleep(100 * time.Millisecond)
210                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
211                         if err == nil {
212                                 keepStatting = false
213                                 statReadme <- true
214                         }
215                 }
216                 close(statReadme)
217         }()
218
219         go func() {
220                 runner.ArvMountExit <- c.Wait()
221                 close(runner.ArvMountExit)
222         }()
223
224         select {
225         case <-statReadme:
226                 break
227         case err := <-runner.ArvMountExit:
228                 runner.ArvMount = nil
229                 keepStatting = false
230                 return nil, err
231         }
232
233         return c, nil
234 }
235
236 func (runner *ContainerRunner) SetupMounts() (err error) {
237         runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
238         if err != nil {
239                 return fmt.Errorf("While creating keep mount temp dir: %v", err)
240         }
241
242         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
243
244         pdhOnly := true
245         tmpcount := 0
246         arvMountCmd := []string{"--foreground", "--allow-other"}
247         collectionPaths := []string{}
248         runner.Binds = nil
249
250         for bind, mnt := range runner.ContainerRecord.Mounts {
251                 if mnt.Kind == "collection" {
252                         var src string
253                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
254                                 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
255                         }
256                         if mnt.UUID != "" {
257                                 if mnt.Writable {
258                                         return fmt.Errorf("Writing to existing collections currently not permitted.")
259                                 }
260                                 pdhOnly = false
261                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
262                         } else if mnt.PortableDataHash != "" {
263                                 if mnt.Writable {
264                                         return fmt.Errorf("Can never write to a collection specified by portable data hash")
265                                 }
266                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
267                         } else {
268                                 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
269                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
270                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
271                                 tmpcount += 1
272                         }
273                         if mnt.Writable {
274                                 if bind == runner.ContainerRecord.OutputPath {
275                                         runner.HostOutputDir = src
276                                 }
277                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
278                         } else {
279                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
280                         }
281                         collectionPaths = append(collectionPaths, src)
282                 } else if mnt.Kind == "tmp" {
283                         if bind == runner.ContainerRecord.OutputPath {
284                                 runner.HostOutputDir, err = runner.MkTempDir("", "")
285                                 if err != nil {
286                                         return fmt.Errorf("While creating mount temp dir: %v", err)
287                                 }
288                                 st, staterr := os.Stat(runner.HostOutputDir)
289                                 if staterr != nil {
290                                         return fmt.Errorf("While Stat on temp dir: %v", staterr)
291                                 }
292                                 err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid)
293                                 if staterr != nil {
294                                         return fmt.Errorf("While Chmod temp dir: %v", err)
295                                 }
296                                 runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
297                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
298                         } else {
299                                 runner.Binds = append(runner.Binds, bind)
300                         }
301                 } else {
302                         return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind)
303                 }
304         }
305
306         if runner.HostOutputDir == "" {
307                 return fmt.Errorf("Output path does not correspond to a writable mount point")
308         }
309
310         if pdhOnly {
311                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
312         } else {
313                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
314         }
315         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
316
317         runner.ArvMount, err = runner.RunArvMount(arvMountCmd)
318         if err != nil {
319                 return fmt.Errorf("While trying to start arv-mount: %v", err)
320         }
321
322         for _, p := range collectionPaths {
323                 _, err = os.Stat(p)
324                 if err != nil {
325                         return fmt.Errorf("While checking that input files exist: %v", err)
326                 }
327         }
328
329         return nil
330 }
331
332 // StartContainer creates the container and runs it.
333 func (runner *ContainerRunner) StartContainer() (err error) {
334         runner.CrunchLog.Print("Creating Docker container")
335
336         runner.CancelLock.Lock()
337         defer runner.CancelLock.Unlock()
338
339         if runner.Cancelled {
340                 return ErrCancelled
341         }
342
343         runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
344         if runner.ContainerRecord.Cwd != "." {
345                 runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
346         }
347         for k, v := range runner.ContainerRecord.Environment {
348                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
349         }
350         runner.ContainerConfig.NetworkDisabled = true
351         runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
352         if err != nil {
353                 return fmt.Errorf("While creating container: %v", err)
354         }
355         hostConfig := &dockerclient.HostConfig{Binds: runner.Binds}
356
357         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
358         err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
359         if err != nil {
360                 return fmt.Errorf("While starting container: %v", err)
361         }
362
363         return nil
364 }
365
366 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
367         // Handle docker log protocol
368         // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
369
370         header := make([]byte, 8)
371         for {
372                 _, readerr := io.ReadAtLeast(containerReader, header, 8)
373
374                 if readerr == nil {
375                         readsize := int64(header[4]) | (int64(header[5]) << 8) | (int64(header[6]) << 16) | (int64(header[7]) << 24)
376                         if header[0] == 1 {
377                                 // stdout
378                                 _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
379                         } else {
380                                 // stderr
381                                 _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
382                         }
383                 }
384
385                 if readerr != nil {
386                         if readerr != io.EOF {
387                                 runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
388                         }
389
390                         closeerr := runner.Stdout.Close()
391                         if closeerr != nil {
392                                 runner.CrunchLog.Printf("While closing stdout logs: %v", readerr)
393                         }
394
395                         closeerr = runner.Stderr.Close()
396                         if closeerr != nil {
397                                 runner.CrunchLog.Printf("While closing stderr logs: %v", readerr)
398                         }
399
400                         runner.loggingDone <- true
401                         close(runner.loggingDone)
402                         return
403                 }
404         }
405 }
406
407 // AttachLogs connects the docker container stdout and stderr logs to the
408 // Arvados logger which logs to Keep and the API server logs table.
409 func (runner *ContainerRunner) AttachLogs() (err error) {
410
411         runner.CrunchLog.Print("Attaching container logs")
412
413         var containerReader io.Reader
414         containerReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stdout: true, Stderr: true})
415         if err != nil {
416                 return fmt.Errorf("While attaching container logs: %v", err)
417         }
418
419         runner.loggingDone = make(chan bool)
420
421         runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
422         runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
423
424         go runner.ProcessDockerAttach(containerReader)
425
426         return nil
427 }
428
429 // WaitFinish waits for the container to terminate, capture the exit code, and
430 // close the stdout/stderr logging.
431 func (runner *ContainerRunner) WaitFinish() error {
432         result := runner.Docker.Wait(runner.ContainerID)
433         wr := <-result
434         if wr.Error != nil {
435                 return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
436         }
437         runner.ExitCode = &wr.ExitCode
438
439         // wait for stdout/stderr to complete
440         <-runner.loggingDone
441
442         return nil
443 }
444
445 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
446 func (runner *ContainerRunner) CaptureOutput() error {
447         if runner.finalState != "Complete" {
448                 return nil
449         }
450
451         if runner.HostOutputDir == "" {
452                 return nil
453         }
454
455         _, err := os.Stat(runner.HostOutputDir)
456         if err != nil {
457                 return fmt.Errorf("While checking host output path: %v", err)
458         }
459
460         var manifestText string
461
462         collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
463         _, err = os.Stat(collectionMetafile)
464         if err != nil {
465                 // Regular directory
466                 cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
467                 manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
468                 if err != nil {
469                         return fmt.Errorf("While uploading output files: %v", err)
470                 }
471         } else {
472                 // FUSE mount directory
473                 file, openerr := os.Open(collectionMetafile)
474                 if openerr != nil {
475                         return fmt.Errorf("While opening FUSE metafile: %v", err)
476                 }
477                 defer file.Close()
478
479                 rec := CollectionRecord{}
480                 err = json.NewDecoder(file).Decode(&rec)
481                 if err != nil {
482                         return fmt.Errorf("While reading FUSE metafile: %v", err)
483                 }
484                 manifestText = rec.ManifestText
485         }
486
487         var response CollectionRecord
488         err = runner.ArvClient.Create("collections",
489                 arvadosclient.Dict{
490                         "collection": arvadosclient.Dict{
491                                 "manifest_text": manifestText}},
492                 &response)
493         if err != nil {
494                 return fmt.Errorf("While creating output collection: %v", err)
495         }
496
497         runner.OutputPDH = new(string)
498         *runner.OutputPDH = response.PortableDataHash
499
500         return nil
501 }
502
503 func (runner *ContainerRunner) CleanupDirs() {
504         if runner.ArvMount != nil {
505                 umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
506                 umnterr := umount.Run()
507                 if umnterr != nil {
508                         runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
509                 }
510
511                 mnterr := <-runner.ArvMountExit
512                 if mnterr != nil {
513                         runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
514                 }
515         }
516
517         for _, tmpdir := range runner.CleanupTempDir {
518                 rmerr := os.RemoveAll(tmpdir)
519                 runner.CrunchLog.Printf("While cleaning up temporary directories: %v", rmerr)
520         }
521 }
522
523 // CommitLogs posts the collection containing the final container logs.
524 func (runner *ContainerRunner) CommitLogs() error {
525         runner.CrunchLog.Print(runner.finalState)
526         runner.CrunchLog.Close()
527
528         // Closing CrunchLog above allows it to be committed to Keep at this
529         // point, but re-open crunch log with ArvClient in case there are any
530         // other further (such as failing to write the log to Keep!) while
531         // shutting down
532         runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
533                 "crunch-run", nil})
534
535         mt, err := runner.LogCollection.ManifestText()
536         if err != nil {
537                 return fmt.Errorf("While creating log manifest: %v", err)
538         }
539
540         var response CollectionRecord
541         err = runner.ArvClient.Create("collections",
542                 arvadosclient.Dict{
543                         "collection": arvadosclient.Dict{
544                                 "name":          "logs for " + runner.ContainerRecord.UUID,
545                                 "manifest_text": mt}},
546                 &response)
547         if err != nil {
548                 return fmt.Errorf("While creating log collection: %v", err)
549         }
550
551         runner.LogsPDH = new(string)
552         *runner.LogsPDH = response.PortableDataHash
553
554         return nil
555 }
556
557 // UpdateContainerRecordRunning updates the container state to "Running"
558 func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
559         return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID,
560                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
561 }
562
563 // UpdateContainerRecordComplete updates the container record state on API
564 // server to "Complete" or "Cancelled"
565 func (runner *ContainerRunner) UpdateContainerRecordComplete() error {
566         update := arvadosclient.Dict{}
567         if runner.LogsPDH != nil {
568                 update["log"] = *runner.LogsPDH
569         }
570         if runner.ExitCode != nil {
571                 update["exit_code"] = *runner.ExitCode
572         }
573         if runner.OutputPDH != nil {
574                 update["output"] = runner.OutputPDH
575         }
576
577         update["state"] = runner.finalState
578
579         return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil)
580 }
581
582 // NewArvLogWriter creates an ArvLogWriter
583 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
584         return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
585 }
586
587 // Run the full container lifecycle.
588 func (runner *ContainerRunner) Run() (err error) {
589         runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID)
590
591         var runerr, waiterr error
592
593         defer func() {
594                 if err != nil {
595                         runner.CrunchLog.Print(err)
596                 }
597
598                 if runner.Cancelled {
599                         runner.finalState = "Cancelled"
600                 } else {
601                         runner.finalState = "Complete"
602                 }
603
604                 // (7) capture output
605                 outputerr := runner.CaptureOutput()
606                 if outputerr != nil {
607                         runner.CrunchLog.Print(outputerr)
608                 }
609
610                 // (8) clean up temporary directories
611                 runner.CleanupDirs()
612
613                 // (9) write logs
614                 logerr := runner.CommitLogs()
615                 if logerr != nil {
616                         runner.CrunchLog.Print(logerr)
617                 }
618
619                 // (10) update container record with results
620                 updateerr := runner.UpdateContainerRecordComplete()
621                 if updateerr != nil {
622                         runner.CrunchLog.Print(updateerr)
623                 }
624
625                 runner.CrunchLog.Close()
626
627                 if err == nil {
628                         if runerr != nil {
629                                 err = runerr
630                         } else if waiterr != nil {
631                                 err = runerr
632                         } else if logerr != nil {
633                                 err = logerr
634                         } else if updateerr != nil {
635                                 err = updateerr
636                         }
637                 }
638         }()
639
640         err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord)
641         if err != nil {
642                 return fmt.Errorf("While getting container record: %v", err)
643         }
644
645         // (1) setup signal handling
646         err = runner.SetupSignals()
647         if err != nil {
648                 return fmt.Errorf("While setting up signal handling: %v", err)
649         }
650
651         // (2) check for and/or load image
652         err = runner.LoadImage()
653         if err != nil {
654                 return fmt.Errorf("While loading container image: %v", err)
655         }
656
657         // (3) set up FUSE mount and binds
658         err = runner.SetupMounts()
659         if err != nil {
660                 return fmt.Errorf("While setting up mounts: %v", err)
661         }
662
663         // (3) create and start container
664         err = runner.StartContainer()
665         if err != nil {
666                 if err == ErrCancelled {
667                         err = nil
668                 }
669                 return
670         }
671
672         // (4) update container record state
673         err = runner.UpdateContainerRecordRunning()
674         if err != nil {
675                 runner.CrunchLog.Print(err)
676         }
677
678         // (5) attach container logs
679         runerr = runner.AttachLogs()
680         if runerr != nil {
681                 runner.CrunchLog.Print(runerr)
682         }
683
684         // (6) wait for container to finish
685         waiterr = runner.WaitFinish()
686
687         return
688 }
689
690 // NewContainerRunner creates a new container runner.
691 func NewContainerRunner(api IArvadosClient,
692         kc IKeepClient,
693         docker ThinDockerClient,
694         containerUUID string) *ContainerRunner {
695
696         cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
697         cr.NewLogWriter = cr.NewArvLogWriter
698         cr.RunArvMount = cr.ArvMountCmd
699         cr.MkTempDir = ioutil.TempDir
700         cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
701         cr.ContainerRecord.UUID = containerUUID
702         cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
703         return cr
704 }
705
706 func main() {
707         flag.Parse()
708
709         api, err := arvadosclient.MakeArvadosClient()
710         if err != nil {
711                 log.Fatalf("%s: %v", flag.Arg(0), err)
712         }
713         api.Retries = 8
714
715         var kc *keepclient.KeepClient
716         kc, err = keepclient.MakeKeepClient(&api)
717         if err != nil {
718                 log.Fatalf("%s: %v", flag.Arg(0), err)
719         }
720         kc.Retries = 4
721
722         var docker *dockerclient.DockerClient
723         docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
724         if err != nil {
725                 log.Fatalf("%s: %v", flag.Arg(0), err)
726         }
727
728         cr := NewContainerRunner(api, kc, docker, flag.Arg(0))
729
730         err = cr.Run()
731         if err != nil {
732                 log.Fatalf("%s: %v", flag.Arg(0), err)
733         }
734
735 }