I/O pipeline, show arvados container logs.
author Tom Clegg <tom@tomclegg.ca>
Thu, 5 Mar 2020 03:18:26 +0000 (22:18 -0500)
committer Tom Clegg <tom@tomclegg.ca>
Thu, 5 Mar 2020 03:18:26 +0000 (22:18 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

arvados.go
example-su92l-1kg.sh [changed mode: 0644->0755]
exportnumpy.go
filter.go
go.mod
go.sum
import.go
pca.go
plot.go

index c1b54d98236661136a5112672950c17ed6605807..c9f2e7a3bac8b87c7dda29d8d43fef7ebd567225 100644 (file)
@@ -1,17 +1,22 @@
 package main
 
 import (
 package main
 
 import (
+       "encoding/json"
        "errors"
        "fmt"
        "io/ioutil"
        "log"
        "errors"
        "fmt"
        "io/ioutil"
        "log"
+       "net/url"
        "os"
        "regexp"
        "os"
        "regexp"
+       "strings"
+       "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "golang.org/x/crypto/blake2b"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "golang.org/x/crypto/blake2b"
+       "golang.org/x/net/websocket"
 )
 
 type arvadosContainerRunner struct {
 )
 
 type arvadosContainerRunner struct {
@@ -25,9 +30,9 @@ type arvadosContainerRunner struct {
        Mounts      map[string]map[string]interface{}
 }
 
        Mounts      map[string]map[string]interface{}
 }
 
-func (runner *arvadosContainerRunner) Run() error {
+func (runner *arvadosContainerRunner) Run() (string, error) {
        if runner.ProjectUUID == "" {
        if runner.ProjectUUID == "" {
-               return errors.New("cannot run arvados container: ProjectUUID not provided")
+               return "", errors.New("cannot run arvados container: ProjectUUID not provided")
        }
 
        mounts := map[string]map[string]interface{}{
        }
 
        mounts := map[string]map[string]interface{}{
@@ -46,7 +51,7 @@ func (runner *arvadosContainerRunner) Run() error {
                prog = "/mnt/cmd/lightning"
                cmdUUID, err := runner.makeCommandCollection()
                if err != nil {
                prog = "/mnt/cmd/lightning"
                cmdUUID, err := runner.makeCommandCollection()
                if err != nil {
-                       return err
+                       return "", err
                }
                mounts["/mnt/cmd"] = map[string]interface{}{
                        "kind": "collection",
                }
                mounts["/mnt/cmd"] = map[string]interface{}{
                        "kind": "collection",
@@ -76,7 +81,65 @@ func (runner *arvadosContainerRunner) Run() error {
                },
        })
        log.Print(cr.UUID)
                },
        })
        log.Print(cr.UUID)
-       return err
+
+       var logch <-chan string
+       var logstream *logStream
+       defer func() {
+               if logstream != nil {
+                       logstream.Close()
+               }
+       }()
+
+       ticker := time.NewTicker(5 * time.Second)
+       defer ticker.Stop()
+
+       lastState := cr.State
+       subscribedUUID := ""
+       for cr.State != arvados.ContainerRequestState(arvados.ContainerRequestStateFinal) {
+               if logch == nil && cr.ContainerUUID != subscribedUUID {
+                       if logstream != nil {
+                               logstream.Close()
+                       }
+                       logstream = runner.logStream(cr.ContainerUUID)
+                       logch = logstream.C
+               }
+               select {
+               case msg, ok := <-logch:
+                       if !ok {
+                               logstream.Close()
+                               logstream = nil
+                               logch = nil
+                               break
+                       }
+                       if msg != "" {
+                               log.Print(msg)
+                               continue
+                       }
+                       // empty message indicates an "update" event
+                       // -- fall out of the select and get the
+                       // latest version now, instead of waiting for
+                       // the next timer tick.
+               case <-ticker.C:
+               }
+               err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
+               if err != nil {
+                       return "", err
+               }
+               if lastState != cr.State {
+                       log.Printf("container state: %s", cr.State)
+                       lastState = cr.State
+               }
+       }
+
+       var c arvados.Container
+       err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
+       if err != nil {
+               return "", err
+       }
+       if c.ExitCode != 0 {
+               return "", fmt.Errorf("container exited %d", c.ExitCode)
+       }
+       return cr.OutputUUID, err
 }
 
 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
 }
 
 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
@@ -86,7 +149,7 @@ func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
                runner.Mounts = make(map[string]map[string]interface{})
        }
        for _, path := range paths {
                runner.Mounts = make(map[string]map[string]interface{})
        }
        for _, path := range paths {
-               if *path == "" {
+               if *path == "" || *path == "-" {
                        continue
                }
                m := collectionInPathRe.FindStringSubmatch(*path)
                        continue
                }
                m := collectionInPathRe.FindStringSubmatch(*path)
@@ -171,3 +234,73 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
        log.Printf("stored lightning binary in new collection %s", coll.UUID)
        return coll.UUID, nil
 }
        log.Printf("stored lightning binary in new collection %s", coll.UUID)
        return coll.UUID, nil
 }
+
+// logStream is the handle returned by
+// (*arvadosContainerRunner).logStream.
+type logStream struct {
+       // C delivers messages from the stream. An empty string
+       // signals an "update" event; C is closed when the stream ends.
+       C <-chan string
+
+       // Close stops the stream.
+       Close func() error
+}
+
+// logStream subscribes, via the cluster's websocket service, to
+// "stderr", "crunch-run", and "update" events for the object with the
+// given UUID (Run passes a container UUID).
+//
+// Each non-empty line of an event's text is sent on C as a separate
+// message, and an empty string is sent for each "update" event so the
+// caller can re-fetch the object promptly. If the cluster config
+// cannot be fetched, or the websocket cannot be dialed or subscribed
+// to, a single "websocket error: ..." message is sent. C is closed
+// when the stream ends for any reason.
+//
+// Close stops the background goroutine (even if nobody is reading C)
+// and closes the websocket connection. Calling Close more than once is
+// harmless, but it must not be called from several goroutines at once.
+func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
+       ch := make(chan string)
+       done := make(chan struct{})
+       go func() {
+               defer close(ch)
+               // send delivers s to the consumer, but gives up once
+               // Close has been called, so this goroutine can never
+               // block forever on a consumer that stopped reading.
+               send := func(s string) bool {
+                       select {
+                       case ch <- s:
+                               return true
+                       case <-done:
+                               return false
+                       }
+               }
+               var cluster arvados.Cluster
+               err := runner.Client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
+               if err != nil {
+                       send(fmt.Sprintf("websocket error: cannot get cluster config: %s", err))
+                       return
+               }
+               wsURL := cluster.Services.Websocket.ExternalURL
+               wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
+               wsURL.Path = "/websocket"
+               wsURL.RawQuery = url.Values{"api_token": []string{runner.Client.AuthToken}}.Encode()
+               conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
+               if err != nil {
+                       send(fmt.Sprintf("websocket error: %s", err))
+                       return
+               }
+               defer conn.Close()
+               // Close the connection as soon as Close is called, so a
+               // pending Decode returns instead of blocking forever. The
+               // watcher exits by itself when this goroutine returns.
+               exited := make(chan struct{})
+               defer close(exited)
+               go func() {
+                       select {
+                       case <-done:
+                               conn.Close()
+                       case <-exited:
+                       }
+               }()
+               err = json.NewEncoder(conn).Encode(map[string]interface{}{
+                       "method": "subscribe",
+                       "filters": [][]interface{}{
+                               {"object_uuid", "=", uuid},
+                               {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
+                       },
+               })
+               if err != nil {
+                       send(fmt.Sprintf("websocket error: cannot send subscribe request: %s", err))
+                       return
+               }
+               r := json.NewDecoder(conn)
+               for {
+                       var msg struct {
+                               Status     int
+                               ObjectUUID string `json:"object_uuid"`
+                               EventType  string `json:"event_type"`
+                               Properties struct {
+                                       Text string
+                               }
+                       }
+                       if err := r.Decode(&msg); err != nil {
+                               select {
+                               case <-done:
+                                       // Close shut the connection down; not an error.
+                               default:
+                                       log.Printf("error decoding websocket message: %s", err)
+                               }
+                               return
+                       }
+                       if msg.ObjectUUID == uuid {
+                               for _, line := range strings.Split(msg.Properties.Text, "\n") {
+                                       if line != "" && !send(line) {
+                                               return
+                                       }
+                               }
+                               // An empty message tells the caller to re-fetch
+                               // the object now rather than on its next poll.
+                               if msg.EventType == "update" && !send("") {
+                                       return
+                               }
+                       }
+                       select {
+                       case <-done:
+                               return
+                       default:
+                       }
+               }
+       }()
+       return &logStream{
+               C: ch,
+               Close: func() error {
+                       // Closing done twice would panic; skip if already
+                       // closed. Not safe for concurrent callers.
+                       select {
+                       case <-done:
+                       default:
+                               close(done)
+                       }
+                       return nil
+               },
+       }
+}
old mode 100644 (file)
new mode 100755 (executable)
index 147585c..7bd631b
@@ -1,6 +1,10 @@
 #!/bin/bash
 
 #!/bin/bash
 
-go run . build-docker-image
+set -ex
+
+PATH="${GOPATH:-${HOME}/go}/bin:${PATH}"
+go install
+lightning build-docker-image
 arv keep docker lightning-runtime
 
 project=su92l-j7d0g-jzei0m9yvgauhjf
 arv keep docker lightning-runtime
 
 project=su92l-j7d0g-jzei0m9yvgauhjf
@@ -8,17 +12,25 @@ gvcf=su92l-4zz18-ykpcoea5nisz74f
 fasta=su92l-4zz18-s3e6as6uzsoocsb
 tags=su92l-4zz18-92bx4zjg5hgs3yc
 
 fasta=su92l-4zz18-s3e6as6uzsoocsb
 tags=su92l-4zz18-92bx4zjg5hgs3yc
 
-go run . import       -project ${project} \
-   -tag-library ~/keep/by_id/${tags}/tagset.fa.gz \
-   ~/keep/by_id/${fasta}
-go run . filter       -project ${project} \
-   -i ~/keep/by_id/su92l-4zz18-fcyucnod8y4515p/library.gob \
-   -min-coverage 0.9 -max-variants 30
-go run . export-numpy -project ${project} \
-   -i ~/keep/by_id/su92l-4zz18-l40xcd2l6dmphaj/library.gob
-go run . pca          -project ${project} \
-   -i ~/keep/by_id/su92l-4zz18-i6fzfoxpdh38yk4/library.npy
-go run . plot         -project ${project} \
-   -i ~/keep/by_id/su92l-4zz18-zqfo7qc3tadh6zb/pca.npy \
-   -labels-csv ~/keep/by_id/${gvcf}/sample_info.csv \
-   -sample-fasta-dir ~/keep/by_id/${fasta}
+# Each lightning step prints the path (collection/filename) of its
+# output file on stdout; that path is the input to the next step.
+unfiltered=$(
+    lightning import       -project "${project}" \
+       -tag-library "${tags}/tagset.fa.gz" \
+       "${fasta}")
+filtered=$(
+    lightning filter       -project "${project}" \
+       -i "${unfiltered}" \
+       -min-coverage 0.9 -max-variants 30)
+numpy=$(
+    lightning export-numpy -project "${project}" \
+       -i "${filtered}")
+pca=$(
+    lightning pca          -project "${project}" \
+       -i "${numpy}")
+plot=$(
+    lightning plot         -project "${project}" \
+       -i "${pca}" \
+       -labels-csv "${gvcf}/sample_info.csv" \
+       -sample-fasta-dir "${fasta}")
+echo >&2 "https://workbench2.${plot%%-*}.arvadosapi.com/collections/${plot}"
+echo "${plot%%/*}"
index 2c902239f037bb94703ea19851ec6355359c3bc2..8ccf11e707b3f17b930323a877f9f20e106836b8 100644 (file)
@@ -63,10 +63,12 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
                        return 1
                }
                runner.Args = []string{"export-numpy", "-local=true", "-i", *inputFilename, "-o", "/mnt/output/library.npy"}
                        return 1
                }
                runner.Args = []string{"export-numpy", "-local=true", "-i", *inputFilename, "-o", "/mnt/output/library.npy"}
-               err = runner.Run()
+               var output string
+               output, err = runner.Run()
                if err != nil {
                        return 1
                }
                if err != nil {
                        return 1
                }
+               fmt.Fprintln(stdout, output+"/library.npy")
                return 0
        }
 
                return 0
        }
 
index 26e55bafa4efea47cb8ffda6c41ca45587272869..247e00f7826efc656eec6a96a6af4a40a3b9d88e 100644 (file)
--- a/filter.go
+++ b/filter.go
@@ -75,10 +75,12 @@ func (cmd *filterer) RunCommand(prog string, args []string, stdin io.Reader, std
                        "-min-coverage", fmt.Sprintf("%f", *mincoverage),
                        "-max-tag", fmt.Sprintf("%d", *maxtag),
                }
                        "-min-coverage", fmt.Sprintf("%f", *mincoverage),
                        "-max-tag", fmt.Sprintf("%d", *maxtag),
                }
-               err = runner.Run()
+               var output string
+               output, err = runner.Run()
                if err != nil {
                        return 1
                }
                if err != nil {
                        return 1
                }
+               fmt.Fprintln(stdout, output+"/library.gob")
                return 0
        }
 
                return 0
        }
 
diff --git a/go.mod b/go.mod
index 91be95e4f12c8fef3764d524804e2a9bf823f0d4..4158a720edfcce88cc394fef9db3d64b62df074e 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -11,6 +11,7 @@ require (
        github.com/prometheus/client_golang v1.4.1 // indirect
        github.com/prometheus/procfs v0.0.10 // indirect
        golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d
        github.com/prometheus/client_golang v1.4.1 // indirect
        github.com/prometheus/procfs v0.0.10 // indirect
        golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d
+       golang.org/x/net v0.0.0-20200301022130-244492dfa37a
        golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 // indirect
        gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15
        gopkg.in/yaml.v2 v2.2.8 // indirect
        golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 // indirect
        gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15
        gopkg.in/yaml.v2 v2.2.8 // indirect
diff --git a/go.sum b/go.sum
index 322870aefa3079dea663af75425a7d994f727fc0..478eebc51036165dfdbac25f37642536172b8a3d 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -175,7 +175,10 @@ golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73r
 golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU=
 golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200301022130-244492dfa37a h1:GuSPYbZzB5/dcLNCwLQLsg3obCJtX9IJhpXkvY7kzk0=
+golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
index e6a4b3a5cb8961afd73a7177d04b83f23dcda632..2c502a47cff931c39c29f5d1cae4b5351f59e9f5 100644 (file)
--- a/import.go
+++ b/import.go
@@ -98,10 +98,12 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
                        return 1
                }
                runner.Args = append([]string{"import", "-local=true", "-tag-library", cmd.tagLibraryFile, "-ref", cmd.refFile, "-o", cmd.outputFile}, inputs...)
                        return 1
                }
                runner.Args = append([]string{"import", "-local=true", "-tag-library", cmd.tagLibraryFile, "-ref", cmd.refFile, "-o", cmd.outputFile}, inputs...)
-               err = runner.Run()
+               var output string
+               output, err = runner.Run()
                if err != nil {
                        return 1
                }
                if err != nil {
                        return 1
                }
+               fmt.Fprintln(stdout, output+"/library.gob")
                return 0
        }
 
                return 0
        }
 
diff --git a/pca.go b/pca.go
index cd68eeb897e7fefda071f97c1d2eb26142de12ad..68dfff11b1c03fdf3b089fd61c16551faeac315e 100644 (file)
--- a/pca.go
+++ b/pca.go
@@ -46,9 +46,11 @@ func (cmd *pythonPCA) RunCommand(prog string, args []string, stdin io.Reader, st
 import scipy
 from sklearn.decomposition import PCA
 scipy.save(sys.argv[2], PCA(n_components=4).fit_transform(scipy.load(sys.argv[1])))`, *inputFilename, "/mnt/output/pca.npy"}
 import scipy
 from sklearn.decomposition import PCA
 scipy.save(sys.argv[2], PCA(n_components=4).fit_transform(scipy.load(sys.argv[1])))`, *inputFilename, "/mnt/output/pca.npy"}
-       err = runner.Run()
+       var output string
+       output, err = runner.Run()
        if err != nil {
                return 1
        }
        if err != nil {
                return 1
        }
+       fmt.Fprintln(stdout, output+"/pca.npy")
        return 0
 }
        return 0
 }
diff --git a/plot.go b/plot.go
index 6705f146c0e8994c6db189b4a595793924f6a3d9..9723c91c88625ec99830f135aafea8ffdaae12a0 100644 (file)
--- a/plot.go
+++ b/plot.go
@@ -51,10 +51,12 @@ func (cmd *pythonPlot) RunCommand(prog string, args []string, stdin io.Reader, s
        }
        runner.Prog = "python3"
        runner.Args = []string{"/plot.py", *inputFilename, *sampleCSVFilename, *sampleFastaDirname, "/mnt/output/plot.png"}
        }
        runner.Prog = "python3"
        runner.Args = []string{"/plot.py", *inputFilename, *sampleCSVFilename, *sampleFastaDirname, "/mnt/output/plot.png"}
-       err = runner.Run()
+       var output string
+       output, err = runner.Run()
        if err != nil {
                return 1
        }
        if err != nil {
                return 1
        }
+       fmt.Fprintln(stdout, output+"/plot.png")
        return 0
 }
 
        return 0
 }