package main
import (
+ "encoding/json"
"errors"
"fmt"
"io/ioutil"
- "log"
+ "net/url"
"os"
"regexp"
+ "strings"
+ "time"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/keepclient"
+ log "github.com/sirupsen/logrus"
"golang.org/x/crypto/blake2b"
+ "golang.org/x/net/websocket"
)
type arvadosContainerRunner struct {
Client *arvados.Client
Name string
ProjectUUID string
+ VCPUs int
+ RAM int64
+ Prog string // if empty, run /proc/self/exe
Args []string
- Mounts map[string]string
+ Mounts map[string]map[string]interface{}
+ Priority int
}
-var (
- collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
-)
-
-func (runner *arvadosContainerRunner) Run() error {
+func (runner *arvadosContainerRunner) Run() (string, error) {
if runner.ProjectUUID == "" {
- return errors.New("cannot run arvados container: ProjectUUID not provided")
- }
- prog := "/mnt/cmd/lightning"
- cmdUUID, err := runner.makeCommandCollection()
- if err != nil {
- return err
+ return "", errors.New("cannot run arvados container: ProjectUUID not provided")
}
- command := append([]string{prog}, runner.Args...)
+
mounts := map[string]map[string]interface{}{
- "/mnt/cmd": {
- "kind": "collection",
- "uuid": cmdUUID,
- },
"/mnt/output": {
"kind": "tmp",
"writable": true,
"capacity": 100000000000,
},
}
- for uuid, mnt := range runner.Mounts {
- mounts[mnt] = map[string]interface{}{
+ for path, mnt := range runner.Mounts {
+ mounts[path] = mnt
+ }
+
+ prog := runner.Prog
+ if prog == "" {
+ prog = "/mnt/cmd/lightning"
+ cmdUUID, err := runner.makeCommandCollection()
+ if err != nil {
+ return "", err
+ }
+ mounts["/mnt/cmd"] = map[string]interface{}{
"kind": "collection",
- "uuid": uuid,
+ "uuid": cmdUUID,
}
}
- cpus := 16
+ command := append([]string{prog}, runner.Args...)
+
+ priority := runner.Priority
+ if priority < 1 {
+ priority = 500
+ }
rc := arvados.RuntimeConstraints{
- VCPUs: cpus,
- RAM: 64000000000,
- KeepCacheRAM: (1 << 26) * 2 * int64(cpus),
+ VCPUs: runner.VCPUs,
+ RAM: runner.RAM,
+ KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs),
}
var cr arvados.ContainerRequest
- err = runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
+ err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
"container_request": map[string]interface{}{
"owner_uuid": runner.ProjectUUID,
"name": runner.Name,
"use_existing": true,
"output_path": "/mnt/output",
"runtime_constraints": rc,
- "priority": 1,
+ "priority": runner.Priority,
"state": arvados.ContainerRequestStateCommitted,
},
})
- log.Print(cr.UUID)
- return err
+ log.Printf("container request UUID: %s", cr.UUID)
+ log.Printf("container UUID: %s", cr.ContainerUUID)
+
+ var logch <-chan eventMessage
+ var logstream *logStream
+ defer func() {
+ if logstream != nil {
+ logstream.Close()
+ }
+ }()
+
+ ticker := time.NewTicker(5 * time.Second)
+ defer ticker.Stop()
+
+ lastState := cr.State
+ refreshCR := func() {
+ err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
+ if err != nil {
+ log.Printf("error getting container request: %s", err)
+ return
+ }
+ if lastState != cr.State {
+ log.Printf("container state: %s", cr.State)
+ lastState = cr.State
+ }
+ }
+
+ subscribedUUID := ""
+ for cr.State != arvados.ContainerRequestStateFinal {
+ if logch == nil && cr.ContainerUUID != subscribedUUID {
+ if logstream != nil {
+ logstream.Close()
+ }
+ logstream = runner.logStream(cr.ContainerUUID)
+ logch = logstream.C
+ }
+ select {
+ case msg, ok := <-logch:
+ if !ok {
+ logstream.Close()
+ logstream = nil
+ logch = nil
+ break
+ }
+ switch msg.EventType {
+ case "update":
+ refreshCR()
+ default:
+ for _, line := range strings.Split(msg.Properties.Text, "\n") {
+ if line != "" {
+ log.Print(line)
+ }
+ }
+ }
+ case <-ticker.C:
+ refreshCR()
+ }
+ }
+
+ var c arvados.Container
+ err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
+ if err != nil {
+ return "", err
+ }
+ if c.ExitCode != 0 {
+ return "", fmt.Errorf("container exited %d", c.ExitCode)
+ }
+ return cr.OutputUUID, err
}
+var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
+
func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
if runner.Mounts == nil {
- runner.Mounts = make(map[string]string)
+ runner.Mounts = make(map[string]map[string]interface{})
}
for _, path := range paths {
- if *path == "" {
+ if *path == "" || *path == "-" {
continue
}
m := collectionInPathRe.FindStringSubmatch(*path)
return fmt.Errorf("cannot find uuid in path: %q", *path)
}
uuid := m[2]
- mnt, ok := runner.Mounts[uuid]
+ mnt, ok := runner.Mounts["/mnt/"+uuid]
if !ok {
- mnt = "/mnt/" + uuid
- runner.Mounts[uuid] = mnt
+ mnt = map[string]interface{}{
+ "kind": "collection",
+ "uuid": uuid,
+ }
+ runner.Mounts["/mnt/"+uuid] = mnt
}
- *path = mnt + m[3]
+ *path = "/mnt/" + uuid + m[3]
}
return nil
}
Limit: 1,
Count: "none",
Filters: []arvados.Filter{
- {"name", "=", cname},
- {"owner_uuid", "=", runner.ProjectUUID},
+ {Attr: "name", Operator: "=", Operand: cname},
+ {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
},
})
if err != nil {
}
if len(existing.Items) > 0 {
uuid := existing.Items[0].UUID
- log.Printf("using existing collection %q named %q (did not verify whether content matches)", uuid, cname)
+ log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname)
return uuid, nil
}
log.Printf("writing lightning binary to new collection %q", cname)
if err != nil {
return "", err
}
- log.Printf("collection: %#v", coll)
+ log.Printf("stored lightning binary in new collection %s", coll.UUID)
return coll.UUID, nil
}
+
+type eventMessage struct {
+ Status int
+ ObjectUUID string `json:"object_uuid"`
+ EventType string `json:"event_type"`
+ Properties struct {
+ Text string
+ }
+}
+
+type logStream struct {
+ C <-chan eventMessage
+ Close func() error
+}
+
+func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
+ ch := make(chan eventMessage)
+ done := make(chan struct{})
+ go func() {
+ defer close(ch)
+ var cluster arvados.Cluster
+ runner.Client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
+ wsURL := cluster.Services.Websocket.ExternalURL
+ wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
+ wsURL.Path = "/websocket"
+ wsURL.RawQuery = url.Values{"api_token": []string{runner.Client.AuthToken}}.Encode()
+ conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
+ if err != nil {
+ log.Printf("websocket error: %s", err)
+ return
+ }
+ w := json.NewEncoder(conn)
+ go w.Encode(map[string]interface{}{
+ "method": "subscribe",
+ "filters": [][]interface{}{
+ {"object_uuid", "=", uuid},
+ {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
+ },
+ })
+ r := json.NewDecoder(conn)
+ for {
+ var msg eventMessage
+ err := r.Decode(&msg)
+ if err != nil {
+ log.Printf("error decoding websocket message: %s", err)
+ return
+ }
+ if msg.ObjectUUID == uuid {
+ ch <- msg
+ }
+ select {
+ case <-done:
+ return
+ default:
+ }
+ }
+ }()
+ return &logStream{
+ C: ch,
+ Close: func() error {
+ close(done)
+ return nil
+ },
+ }
+}