X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/a0d7eb079fe583a9c8895a848cd3a01f786a9a4c..ee4f69c81672248fd775c6b2adde9f62dbfdb349:/arvados.go diff --git a/arvados.go b/arvados.go index 89d23c7e31..48cfad7ccd 100644 --- a/arvados.go +++ b/arvados.go @@ -1,52 +1,227 @@ -package main +package lightning import ( + "context" + "encoding/json" "errors" "fmt" + "io" "io/ioutil" - "log" + "net/url" "os" "regexp" + "runtime" + "strings" + "sync" + "time" + "git.arvados.org/arvados.git/lib/cmd" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/keepclient" + log "github.com/sirupsen/logrus" "golang.org/x/crypto/blake2b" + "golang.org/x/net/websocket" ) +type eventMessage struct { + Status int + ObjectUUID string `json:"object_uuid"` + EventType string `json:"event_type"` + Properties struct { + Text string + } +} + +type arvadosClient struct { + *arvados.Client + notifying map[string]map[chan<- eventMessage]int + wantClose chan struct{} + wsconn *websocket.Conn + mtx sync.Mutex +} + +// Listen for events concerning the given uuids. When an event occurs +// (and after connecting/reconnecting to the event stream), send each +// uuid to ch. If a {ch, uuid} pair is subscribed twice, the uuid will +// be sent only once for each update, but two Unsubscribe calls will +// be needed to stop sending them. +func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) { + client.mtx.Lock() + defer client.mtx.Unlock() + if client.notifying == nil { + client.notifying = map[string]map[chan<- eventMessage]int{} + client.wantClose = make(chan struct{}) + go client.runNotifier() + } + chmap := client.notifying[uuid] + if chmap == nil { + chmap = map[chan<- eventMessage]int{} + client.notifying[uuid] = chmap + } + needSub := true + for _, nch := range chmap { + if nch > 0 { + needSub = false + break + } + } + chmap[ch]++ + if needSub && client.wsconn != nil { + go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{ + "method": "subscribe", + "filters": [][]interface{}{ + {"object_uuid", "=", uuid}, + {"event_type", "in", []string{"stderr", "crunch-run", "update"}}, + }, + }) + } +} + +func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) { + client.mtx.Lock() + defer client.mtx.Unlock() + chmap := client.notifying[uuid] + if n := chmap[ch] - 1; n == 0 { + delete(chmap, ch) + if len(chmap) == 0 { + delete(client.notifying, uuid) + } + if client.wsconn != nil { + go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{ + "method": "unsubscribe", + "filters": [][]interface{}{ + {"object_uuid", "=", uuid}, + {"event_type", "in", []string{"stderr", "crunch-run", "update"}}, + }, + }) + } + } else if n > 0 { + chmap[ch] = n + } +} + +func (client *arvadosClient) Close() { + client.mtx.Lock() + defer client.mtx.Unlock() + if client.notifying != nil { + client.notifying = nil + close(client.wantClose) + } +} + +func (client *arvadosClient) runNotifier() { +reconnect: + for { + var cluster arvados.Cluster + err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil) + if err != nil { + log.Warnf("error getting cluster config: %s", err) + time.Sleep(5 * time.Second) + continue reconnect + } + wsURL := cluster.Services.Websocket.ExternalURL + wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1) + wsURL.Path = "/websocket" + wsURLNoToken := wsURL.String() + wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode() + conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String()) + if err != nil { + log.Warnf("websocket connection error: %s", err) + time.Sleep(5 * time.Second) + continue reconnect + } + log.Printf("connected to websocket at %s", wsURLNoToken) + + client.mtx.Lock() + client.wsconn = conn + resubscribe := make([]string, 0, len(client.notifying)) + for uuid := range client.notifying { + resubscribe = append(resubscribe, uuid) + } + client.mtx.Unlock() + + go func() { + w := json.NewEncoder(conn) + for _, uuid := range resubscribe { + w.Encode(map[string]interface{}{ + "method": "subscribe", + "filters": [][]interface{}{ + {"object_uuid", "=", uuid}, + {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}}, + }, + }) + } + }() + + r := json.NewDecoder(conn) + for { + var msg eventMessage + err := r.Decode(&msg) + select { + case <-client.wantClose: + return + default: + if err != nil { + log.Printf("error decoding websocket message: %s", err) + client.mtx.Lock() + client.wsconn = nil + client.mtx.Unlock() + go conn.Close() + continue reconnect + } + client.mtx.Lock() + for ch := range client.notifying[msg.ObjectUUID] { + ch <- msg + } + client.mtx.Unlock() + } + } + } +} + +var refreshTicker = time.NewTicker(5 * time.Second) + type arvadosContainerRunner struct { Client *arvados.Client Name string + OutputName string ProjectUUID string + APIAccess bool VCPUs int RAM int64 Prog string // if empty, run /proc/self/exe Args []string - Mounts map[string]string + Mounts map[string]map[string]interface{} + Priority int + KeepCache int // cache buffers per VCPU (0 for default) } -var ( - collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`) -) +func (runner *arvadosContainerRunner) Run() (string, error) { + return runner.RunContext(context.Background()) +} -func (runner *arvadosContainerRunner) Run() error { +func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) { if runner.ProjectUUID == "" { - return errors.New("cannot run arvados container: ProjectUUID not provided") + return "", errors.New("cannot run arvados container: ProjectUUID not provided") } + mounts := map[string]map[string]interface{}{ "/mnt/output": { - "kind": "tmp", + "kind": "collection", "writable": true, - "capacity": 100000000000, }, } + for path, mnt := range runner.Mounts { + mounts[path] = mnt + } prog := runner.Prog if prog == "" { prog = "/mnt/cmd/lightning" cmdUUID, err := runner.makeCommandCollection() if err != nil { - return err + return "", err } mounts["/mnt/cmd"] = map[string]interface{}{ "kind": "collection", @@ -55,16 +230,23 @@ func (runner *arvadosContainerRunner) Run() error { } command := append([]string{prog}, runner.Args...) - for uuid, mnt := range runner.Mounts { - mounts[mnt] = map[string]interface{}{ - "kind": "collection", - "uuid": uuid, - } + priority := runner.Priority + if priority < 1 { + priority = 500 + } + keepCache := runner.KeepCache + if keepCache < 1 { + keepCache = 2 } rc := arvados.RuntimeConstraints{ + API: &runner.APIAccess, VCPUs: runner.VCPUs, RAM: runner.RAM, - KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs), + KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs), + } + outname := &runner.OutputName + if *outname == "" { + outname = nil } var cr arvados.ContainerRequest err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{ @@ -76,21 +258,123 @@ func (runner *arvadosContainerRunner) Run() error { "mounts": mounts, "use_existing": true, "output_path": "/mnt/output", + "output_name": outname, "runtime_constraints": rc, - "priority": 1, + "priority": runner.Priority, "state": arvados.ContainerRequestStateCommitted, + "scheduling_parameters": arvados.SchedulingParameters{ + Preemptible: true, + Partitions: []string{}, + }, }, }) - log.Print(cr.UUID) - return err + if err != nil { + return "", err + } + log.Printf("container request UUID: %s", cr.UUID) + log.Printf("container UUID: %s", cr.ContainerUUID) + + logch := make(chan eventMessage) + client := arvadosClient{Client: runner.Client} + defer client.Close() + subscribedUUID := "" + defer func() { + if subscribedUUID != "" { + client.Unsubscribe(logch, subscribedUUID) + } + }() + + neednewline := "" + + lastState := cr.State + refreshCR := func() { + err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil) + if err != nil { + fmt.Fprint(os.Stderr, neednewline) + log.Printf("error getting container request: %s", err) + return + } + if lastState != cr.State { + fmt.Fprint(os.Stderr, neednewline) + log.Printf("container request state: %s", cr.State) + lastState = cr.State + } + if subscribedUUID != cr.ContainerUUID { + fmt.Fprint(os.Stderr, neednewline) + neednewline = "" + if subscribedUUID != "" { + client.Unsubscribe(logch, subscribedUUID) + } + client.Subscribe(logch, cr.ContainerUUID) + subscribedUUID = cr.ContainerUUID + } + } + + var reCrunchstat = regexp.MustCompile(`mem .* rss`) +waitctr: + for cr.State != arvados.ContainerRequestStateFinal { + select { + case <-ctx.Done(): + err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{ + "container_request": map[string]interface{}{ + "priority": 0, + }, + }) + if err != nil { + log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err) + } + break waitctr + case <-refreshTicker.C: + refreshCR() + case msg := <-logch: + switch msg.EventType { + case "update": + refreshCR() + case "stderr": + for _, line := range strings.Split(msg.Properties.Text, "\n") { + if line != "" { + fmt.Fprint(os.Stderr, neednewline) + neednewline = "" + log.Print(line) + } + } + case "crunchstat": + for _, line := range strings.Split(msg.Properties.Text, "\n") { + mem := reCrunchstat.FindString(line) + if mem != "" { + fmt.Fprintf(os.Stderr, "%s \r", mem) + neednewline = "\n" + } + } + } + } + } + fmt.Fprint(os.Stderr, neednewline) + + if err := ctx.Err(); err != nil { + return "", err + } + + var c arvados.Container + err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil) + if err != nil { + return "", err + } else if c.State != arvados.ContainerStateComplete { + return "", fmt.Errorf("container did not complete: %s", c.State) + } else if c.ExitCode != 0 { + return "", fmt.Errorf("container exited %d", c.ExitCode) + } + return cr.OutputUUID, err } +var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`) + func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error { if runner.Mounts == nil { - runner.Mounts = make(map[string]string) + runner.Mounts = make(map[string]map[string]interface{}) } for _, path := range paths { - if *path == "" { + if *path == "" || *path == "-" { continue } m := collectionInPathRe.FindStringSubmatch(*path) @@ -98,23 +382,30 @@ func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error { return fmt.Errorf("cannot find uuid in path: %q", *path) } uuid := m[2] - mnt, ok := runner.Mounts[uuid] + mnt, ok := runner.Mounts["/mnt/"+uuid] if !ok { - mnt = "/mnt/" + uuid - runner.Mounts[uuid] = mnt + mnt = map[string]interface{}{ + "kind": "collection", + "uuid": uuid, + } + runner.Mounts["/mnt/"+uuid] = mnt } - *path = mnt + m[3] + *path = "/mnt/" + uuid + m[3] } return nil } +var mtxMakeCommandCollection sync.Mutex + func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { + mtxMakeCommandCollection.Lock() + defer mtxMakeCommandCollection.Unlock() exe, err := ioutil.ReadFile("/proc/self/exe") if err != nil { return "", err } b2 := blake2b.Sum256(exe) - cname := fmt.Sprintf("lightning-%x", b2) + cname := "lightning " + cmd.Version.String() // must build with "make", not just "go install" var existing arvados.CollectionList err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{ Limit: 1, @@ -122,15 +413,16 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { Filters: []arvados.Filter{ {Attr: "name", Operator: "=", Operand: cname}, {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID}, + {Attr: "properties.blake2b", Operator: "=", Operand: fmt.Sprintf("%x", b2)}, }, }) if err != nil { return "", err } if len(existing.Items) > 0 { - uuid := existing.Items[0].UUID - log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname) - return uuid, nil + coll := existing.Items[0] + log.Printf("using lightning binary in existing collection %s (name is %q, hash is %q; did not verify whether content matches)", coll.UUID, cname, coll.Properties["blake2b"]) + return coll.UUID, nil } log.Printf("writing lightning binary to new collection %q", cname) ac, err := arvadosclient.New(runner.Client) @@ -164,6 +456,9 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { "owner_uuid": runner.ProjectUUID, "manifest_text": mtxt, "name": cname, + "properties": map[string]interface{}{ + "blake2b": fmt.Sprintf("%x", b2), + }, }, }) if err != nil { @@ -172,3 +467,45 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { log.Printf("stored lightning binary in new collection %s", coll.UUID) return coll.UUID, nil } + +var ( + arvadosClientFromEnv = arvados.NewClientFromEnv() + siteFS arvados.CustomFileSystem + siteFSMtx sync.Mutex +) + +func open(fnm string) (io.ReadCloser, error) { + if os.Getenv("ARVADOS_API_HOST") == "" { + return os.Open(fnm) + } + m := collectionInPathRe.FindStringSubmatch(fnm) + if m == nil { + return os.Open(fnm) + } + uuid := m[2] + mnt := "/mnt/" + uuid + "/" + if !strings.HasPrefix(fnm, mnt) { + return os.Open(fnm) + } + + siteFSMtx.Lock() + defer siteFSMtx.Unlock() + if siteFS == nil { + log.Info("setting up Arvados client") + ac, err := arvadosclient.New(arvadosClientFromEnv) + if err != nil { + return nil, err + } + ac.Client = arvados.DefaultSecureClient + kc := keepclient.New(ac) + // Don't use keepclient's default short timeouts. + kc.HTTPClient = arvados.DefaultSecureClient + // Guess max concurrent readers, hope to avoid cache + // thrashing. + kc.BlockCache = &keepclient.BlockCache{MaxBlocks: runtime.NumCPU() * 3} + siteFS = arvadosClientFromEnv.SiteFileSystem(kc) + } + + log.Infof("reading %q from %s using Arvados client", fnm[len(mnt):], uuid) + return siteFS.Open("by_id/" + uuid + "/" + fnm[len(mnt):]) +}