15 "git.arvados.org/arvados.git/sdk/go/arvados"
16 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
17 "git.arvados.org/arvados.git/sdk/go/keepclient"
18 "golang.org/x/crypto/blake2b"
19 "golang.org/x/net/websocket"
// arvadosContainerRunner holds the settings used to run a command as
// an Arvados container request (see Run).
// NOTE(review): only some of the fields are visible in this excerpt.
22 type arvadosContainerRunner struct {
// Client is the API client used for every Arvados request made by
// the runner's methods.
23 Client *arvados.Client
28 Prog string // if empty, run /proc/self/exe
// Mounts maps a path inside the container to its mount definition.
// TranslatePaths adds "collection" entries under /mnt/<id>; Run
// iterates over this map when building the request's mounts.
30 Mounts map[string]map[string]interface{}
// Run submits a container request owned by runner.ProjectUUID, then
// follows it -- relaying messages from a websocket log stream and
// re-fetching the request -- until the request reaches the final
// state. It returns the container request's output UUID.
//
// An error is returned immediately if ProjectUUID is empty. After the
// request is final, the container record is fetched and a
// "container exited N" error may be returned (presumably when the exit
// code is non-zero -- the guarding condition is not visible in this
// excerpt; confirm).
33 func (runner *arvadosContainerRunner) Run() (string, error) {
34 if runner.ProjectUUID == "" {
35 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
// Base mount table for the request; caller-supplied runner.Mounts are
// ranged over below.
38 mounts := map[string]map[string]interface{}{
// 1e11 -- presumably bytes (100 GB); confirm units against the
// Arvados mount documentation.
42 "capacity": 100000000000,
45 for path, mnt := range runner.Mounts {
// Presumably the runner.Prog == "" case (see the Prog field comment):
// the running binary is stored in a collection by
// makeCommandCollection, mounted at /mnt/cmd, and used as the program.
51 prog = "/mnt/cmd/lightning"
52 cmdUUID, err := runner.makeCommandCollection()
56 mounts["/mnt/cmd"] = map[string]interface{}{
// Full command line: program followed by the caller's arguments.
61 command := append([]string{prog}, runner.Args...)
63 rc := arvados.RuntimeConstraints{
// (1 << 26) * 2 = 128 MiB of Keep cache RAM per VCPU.
66 KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs),
68 var cr arvados.ContainerRequest
69 err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
70 "container_request": map[string]interface{}{
71 "owner_uuid": runner.ProjectUUID,
73 "container_image": "lightning-runtime",
77 "output_path": "/mnt/output",
78 "runtime_constraints": rc,
// The request is created directly in the Committed state.
80 "state": arvados.ContainerRequestStateCommitted,
// logch delivers log/update messages from the current log stream, if
// one has been opened.
85 var logch <-chan string
86 var logstream *logStream
// Polling interval used by the wait loop below (see the "next timer
// tick" note further down).
93 ticker := time.NewTicker(5 * time.Second)
// Loop until the container request reaches the final state.
98 for cr.State != arvados.ContainerRequestState(arvados.ContainerRequestStateFinal) {
// (Re)subscribe to logs when there is no active log channel and the
// request's container UUID differs from the one last subscribed to.
99 if logch == nil && cr.ContainerUUID != subscribedUUID {
100 if logstream != nil {
103 logstream = runner.logStream(cr.ContainerUUID)
107 case msg, ok := <-logch:
118 // empty message indicates an "update" event
119 // -- fall out of the select and get the
120 // latest version now, instead of waiting for
121 // the next timer tick.
// Refresh the container request record.
124 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
// Log state transitions only when the state actually changed.
128 if lastState != cr.State {
129 log.Printf("container state: %s", cr.State)
// Fetch the container record so its exit code can be reported.
134 var c arvados.Container
135 err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
140 return "", fmt.Errorf("container exited %d", c.ExitCode)
142 return cr.OutputUUID, err
// collectionInPathRe matches a path that contains, as one whole
// slash-delimited component, either 32 lowercase hex digits followed
// by "+" and decimal digits (presumably a portable data hash) or an
// identifier of the form xxxxx-xxxxx-xxxxxxxxxxxxxxx (5-5-15 lowercase
// alphanumerics). Submatch 1 is the optional leading part ending in
// "/", submatch 2 is the identifier, and submatch 3 is the optional
// remainder starting with "/".
145 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
// TranslatePaths rewrites, in place, each given path that contains a
// collection identifier (see collectionInPathRe) so that it refers to
// /mnt/<id>/... and makes sure runner.Mounts has an entry for
// /mnt/<id>. It returns an error for a path in which no identifier can
// be found.
//
// Paths equal to "" or "-" are special-cased (presumably skipped -- the
// body of that branch is not visible in this excerpt).
147 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
// Lazily create the mount table so the assignments below cannot hit a
// nil map.
148 if runner.Mounts == nil {
149 runner.Mounts = make(map[string]map[string]interface{})
151 for _, path := range paths {
152 if *path == "" || *path == "-" {
155 m := collectionInPathRe.FindStringSubmatch(*path)
157 return fmt.Errorf("cannot find uuid in path: %q", *path)
// uuid is presumably m[2], the identifier submatch -- its assignment
// is not visible here. An existing mount for the same id is looked up
// first.
160 mnt, ok := runner.Mounts["/mnt/"+uuid]
162 mnt = map[string]interface{}{
163 "kind": "collection",
166 runner.Mounts["/mnt/"+uuid] = mnt
// Anything that preceded the identifier in the original path (m[1]) is
// dropped; only the part after it (m[3]) is kept.
168 *path = "/mnt/" + uuid + m[3]
// makeCommandCollection makes the currently running executable
// available in an Arvados collection and returns the collection UUID.
//
// The collection name is "lightning-" followed by the hex BLAKE2b-256
// digest of the executable. If a collection with that name already
// exists in runner.ProjectUUID it is used as-is (its content is not
// verified, as the log message states); otherwise a new collection
// containing the executable as a file named "lightning" is created in
// that project.
173 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
// Read the whole running binary into memory.
174 exe, err := ioutil.ReadFile("/proc/self/exe")
// Content-derived name, so identical binaries map to the same
// collection name.
178 b2 := blake2b.Sum256(exe)
179 cname := fmt.Sprintf("lightning-%x", b2)
180 var existing arvados.CollectionList
// Look for a collection with this name owned by the project.
181 err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
184 Filters: []arvados.Filter{
185 {Attr: "name", Operator: "=", Operand: cname},
186 {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
// Reuse the first match (presumably returned right after the log line
// -- the return statement is not visible in this excerpt).
192 if len(existing.Items) > 0 {
193 uuid := existing.Items[0].UUID
194 log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname)
197 log.Printf("writing lightning binary to new collection %q", cname)
198 ac, err := arvadosclient.New(runner.Client)
202 kc := keepclient.New(ac)
// Build the new collection's content through a collection filesystem
// backed by the keep client.
203 var coll arvados.Collection
204 fs, err := coll.FileSystem(runner.Client, kc)
// The file is created with mode 0777.
208 f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
212 _, err = f.Write(exe)
// Serialize the filesystem to manifest text for the create request.
220 mtxt, err := fs.MarshalManifest(".")
224 err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
225 "collection": map[string]interface{}{
226 "owner_uuid": runner.ProjectUUID,
227 "manifest_text": mtxt,
234 log.Printf("stored lightning binary in new collection %s", coll.UUID)
235 return coll.UUID, nil
// logStream is the handle returned by
// (*arvadosContainerRunner).logStream for a container's websocket log
// subscription.
// NOTE(review): its fields are not visible in this excerpt.
238 type logStream struct {
243 func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
244 ch := make(chan string)
245 done := make(chan struct{})
248 var cluster arvados.Cluster
249 runner.Client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
250 wsURL := cluster.Services.Websocket.ExternalURL
251 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
252 wsURL.Path = "/websocket"
253 wsURL.RawQuery = url.Values{"api_token": []string{runner.Client.AuthToken}}.Encode()
254 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
256 ch <- fmt.Sprintf("websocket error: %s", err)
259 w := json.NewEncoder(conn)
260 go w.Encode(map[string]interface{}{
261 "method": "subscribe",
262 "filters": [][]interface{}{
263 {"object_uuid", "=", uuid},
264 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
267 r := json.NewDecoder(conn)
271 ObjectUUID string `json:"object_uuid"`
272 EventType string `json:"event_type"`
277 err := r.Decode(&msg)
279 log.Printf("error decoding websocket message: %s", err)
282 if msg.ObjectUUID == uuid {
283 for _, line := range strings.Split(msg.Properties.Text, "\n") {
288 if msg.EventType == "update" {
301 Close: func() error {