From f1cff36f8beba7ef4b494120f1577224417c125b Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 4 Mar 2020 22:18:26 -0500 Subject: [PATCH] I/O pipeline, show arvados container logs. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- arvados.go | 143 +++++++++++++++++++++++++++++++++++++++++-- example-su92l-1kg.sh | 42 ++++++++----- exportnumpy.go | 4 +- filter.go | 4 +- go.mod | 1 + go.sum | 3 + import.go | 4 +- pca.go | 4 +- plot.go | 4 +- 9 files changed, 184 insertions(+), 25 deletions(-) mode change 100644 => 100755 example-su92l-1kg.sh diff --git a/arvados.go b/arvados.go index c1b54d9823..c9f2e7a3ba 100644 --- a/arvados.go +++ b/arvados.go @@ -1,17 +1,22 @@ package main import ( + "encoding/json" "errors" "fmt" "io/ioutil" "log" + "net/url" "os" "regexp" + "strings" + "time" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/keepclient" "golang.org/x/crypto/blake2b" + "golang.org/x/net/websocket" ) type arvadosContainerRunner struct { @@ -25,9 +30,9 @@ type arvadosContainerRunner struct { Mounts map[string]map[string]interface{} } -func (runner *arvadosContainerRunner) Run() error { +func (runner *arvadosContainerRunner) Run() (string, error) { if runner.ProjectUUID == "" { - return errors.New("cannot run arvados container: ProjectUUID not provided") + return "", errors.New("cannot run arvados container: ProjectUUID not provided") } mounts := map[string]map[string]interface{}{ @@ -46,7 +51,7 @@ func (runner *arvadosContainerRunner) Run() error { prog = "/mnt/cmd/lightning" cmdUUID, err := runner.makeCommandCollection() if err != nil { - return err + return "", err } mounts["/mnt/cmd"] = map[string]interface{}{ "kind": "collection", @@ -76,7 +81,65 @@ func (runner *arvadosContainerRunner) Run() error { }, }) log.Print(cr.UUID) - return err + + var logch <-chan string + var logstream *logStream + defer func() { + if logstream != nil { + logstream.Close() + } + }() + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + lastState := cr.State + subscribedUUID := "" + for cr.State != arvados.ContainerRequestState(arvados.ContainerRequestStateFinal) { + if logch == nil && cr.ContainerUUID != subscribedUUID { + if logstream != nil { + logstream.Close() + } + logstream = runner.logStream(cr.ContainerUUID) + logch = logstream.C + } + select { + case msg, ok := <-logch: + if !ok { + logstream.Close() + logstream = nil + logch = nil + break + } + if msg != "" { + log.Print(msg) + continue + } + // empty message indicates an "update" event + // -- fall out of the select and get the + // latest version now, instead of waiting for + // the next timer tick. + case <-ticker.C: + } + err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil) + if err != nil { + return "", err + } + if lastState != cr.State { + log.Printf("container state: %s", cr.State) + lastState = cr.State + } + } + + var c arvados.Container + err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil) + if err != nil { + return "", err + } + if c.ExitCode != 0 { + return "", fmt.Errorf("container exited %d", c.ExitCode) + } + return cr.OutputUUID, err } var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`) @@ -86,7 +149,7 @@ func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error { runner.Mounts = make(map[string]map[string]interface{}) } for _, path := range paths { - if *path == "" { + if *path == "" || *path == "-" { continue } m := collectionInPathRe.FindStringSubmatch(*path) @@ -171,3 +234,73 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { log.Printf("stored lightning binary in new collection %s", coll.UUID) return coll.UUID, nil } + +type logStream struct { + C <-chan string + Close func() error +} + +func (runner *arvadosContainerRunner) logStream(uuid string) *logStream { + ch := make(chan string) + done := make(chan struct{}) + go func() { + defer close(ch) + var cluster arvados.Cluster + runner.Client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil) + wsURL := cluster.Services.Websocket.ExternalURL + wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1) + wsURL.Path = "/websocket" + wsURL.RawQuery = url.Values{"api_token": []string{runner.Client.AuthToken}}.Encode() + conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String()) + if err != nil { + ch <- fmt.Sprintf("websocket error: %s", err) + return + } + w := json.NewEncoder(conn) + go w.Encode(map[string]interface{}{ + "method": "subscribe", + "filters": [][]interface{}{ + {"object_uuid", "=", uuid}, + {"event_type", "in", []string{"stderr", "crunch-run", "update"}}, + }, + }) + r := json.NewDecoder(conn) + for { + var msg struct { + Status int + ObjectUUID string `json:"object_uuid"` + EventType string `json:"event_type"` + Properties struct { + Text string + } + } + err := r.Decode(&msg) + if err != nil { + log.Printf("error decoding websocket message: %s", err) + return + } + if msg.ObjectUUID == uuid { + for _, line := range strings.Split(msg.Properties.Text, "\n") { + if line != "" { + ch <- line + } + } + if msg.EventType == "update" { + ch <- "" + } + } + select { + case <-done: + return + default: + } + } + }() + return &logStream{ + C: ch, + Close: func() error { + close(done) + return nil + }, + } +} diff --git a/example-su92l-1kg.sh b/example-su92l-1kg.sh old mode 100644 new mode 100755 index 147585cac1..7bd631b9eb --- a/example-su92l-1kg.sh +++ b/example-su92l-1kg.sh @@ -1,6 +1,10 @@ #!/bin/bash -go run . build-docker-image +set -ex + +PATH="${GOPATH:-${HOME}/go}/bin:${PATH}" +go install +lightning build-docker-image arv keep docker lightning-runtime project=su92l-j7d0g-jzei0m9yvgauhjf @@ -8,17 +12,25 @@ gvcf=su92l-4zz18-ykpcoea5nisz74f fasta=su92l-4zz18-s3e6as6uzsoocsb tags=su92l-4zz18-92bx4zjg5hgs3yc -go run . import -project ${project} \ - -tag-library ~/keep/by_id/${tags}/tagset.fa.gz \ - ~/keep/by_id/${fasta} -go run . filter -project ${project} \ - -i ~/keep/by_id/su92l-4zz18-fcyucnod8y4515p/library.gob \ - -min-coverage 0.9 -max-variants 30 -go run . export-numpy -project ${project} \ - -i ~/keep/by_id/su92l-4zz18-l40xcd2l6dmphaj/library.gob -go run . pca -project ${project} \ - -i ~/keep/by_id/su92l-4zz18-i6fzfoxpdh38yk4/library.npy -go run . plot -project ${project} \ - -i ~/keep/by_id/su92l-4zz18-zqfo7qc3tadh6zb/pca.npy \ - -labels-csv ~/keep/by_id/${gvcf}/sample_info.csv \ - -sample-fasta-dir ~/keep/by_id/${fasta} +unfiltered=$( + lightning import -project ${project} \ + -tag-library ${tags}/tagset.fa.gz \ + ${fasta}) +unfiltered=su92l-4zz18-fcyucnod8y4515p/library.gob +filtered=$( + lightning filter -project ${project} \ + -i ${unfiltered} \ + -min-coverage 0.9 -max-variants 30) +numpy=$( + lightning export-numpy -project ${project} \ + -i ${filtered}) +pca=$( + lightning pca -project ${project} \ + -i ${numpy}) +plot=$( + lightning plot -project ${project} \ + -i ${pca} \ + -labels-csv ${gvcf}/sample_info.csv \ + -sample-fasta-dir ${fasta}) +echo >&2 "https://workbench2.${plot%%-*}.arvadosapi.com/collections/${plot}" +echo ${plot%%/*} diff --git a/exportnumpy.go b/exportnumpy.go index 2c902239f0..8ccf11e707 100644 --- a/exportnumpy.go +++ b/exportnumpy.go @@ -63,10 +63,12 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, return 1 } runner.Args = []string{"export-numpy", "-local=true", "-i", *inputFilename, "-o", "/mnt/output/library.npy"} - err = runner.Run() + var output string + output, err = runner.Run() if err != nil { return 1 } + fmt.Fprintln(stdout, output+"/library.npy") return 0 } diff --git a/filter.go b/filter.go index 26e55bafa4..247e00f782 100644 --- a/filter.go +++ b/filter.go @@ -75,10 +75,12 @@ func (cmd *filterer) RunCommand(prog string, args []string, stdin io.Reader, std "-min-coverage", fmt.Sprintf("%f", *mincoverage), "-max-tag", fmt.Sprintf("%d", *maxtag), } - err = runner.Run() + var output string + output, err = runner.Run() if err != nil { return 1 } + fmt.Fprintln(stdout, output+"/library.gob") return 0 } diff --git a/go.mod b/go.mod index 91be95e4f1..4158a720ed 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/prometheus/client_golang v1.4.1 // indirect github.com/prometheus/procfs v0.0.10 // indirect golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d + golang.org/x/net v0.0.0-20200301022130-244492dfa37a golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 gopkg.in/yaml.v2 v2.2.8 // indirect diff --git a/go.sum b/go.sum index 322870aefa..478eebc510 100644 --- a/go.sum +++ b/go.sum @@ -175,7 +175,10 @@ golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200301022130-244492dfa37a h1:GuSPYbZzB5/dcLNCwLQLsg3obCJtX9IJhpXkvY7kzk0= +golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/import.go b/import.go index e6a4b3a5cb..2c502a47cf 100644 --- a/import.go +++ b/import.go @@ -98,10 +98,12 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std return 1 } runner.Args = append([]string{"import", "-local=true", "-tag-library", cmd.tagLibraryFile, "-ref", cmd.refFile, "-o", cmd.outputFile}, inputs...) - err = runner.Run() + var output string + output, err = runner.Run() if err != nil { return 1 } + fmt.Fprintln(stdout, output+"/library.gob") return 0 } diff --git a/pca.go b/pca.go index cd68eeb897..68dfff11b1 100644 --- a/pca.go +++ b/pca.go @@ -46,9 +46,11 @@ func (cmd *pythonPCA) RunCommand(prog string, args []string, stdin io.Reader, st import scipy from sklearn.decomposition import PCA scipy.save(sys.argv[2], PCA(n_components=4).fit_transform(scipy.load(sys.argv[1])))`, *inputFilename, "/mnt/output/pca.npy"} - err = runner.Run() + var output string + output, err = runner.Run() if err != nil { return 1 } + fmt.Fprintln(stdout, output+"/pca.npy") return 0 } diff --git a/plot.go b/plot.go index 6705f146c0..9723c91c88 100644 --- a/plot.go +++ b/plot.go @@ -51,10 +51,12 @@ func (cmd *pythonPlot) RunCommand(prog string, args []string, stdin io.Reader, s } runner.Prog = "python3" runner.Args = []string{"/plot.py", *inputFilename, *sampleCSVFilename, *sampleFastaDirname, "/mnt/output/plot.png"} - err = runner.Run() + var output string + output, err = runner.Run() if err != nil { return 1 } + fmt.Fprintln(stdout, output+"/plot.png") return 0 } -- 2.30.2