14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
16 "git.arvados.org/arvados.git/sdk/go/keepclient"
17 log "github.com/sirupsen/logrus"
18 "golang.org/x/crypto/blake2b"
19 "golang.org/x/net/websocket"
// arvadosContainerRunner holds the settings used to submit a command as an
// Arvados container request (see Run) and to map input paths onto container
// mounts (see TranslatePaths).
22 type arvadosContainerRunner struct {
23 Client *arvados.Client
// Prog is the program placed first in the container command line. When it is
// empty, Run uploads the currently running executable (makeCommandCollection)
// and runs it from the /mnt/cmd mount as /mnt/cmd/lightning.
28 Prog string // if empty, run /proc/self/exe
// Mounts holds additional container mounts keyed by in-container mount path.
// TranslatePaths adds "/mnt/<uuid>" collection entries here, and Run copies
// these entries into the container request's mounts.
30 Mounts map[string]map[string]interface{}
// Run submits a committed container request and waits for it to finish.
// The request is owned by runner.ProjectUUID, uses image "lightning-runtime",
// and has output path /mnt/output. Its command is prog followed by runner.Args.
// While waiting, Run logs the request/container state and streams container
// log events until the request reaches the Final state.
//
// It returns an error immediately if ProjectUUID is empty. Once the request is
// final, it fetches the container record. It returns an error reporting
// c.ExitCode, presumably only for a nonzero exit code (the condition is not
// visible here). Otherwise it returns the request's OutputUUID.
33 func (runner *arvadosContainerRunner) Run() (string, error) {
34 if runner.ProjectUUID == "" {
35 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
// Base mounts for every run. One entry has capacity 100000000000; presumably
// this is the /mnt/output scratch mount, in bytes (~100 GB). TODO confirm.
38 mounts := map[string]map[string]interface{}{
42 "capacity": 100000000000,
// Merge the caller-supplied mounts (e.g. those added by TranslatePaths).
45 for path, mnt := range runner.Mounts {
// No explicit program: upload the running binary to a collection and run it
// from the /mnt/cmd mount.
51 prog = "/mnt/cmd/lightning"
52 cmdUUID, err := runner.makeCommandCollection()
56 mounts["/mnt/cmd"] = map[string]interface{}{
61 command := append([]string{prog}, runner.Args...)
63 rc := arvados.RuntimeConstraints{
// Keep cache: (1<<26)*2 bytes = 128 MiB per requested VCPU.
66 KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs),
68 var cr arvados.ContainerRequest
// Create the request directly in the Committed state so it is scheduled
// without a separate update call.
69 err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
70 "container_request": map[string]interface{}{
71 "owner_uuid": runner.ProjectUUID,
73 "container_image": "lightning-runtime",
77 "output_path": "/mnt/output",
78 "runtime_constraints": rc,
80 "state": arvados.ContainerRequestStateCommitted,
83 log.Printf("container request UUID: %s", cr.UUID)
84 log.Printf("container UUID: %s", cr.ContainerUUID)
// The log subscription is created lazily once a container UUID is known
// (see the loop below).
86 var logch <-chan eventMessage
87 var logstream *logStream
// Polling interval for re-reading the container request.
// NOTE(review): presumably consumed in the select below (lines not visible).
94 ticker := time.NewTicker(5 * time.Second)
// Refresh cr from the API. A failure is logged.
// NOTE(review): whether the error then aborts or is retried is not visible here.
99 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
101 log.Printf("error getting container request: %s", err)
// Only log state transitions, not every poll.
104 if lastState != cr.State {
105 log.Printf("container state: %s", cr.State)
111 for cr.State != arvados.ContainerRequestStateFinal {
// (Re)subscribe when the request points at a container other than the one
// we are subscribed to; any previous stream is dealt with first.
112 if logch == nil && cr.ContainerUUID != subscribedUUID {
113 if logstream != nil {
116 logstream = runner.logStream(cr.ContainerUUID)
120 case msg, ok := <-logch:
127 switch msg.EventType {
// Log text may contain several lines; each is handled separately.
131 for _, line := range strings.Split(msg.Properties.Text, "\n") {
// The request is final: consult the container record for its exit code.
142 var c arvados.Container
143 err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
148 return "", fmt.Errorf("container exited %d", c.ExitCode)
150 return cr.OutputUUID, err
var (
	// collectionInPathRe locates an Arvados identifier used as a whole path
	// component. It matches either of two forms:
	//   - a portable data hash: 32 lowercase hex digits, "+", a decimal size;
	//   - a UUID of the form xxxxx-xxxxx-xxxxxxxxxxxxxxx (any object type fits).
	//
	// Submatch 1 is everything before the identifier, including its trailing
	// slash, and may be empty. Submatch 2 is the identifier itself. Submatch 3
	// is the remainder starting with "/", and may be empty.
	//
	// Submatch 1 is greedy, so the last qualifying component wins when a path
	// contains more than one.
	collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
)
// TranslatePaths rewrites each path in place so that it refers to a location
// inside the container. Each path must contain a collection identifier
// matched by collectionInPathRe. The identifier component and everything
// before it are replaced by "/mnt/<identifier>", and the remainder (submatch 3)
// is kept.
//
// For every distinct identifier, a mount of kind "collection" is registered in
// runner.Mounts at "/mnt/<identifier>"; an existing entry is reused. Empty
// paths and "-" are not translated (presumably skipped so they keep their
// stdin/unset meaning). An error is returned for the first path that contains
// no identifier.
155 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
156 if runner.Mounts == nil {
157 runner.Mounts = make(map[string]map[string]interface{})
159 for _, path := range paths {
160 if *path == "" || *path == "-" {
163 m := collectionInPathRe.FindStringSubmatch(*path)
165 return fmt.Errorf("cannot find uuid in path: %q", *path)
// NOTE(review): uuid is presumably m[2] (the identifier submatch); the
// assignment is not visible here.
168 mnt, ok := runner.Mounts["/mnt/"+uuid]
170 mnt = map[string]interface{}{
171 "kind": "collection",
174 runner.Mounts["/mnt/"+uuid] = mnt
// Keep whatever followed the identifier (e.g. "/file.txt") under the mount.
176 *path = "/mnt/" + uuid + m[3]
// makeCommandCollection stores the currently running executable
// (/proc/self/exe) in a collection owned by runner.ProjectUUID and returns
// that collection's UUID.
//
// The collection is named "lightning-" plus the hex BLAKE2b-256 digest of the
// binary (64 hex characters). A collection with that name and owner is reused
// if one exists. Otherwise a new collection is written, containing a single
// file "lightning" with mode 0777.
181 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
// The whole binary is read into memory: it is hashed and, if needed, uploaded.
182 exe, err := ioutil.ReadFile("/proc/self/exe")
186 b2 := blake2b.Sum256(exe)
187 cname := fmt.Sprintf("lightning-%x", b2)
188 var existing arvados.CollectionList
189 err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
192 Filters: []arvados.Filter{
193 {Attr: "name", Operator: "=", Operand: cname},
194 {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
// Reuse relies on the content-derived name only; the stored file is not
// compared against exe (as the log message states).
200 if len(existing.Items) > 0 {
201 uuid := existing.Items[0].UUID
202 log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname)
// NOTE(review): the lookup above and the create below are not atomic. A
// concurrent invocation could create the same name in between. How a
// server-side name conflict surfaces here is not handled visibly. TODO confirm.
205 log.Printf("writing lightning binary to new collection %q", cname)
206 ac, err := arvadosclient.New(runner.Client)
210 kc := keepclient.New(ac)
// Build the collection locally through a collection filesystem backed by Keep,
// then create the API record from its manifest.
211 var coll arvados.Collection
212 fs, err := coll.FileSystem(runner.Client, kc)
// Mode 0777 so the binary is executable when mounted at /mnt/cmd.
216 f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
220 _, err = f.Write(exe)
228 mtxt, err := fs.MarshalManifest(".")
232 err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
233 "collection": map[string]interface{}{
234 "owner_uuid": runner.ProjectUUID,
235 "manifest_text": mtxt,
242 log.Printf("stored lightning binary in new collection %s", coll.UUID)
243 return coll.UUID, nil
// eventMessage is one event decoded from the Arvados websocket stream (see
// logStream). Run dispatches on EventType and prints Properties.Text for
// log-type events.
246 type eventMessage struct {
// ObjectUUID is the UUID of the object the event is about.
248 ObjectUUID string `json:"object_uuid"`
// EventType is the event kind; logStream subscribes to "stderr", "crunch-run"
// and "update".
249 EventType string `json:"event_type"`
// logStream is a live websocket subscription returned by
// arvadosContainerRunner.logStream. It also carries a Close function that
// ends the subscription.
255 type logStream struct {
// C delivers the decoded events for the subscribed object.
256 C <-chan eventMessage
// logStream opens a websocket connection to the cluster's event service and
// subscribes to the "stderr", "crunch-run" and "update" events for the object
// with the given uuid. Decoded events are delivered on the returned stream's C
// channel.
260 func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
261 ch := make(chan eventMessage)
262 done := make(chan struct{})
// Discover the websocket and controller URLs from the cluster config.
// NOTE(review): the error from this request is discarded. If it fails, wsURL
// is left empty and the Dial error below will not reveal the real cause.
265 var cluster arvados.Cluster
266 runner.Client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
267 wsURL := cluster.Services.Websocket.ExternalURL
// Replacing only the first "http" maps http->ws and https->wss.
268 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
269 wsURL.Path = "/websocket"
// NOTE(review): the API token is sent in the query string and may end up in
// proxy or server access logs.
270 wsURL.RawQuery = url.Values{"api_token": []string{runner.Client.AuthToken}}.Encode()
// The controller's external URL is sent as the websocket Origin.
271 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
273 log.Printf("websocket error: %s", err)
276 w := json.NewEncoder(conn)
// NOTE(review): the subscribe message is sent from a goroutine and the error
// returned by Encode is discarded, so a failed subscription is silent.
277 go w.Encode(map[string]interface{}{
278 "method": "subscribe",
279 "filters": [][]interface{}{
280 {"object_uuid", "=", uuid},
281 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
284 r := json.NewDecoder(conn)
287 err := r.Decode(&msg)
289 log.Printf("error decoding websocket message: %s", err)
302 Close: func() error {