Merge branch '8016-crunchrun-crunchstat'
[arvados.git] / services / crunch-run / crunchrun.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/lib/crunchstat"
9         "git.curoverse.com/arvados.git/sdk/go/arvados"
10         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11         "git.curoverse.com/arvados.git/sdk/go/keepclient"
12         "git.curoverse.com/arvados.git/sdk/go/manifest"
13         "github.com/curoverse/dockerclient"
14         "io"
15         "io/ioutil"
16         "log"
17         "os"
18         "os/exec"
19         "os/signal"
20         "path"
21         "strings"
22         "sync"
23         "syscall"
24         "time"
25 )
26
27 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
28 type IArvadosClient interface {
29         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
30         Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
31         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
32         Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) (err error)
33 }
34
var (
	// ErrCancelled reports that the container was cancelled, e.g. by a
	// signal received before the container could be marked Running.
	ErrCancelled = errors.New("Cancelled")
)
37
38 // IKeepClient is the minimal Keep API methods used by crunch-run.
39 type IKeepClient interface {
40         PutHB(hash string, buf []byte) (string, int, error)
41         ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
42 }
43
// Pluggable factory functions stored on ContainerRunner, so the concrete
// behavior can be swapped out.
type (
	// NewLogWriter creates the writer for the log stream with the given
	// name (crunch-run uses names such as "stdout", "stderr",
	// "arv-mount" and "crunchstat").
	NewLogWriter func(name string) io.WriteCloser

	// RunArvMount starts arv-mount with the given command-line arguments,
	// authenticating with tok, and returns the running command.
	RunArvMount func(args []string, tok string) (*exec.Cmd, error)

	// MkTempDir creates a temporary directory; its signature matches
	// ioutil.TempDir(dir, prefix).
	MkTempDir func(string, string) (string, error)
)
50
51 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
52 type ThinDockerClient interface {
53         StopContainer(id string, timeout int) error
54         InspectImage(id string) (*dockerclient.ImageInfo, error)
55         LoadImage(reader io.Reader) error
56         CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
57         StartContainer(id string, config *dockerclient.HostConfig) error
58         AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
59         Wait(id string) <-chan dockerclient.WaitResult
60         RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
61 }
62
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
	// Clients used to talk to Docker, the Arvados API server, and Keep.
	Docker    ThinDockerClient
	ArvClient IArvadosClient
	Kc        IKeepClient

	// The container record being executed; Run fetches it from the API
	// server by UUID.
	arvados.Container

	// Docker create-time configuration: LoadImage sets Image, and
	// CreateContainer sets Cmd, WorkingDir, Env and NetworkDisabled.
	dockerclient.ContainerConfig

	// Docker start-time configuration (bind mounts, log driver), built by
	// CreateContainer and passed to StartContainer.
	dockerclient.HostConfig

	// token caches the container auth token fetched by ContainerToken.
	token string

	// ContainerID is the Docker container ID assigned by CreateContainer.
	ContainerID string

	// ExitCode is the container's exit code, set by WaitFinish.
	ExitCode *int

	// NewLogWriter creates the writer for a named log stream.
	NewLogWriter

	// loggingDone receives true (and is then closed) when
	// ProcessDockerAttach has finished copying output and closed the
	// streams; WaitFinish blocks on it.
	loggingDone chan bool

	// CrunchLog is crunch-run's own log.
	CrunchLog *ThrottledLogger

	// Stdout receives the container's stdout: a file under HostOutputDir
	// when there is a "stdout" mount, otherwise the "stdout" log.
	Stdout io.WriteCloser

	// Stderr receives the container's stderr (the "stderr" log).
	Stderr *ThrottledLogger

	// LogCollection accumulates log files; CommitLogs saves its manifest.
	LogCollection *CollectionWriter

	// LogsPDH is the portable data hash of the saved log collection, set
	// by CommitLogs.
	LogsPDH *string

	// RunArvMount starts arv-mount (called from SetupMounts).
	RunArvMount

	// MkTempDir creates temporary directories (called from SetupMounts).
	MkTempDir

	// ArvMount is the running arv-mount process, or nil if none.
	ArvMount *exec.Cmd

	// ArvMountPoint is the host directory where arv-mount is mounted.
	ArvMountPoint string

	// HostOutputDir is the host directory backing the container's
	// OutputPath.
	HostOutputDir string

	// CleanupTempDir lists the directories CleanupDirs removes.
	CleanupTempDir []string

	// Binds are the Docker bind-mount specs built by SetupMounts.
	Binds []string

	// OutputPDH is the portable data hash of the output collection, set
	// by CaptureOutput.
	OutputPDH *string

	// CancelLock guards Cancelled; SetupSignals' handler also holds it
	// while stopping the container.
	CancelLock sync.Mutex

	// Cancelled is set by the signal handler installed in SetupSignals.
	Cancelled bool

	// SigChan receives the signals registered by SetupSignals.
	SigChan chan os.Signal

	// ArvMountExit delivers arv-mount's exit status (see ArvMountCmd).
	ArvMountExit chan error

	// finalState is the state reported by UpdateContainerFinal (e.g.
	// "Queued", "Complete" or "Cancelled").
	finalState string

	// crunchstat resource-usage reporting: StartCrunchstat creates the
	// logger and reporter, and ProcessDockerAttach stops/closes them.
	// statInterval, cgroupRoot and cgroupParent are passed through to the
	// crunchstat.Reporter.
	statLogger   io.WriteCloser
	statReporter *crunchstat.Reporter
	statInterval time.Duration
	cgroupRoot   string
	cgroupParent string
}
102
103 // SetupSignals sets up signal handling to gracefully terminate the underlying
104 // Docker container and update state when receiving a TERM, INT or QUIT signal.
105 func (runner *ContainerRunner) SetupSignals() {
106         runner.SigChan = make(chan os.Signal, 1)
107         signal.Notify(runner.SigChan, syscall.SIGTERM)
108         signal.Notify(runner.SigChan, syscall.SIGINT)
109         signal.Notify(runner.SigChan, syscall.SIGQUIT)
110
111         go func(sig <-chan os.Signal) {
112                 for range sig {
113                         if !runner.Cancelled {
114                                 runner.CancelLock.Lock()
115                                 runner.Cancelled = true
116                                 if runner.ContainerID != "" {
117                                         runner.Docker.StopContainer(runner.ContainerID, 10)
118                                 }
119                                 runner.CancelLock.Unlock()
120                         }
121                 }
122         }(runner.SigChan)
123 }
124
125 // LoadImage determines the docker image id from the container record and
126 // checks if it is available in the local Docker image store.  If not, it loads
127 // the image from Keep.
128 func (runner *ContainerRunner) LoadImage() (err error) {
129
130         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
131
132         var collection arvados.Collection
133         err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
134         if err != nil {
135                 return fmt.Errorf("While getting container image collection: %v", err)
136         }
137         manifest := manifest.Manifest{Text: collection.ManifestText}
138         var img, imageID string
139         for ms := range manifest.StreamIter() {
140                 img = ms.FileStreamSegments[0].Name
141                 if !strings.HasSuffix(img, ".tar") {
142                         return fmt.Errorf("First file in the container image collection does not end in .tar")
143                 }
144                 imageID = img[:len(img)-4]
145         }
146
147         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
148
149         _, err = runner.Docker.InspectImage(imageID)
150         if err != nil {
151                 runner.CrunchLog.Print("Loading Docker image from keep")
152
153                 var readCloser io.ReadCloser
154                 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
155                 if err != nil {
156                         return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
157                 }
158
159                 err = runner.Docker.LoadImage(readCloser)
160                 if err != nil {
161                         return fmt.Errorf("While loading container image into Docker: %v", err)
162                 }
163         } else {
164                 runner.CrunchLog.Print("Docker image is available")
165         }
166
167         runner.ContainerConfig.Image = imageID
168
169         return nil
170 }
171
172 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
173         c = exec.Command("arv-mount", arvMountCmd...)
174
175         // Copy our environment, but override ARVADOS_API_TOKEN with
176         // the container auth token.
177         c.Env = nil
178         for _, s := range os.Environ() {
179                 if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") {
180                         c.Env = append(c.Env, s)
181                 }
182         }
183         c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
184
185         nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
186         c.Stdout = nt
187         c.Stderr = nt
188
189         err = c.Start()
190         if err != nil {
191                 return nil, err
192         }
193
194         statReadme := make(chan bool)
195         runner.ArvMountExit = make(chan error)
196
197         keepStatting := true
198         go func() {
199                 for keepStatting {
200                         time.Sleep(100 * time.Millisecond)
201                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
202                         if err == nil {
203                                 keepStatting = false
204                                 statReadme <- true
205                         }
206                 }
207                 close(statReadme)
208         }()
209
210         go func() {
211                 runner.ArvMountExit <- c.Wait()
212                 close(runner.ArvMountExit)
213         }()
214
215         select {
216         case <-statReadme:
217                 break
218         case err := <-runner.ArvMountExit:
219                 runner.ArvMount = nil
220                 keepStatting = false
221                 return nil, err
222         }
223
224         return c, nil
225 }
226
227 func (runner *ContainerRunner) SetupMounts() (err error) {
228         runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
229         if err != nil {
230                 return fmt.Errorf("While creating keep mount temp dir: %v", err)
231         }
232
233         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
234
235         pdhOnly := true
236         tmpcount := 0
237         arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
238         collectionPaths := []string{}
239         runner.Binds = nil
240
241         for bind, mnt := range runner.Container.Mounts {
242                 if bind == "stdout" {
243                         // Is it a "file" mount kind?
244                         if mnt.Kind != "file" {
245                                 return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
246                         }
247
248                         // Does path start with OutputPath?
249                         prefix := runner.Container.OutputPath
250                         if !strings.HasSuffix(prefix, "/") {
251                                 prefix += "/"
252                         }
253                         if !strings.HasPrefix(mnt.Path, prefix) {
254                                 return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
255                         }
256                 }
257
258                 if mnt.Kind == "collection" {
259                         var src string
260                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
261                                 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
262                         }
263                         if mnt.UUID != "" {
264                                 if mnt.Writable {
265                                         return fmt.Errorf("Writing to existing collections currently not permitted.")
266                                 }
267                                 pdhOnly = false
268                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
269                         } else if mnt.PortableDataHash != "" {
270                                 if mnt.Writable {
271                                         return fmt.Errorf("Can never write to a collection specified by portable data hash")
272                                 }
273                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
274                         } else {
275                                 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
276                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
277                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
278                                 tmpcount += 1
279                         }
280                         if mnt.Writable {
281                                 if bind == runner.Container.OutputPath {
282                                         runner.HostOutputDir = src
283                                 }
284                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
285                         } else {
286                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
287                         }
288                         collectionPaths = append(collectionPaths, src)
289                 } else if mnt.Kind == "tmp" {
290                         if bind == runner.Container.OutputPath {
291                                 runner.HostOutputDir, err = runner.MkTempDir("", "")
292                                 if err != nil {
293                                         return fmt.Errorf("While creating mount temp dir: %v", err)
294                                 }
295                                 st, staterr := os.Stat(runner.HostOutputDir)
296                                 if staterr != nil {
297                                         return fmt.Errorf("While Stat on temp dir: %v", staterr)
298                                 }
299                                 err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
300                                 if staterr != nil {
301                                         return fmt.Errorf("While Chmod temp dir: %v", err)
302                                 }
303                                 runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
304                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
305                         } else {
306                                 runner.Binds = append(runner.Binds, bind)
307                         }
308                 }
309         }
310
311         if runner.HostOutputDir == "" {
312                 return fmt.Errorf("Output path does not correspond to a writable mount point")
313         }
314
315         if pdhOnly {
316                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
317         } else {
318                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
319         }
320         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
321
322         token, err := runner.ContainerToken()
323         if err != nil {
324                 return fmt.Errorf("could not get container token: %s", err)
325         }
326
327         runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
328         if err != nil {
329                 return fmt.Errorf("While trying to start arv-mount: %v", err)
330         }
331
332         for _, p := range collectionPaths {
333                 _, err = os.Stat(p)
334                 if err != nil {
335                         return fmt.Errorf("While checking that input files exist: %v", err)
336                 }
337         }
338
339         return nil
340 }
341
342 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
343         // Handle docker log protocol
344         // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
345
346         header := make([]byte, 8)
347         for {
348                 _, readerr := io.ReadAtLeast(containerReader, header, 8)
349
350                 if readerr == nil {
351                         readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
352                         if header[0] == 1 {
353                                 // stdout
354                                 _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
355                         } else {
356                                 // stderr
357                                 _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
358                         }
359                 }
360
361                 if readerr != nil {
362                         if readerr != io.EOF {
363                                 runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
364                         }
365
366                         closeerr := runner.Stdout.Close()
367                         if closeerr != nil {
368                                 runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
369                         }
370
371                         closeerr = runner.Stderr.Close()
372                         if closeerr != nil {
373                                 runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
374                         }
375
376                         if runner.statReporter != nil {
377                                 runner.statReporter.Stop()
378                                 closeerr = runner.statLogger.Close()
379                                 if closeerr != nil {
380                                         runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
381                                 }
382                         }
383
384                         runner.loggingDone <- true
385                         close(runner.loggingDone)
386                         return
387                 }
388         }
389 }
390
391 func (runner *ContainerRunner) StartCrunchstat() {
392         runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
393         runner.statReporter = &crunchstat.Reporter{
394                 CID:          runner.ContainerID,
395                 Logger:       log.New(runner.statLogger, "", 0),
396                 CgroupParent: runner.cgroupParent,
397                 CgroupRoot:   runner.cgroupRoot,
398                 PollPeriod:   runner.statInterval,
399         }
400         runner.statReporter.Start()
401 }
402
403 // AttachLogs connects the docker container stdout and stderr logs to the
404 // Arvados logger which logs to Keep and the API server logs table.
405 func (runner *ContainerRunner) AttachStreams() (err error) {
406
407         runner.CrunchLog.Print("Attaching container streams")
408
409         var containerReader io.Reader
410         containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
411                 &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
412         if err != nil {
413                 return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
414         }
415
416         runner.loggingDone = make(chan bool)
417
418         if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
419                 stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
420                 index := strings.LastIndex(stdoutPath, "/")
421                 if index > 0 {
422                         subdirs := stdoutPath[:index]
423                         if subdirs != "" {
424                                 st, err := os.Stat(runner.HostOutputDir)
425                                 if err != nil {
426                                         return fmt.Errorf("While Stat on temp dir: %v", err)
427                                 }
428                                 stdoutPath := path.Join(runner.HostOutputDir, subdirs)
429                                 err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
430                                 if err != nil {
431                                         return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
432                                 }
433                         }
434                 }
435                 stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
436                 if err != nil {
437                         return fmt.Errorf("While creating stdout file: %v", err)
438                 }
439                 runner.Stdout = stdoutFile
440         } else {
441                 runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
442         }
443         runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
444
445         go runner.ProcessDockerAttach(containerReader)
446
447         return nil
448 }
449
450 // CreateContainer creates the docker container.
451 func (runner *ContainerRunner) CreateContainer() error {
452         runner.CrunchLog.Print("Creating Docker container")
453
454         runner.ContainerConfig.Cmd = runner.Container.Command
455         if runner.Container.Cwd != "." {
456                 runner.ContainerConfig.WorkingDir = runner.Container.Cwd
457         }
458
459         for k, v := range runner.Container.Environment {
460                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
461         }
462         if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
463                 tok, err := runner.ContainerToken()
464                 if err != nil {
465                         return err
466                 }
467                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
468                         "ARVADOS_API_TOKEN="+tok,
469                         "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
470                         "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
471                 )
472                 runner.ContainerConfig.NetworkDisabled = false
473         } else {
474                 runner.ContainerConfig.NetworkDisabled = true
475         }
476
477         var err error
478         runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
479         if err != nil {
480                 return fmt.Errorf("While creating container: %v", err)
481         }
482
483         runner.HostConfig = dockerclient.HostConfig{Binds: runner.Binds,
484                 LogConfig: dockerclient.LogConfig{Type: "none"}}
485
486         return runner.AttachStreams()
487 }
488
489 // StartContainer starts the docker container created by CreateContainer.
490 func (runner *ContainerRunner) StartContainer() error {
491         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
492         err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
493         if err != nil {
494                 return fmt.Errorf("could not start container: %v", err)
495         }
496         return nil
497 }
498
499 // WaitFinish waits for the container to terminate, capture the exit code, and
500 // close the stdout/stderr logging.
501 func (runner *ContainerRunner) WaitFinish() error {
502         runner.CrunchLog.Print("Waiting for container to finish")
503
504         result := runner.Docker.Wait(runner.ContainerID)
505         wr := <-result
506         if wr.Error != nil {
507                 return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
508         }
509         runner.ExitCode = &wr.ExitCode
510
511         // wait for stdout/stderr to complete
512         <-runner.loggingDone
513
514         return nil
515 }
516
517 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
518 func (runner *ContainerRunner) CaptureOutput() error {
519         if runner.finalState != "Complete" {
520                 return nil
521         }
522
523         if runner.HostOutputDir == "" {
524                 return nil
525         }
526
527         _, err := os.Stat(runner.HostOutputDir)
528         if err != nil {
529                 return fmt.Errorf("While checking host output path: %v", err)
530         }
531
532         var manifestText string
533
534         collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
535         _, err = os.Stat(collectionMetafile)
536         if err != nil {
537                 // Regular directory
538                 cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
539                 manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
540                 if err != nil {
541                         return fmt.Errorf("While uploading output files: %v", err)
542                 }
543         } else {
544                 // FUSE mount directory
545                 file, openerr := os.Open(collectionMetafile)
546                 if openerr != nil {
547                         return fmt.Errorf("While opening FUSE metafile: %v", err)
548                 }
549                 defer file.Close()
550
551                 var rec arvados.Collection
552                 err = json.NewDecoder(file).Decode(&rec)
553                 if err != nil {
554                         return fmt.Errorf("While reading FUSE metafile: %v", err)
555                 }
556                 manifestText = rec.ManifestText
557         }
558
559         var response arvados.Collection
560         err = runner.ArvClient.Create("collections",
561                 arvadosclient.Dict{
562                         "collection": arvadosclient.Dict{
563                                 "manifest_text": manifestText}},
564                 &response)
565         if err != nil {
566                 return fmt.Errorf("While creating output collection: %v", err)
567         }
568
569         runner.OutputPDH = new(string)
570         *runner.OutputPDH = response.PortableDataHash
571
572         return nil
573 }
574
575 func (runner *ContainerRunner) CleanupDirs() {
576         if runner.ArvMount != nil {
577                 umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
578                 umnterr := umount.Run()
579                 if umnterr != nil {
580                         runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
581                 }
582
583                 mnterr := <-runner.ArvMountExit
584                 if mnterr != nil {
585                         runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
586                 }
587         }
588
589         for _, tmpdir := range runner.CleanupTempDir {
590                 rmerr := os.RemoveAll(tmpdir)
591                 if rmerr != nil {
592                         runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
593                 }
594         }
595 }
596
597 // CommitLogs posts the collection containing the final container logs.
598 func (runner *ContainerRunner) CommitLogs() error {
599         runner.CrunchLog.Print(runner.finalState)
600         runner.CrunchLog.Close()
601
602         // Closing CrunchLog above allows it to be committed to Keep at this
603         // point, but re-open crunch log with ArvClient in case there are any
604         // other further (such as failing to write the log to Keep!) while
605         // shutting down
606         runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
607                 "crunch-run", nil})
608
609         if runner.LogsPDH != nil {
610                 // If we have already assigned something to LogsPDH,
611                 // we must be closing the re-opened log, which won't
612                 // end up getting attached to the container record and
613                 // therefore doesn't need to be saved as a collection
614                 // -- it exists only to send logs to other channels.
615                 return nil
616         }
617
618         mt, err := runner.LogCollection.ManifestText()
619         if err != nil {
620                 return fmt.Errorf("While creating log manifest: %v", err)
621         }
622
623         var response arvados.Collection
624         err = runner.ArvClient.Create("collections",
625                 arvadosclient.Dict{
626                         "collection": arvadosclient.Dict{
627                                 "name":          "logs for " + runner.Container.UUID,
628                                 "manifest_text": mt}},
629                 &response)
630         if err != nil {
631                 return fmt.Errorf("While creating log collection: %v", err)
632         }
633
634         runner.LogsPDH = &response.PortableDataHash
635
636         return nil
637 }
638
639 // UpdateContainerRunning updates the container state to "Running"
640 func (runner *ContainerRunner) UpdateContainerRunning() error {
641         runner.CancelLock.Lock()
642         defer runner.CancelLock.Unlock()
643         if runner.Cancelled {
644                 return ErrCancelled
645         }
646         return runner.ArvClient.Update("containers", runner.Container.UUID,
647                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
648 }
649
650 // ContainerToken returns the api_token the container (and any
651 // arv-mount processes) are allowed to use.
652 func (runner *ContainerRunner) ContainerToken() (string, error) {
653         if runner.token != "" {
654                 return runner.token, nil
655         }
656
657         var auth arvados.APIClientAuthorization
658         err := runner.ArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
659         if err != nil {
660                 return "", err
661         }
662         runner.token = auth.APIToken
663         return runner.token, nil
664 }
665
666 // UpdateContainerComplete updates the container record state on API
667 // server to "Complete" or "Cancelled"
668 func (runner *ContainerRunner) UpdateContainerFinal() error {
669         update := arvadosclient.Dict{}
670         update["state"] = runner.finalState
671         if runner.finalState == "Complete" {
672                 if runner.LogsPDH != nil {
673                         update["log"] = *runner.LogsPDH
674                 }
675                 if runner.ExitCode != nil {
676                         update["exit_code"] = *runner.ExitCode
677                 }
678                 if runner.OutputPDH != nil {
679                         update["output"] = *runner.OutputPDH
680                 }
681         }
682         return runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
683 }
684
685 // IsCancelled returns the value of Cancelled, with goroutine safety.
686 func (runner *ContainerRunner) IsCancelled() bool {
687         runner.CancelLock.Lock()
688         defer runner.CancelLock.Unlock()
689         return runner.Cancelled
690 }
691
692 // NewArvLogWriter creates an ArvLogWriter
693 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
694         return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
695 }
696
697 // Run the full container lifecycle.
698 func (runner *ContainerRunner) Run() (err error) {
699         runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
700
701         hostname, hosterr := os.Hostname()
702         if hosterr != nil {
703                 runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
704         } else {
705                 runner.CrunchLog.Printf("Executing on host '%s'", hostname)
706         }
707
708         // Clean up temporary directories _after_ finalizing
709         // everything (if we've made any by then)
710         defer runner.CleanupDirs()
711
712         runner.finalState = "Queued"
713
714         defer func() {
715                 // checkErr prints e (unless it's nil) and sets err to
716                 // e (unless err is already non-nil). Thus, if err
717                 // hasn't already been assigned when Run() returns,
718                 // this cleanup func will cause Run() to return the
719                 // first non-nil error that is passed to checkErr().
720                 checkErr := func(e error) {
721                         if e == nil {
722                                 return
723                         }
724                         runner.CrunchLog.Print(e)
725                         if err == nil {
726                                 err = e
727                         }
728                 }
729
730                 // Log the error encountered in Run(), if any
731                 checkErr(err)
732
733                 if runner.finalState == "Queued" {
734                         runner.UpdateContainerFinal()
735                         return
736                 }
737
738                 if runner.IsCancelled() {
739                         runner.finalState = "Cancelled"
740                         // but don't return yet -- we still want to
741                         // capture partial output and write logs
742                 }
743
744                 checkErr(runner.CaptureOutput())
745                 checkErr(runner.CommitLogs())
746                 checkErr(runner.UpdateContainerFinal())
747
748                 // The real log is already closed, but then we opened
749                 // a new one in case we needed to log anything while
750                 // finalizing.
751                 runner.CrunchLog.Close()
752         }()
753
754         err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container)
755         if err != nil {
756                 err = fmt.Errorf("While getting container record: %v", err)
757                 return
758         }
759
760         // setup signal handling
761         runner.SetupSignals()
762
763         // check for and/or load image
764         err = runner.LoadImage()
765         if err != nil {
766                 err = fmt.Errorf("While loading container image: %v", err)
767                 return
768         }
769
770         // set up FUSE mount and binds
771         err = runner.SetupMounts()
772         if err != nil {
773                 err = fmt.Errorf("While setting up mounts: %v", err)
774                 return
775         }
776
777         err = runner.CreateContainer()
778         if err != nil {
779                 return
780         }
781
782         runner.StartCrunchstat()
783
784         if runner.IsCancelled() {
785                 return
786         }
787
788         err = runner.UpdateContainerRunning()
789         if err != nil {
790                 return
791         }
792         runner.finalState = "Cancelled"
793
794         err = runner.StartContainer()
795         if err != nil {
796                 return
797         }
798
799         err = runner.WaitFinish()
800         if err == nil {
801                 runner.finalState = "Complete"
802         }
803         return
804 }
805
806 // NewContainerRunner creates a new container runner.
807 func NewContainerRunner(api IArvadosClient,
808         kc IKeepClient,
809         docker ThinDockerClient,
810         containerUUID string) *ContainerRunner {
811
812         cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
813         cr.NewLogWriter = cr.NewArvLogWriter
814         cr.RunArvMount = cr.ArvMountCmd
815         cr.MkTempDir = ioutil.TempDir
816         cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
817         cr.Container.UUID = containerUUID
818         cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
819         cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
820         return cr
821 }
822
823 func main() {
824         statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
825         cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
826         cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup")
827         flag.Parse()
828
829         containerId := flag.Arg(0)
830
831         api, err := arvadosclient.MakeArvadosClient()
832         if err != nil {
833                 log.Fatalf("%s: %v", containerId, err)
834         }
835         api.Retries = 8
836
837         var kc *keepclient.KeepClient
838         kc, err = keepclient.MakeKeepClient(&api)
839         if err != nil {
840                 log.Fatalf("%s: %v", containerId, err)
841         }
842         kc.Retries = 4
843
844         var docker *dockerclient.DockerClient
845         docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
846         if err != nil {
847                 log.Fatalf("%s: %v", containerId, err)
848         }
849
850         cr := NewContainerRunner(api, kc, docker, containerId)
851         cr.statInterval = *statInterval
852         cr.cgroupRoot = *cgroupRoot
853         cr.cgroupParent = *cgroupParent
854
855         err = cr.Run()
856         if err != nil {
857                 log.Fatalf("%s: %v", containerId, err)
858         }
859
860 }