8015: Expand arv-mount command line tests
[arvados.git] / services / crunch-run / crunchrun.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "git.curoverse.com/arvados.git/sdk/go/manifest"
11         "github.com/curoverse/dockerclient"
12         "io"
13         "io/ioutil"
14         "log"
15         "os"
16         "os/exec"
17         "os/signal"
18         "strings"
19         "sync"
20         "syscall"
21         "time"
22 )
23
24 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
25 type IArvadosClient interface {
26         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
27         Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
28         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
29 }
30
31 // ErrCancelled is the error returned when the container is cancelled.
32 var ErrCancelled = errors.New("Cancelled")
33
34 // IKeepClient is the minimal Keep API methods used by crunch-run.
35 type IKeepClient interface {
36         PutHB(hash string, buf []byte) (string, int, error)
37         ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
38 }
39
40 // Mount describes the mount points to create inside the container.
41 type Mount struct {
42         Kind             string `json:"kind"`
43         Writable         bool   `json:"writable"`
44         PortableDataHash string `json:"portable_data_hash"`
45         UUID             string `json:"uuid"`
46         DeviceType       string `json:"device_type"`
47 }
48
49 // Collection record returned by the API server.
50 type CollectionRecord struct {
51         ManifestText     string `json:"manifest_text"`
52         PortableDataHash string `json:"portable_data_hash"`
53 }
54
55 // ContainerRecord is the container record returned by the API server.
56 type ContainerRecord struct {
57         UUID               string                 `json:"uuid"`
58         Command            []string               `json:"command"`
59         ContainerImage     string                 `json:"container_image"`
60         Cwd                string                 `json:"cwd"`
61         Environment        map[string]string      `json:"environment"`
62         Mounts             map[string]Mount       `json:"mounts"`
63         OutputPath         string                 `json:"output_path"`
64         Priority           int                    `json:"priority"`
65         RuntimeConstraints map[string]interface{} `json:"runtime_constraints"`
66         State              string                 `json:"state"`
67         Output             string                 `json:"output"`
68 }
69
70 // NewLogWriter is a factory function to create a new log writer.
71 type NewLogWriter func(name string) io.WriteCloser
72
73 type RunArvMount func([]string) (*exec.Cmd, error)
74
75 type MkTempDir func(string, string) (string, error)
76
77 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
78 type ThinDockerClient interface {
79         StopContainer(id string, timeout int) error
80         InspectImage(id string) (*dockerclient.ImageInfo, error)
81         LoadImage(reader io.Reader) error
82         CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
83         StartContainer(id string, config *dockerclient.HostConfig) error
84         ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error)
85         Wait(id string) <-chan dockerclient.WaitResult
86         RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
87 }
88
89 // ContainerRunner is the main stateful struct used for a single execution of a
90 // container.
91 type ContainerRunner struct {
92         Docker    ThinDockerClient
93         ArvClient IArvadosClient
94         Kc        IKeepClient
95         ContainerRecord
96         dockerclient.ContainerConfig
97         ContainerID string
98         ExitCode    *int
99         NewLogWriter
100         loggingDone   chan bool
101         CrunchLog     *ThrottledLogger
102         Stdout        *ThrottledLogger
103         Stderr        *ThrottledLogger
104         LogCollection *CollectionWriter
105         LogsPDH       *string
106         RunArvMount
107         MkTempDir
108         ArvMount      *exec.Cmd
109         ArvMountPoint string
110         HostOutputDir string
111         Binds         []string
112         OutputPDH     *string
113         CancelLock    sync.Mutex
114         Cancelled     bool
115         SigChan       chan os.Signal
116         ArvMountExit  chan error
117         finalState    string
118 }
119
120 // SetupSignals sets up signal handling to gracefully terminate the underlying
121 // Docker container and update state when receiving a TERM, INT or QUIT signal.
122 func (runner *ContainerRunner) SetupSignals() error {
123         runner.SigChan = make(chan os.Signal, 1)
124         signal.Notify(runner.SigChan, syscall.SIGTERM)
125         signal.Notify(runner.SigChan, syscall.SIGINT)
126         signal.Notify(runner.SigChan, syscall.SIGQUIT)
127
128         go func(sig <-chan os.Signal) {
129                 for _ = range sig {
130                         if !runner.Cancelled {
131                                 runner.CancelLock.Lock()
132                                 runner.Cancelled = true
133                                 if runner.ContainerID != "" {
134                                         runner.Docker.StopContainer(runner.ContainerID, 10)
135                                 }
136                                 runner.CancelLock.Unlock()
137                         }
138                 }
139         }(runner.SigChan)
140
141         return nil
142 }
143
144 // LoadImage determines the docker image id from the container record and
145 // checks if it is available in the local Docker image store.  If not, it loads
146 // the image from Keep.
147 func (runner *ContainerRunner) LoadImage() (err error) {
148
149         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
150
151         var collection CollectionRecord
152         err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
153         if err != nil {
154                 return fmt.Errorf("While getting container image collection: %v", err)
155         }
156         manifest := manifest.Manifest{Text: collection.ManifestText}
157         var img, imageID string
158         for ms := range manifest.StreamIter() {
159                 img = ms.FileStreamSegments[0].Name
160                 if !strings.HasSuffix(img, ".tar") {
161                         return fmt.Errorf("First file in the container image collection does not end in .tar")
162                 }
163                 imageID = img[:len(img)-4]
164         }
165
166         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
167
168         _, err = runner.Docker.InspectImage(imageID)
169         if err != nil {
170                 runner.CrunchLog.Print("Loading Docker image from keep")
171
172                 var readCloser io.ReadCloser
173                 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
174                 if err != nil {
175                         return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
176                 }
177
178                 err = runner.Docker.LoadImage(readCloser)
179                 if err != nil {
180                         return fmt.Errorf("While loading container image into Docker: %v", err)
181                 }
182         } else {
183                 runner.CrunchLog.Print("Docker image is available")
184         }
185
186         runner.ContainerConfig.Image = imageID
187
188         return nil
189 }
190
191 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string) (c *exec.Cmd, err error) {
192         c = exec.Command("arv-mount", arvMountCmd...)
193         nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
194         c.Stdout = nt
195         c.Stderr = nt
196
197         err = c.Start()
198         if err != nil {
199                 return nil, err
200         }
201
202         statReadme := make(chan bool)
203         runner.ArvMountExit = make(chan error)
204
205         keepStatting := true
206         go func() {
207                 for keepStatting {
208                         time.Sleep(100 * time.Millisecond)
209                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
210                         if err == nil {
211                                 keepStatting = false
212                                 statReadme <- true
213                         }
214                 }
215                 close(statReadme)
216         }()
217
218         go func() {
219                 runner.ArvMountExit <- c.Wait()
220                 close(runner.ArvMountExit)
221         }()
222
223         select {
224         case <-statReadme:
225                 break
226         case err := <-runner.ArvMountExit:
227                 runner.ArvMount = nil
228                 keepStatting = false
229                 return nil, err
230         }
231
232         return c, nil
233 }
234
235 func (runner *ContainerRunner) SetupMounts() (err error) {
236         runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
237         if err != nil {
238                 return fmt.Errorf("While creating keep mount temp dir: %v", err)
239         }
240
241         pdhOnly := true
242         tmpcount := 0
243         arvMountCmd := []string{"--foreground"}
244         collectionPaths := []string{}
245         runner.Binds = nil
246
247         for bind, mnt := range runner.ContainerRecord.Mounts {
248                 if mnt.Kind == "collection" {
249                         var src string
250                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
251                                 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
252                         }
253                         if mnt.UUID != "" {
254                                 if mnt.Writable {
255                                         return fmt.Errorf("Writing to existing collections currently not permitted.")
256                                 }
257                                 pdhOnly = false
258                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
259                         } else if mnt.PortableDataHash != "" {
260                                 if mnt.Writable {
261                                         return fmt.Errorf("Can never write to a collection specified by portable data hash")
262                                 }
263                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
264                         } else {
265                                 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
266                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
267                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
268                                 tmpcount += 1
269                         }
270                         if mnt.Writable {
271                                 if bind == runner.ContainerRecord.OutputPath {
272                                         runner.HostOutputDir = src
273                                 }
274                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
275                         } else {
276                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
277                         }
278                         collectionPaths = append(collectionPaths, src)
279                 } else if mnt.Kind == "tmp" {
280                         if bind == runner.ContainerRecord.OutputPath {
281                                 runner.HostOutputDir, err = runner.MkTempDir("", "")
282                                 if err != nil {
283                                         return fmt.Errorf("While creating mount temp dir: %v", err)
284                                 }
285
286                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
287                         } else {
288                                 runner.Binds = append(runner.Binds, bind)
289                         }
290                 } else {
291                         return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind)
292                 }
293         }
294
295         if runner.HostOutputDir == "" {
296                 return fmt.Errorf("Output path does not correspond to a writable mount point")
297         }
298
299         if pdhOnly {
300                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
301         } else {
302                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
303         }
304         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
305
306         runner.ArvMount, err = runner.RunArvMount(arvMountCmd)
307         if err != nil {
308                 return fmt.Errorf("While trying to start arv-mount: %v", err)
309         }
310
311         for _, p := range collectionPaths {
312                 _, err = os.Stat(p)
313                 if err != nil {
314                         return fmt.Errorf("While checking that input files exist: %v", err)
315                 }
316         }
317
318         return nil
319 }
320
321 // StartContainer creates the container and runs it.
322 func (runner *ContainerRunner) StartContainer() (err error) {
323         runner.CrunchLog.Print("Creating Docker container")
324
325         runner.CancelLock.Lock()
326         defer runner.CancelLock.Unlock()
327
328         if runner.Cancelled {
329                 return ErrCancelled
330         }
331
332         runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
333         if runner.ContainerRecord.Cwd != "." {
334                 runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
335         }
336         for k, v := range runner.ContainerRecord.Environment {
337                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
338         }
339         runner.ContainerConfig.NetworkDisabled = true
340         runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
341         if err != nil {
342                 return fmt.Errorf("While creating container: %v", err)
343         }
344         hostConfig := &dockerclient.HostConfig{Binds: runner.Binds}
345
346         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
347         err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
348         if err != nil {
349                 return fmt.Errorf("While starting container: %v", err)
350         }
351
352         return nil
353 }
354
355 // AttachLogs connects the docker container stdout and stderr logs to the
356 // Arvados logger which logs to Keep and the API server logs table.
357 func (runner *ContainerRunner) AttachLogs() (err error) {
358
359         runner.CrunchLog.Print("Attaching container logs")
360
361         var stderrReader, stdoutReader io.Reader
362         stderrReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stderr: true})
363         if err != nil {
364                 return fmt.Errorf("While getting container standard error: %v", err)
365         }
366         stdoutReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stdout: true})
367         if err != nil {
368                 return fmt.Errorf("While getting container standard output: %v", err)
369         }
370
371         runner.loggingDone = make(chan bool)
372
373         runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
374         runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
375         go ReadWriteLines(stdoutReader, runner.Stdout, runner.loggingDone)
376         go ReadWriteLines(stderrReader, runner.Stderr, runner.loggingDone)
377
378         return nil
379 }
380
381 // WaitFinish waits for the container to terminate, capture the exit code, and
382 // close the stdout/stderr logging.
383 func (runner *ContainerRunner) WaitFinish() error {
384         result := runner.Docker.Wait(runner.ContainerID)
385         wr := <-result
386         if wr.Error != nil {
387                 return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
388         }
389         runner.ExitCode = &wr.ExitCode
390
391         // drain stdout/stderr
392         <-runner.loggingDone
393         <-runner.loggingDone
394
395         runner.Stdout.Close()
396         runner.Stderr.Close()
397
398         return nil
399 }
400
401 // HandleOutput sets the output and unmounts the FUSE mount.
402 func (runner *ContainerRunner) CaptureOutput() error {
403         if runner.ArvMount != nil {
404                 defer func() {
405                         umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
406                         umnterr := umount.Run()
407                         if umnterr != nil {
408                                 runner.CrunchLog.Print("While running fusermount: %v", umnterr)
409                         }
410
411                         mnterr := <-runner.ArvMountExit
412                         if mnterr != nil {
413                                 runner.CrunchLog.Print("Arv-mount exit error: %v", mnterr)
414                         }
415                 }()
416         }
417
418         if runner.finalState != "Complete" {
419                 return nil
420         }
421
422         if runner.HostOutputDir == "" {
423                 return nil
424         }
425
426         _, err := os.Stat(runner.HostOutputDir)
427         if err != nil {
428                 return fmt.Errorf("While checking host output path: %v", err)
429         }
430
431         var manifestText string
432
433         collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
434         _, err = os.Stat(collectionMetafile)
435         if err != nil {
436                 // Regular directory
437                 cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
438                 manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
439                 if err != nil {
440                         return fmt.Errorf("While uploading output files: %v", err)
441                 }
442         } else {
443                 // FUSE mount directory
444                 file, openerr := os.Open(collectionMetafile)
445                 if openerr != nil {
446                         return fmt.Errorf("While opening FUSE metafile: %v", err)
447                 }
448                 defer file.Close()
449
450                 rec := CollectionRecord{}
451                 err = json.NewDecoder(file).Decode(&rec)
452                 if err != nil {
453                         return fmt.Errorf("While reading FUSE metafile: %v", err)
454                 }
455                 manifestText = rec.ManifestText
456         }
457
458         var response CollectionRecord
459         err = runner.ArvClient.Create("collections",
460                 arvadosclient.Dict{
461                         "collection": arvadosclient.Dict{
462                                 "manifest_text": manifestText}},
463                 &response)
464         if err != nil {
465                 return fmt.Errorf("While creating output collection: %v", err)
466         }
467
468         runner.OutputPDH = new(string)
469         *runner.OutputPDH = response.PortableDataHash
470
471         return nil
472 }
473
474 // CommitLogs posts the collection containing the final container logs.
475 func (runner *ContainerRunner) CommitLogs() error {
476         runner.CrunchLog.Print(runner.finalState)
477         runner.CrunchLog.Close()
478
479         // Closing CrunchLog above allows it to be committed to Keep at this
480         // point, but re-open crunch log with ArvClient in case there are any
481         // other further (such as failing to write the log to Keep!) while
482         // shutting down
483         runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
484                 "crunch-run", nil})
485
486         mt, err := runner.LogCollection.ManifestText()
487         if err != nil {
488                 return fmt.Errorf("While creating log manifest: %v", err)
489         }
490
491         var response CollectionRecord
492         err = runner.ArvClient.Create("collections",
493                 arvadosclient.Dict{
494                         "collection": arvadosclient.Dict{
495                                 "name":          "logs for " + runner.ContainerRecord.UUID,
496                                 "manifest_text": mt}},
497                 &response)
498         if err != nil {
499                 return fmt.Errorf("While creating log collection: %v", err)
500         }
501
502         runner.LogsPDH = new(string)
503         *runner.LogsPDH = response.PortableDataHash
504
505         return nil
506 }
507
508 // UpdateContainerRecordRunning updates the container state to "Running"
509 func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
510         return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID,
511                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
512 }
513
514 // UpdateContainerRecordComplete updates the container record state on API
515 // server to "Complete" or "Cancelled"
516 func (runner *ContainerRunner) UpdateContainerRecordComplete() error {
517         update := arvadosclient.Dict{}
518         if runner.LogsPDH != nil {
519                 update["log"] = *runner.LogsPDH
520         }
521         if runner.ExitCode != nil {
522                 update["exit_code"] = *runner.ExitCode
523         }
524         if runner.OutputPDH != nil {
525                 update["output"] = runner.OutputPDH
526         }
527
528         update["state"] = runner.finalState
529
530         return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil)
531 }
532
533 // NewArvLogWriter creates an ArvLogWriter
534 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
535         return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
536 }
537
538 // Run the full container lifecycle.
539 func (runner *ContainerRunner) Run() (err error) {
540         runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID)
541
542         var runerr, waiterr error
543
544         defer func() {
545                 if err != nil {
546                         runner.CrunchLog.Print(err)
547                 }
548
549                 if runner.Cancelled {
550                         runner.finalState = "Cancelled"
551                 } else {
552                         runner.finalState = "Complete"
553                 }
554
555                 // (7) capture output
556                 outputerr := runner.CaptureOutput()
557                 if outputerr != nil {
558                         runner.CrunchLog.Print(outputerr)
559                 }
560
561                 // (8) write logs
562                 logerr := runner.CommitLogs()
563                 if logerr != nil {
564                         runner.CrunchLog.Print(logerr)
565                 }
566
567                 // (9) update container record with results
568                 updateerr := runner.UpdateContainerRecordComplete()
569                 if updateerr != nil {
570                         runner.CrunchLog.Print(updateerr)
571                 }
572
573                 runner.CrunchLog.Close()
574
575                 if err == nil {
576                         if runerr != nil {
577                                 err = runerr
578                         } else if waiterr != nil {
579                                 err = runerr
580                         } else if logerr != nil {
581                                 err = logerr
582                         } else if updateerr != nil {
583                                 err = updateerr
584                         }
585                 }
586         }()
587
588         err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord)
589         if err != nil {
590                 return fmt.Errorf("While getting container record: %v", err)
591         }
592
593         // (1) setup signal handling
594         err = runner.SetupSignals()
595         if err != nil {
596                 return fmt.Errorf("While setting up signal handling: %v", err)
597         }
598
599         // (2) check for and/or load image
600         err = runner.LoadImage()
601         if err != nil {
602                 return fmt.Errorf("While loading container image: %v", err)
603         }
604
605         // (3) set up FUSE mount and binds
606         err = runner.SetupMounts()
607         if err != nil {
608                 return fmt.Errorf("While setting up mounts: %v", err)
609         }
610
611         // (3) create and start container
612         err = runner.StartContainer()
613         if err != nil {
614                 if err == ErrCancelled {
615                         err = nil
616                 }
617                 return
618         }
619
620         // (4) update container record state
621         err = runner.UpdateContainerRecordRunning()
622         if err != nil {
623                 runner.CrunchLog.Print(err)
624         }
625
626         // (5) attach container logs
627         runerr = runner.AttachLogs()
628         if runerr != nil {
629                 runner.CrunchLog.Print(runerr)
630         }
631
632         // (6) wait for container to finish
633         waiterr = runner.WaitFinish()
634
635         return
636 }
637
638 // NewContainerRunner creates a new container runner.
639 func NewContainerRunner(api IArvadosClient,
640         kc IKeepClient,
641         docker ThinDockerClient,
642         containerUUID string) *ContainerRunner {
643
644         cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
645         cr.NewLogWriter = cr.NewArvLogWriter
646         cr.RunArvMount = cr.ArvMountCmd
647         cr.MkTempDir = ioutil.TempDir
648         cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
649         cr.ContainerRecord.UUID = containerUUID
650         cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
651         return cr
652 }
653
654 func main() {
655         flag.Parse()
656
657         api, err := arvadosclient.MakeArvadosClient()
658         if err != nil {
659                 log.Fatal(err)
660         }
661         api.Retries = 8
662
663         var kc *keepclient.KeepClient
664         kc, err = keepclient.MakeKeepClient(&api)
665         if err != nil {
666                 log.Fatal(err)
667         }
668         kc.Retries = 4
669
670         var docker *dockerclient.DockerClient
671         docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
672         if err != nil {
673                 log.Fatal(err)
674         }
675
676         cr := NewContainerRunner(api, kc, docker, flag.Arg(0))
677
678         err = cr.Run()
679         if err != nil {
680                 log.Fatal(err)
681         }
682
683 }