X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/de10f554682eecf3926b409c70f9685901d11272..ee4f69c81672248fd775c6b2adde9f62dbfdb349:/arvados.go diff --git a/arvados.go b/arvados.go index a79160d785..48cfad7ccd 100644 --- a/arvados.go +++ b/arvados.go @@ -1,4 +1,4 @@ -package main +package lightning import ( "context" @@ -10,10 +10,12 @@ import ( "net/url" "os" "regexp" + "runtime" "strings" "sync" "time" + "git.arvados.org/arvados.git/lib/cmd" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/keepclient" @@ -260,6 +262,10 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e "runtime_constraints": rc, "priority": runner.Priority, "state": arvados.ContainerRequestStateCommitted, + "scheduling_parameters": arvados.SchedulingParameters{ + Preemptible: true, + Partitions: []string{}, + }, }, }) if err != nil { @@ -399,7 +405,7 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { return "", err } b2 := blake2b.Sum256(exe) - cname := fmt.Sprintf("lightning-%x", b2) + cname := "lightning " + cmd.Version.String() // must build with "make", not just "go install" var existing arvados.CollectionList err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{ Limit: 1, @@ -407,15 +413,16 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { Filters: []arvados.Filter{ {Attr: "name", Operator: "=", Operand: cname}, {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID}, + {Attr: "properties.blake2b", Operator: "=", Operand: fmt.Sprintf("%x", b2)}, }, }) if err != nil { return "", err } if len(existing.Items) > 0 { - uuid := existing.Items[0].UUID - log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname) - return uuid, nil + coll := existing.Items[0] + log.Printf("using lightning binary in existing collection %s (name is %q, hash is %q; did not verify whether content matches)", coll.UUID, cname, coll.Properties["blake2b"]) + return coll.UUID, nil } log.Printf("writing lightning binary to new collection %q", cname) ac, err := arvadosclient.New(runner.Client) @@ -449,6 +456,9 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { "owner_uuid": runner.ProjectUUID, "manifest_text": mtxt, "name": cname, + "properties": map[string]interface{}{ + "blake2b": fmt.Sprintf("%x", b2), + }, }, }) if err != nil { @@ -458,7 +468,11 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { return coll.UUID, nil } -var arvadosClientFromEnv = arvados.NewClientFromEnv() +var ( + arvadosClientFromEnv = arvados.NewClientFromEnv() + siteFS arvados.CustomFileSystem + siteFSMtx sync.Mutex +) func open(fnm string) (io.ReadCloser, error) { if os.Getenv("ARVADOS_API_HOST") == "" { @@ -474,26 +488,24 @@ func open(fnm string) (io.ReadCloser, error) { return os.Open(fnm) } - log.Infof("reading %q from %s using Arvados client library", fnm[len(mnt):], uuid) - ac, err := arvadosclient.New(arvadosClientFromEnv) - if err != nil { - return nil, err - } - ac.Client = arvados.DefaultSecureClient - kc := keepclient.New(ac) - // Don't use keepclient's default short timeouts. - kc.HTTPClient = arvados.DefaultSecureClient - // Don't cache more than one block for this file. - kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 1} - - var coll arvados.Collection - err = arvadosClientFromEnv.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+uuid, nil, arvados.GetOptions{Select: []string{"uuid", "manifest_text"}}) - if err != nil { - return nil, err - } - fs, err := coll.FileSystem(arvadosClientFromEnv, kc) - if err != nil { - return nil, err - } - return fs.Open(fnm[len(mnt):]) + siteFSMtx.Lock() + defer siteFSMtx.Unlock() + if siteFS == nil { + log.Info("setting up Arvados client") + ac, err := arvadosclient.New(arvadosClientFromEnv) + if err != nil { + return nil, err + } + ac.Client = arvados.DefaultSecureClient + kc := keepclient.New(ac) + // Don't use keepclient's default short timeouts. + kc.HTTPClient = arvados.DefaultSecureClient + // Guess max concurrent readers, hope to avoid cache + // thrashing. + kc.BlockCache = &keepclient.BlockCache{MaxBlocks: runtime.NumCPU() * 3} + siteFS = arvadosClientFromEnv.SiteFileSystem(kc) + } + + log.Infof("reading %q from %s using Arvados client", fnm[len(mnt):], uuid) + return siteFS.Open("by_id/" + uuid + "/" + fnm[len(mnt):]) }