665f10767ae2989dffa4f8b9198111fedeffbe01
[arvados.git] / services / crunch-run / crunchrun.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "git.curoverse.com/arvados.git/sdk/go/manifest"
11         "github.com/curoverse/dockerclient"
12         "io"
13         "io/ioutil"
14         "log"
15         "os"
16         "os/exec"
17         "os/signal"
18         "strings"
19         "sync"
20         "syscall"
21         "time"
22 )
23
24 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
25 type IArvadosClient interface {
26         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
27         Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
28         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
29 }
30
31 // ErrCancelled is the error returned when the container is cancelled.
32 var ErrCancelled = errors.New("Cancelled")
33
34 // IKeepClient is the minimal Keep API methods used by crunch-run.
35 type IKeepClient interface {
36         PutHB(hash string, buf []byte) (string, int, error)
37         ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
38 }
39
40 // Mount describes the mount points to create inside the container.
41 type Mount struct {
42         Kind             string `json:"kind"`
43         Writable         bool   `json:"writable"`
44         PortableDataHash string `json:"portable_data_hash"`
45         UUID             string `json:"uuid"`
46         DeviceType       string `json:"device_type"`
47 }
48
49 // Collection record returned by the API server.
50 type CollectionRecord struct {
51         ManifestText     string `json:"manifest_text"`
52         PortableDataHash string `json:"portable_data_hash"`
53 }
54
55 // ContainerRecord is the container record returned by the API server.
56 type ContainerRecord struct {
57         UUID               string                 `json:"uuid"`
58         Command            []string               `json:"command"`
59         ContainerImage     string                 `json:"container_image"`
60         Cwd                string                 `json:"cwd"`
61         Environment        map[string]string      `json:"environment"`
62         Mounts             map[string]Mount       `json:"mounts"`
63         OutputPath         string                 `json:"output_path"`
64         Priority           int                    `json:"priority"`
65         RuntimeConstraints map[string]interface{} `json:"runtime_constraints"`
66         State              string                 `json:"state"`
67         Output             string                 `json:"output"`
68 }
69
70 // NewLogWriter is a factory function to create a new log writer.
71 type NewLogWriter func(name string) io.WriteCloser
72
73 type RunArvMount func([]string) (*exec.Cmd, error)
74
75 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
76 type ThinDockerClient interface {
77         StopContainer(id string, timeout int) error
78         InspectImage(id string) (*dockerclient.ImageInfo, error)
79         LoadImage(reader io.Reader) error
80         CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
81         StartContainer(id string, config *dockerclient.HostConfig) error
82         ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error)
83         Wait(id string) <-chan dockerclient.WaitResult
84         RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
85 }
86
87 // ContainerRunner is the main stateful struct used for a single execution of a
88 // container.
89 type ContainerRunner struct {
90         Docker    ThinDockerClient
91         ArvClient IArvadosClient
92         Kc        IKeepClient
93         ContainerRecord
94         dockerclient.ContainerConfig
95         ContainerID string
96         ExitCode    *int
97         NewLogWriter
98         loggingDone   chan bool
99         CrunchLog     *ThrottledLogger
100         Stdout        *ThrottledLogger
101         Stderr        *ThrottledLogger
102         LogCollection *CollectionWriter
103         LogsPDH       *string
104         RunArvMount
105         ArvMount      *exec.Cmd
106         ArvMountPoint string
107         HostOutputDir string
108         Binds         []string
109         OutputPDH     *string
110         CancelLock    sync.Mutex
111         Cancelled     bool
112         SigChan       chan os.Signal
113         ArvMountExit  chan error
114         finalState    string
115 }
116
117 // SetupSignals sets up signal handling to gracefully terminate the underlying
118 // Docker container and update state when receiving a TERM, INT or QUIT signal.
119 func (runner *ContainerRunner) SetupSignals() error {
120         runner.SigChan = make(chan os.Signal, 1)
121         signal.Notify(runner.SigChan, syscall.SIGTERM)
122         signal.Notify(runner.SigChan, syscall.SIGINT)
123         signal.Notify(runner.SigChan, syscall.SIGQUIT)
124
125         go func(sig <-chan os.Signal) {
126                 for _ = range sig {
127                         if !runner.Cancelled {
128                                 runner.CancelLock.Lock()
129                                 runner.Cancelled = true
130                                 if runner.ContainerID != "" {
131                                         runner.Docker.StopContainer(runner.ContainerID, 10)
132                                 }
133                                 runner.CancelLock.Unlock()
134                         }
135                 }
136         }(runner.SigChan)
137
138         return nil
139 }
140
141 // LoadImage determines the docker image id from the container record and
142 // checks if it is available in the local Docker image store.  If not, it loads
143 // the image from Keep.
144 func (runner *ContainerRunner) LoadImage() (err error) {
145
146         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
147
148         var collection CollectionRecord
149         err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
150         if err != nil {
151                 return fmt.Errorf("While getting container image collection: %v", err)
152         }
153         manifest := manifest.Manifest{Text: collection.ManifestText}
154         var img, imageID string
155         for ms := range manifest.StreamIter() {
156                 img = ms.FileStreamSegments[0].Name
157                 if !strings.HasSuffix(img, ".tar") {
158                         return fmt.Errorf("First file in the container image collection does not end in .tar")
159                 }
160                 imageID = img[:len(img)-4]
161         }
162
163         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
164
165         _, err = runner.Docker.InspectImage(imageID)
166         if err != nil {
167                 runner.CrunchLog.Print("Loading Docker image from keep")
168
169                 var readCloser io.ReadCloser
170                 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
171                 if err != nil {
172                         return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
173                 }
174
175                 err = runner.Docker.LoadImage(readCloser)
176                 if err != nil {
177                         return fmt.Errorf("While loading container image into Docker: %v", err)
178                 }
179         } else {
180                 runner.CrunchLog.Print("Docker image is available")
181         }
182
183         runner.ContainerConfig.Image = imageID
184
185         return nil
186 }
187
188 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string) (c *exec.Cmd, err error) {
189         c = exec.Command("arv-mount", arvMountCmd...)
190         nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
191         c.Stdout = nt
192         c.Stderr = nt
193
194         err = c.Start()
195         if err != nil {
196                 return nil, err
197         }
198
199         statReadme := make(chan bool)
200         runner.ArvMountExit = make(chan error)
201
202         keepStatting := true
203         go func() {
204                 for keepStatting {
205                         time.Sleep(100 * time.Millisecond)
206                         _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
207                         if err == nil {
208                                 keepStatting = false
209                                 statReadme <- true
210                         }
211                 }
212                 close(statReadme)
213         }()
214
215         go func() {
216                 runner.ArvMountExit <- c.Wait()
217                 close(runner.ArvMountExit)
218         }()
219
220         select {
221         case <-statReadme:
222                 break
223         case err := <-runner.ArvMountExit:
224                 runner.ArvMount = nil
225                 keepStatting = false
226                 return nil, err
227         }
228
229         return c, nil
230 }
231
232 func (runner *ContainerRunner) SetupMounts() (err error) {
233         runner.ArvMountPoint, err = ioutil.TempDir("", "keep")
234         if err != nil {
235                 return fmt.Errorf("While creating keep mount temp dir: %v", err)
236         }
237
238         pdhOnly := true
239         tmpcount := 0
240         arvMountCmd := []string{"--foreground"}
241         collectionPaths := []string{}
242
243         for bind, mnt := range runner.ContainerRecord.Mounts {
244                 if mnt.Kind == "collection" {
245                         var src string
246                         if mnt.UUID != "" && mnt.PortableDataHash != "" {
247                                 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
248                         }
249                         if mnt.UUID != "" {
250                                 if mnt.Writable {
251                                         return fmt.Errorf("Writing to existing collections currently not permitted.")
252                                 }
253                                 pdhOnly = false
254                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
255                         } else if mnt.PortableDataHash != "" {
256                                 if mnt.Writable {
257                                         return fmt.Errorf("Can never write to a collection specified by portable data hash")
258                                 }
259                                 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
260                         } else {
261                                 src = fmt.Sprintf("%s/tmp%i", runner.ArvMountPoint, tmpcount)
262                                 arvMountCmd = append(arvMountCmd, "--mount-tmp")
263                                 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%i", tmpcount))
264                                 tmpcount += 1
265                         }
266                         if mnt.Writable {
267                                 if bind == runner.ContainerRecord.OutputPath {
268                                         runner.HostOutputDir = src
269                                 }
270                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
271                         } else {
272                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
273                         }
274                         collectionPaths = append(collectionPaths, src)
275                 } else if mnt.Kind == "tmp" {
276                         if bind == runner.ContainerRecord.OutputPath {
277                                 runner.HostOutputDir, err = ioutil.TempDir("", "")
278                                 if err != nil {
279                                         return fmt.Errorf("While creating mount temp dir: %v", err)
280                                 }
281
282                                 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
283                         } else {
284                                 runner.Binds = append(runner.Binds, bind)
285                         }
286                 } else {
287                         return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind)
288                 }
289         }
290
291         if runner.HostOutputDir == "" {
292                 return fmt.Errorf("Output path does not correspond to a writable mount point")
293         }
294
295         if pdhOnly {
296                 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
297         } else {
298                 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
299         }
300         arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
301
302         runner.ArvMount, err = runner.RunArvMount(arvMountCmd)
303         if err != nil {
304                 return fmt.Errorf("While trying to start arv-mount: %v", err)
305         }
306
307         for _, p := range collectionPaths {
308                 _, err = os.Stat(p)
309                 if err != nil {
310                         return fmt.Errorf("While checking that input files exist: %v", err)
311                 }
312         }
313
314         return nil
315 }
316
317 // StartContainer creates the container and runs it.
318 func (runner *ContainerRunner) StartContainer() (err error) {
319         runner.CrunchLog.Print("Creating Docker container")
320
321         runner.CancelLock.Lock()
322         defer runner.CancelLock.Unlock()
323
324         if runner.Cancelled {
325                 return ErrCancelled
326         }
327
328         runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
329         if runner.ContainerRecord.Cwd != "." {
330                 runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
331         }
332         for k, v := range runner.ContainerRecord.Environment {
333                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
334         }
335         runner.ContainerConfig.NetworkDisabled = true
336         runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
337         if err != nil {
338                 return fmt.Errorf("While creating container: %v", err)
339         }
340         hostConfig := &dockerclient.HostConfig{Binds: runner.Binds}
341
342         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
343         err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
344         if err != nil {
345                 return fmt.Errorf("While starting container: %v", err)
346         }
347
348         return nil
349 }
350
351 // AttachLogs connects the docker container stdout and stderr logs to the
352 // Arvados logger which logs to Keep and the API server logs table.
353 func (runner *ContainerRunner) AttachLogs() (err error) {
354
355         runner.CrunchLog.Print("Attaching container logs")
356
357         var stderrReader, stdoutReader io.Reader
358         stderrReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stderr: true})
359         if err != nil {
360                 return fmt.Errorf("While getting container standard error: %v", err)
361         }
362         stdoutReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stdout: true})
363         if err != nil {
364                 return fmt.Errorf("While getting container standard output: %v", err)
365         }
366
367         runner.loggingDone = make(chan bool)
368
369         runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
370         runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
371         go ReadWriteLines(stdoutReader, runner.Stdout, runner.loggingDone)
372         go ReadWriteLines(stderrReader, runner.Stderr, runner.loggingDone)
373
374         return nil
375 }
376
377 // WaitFinish waits for the container to terminate, capture the exit code, and
378 // close the stdout/stderr logging.
379 func (runner *ContainerRunner) WaitFinish() error {
380         result := runner.Docker.Wait(runner.ContainerID)
381         wr := <-result
382         if wr.Error != nil {
383                 return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
384         }
385         runner.ExitCode = &wr.ExitCode
386
387         // drain stdout/stderr
388         <-runner.loggingDone
389         <-runner.loggingDone
390
391         runner.Stdout.Close()
392         runner.Stderr.Close()
393
394         return nil
395 }
396
397 // HandleOutput sets the output and unmounts the FUSE mount.
398 func (runner *ContainerRunner) CaptureOutput() error {
399         if runner.ArvMount != nil {
400                 defer func() {
401                         umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
402                         umnterr := umount.Run()
403                         if umnterr != nil {
404                                 runner.CrunchLog.Print("While running fusermount: %v", umnterr)
405                         }
406
407                         mnterr := <-runner.ArvMountExit
408                         if mnterr != nil {
409                                 runner.CrunchLog.Print("Arv-mount exit error: %v", mnterr)
410                         }
411                 }()
412         }
413
414         if runner.finalState != "Complete" {
415                 return nil
416         }
417
418         if runner.HostOutputDir == "" {
419                 return nil
420         }
421
422         _, err := os.Stat(runner.HostOutputDir)
423         if err != nil {
424                 return fmt.Errorf("While checking host output path: %v", err)
425         }
426
427         var manifestText string
428
429         collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
430         _, err = os.Stat(collectionMetafile)
431         if err != nil {
432                 // Regular directory
433                 cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
434                 manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
435                 if err != nil {
436                         return fmt.Errorf("While uploading output files: %v", err)
437                 }
438         } else {
439                 // FUSE mount directory
440                 file, openerr := os.Open(collectionMetafile)
441                 if openerr != nil {
442                         return fmt.Errorf("While opening FUSE metafile: %v", err)
443                 }
444                 defer file.Close()
445
446                 rec := CollectionRecord{}
447                 err = json.NewDecoder(file).Decode(&rec)
448                 if err != nil {
449                         return fmt.Errorf("While reading FUSE metafile: %v", err)
450                 }
451                 manifestText = rec.ManifestText
452         }
453
454         var response CollectionRecord
455         err = runner.ArvClient.Create("collections",
456                 arvadosclient.Dict{
457                         "collection": arvadosclient.Dict{
458                                 "manifest_text": manifestText}},
459                 &response)
460         if err != nil {
461                 return fmt.Errorf("While creating output collection: %v", err)
462         }
463
464         runner.OutputPDH = new(string)
465         *runner.OutputPDH = response.PortableDataHash
466
467         return nil
468 }
469
470 // CommitLogs posts the collection containing the final container logs.
471 func (runner *ContainerRunner) CommitLogs() error {
472         runner.CrunchLog.Print(runner.finalState)
473         runner.CrunchLog.Close()
474
475         // Closing CrunchLog above allows it to be committed to Keep at this
476         // point, but re-open crunch log with ArvClient in case there are any
477         // other further (such as failing to write the log to Keep!) while
478         // shutting down
479         runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
480                 "crunch-run", nil})
481
482         mt, err := runner.LogCollection.ManifestText()
483         if err != nil {
484                 return fmt.Errorf("While creating log manifest: %v", err)
485         }
486
487         var response CollectionRecord
488         err = runner.ArvClient.Create("collections",
489                 arvadosclient.Dict{
490                         "collection": arvadosclient.Dict{
491                                 "name":          "logs for " + runner.ContainerRecord.UUID,
492                                 "manifest_text": mt}},
493                 &response)
494         if err != nil {
495                 return fmt.Errorf("While creating log collection: %v", err)
496         }
497
498         runner.LogsPDH = new(string)
499         *runner.LogsPDH = response.PortableDataHash
500
501         return nil
502 }
503
504 // UpdateContainerRecordRunning updates the container state to "Running"
505 func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
506         return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID,
507                 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
508 }
509
510 // UpdateContainerRecordComplete updates the container record state on API
511 // server to "Complete" or "Cancelled"
512 func (runner *ContainerRunner) UpdateContainerRecordComplete() error {
513         update := arvadosclient.Dict{}
514         if runner.LogsPDH != nil {
515                 update["log"] = *runner.LogsPDH
516         }
517         if runner.ExitCode != nil {
518                 update["exit_code"] = *runner.ExitCode
519         }
520         if runner.OutputPDH != nil {
521                 update["output"] = runner.OutputPDH
522         }
523
524         update["state"] = runner.finalState
525
526         return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil)
527 }
528
529 // NewArvLogWriter creates an ArvLogWriter
530 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
531         return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
532 }
533
534 // Run the full container lifecycle.
535 func (runner *ContainerRunner) Run() (err error) {
536         runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID)
537
538         var runerr, waiterr error
539
540         defer func() {
541                 if err != nil {
542                         runner.CrunchLog.Print(err)
543                 }
544
545                 if runner.Cancelled {
546                         runner.finalState = "Cancelled"
547                 } else {
548                         runner.finalState = "Complete"
549                 }
550
551                 // (7) capture output
552                 outputerr := runner.CaptureOutput()
553                 if outputerr != nil {
554                         runner.CrunchLog.Print(outputerr)
555                 }
556
557                 // (8) write logs
558                 logerr := runner.CommitLogs()
559                 if logerr != nil {
560                         runner.CrunchLog.Print(logerr)
561                 }
562
563                 // (9) update container record with results
564                 updateerr := runner.UpdateContainerRecordComplete()
565                 if updateerr != nil {
566                         runner.CrunchLog.Print(updateerr)
567                 }
568
569                 runner.CrunchLog.Close()
570
571                 if err == nil {
572                         if runerr != nil {
573                                 err = runerr
574                         } else if waiterr != nil {
575                                 err = runerr
576                         } else if logerr != nil {
577                                 err = logerr
578                         } else if updateerr != nil {
579                                 err = updateerr
580                         }
581                 }
582         }()
583
584         err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord)
585         if err != nil {
586                 return fmt.Errorf("While getting container record: %v", err)
587         }
588
589         // (1) setup signal handling
590         err = runner.SetupSignals()
591         if err != nil {
592                 return fmt.Errorf("While setting up signal handling: %v", err)
593         }
594
595         // (2) check for and/or load image
596         err = runner.LoadImage()
597         if err != nil {
598                 return fmt.Errorf("While loading container image: %v", err)
599         }
600
601         // (3) set up FUSE mount and binds
602         err = runner.SetupMounts()
603         if err != nil {
604                 return fmt.Errorf("While setting up mounts: %v", err)
605         }
606
607         // (3) create and start container
608         err = runner.StartContainer()
609         if err != nil {
610                 if err == ErrCancelled {
611                         err = nil
612                 }
613                 return
614         }
615
616         // (4) update container record state
617         err = runner.UpdateContainerRecordRunning()
618         if err != nil {
619                 runner.CrunchLog.Print(err)
620         }
621
622         // (5) attach container logs
623         runerr = runner.AttachLogs()
624         if runerr != nil {
625                 runner.CrunchLog.Print(runerr)
626         }
627
628         // (6) wait for container to finish
629         waiterr = runner.WaitFinish()
630
631         return
632 }
633
634 // NewContainerRunner creates a new container runner.
635 func NewContainerRunner(api IArvadosClient,
636         kc IKeepClient,
637         docker ThinDockerClient,
638         containerUUID string) *ContainerRunner {
639
640         cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
641         cr.NewLogWriter = cr.NewArvLogWriter
642         cr.RunArvMount = cr.ArvMountCmd
643         cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
644         cr.ContainerRecord.UUID = containerUUID
645         cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
646         return cr
647 }
648
649 func main() {
650         flag.Parse()
651
652         api, err := arvadosclient.MakeArvadosClient()
653         if err != nil {
654                 log.Fatal(err)
655         }
656         api.Retries = 8
657
658         var kc *keepclient.KeepClient
659         kc, err = keepclient.MakeKeepClient(&api)
660         if err != nil {
661                 log.Fatal(err)
662         }
663         kc.Retries = 4
664
665         var docker *dockerclient.DockerClient
666         docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
667         if err != nil {
668                 log.Fatal(err)
669         }
670
671         cr := NewContainerRunner(api, kc, docker, flag.Arg(0))
672
673         err = cr.Run()
674         if err != nil {
675                 log.Fatal(err)
676         }
677
678 }