15 "git.arvados.org/arvados.git/sdk/go/arvados"
16 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
17 "git.arvados.org/arvados.git/sdk/go/keepclient"
18 log "github.com/sirupsen/logrus"
19 "golang.org/x/crypto/blake2b"
20 "golang.org/x/net/websocket"
// eventMessage is a message received from the Arvados websocket event
// stream. Subscribers receive these values on the channels they register
// with arvadosClient.Subscribe.
23 type eventMessage struct {
// ObjectUUID identifies the object the event concerns; it is the key used
// to route the event to subscribed channels.
25 ObjectUUID string `json:"object_uuid"`
// EventType is the kind of event. Subscriptions in this file request
// "stderr", "crunch-run" and "update" events.
26 EventType string `json:"event_type"`
// arvadosClient wraps an Arvados API client with a websocket event-stream
// connection and a registry of event subscribers.
32 type arvadosClient struct {
// notifying maps object UUID -> subscriber channel -> subscription count.
// A nil map means no notifier is running: Subscribe allocates it and starts
// runNotifier, and Close resets it to nil.
34 notifying map[string]map[chan<- eventMessage]int
// wantClose is closed by Close to signal runNotifier.
35 wantClose chan struct{}
// wsconn is the current websocket connection. Subscribe and Unsubscribe
// check it for nil before using it.
36 wsconn *websocket.Conn
40 // Listen for events concerning the given uuids. When an event occurs
41 // (and after connecting/reconnecting to the event stream), send each
42 // uuid to ch. If a {ch, uuid} pair is subscribed twice, the uuid will
43 // be sent only once for each update, but two Unsubscribe calls will
44 // be needed to stop sending them.
// Events are delivered on ch as eventMessage values (ch's element type).
// The first Subscribe on a client, or the first after Close, allocates the
// subscription registry and starts runNotifier in a new goroutine.
45 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
47 defer client.mtx.Unlock()
48 if client.notifying == nil {
49 client.notifying = map[string]map[chan<- eventMessage]int{}
50 client.wantClose = make(chan struct{})
51 go client.runNotifier()
53 chmap := client.notifying[uuid]
55 chmap = map[chan<- eventMessage]int{}
56 client.notifying[uuid] = chmap
59 for _, nch := range chmap {
// If a websocket connection is already up, send the subscribe request now.
// Otherwise runNotifier subscribes to every uuid in client.notifying after
// it connects. The server-side filter selects events for this uuid with
// type "stderr", "crunch-run" or "update".
// NOTE(review): Encode runs in its own goroutine and its error is dropped,
// so concurrent Subscribe/Unsubscribe calls may write to wsconn
// concurrently. Confirm this use of websocket.Conn is safe.
66 if needSub && client.wsconn != nil {
67 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
68 "method": "subscribe",
69 "filters": [][]interface{}{
70 {"object_uuid", "=", uuid},
71 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
// Unsubscribe undoes one Subscribe(ch, uuid) call. It decrements the
// {ch, uuid} subscription count and removes the subscription once the count
// reaches zero. When uuid is dropped from client.notifying, it sends an
// "unsubscribe" request on the current websocket connection (if any), using
// the same filters Subscribe used.
77 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
79 defer client.mtx.Unlock()
80 chmap := client.notifying[uuid]
// NOTE(review): if {ch, uuid} was never subscribed (or after Close), chmap
// is nil or lacks ch. Then chmap[ch] is 0 and n is -1. Confirm the elided
// branch handles that case.
81 if n := chmap[ch] - 1; n == 0 {
84 delete(client.notifying, uuid)
86 if client.wsconn != nil {
// NOTE(review): same fire-and-forget Encode as Subscribe: error dropped,
// possible concurrent writes to wsconn.
87 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
88 "method": "unsubscribe",
89 "filters": [][]interface{}{
90 {"object_uuid", "=", uuid},
91 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
// Close stops event delivery for this client. It clears the subscription
// registry and closes wantClose, which runNotifier selects on. Calling Close
// again, or on a client that never subscribed, is a no-op because of the
// nil check, so wantClose is never closed twice. A later Subscribe
// allocates a new registry and starts a new notifier.
100 func (client *arvadosClient) Close() {
102 defer client.mtx.Unlock()
103 if client.notifying != nil {
104 client.notifying = nil
105 close(client.wantClose)
// runNotifier runs in a goroutine started by Subscribe and delivers
// websocket events to subscribers. Its steps are:
//   - look up the websocket service URL in the cluster config;
//   - connect using the client's API token;
//   - re-send a subscribe request for every uuid currently in
//     client.notifying;
//   - decode messages from the connection and forward each one to the
//     channels subscribed to its object UUID.
// If fetching the config or connecting fails, it logs a warning and sleeps
// 5 seconds (presumably before retrying; the loop is not shown here).
// NOTE(review): decode errors appear to be logged unless wantClose has been
// closed. Confirm the elided code then closes conn and reconnects as
// intended.
109 func (client *arvadosClient) runNotifier() {
112 var cluster arvados.Cluster
113 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
115 log.Warnf("error getting cluster config: %s", err)
116 time.Sleep(5 * time.Second)
119 wsURL := cluster.Services.Websocket.ExternalURL
// Map the service scheme to its websocket equivalent: http->ws, https->wss.
120 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
121 wsURL.Path = "/websocket"
// Keep a token-free copy of the URL for logging before adding api_token.
122 wsURLNoToken := wsURL.String()
123 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
// The controller's external URL is passed as the websocket Origin.
124 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
126 log.Warnf("websocket connection error: %s", err)
127 time.Sleep(5 * time.Second)
130 log.Printf("connected to websocket at %s", wsURLNoToken)
// Snapshot the subscribed uuids so they can be (re)subscribed on the new
// connection.
// NOTE(review): this reads client.notifying, so it presumably runs with
// client.mtx held. Confirm.
134 resubscribe := make([]string, 0, len(client.notifying))
135 for uuid := range client.notifying {
136 resubscribe = append(resubscribe, uuid)
141 w := json.NewEncoder(conn)
142 for _, uuid := range resubscribe {
// NOTE(review): the Encode error is ignored here.
143 w.Encode(map[string]interface{}{
144 "method": "subscribe",
145 "filters": [][]interface{}{
146 {"object_uuid", "=", uuid},
147 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
153 r := json.NewDecoder(conn)
156 err := r.Decode(&msg)
158 case <-client.wantClose:
162 log.Printf("error decoding websocket message: %s", err)
// Fan the event out to every channel subscribed to its object UUID.
// NOTE(review): reading client.notifying requires client.mtx. If the lock
// is held while sending on an unbuffered subscriber channel (Run's logch
// is unbuffered), a subscriber blocked in Subscribe/Unsubscribe on the same
// lock could deadlock. Confirm how the elided send is done.
170 for ch := range client.notifying[msg.ObjectUUID] {
// arvadosContainerRunner holds the settings Run uses to submit an Arvados
// container request and follow it to completion.
179 type arvadosContainerRunner struct {
// Client is the Arvados API client used for all requests.
180 Client *arvados.Client
185 Prog string // if empty, run /proc/self/exe
// Mounts holds extra container mounts keyed by mount path. Run includes
// them in the container request's mounts, and TranslatePaths adds
// collection mounts here.
187 Mounts map[string]map[string]interface{}
// Run submits a committed container request owned by ProjectUUID and
// follows it until the request is final. The request runs Prog with Args,
// or, when this executable is uploaded via makeCommandCollection,
// /mnt/cmd/lightning. While waiting, Run relays the container's event
// messages.
// It returns the request's output collection UUID. It returns an error if
// ProjectUUID is empty, a request fails, the container did not reach the
// Complete state, or the container exited non-zero.
191 func (runner *arvadosContainerRunner) Run() (string, error) {
192 if runner.ProjectUUID == "" {
193 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
196 mounts := map[string]map[string]interface{}{
198 "kind": "collection",
202 for path, mnt := range runner.Mounts {
// Run the uploaded copy of this executable from the /mnt/cmd mount.
208 prog = "/mnt/cmd/lightning"
209 cmdUUID, err := runner.makeCommandCollection()
213 mounts["/mnt/cmd"] = map[string]interface{}{
214 "kind": "collection",
218 command := append([]string{prog}, runner.Args...)
// NOTE(review): `priority` is derived from runner.Priority here, but the
// request body below sends runner.Priority directly. If the elided lines
// adjust `priority` (e.g. apply a default), that value is never sent.
// Confirm which is intended.
220 priority := runner.Priority
224 rc := arvados.RuntimeConstraints{
// Keep cache RAM: 2 x 64 MiB (1<<26 bytes) per requested VCPU.
227 KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs),
229 var cr arvados.ContainerRequest
// Create the request directly in the Committed state. use_existing lets
// Arvados satisfy it with an existing matching container.
230 err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
231 "container_request": map[string]interface{}{
232 "owner_uuid": runner.ProjectUUID,
234 "container_image": "lightning-runtime",
237 "use_existing": true,
238 "output_path": "/mnt/output",
239 "runtime_constraints": rc,
240 "priority": runner.Priority,
241 "state": arvados.ContainerRequestStateCommitted,
247 log.Printf("container request UUID: %s", cr.UUID)
248 log.Printf("container UUID: %s", cr.ContainerUUID)
// logch receives events for whichever container UUID is currently
// subscribed; refreshCR below moves the subscription as the request's
// container changes.
250 logch := make(chan eventMessage)
251 client := arvadosClient{Client: runner.Client}
// Drop any remaining subscription (this appears to be cleanup in elided
// surrounding lines; confirm).
255 if subscribedUUID != "" {
256 client.Unsubscribe(logch, subscribedUUID)
// The ticker presumably drives periodic refreshCR calls in the loop below.
260 ticker := time.NewTicker(5 * time.Second)
263 lastState := cr.State
// refreshCR re-fetches the container request, logs fetch errors and state
// changes, and moves the event subscription to cr.ContainerUUID when it
// differs from the current one.
264 refreshCR := func() {
265 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
267 log.Printf("error getting container request: %s", err)
270 if lastState != cr.State {
271 log.Printf("container request state: %s", cr.State)
274 if subscribedUUID != cr.ContainerUUID {
275 if subscribedUUID != "" {
276 client.Unsubscribe(logch, subscribedUUID)
278 client.Subscribe(logch, cr.ContainerUUID)
279 subscribedUUID = cr.ContainerUUID
// Handle events and refreshes until the container request is final.
283 for cr.State != arvados.ContainerRequestStateFinal {
288 switch msg.EventType {
// Text-carrying events: msg.Properties.Text is split into lines.
292 for _, line := range strings.Split(msg.Properties.Text, "\n") {
// Inspect the finished container to tell success from failure or a
// non-zero exit.
301 var c arvados.Container
302 err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
305 } else if c.State != arvados.ContainerStateComplete {
306 return "", fmt.Errorf("container did not complete: %s", c.State)
307 } else if c.ExitCode != 0 {
308 return "", fmt.Errorf("container exited %d", c.ExitCode)
310 return cr.OutputUUID, err
// collectionInPathRe matches a path that contains a collection identifier
// as a whole path component. The pattern is:
//   - an optional prefix ending in "/";
//   - then either a portable data hash (32 lowercase hex digits, "+", a
//     decimal size) or an Arvados-style UUID (5-5-15 lowercase
//     alphanumerics);
//   - then an optional "/..." suffix.
// Submatches: [1] prefix, [2] identifier, [3] suffix.
// Because the prefix group is greedy, the last matching component wins when
// several are present.
313 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
// TranslatePaths rewrites each path in place to its location inside the
// container and records a collection mount for it in runner.Mounts. Each
// path must contain a collection identifier (UUID or portable data hash)
// matched by collectionInPathRe. The path becomes "/mnt/<identifier>"
// followed by whatever came after the identifier in the original path.
// Empty paths and "-" are tested for first and appear to be skipped.
// It returns an error for the first path with no recognizable identifier.
315 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
316 if runner.Mounts == nil {
317 runner.Mounts = make(map[string]map[string]interface{})
319 for _, path := range paths {
320 if *path == "" || *path == "-" {
323 m := collectionInPathRe.FindStringSubmatch(*path)
325 return fmt.Errorf("cannot find uuid in path: %q", *path)
// Reuse the mount for this identifier if one exists; otherwise create a
// collection mount. Note that `uuid` may hold a portable data hash rather
// than a UUID.
328 mnt, ok := runner.Mounts["/mnt/"+uuid]
330 mnt = map[string]interface{}{
331 "kind": "collection",
334 runner.Mounts["/mnt/"+uuid] = mnt
// m[3] is the remainder of the path after the identifier (may be empty).
336 *path = "/mnt/" + uuid + m[3]
// makeCommandCollection makes this executable (/proc/self/exe) available in
// a collection owned by runner.ProjectUUID and returns that collection's
// UUID. The collection is named "lightning-<hex BLAKE2b-256 of the binary>".
// If a collection with that name already exists in the project, it is
// reused without checking its content. Otherwise the binary is written
// through Keep as a single file "lightning", and a new collection is created
// from the resulting manifest.
341 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
// Read the whole executable into memory; it is both hashed and uploaded.
342 exe, err := ioutil.ReadFile("/proc/self/exe")
346 b2 := blake2b.Sum256(exe)
// Content-derived name: "lightning-" + 64 hex digits of the digest.
347 cname := fmt.Sprintf("lightning-%x", b2)
348 var existing arvados.CollectionList
349 err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
352 Filters: []arvados.Filter{
353 {Attr: "name", Operator: "=", Operand: cname},
354 {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
// Reuse the first same-named collection in the project (content not
// re-verified, as the log message states).
360 if len(existing.Items) > 0 {
361 uuid := existing.Items[0].UUID
362 log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname)
365 log.Printf("writing lightning binary to new collection %q", cname)
366 ac, err := arvadosclient.New(runner.Client)
370 kc := keepclient.New(ac)
371 var coll arvados.Collection
372 fs, err := coll.FileSystem(runner.Client, kc)
// Create the single file "lightning" with mode 0777.
376 f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
380 _, err = f.Write(exe)
// Serialize the in-memory collection filesystem to a manifest for the
// create request.
388 mtxt, err := fs.MarshalManifest(".")
392 err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
393 "collection": map[string]interface{}{
394 "owner_uuid": runner.ProjectUUID,
395 "manifest_text": mtxt,
402 log.Printf("stored lightning binary in new collection %s", coll.UUID)
403 return coll.UUID, nil