9272: Pass container auth info into container if requested.
[arvados.git] / services / crunch-run / crunchrun.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "git.curoverse.com/arvados.git/sdk/go/manifest"
11         "github.com/curoverse/dockerclient"
12         "io"
13         "io/ioutil"
14         "log"
15         "os"
16         "os/exec"
17         "os/signal"
18         "path"
19         "strings"
20         "sync"
21         "syscall"
22         "time"
23 )
24
25 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
26 type IArvadosClient interface {
27         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
28         Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
29         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
30         Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) (err error)
31 }
32
33 // ErrCancelled is the error returned when the container is cancelled.
34 var ErrCancelled = errors.New("Cancelled")
35
36 // IKeepClient is the minimal Keep API methods used by crunch-run.
37 type IKeepClient interface {
38         PutHB(hash string, buf []byte) (string, int, error)
39         ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
40 }
41
42 // Mount describes the mount points to create inside the container.
43 type Mount struct {
44         Kind             string `json:"kind"`
45         Writable         bool   `json:"writable"`
46         PortableDataHash string `json:"portable_data_hash"`
47         UUID             string `json:"uuid"`
48         DeviceType       string `json:"device_type"`
49         Path             string `json:"path"`
50 }
51
52 // Collection record returned by the API server.
53 type CollectionRecord struct {
54         ManifestText     string `json:"manifest_text"`
55         PortableDataHash string `json:"portable_data_hash"`
56 }
57
58 type RuntimeConstraints struct {
59         API *bool
60 }
61
62 // ContainerRecord is the container record returned by the API server.
63 type ContainerRecord struct {
64         UUID               string             `json:"uuid"`
65         Command            []string           `json:"command"`
66         ContainerImage     string             `json:"container_image"`
67         Cwd                string             `json:"cwd"`
68         Environment        map[string]string  `json:"environment"`
69         Mounts             map[string]Mount   `json:"mounts"`
70         OutputPath         string             `json:"output_path"`
71         Priority           int                `json:"priority"`
72         RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
73         State              string             `json:"state"`
74         Output             string             `json:"output"`
75 }
76
77 // APIClientAuthorization is an arvados#api_client_authorization resource.
78 type APIClientAuthorization struct {
79         UUID     string `json:"uuid"`
80         APIToken string `json:"api_token"`
81 }
82
83 // NewLogWriter is a factory function to create a new log writer.
84 type NewLogWriter func(name string) io.WriteCloser
85
86 type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
87
88 type MkTempDir func(string, string) (string, error)
89
90 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
91 type ThinDockerClient interface {
92         StopContainer(id string, timeout int) error
93         InspectImage(id string) (*dockerclient.ImageInfo, error)
94         LoadImage(reader io.Reader) error
95         CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
96         StartContainer(id string, config *dockerclient.HostConfig) error
97         AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
98         Wait(id string) <-chan dockerclient.WaitResult
99         RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
100 }
101
102 // ContainerRunner is the main stateful struct used for a single execution of a
103 // container.
104 type ContainerRunner struct {
105         Docker    ThinDockerClient
106         ArvClient IArvadosClient
107         Kc        IKeepClient
108         ContainerRecord
109         dockerclient.ContainerConfig
110         token       string
111         ContainerID string
112         ExitCode    *int
113         NewLogWriter
114         loggingDone   chan bool
115         CrunchLog     *ThrottledLogger
116         Stdout        io.WriteCloser
117         Stderr        *ThrottledLogger
118         LogCollection *CollectionWriter
119         LogsPDH       *string
120         RunArvMount
121         MkTempDir
122         ArvMount       *exec.Cmd
123         ArvMountPoint  string
124         HostOutputDir  string
125         CleanupTempDir []string
126         Binds          []string
127         OutputPDH      *string
128         CancelLock     sync.Mutex
129         Cancelled      bool
130         SigChan        chan os.Signal
131         ArvMountExit   chan error
132         finalState     string
133 }
134
135 // SetupSignals sets up signal handling to gracefully terminate the underlying
136 // Docker container and update state when receiving a TERM, INT or QUIT signal.
137 func (runner *ContainerRunner) SetupSignals() {
138         runner.SigChan = make(chan os.Signal, 1)
139         signal.Notify(runner.SigChan, syscall.SIGTERM)
140         signal.Notify(runner.SigChan, syscall.SIGINT)
141         signal.Notify(runner.SigChan, syscall.SIGQUIT)
142
143         go func(sig <-chan os.Signal) {
144                 for _ = range sig {
145                         if !runner.Cancelled {
146                                 runner.CancelLock.Lock()
147                                 runner.Cancelled = true
148                                 if runner.ContainerID != "" {
149                                         runner.Docker.StopContainer(runner.ContainerID, 10)
150                                 }
151                                 runner.CancelLock.Unlock()
152                         }
153                 }
154         }(runner.SigChan)
155 }
156
157 // LoadImage determines the docker image id from the container record and
158 // checks if it is available in the local Docker image store.  If not, it loads
159 // the image from Keep.
160 func (runner *ContainerRunner) LoadImage() (err error) {
161
162         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
163
164         var collection CollectionRecord
165         err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
166         if err != nil {
167                 return fmt.Errorf("While getting container image collection: %v", err)
168         }
169         manifest := manifest.Manifest{Text: collection.ManifestText}
170         var img, imageID string
171         for ms := range manifest.StreamIter() {
172                 img = ms.FileStreamSegments[0].Name
173                 if !strings.HasSuffix(img, ".tar") {
174                         return fmt.Errorf("First file in the container image collection does not end in .tar")
175                 }
176                 imageID = img[:len(img)-4]
177         }
178
179         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
180
181         _, err = runner.Docker.InspectImage(imageID)
182         if err != nil {
183                 runner.CrunchLog.Print("Loading Docker image from keep")
184
185                 var readCloser io.ReadCloser
186                 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
187                 if err != nil {
188                         return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
189                 }
190
191                 err = runner.Docker.LoadImage(readCloser)
192                 if err != nil {
193                         return fmt.Errorf("While loading container image into Docker: %v", err)
194                 }
195         } else {
196                 runner.CrunchLog.Print("Docker image is available")
197         }
198
199         runner.ContainerConfig.Image = imageID
200
201         return nil
202 }
203
204 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
205         c = exec.Command("arv-mount", arvMountCmd...)
206
207         // Copy our environment, but override ARVADOS_API_TOKEN with
208         // the container auth token.
209         c.Env = nil
210         for _, s := range os.Environ() {
211                 if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") {
212                         c.Env = append(c.Env, s)
213                 }
214         }
215         c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
216
217         nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
218         c.Stdout = nt
219         c.Stderr = nt
220
221         err = c.Start()
222         if err != nil {
223                 return nil, err
224         }
225
226         statReadme := make(chan bool)
227         runner.ArvMountExit = make(chan error)
228
229         keepStatting := true
230         go func() {
231                 for keepStatting {
232                         time.Sleep(100 * time.Millisecond)
233                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
234                         if err == nil {
235                                 keepStatting = false
236                                 statReadme <- true
237                         }
238                 }
239                 close(statReadme)
240         }()
241
242         go func() {
243                 runner.ArvMountExit <- c.Wait()
244                 close(runner.ArvMountExit)
245         }()
246
247         select {
248         case <-statReadme:
249                 break
250         case err := <-runner.ArvMountExit:
251                 runner.ArvMount = nil
252                 keepStatting = false
253                 return nil, err
254         }
255
256         return c, nil
257 }
258
259 func (runner *ContainerRunner) SetupMounts() (err error) {
260         runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
261         if err != nil {
262                 return fmt.Errorf("While creating keep mount temp dir: %v", err)
263         }
264
265         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
266
267         pdhOnly := true
268         tmpcount := 0
269         arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
270         collectionPaths := []string{}
271         runner.Binds = nil
272
273         for bind, mnt := range runner.ContainerRecord.Mounts {
274                 if bind == "stdout" {
275                         // Is it a "file" mount kind?
276                         if mnt.Kind != "file" {
277                                 return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
278                         }
279
280                         // Does path start with OutputPath?
281                         prefix := runner.ContainerRecord.OutputPath
282                         if !strings.HasSuffix(prefix, "/") {
283                                 prefix += "/"
284                         }
285                         if !strings.HasPrefix(mnt.Path, prefix) {
286                                 return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
287                         }
288                 }
289
290                 if mnt.Kind == "collection" {
291                         var src string
292                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
293                                 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
294                         }
295                         if mnt.UUID != "" {
296                                 if mnt.Writable {
297                                         return fmt.Errorf("Writing to existing collections currently not permitted.")
298                                 }
299                                 pdhOnly = false
300                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
301                         } else if mnt.PortableDataHash != "" {
302                                 if mnt.Writable {
303                                         return fmt.Errorf("Can never write to a collection specified by portable data hash")
304                                 }
305                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
306                         } else {
307                                 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
308                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
309                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
310                                 tmpcount += 1
311                         }
312                         if mnt.Writable {
313                                 if bind == runner.ContainerRecord.OutputPath {
314                                         runner.HostOutputDir = src
315                                 }
316                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
317                         } else {
318                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
319                         }
320                         collectionPaths = append(collectionPaths, src)
321                 } else if mnt.Kind == "tmp" {
322                         if bind == runner.ContainerRecord.OutputPath {
323                                 runner.HostOutputDir, err = runner.MkTempDir("", "")
324                                 if err != nil {
325                                         return fmt.Errorf("While creating mount temp dir: %v", err)
326                                 }
327                                 st, staterr := os.Stat(runner.HostOutputDir)
328                                 if staterr != nil {
329                                         return fmt.Errorf("While Stat on temp dir: %v", staterr)
330                                 }
331                                 err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
332                                 if staterr != nil {
333                                         return fmt.Errorf("While Chmod temp dir: %v", err)
334                                 }
335                                 runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
336                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
337                         } else {
338                                 runner.Binds = append(runner.Binds, bind)
339                         }
340                 }
341         }
342
343         if runner.HostOutputDir == "" {
344                 return fmt.Errorf("Output path does not correspond to a writable mount point")
345         }
346
347         if pdhOnly {
348                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
349         } else {
350                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
351         }
352         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
353
354         token, err := runner.ContainerToken()
355         if err != nil {
356                 return fmt.Errorf("could not get container token: %s", err)
357         }
358
359         runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
360         if err != nil {
361                 return fmt.Errorf("While trying to start arv-mount: %v", err)
362         }
363
364         for _, p := range collectionPaths {
365                 _, err = os.Stat(p)
366                 if err != nil {
367                         return fmt.Errorf("While checking that input files exist: %v", err)
368                 }
369         }
370
371         return nil
372 }
373
374 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
375         // Handle docker log protocol
376         // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
377
378         header := make([]byte, 8)
379         for {
380                 _, readerr := io.ReadAtLeast(containerReader, header, 8)
381
382                 if readerr == nil {
383                         readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
384                         if header[0] == 1 {
385                                 // stdout
386                                 _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
387                         } else {
388                                 // stderr
389                                 _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
390                         }
391                 }
392
393                 if readerr != nil {
394                         if readerr != io.EOF {
395                                 runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
396                         }
397
398                         closeerr := runner.Stdout.Close()
399                         if closeerr != nil {
400                                 runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
401                         }
402
403                         closeerr = runner.Stderr.Close()
404                         if closeerr != nil {
405                                 runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
406                         }
407
408                         runner.loggingDone <- true
409                         close(runner.loggingDone)
410                         return
411                 }
412         }
413 }
414
415 // AttachLogs connects the docker container stdout and stderr logs to the
416 // Arvados logger which logs to Keep and the API server logs table.
417 func (runner *ContainerRunner) AttachStreams() (err error) {
418
419         runner.CrunchLog.Print("Attaching container streams")
420
421         var containerReader io.Reader
422         containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
423                 &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
424         if err != nil {
425                 return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
426         }
427
428         runner.loggingDone = make(chan bool)
429
430         if stdoutMnt, ok := runner.ContainerRecord.Mounts["stdout"]; ok {
431                 stdoutPath := stdoutMnt.Path[len(runner.ContainerRecord.OutputPath):]
432                 index := strings.LastIndex(stdoutPath, "/")
433                 if index > 0 {
434                         subdirs := stdoutPath[:index]
435                         if subdirs != "" {
436                                 st, err := os.Stat(runner.HostOutputDir)
437                                 if err != nil {
438                                         return fmt.Errorf("While Stat on temp dir: %v", err)
439                                 }
440                                 stdoutPath := path.Join(runner.HostOutputDir, subdirs)
441                                 err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
442                                 if err != nil {
443                                         return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
444                                 }
445                         }
446                 }
447                 stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
448                 if err != nil {
449                         return fmt.Errorf("While creating stdout file: %v", err)
450                 }
451                 runner.Stdout = stdoutFile
452         } else {
453                 runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
454         }
455         runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
456
457         go runner.ProcessDockerAttach(containerReader)
458
459         return nil
460 }
461
462 // StartContainer creates the container and runs it.
463 func (runner *ContainerRunner) StartContainer() (err error) {
464         runner.CrunchLog.Print("Creating Docker container")
465
466         runner.CancelLock.Lock()
467         defer runner.CancelLock.Unlock()
468
469         if runner.Cancelled {
470                 return ErrCancelled
471         }
472
473         runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
474         if runner.ContainerRecord.Cwd != "." {
475                 runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
476         }
477
478         for k, v := range runner.ContainerRecord.Environment {
479                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
480         }
481         if wantAPI := runner.ContainerRecord.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
482                 tok, err := runner.ContainerToken()
483                 if err != nil {
484                         return err
485                 }
486                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
487                         "ARVADOS_API_TOKEN="+tok,
488                         "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
489                         "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
490                 )
491         }
492
493         runner.ContainerConfig.NetworkDisabled = true
494         runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
495         if err != nil {
496                 return fmt.Errorf("While creating container: %v", err)
497         }
498         hostConfig := &dockerclient.HostConfig{Binds: runner.Binds,
499                 LogConfig: dockerclient.LogConfig{Type: "none"}}
500
501         err = runner.AttachStreams()
502         if err != nil {
503                 return err
504         }
505
506         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
507         err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
508         if err != nil {
509                 return fmt.Errorf("While starting container: %v", err)
510         }
511
512         return nil
513 }
514
515 // WaitFinish waits for the container to terminate, capture the exit code, and
516 // close the stdout/stderr logging.
517 func (runner *ContainerRunner) WaitFinish() error {
518         runner.CrunchLog.Print("Waiting for container to finish")
519
520         result := runner.Docker.Wait(runner.ContainerID)
521         wr := <-result
522         if wr.Error != nil {
523                 return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
524         }
525         runner.ExitCode = &wr.ExitCode
526
527         // wait for stdout/stderr to complete
528         <-runner.loggingDone
529
530         return nil
531 }
532
533 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
534 func (runner *ContainerRunner) CaptureOutput() error {
535         if runner.finalState != "Complete" {
536                 return nil
537         }
538
539         if runner.HostOutputDir == "" {
540                 return nil
541         }
542
543         _, err := os.Stat(runner.HostOutputDir)
544         if err != nil {
545                 return fmt.Errorf("While checking host output path: %v", err)
546         }
547
548         var manifestText string
549
550         collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
551         _, err = os.Stat(collectionMetafile)
552         if err != nil {
553                 // Regular directory
554                 cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
555                 manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
556                 if err != nil {
557                         return fmt.Errorf("While uploading output files: %v", err)
558                 }
559         } else {
560                 // FUSE mount directory
561                 file, openerr := os.Open(collectionMetafile)
562                 if openerr != nil {
563                         return fmt.Errorf("While opening FUSE metafile: %v", err)
564                 }
565                 defer file.Close()
566
567                 rec := CollectionRecord{}
568                 err = json.NewDecoder(file).Decode(&rec)
569                 if err != nil {
570                         return fmt.Errorf("While reading FUSE metafile: %v", err)
571                 }
572                 manifestText = rec.ManifestText
573         }
574
575         var response CollectionRecord
576         err = runner.ArvClient.Create("collections",
577                 arvadosclient.Dict{
578                         "collection": arvadosclient.Dict{
579                                 "manifest_text": manifestText}},
580                 &response)
581         if err != nil {
582                 return fmt.Errorf("While creating output collection: %v", err)
583         }
584
585         runner.OutputPDH = new(string)
586         *runner.OutputPDH = response.PortableDataHash
587
588         return nil
589 }
590
591 func (runner *ContainerRunner) CleanupDirs() {
592         if runner.ArvMount != nil {
593                 umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
594                 umnterr := umount.Run()
595                 if umnterr != nil {
596                         runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
597                 }
598
599                 mnterr := <-runner.ArvMountExit
600                 if mnterr != nil {
601                         runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
602                 }
603         }
604
605         for _, tmpdir := range runner.CleanupTempDir {
606                 rmerr := os.RemoveAll(tmpdir)
607                 if rmerr != nil {
608                         runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
609                 }
610         }
611 }
612
613 // CommitLogs posts the collection containing the final container logs.
614 func (runner *ContainerRunner) CommitLogs() error {
615         runner.CrunchLog.Print(runner.finalState)
616         runner.CrunchLog.Close()
617
618         // Closing CrunchLog above allows it to be committed to Keep at this
619         // point, but re-open crunch log with ArvClient in case there are any
620         // other further (such as failing to write the log to Keep!) while
621         // shutting down
622         runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
623                 "crunch-run", nil})
624
625         mt, err := runner.LogCollection.ManifestText()
626         if err != nil {
627                 return fmt.Errorf("While creating log manifest: %v", err)
628         }
629
630         var response CollectionRecord
631         err = runner.ArvClient.Create("collections",
632                 arvadosclient.Dict{
633                         "collection": arvadosclient.Dict{
634                                 "name":          "logs for " + runner.ContainerRecord.UUID,
635                                 "manifest_text": mt}},
636                 &response)
637         if err != nil {
638                 return fmt.Errorf("While creating log collection: %v", err)
639         }
640
641         runner.LogsPDH = new(string)
642         *runner.LogsPDH = response.PortableDataHash
643
644         return nil
645 }
646
647 // UpdateContainerRecordRunning updates the container state to "Running"
648 func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
649         return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID,
650                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
651 }
652
653 // ContainerToken returns the api_token the container (and any
654 // arv-mount processes) are allowed to use.
655 func (runner *ContainerRunner) ContainerToken() (string, error) {
656         if runner.token != "" {
657                 return runner.token, nil
658         }
659
660         var auth APIClientAuthorization
661         err := runner.ArvClient.Call("GET", "containers", runner.ContainerRecord.UUID, "auth", nil, &auth)
662         if err != nil {
663                 return "", err
664         }
665         runner.token = auth.APIToken
666         return runner.token, nil
667 }
668
669 // UpdateContainerRecordComplete updates the container record state on API
670 // server to "Complete" or "Cancelled"
671 func (runner *ContainerRunner) UpdateContainerRecordComplete() error {
672         update := arvadosclient.Dict{}
673         if runner.LogsPDH != nil {
674                 update["log"] = *runner.LogsPDH
675         }
676         if runner.ExitCode != nil {
677                 update["exit_code"] = *runner.ExitCode
678         }
679         if runner.OutputPDH != nil {
680                 update["output"] = runner.OutputPDH
681         }
682
683         update["state"] = runner.finalState
684
685         return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil)
686 }
687
688 // NewArvLogWriter creates an ArvLogWriter
689 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
690         return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
691 }
692
693 // Run the full container lifecycle.
694 func (runner *ContainerRunner) Run() (err error) {
695         runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID)
696
697         hostname, hosterr := os.Hostname()
698         if hosterr != nil {
699                 runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
700         } else {
701                 runner.CrunchLog.Printf("Executing on host '%s'", hostname)
702         }
703
704         var runerr, waiterr error
705
706         defer func() {
707                 if err != nil {
708                         runner.CrunchLog.Print(err)
709                 }
710
711                 if runner.Cancelled {
712                         runner.finalState = "Cancelled"
713                 } else {
714                         runner.finalState = "Complete"
715                 }
716
717                 // (6) capture output
718                 outputerr := runner.CaptureOutput()
719                 if outputerr != nil {
720                         runner.CrunchLog.Print(outputerr)
721                 }
722
723                 // (7) clean up temporary directories
724                 runner.CleanupDirs()
725
726                 // (8) write logs
727                 logerr := runner.CommitLogs()
728                 if logerr != nil {
729                         runner.CrunchLog.Print(logerr)
730                 }
731
732                 // (9) update container record with results
733                 updateerr := runner.UpdateContainerRecordComplete()
734                 if updateerr != nil {
735                         runner.CrunchLog.Print(updateerr)
736                 }
737
738                 runner.CrunchLog.Close()
739
740                 if err == nil {
741                         if runerr != nil {
742                                 err = runerr
743                         } else if waiterr != nil {
744                                 err = waiterr
745                         } else if logerr != nil {
746                                 err = logerr
747                         } else if updateerr != nil {
748                                 err = updateerr
749                         }
750                 }
751         }()
752
753         err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord)
754         if err != nil {
755                 return fmt.Errorf("While getting container record: %v", err)
756         }
757
758         // (1) setup signal handling
759         runner.SetupSignals()
760
761         // (2) check for and/or load image
762         err = runner.LoadImage()
763         if err != nil {
764                 return fmt.Errorf("While loading container image: %v", err)
765         }
766
767         // (3) set up FUSE mount and binds
768         err = runner.SetupMounts()
769         if err != nil {
770                 return fmt.Errorf("While setting up mounts: %v", err)
771         }
772
773         // (3) create and start container
774         err = runner.StartContainer()
775         if err != nil {
776                 if err == ErrCancelled {
777                         err = nil
778                 }
779                 return
780         }
781
782         // (4) update container record state
783         err = runner.UpdateContainerRecordRunning()
784         if err != nil {
785                 runner.CrunchLog.Print(err)
786         }
787
788         // (5) wait for container to finish
789         waiterr = runner.WaitFinish()
790
791         return
792 }
793
794 // NewContainerRunner creates a new container runner.
795 func NewContainerRunner(api IArvadosClient,
796         kc IKeepClient,
797         docker ThinDockerClient,
798         containerUUID string) *ContainerRunner {
799
800         cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
801         cr.NewLogWriter = cr.NewArvLogWriter
802         cr.RunArvMount = cr.ArvMountCmd
803         cr.MkTempDir = ioutil.TempDir
804         cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
805         cr.ContainerRecord.UUID = containerUUID
806         cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
807         cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
808         return cr
809 }
810
811 func main() {
812         flag.Parse()
813
814         containerId := flag.Arg(0)
815
816         api, err := arvadosclient.MakeArvadosClient()
817         if err != nil {
818                 log.Fatalf("%s: %v", containerId, err)
819         }
820         api.Retries = 8
821
822         var kc *keepclient.KeepClient
823         kc, err = keepclient.MakeKeepClient(&api)
824         if err != nil {
825                 log.Fatalf("%s: %v", containerId, err)
826         }
827         kc.Retries = 4
828
829         var docker *dockerclient.DockerClient
830         docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
831         if err != nil {
832                 log.Fatalf("%s: %v", containerId, err)
833         }
834
835         cr := NewContainerRunner(api, kc, docker, containerId)
836
837         err = cr.Run()
838         if err != nil {
839                 log.Fatalf("%s: %v", containerId, err)
840         }
841
842 }