6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7 "git.curoverse.com/arvados.git/sdk/go/keepclient"
8 "git.curoverse.com/arvados.git/sdk/go/manifest"
9 "github.com/curoverse/dockerclient"
19 // IArvadosClient is the minimal set of Arvados API methods used by crunchexec.
// Depending on this interface rather than on a concrete arvadosclient client
// means any implementation of these three methods can drive ContainerRunner.
20 type IArvadosClient interface {
// Create creates a record of resourceType from parameters; output receives
// the decoded response (CommitLogs uses it to create the log collection).
21 Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
// Get fetches record uuid of resourceType into output (used for the image
// collection and for the container record itself).
22 Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
// Update applies parameters to record uuid of resourceType (used to move the
// container record to "Running" and then to its final state).
23 Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
26 // ErrCancelled is the error returned when the container is cancelled.
// Run compares StartContainer's result against it by identity
// (err == ErrCancelled), so it must remain a single shared value.
// NOTE(review): Go convention prefers lowercase error strings; changing the
// text would alter what gets logged, so confirm no consumer depends on it first.
27 var ErrCancelled = errors.New("Cancelled")
29 // IKeepClient is the minimal set of Keep API methods used by crunchexec.
30 type IKeepClient interface {
// PutHB presumably stores buf in Keep with hash as its expected content hash.
// NOTE(review): the three results are presumably (locator, replica count,
// error); confirm against keepclient.KeepClient.PutHB.
31 PutHB(hash string, buf []byte) (string, int, error)
// ManifestFileReader opens filename within manifest m for reading; LoadImage
// uses it to stream the Docker image tarball from Keep into Docker.
32 ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
35 // Mount describes one mount point to create inside the container.
// ContainerRecord.Mounts holds these keyed by a string (presumably the mount
// path inside the container — confirm).
38 // Collection is the subset of an Arvados collection record, as returned by
// the API server, that crunchexec decodes.
39 type Collection struct {
// ManifestText is the collection's manifest; LoadImage parses it to locate
// the Docker image tarball.
40 ManifestText string `json:"manifest_text"`
43 // ContainerRecord is the container record returned by the API server.
44 type ContainerRecord struct {
// UUID identifies the container; it names the log collection and is the
// target of the state updates sent to the API server.
45 UUID string `json:"uuid"`
// Command becomes the Docker container's Cmd.
46 Command []string `json:"command"`
// ContainerImage identifies the collection holding the Docker image tarball.
47 ContainerImage string `json:"container_image"`
// Cwd becomes the container's WorkingDir; when it is "." WorkingDir is left
// unchanged.
48 Cwd string `json:"cwd"`
// Environment is added to the container's environment as KEY=VALUE entries.
49 Environment map[string]string `json:"environment"`
// Mounts describes the mounts to create, keyed by a string (presumably the
// mount path inside the container).
50 Mounts map[string]Mount `json:"mounts"`
// NOTE(review): OutputPath, Priority and RuntimeConstraints are decoded but
// none of the runner steps in this file read them — confirm whether they
// should be honored (e.g. RuntimeConstraints as Docker resource limits).
51 OutputPath string `json:"output_path"`
52 Priority int `json:"priority"`
53 RuntimeConstraints map[string]interface{} `json:"runtime_constraints"`
// State is the container's lifecycle state; this runner moves it to
// "Running" and then to "Complete" or "Cancelled".
54 State string `json:"state"`
57 // NewLogWriter is a factory function to create a new log writer.
// name identifies the log stream, e.g. "stdout", "stderr" or "crunchexec".
58 type NewLogWriter func(name string) io.WriteCloser
60 // ThinDockerClient is the minimal Docker client interface used by crunchexec.
// main satisfies it with a *dockerclient.DockerClient.
61 type ThinDockerClient interface {
// StopContainer is used by the signal handler to stop a created container.
62 StopContainer(id string, timeout int) error
// InspectImage is used to check whether an image is already in the local
// Docker image store.
63 InspectImage(id string) (*dockerclient.ImageInfo, error)
// LoadImage loads an image tarball (streamed from Keep) into Docker.
64 LoadImage(reader io.Reader) error
// CreateContainer creates a container from config and returns its id.
65 CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
// StartContainer starts a previously created container.
66 StartContainer(id string, config *dockerclient.HostConfig) error
// ContainerLogs opens a followed stdout or stderr stream for a container.
67 ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error)
// Wait returns a channel on which the container's exit result is delivered.
68 Wait(id string) <-chan dockerclient.WaitResult
69 RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
72 // ContainerRunner is the main stateful struct used for a single execution of a
// container: it holds the API, Keep and Docker clients, the container record,
// the Docker configuration, the loggers and the cancellation state for one run.
74 type ContainerRunner struct {
75 Docker ThinDockerClient
76 ArvClient IArvadosClient
// ContainerConfig (embedded) is the Docker configuration assembled by
// LoadImage (Image) and StartContainer (Cmd, WorkingDir, Env).
79 dockerclient.ContainerConfig
// CrunchLog records the runner's own progress messages and errors.
84 CrunchLog *ThrottledLogger
// Stdout and Stderr receive the container's output streams; AttachLogs sets
// them and WaitFinish closes them.
85 Stdout *ThrottledLogger
86 Stderr *ThrottledLogger
// LogCollection accumulates the log files that CommitLogs saves as a
// collection.
87 LogCollection *CollectionWriter
// SigChan receives SIGTERM, SIGINT and SIGQUIT once SetupSignals has run.
91 SigChan chan os.Signal
95 // SetupSignals sets up signal handling to gracefully terminate the underlying
96 // Docker container and update state when receiving a TERM, INT or QUIT signal.
// On a signal the handler goroutine marks the runner Cancelled and, if a
// container has already been created, asks Docker to stop it.
97 func (runner *ContainerRunner) SetupSignals() error {
// Buffered because signal.Notify never blocks on a full channel: without room
// for one pending signal, a signal arriving while the handler is busy would
// be dropped.
98 runner.SigChan = make(chan os.Signal, 1)
99 signal.Notify(runner.SigChan, syscall.SIGTERM)
100 signal.Notify(runner.SigChan, syscall.SIGINT)
101 signal.Notify(runner.SigChan, syscall.SIGQUIT)
103 go func(sig <-chan os.Signal) {
// NOTE(review): Cancelled is read here before CancelLock is taken. StartContainer
// and Run also access it, so this unsynchronized read is a data race under the
// Go memory model. The check belongs inside the locked region below.
105 if !runner.Cancelled {
// Holding CancelLock serializes this with StartContainer, which keeps the lock
// while it checks Cancelled and creates/starts the container. Either
// StartContainer sees Cancelled and gives up, or ContainerID is already set here
// and the container is stopped.
106 runner.CancelLock.Lock()
107 runner.Cancelled = true
108 if runner.ContainerID != "" {
// NOTE(review): the StopContainer error is discarded. 10 is presumably the grace
// period in seconds before Docker kills the container — confirm against
// dockerclient.
109 runner.Docker.StopContainer(runner.ContainerID, 10)
111 runner.CancelLock.Unlock()
119 // LoadImage determines the docker image id from the container record and
120 // checks if it is available in the local Docker image store. If not, it loads
121 // the image from Keep.
// The image id is the name of the .tar file in the collection identified by
// ContainerRecord.ContainerImage, without the ".tar" suffix. LoadImage stores it
// in ContainerConfig.Image.
122 func (runner *ContainerRunner) LoadImage() (err error) {
124 runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
126 var collection Collection
127 err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
// NOTE(review): this local variable shadows the imported manifest package for the
// rest of the function. A distinct name (e.g. mf) would avoid the confusion.
131 manifest := manifest.Manifest{Text: collection.ManifestText}
132 var img, imageID string
// NOTE(review): unless something breaks out of this loop after the first stream,
// img/imageID are overwritten on every iteration. The image used is then the
// first file of the LAST stream, not the "first file in the collection" the
// error message describes. FileStreamSegments[0] also panics on a stream with no
// file segments, and a manifest with no streams leaves imageID empty.
133 for ms := range manifest.StreamIter() {
134 img = ms.FileStreamSegments[0].Name
135 if !strings.HasSuffix(img, ".tar") {
136 return errors.New("First file in the collection does not end in .tar")
138 imageID = img[:len(img)-4]
141 runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
// Presumably an InspectImage error is taken to mean the image is not in the local
// store, which triggers the load from Keep below.
143 _, err = runner.Docker.InspectImage(imageID)
145 runner.CrunchLog.Print("Loading Docker image from keep")
147 var readCloser io.ReadCloser
148 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
// NOTE(review): confirm readCloser is closed once Docker has consumed it.
153 err = runner.Docker.LoadImage(readCloser)
158 runner.CrunchLog.Print("Docker image is available")
161 runner.ContainerConfig.Image = imageID
166 // StartContainer creates the container and runs it.
// It holds CancelLock for its whole duration, so the signal handler never sees a
// half-created container. If a cancellation is already recorded, no container is
// created (presumably ErrCancelled is returned). Otherwise ContainerID is set
// before the lock is released.
167 func (runner *ContainerRunner) StartContainer() (err error) {
168 runner.CrunchLog.Print("Creating Docker container")
170 runner.CancelLock.Lock()
171 defer runner.CancelLock.Unlock()
173 if runner.Cancelled {
177 runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
// A Cwd of "." leaves WorkingDir unchanged.
178 if runner.ContainerRecord.Cwd != "." {
179 runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
// NOTE(review): map iteration order is random, so the KEY=VALUE entries are
// appended in a nondeterministic order, after any entries already in
// ContainerConfig.Env.
181 for k, v := range runner.ContainerRecord.Environment {
182 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
// An empty name and nil auth config are passed. Presumably Docker then assigns the
// name and no registry credentials are used.
184 runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
188 hostConfig := &dockerclient.HostConfig{}
190 runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
191 err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
199 // AttachLogs connects the docker container stdout and stderr logs to the
200 // Arvados logger which logs to Keep and the API server logs table.
// It opens a followed log stream for each of stdout and stderr and starts one
// CopyReaderToLog goroutine per stream. Both goroutines are handed
// runner.loggingDone, presumably to signal completion to WaitFinish.
201 func (runner *ContainerRunner) AttachLogs() (err error) {
203 runner.CrunchLog.Print("Attaching container logs")
205 var stderrReader, stdoutReader io.Reader
206 stderrReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stderr: true})
// NOTE(review): if this second call fails, the stderr stream opened above appears
// not to be closed on the error path. It is only held as an io.Reader — confirm
// and close it.
210 stdoutReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stdout: true})
215 runner.loggingDone = make(chan bool)
217 runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
218 runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
219 go CopyReaderToLog(stdoutReader, runner.Stdout.Logger, runner.loggingDone)
220 go CopyReaderToLog(stderrReader, runner.Stderr.Logger, runner.loggingDone)
225 // WaitFinish waits for the container to terminate, captures the exit code, and
226 // closes the stdout/stderr logging.
227 func (runner *ContainerRunner) WaitFinish() error {
228 result := runner.Docker.Wait(runner.ContainerID)
// ExitCode is a pointer so UpdateContainerRecordComplete can tell whether an exit
// code was actually captured.
233 runner.ExitCode = &wr.ExitCode
235 // drain stdout/stderr (presumably waiting for both CopyReaderToLog goroutines started by AttachLogs)
239 runner.Stdout.Close()
240 runner.Stderr.Close()
245 // CommitLogs posts the collection containing the final container logs.
// It writes the final state to the crunch log, closes that log, and creates a
// "logs for <uuid>" collection from LogCollection's manifest. The new
// collection's portable data hash is stored in runner.LogsPDH.
246 func (runner *ContainerRunner) CommitLogs() error {
247 runner.CrunchLog.Print(runner.finalState)
248 runner.CrunchLog.Close()
250 // Closing CrunchLog above allows it to be committed to Keep at this
251 // point, but re-open crunch log with ArvClient in case there are any
252 // other further errors (such as failing to write the log to Keep!) while
// the remaining steps of Run complete.
254 runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
257 mt, err := runner.LogCollection.ManifestText()
262 response := make(map[string]string)
263 err = runner.ArvClient.Create("collections",
264 arvadosclient.Dict{"name": "logs for " + runner.ContainerRecord.UUID,
265 "manifest_text": mt},
// LogsPDH is a pointer so UpdateContainerRecordComplete can tell whether a log
// collection was saved.
271 runner.LogsPDH = new(string)
272 *runner.LogsPDH = response["portable_data_hash"]
277 // UpdateContainerRecordRunning updates the container state to "Running".
// The API response is discarded (nil output).
278 func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
279 update := arvadosclient.Dict{"state": "Running"}
280 return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, update, nil)
283 // UpdateContainerRecordComplete updates the container record state on API
284 // server to its final state ("Complete" or "Cancelled"), also recording the
// log collection's portable data hash and the exit code when they are known.
285 func (runner *ContainerRunner) UpdateContainerRecordComplete() error {
286 update := arvadosclient.Dict{}
// nil means CommitLogs did not save a log collection.
287 if runner.LogsPDH != nil {
288 update["log"] = *runner.LogsPDH
// nil means no exit code was captured from Docker.
290 if runner.ExitCode != nil {
291 update["exit_code"] = *runner.ExitCode
294 update["state"] = runner.finalState
296 return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, update, nil)
299 // NewArvLogWriter creates an ArvLogWriter for the named log stream. The writer
// is backed by the file name+".txt" opened in runner.LogCollection, so
// LogCollection must be set before this is called.
300 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
301 return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
304 // Run the full container lifecycle.
// In order, it fetches the container record for containerUUID, sets up signal
// handling, loads the image, and starts the container. It then marks the record
// "Running", attaches the logs, and waits for the container to finish. Afterwards
// it commits the logs and updates the record with the final state ("Cancelled" or
// "Complete") and results.
305 func (runner *ContainerRunner) Run(containerUUID string) (err error) {
306 runner.CrunchLog.Printf("Executing container '%s'", containerUUID)
308 var runerr, waiterr error
312 runner.CrunchLog.Print(err)
// NOTE(review): Cancelled is read without CancelLock while the signal handler
// goroutine may still write it, which is a data race. Take the lock for this read.
315 if runner.Cancelled {
316 runner.finalState = "Cancelled"
318 runner.finalState = "Complete"
// (6) commit the log collection
322 logerr := runner.CommitLogs()
324 runner.CrunchLog.Print(logerr)
327 // (7) update container record with results
328 updateerr := runner.UpdateContainerRecordComplete()
329 if updateerr != nil {
330 runner.CrunchLog.Print(updateerr)
// Close the crunch log that CommitLogs re-opened.
333 runner.CrunchLog.Close()
// Only one error is returned. Each later error is considered only when the earlier
// ones are nil, in the order waiterr, logerr, updateerr.
338 } else if waiterr != nil {
340 } else if logerr != nil {
342 } else if updateerr != nil {
// Fetch the container record this runner will execute.
348 err = runner.ArvClient.Get("containers", containerUUID, nil, &runner.ContainerRecord)
353 // (0) setup signal handling
354 err = runner.SetupSignals()
359 // (1) check for and/or load image
360 err = runner.LoadImage()
365 // (2) start container
366 err = runner.StartContainer()
// A cancellation that arrived before the container was created is handled apart
// from other StartContainer failures.
368 if err == ErrCancelled {
374 // (3) update container record state
375 err = runner.UpdateContainerRecordRunning()
377 runner.CrunchLog.Print(err)
380 // (4) attach container logs
381 runerr = runner.AttachLogs()
383 runner.CrunchLog.Print(runerr)
386 // (5) wait for container to finish
387 waiterr = runner.WaitFinish()
392 // NewContainerRunner creates a new container runner.
// The runner writes its logs through NewArvLogWriter into a new CollectionWriter
// backed by kc. The runner's own progress log is the "crunchexec" stream.
393 func NewContainerRunner(api IArvadosClient,
395 docker ThinDockerClient) *ContainerRunner {
397 cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
398 cr.NewLogWriter = cr.NewArvLogWriter
// LogCollection must be set before the first NewLogWriter call below, because
// NewArvLogWriter opens its file in LogCollection.
399 cr.LogCollection = &CollectionWriter{kc, nil}
400 cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunchexec"))
// Build the Arvados API client, then a Keep client on top of it.
407 api, err := arvadosclient.MakeArvadosClient()
413 var kc *keepclient.KeepClient
414 kc, err = keepclient.MakeKeepClient(&api)
// Talk to the local Docker daemon over its Unix socket.
// NOTE(review): the socket path is hard-coded; consider making it configurable.
420 var docker *dockerclient.DockerClient
421 docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
426 cr := NewContainerRunner(api, kc, docker)
// The UUID of the container to run is the first command-line argument.
428 err = cr.Run(flag.Arg(0))