7816: CopyReaderToLog renamed to ReadWriteLines. Use Writer instead of Logger
[arvados.git] / services / crunch-run / crunchrun.go
1 package main
2
3 import (
4         "errors"
5         "flag"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         "git.curoverse.com/arvados.git/sdk/go/manifest"
9         "github.com/curoverse/dockerclient"
10         "io"
11         "log"
12         "os"
13         "os/signal"
14         "strings"
15         "sync"
16         "syscall"
17 )
18
19 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
20 type IArvadosClient interface {
21         Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
22         Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
23         Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
24 }
25
26 // ErrCancelled is the error returned when the container is cancelled.
27 var ErrCancelled = errors.New("Cancelled")
28
29 // IKeepClient is the minimal Keep API methods used by crunch-run.
30 type IKeepClient interface {
31         PutHB(hash string, buf []byte) (string, int, error)
32         ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
33 }
34
// Mount describes the mount points to create inside the container.
// NOTE(review): Mount has no fields yet, so the contents of each "mounts"
// entry in the container record are not decoded.
type Mount struct{}

// Collection is the part of a collection record returned by the API server
// that crunch-run reads: its manifest text (used by LoadImage to locate the
// Docker image tarball).
type Collection struct {
	ManifestText string `json:"manifest_text"`
}

// ContainerRecord is the container record returned by the API server,
// decoded from its JSON representation.
type ContainerRecord struct {
	// UUID identifies the container in API calls and in log names.
	UUID string `json:"uuid"`
	// Command becomes the Docker container's Cmd (see StartContainer).
	Command []string `json:"command"`
	// ContainerImage identifies the collection that holds the Docker image
	// tarball; LoadImage fetches it as a "collections" record.
	ContainerImage string `json:"container_image"`
	// Cwd is the working directory; StartContainer only applies it when it
	// is not ".".
	Cwd string `json:"cwd"`
	// Environment entries are passed to Docker as "KEY=VALUE" strings.
	Environment map[string]string `json:"environment"`
	// Mounts lists the mount points to create (see Mount).
	Mounts             map[string]Mount       `json:"mounts"`
	OutputPath         string                 `json:"output_path"`
	Priority           int                    `json:"priority"`
	RuntimeConstraints map[string]interface{} `json:"runtime_constraints"`
	// State is the container state as reported by the API server; Run
	// later sets it to "Running" and then "Complete" or "Cancelled".
	State string `json:"state"`
}
56
57 // NewLogWriter is a factory function to create a new log writer.
58 type NewLogWriter func(name string) io.WriteCloser
59
60 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
61 type ThinDockerClient interface {
62         StopContainer(id string, timeout int) error
63         InspectImage(id string) (*dockerclient.ImageInfo, error)
64         LoadImage(reader io.Reader) error
65         CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
66         StartContainer(id string, config *dockerclient.HostConfig) error
67         ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error)
68         Wait(id string) <-chan dockerclient.WaitResult
69         RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
70 }
71
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
	// Clients for Docker, the Arvados API server and Keep.
	Docker    ThinDockerClient
	ArvClient IArvadosClient
	Kc        IKeepClient
	// ContainerRecord is fetched from the API server at the start of Run.
	ContainerRecord
	// ContainerConfig is filled in by LoadImage (Image) and StartContainer
	// (Cmd, WorkingDir, Env) before the container is created.
	dockerclient.ContainerConfig
	// ContainerID is assigned by StartContainer while holding CancelLock and
	// read by the signal handler under the same lock; "" until created.
	ContainerID string
	// ExitCode is set by WaitFinish; nil until the exit code is known.
	ExitCode *int
	// NewLogWriter creates the writer behind each named log.
	NewLogWriter
	// loggingDone is created by AttachLogs and handed to both ReadWriteLines
	// goroutines; WaitFinish receives from it twice (presumably one send per
	// goroutine when its stream ends — confirm in ReadWriteLines).
	loggingDone chan bool
	// CrunchLog receives crunch-run's own messages; Stdout and Stderr
	// receive the container's output once AttachLogs has run.
	CrunchLog *ThrottledLogger
	Stdout    *ThrottledLogger
	Stderr    *ThrottledLogger
	// LogCollection is where NewArvLogWriter opens "<name>.txt" for each log;
	// CommitLogs saves its manifest as a new collection.
	LogCollection *CollectionWriter
	// LogsPDH is the portable data hash of the saved log collection, set by
	// CommitLogs; nil until then.
	LogsPDH *string
	// CancelLock is held by the signal handler while it sets Cancelled and
	// stops the container, and by StartContainer while it checks Cancelled
	// and creates the container.
	CancelLock sync.Mutex
	// Cancelled is set to true by the signal handler installed by
	// SetupSignals.
	Cancelled bool
	// SigChan receives the signals registered in SetupSignals.
	SigChan chan os.Signal
	// finalState is "Cancelled" or "Complete", chosen by Run when it finishes;
	// it is logged by CommitLogs and sent to the API server.
	finalState string
}
94
95 // SetupSignals sets up signal handling to gracefully terminate the underlying
96 // Docker container and update state when receiving a TERM, INT or QUIT signal.
97 func (runner *ContainerRunner) SetupSignals() error {
98         runner.SigChan = make(chan os.Signal, 1)
99         signal.Notify(runner.SigChan, syscall.SIGTERM)
100         signal.Notify(runner.SigChan, syscall.SIGINT)
101         signal.Notify(runner.SigChan, syscall.SIGQUIT)
102
103         go func(sig <-chan os.Signal) {
104                 for _ = range sig {
105                         if !runner.Cancelled {
106                                 runner.CancelLock.Lock()
107                                 runner.Cancelled = true
108                                 if runner.ContainerID != "" {
109                                         runner.Docker.StopContainer(runner.ContainerID, 10)
110                                 }
111                                 runner.CancelLock.Unlock()
112                         }
113                 }
114         }(runner.SigChan)
115
116         return nil
117 }
118
119 // LoadImage determines the docker image id from the container record and
120 // checks if it is available in the local Docker image store.  If not, it loads
121 // the image from Keep.
122 func (runner *ContainerRunner) LoadImage() (err error) {
123
124         runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
125
126         var collection Collection
127         err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
128         if err != nil {
129                 return err
130         }
131         manifest := manifest.Manifest{Text: collection.ManifestText}
132         var img, imageID string
133         for ms := range manifest.StreamIter() {
134                 img = ms.FileStreamSegments[0].Name
135                 if !strings.HasSuffix(img, ".tar") {
136                         return errors.New("First file in the collection does not end in .tar")
137                 }
138                 imageID = img[:len(img)-4]
139         }
140
141         runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
142
143         _, err = runner.Docker.InspectImage(imageID)
144         if err != nil {
145                 runner.CrunchLog.Print("Loading Docker image from keep")
146
147                 var readCloser io.ReadCloser
148                 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
149                 if err != nil {
150                         return err
151                 }
152
153                 err = runner.Docker.LoadImage(readCloser)
154                 if err != nil {
155                         return err
156                 }
157         } else {
158                 runner.CrunchLog.Print("Docker image is available")
159         }
160
161         runner.ContainerConfig.Image = imageID
162
163         return nil
164 }
165
166 // StartContainer creates the container and runs it.
167 func (runner *ContainerRunner) StartContainer() (err error) {
168         runner.CrunchLog.Print("Creating Docker container")
169
170         runner.CancelLock.Lock()
171         defer runner.CancelLock.Unlock()
172
173         if runner.Cancelled {
174                 return ErrCancelled
175         }
176
177         runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
178         if runner.ContainerRecord.Cwd != "." {
179                 runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
180         }
181         for k, v := range runner.ContainerRecord.Environment {
182                 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
183         }
184         runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
185         if err != nil {
186                 return
187         }
188         hostConfig := &dockerclient.HostConfig{}
189
190         runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
191         err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
192         if err != nil {
193                 return
194         }
195
196         return nil
197 }
198
199 // AttachLogs connects the docker container stdout and stderr logs to the
200 // Arvados logger which logs to Keep and the API server logs table.
201 func (runner *ContainerRunner) AttachLogs() (err error) {
202
203         runner.CrunchLog.Print("Attaching container logs")
204
205         var stderrReader, stdoutReader io.Reader
206         stderrReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stderr: true})
207         if err != nil {
208                 return
209         }
210         stdoutReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stdout: true})
211         if err != nil {
212                 return
213         }
214
215         runner.loggingDone = make(chan bool)
216
217         runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
218         runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
219         go ReadWriteLines(stdoutReader, runner.Stdout, runner.loggingDone)
220         go ReadWriteLines(stderrReader, runner.Stderr, runner.loggingDone)
221
222         return nil
223 }
224
225 // WaitFinish waits for the container to terminate, capture the exit code, and
226 // close the stdout/stderr logging.
227 func (runner *ContainerRunner) WaitFinish() error {
228         result := runner.Docker.Wait(runner.ContainerID)
229         wr := <-result
230         if wr.Error != nil {
231                 return wr.Error
232         }
233         runner.ExitCode = &wr.ExitCode
234
235         // drain stdout/stderr
236         <-runner.loggingDone
237         <-runner.loggingDone
238
239         runner.Stdout.Close()
240         runner.Stderr.Close()
241
242         return nil
243 }
244
245 // CommitLogs posts the collection containing the final container logs.
246 func (runner *ContainerRunner) CommitLogs() error {
247         runner.CrunchLog.Print(runner.finalState)
248         runner.CrunchLog.Close()
249
250         // Closing CrunchLog above allows it to be committed to Keep at this
251         // point, but re-open crunch log with ArvClient in case there are any
252         // other further (such as failing to write the log to Keep!) while
253         // shutting down
254         runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
255                 "crunch-run", nil})
256
257         mt, err := runner.LogCollection.ManifestText()
258         if err != nil {
259                 return err
260         }
261
262         response := make(map[string]string)
263         err = runner.ArvClient.Create("collections",
264                 arvadosclient.Dict{"name": "logs for " + runner.ContainerRecord.UUID,
265                         "manifest_text": mt},
266                 response)
267         if err != nil {
268                 return err
269         }
270
271         runner.LogsPDH = new(string)
272         *runner.LogsPDH = response["portable_data_hash"]
273
274         return nil
275 }
276
277 // UpdateContainerRecordRunning updates the container state to "Running"
278 func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
279         update := arvadosclient.Dict{"state": "Running"}
280         return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, update, nil)
281 }
282
283 // UpdateContainerRecordComplete updates the container record state on API
284 // server to "Complete" or "Cancelled"
285 func (runner *ContainerRunner) UpdateContainerRecordComplete() error {
286         update := arvadosclient.Dict{}
287         if runner.LogsPDH != nil {
288                 update["log"] = *runner.LogsPDH
289         }
290         if runner.ExitCode != nil {
291                 update["exit_code"] = *runner.ExitCode
292         }
293
294         update["state"] = runner.finalState
295
296         return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, update, nil)
297 }
298
299 // NewArvLogWriter creates an ArvLogWriter
300 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
301         return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
302 }
303
304 // Run the full container lifecycle.
305 func (runner *ContainerRunner) Run(containerUUID string) (err error) {
306         runner.CrunchLog.Printf("Executing container '%s'", containerUUID)
307
308         var runerr, waiterr error
309
310         defer func() {
311                 if err != nil {
312                         runner.CrunchLog.Print(err)
313                 }
314
315                 if runner.Cancelled {
316                         runner.finalState = "Cancelled"
317                 } else {
318                         runner.finalState = "Complete"
319                 }
320
321                 // (6) write logs
322                 logerr := runner.CommitLogs()
323                 if logerr != nil {
324                         runner.CrunchLog.Print(logerr)
325                 }
326
327                 // (7) update container record with results
328                 updateerr := runner.UpdateContainerRecordComplete()
329                 if updateerr != nil {
330                         runner.CrunchLog.Print(updateerr)
331                 }
332
333                 runner.CrunchLog.Close()
334
335                 if err == nil {
336                         if runerr != nil {
337                                 err = runerr
338                         } else if waiterr != nil {
339                                 err = runerr
340                         } else if logerr != nil {
341                                 err = logerr
342                         } else if updateerr != nil {
343                                 err = updateerr
344                         }
345                 }
346         }()
347
348         err = runner.ArvClient.Get("containers", containerUUID, nil, &runner.ContainerRecord)
349         if err != nil {
350                 return
351         }
352
353         // (0) setup signal handling
354         err = runner.SetupSignals()
355         if err != nil {
356                 return
357         }
358
359         // (1) check for and/or load image
360         err = runner.LoadImage()
361         if err != nil {
362                 return
363         }
364
365         // (2) start container
366         err = runner.StartContainer()
367         if err != nil {
368                 if err == ErrCancelled {
369                         err = nil
370                 }
371                 return
372         }
373
374         // (3) update container record state
375         err = runner.UpdateContainerRecordRunning()
376         if err != nil {
377                 runner.CrunchLog.Print(err)
378         }
379
380         // (4) attach container logs
381         runerr = runner.AttachLogs()
382         if runerr != nil {
383                 runner.CrunchLog.Print(runerr)
384         }
385
386         // (5) wait for container to finish
387         waiterr = runner.WaitFinish()
388
389         return
390 }
391
392 // NewContainerRunner creates a new container runner.
393 func NewContainerRunner(api IArvadosClient,
394         kc IKeepClient,
395         docker ThinDockerClient) *ContainerRunner {
396
397         cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
398         cr.NewLogWriter = cr.NewArvLogWriter
399         cr.LogCollection = &CollectionWriter{kc, nil}
400         cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
401         return cr
402 }
403
404 func main() {
405         flag.Parse()
406
407         api, err := arvadosclient.MakeArvadosClient()
408         if err != nil {
409                 log.Fatal(err)
410         }
411         api.Retries = 8
412
413         var kc *keepclient.KeepClient
414         kc, err = keepclient.MakeKeepClient(&api)
415         if err != nil {
416                 log.Fatal(err)
417         }
418         kc.Retries = 4
419
420         var docker *dockerclient.DockerClient
421         docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
422         if err != nil {
423                 log.Fatal(err)
424         }
425
426         cr := NewContainerRunner(api, kc, docker)
427
428         err = cr.Run(flag.Arg(0))
429         if err != nil {
430                 log.Fatal(err)
431         }
432
433 }