X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/b87e00d729f627b20020e5d1bfaa8ac05af57874..0df66c8fae2cdaf70c811379a4c1522211838a9b:/arvados.go diff --git a/arvados.go b/arvados.go index 00d5d6fdda..ec2701bcdc 100644 --- a/arvados.go +++ b/arvados.go @@ -1,6 +1,7 @@ -package main +package lightning import ( + "bufio" "context" "encoding/json" "errors" @@ -10,7 +11,6 @@ import ( "net/url" "os" "regexp" - "runtime" "strings" "sync" "time" @@ -19,6 +19,7 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/keepclient" + "github.com/klauspost/pgzip" log "github.com/sirupsen/logrus" "golang.org/x/crypto/blake2b" "golang.org/x/net/websocket" @@ -262,6 +263,13 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e "runtime_constraints": rc, "priority": runner.Priority, "state": arvados.ContainerRequestStateCommitted, + "scheduling_parameters": arvados.SchedulingParameters{ + Preemptible: true, + Partitions: []string{}, + }, + "environment": map[string]string{ + "GOMAXPROCS": fmt.Sprintf("%d", rc.VCPUs), + }, }, }) if err != nil { @@ -276,6 +284,7 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e subscribedUUID := "" defer func() { if subscribedUUID != "" { + log.Printf("unsubscribe container UUID: %s", subscribedUUID) client.Unsubscribe(logch, subscribedUUID) } }() @@ -299,8 +308,10 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e fmt.Fprint(os.Stderr, neednewline) neednewline = "" if subscribedUUID != "" { + log.Printf("unsubscribe container UUID: %s", subscribedUUID) client.Unsubscribe(logch, subscribedUUID) } + log.Printf("subscribe container UUID: %s", cr.ContainerUUID) client.Subscribe(logch, cr.ContainerUUID) subscribedUUID = cr.ContainerUUID } @@ -464,13 +475,51 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { return coll.UUID, nil } +// zopen returns a reader for the given file, using the arvados API +// instead of arv-mount/fuse where applicable, and transparently +// decompressing the input if fnm ends with ".gz". +func zopen(fnm string) (io.ReadCloser, error) { + f, err := open(fnm) + if err != nil || !strings.HasSuffix(fnm, ".gz") { + return f, err + } + rdr, err := pgzip.NewReader(bufio.NewReaderSize(f, 4*1024*1024)) + if err != nil { + f.Close() + return nil, err + } + return gzipr{rdr, f}, nil +} + +// gzipr wraps a ReadCloser and a Closer, presenting a single Close() +// method that closes both wrapped objects. +type gzipr struct { + io.ReadCloser + io.Closer +} + +func (gr gzipr) Close() error { + e1 := gr.ReadCloser.Close() + e2 := gr.Closer.Close() + if e1 != nil { + return e1 + } + return e2 +} + var ( arvadosClientFromEnv = arvados.NewClientFromEnv() + keepClient *keepclient.KeepClient siteFS arvados.CustomFileSystem siteFSMtx sync.Mutex ) -func open(fnm string) (io.ReadCloser, error) { +type file interface { + io.ReadCloser + Readdir(n int) ([]os.FileInfo, error) +} + +func open(fnm string) (file, error) { if os.Getenv("ARVADOS_API_HOST") == "" { return os.Open(fnm) } @@ -479,8 +528,8 @@ func open(fnm string) (io.ReadCloser, error) { return os.Open(fnm) } uuid := m[2] - mnt := "/mnt/" + uuid + "/" - if !strings.HasPrefix(fnm, mnt) { + mnt := "/mnt/" + uuid + if fnm != mnt && !strings.HasPrefix(fnm, mnt+"/") { return os.Open(fnm) } @@ -493,15 +542,15 @@ func open(fnm string) (io.ReadCloser, error) { return nil, err } ac.Client = arvados.DefaultSecureClient - kc := keepclient.New(ac) + keepClient = keepclient.New(ac) // Don't use keepclient's default short timeouts. - kc.HTTPClient = arvados.DefaultSecureClient - // Guess max concurrent readers, hope to avoid cache - // thrashing. - kc.BlockCache = &keepclient.BlockCache{MaxBlocks: runtime.NumCPU() * 3} - siteFS = arvadosClientFromEnv.SiteFileSystem(kc) + keepClient.HTTPClient = arvados.DefaultSecureClient + keepClient.BlockCache = &keepclient.BlockCache{MaxBlocks: 4} + siteFS = arvadosClientFromEnv.SiteFileSystem(keepClient) + } else { + keepClient.BlockCache.MaxBlocks++ } log.Infof("reading %q from %s using Arvados client", fnm[len(mnt):], uuid) - return siteFS.Open("by_id/" + uuid + "/" + fnm[len(mnt):]) + return siteFS.Open("by_id/" + uuid + fnm[len(mnt):]) }