14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
16 "git.arvados.org/arvados.git/sdk/go/keepclient"
17 log "github.com/sirupsen/logrus"
18 "golang.org/x/crypto/blake2b"
19 "golang.org/x/net/websocket"
// arvadosContainerRunner holds the settings used to submit a container
// request through an Arvados API client (see Run).
// NOTE(review): only some fields are visible in this excerpt; Run also
// reads ProjectUUID, Args, Priority and VCPUs, which are presumably
// declared on the elided lines -- confirm.
22 type arvadosContainerRunner struct {
// Client is the API client used for every request this runner makes.
23 Client *arvados.Client
28 Prog string // if empty, run /proc/self/exe
// Mounts maps a mount path inside the container to its mount
// definition. TranslatePaths adds "/mnt/<uuid>" entries here, and Run
// iterates over the map when building the request.
30 Mounts map[string]map[string]interface{}
// Run submits a committed container request owned by runner.ProjectUUID,
// then follows the request until it reaches the Final state, refreshing
// its status and reading log events along the way.
//
// It returns the request's OutputUUID. It fails immediately when
// ProjectUUID is empty, and it reports "container exited N" based on the
// container record fetched after the request is final.
34 func (runner *arvadosContainerRunner) Run() (string, error) {
// A project is required: it is used as the owner_uuid of the request.
35 if runner.ProjectUUID == "" {
36 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
// Base set of mounts for the container.
// NOTE(review): the mount that carries this capacity is on an elided
// line; the value is presumably in bytes -- confirm.
39 mounts := map[string]map[string]interface{}{
43 "capacity": 100000000000,
// Caller-supplied mounts (for example those added by TranslatePaths).
// Presumably merged into mounts on the elided lines -- confirm.
46 for path, mnt := range runner.Mounts {
// Run the lightning binary from a collection mounted at /mnt/cmd.
// NOTE(review): presumably only when runner.Prog is empty; the guarding
// condition is not visible here.
52 prog = "/mnt/cmd/lightning"
53 cmdUUID, err := runner.makeCommandCollection()
57 mounts["/mnt/cmd"] = map[string]interface{}{
// Full command line: the program followed by the caller's arguments.
62 command := append([]string{prog}, runner.Args...)
// NOTE(review): this local is computed here, but the request below sends
// runner.Priority rather than priority -- confirm which is intended.
64 priority := runner.Priority
68 rc := arvados.RuntimeConstraints{
// 1<<26 bytes is 64 MiB, so this is 128 MiB of keep cache per VCPU.
71 KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs),
73 var cr arvados.ContainerRequest
// Create the container request; the response is decoded into cr.
74 err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
75 "container_request": map[string]interface{}{
76 "owner_uuid": runner.ProjectUUID,
78 "container_image": "lightning-runtime",
82 "output_path": "/mnt/output",
83 "runtime_constraints": rc,
84 "priority": runner.Priority,
// Created directly in the committed state.
85 "state": arvados.ContainerRequestStateCommitted,
88 log.Printf("container request UUID: %s", cr.UUID)
89 log.Printf("container UUID: %s", cr.ContainerUUID)
// logch stays nil until a log stream has been opened for the container.
91 var logch <-chan eventMessage
92 var logstream *logStream
// NOTE(review): presumably drives the periodic refresh in the loop
// below; the select case that reads the ticker is not visible.
99 ticker := time.NewTicker(5 * time.Second)
102 lastState := cr.State
// refreshCR re-reads the container request into cr, logging any request
// error and any change of state.
103 refreshCR := func() {
104 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
106 log.Printf("error getting container request: %s", err)
109 if lastState != cr.State {
110 log.Printf("container state: %s", cr.State)
// Keep following the request until it is final.
116 for cr.State != arvados.ContainerRequestStateFinal {
// No active log channel, and the request's container differs from the
// one last subscribed to: open a log stream for the current container.
117 if logch == nil && cr.ContainerUUID != subscribedUUID {
118 if logstream != nil {
121 logstream = runner.logStream(cr.ContainerUUID)
// A log event arrived, or the channel was closed (ok == false).
125 case msg, ok := <-logch:
132 switch msg.EventType {
// The text of one event may hold several lines; handle each separately.
136 for _, line := range strings.Split(msg.Properties.Text, "\n") {
// The request is final: fetch the container record for its exit code.
147 var c arvados.Container
148 err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
// NOTE(review): presumably reached only for a non-zero exit code; the
// condition is on an elided line.
153 return "", fmt.Errorf("container exited %d", c.ExitCode)
155 return cr.OutputUUID, err
// collectionInPathRe finds a collection identifier embedded in a path.
// Capture groups:
//   1: optional leading part of the path, ending in "/"
//   2: the identifier, either 32 lowercase hex digits + "+" + decimal
//      digits, or three groups of 5, 5 and 15 lowercase alphanumerics
//      joined by "-"
//   3: optional remainder of the path, starting with "/"
// NOTE(review): the two identifier shapes look like an Arvados portable
// data hash and an Arvados UUID -- confirm.
158 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
// TranslatePaths rewrites each given path in place so that it points
// below "/mnt/<uuid>", and records a matching collection mount in
// runner.Mounts.
//
// It returns an error for a path in which no identifier can be found
// with collectionInPathRe.
160 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
// Create the mount table lazily so the assignment below cannot hit a
// nil map.
161 if runner.Mounts == nil {
162 runner.Mounts = make(map[string]map[string]interface{})
164 for _, path := range paths {
// NOTE(review): empty paths and "-" are presumably skipped unchanged;
// the statement inside this branch is not visible.
165 if *path == "" || *path == "-" {
168 m := collectionInPathRe.FindStringSubmatch(*path)
170 return fmt.Errorf("cannot find uuid in path: %q", *path)
// Look for an existing mount for this identifier.
// NOTE(review): uuid is presumably m[2], assigned on an elided line.
173 mnt, ok := runner.Mounts["/mnt/"+uuid]
175 mnt = map[string]interface{}{
176 "kind": "collection",
179 runner.Mounts["/mnt/"+uuid] = mnt
// Replace everything up to and including the identifier with the mount
// point; m[3] is the part of the path after the identifier.
181 *path = "/mnt/" + uuid + m[3]
// makeCommandCollection makes the currently running executable
// available as a file named "lightning" in a collection owned by
// runner.ProjectUUID.
//
// The collection name is derived from a BLAKE2b-256 hash of the
// executable. An existing collection with that name and owner is reused;
// otherwise a new one is written and its UUID is returned.
186 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
// Read the running binary into memory.
187 exe, err := ioutil.ReadFile("/proc/self/exe")
// Name the collection after the content hash of the binary.
191 b2 := blake2b.Sum256(exe)
192 cname := fmt.Sprintf("lightning-%x", b2)
193 var existing arvados.CollectionList
// Look for a collection with this name in the target project.
194 err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
197 Filters: []arvados.Filter{
198 {Attr: "name", Operator: "=", Operand: cname},
199 {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
// Reuse the first match. As the log message says, only the name is
// compared; the stored content is not checked.
// NOTE(review): the return of uuid is presumably on an elided line.
205 if len(existing.Items) > 0 {
206 uuid := existing.Items[0].UUID
207 log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname)
210 log.Printf("writing lightning binary to new collection %q", cname)
211 ac, err := arvadosclient.New(runner.Client)
215 kc := keepclient.New(ac)
// Start from an empty collection and write the binary through its
// filesystem view.
216 var coll arvados.Collection
217 fs, err := coll.FileSystem(runner.Client, kc)
221 f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
225 _, err = f.Write(exe)
// Serialize the filesystem to a manifest for the create request below.
233 mtxt, err := fs.MarshalManifest(".")
237 err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
238 "collection": map[string]interface{}{
239 "owner_uuid": runner.ProjectUUID,
240 "manifest_text": mtxt,
247 log.Printf("stored lightning binary in new collection %s", coll.UUID)
248 return coll.UUID, nil
// eventMessage is a single event decoded from the websocket connection
// opened by logStream.
// NOTE(review): Run also reads msg.Properties.Text, so a Properties
// field is presumably declared on an elided line.
251 type eventMessage struct {
// ObjectUUID is compared with the subscribed UUID when events are read.
253 ObjectUUID string `json:"object_uuid"`
// EventType is what Run switches on; the subscription asks for
// "stderr", "crunch-run" and "update".
254 EventType string `json:"event_type"`
// logStream is the handle returned by (*arvadosContainerRunner).logStream.
// NOTE(review): the constructor also sets a Close func() error field,
// presumably declared on an elided line.
260 type logStream struct {
// C is a receive-only channel of eventMessage values.
261 C <-chan eventMessage
265 func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
266 ch := make(chan eventMessage)
267 done := make(chan struct{})
270 var cluster arvados.Cluster
271 runner.Client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
272 wsURL := cluster.Services.Websocket.ExternalURL
273 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
274 wsURL.Path = "/websocket"
275 wsURL.RawQuery = url.Values{"api_token": []string{runner.Client.AuthToken}}.Encode()
276 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
278 log.Printf("websocket error: %s", err)
281 w := json.NewEncoder(conn)
282 go w.Encode(map[string]interface{}{
283 "method": "subscribe",
284 "filters": [][]interface{}{
285 {"object_uuid", "=", uuid},
286 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
289 r := json.NewDecoder(conn)
292 err := r.Decode(&msg)
294 log.Printf("error decoding websocket message: %s", err)
297 if msg.ObjectUUID == uuid {
309 Close: func() error {