8015: Add arv-mount command line test
[arvados.git] / services / crunch-run / crunchrun.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "git.curoverse.com/arvados.git/sdk/go/manifest"
11         "github.com/curoverse/dockerclient"
12         "io"
13         "io/ioutil"
14         "log"
15         "os"
16         "os/exec"
17         "os/signal"
18         "strings"
19         "sync"
20         "syscall"
21         "time"
22 )
23
24 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
25 type IArvadosClient interface {
26         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
27         Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
28         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
29 }
30
31 // ErrCancelled is the error returned when the container is cancelled.
32 var ErrCancelled = errors.New("Cancelled")
33
34 // IKeepClient is the minimal Keep API methods used by crunch-run.
35 type IKeepClient interface {
36         PutHB(hash string, buf []byte) (string, int, error)
37         ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
38 }
39
40 // Mount describes the mount points to create inside the container.
41 type Mount struct {
42         Kind             string `json:"kind"`
43         Writable         bool   `json:"writable"`
44         PortableDataHash string `json:"portable_data_hash"`
45         UUID             string `json:"uuid"`
46         DeviceType       string `json:"device_type"`
47 }
48
49 // Collection record returned by the API server.
50 type CollectionRecord struct {
51         ManifestText     string `json:"manifest_text"`
52         PortableDataHash string `json:"portable_data_hash"`
53 }
54
55 // ContainerRecord is the container record returned by the API server.
56 type ContainerRecord struct {
57         UUID               string                 `json:"uuid"`
58         Command            []string               `json:"command"`
59         ContainerImage     string                 `json:"container_image"`
60         Cwd                string                 `json:"cwd"`
61         Environment        map[string]string      `json:"environment"`
62         Mounts             map[string]Mount       `json:"mounts"`
63         OutputPath         string                 `json:"output_path"`
64         Priority           int                    `json:"priority"`
65         RuntimeConstraints map[string]interface{} `json:"runtime_constraints"`
66         State              string                 `json:"state"`
67         Output             string                 `json:"output"`
68 }
69
70 // NewLogWriter is a factory function to create a new log writer.
71 type NewLogWriter func(name string) io.WriteCloser
72
73 type RunArvMount func([]string) (*exec.Cmd, error)
74
75 type MkTempDir func(string, string) (string, error)
76
77 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
78 type ThinDockerClient interface {
79         StopContainer(id string, timeout int) error
80         InspectImage(id string) (*dockerclient.ImageInfo, error)
81         LoadImage(reader io.Reader) error
82         CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
83         StartContainer(id string, config *dockerclient.HostConfig) error
84         ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error)
85         Wait(id string) <-chan dockerclient.WaitResult
86         RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
87 }
88
89 // ContainerRunner is the main stateful struct used for a single execution of a
90 // container.
91 type ContainerRunner struct {
92         Docker    ThinDockerClient
93         ArvClient IArvadosClient
94         Kc        IKeepClient
95         ContainerRecord
96         dockerclient.ContainerConfig
97         ContainerID string
98         ExitCode    *int
99         NewLogWriter
100         loggingDone   chan bool
101         CrunchLog     *ThrottledLogger
102         Stdout        *ThrottledLogger
103         Stderr        *ThrottledLogger
104         LogCollection *CollectionWriter
105         LogsPDH       *string
106         RunArvMount
107         MkTempDir
108         ArvMount      *exec.Cmd
109         ArvMountPoint string
110         HostOutputDir string
111         Binds         []string
112         OutputPDH     *string
113         CancelLock    sync.Mutex
114         Cancelled     bool
115         SigChan       chan os.Signal
116         ArvMountExit  chan error
117         finalState    string
118 }
119
120 // SetupSignals sets up signal handling to gracefully terminate the underlying
121 // Docker container and update state when receiving a TERM, INT or QUIT signal.
122 func (runner *ContainerRunner) SetupSignals() error {
123         runner.SigChan = make(chan os.Signal, 1)
124         signal.Notify(runner.SigChan, syscall.SIGTERM)
125         signal.Notify(runner.SigChan, syscall.SIGINT)
126         signal.Notify(runner.SigChan, syscall.SIGQUIT)
127
128         go func(sig <-chan os.Signal) {
129                 for _ = range sig {
130                         if !runner.Cancelled {
131                                 runner.CancelLock.Lock()
132                                 runner.Cancelled = true
133                                 if runner.ContainerID != "" {
134                                         runner.Docker.StopContainer(runner.ContainerID, 10)
135                                 }
136                                 runner.CancelLock.Unlock()
137                         }
138                 }
139         }(runner.SigChan)
140
141         return nil
142 }
143
144 // LoadImage determines the docker image id from the container record and
145 // checks if it is available in the local Docker image store.  If not, it loads
146 // the image from Keep.
147 func (runner *ContainerRunner) LoadImage() (err error) {
148
149         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
150
151         var collection CollectionRecord
152         err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
153         if err != nil {
154                 return fmt.Errorf("While getting container image collection: %v", err)
155         }
156         manifest := manifest.Manifest{Text: collection.ManifestText}
157         var img, imageID string
158         for ms := range manifest.StreamIter() {
159                 img = ms.FileStreamSegments[0].Name
160                 if !strings.HasSuffix(img, ".tar") {
161                         return fmt.Errorf("First file in the container image collection does not end in .tar")
162                 }
163                 imageID = img[:len(img)-4]
164         }
165
166         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
167
168         _, err = runner.Docker.InspectImage(imageID)
169         if err != nil {
170                 runner.CrunchLog.Print("Loading Docker image from keep")
171
172                 var readCloser io.ReadCloser
173                 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
174                 if err != nil {
175                         return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
176                 }
177
178                 err = runner.Docker.LoadImage(readCloser)
179                 if err != nil {
180                         return fmt.Errorf("While loading container image into Docker: %v", err)
181                 }
182         } else {
183                 runner.CrunchLog.Print("Docker image is available")
184         }
185
186         runner.ContainerConfig.Image = imageID
187
188         return nil
189 }
190
191 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string) (c *exec.Cmd, err error) {
192         c = exec.Command("arv-mount", arvMountCmd...)
193         nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
194         c.Stdout = nt
195         c.Stderr = nt
196
197         err = c.Start()
198         if err != nil {
199                 return nil, err
200         }
201
202         statReadme := make(chan bool)
203         runner.ArvMountExit = make(chan error)
204
205         keepStatting := true
206         go func() {
207                 for keepStatting {
208                         time.Sleep(100 * time.Millisecond)
209                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
210                         if err == nil {
211                                 keepStatting = false
212                                 statReadme <- true
213                         }
214                 }
215                 close(statReadme)
216         }()
217
218         go func() {
219                 runner.ArvMountExit <- c.Wait()
220                 close(runner.ArvMountExit)
221         }()
222
223         select {
224         case <-statReadme:
225                 break
226         case err := <-runner.ArvMountExit:
227                 runner.ArvMount = nil
228                 keepStatting = false
229                 return nil, err
230         }
231
232         return c, nil
233 }
234
235 func (runner *ContainerRunner) SetupMounts() (err error) {
236         runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
237         if err != nil {
238                 return fmt.Errorf("While creating keep mount temp dir: %v", err)
239         }
240
241         pdhOnly := true
242         tmpcount := 0
243         arvMountCmd := []string{"--foreground"}
244         collectionPaths := []string{}
245
246         for bind, mnt := range runner.ContainerRecord.Mounts {
247                 if mnt.Kind == "collection" {
248                         var src string
249                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
250                                 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
251                         }
252                         if mnt.UUID != "" {
253                                 if mnt.Writable {
254                                         return fmt.Errorf("Writing to existing collections currently not permitted.")
255                                 }
256                                 pdhOnly = false
257                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
258                         } else if mnt.PortableDataHash != "" {
259                                 if mnt.Writable {
260                                         return fmt.Errorf("Can never write to a collection specified by portable data hash")
261                                 }
262                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
263                         } else {
264                                 src = fmt.Sprintf("%s/tmp%i", runner.ArvMountPoint, tmpcount)
265                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
266                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%i", tmpcount))
267                                 tmpcount += 1
268                         }
269                         if mnt.Writable {
270                                 if bind == runner.ContainerRecord.OutputPath {
271                                         runner.HostOutputDir = src
272                                 }
273                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
274                         } else {
275                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
276                         }
277                         collectionPaths = append(collectionPaths, src)
278                 } else if mnt.Kind == "tmp" {
279                         if bind == runner.ContainerRecord.OutputPath {
280                                 runner.HostOutputDir, err = runner.MkTempDir("", "")
281                                 if err != nil {
282                                         return fmt.Errorf("While creating mount temp dir: %v", err)
283                                 }
284
285                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
286                         } else {
287                                 runner.Binds = append(runner.Binds, bind)
288                         }
289                 } else {
290                         return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind)
291                 }
292         }
293
294         if runner.HostOutputDir == "" {
295                 return fmt.Errorf("Output path does not correspond to a writable mount point")
296         }
297
298         if pdhOnly {
299                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
300         } else {
301                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
302         }
303         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
304
305         runner.ArvMount, err = runner.RunArvMount(arvMountCmd)
306         if err != nil {
307                 return fmt.Errorf("While trying to start arv-mount: %v", err)
308         }
309
310         for _, p := range collectionPaths {
311                 _, err = os.Stat(p)
312                 if err != nil {
313                         return fmt.Errorf("While checking that input files exist: %v", err)
314                 }
315         }
316
317         return nil
318 }
319
320 // StartContainer creates the container and runs it.
321 func (runner *ContainerRunner) StartContainer() (err error) {
322         runner.CrunchLog.Print("Creating Docker container")
323
324         runner.CancelLock.Lock()
325         defer runner.CancelLock.Unlock()
326
327         if runner.Cancelled {
328                 return ErrCancelled
329         }
330
331         runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
332         if runner.ContainerRecord.Cwd != "." {
333                 runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
334         }
335         for k, v := range runner.ContainerRecord.Environment {
336                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
337         }
338         runner.ContainerConfig.NetworkDisabled = true
339         runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
340         if err != nil {
341                 return fmt.Errorf("While creating container: %v", err)
342         }
343         hostConfig := &dockerclient.HostConfig{Binds: runner.Binds}
344
345         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
346         err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
347         if err != nil {
348                 return fmt.Errorf("While starting container: %v", err)
349         }
350
351         return nil
352 }
353
354 // AttachLogs connects the docker container stdout and stderr logs to the
355 // Arvados logger which logs to Keep and the API server logs table.
356 func (runner *ContainerRunner) AttachLogs() (err error) {
357
358         runner.CrunchLog.Print("Attaching container logs")
359
360         var stderrReader, stdoutReader io.Reader
361         stderrReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stderr: true})
362         if err != nil {
363                 return fmt.Errorf("While getting container standard error: %v", err)
364         }
365         stdoutReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stdout: true})
366         if err != nil {
367                 return fmt.Errorf("While getting container standard output: %v", err)
368         }
369
370         runner.loggingDone = make(chan bool)
371
372         runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
373         runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
374         go ReadWriteLines(stdoutReader, runner.Stdout, runner.loggingDone)
375         go ReadWriteLines(stderrReader, runner.Stderr, runner.loggingDone)
376
377         return nil
378 }
379
380 // WaitFinish waits for the container to terminate, capture the exit code, and
381 // close the stdout/stderr logging.
382 func (runner *ContainerRunner) WaitFinish() error {
383         result := runner.Docker.Wait(runner.ContainerID)
384         wr := <-result
385         if wr.Error != nil {
386                 return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
387         }
388         runner.ExitCode = &wr.ExitCode
389
390         // drain stdout/stderr
391         <-runner.loggingDone
392         <-runner.loggingDone
393
394         runner.Stdout.Close()
395         runner.Stderr.Close()
396
397         return nil
398 }
399
400 // HandleOutput sets the output and unmounts the FUSE mount.
401 func (runner *ContainerRunner) CaptureOutput() error {
402         if runner.ArvMount != nil {
403                 defer func() {
404                         umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
405                         umnterr := umount.Run()
406                         if umnterr != nil {
407                                 runner.CrunchLog.Print("While running fusermount: %v", umnterr)
408                         }
409
410                         mnterr := <-runner.ArvMountExit
411                         if mnterr != nil {
412                                 runner.CrunchLog.Print("Arv-mount exit error: %v", mnterr)
413                         }
414                 }()
415         }
416
417         if runner.finalState != "Complete" {
418                 return nil
419         }
420
421         if runner.HostOutputDir == "" {
422                 return nil
423         }
424
425         _, err := os.Stat(runner.HostOutputDir)
426         if err != nil {
427                 return fmt.Errorf("While checking host output path: %v", err)
428         }
429
430         var manifestText string
431
432         collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
433         _, err = os.Stat(collectionMetafile)
434         if err != nil {
435                 // Regular directory
436                 cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
437                 manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
438                 if err != nil {
439                         return fmt.Errorf("While uploading output files: %v", err)
440                 }
441         } else {
442                 // FUSE mount directory
443                 file, openerr := os.Open(collectionMetafile)
444                 if openerr != nil {
445                         return fmt.Errorf("While opening FUSE metafile: %v", err)
446                 }
447                 defer file.Close()
448
449                 rec := CollectionRecord{}
450                 err = json.NewDecoder(file).Decode(&rec)
451                 if err != nil {
452                         return fmt.Errorf("While reading FUSE metafile: %v", err)
453                 }
454                 manifestText = rec.ManifestText
455         }
456
457         var response CollectionRecord
458         err = runner.ArvClient.Create("collections",
459                 arvadosclient.Dict{
460                         "collection": arvadosclient.Dict{
461                                 "manifest_text": manifestText}},
462                 &response)
463         if err != nil {
464                 return fmt.Errorf("While creating output collection: %v", err)
465         }
466
467         runner.OutputPDH = new(string)
468         *runner.OutputPDH = response.PortableDataHash
469
470         return nil
471 }
472
473 // CommitLogs posts the collection containing the final container logs.
474 func (runner *ContainerRunner) CommitLogs() error {
475         runner.CrunchLog.Print(runner.finalState)
476         runner.CrunchLog.Close()
477
478         // Closing CrunchLog above allows it to be committed to Keep at this
479         // point, but re-open crunch log with ArvClient in case there are any
480         // other further (such as failing to write the log to Keep!) while
481         // shutting down
482         runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
483                 "crunch-run", nil})
484
485         mt, err := runner.LogCollection.ManifestText()
486         if err != nil {
487                 return fmt.Errorf("While creating log manifest: %v", err)
488         }
489
490         var response CollectionRecord
491         err = runner.ArvClient.Create("collections",
492                 arvadosclient.Dict{
493                         "collection": arvadosclient.Dict{
494                                 "name":          "logs for " + runner.ContainerRecord.UUID,
495                                 "manifest_text": mt}},
496                 &response)
497         if err != nil {
498                 return fmt.Errorf("While creating log collection: %v", err)
499         }
500
501         runner.LogsPDH = new(string)
502         *runner.LogsPDH = response.PortableDataHash
503
504         return nil
505 }
506
507 // UpdateContainerRecordRunning updates the container state to "Running"
508 func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
509         return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID,
510                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
511 }
512
513 // UpdateContainerRecordComplete updates the container record state on API
514 // server to "Complete" or "Cancelled"
515 func (runner *ContainerRunner) UpdateContainerRecordComplete() error {
516         update := arvadosclient.Dict{}
517         if runner.LogsPDH != nil {
518                 update["log"] = *runner.LogsPDH
519         }
520         if runner.ExitCode != nil {
521                 update["exit_code"] = *runner.ExitCode
522         }
523         if runner.OutputPDH != nil {
524                 update["output"] = runner.OutputPDH
525         }
526
527         update["state"] = runner.finalState
528
529         return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil)
530 }
531
532 // NewArvLogWriter creates an ArvLogWriter
533 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
534         return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
535 }
536
537 // Run the full container lifecycle.
538 func (runner *ContainerRunner) Run() (err error) {
539         runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID)
540
541         var runerr, waiterr error
542
543         defer func() {
544                 if err != nil {
545                         runner.CrunchLog.Print(err)
546                 }
547
548                 if runner.Cancelled {
549                         runner.finalState = "Cancelled"
550                 } else {
551                         runner.finalState = "Complete"
552                 }
553
554                 // (7) capture output
555                 outputerr := runner.CaptureOutput()
556                 if outputerr != nil {
557                         runner.CrunchLog.Print(outputerr)
558                 }
559
560                 // (8) write logs
561                 logerr := runner.CommitLogs()
562                 if logerr != nil {
563                         runner.CrunchLog.Print(logerr)
564                 }
565
566                 // (9) update container record with results
567                 updateerr := runner.UpdateContainerRecordComplete()
568                 if updateerr != nil {
569                         runner.CrunchLog.Print(updateerr)
570                 }
571
572                 runner.CrunchLog.Close()
573
574                 if err == nil {
575                         if runerr != nil {
576                                 err = runerr
577                         } else if waiterr != nil {
578                                 err = runerr
579                         } else if logerr != nil {
580                                 err = logerr
581                         } else if updateerr != nil {
582                                 err = updateerr
583                         }
584                 }
585         }()
586
587         err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord)
588         if err != nil {
589                 return fmt.Errorf("While getting container record: %v", err)
590         }
591
592         // (1) setup signal handling
593         err = runner.SetupSignals()
594         if err != nil {
595                 return fmt.Errorf("While setting up signal handling: %v", err)
596         }
597
598         // (2) check for and/or load image
599         err = runner.LoadImage()
600         if err != nil {
601                 return fmt.Errorf("While loading container image: %v", err)
602         }
603
604         // (3) set up FUSE mount and binds
605         err = runner.SetupMounts()
606         if err != nil {
607                 return fmt.Errorf("While setting up mounts: %v", err)
608         }
609
610         // (3) create and start container
611         err = runner.StartContainer()
612         if err != nil {
613                 if err == ErrCancelled {
614                         err = nil
615                 }
616                 return
617         }
618
619         // (4) update container record state
620         err = runner.UpdateContainerRecordRunning()
621         if err != nil {
622                 runner.CrunchLog.Print(err)
623         }
624
625         // (5) attach container logs
626         runerr = runner.AttachLogs()
627         if runerr != nil {
628                 runner.CrunchLog.Print(runerr)
629         }
630
631         // (6) wait for container to finish
632         waiterr = runner.WaitFinish()
633
634         return
635 }
636
637 // NewContainerRunner creates a new container runner.
638 func NewContainerRunner(api IArvadosClient,
639         kc IKeepClient,
640         docker ThinDockerClient,
641         containerUUID string) *ContainerRunner {
642
643         cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
644         cr.NewLogWriter = cr.NewArvLogWriter
645         cr.RunArvMount = cr.ArvMountCmd
646         cr.MkTempDir = ioutil.TempDir
647         cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
648         cr.ContainerRecord.UUID = containerUUID
649         cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
650         return cr
651 }
652
653 func main() {
654         flag.Parse()
655
656         api, err := arvadosclient.MakeArvadosClient()
657         if err != nil {
658                 log.Fatal(err)
659         }
660         api.Retries = 8
661
662         var kc *keepclient.KeepClient
663         kc, err = keepclient.MakeKeepClient(&api)
664         if err != nil {
665                 log.Fatal(err)
666         }
667         kc.Retries = 4
668
669         var docker *dockerclient.DockerClient
670         docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
671         if err != nil {
672                 log.Fatal(err)
673         }
674
675         cr := NewContainerRunner(api, kc, docker, flag.Arg(0))
676
677         err = cr.Run()
678         if err != nil {
679                 log.Fatal(err)
680         }
681
682 }