16 "git.arvados.org/arvados.git/sdk/go/arvados"
17 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
18 "git.arvados.org/arvados.git/sdk/go/keepclient"
19 log "github.com/sirupsen/logrus"
20 "golang.org/x/crypto/blake2b"
21 "golang.org/x/net/websocket"
// eventMessage is the element type of the channels passed to
// arvadosClient.Subscribe and arvadosClient.Unsubscribe.
24 type eventMessage struct {
// ObjectUUID maps to the JSON key "object_uuid".
26 ObjectUUID string `json:"object_uuid"`
// EventType maps to the JSON key "event_type".
27 EventType string `json:"event_type"`
// arvadosClient holds the subscription state used by Subscribe,
// Unsubscribe, Close and runNotifier.
33 type arvadosClient struct {
// notifying maps an object uuid to the channels subscribed to it; the
// int is the number of outstanding Subscribe calls for that
// {channel, uuid} pair. It is nil until the first Subscribe call and
// is reset to nil by Close.
35 notifying map[string]map[chan<- eventMessage]int
// wantClose is created by the first Subscribe call and closed by
// Close; runNotifier receives from it.
36 wantClose chan struct{}
// wsconn is the websocket connection that Subscribe and Unsubscribe
// write their requests to when it is non-nil.
37 wsconn *websocket.Conn
41 // Subscribe listens for events concerning the given uuid. When an
42 // event occurs (and after connecting/reconnecting to the event stream),
43 // a message is sent to ch. If a {ch, uuid} pair is subscribed twice, it
44 // will be notified only once for each update, but two Unsubscribe calls
45 // will be needed to stop sending them.
46 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
48 defer client.mtx.Unlock()
// A nil notifying map means the notifier is not running (first use, or
// Close was called): set up state and start it in the background.
49 if client.notifying == nil {
50 client.notifying = map[string]map[chan<- eventMessage]int{}
51 client.wantClose = make(chan struct{})
52 go client.runNotifier()
// chmap is the set of channels registered for this uuid; a fresh map
// is stored in client.notifying[uuid] below (presumably only when the
// uuid has no entry yet -- confirm).
54 chmap := client.notifying[uuid]
56 chmap = map[chan<- eventMessage]int{}
57 client.notifying[uuid] = chmap
// nch is the per-channel count stored in chmap.
60 for _, nch := range chmap {
// Send a subscribe request only when one is needed and a websocket
// connection exists. The encode runs in its own goroutine and its
// error result is discarded.
67 if needSub && client.wsconn != nil {
68 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
69 "method": "subscribe",
70 "filters": [][]interface{}{
71 {"object_uuid", "=", uuid},
// NOTE(review): the resubscribe filter in runNotifier also lists
// "crunchstat"; this list does not -- confirm whether that is intended.
72 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
// Unsubscribe undoes one Subscribe call for the {ch, uuid} pair. When
// the pair's count reaches zero, an "unsubscribe" request for the uuid
// is written to the websocket connection, if there is one.
78 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
80 defer client.mtx.Unlock()
81 chmap := client.notifying[uuid]
// n is the count this pair would have after the call; n == 0 means
// this was the last outstanding subscription for {ch, uuid}.
82 if n := chmap[ch] - 1; n == 0 {
// NOTE(review): presumably reached only when no channels remain for
// this uuid -- confirm.
85 delete(client.notifying, uuid)
// The encode runs in its own goroutine and its error result is
// discarded.
87 if client.wsconn != nil {
88 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
89 "method": "unsubscribe",
90 "filters": [][]interface{}{
91 {"object_uuid", "=", uuid},
92 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
// Close discards all subscriptions and signals the notifier to stop by
// closing wantClose. It does nothing if notifying is already nil (no
// Subscribe call yet, or Close was already called).
101 func (client *arvadosClient) Close() {
103 defer client.mtx.Unlock()
104 if client.notifying != nil {
// Resetting notifying to nil lets a later Subscribe call start over
// with a new map, a new wantClose channel and a new notifier.
105 client.notifying = nil
106 close(client.wantClose)
// runNotifier is started in its own goroutine by Subscribe. It fetches
// the cluster configuration, dials the cluster's websocket service,
// sends a subscribe request for every uuid in client.notifying, and then
// decodes incoming messages, looking up the channels registered for
// each message's ObjectUUID.
110 func (client *arvadosClient) runNotifier() {
// The cluster config supplies the websocket and controller URLs.
113 var cluster arvados.Cluster
114 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
// Log and pause for 5 seconds (presumably before retrying -- confirm).
116 log.Warnf("error getting cluster config: %s", err)
117 time.Sleep(5 * time.Second)
// Build the websocket URL from the configured external URL: the first
// "http" in the scheme becomes "ws" (http -> ws, https -> wss) and the
// path is set to /websocket.
120 wsURL := cluster.Services.Websocket.ExternalURL
121 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
122 wsURL.Path = "/websocket"
// Capture the URL before the token is added so that it can be logged
// without exposing the token.
123 wsURLNoToken := wsURL.String()
124 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
// The controller's external URL is passed as the websocket origin.
125 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
// Log and pause for 5 seconds (presumably before retrying -- confirm).
127 log.Warnf("websocket connection error: %s", err)
128 time.Sleep(5 * time.Second)
131 log.Printf("connected to websocket at %s", wsURLNoToken)
// Copy the currently subscribed uuids into a slice so that a subscribe
// request can be sent for each one on this new connection.
135 resubscribe := make([]string, 0, len(client.notifying))
136 for uuid := range client.notifying {
137 resubscribe = append(resubscribe, uuid)
142 w := json.NewEncoder(conn)
143 for _, uuid := range resubscribe {
// The error returned by Encode is not checked.
144 w.Encode(map[string]interface{}{
145 "method": "subscribe",
146 "filters": [][]interface{}{
147 {"object_uuid", "=", uuid},
// NOTE(review): this list includes "crunchstat", unlike the filters
// sent by Subscribe and Unsubscribe -- confirm whether that is intended.
148 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
// Read JSON messages from the connection.
154 r := json.NewDecoder(conn)
157 err := r.Decode(&msg)
// wantClose is closed by Close.
159 case <-client.wantClose:
163 log.Printf("error decoding websocket message: %s", err)
// Visit every channel registered for the object this message is about.
171 for ch := range client.notifying[msg.ObjectUUID] {
// arvadosContainerRunner holds the settings that RunContext uses to
// submit and monitor a container request.
180 type arvadosContainerRunner struct {
// Client is used for every API request made by the runner.
181 Client *arvados.Client
187 Prog string // if empty, run /proc/self/exe
// Mounts is keyed by mount path inside the container. TranslatePaths
// adds "/mnt/<uuid>" entries of kind "collection".
189 Mounts map[string]map[string]interface{}
// Run calls RunContext with context.Background() and returns its
// results unchanged.
193 func (runner *arvadosContainerRunner) Run() (string, error) {
194 return runner.RunContext(context.Background())
// RunContext submits a committed container request owned by
// runner.ProjectUUID, then follows it until the request reaches the
// Final state, logging state changes and writing event text to stderr.
//
// On success it returns cr.OutputUUID. It returns an error if
// ProjectUUID is empty, if the container's final state is not Complete,
// or if the container's exit code is non-zero.
197 func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
198 if runner.ProjectUUID == "" {
199 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
// Mounts for the container, keyed by mount path.
202 mounts := map[string]map[string]interface{}{
204 "kind": "collection",
// Entries from runner.Mounts are visited here (presumably copied into
// mounts -- confirm).
208 for path, mnt := range runner.Mounts {
// Run the lightning binary from a collection mounted at /mnt/cmd; the
// collection comes from makeCommandCollection. (Presumably this path is
// taken only when runner.Prog is empty -- confirm.)
214 prog = "/mnt/cmd/lightning"
215 cmdUUID, err := runner.makeCommandCollection()
219 mounts["/mnt/cmd"] = map[string]interface{}{
220 "kind": "collection",
// The container command is prog followed by runner.Args.
224 command := append([]string{prog}, runner.Args...)
// NOTE(review): the request below sends runner.Priority rather than
// this local priority -- confirm which one is intended.
226 priority := runner.Priority
230 rc := arvados.RuntimeConstraints{
// (1 << 26) * 2 bytes = 128 MiB of keep cache per VCPU.
233 KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs),
// outname points at runner.OutputName and is sent as "output_name".
235 outname := &runner.OutputName
239 var cr arvados.ContainerRequest
// Create the container request; the response is decoded into cr.
240 err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
241 "container_request": map[string]interface{}{
242 "owner_uuid": runner.ProjectUUID,
244 "container_image": "lightning-runtime",
247 "use_existing": true,
248 "output_path": "/mnt/output",
249 "output_name": outname,
250 "runtime_constraints": rc,
251 "priority": runner.Priority,
252 "state": arvados.ContainerRequestStateCommitted,
258 log.Printf("container request UUID: %s", cr.UUID)
259 log.Printf("container UUID: %s", cr.ContainerUUID)
// logch is an unbuffered channel registered with client.Subscribe;
// client wraps the runner's API client.
261 logch := make(chan eventMessage)
262 client := arvadosClient{Client: runner.Client}
// Drop the current subscription, if any (presumably as cleanup when
// the function returns -- confirm).
266 if subscribedUUID != "" {
267 client.Unsubscribe(logch, subscribedUUID)
271 ticker := time.NewTicker(5 * time.Second)
// refreshCR re-reads the container request, logs state changes, and
// moves the event subscription when the request's container UUID
// changes.
276 lastState := cr.State
277 refreshCR := func() {
278 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
// neednewline is written to stderr before each log line (presumably to
// end a status line left open by the "\r" output below -- confirm).
280 fmt.Fprint(os.Stderr, neednewline)
281 log.Printf("error getting container request: %s", err)
284 if lastState != cr.State {
285 fmt.Fprint(os.Stderr, neednewline)
286 log.Printf("container request state: %s", cr.State)
// The request is now associated with a different container: switch
// the subscription from the old container UUID to the new one.
289 if subscribedUUID != cr.ContainerUUID {
290 fmt.Fprint(os.Stderr, neednewline)
292 if subscribedUUID != "" {
293 client.Unsubscribe(logch, subscribedUUID)
295 client.Subscribe(logch, cr.ContainerUUID)
296 subscribedUUID = cr.ContainerUUID
// Matches the span from "mem " through " rss" within a line.
300 var reCrunchstat = regexp.MustCompile(`mem .* rss`)
// Keep going until the container request reaches the Final state.
302 for cr.State != arvados.ContainerRequestStateFinal {
// Update the container request in order to cancel it (per the error
// message below).
305 err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
306 "container_request": map[string]interface{}{
311 log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
// Handle a received event according to its type.
317 switch msg.EventType {
321 for _, line := range strings.Split(msg.Properties.Text, "\n") {
323 fmt.Fprint(os.Stderr, neednewline)
329 for _, line := range strings.Split(msg.Properties.Text, "\n") {
330 mem := reCrunchstat.FindString(line)
// The trailing "\r" returns the cursor to the start of the line, so
// the next write overwrites this status text.
332 fmt.Fprintf(os.Stderr, "%s \r", mem)
339 fmt.Fprint(os.Stderr, neednewline)
341 if err := ctx.Err(); err != nil {
// Fetch the container to check its final state and exit code.
345 var c arvados.Container
346 err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
349 } else if c.State != arvados.ContainerStateComplete {
350 return "", fmt.Errorf("container did not complete: %s", c.State)
351 } else if c.ExitCode != 0 {
352 return "", fmt.Errorf("container exited %d", c.ExitCode)
354 return cr.OutputUUID, err
// collectionInPathRe splits a path into three groups: an optional
// prefix ending in "/", an identifier, and an optional suffix starting
// with "/". The identifier is either 32 lowercase hex digits followed
// by "+" and decimal digits, or three groups of 5, 5 and 15 characters
// from [0-9a-z] separated by "-".
357 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
// TranslatePaths rewrites each *path in place to "/mnt/<uuid><suffix>",
// where the suffix is the third group matched by collectionInPathRe,
// and makes sure runner.Mounts has a "collection" mount at "/mnt/<uuid>".
// Paths equal to "" or "-" get special handling (presumably they are
// left unchanged -- confirm). It returns an error for a path in which
// no identifier can be found.
359 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
// Allocate the map on first use so the assignments below are safe.
360 if runner.Mounts == nil {
361 runner.Mounts = make(map[string]map[string]interface{})
363 for _, path := range paths {
364 if *path == "" || *path == "-" {
367 m := collectionInPathRe.FindStringSubmatch(*path)
369 return fmt.Errorf("cannot find uuid in path: %q", *path)
// Look for an existing mount entry for this identifier.
372 mnt, ok := runner.Mounts["/mnt/"+uuid]
374 mnt = map[string]interface{}{
375 "kind": "collection",
378 runner.Mounts["/mnt/"+uuid] = mnt
// m[3] is the part of the path after the identifier; it is empty when
// the path ends at the identifier.
380 *path = "/mnt/" + uuid + m[3]
// mtxMakeCommandCollection is held for the whole of each
// makeCommandCollection call, so those calls run one at a time.
385 var mtxMakeCommandCollection sync.Mutex
// makeCommandCollection returns the UUID of a collection containing the
// running executable (read from /proc/self/exe). The collection is
// looked up by the name "lightning-<blake2b-256 hex digest>" within
// runner.ProjectUUID. If one exists, the first match is used without
// checking its content; otherwise a new collection holding the binary
// as a file named "lightning" is created.
387 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
388 mtxMakeCommandCollection.Lock()
389 defer mtxMakeCommandCollection.Unlock()
390 exe, err := ioutil.ReadFile("/proc/self/exe")
// The collection name is derived from the executable's content hash.
394 b2 := blake2b.Sum256(exe)
395 cname := fmt.Sprintf("lightning-%x", b2)
// Search the project for a collection with that name.
396 var existing arvados.CollectionList
397 err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
400 Filters: []arvados.Filter{
401 {Attr: "name", Operator: "=", Operand: cname},
402 {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
408 if len(existing.Items) > 0 {
409 uuid := existing.Items[0].UUID
410 log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname)
// No match: write the binary into a new collection.
413 log.Printf("writing lightning binary to new collection %q", cname)
414 ac, err := arvadosclient.New(runner.Client)
418 kc := keepclient.New(ac)
// coll starts as the zero value; its filesystem receives the binary.
419 var coll arvados.Collection
420 fs, err := coll.FileSystem(runner.Client, kc)
// Mode 0777 includes the execute bits.
424 f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
428 _, err = f.Write(exe)
// Serialize the filesystem to manifest text and create the collection
// record from it; the response is decoded into coll.
436 mtxt, err := fs.MarshalManifest(".")
440 err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
441 "collection": map[string]interface{}{
442 "owner_uuid": runner.ProjectUUID,
443 "manifest_text": mtxt,
450 log.Printf("stored lightning binary in new collection %s", coll.UUID)
451 return coll.UUID, nil