package main
import (
+ "encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
+ "net/url"
"os"
"regexp"
+ "strings"
+ "time"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/keepclient"
"golang.org/x/crypto/blake2b"
+ "golang.org/x/net/websocket"
)
type arvadosContainerRunner struct {
Mounts map[string]map[string]interface{}
}
-func (runner *arvadosContainerRunner) Run() error {
+func (runner *arvadosContainerRunner) Run() (string, error) {
if runner.ProjectUUID == "" {
- return errors.New("cannot run arvados container: ProjectUUID not provided")
+ return "", errors.New("cannot run arvados container: ProjectUUID not provided")
}
mounts := map[string]map[string]interface{}{
prog = "/mnt/cmd/lightning"
cmdUUID, err := runner.makeCommandCollection()
if err != nil {
- return err
+ return "", err
}
mounts["/mnt/cmd"] = map[string]interface{}{
"kind": "collection",
},
})
log.Print(cr.UUID)
- return err
+
+ var logch <-chan string
+ var logstream *logStream
+ defer func() {
+ if logstream != nil {
+ logstream.Close()
+ }
+ }()
+
+ ticker := time.NewTicker(5 * time.Second)
+ defer ticker.Stop()
+
+ lastState := cr.State
+ subscribedUUID := ""
+ for cr.State != arvados.ContainerRequestState(arvados.ContainerRequestStateFinal) {
+ if logch == nil && cr.ContainerUUID != subscribedUUID {
+ if logstream != nil {
+ logstream.Close()
+ }
+ logstream = runner.logStream(cr.ContainerUUID)
+ logch = logstream.C
+ }
+ select {
+ case msg, ok := <-logch:
+ if !ok {
+ logstream.Close()
+ logstream = nil
+ logch = nil
+ break
+ }
+ if msg != "" {
+ log.Print(msg)
+ continue
+ }
+ // empty message indicates an "update" event
+ // -- fall out of the select and get the
+ // latest version now, instead of waiting for
+ // the next timer tick.
+ case <-ticker.C:
+ }
+ err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
+ if err != nil {
+ return "", err
+ }
+ if lastState != cr.State {
+ log.Printf("container state: %s", cr.State)
+ lastState = cr.State
+ }
+ }
+
+ var c arvados.Container
+ err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
+ if err != nil {
+ return "", err
+ }
+ if c.ExitCode != 0 {
+ return "", fmt.Errorf("container exited %d", c.ExitCode)
+ }
+ return cr.OutputUUID, err
}
var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
runner.Mounts = make(map[string]map[string]interface{})
}
for _, path := range paths {
- if *path == "" {
+ if *path == "" || *path == "-" {
continue
}
m := collectionInPathRe.FindStringSubmatch(*path)
log.Printf("stored lightning binary in new collection %s", coll.UUID)
return coll.UUID, nil
}
+
+// logStream is a handle on a stream of log messages produced by a
+// background goroutine.
+type logStream struct {
+ // C delivers log text one line at a time. An empty string
+ // indicates an "update" event rather than log text. C is
+ // closed by the producer when the stream ends.
+ C <-chan string
+ // Close signals the background goroutine to stop.
+ Close func() error
+}
+
+// logStream subscribes to websocket events for the object with the
+// given uuid and returns a logStream whose channel C delivers each
+// non-empty line of text from matching events, plus an empty string
+// for every "update" event.
+//
+// C is closed when the stream ends: when the cluster config cannot
+// be fetched, the websocket connection or subscription fails, a
+// message cannot be decoded, or Close is called. Setup failures are
+// reported as a message on C before it is closed.
+//
+// Close stops the background goroutine and closes the websocket
+// connection, even if the goroutine is blocked reading from the
+// connection or sending to C. Calling Close again from the same
+// goroutine is harmless; it is not safe to call concurrently.
+func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
+	ch := make(chan string)
+	done := make(chan struct{})
+	go func() {
+		defer close(ch)
+
+		// send delivers msg on ch, giving up if the stream is
+		// closed first, so this goroutine cannot block forever
+		// on a channel nobody is reading any more.
+		send := func(msg string) bool {
+			select {
+			case ch <- msg:
+				return true
+			case <-done:
+				return false
+			}
+		}
+
+		var cluster arvados.Cluster
+		err := runner.Client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
+		if err != nil {
+			send(fmt.Sprintf("error getting cluster config: %s", err))
+			return
+		}
+		wsURL := cluster.Services.Websocket.ExternalURL
+		wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
+		wsURL.Path = "/websocket"
+		wsURL.RawQuery = url.Values{"api_token": []string{runner.Client.AuthToken}}.Encode()
+		conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
+		if err != nil {
+			send(fmt.Sprintf("websocket error: %s", err))
+			return
+		}
+
+		// Close the connection when the stream is closed or when
+		// this goroutine returns, whichever comes first. Closing
+		// it also unblocks a Decode that is waiting for data.
+		finished := make(chan struct{})
+		defer close(finished)
+		go func() {
+			select {
+			case <-done:
+			case <-finished:
+			}
+			conn.Close()
+		}()
+
+		err = json.NewEncoder(conn).Encode(map[string]interface{}{
+			"method": "subscribe",
+			"filters": [][]interface{}{
+				{"object_uuid", "=", uuid},
+				{"event_type", "in", []string{"stderr", "crunch-run", "update"}},
+			},
+		})
+		if err != nil {
+			send(fmt.Sprintf("websocket error: %s", err))
+			return
+		}
+
+		r := json.NewDecoder(conn)
+		for {
+			var msg struct {
+				Status     int
+				ObjectUUID string `json:"object_uuid"`
+				EventType  string `json:"event_type"`
+				Properties struct {
+					Text string
+				}
+			}
+			if err := r.Decode(&msg); err != nil {
+				select {
+				case <-done:
+					// The connection was closed on
+					// purpose; this is not an error.
+				default:
+					log.Printf("error decoding websocket message: %s", err)
+				}
+				return
+			}
+			if msg.ObjectUUID != uuid {
+				continue
+			}
+			for _, line := range strings.Split(msg.Properties.Text, "\n") {
+				if line != "" && !send(line) {
+					return
+				}
+			}
+			// An empty message tells the receiver that an
+			// "update" event arrived.
+			if msg.EventType == "update" && !send("") {
+				return
+			}
+		}
+	}()
+	return &logStream{
+		C: ch,
+		Close: func() error {
+			select {
+			case <-done:
+				// Already closed; closing the channel
+				// again would panic.
+			default:
+				close(done)
+			}
+			return nil
+		},
+	}
+}
#!/bin/bash
-go run . build-docker-image
+set -ex
+
+PATH="${GOPATH:-${HOME}/go}/bin:${PATH}"
+go install
+lightning build-docker-image
arv keep docker lightning-runtime
project=su92l-j7d0g-jzei0m9yvgauhjf
fasta=su92l-4zz18-s3e6as6uzsoocsb
tags=su92l-4zz18-92bx4zjg5hgs3yc
-go run . import -project ${project} \
- -tag-library ~/keep/by_id/${tags}/tagset.fa.gz \
- ~/keep/by_id/${fasta}
-go run . filter -project ${project} \
- -i ~/keep/by_id/su92l-4zz18-fcyucnod8y4515p/library.gob \
- -min-coverage 0.9 -max-variants 30
-go run . export-numpy -project ${project} \
- -i ~/keep/by_id/su92l-4zz18-l40xcd2l6dmphaj/library.gob
-go run . pca -project ${project} \
- -i ~/keep/by_id/su92l-4zz18-i6fzfoxpdh38yk4/library.npy
-go run . plot -project ${project} \
- -i ~/keep/by_id/su92l-4zz18-zqfo7qc3tadh6zb/pca.npy \
- -labels-csv ~/keep/by_id/${gvcf}/sample_info.csv \
- -sample-fasta-dir ~/keep/by_id/${fasta}
+unfiltered=$(
+ lightning import -project ${project} \
+ -tag-library ${tags}/tagset.fa.gz \
+ ${fasta})
+unfiltered=su92l-4zz18-fcyucnod8y4515p/library.gob
+filtered=$(
+ lightning filter -project ${project} \
+ -i ${unfiltered} \
+ -min-coverage 0.9 -max-variants 30)
+numpy=$(
+ lightning export-numpy -project ${project} \
+ -i ${filtered})
+pca=$(
+ lightning pca -project ${project} \
+ -i ${numpy})
+plot=$(
+ lightning plot -project ${project} \
+ -i ${pca} \
+ -labels-csv ${gvcf}/sample_info.csv \
+ -sample-fasta-dir ${fasta})
+echo >&2 "https://workbench2.${plot%%-*}.arvadosapi.com/collections/${plot}"
+echo ${plot%%/*}