9931: Fix missing include in workbench test support.
[arvados.git] / services / crunch-run / crunchrun.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/lib/crunchstat"
9         "git.curoverse.com/arvados.git/sdk/go/arvados"
10         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11         "git.curoverse.com/arvados.git/sdk/go/keepclient"
12         "git.curoverse.com/arvados.git/sdk/go/manifest"
13         "github.com/curoverse/dockerclient"
14         "io"
15         "io/ioutil"
16         "log"
17         "os"
18         "os/exec"
19         "os/signal"
20         "path"
21         "path/filepath"
22         "strings"
23         "sync"
24         "syscall"
25         "time"
26 )
27
28 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
29 type IArvadosClient interface {
30         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
31         Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
32         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
33         Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) (err error)
34 }
35
// ErrCancelled is the error returned when the container is cancelled.
// UpdateContainerRunning returns it when the runner's Cancelled flag
// is already set.
var ErrCancelled = errors.New("Cancelled")
38
// IKeepClient is the minimal Keep API methods used by crunch-run.
type IKeepClient interface {
	// PutHB stores buf under the given hash.
	// NOTE(review): the meaning of the string and int results is
	// defined by the keepclient package -- confirm there.
	PutHB(hash string, buf []byte) (string, int, error)
	// ManifestFileReader returns a reader for the named file in
	// manifest m. LoadImage uses it to read the Docker image tar
	// file.
	ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
}
44
// NewLogWriter is a factory function to create a new log writer.
type NewLogWriter func(name string) io.WriteCloser

// RunArvMount is a function that starts arv-mount with the given
// command line arguments and API token, returning the running
// command. SetupMounts calls it through the ContainerRunner;
// ArvMountCmd has a matching signature.
type RunArvMount func(args []string, tok string) (*exec.Cmd, error)

// MkTempDir is a function that creates a temporary directory and
// returns its path.
// NOTE(review): the arguments look like ioutil.TempDir's (dir,
// prefix) -- confirm against the value assigned by the caller.
type MkTempDir func(string, string) (string, error)
51
// ThinDockerClient is the minimal Docker client interface used by crunch-run.
type ThinDockerClient interface {
	// StopContainer is called from the signal handler when the
	// run is cancelled.
	StopContainer(id string, timeout int) error
	// InspectImage is used by LoadImage to check whether an image
	// is already present; an error is treated as "not present".
	InspectImage(id string) (*dockerclient.ImageInfo, error)
	// LoadImage loads an image read from reader; crunch-run feeds
	// it the image tar file stored in Keep.
	LoadImage(reader io.Reader) error
	CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
	StartContainer(id string, config *dockerclient.HostConfig) error
	// AttachContainer returns the container's output stream,
	// which ProcessDockerAttach demultiplexes into stdout and
	// stderr.
	AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
	// Wait returns a channel delivering the container's exit
	// result; WaitFinish receives one value from it.
	Wait(id string) <-chan dockerclient.WaitResult
	RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
}
63
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
	Docker    ThinDockerClient
	ArvClient IArvadosClient
	Kc        IKeepClient
	arvados.Container
	dockerclient.ContainerConfig
	dockerclient.HostConfig
	// token caches the container auth token fetched by
	// ContainerToken.
	token       string
	// ContainerID is the Docker container id assigned by
	// CreateContainer.
	ContainerID string
	// ExitCode is set by WaitFinish once the container has exited.
	ExitCode    *int
	NewLogWriter
	// loggingDone is created by AttachStreams and signalled (then
	// closed) by ProcessDockerAttach when the container's output
	// stream ends.
	loggingDone   chan bool
	CrunchLog     *ThrottledLogger
	Stdout        io.WriteCloser
	Stderr        *ThrottledLogger
	LogCollection *CollectionWriter
	// LogsPDH is set by CommitLogs after the log collection has
	// been created.
	LogsPDH       *string
	RunArvMount
	MkTempDir
	ArvMount       *exec.Cmd
	ArvMountPoint  string
	// HostOutputDir is the host-side directory bound to the
	// container's output path; chosen by SetupMounts.
	HostOutputDir  string
	// CleanupTempDir lists directories removed by CleanupDirs.
	CleanupTempDir []string
	Binds          []string
	// OutputPDH is set by CaptureOutput after the output
	// collection has been created.
	OutputPDH      *string
	// CancelLock guards Cancelled.
	CancelLock     sync.Mutex
	Cancelled      bool
	SigChan        chan os.Signal
	// ArvMountExit delivers the result of waiting for the
	// arv-mount process; it is created by ArvMountCmd.
	ArvMountExit   chan error
	finalState     string

	statLogger   io.WriteCloser
	statReporter *crunchstat.Reporter
	statInterval time.Duration
	cgroupRoot   string
	// What we expect the container's cgroup parent to be.
	expectCgroupParent string
	// What we tell docker to use as the container's cgroup
	// parent. Note: Ideally we would use the same field for both
	// expectCgroupParent and setCgroupParent, and just make it
	// default to "docker". However, when using docker < 1.10 with
	// systemd, specifying a non-empty cgroup parent (even the
	// default value "docker") hits a docker bug
	// (https://github.com/docker/docker/issues/17126). Using two
	// separate fields makes it possible to use the "expect cgroup
	// parent to be X" feature even on sites where the "specify
	// cgroup parent" feature breaks.
	setCgroupParent string
}
115
116 // SetupSignals sets up signal handling to gracefully terminate the underlying
117 // Docker container and update state when receiving a TERM, INT or QUIT signal.
118 func (runner *ContainerRunner) SetupSignals() {
119         runner.SigChan = make(chan os.Signal, 1)
120         signal.Notify(runner.SigChan, syscall.SIGTERM)
121         signal.Notify(runner.SigChan, syscall.SIGINT)
122         signal.Notify(runner.SigChan, syscall.SIGQUIT)
123
124         go func(sig <-chan os.Signal) {
125                 for range sig {
126                         if !runner.Cancelled {
127                                 runner.CancelLock.Lock()
128                                 runner.Cancelled = true
129                                 if runner.ContainerID != "" {
130                                         runner.Docker.StopContainer(runner.ContainerID, 10)
131                                 }
132                                 runner.CancelLock.Unlock()
133                         }
134                 }
135         }(runner.SigChan)
136 }
137
138 // LoadImage determines the docker image id from the container record and
139 // checks if it is available in the local Docker image store.  If not, it loads
140 // the image from Keep.
141 func (runner *ContainerRunner) LoadImage() (err error) {
142
143         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
144
145         var collection arvados.Collection
146         err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
147         if err != nil {
148                 return fmt.Errorf("While getting container image collection: %v", err)
149         }
150         manifest := manifest.Manifest{Text: collection.ManifestText}
151         var img, imageID string
152         for ms := range manifest.StreamIter() {
153                 img = ms.FileStreamSegments[0].Name
154                 if !strings.HasSuffix(img, ".tar") {
155                         return fmt.Errorf("First file in the container image collection does not end in .tar")
156                 }
157                 imageID = img[:len(img)-4]
158         }
159
160         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
161
162         _, err = runner.Docker.InspectImage(imageID)
163         if err != nil {
164                 runner.CrunchLog.Print("Loading Docker image from keep")
165
166                 var readCloser io.ReadCloser
167                 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
168                 if err != nil {
169                         return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
170                 }
171
172                 err = runner.Docker.LoadImage(readCloser)
173                 if err != nil {
174                         return fmt.Errorf("While loading container image into Docker: %v", err)
175                 }
176         } else {
177                 runner.CrunchLog.Print("Docker image is available")
178         }
179
180         runner.ContainerConfig.Image = imageID
181
182         return nil
183 }
184
185 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
186         c = exec.Command("arv-mount", arvMountCmd...)
187
188         // Copy our environment, but override ARVADOS_API_TOKEN with
189         // the container auth token.
190         c.Env = nil
191         for _, s := range os.Environ() {
192                 if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") {
193                         c.Env = append(c.Env, s)
194                 }
195         }
196         c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
197
198         nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
199         c.Stdout = nt
200         c.Stderr = nt
201
202         err = c.Start()
203         if err != nil {
204                 return nil, err
205         }
206
207         statReadme := make(chan bool)
208         runner.ArvMountExit = make(chan error)
209
210         keepStatting := true
211         go func() {
212                 for keepStatting {
213                         time.Sleep(100 * time.Millisecond)
214                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
215                         if err == nil {
216                                 keepStatting = false
217                                 statReadme <- true
218                         }
219                 }
220                 close(statReadme)
221         }()
222
223         go func() {
224                 runner.ArvMountExit <- c.Wait()
225                 close(runner.ArvMountExit)
226         }()
227
228         select {
229         case <-statReadme:
230                 break
231         case err := <-runner.ArvMountExit:
232                 runner.ArvMount = nil
233                 keepStatting = false
234                 return nil, err
235         }
236
237         return c, nil
238 }
239
240 func (runner *ContainerRunner) SetupMounts() (err error) {
241         runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
242         if err != nil {
243                 return fmt.Errorf("While creating keep mount temp dir: %v", err)
244         }
245
246         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
247
248         pdhOnly := true
249         tmpcount := 0
250         arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
251         collectionPaths := []string{}
252         runner.Binds = nil
253
254         for bind, mnt := range runner.Container.Mounts {
255                 if bind == "stdout" {
256                         // Is it a "file" mount kind?
257                         if mnt.Kind != "file" {
258                                 return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
259                         }
260
261                         // Does path start with OutputPath?
262                         prefix := runner.Container.OutputPath
263                         if !strings.HasSuffix(prefix, "/") {
264                                 prefix += "/"
265                         }
266                         if !strings.HasPrefix(mnt.Path, prefix) {
267                                 return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
268                         }
269                 }
270
271                 switch {
272                 case mnt.Kind == "collection":
273                         var src string
274                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
275                                 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
276                         }
277                         if mnt.UUID != "" {
278                                 if mnt.Writable {
279                                         return fmt.Errorf("Writing to existing collections currently not permitted.")
280                                 }
281                                 pdhOnly = false
282                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
283                         } else if mnt.PortableDataHash != "" {
284                                 if mnt.Writable {
285                                         return fmt.Errorf("Can never write to a collection specified by portable data hash")
286                                 }
287                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
288                         } else {
289                                 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
290                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
291                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
292                                 tmpcount += 1
293                         }
294                         if mnt.Writable {
295                                 if bind == runner.Container.OutputPath {
296                                         runner.HostOutputDir = src
297                                 }
298                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
299                         } else {
300                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
301                         }
302                         collectionPaths = append(collectionPaths, src)
303
304                 case mnt.Kind == "tmp" && bind == runner.Container.OutputPath:
305                         runner.HostOutputDir, err = runner.MkTempDir("", "")
306                         if err != nil {
307                                 return fmt.Errorf("While creating mount temp dir: %v", err)
308                         }
309                         st, staterr := os.Stat(runner.HostOutputDir)
310                         if staterr != nil {
311                                 return fmt.Errorf("While Stat on temp dir: %v", staterr)
312                         }
313                         err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
314                         if staterr != nil {
315                                 return fmt.Errorf("While Chmod temp dir: %v", err)
316                         }
317                         runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
318                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
319
320                 case mnt.Kind == "tmp":
321                         runner.Binds = append(runner.Binds, bind)
322
323                 case mnt.Kind == "json":
324                         jsondata, err := json.Marshal(mnt.Content)
325                         if err != nil {
326                                 return fmt.Errorf("encoding json data: %v", err)
327                         }
328                         // Create a tempdir with a single file
329                         // (instead of just a tempfile): this way we
330                         // can ensure the file is world-readable
331                         // inside the container, without having to
332                         // make it world-readable on the docker host.
333                         tmpdir, err := runner.MkTempDir("", "")
334                         if err != nil {
335                                 return fmt.Errorf("creating temp dir: %v", err)
336                         }
337                         runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
338                         tmpfn := filepath.Join(tmpdir, "mountdata.json")
339                         err = ioutil.WriteFile(tmpfn, jsondata, 0644)
340                         if err != nil {
341                                 return fmt.Errorf("writing temp file: %v", err)
342                         }
343                         runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
344                 }
345         }
346
347         if runner.HostOutputDir == "" {
348                 return fmt.Errorf("Output path does not correspond to a writable mount point")
349         }
350
351         if pdhOnly {
352                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
353         } else {
354                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
355         }
356         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
357
358         token, err := runner.ContainerToken()
359         if err != nil {
360                 return fmt.Errorf("could not get container token: %s", err)
361         }
362
363         runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
364         if err != nil {
365                 return fmt.Errorf("While trying to start arv-mount: %v", err)
366         }
367
368         for _, p := range collectionPaths {
369                 _, err = os.Stat(p)
370                 if err != nil {
371                         return fmt.Errorf("While checking that input files exist: %v", err)
372                 }
373         }
374
375         return nil
376 }
377
378 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
379         // Handle docker log protocol
380         // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
381
382         header := make([]byte, 8)
383         for {
384                 _, readerr := io.ReadAtLeast(containerReader, header, 8)
385
386                 if readerr == nil {
387                         readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
388                         if header[0] == 1 {
389                                 // stdout
390                                 _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
391                         } else {
392                                 // stderr
393                                 _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
394                         }
395                 }
396
397                 if readerr != nil {
398                         if readerr != io.EOF {
399                                 runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
400                         }
401
402                         closeerr := runner.Stdout.Close()
403                         if closeerr != nil {
404                                 runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
405                         }
406
407                         closeerr = runner.Stderr.Close()
408                         if closeerr != nil {
409                                 runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
410                         }
411
412                         if runner.statReporter != nil {
413                                 runner.statReporter.Stop()
414                                 closeerr = runner.statLogger.Close()
415                                 if closeerr != nil {
416                                         runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
417                                 }
418                         }
419
420                         runner.loggingDone <- true
421                         close(runner.loggingDone)
422                         return
423                 }
424         }
425 }
426
427 func (runner *ContainerRunner) StartCrunchstat() {
428         runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
429         runner.statReporter = &crunchstat.Reporter{
430                 CID:          runner.ContainerID,
431                 Logger:       log.New(runner.statLogger, "", 0),
432                 CgroupParent: runner.expectCgroupParent,
433                 CgroupRoot:   runner.cgroupRoot,
434                 PollPeriod:   runner.statInterval,
435         }
436         runner.statReporter.Start()
437 }
438
439 // AttachLogs connects the docker container stdout and stderr logs to the
440 // Arvados logger which logs to Keep and the API server logs table.
441 func (runner *ContainerRunner) AttachStreams() (err error) {
442
443         runner.CrunchLog.Print("Attaching container streams")
444
445         var containerReader io.Reader
446         containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
447                 &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
448         if err != nil {
449                 return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
450         }
451
452         runner.loggingDone = make(chan bool)
453
454         if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
455                 stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
456                 index := strings.LastIndex(stdoutPath, "/")
457                 if index > 0 {
458                         subdirs := stdoutPath[:index]
459                         if subdirs != "" {
460                                 st, err := os.Stat(runner.HostOutputDir)
461                                 if err != nil {
462                                         return fmt.Errorf("While Stat on temp dir: %v", err)
463                                 }
464                                 stdoutPath := path.Join(runner.HostOutputDir, subdirs)
465                                 err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
466                                 if err != nil {
467                                         return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
468                                 }
469                         }
470                 }
471                 stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
472                 if err != nil {
473                         return fmt.Errorf("While creating stdout file: %v", err)
474                 }
475                 runner.Stdout = stdoutFile
476         } else {
477                 runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
478         }
479         runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
480
481         go runner.ProcessDockerAttach(containerReader)
482
483         return nil
484 }
485
486 // CreateContainer creates the docker container.
487 func (runner *ContainerRunner) CreateContainer() error {
488         runner.CrunchLog.Print("Creating Docker container")
489
490         runner.ContainerConfig.Cmd = runner.Container.Command
491         if runner.Container.Cwd != "." {
492                 runner.ContainerConfig.WorkingDir = runner.Container.Cwd
493         }
494
495         for k, v := range runner.Container.Environment {
496                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
497         }
498         if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
499                 tok, err := runner.ContainerToken()
500                 if err != nil {
501                         return err
502                 }
503                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
504                         "ARVADOS_API_TOKEN="+tok,
505                         "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
506                         "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
507                 )
508                 runner.ContainerConfig.NetworkDisabled = false
509         } else {
510                 runner.ContainerConfig.NetworkDisabled = true
511         }
512
513         var err error
514         runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
515         if err != nil {
516                 return fmt.Errorf("While creating container: %v", err)
517         }
518
519         runner.HostConfig = dockerclient.HostConfig{
520                 Binds:        runner.Binds,
521                 CgroupParent: runner.setCgroupParent,
522                 LogConfig: dockerclient.LogConfig{
523                         Type: "none",
524                 },
525         }
526
527         return runner.AttachStreams()
528 }
529
530 // StartContainer starts the docker container created by CreateContainer.
531 func (runner *ContainerRunner) StartContainer() error {
532         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
533         err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
534         if err != nil {
535                 return fmt.Errorf("could not start container: %v", err)
536         }
537         return nil
538 }
539
540 // WaitFinish waits for the container to terminate, capture the exit code, and
541 // close the stdout/stderr logging.
542 func (runner *ContainerRunner) WaitFinish() error {
543         runner.CrunchLog.Print("Waiting for container to finish")
544
545         result := runner.Docker.Wait(runner.ContainerID)
546         wr := <-result
547         if wr.Error != nil {
548                 return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
549         }
550         runner.ExitCode = &wr.ExitCode
551
552         // wait for stdout/stderr to complete
553         <-runner.loggingDone
554
555         return nil
556 }
557
558 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
559 func (runner *ContainerRunner) CaptureOutput() error {
560         if runner.finalState != "Complete" {
561                 return nil
562         }
563
564         if runner.HostOutputDir == "" {
565                 return nil
566         }
567
568         _, err := os.Stat(runner.HostOutputDir)
569         if err != nil {
570                 return fmt.Errorf("While checking host output path: %v", err)
571         }
572
573         var manifestText string
574
575         collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
576         _, err = os.Stat(collectionMetafile)
577         if err != nil {
578                 // Regular directory
579                 cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
580                 manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
581                 if err != nil {
582                         return fmt.Errorf("While uploading output files: %v", err)
583                 }
584         } else {
585                 // FUSE mount directory
586                 file, openerr := os.Open(collectionMetafile)
587                 if openerr != nil {
588                         return fmt.Errorf("While opening FUSE metafile: %v", err)
589                 }
590                 defer file.Close()
591
592                 var rec arvados.Collection
593                 err = json.NewDecoder(file).Decode(&rec)
594                 if err != nil {
595                         return fmt.Errorf("While reading FUSE metafile: %v", err)
596                 }
597                 manifestText = rec.ManifestText
598         }
599
600         var response arvados.Collection
601         err = runner.ArvClient.Create("collections",
602                 arvadosclient.Dict{
603                         "collection": arvadosclient.Dict{
604                                 "manifest_text": manifestText}},
605                 &response)
606         if err != nil {
607                 return fmt.Errorf("While creating output collection: %v", err)
608         }
609
610         runner.OutputPDH = new(string)
611         *runner.OutputPDH = response.PortableDataHash
612
613         return nil
614 }
615
616 func (runner *ContainerRunner) CleanupDirs() {
617         if runner.ArvMount != nil {
618                 umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
619                 umnterr := umount.Run()
620                 if umnterr != nil {
621                         runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
622                 }
623
624                 mnterr := <-runner.ArvMountExit
625                 if mnterr != nil {
626                         runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
627                 }
628         }
629
630         for _, tmpdir := range runner.CleanupTempDir {
631                 rmerr := os.RemoveAll(tmpdir)
632                 if rmerr != nil {
633                         runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
634                 }
635         }
636 }
637
638 // CommitLogs posts the collection containing the final container logs.
639 func (runner *ContainerRunner) CommitLogs() error {
640         runner.CrunchLog.Print(runner.finalState)
641         runner.CrunchLog.Close()
642
643         // Closing CrunchLog above allows it to be committed to Keep at this
644         // point, but re-open crunch log with ArvClient in case there are any
645         // other further (such as failing to write the log to Keep!) while
646         // shutting down
647         runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
648                 "crunch-run", nil})
649
650         if runner.LogsPDH != nil {
651                 // If we have already assigned something to LogsPDH,
652                 // we must be closing the re-opened log, which won't
653                 // end up getting attached to the container record and
654                 // therefore doesn't need to be saved as a collection
655                 // -- it exists only to send logs to other channels.
656                 return nil
657         }
658
659         mt, err := runner.LogCollection.ManifestText()
660         if err != nil {
661                 return fmt.Errorf("While creating log manifest: %v", err)
662         }
663
664         var response arvados.Collection
665         err = runner.ArvClient.Create("collections",
666                 arvadosclient.Dict{
667                         "collection": arvadosclient.Dict{
668                                 "name":          "logs for " + runner.Container.UUID,
669                                 "manifest_text": mt}},
670                 &response)
671         if err != nil {
672                 return fmt.Errorf("While creating log collection: %v", err)
673         }
674
675         runner.LogsPDH = &response.PortableDataHash
676
677         return nil
678 }
679
680 // UpdateContainerRunning updates the container state to "Running"
681 func (runner *ContainerRunner) UpdateContainerRunning() error {
682         runner.CancelLock.Lock()
683         defer runner.CancelLock.Unlock()
684         if runner.Cancelled {
685                 return ErrCancelled
686         }
687         return runner.ArvClient.Update("containers", runner.Container.UUID,
688                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
689 }
690
691 // ContainerToken returns the api_token the container (and any
692 // arv-mount processes) are allowed to use.
693 func (runner *ContainerRunner) ContainerToken() (string, error) {
694         if runner.token != "" {
695                 return runner.token, nil
696         }
697
698         var auth arvados.APIClientAuthorization
699         err := runner.ArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
700         if err != nil {
701                 return "", err
702         }
703         runner.token = auth.APIToken
704         return runner.token, nil
705 }
706
707 // UpdateContainerComplete updates the container record state on API
708 // server to "Complete" or "Cancelled"
709 func (runner *ContainerRunner) UpdateContainerFinal() error {
710         update := arvadosclient.Dict{}
711         update["state"] = runner.finalState
712         if runner.finalState == "Complete" {
713                 if runner.LogsPDH != nil {
714                         update["log"] = *runner.LogsPDH
715                 }
716                 if runner.ExitCode != nil {
717                         update["exit_code"] = *runner.ExitCode
718                 }
719                 if runner.OutputPDH != nil {
720                         update["output"] = *runner.OutputPDH
721                 }
722         }
723         return runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
724 }
725
726 // IsCancelled returns the value of Cancelled, with goroutine safety.
727 func (runner *ContainerRunner) IsCancelled() bool {
728         runner.CancelLock.Lock()
729         defer runner.CancelLock.Unlock()
730         return runner.Cancelled
731 }
732
733 // NewArvLogWriter creates an ArvLogWriter
734 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
735         return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
736 }
737
738 // Run the full container lifecycle.
739 func (runner *ContainerRunner) Run() (err error) {
740         runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
741
742         hostname, hosterr := os.Hostname()
743         if hosterr != nil {
744                 runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
745         } else {
746                 runner.CrunchLog.Printf("Executing on host '%s'", hostname)
747         }
748
749         // Clean up temporary directories _after_ finalizing
750         // everything (if we've made any by then)
751         defer runner.CleanupDirs()
752
753         runner.finalState = "Queued"
754
755         defer func() {
756                 // checkErr prints e (unless it's nil) and sets err to
757                 // e (unless err is already non-nil). Thus, if err
758                 // hasn't already been assigned when Run() returns,
759                 // this cleanup func will cause Run() to return the
760                 // first non-nil error that is passed to checkErr().
761                 checkErr := func(e error) {
762                         if e == nil {
763                                 return
764                         }
765                         runner.CrunchLog.Print(e)
766                         if err == nil {
767                                 err = e
768                         }
769                 }
770
771                 // Log the error encountered in Run(), if any
772                 checkErr(err)
773
774                 if runner.finalState == "Queued" {
775                         runner.UpdateContainerFinal()
776                         return
777                 }
778
779                 if runner.IsCancelled() {
780                         runner.finalState = "Cancelled"
781                         // but don't return yet -- we still want to
782                         // capture partial output and write logs
783                 }
784
785                 checkErr(runner.CaptureOutput())
786                 checkErr(runner.CommitLogs())
787                 checkErr(runner.UpdateContainerFinal())
788
789                 // The real log is already closed, but then we opened
790                 // a new one in case we needed to log anything while
791                 // finalizing.
792                 runner.CrunchLog.Close()
793         }()
794
795         err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container)
796         if err != nil {
797                 err = fmt.Errorf("While getting container record: %v", err)
798                 return
799         }
800
801         // setup signal handling
802         runner.SetupSignals()
803
804         // check for and/or load image
805         err = runner.LoadImage()
806         if err != nil {
807                 err = fmt.Errorf("While loading container image: %v", err)
808                 return
809         }
810
811         // set up FUSE mount and binds
812         err = runner.SetupMounts()
813         if err != nil {
814                 err = fmt.Errorf("While setting up mounts: %v", err)
815                 return
816         }
817
818         err = runner.CreateContainer()
819         if err != nil {
820                 return
821         }
822
823         runner.StartCrunchstat()
824
825         if runner.IsCancelled() {
826                 return
827         }
828
829         err = runner.UpdateContainerRunning()
830         if err != nil {
831                 return
832         }
833         runner.finalState = "Cancelled"
834
835         err = runner.StartContainer()
836         if err != nil {
837                 return
838         }
839
840         err = runner.WaitFinish()
841         if err == nil {
842                 runner.finalState = "Complete"
843         }
844         return
845 }
846
847 // NewContainerRunner creates a new container runner.
848 func NewContainerRunner(api IArvadosClient,
849         kc IKeepClient,
850         docker ThinDockerClient,
851         containerUUID string) *ContainerRunner {
852
853         cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
854         cr.NewLogWriter = cr.NewArvLogWriter
855         cr.RunArvMount = cr.ArvMountCmd
856         cr.MkTempDir = ioutil.TempDir
857         cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
858         cr.Container.UUID = containerUUID
859         cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
860         cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
861         return cr
862 }
863
864 func main() {
865         statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
866         cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
867         cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
868         cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
869         flag.Parse()
870
871         containerId := flag.Arg(0)
872
873         api, err := arvadosclient.MakeArvadosClient()
874         if err != nil {
875                 log.Fatalf("%s: %v", containerId, err)
876         }
877         api.Retries = 8
878
879         var kc *keepclient.KeepClient
880         kc, err = keepclient.MakeKeepClient(&api)
881         if err != nil {
882                 log.Fatalf("%s: %v", containerId, err)
883         }
884         kc.Retries = 4
885
886         var docker *dockerclient.DockerClient
887         docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
888         if err != nil {
889                 log.Fatalf("%s: %v", containerId, err)
890         }
891
892         cr := NewContainerRunner(api, kc, docker, containerId)
893         cr.statInterval = *statInterval
894         cr.cgroupRoot = *cgroupRoot
895         cr.expectCgroupParent = *cgroupParent
896         if *cgroupParentSubsystem != "" {
897                 p := findCgroup(*cgroupParentSubsystem)
898                 cr.setCgroupParent = p
899                 cr.expectCgroupParent = p
900         }
901
902         err = cr.Run()
903         if err != nil {
904                 log.Fatalf("%s: %v", containerId, err)
905         }
906
907 }