9374: Use arvados.Collection instead of own CollectionRecord.
[arvados.git] / services / crunch-run / crunchrun.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvados"
9         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10         "git.curoverse.com/arvados.git/sdk/go/keepclient"
11         "git.curoverse.com/arvados.git/sdk/go/manifest"
12         "github.com/curoverse/dockerclient"
13         "io"
14         "io/ioutil"
15         "log"
16         "os"
17         "os/exec"
18         "os/signal"
19         "path"
20         "strings"
21         "sync"
22         "syscall"
23         "time"
24 )
25
26 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
27 type IArvadosClient interface {
28         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
29         Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
30         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
31         Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) (err error)
32 }
33
// ErrCancelled is the sentinel error reported when an operation is
// refused because the container has already been cancelled.
var ErrCancelled error = errors.New("Cancelled")
36
37 // IKeepClient is the minimal Keep API methods used by crunch-run.
38 type IKeepClient interface {
39         PutHB(hash string, buf []byte) (string, int, error)
40         ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
41 }
42
// Function types that ContainerRunner embeds so that the corresponding
// behaviour can be swapped out per runner instance.
type (
	// NewLogWriter is a factory function that creates a new log
	// writer for the log stream called name.
	NewLogWriter func(name string) io.WriteCloser

	// RunArvMount starts arv-mount with the given command line
	// arguments and API token, returning the started command.
	RunArvMount func(args []string, tok string) (*exec.Cmd, error)

	// MkTempDir creates a temporary directory and returns its path.
	// It has the same signature as ioutil.TempDir, which is the
	// implementation NewContainerRunner installs.
	MkTempDir func(string, string) (string, error)
)
49
50 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
51 type ThinDockerClient interface {
52         StopContainer(id string, timeout int) error
53         InspectImage(id string) (*dockerclient.ImageInfo, error)
54         LoadImage(reader io.Reader) error
55         CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
56         StartContainer(id string, config *dockerclient.HostConfig) error
57         AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
58         Wait(id string) <-chan dockerclient.WaitResult
59         RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
60 }
61
62 // ContainerRunner is the main stateful struct used for a single execution of a
63 // container.
64 type ContainerRunner struct {
65         Docker    ThinDockerClient
66         ArvClient IArvadosClient
67         Kc        IKeepClient
68         arvados.Container
69         dockerclient.ContainerConfig
70         dockerclient.HostConfig
71         token       string
72         ContainerID string
73         ExitCode    *int
74         NewLogWriter
75         loggingDone   chan bool
76         CrunchLog     *ThrottledLogger
77         Stdout        io.WriteCloser
78         Stderr        *ThrottledLogger
79         LogCollection *CollectionWriter
80         LogsPDH       *string
81         RunArvMount
82         MkTempDir
83         ArvMount       *exec.Cmd
84         ArvMountPoint  string
85         HostOutputDir  string
86         CleanupTempDir []string
87         Binds          []string
88         OutputPDH      *string
89         CancelLock     sync.Mutex
90         Cancelled      bool
91         SigChan        chan os.Signal
92         ArvMountExit   chan error
93         finalState     string
94 }
95
96 // SetupSignals sets up signal handling to gracefully terminate the underlying
97 // Docker container and update state when receiving a TERM, INT or QUIT signal.
98 func (runner *ContainerRunner) SetupSignals() {
99         runner.SigChan = make(chan os.Signal, 1)
100         signal.Notify(runner.SigChan, syscall.SIGTERM)
101         signal.Notify(runner.SigChan, syscall.SIGINT)
102         signal.Notify(runner.SigChan, syscall.SIGQUIT)
103
104         go func(sig <-chan os.Signal) {
105                 for _ = range sig {
106                         if !runner.Cancelled {
107                                 runner.CancelLock.Lock()
108                                 runner.Cancelled = true
109                                 if runner.ContainerID != "" {
110                                         runner.Docker.StopContainer(runner.ContainerID, 10)
111                                 }
112                                 runner.CancelLock.Unlock()
113                         }
114                 }
115         }(runner.SigChan)
116 }
117
118 // LoadImage determines the docker image id from the container record and
119 // checks if it is available in the local Docker image store.  If not, it loads
120 // the image from Keep.
121 func (runner *ContainerRunner) LoadImage() (err error) {
122
123         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
124
125         var collection arvados.Collection
126         err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
127         if err != nil {
128                 return fmt.Errorf("While getting container image collection: %v", err)
129         }
130         manifest := manifest.Manifest{Text: collection.ManifestText}
131         var img, imageID string
132         for ms := range manifest.StreamIter() {
133                 img = ms.FileStreamSegments[0].Name
134                 if !strings.HasSuffix(img, ".tar") {
135                         return fmt.Errorf("First file in the container image collection does not end in .tar")
136                 }
137                 imageID = img[:len(img)-4]
138         }
139
140         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
141
142         _, err = runner.Docker.InspectImage(imageID)
143         if err != nil {
144                 runner.CrunchLog.Print("Loading Docker image from keep")
145
146                 var readCloser io.ReadCloser
147                 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
148                 if err != nil {
149                         return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
150                 }
151
152                 err = runner.Docker.LoadImage(readCloser)
153                 if err != nil {
154                         return fmt.Errorf("While loading container image into Docker: %v", err)
155                 }
156         } else {
157                 runner.CrunchLog.Print("Docker image is available")
158         }
159
160         runner.ContainerConfig.Image = imageID
161
162         return nil
163 }
164
165 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
166         c = exec.Command("arv-mount", arvMountCmd...)
167
168         // Copy our environment, but override ARVADOS_API_TOKEN with
169         // the container auth token.
170         c.Env = nil
171         for _, s := range os.Environ() {
172                 if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") {
173                         c.Env = append(c.Env, s)
174                 }
175         }
176         c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
177
178         nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
179         c.Stdout = nt
180         c.Stderr = nt
181
182         err = c.Start()
183         if err != nil {
184                 return nil, err
185         }
186
187         statReadme := make(chan bool)
188         runner.ArvMountExit = make(chan error)
189
190         keepStatting := true
191         go func() {
192                 for keepStatting {
193                         time.Sleep(100 * time.Millisecond)
194                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
195                         if err == nil {
196                                 keepStatting = false
197                                 statReadme <- true
198                         }
199                 }
200                 close(statReadme)
201         }()
202
203         go func() {
204                 runner.ArvMountExit <- c.Wait()
205                 close(runner.ArvMountExit)
206         }()
207
208         select {
209         case <-statReadme:
210                 break
211         case err := <-runner.ArvMountExit:
212                 runner.ArvMount = nil
213                 keepStatting = false
214                 return nil, err
215         }
216
217         return c, nil
218 }
219
220 func (runner *ContainerRunner) SetupMounts() (err error) {
221         runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
222         if err != nil {
223                 return fmt.Errorf("While creating keep mount temp dir: %v", err)
224         }
225
226         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
227
228         pdhOnly := true
229         tmpcount := 0
230         arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
231         collectionPaths := []string{}
232         runner.Binds = nil
233
234         for bind, mnt := range runner.Container.Mounts {
235                 if bind == "stdout" {
236                         // Is it a "file" mount kind?
237                         if mnt.Kind != "file" {
238                                 return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
239                         }
240
241                         // Does path start with OutputPath?
242                         prefix := runner.Container.OutputPath
243                         if !strings.HasSuffix(prefix, "/") {
244                                 prefix += "/"
245                         }
246                         if !strings.HasPrefix(mnt.Path, prefix) {
247                                 return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
248                         }
249                 }
250
251                 if mnt.Kind == "collection" {
252                         var src string
253                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
254                                 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
255                         }
256                         if mnt.UUID != "" {
257                                 if mnt.Writable {
258                                         return fmt.Errorf("Writing to existing collections currently not permitted.")
259                                 }
260                                 pdhOnly = false
261                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
262                         } else if mnt.PortableDataHash != "" {
263                                 if mnt.Writable {
264                                         return fmt.Errorf("Can never write to a collection specified by portable data hash")
265                                 }
266                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
267                         } else {
268                                 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
269                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
270                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
271                                 tmpcount += 1
272                         }
273                         if mnt.Writable {
274                                 if bind == runner.Container.OutputPath {
275                                         runner.HostOutputDir = src
276                                 }
277                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
278                         } else {
279                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
280                         }
281                         collectionPaths = append(collectionPaths, src)
282                 } else if mnt.Kind == "tmp" {
283                         if bind == runner.Container.OutputPath {
284                                 runner.HostOutputDir, err = runner.MkTempDir("", "")
285                                 if err != nil {
286                                         return fmt.Errorf("While creating mount temp dir: %v", err)
287                                 }
288                                 st, staterr := os.Stat(runner.HostOutputDir)
289                                 if staterr != nil {
290                                         return fmt.Errorf("While Stat on temp dir: %v", staterr)
291                                 }
292                                 err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
293                                 if staterr != nil {
294                                         return fmt.Errorf("While Chmod temp dir: %v", err)
295                                 }
296                                 runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
297                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
298                         } else {
299                                 runner.Binds = append(runner.Binds, bind)
300                         }
301                 }
302         }
303
304         if runner.HostOutputDir == "" {
305                 return fmt.Errorf("Output path does not correspond to a writable mount point")
306         }
307
308         if pdhOnly {
309                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
310         } else {
311                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
312         }
313         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
314
315         token, err := runner.ContainerToken()
316         if err != nil {
317                 return fmt.Errorf("could not get container token: %s", err)
318         }
319
320         runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
321         if err != nil {
322                 return fmt.Errorf("While trying to start arv-mount: %v", err)
323         }
324
325         for _, p := range collectionPaths {
326                 _, err = os.Stat(p)
327                 if err != nil {
328                         return fmt.Errorf("While checking that input files exist: %v", err)
329                 }
330         }
331
332         return nil
333 }
334
335 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
336         // Handle docker log protocol
337         // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
338
339         header := make([]byte, 8)
340         for {
341                 _, readerr := io.ReadAtLeast(containerReader, header, 8)
342
343                 if readerr == nil {
344                         readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
345                         if header[0] == 1 {
346                                 // stdout
347                                 _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
348                         } else {
349                                 // stderr
350                                 _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
351                         }
352                 }
353
354                 if readerr != nil {
355                         if readerr != io.EOF {
356                                 runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
357                         }
358
359                         closeerr := runner.Stdout.Close()
360                         if closeerr != nil {
361                                 runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
362                         }
363
364                         closeerr = runner.Stderr.Close()
365                         if closeerr != nil {
366                                 runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
367                         }
368
369                         runner.loggingDone <- true
370                         close(runner.loggingDone)
371                         return
372                 }
373         }
374 }
375
376 // AttachLogs connects the docker container stdout and stderr logs to the
377 // Arvados logger which logs to Keep and the API server logs table.
378 func (runner *ContainerRunner) AttachStreams() (err error) {
379
380         runner.CrunchLog.Print("Attaching container streams")
381
382         var containerReader io.Reader
383         containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
384                 &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
385         if err != nil {
386                 return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
387         }
388
389         runner.loggingDone = make(chan bool)
390
391         if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
392                 stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
393                 index := strings.LastIndex(stdoutPath, "/")
394                 if index > 0 {
395                         subdirs := stdoutPath[:index]
396                         if subdirs != "" {
397                                 st, err := os.Stat(runner.HostOutputDir)
398                                 if err != nil {
399                                         return fmt.Errorf("While Stat on temp dir: %v", err)
400                                 }
401                                 stdoutPath := path.Join(runner.HostOutputDir, subdirs)
402                                 err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
403                                 if err != nil {
404                                         return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
405                                 }
406                         }
407                 }
408                 stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
409                 if err != nil {
410                         return fmt.Errorf("While creating stdout file: %v", err)
411                 }
412                 runner.Stdout = stdoutFile
413         } else {
414                 runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
415         }
416         runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
417
418         go runner.ProcessDockerAttach(containerReader)
419
420         return nil
421 }
422
423 // CreateContainer creates the docker container.
424 func (runner *ContainerRunner) CreateContainer() error {
425         runner.CrunchLog.Print("Creating Docker container")
426
427         runner.ContainerConfig.Cmd = runner.Container.Command
428         if runner.Container.Cwd != "." {
429                 runner.ContainerConfig.WorkingDir = runner.Container.Cwd
430         }
431
432         for k, v := range runner.Container.Environment {
433                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
434         }
435         if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
436                 tok, err := runner.ContainerToken()
437                 if err != nil {
438                         return err
439                 }
440                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
441                         "ARVADOS_API_TOKEN="+tok,
442                         "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
443                         "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
444                 )
445                 runner.ContainerConfig.NetworkDisabled = false
446         } else {
447                 runner.ContainerConfig.NetworkDisabled = true
448         }
449
450         var err error
451         runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
452         if err != nil {
453                 return fmt.Errorf("While creating container: %v", err)
454         }
455
456         runner.HostConfig = dockerclient.HostConfig{Binds: runner.Binds,
457                 LogConfig: dockerclient.LogConfig{Type: "none"}}
458
459         return runner.AttachStreams()
460 }
461
462 // StartContainer starts the docker container created by CreateContainer.
463 func (runner *ContainerRunner) StartContainer() error {
464         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
465         err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
466         if err != nil {
467                 return fmt.Errorf("could not start container: %v", err)
468         }
469         return nil
470 }
471
472 // WaitFinish waits for the container to terminate, capture the exit code, and
473 // close the stdout/stderr logging.
474 func (runner *ContainerRunner) WaitFinish() error {
475         runner.CrunchLog.Print("Waiting for container to finish")
476
477         result := runner.Docker.Wait(runner.ContainerID)
478         wr := <-result
479         if wr.Error != nil {
480                 return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
481         }
482         runner.ExitCode = &wr.ExitCode
483
484         // wait for stdout/stderr to complete
485         <-runner.loggingDone
486
487         return nil
488 }
489
490 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
491 func (runner *ContainerRunner) CaptureOutput() error {
492         if runner.finalState != "Complete" {
493                 return nil
494         }
495
496         if runner.HostOutputDir == "" {
497                 return nil
498         }
499
500         _, err := os.Stat(runner.HostOutputDir)
501         if err != nil {
502                 return fmt.Errorf("While checking host output path: %v", err)
503         }
504
505         var manifestText string
506
507         collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
508         _, err = os.Stat(collectionMetafile)
509         if err != nil {
510                 // Regular directory
511                 cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
512                 manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
513                 if err != nil {
514                         return fmt.Errorf("While uploading output files: %v", err)
515                 }
516         } else {
517                 // FUSE mount directory
518                 file, openerr := os.Open(collectionMetafile)
519                 if openerr != nil {
520                         return fmt.Errorf("While opening FUSE metafile: %v", err)
521                 }
522                 defer file.Close()
523
524                 var rec arvados.Collection
525                 err = json.NewDecoder(file).Decode(&rec)
526                 if err != nil {
527                         return fmt.Errorf("While reading FUSE metafile: %v", err)
528                 }
529                 manifestText = rec.ManifestText
530         }
531
532         var response arvados.Collection
533         err = runner.ArvClient.Create("collections",
534                 arvadosclient.Dict{
535                         "collection": arvadosclient.Dict{
536                                 "manifest_text": manifestText}},
537                 &response)
538         if err != nil {
539                 return fmt.Errorf("While creating output collection: %v", err)
540         }
541
542         runner.OutputPDH = new(string)
543         *runner.OutputPDH = response.PortableDataHash
544
545         return nil
546 }
547
548 func (runner *ContainerRunner) CleanupDirs() {
549         if runner.ArvMount != nil {
550                 umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
551                 umnterr := umount.Run()
552                 if umnterr != nil {
553                         runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
554                 }
555
556                 mnterr := <-runner.ArvMountExit
557                 if mnterr != nil {
558                         runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
559                 }
560         }
561
562         for _, tmpdir := range runner.CleanupTempDir {
563                 rmerr := os.RemoveAll(tmpdir)
564                 if rmerr != nil {
565                         runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
566                 }
567         }
568 }
569
570 // CommitLogs posts the collection containing the final container logs.
571 func (runner *ContainerRunner) CommitLogs() error {
572         runner.CrunchLog.Print(runner.finalState)
573         runner.CrunchLog.Close()
574
575         // Closing CrunchLog above allows it to be committed to Keep at this
576         // point, but re-open crunch log with ArvClient in case there are any
577         // other further (such as failing to write the log to Keep!) while
578         // shutting down
579         runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
580                 "crunch-run", nil})
581
582         if runner.LogsPDH != nil {
583                 // If we have already assigned something to LogsPDH,
584                 // we must be closing the re-opened log, which won't
585                 // end up getting attached to the container record and
586                 // therefore doesn't need to be saved as a collection
587                 // -- it exists only to send logs to other channels.
588                 return nil
589         }
590
591         mt, err := runner.LogCollection.ManifestText()
592         if err != nil {
593                 return fmt.Errorf("While creating log manifest: %v", err)
594         }
595
596         var response arvados.Collection
597         err = runner.ArvClient.Create("collections",
598                 arvadosclient.Dict{
599                         "collection": arvadosclient.Dict{
600                                 "name":          "logs for " + runner.Container.UUID,
601                                 "manifest_text": mt}},
602                 &response)
603         if err != nil {
604                 return fmt.Errorf("While creating log collection: %v", err)
605         }
606
607         runner.LogsPDH = &response.PortableDataHash
608
609         return nil
610 }
611
612 // UpdateContainerRunning updates the container state to "Running"
613 func (runner *ContainerRunner) UpdateContainerRunning() error {
614         runner.CancelLock.Lock()
615         defer runner.CancelLock.Unlock()
616         if runner.Cancelled {
617                 return ErrCancelled
618         }
619         return runner.ArvClient.Update("containers", runner.Container.UUID,
620                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
621 }
622
623 // ContainerToken returns the api_token the container (and any
624 // arv-mount processes) are allowed to use.
625 func (runner *ContainerRunner) ContainerToken() (string, error) {
626         if runner.token != "" {
627                 return runner.token, nil
628         }
629
630         var auth arvados.APIClientAuthorization
631         err := runner.ArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
632         if err != nil {
633                 return "", err
634         }
635         runner.token = auth.APIToken
636         return runner.token, nil
637 }
638
639 // UpdateContainerComplete updates the container record state on API
640 // server to "Complete" or "Cancelled"
641 func (runner *ContainerRunner) UpdateContainerFinal() error {
642         update := arvadosclient.Dict{}
643         update["state"] = runner.finalState
644         if runner.finalState == "Complete" {
645                 if runner.LogsPDH != nil {
646                         update["log"] = *runner.LogsPDH
647                 }
648                 if runner.ExitCode != nil {
649                         update["exit_code"] = *runner.ExitCode
650                 }
651                 if runner.OutputPDH != nil {
652                         update["output"] = *runner.OutputPDH
653                 }
654         }
655         return runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
656 }
657
658 // IsCancelled returns the value of Cancelled, with goroutine safety.
659 func (runner *ContainerRunner) IsCancelled() bool {
660         runner.CancelLock.Lock()
661         defer runner.CancelLock.Unlock()
662         return runner.Cancelled
663 }
664
665 // NewArvLogWriter creates an ArvLogWriter
666 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
667         return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
668 }
669
670 // Run the full container lifecycle.
671 func (runner *ContainerRunner) Run() (err error) {
672         runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
673
674         hostname, hosterr := os.Hostname()
675         if hosterr != nil {
676                 runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
677         } else {
678                 runner.CrunchLog.Printf("Executing on host '%s'", hostname)
679         }
680
681         // Clean up temporary directories _after_ finalizing
682         // everything (if we've made any by then)
683         defer runner.CleanupDirs()
684
685         runner.finalState = "Queued"
686
687         defer func() {
688                 // checkErr prints e (unless it's nil) and sets err to
689                 // e (unless err is already non-nil). Thus, if err
690                 // hasn't already been assigned when Run() returns,
691                 // this cleanup func will cause Run() to return the
692                 // first non-nil error that is passed to checkErr().
693                 checkErr := func(e error) {
694                         if e == nil {
695                                 return
696                         }
697                         runner.CrunchLog.Print(e)
698                         if err == nil {
699                                 err = e
700                         }
701                 }
702
703                 // Log the error encountered in Run(), if any
704                 checkErr(err)
705
706                 if runner.finalState == "Queued" {
707                         runner.UpdateContainerFinal()
708                         return
709                 }
710
711                 if runner.IsCancelled() {
712                         runner.finalState = "Cancelled"
713                         // but don't return yet -- we still want to
714                         // capture partial output and write logs
715                 }
716
717                 checkErr(runner.CaptureOutput())
718                 checkErr(runner.CommitLogs())
719                 checkErr(runner.UpdateContainerFinal())
720
721                 // The real log is already closed, but then we opened
722                 // a new one in case we needed to log anything while
723                 // finalizing.
724                 runner.CrunchLog.Close()
725         }()
726
727         err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container)
728         if err != nil {
729                 err = fmt.Errorf("While getting container record: %v", err)
730                 return
731         }
732
733         // setup signal handling
734         runner.SetupSignals()
735
736         // check for and/or load image
737         err = runner.LoadImage()
738         if err != nil {
739                 err = fmt.Errorf("While loading container image: %v", err)
740                 return
741         }
742
743         // set up FUSE mount and binds
744         err = runner.SetupMounts()
745         if err != nil {
746                 err = fmt.Errorf("While setting up mounts: %v", err)
747                 return
748         }
749
750         err = runner.CreateContainer()
751         if err != nil {
752                 return
753         }
754
755         if runner.IsCancelled() {
756                 return
757         }
758
759         err = runner.UpdateContainerRunning()
760         if err != nil {
761                 return
762         }
763         runner.finalState = "Cancelled"
764
765         err = runner.StartContainer()
766         if err != nil {
767                 return
768         }
769
770         err = runner.WaitFinish()
771         if err == nil {
772                 runner.finalState = "Complete"
773         }
774         return
775 }
776
777 // NewContainerRunner creates a new container runner.
778 func NewContainerRunner(api IArvadosClient,
779         kc IKeepClient,
780         docker ThinDockerClient,
781         containerUUID string) *ContainerRunner {
782
783         cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
784         cr.NewLogWriter = cr.NewArvLogWriter
785         cr.RunArvMount = cr.ArvMountCmd
786         cr.MkTempDir = ioutil.TempDir
787         cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
788         cr.Container.UUID = containerUUID
789         cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
790         cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
791         return cr
792 }
793
794 func main() {
795         flag.Parse()
796
797         containerId := flag.Arg(0)
798
799         api, err := arvadosclient.MakeArvadosClient()
800         if err != nil {
801                 log.Fatalf("%s: %v", containerId, err)
802         }
803         api.Retries = 8
804
805         var kc *keepclient.KeepClient
806         kc, err = keepclient.MakeKeepClient(&api)
807         if err != nil {
808                 log.Fatalf("%s: %v", containerId, err)
809         }
810         kc.Retries = 4
811
812         var docker *dockerclient.DockerClient
813         docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
814         if err != nil {
815                 log.Fatalf("%s: %v", containerId, err)
816         }
817
818         cr := NewContainerRunner(api, kc, docker, containerId)
819
820         err = cr.Run()
821         if err != nil {
822                 log.Fatalf("%s: %v", containerId, err)
823         }
824
825 }