Improve import and vcf2fasta performance.
authorTom Clegg <tom@tomclegg.ca>
Wed, 2 Dec 2020 20:47:44 +0000 (15:47 -0500)
committerTom Clegg <tom@tomclegg.ca>
Wed, 2 Dec 2020 20:50:19 +0000 (15:50 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

arvados.go
cmd.go
go.mod
go.sum
import.go
taglib.go
tilelib.go
vcf2fasta.go

index 5f78a36408bdbbb8e2aba29b4571fbfd87749c85..a79160d78546cd364aded15290ef63817ce039f9 100644 (file)
@@ -5,6 +5,7 @@ import (
        "encoding/json"
        "errors"
        "fmt"
+       "io"
        "io/ioutil"
        "net/url"
        "os"
@@ -184,12 +185,14 @@ type arvadosContainerRunner struct {
        Name        string
        OutputName  string
        ProjectUUID string
+       APIAccess   bool
        VCPUs       int
        RAM         int64
        Prog        string // if empty, run /proc/self/exe
        Args        []string
        Mounts      map[string]map[string]interface{}
        Priority    int
+       KeepCache   int // cache buffers per VCPU (0 for default)
 }
 
 func (runner *arvadosContainerRunner) Run() (string, error) {
@@ -229,10 +232,15 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e
        if priority < 1 {
                priority = 500
        }
+       keepCache := runner.KeepCache
+       if keepCache < 1 {
+               keepCache = 2
+       }
        rc := arvados.RuntimeConstraints{
+               API:          &runner.APIAccess,
                VCPUs:        runner.VCPUs,
                RAM:          runner.RAM,
-               KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs),
+               KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs),
        }
        outname := &runner.OutputName
        if *outname == "" {
@@ -449,3 +457,43 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
        log.Printf("stored lightning binary in new collection %s", coll.UUID)
        return coll.UUID, nil
 }
+
+var arvadosClientFromEnv = arvados.NewClientFromEnv()
+
+func open(fnm string) (io.ReadCloser, error) {
+       if os.Getenv("ARVADOS_API_HOST") == "" {
+               return os.Open(fnm)
+       }
+       m := collectionInPathRe.FindStringSubmatch(fnm)
+       if m == nil {
+               return os.Open(fnm)
+       }
+       uuid := m[2]
+       mnt := "/mnt/" + uuid + "/"
+       if !strings.HasPrefix(fnm, mnt) {
+               return os.Open(fnm)
+       }
+
+       log.Infof("reading %q from %s using Arvados client library", fnm[len(mnt):], uuid)
+       ac, err := arvadosclient.New(arvadosClientFromEnv)
+       if err != nil {
+               return nil, err
+       }
+       ac.Client = arvados.DefaultSecureClient
+       kc := keepclient.New(ac)
+       // Don't use keepclient's default short timeouts.
+       kc.HTTPClient = arvados.DefaultSecureClient
+       // Don't cache more than one block for this file.
+       kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 1}
+
+       var coll arvados.Collection
+       err = arvadosClientFromEnv.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+uuid, nil, arvados.GetOptions{Select: []string{"uuid", "manifest_text"}})
+       if err != nil {
+               return nil, err
+       }
+       fs, err := coll.FileSystem(arvadosClientFromEnv, kc)
+       if err != nil {
+               return nil, err
+       }
+       return fs.Open(fnm[len(mnt):])
+}
diff --git a/cmd.go b/cmd.go
index 92bd5a29d7017d4a9ecf36e7aa4e1a7006aa6d1e..5789fe4604d3b5f41b75215e762b89868503e821 100644 (file)
--- a/cmd.go
+++ b/cmd.go
@@ -21,7 +21,6 @@ var (
                "ref2genome":         &ref2genome{},
                "vcf2fasta":          &vcf2fasta{},
                "import":             &importer{},
-               "import-stats-plot":  &importstatsplot{},
                "annotate":           &annotatecmd{},
                "export":             &exporter{},
                "export-numpy":       &exportNumpy{},
@@ -57,7 +56,7 @@ func (cmd *buildDockerImage) RunCommand(prog string, args []string, stdin io.Rea
 RUN DEBIAN_FRONTEND=noninteractive \
   apt-get update && \
   apt-get dist-upgrade -y && \
-  apt-get install -y --no-install-recommends bcftools bedtools samtools python2 python3-sklearn python3-matplotlib && \
+  apt-get install -y --no-install-recommends bcftools bedtools samtools python2 python3-sklearn python3-matplotlib ca-certificates && \
   apt-get clean
 `), 0644)
        if err != nil {
diff --git a/go.mod b/go.mod
index ffbfc042c059c6c6ddfaa4668c049948de8bc9ae..bc812fedcd2ea5dc084f86588588fbc501a40e48 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -11,8 +11,9 @@ require (
        github.com/gonum/internal v0.0.0-20181124074243-f884aa714029 // indirect
        github.com/james-bowman/nlp v0.0.0-20200417075118-1e2772e0e1e5
        github.com/james-bowman/sparse v0.0.0-20200514124614-ae250424e52d // indirect
+       github.com/klauspost/compress v1.11.3 // indirect
+       github.com/klauspost/pgzip v1.2.5
        github.com/kshedden/gonpy v0.0.0-20190510000443-66c21fac4672
-       github.com/lucasb-eyer/go-colorful v1.0.3
        github.com/mattn/go-isatty v0.0.12
        github.com/prometheus/client_golang v1.6.0 // indirect
        github.com/prometheus/common v0.10.0 // indirect
@@ -23,7 +24,6 @@ require (
        golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2
        golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 // indirect
        gonum.org/v1/gonum v0.8.1
-       gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b
        gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15
        gopkg.in/yaml.v2 v2.3.0 // indirect
 )
diff --git a/go.sum b/go.sum
index f3617dfc0faf03420227ad2c77f723b859548e43..b9ad92e6f8d22ea81e90c4021e373e40bbe387f4 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -63,6 +63,7 @@ github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
 github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
 github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
 github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
 github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
 github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
@@ -116,6 +117,10 @@ github.com/karalabe/xgo v0.0.0-20191115072854-c5ccff8648a7/go.mod h1:iYGcTYIPUvE
 github.com/kevinburke/ssh_config v0.0.0-20171013211458-802051befeb5/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
 github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/klauspost/compress v1.11.3 h1:dB4Bn0tN3wdCzQxnS8r06kV74qN/TAfaIS0bVE8h3jc=
+github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
+github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE=
+github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
@@ -129,8 +134,6 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kshedden/gonpy v0.0.0-20190510000443-66c21fac4672 h1:LQLnybCU54zB8Gj8c1DPeZEheIAn3eZ8Cc9fYqM4ac8=
 github.com/kshedden/gonpy v0.0.0-20190510000443-66c21fac4672/go.mod h1:+uEXxXG0RlfBPqG1tq5QN/F2jRlcuY0dExSONLpEwcA=
 github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
-github.com/lucasb-eyer/go-colorful v1.0.3 h1:QIbQXiugsb+q10B+MI+7DI1oQLdmnep86tWFlaaUAac=
-github.com/lucasb-eyer/go-colorful v1.0.3/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
 github.com/marstr/guid v1.1.1-0.20170427235115-8bdf7d1a087c/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho=
 github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
 github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
index cea24f2785bc0c76568a018dee30ff4d9c285e55..f4e0cd0008c218cf23553e12be9d174031e06d2c 100644 (file)
--- a/import.go
+++ b/import.go
@@ -10,6 +10,7 @@ import (
        "flag"
        "fmt"
        "io"
+       "io/ioutil"
        "net/http"
        _ "net/http/pprof"
        "os"
@@ -23,13 +24,8 @@ import (
        "sync/atomic"
        "time"
 
-       "git.arvados.org/arvados.git/sdk/go/arvados"
-       "github.com/lucasb-eyer/go-colorful"
+       "github.com/klauspost/pgzip"
        log "github.com/sirupsen/logrus"
-       "gonum.org/v1/plot"
-       "gonum.org/v1/plot/plotter"
-       "gonum.org/v1/plot/vg"
-       "gonum.org/v1/plot/vg/draw"
 )
 
 type importer struct {
@@ -132,12 +128,12 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
                }
                defer outf.Close()
                if strings.HasSuffix(cmd.outputFile, ".gz") {
-                       outw = gzip.NewWriter(outf)
+                       outw = pgzip.NewWriter(outf)
                } else {
                        outw = outf
                }
        }
-       bufw := bufio.NewWriter(outw)
+       bufw := bufio.NewWriterSize(outw, 64*1024*1024)
        cmd.encoder = gob.NewEncoder(bufw)
 
        tilelib := &tileLibrary{taglib: taglib, retainNoCalls: cmd.saveIncompleteTiles, skipOOO: cmd.skipOOO}
@@ -179,14 +175,15 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error {
                // possibly even an in-place update.
                return errors.New("cannot specify output file in container mode: not implemented")
        }
-       client := arvados.NewClientFromEnv()
        runner := arvadosContainerRunner{
                Name:        "lightning import",
-               Client:      client,
+               Client:      arvadosClientFromEnv,
                ProjectUUID: cmd.projectUUID,
-               RAM:         80000000000,
-               VCPUs:       32,
+               APIAccess:   true,
+               RAM:         300000000000,
+               VCPUs:       64,
                Priority:    cmd.priority,
+               KeepCache:   1,
        }
        err := runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile)
        if err != nil {
@@ -207,6 +204,7 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error {
                runner.Args = []string{"import",
                        "-local=true",
                        "-loglevel=" + cmd.loglevel,
+                       "-pprof=:6061",
                        fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO),
                        fmt.Sprintf("-output-tiles=%v", cmd.outputTiles),
                        fmt.Sprintf("-save-incomplete-tiles=%v", cmd.saveIncompleteTiles),
@@ -233,13 +231,14 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error {
 
 func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []importStats, error) {
        var input io.ReadCloser
-       input, err := os.Open(infile)
+       input, err := open(infile)
        if err != nil {
                return nil, nil, err
        }
        defer input.Close()
+       input = ioutil.NopCloser(bufio.NewReaderSize(input, 8*1024*1024))
        if strings.HasSuffix(infile, ".gz") {
-               input, err = gzip.NewReader(input)
+               input, err = pgzip.NewReader(input)
                if err != nil {
                        return nil, nil, err
                }
@@ -250,14 +249,14 @@ func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []
 
 func (cmd *importer) loadTagLibrary() (*tagLibrary, error) {
        log.Printf("tag library %s load starting", cmd.tagLibraryFile)
-       f, err := os.Open(cmd.tagLibraryFile)
+       f, err := open(cmd.tagLibraryFile)
        if err != nil {
                return nil, err
        }
        defer f.Close()
-       var rdr io.ReadCloser = f
+       rdr := ioutil.NopCloser(bufio.NewReaderSize(f, 64*1024*1024))
        if strings.HasSuffix(cmd.tagLibraryFile, ".gz") {
-               rdr, err = gzip.NewReader(f)
+               rdr, err = gzip.NewReader(rdr)
                if err != nil {
                        return nil, fmt.Errorf("%s: gzip: %s", cmd.tagLibraryFile, err)
                }
@@ -426,7 +425,7 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error {
        go close(todo)
        var tileJobs sync.WaitGroup
        var running int64
-       for i := 0; i < runtime.NumCPU()*9/8+1; i++ {
+       for i := 0; i < runtime.GOMAXPROCS(-1)*2; i++ {
                tileJobs.Add(1)
                atomic.AddInt64(&running, 1)
                go func() {
@@ -544,73 +543,3 @@ func flatten(variants [][]tileVariantID) []tileVariantID {
        }
        return flat
 }
-
-type importstatsplot struct{}
-
-func (cmd *importstatsplot) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
-       err := cmd.Plot(stdin, stdout)
-       if err != nil {
-               log.Errorf("%s", err)
-               return 1
-       }
-       return 0
-}
-
-func (cmd *importstatsplot) Plot(stdin io.Reader, stdout io.Writer) error {
-       var stats []importStats
-       err := json.NewDecoder(stdin).Decode(&stats)
-       if err != nil {
-               return err
-       }
-
-       p, err := plot.New()
-       if err != nil {
-               return err
-       }
-       p.Title.Text = "coverage preserved by import (excl X<0.65)"
-       p.X.Label.Text = "input base calls ÷ sequence length"
-       p.Y.Label.Text = "output base calls ÷ input base calls"
-       p.Add(plotter.NewGrid())
-
-       data := map[string]plotter.XYs{}
-       for _, stat := range stats {
-               data[stat.InputLabel] = append(data[stat.InputLabel], plotter.XY{
-                       X: float64(stat.InputCoverage) / float64(stat.InputLength),
-                       Y: float64(stat.TileCoverage) / float64(stat.InputCoverage),
-               })
-       }
-
-       labels := []string{}
-       for label := range data {
-               labels = append(labels, label)
-       }
-       sort.Strings(labels)
-       palette, err := colorful.SoftPalette(len(labels))
-       if err != nil {
-               return err
-       }
-       nextInPalette := 0
-       for idx, label := range labels {
-               s, err := plotter.NewScatter(data[label])
-               if err != nil {
-                       return err
-               }
-               s.GlyphStyle.Color = palette[idx]
-               s.GlyphStyle.Radius = vg.Millimeter / 2
-               s.GlyphStyle.Shape = draw.CrossGlyph{}
-               nextInPalette += 7
-               p.Add(s)
-               if false {
-                       p.Legend.Add(label, s)
-               }
-       }
-       p.X.Min = 0.65
-       p.X.Max = 1
-
-       w, err := p.WriterTo(8*vg.Inch, 6*vg.Inch, "svg")
-       if err != nil {
-               return err
-       }
-       _, err = w.WriteTo(stdout)
-       return err
-}
index f92a91f375560014a9b768a2b37450518505528b..527884c847761fa3720f09441273302ac5969e85 100644 (file)
--- a/taglib.go
+++ b/taglib.go
@@ -69,6 +69,10 @@ func (taglib *tagLibrary) Len() int {
        return len(taglib.tagmap)
 }
 
+func (taglib *tagLibrary) TagLen() int {
+       return taglib.keylen
+}
+
 var (
        twobit = func() []tagmapKey {
                r := make([]tagmapKey, 256)
index 522159dede5994d42264d46b1b4f1bebdfdc9de2..926c0324a2c74f3b59a04ea97302803012629541 100644 (file)
@@ -253,7 +253,6 @@ type importStats struct {
        InputLabel             string
        InputLength            int
        InputCoverage          int
-       TileCoverage           int
        PathLength             int
        DroppedOutOfOrderTiles int
 }
@@ -283,15 +282,15 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro
                todo <- jobT{seqlabel, fasta}
        }()
        type foundtag struct {
-               pos    int
-               tagid  tagID
-               taglen int
+               pos   int
+               tagid tagID
        }
        found := make([]foundtag, 2000000)
        path := make([]tileLibRef, 2000000)
        totalFoundTags := 0
        totalPathLen := 0
        skippedSequences := 0
+       taglen := tilelib.taglib.TagLen()
        var stats []importStats
        for job := range todo {
                if len(job.fasta) == 0 {
@@ -304,15 +303,16 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro
 
                found = found[:0]
                tilelib.taglib.FindAll(job.fasta, func(tagid tagID, pos, taglen int) {
-                       found = append(found, foundtag{pos: pos, tagid: tagid, taglen: taglen})
+                       found = append(found, foundtag{pos: pos, tagid: tagid})
                })
                totalFoundTags += len(found)
+               if len(found) == 0 {
+                       log.Warnf("%s %s no tags found", filelabel, job.label)
+               }
 
-               basesOut := 0
                skipped := 0
-               path = path[:0]
-               last := foundtag{tagid: -1}
                if tilelib.skipOOO {
+                       log.Infof("%s %s keeping longest increasing subsequence", filelabel, job.label)
                        keep := longestIncreasingSubsequence(len(found), func(i int) int { return int(found[i].tagid) })
                        for i, x := range keep {
                                found[i] = found[x]
@@ -320,56 +320,44 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro
                        skipped = len(found) - len(keep)
                        found = found[:len(keep)]
                }
+
+               log.Infof("%s %s getting %d librefs", filelabel, job.label, len(found))
+               throttle := &throttle{Max: runtime.NumCPU()}
+               path = path[:len(found)]
                for i, f := range found {
-                       log.Tracef("%s %s found[%d] == %#v", filelabel, job.label, i, f)
-                       if last.tagid < 0 {
-                               // first tag in sequence
-                               last = foundtag{tagid: f.tagid}
-                               continue
-                       }
-                       libref := tilelib.getRef(last.tagid, job.fasta[last.pos:f.pos+f.taglen])
-                       path = append(path, libref)
-                       if libref.Variant > 0 {
-                               // Count output coverage from
-                               // the end of the previous tag
-                               // (if any) to the end of the
-                               // current tag, IOW don't
-                               // double-count coverage for
-                               // the previous tag.
-                               basesOut += countBases(job.fasta[last.pos+last.taglen : f.pos+f.taglen])
-                       } else {
-                               // If we dropped this tile
-                               // (because !retainNoCalls),
-                               // set taglen=0 so the
-                               // overlapping tag is counted
-                               // toward coverage on the
-                               // following tile.
-                               f.taglen = 0
-                       }
-                       last = f
-               }
-               if last.tagid < 0 {
-                       log.Warnf("%s %s no tags found", filelabel, job.label)
-               } else {
-                       libref := tilelib.getRef(last.tagid, job.fasta[last.pos:])
-                       path = append(path, libref)
-                       if libref.Variant > 0 {
-                               basesOut += countBases(job.fasta[last.pos+last.taglen:])
-                       }
+                       i, f := i, f
+                       throttle.Acquire()
+                       go func() {
+                               defer throttle.Release()
+                               var startpos, endpos int
+                               if i == 0 {
+                                       startpos = 0
+                               } else {
+                                       startpos = f.pos
+                               }
+                               if i == len(found)-1 {
+                                       endpos = len(job.fasta)
+                               } else {
+                                       endpos = found[i+1].pos + taglen
+                               }
+                               path[i] = tilelib.getRef(f.tagid, job.fasta[startpos:endpos])
+                       }()
                }
+               throttle.Wait()
+
+               log.Infof("%s %s copying path", filelabel, job.label)
 
                pathcopy := make([]tileLibRef, len(path))
                copy(pathcopy, path)
                ret[job.label] = pathcopy
 
                basesIn := countBases(job.fasta)
-               log.Infof("%s %s fasta in %d coverage in %d coverage out %d path len %d skipped %d", filelabel, job.label, len(job.fasta), basesIn, basesOut, len(path), skipped)
+               log.Infof("%s %s fasta in %d coverage in %d path len %d skipped-out-of-order %d", filelabel, job.label, len(job.fasta), basesIn, len(path), skipped)
                stats = append(stats, importStats{
                        InputFile:              filelabel,
                        InputLabel:             job.label,
                        InputLength:            len(job.fasta),
                        InputCoverage:          basesIn,
-                       TileCoverage:           basesOut,
                        PathLength:             len(path),
                        DroppedOutOfOrderTiles: skipped,
                })
@@ -397,36 +385,42 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
                }
        }
        seqhash := blake2b.Sum256(seq)
+       var vlock sync.Locker
+
        tilelib.mtx.RLock()
-       if int(tag) < len(tilelib.variant) {
+       if len(tilelib.vlock) > int(tag) {
+               vlock = tilelib.vlock[tag]
+       }
+       tilelib.mtx.RUnlock()
+
+       if vlock != nil {
+               vlock.Lock()
                for i, varhash := range tilelib.variant[tag] {
                        if varhash == seqhash {
-                               tilelib.mtx.RUnlock()
+                               vlock.Unlock()
                                return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)}
                        }
                }
-       }
-       var vlock sync.Locker
-       if len(tilelib.vlock) > int(tag) {
-               vlock = tilelib.vlock[tag]
-       }
-       tilelib.mtx.RUnlock()
-       if vlock == nil {
+               vlock.Unlock()
+       } else {
                tilelib.mtx.Lock()
                if tilelib.variant == nil && tilelib.taglib != nil {
                        tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len())
                        tilelib.vlock = make([]sync.Locker, tilelib.taglib.Len())
                        for i := range tilelib.vlock {
-                               tilelib.vlock[i] = &sync.Mutex{}
+                               tilelib.vlock[i] = new(sync.Mutex)
                        }
                }
                if int(tag) >= len(tilelib.variant) {
-                       oldlen := len(tilelib.variant)
+                       oldlen := len(tilelib.vlock)
+                       for i := 0; i < oldlen; i++ {
+                               tilelib.vlock[i].Lock()
+                       }
                        // If we haven't seen the tag library yet (as
                        // in a merge), tilelib.taglib.Len() is
                        // zero. We can still behave correctly, we
-                       // just need to expand the tilelib.variant
-                       // slice as needed.
+                       // just need to expand the tilelib.variant and
+                       // tilelib.vlock slices as needed.
                        if int(tag) >= cap(tilelib.variant) {
                                // Allocate 2x capacity.
                                newslice := make([][][blake2b.Size256]byte, int(tag)+1, (int(tag)+1)*2)
@@ -441,34 +435,37 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
                                tilelib.variant = tilelib.variant[:int(tag)+1]
                                tilelib.vlock = tilelib.vlock[:int(tag)+1]
                        }
-                       for i := oldlen; i < len(tilelib.variant); i++ {
-                               tilelib.vlock[i] = &sync.Mutex{}
+                       for i := oldlen; i < len(tilelib.vlock); i++ {
+                               tilelib.vlock[i] = new(sync.Mutex)
+                       }
+                       for i := 0; i < oldlen; i++ {
+                               tilelib.vlock[i].Unlock()
                        }
                }
                vlock = tilelib.vlock[tag]
                tilelib.mtx.Unlock()
        }
 
-       tilelib.mtx.RLock()
        vlock.Lock()
        for i, varhash := range tilelib.variant[tag] {
                if varhash == seqhash {
                        vlock.Unlock()
-                       tilelib.mtx.RUnlock()
                        return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)}
                }
        }
        atomic.AddInt64(&tilelib.variants, 1)
        tilelib.variant[tag] = append(tilelib.variant[tag], seqhash)
+       variant := tileVariantID(len(tilelib.variant[tag]))
+       vlock.Unlock()
+
        if tilelib.retainTileSequences && !dropSeq {
+               tilelib.mtx.Lock()
                if tilelib.seq == nil {
                        tilelib.seq = map[[blake2b.Size256]byte][]byte{}
                }
                tilelib.seq[seqhash] = append([]byte(nil), seq...)
+               tilelib.mtx.Unlock()
        }
-       variant := tileVariantID(len(tilelib.variant[tag]))
-       vlock.Unlock()
-       tilelib.mtx.RUnlock()
 
        if tilelib.encoder != nil {
                saveSeq := seq
index cb43ee9567101dd7b653f078745d754c0fdec3d6..7f8df2b5ae4850ffc6a9ff78a88ea67d81c04adb 100644 (file)
@@ -22,7 +22,7 @@ import (
        "sync"
        "syscall"
 
-       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/klauspost/pgzip"
        log "github.com/sirupsen/logrus"
 )
 
@@ -107,14 +107,15 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
                                cmd.vcpus = 32
                        }
                }
-               client := arvados.NewClientFromEnv()
                runner := arvadosContainerRunner{
                        Name:        "lightning vcf2fasta",
-                       Client:      client,
+                       Client:      arvadosClientFromEnv,
                        ProjectUUID: cmd.projectUUID,
                        RAM:         2<<30 + int64(cmd.vcpus)<<28,
                        VCPUs:       cmd.vcpus,
                        Priority:    *priority,
+                       KeepCache:   2,
+                       APIAccess:   true,
                        Mounts: map[string]map[string]interface{}{
                                "/gvcf_regions.py": map[string]interface{}{
                                        "kind":    "text",
@@ -238,7 +239,7 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
        }
        defer outf.Close()
        bufw := bufio.NewWriterSize(outf, 8*1024*1024)
-       gzipw := gzip.NewWriter(bufw)
+       gzipw := pgzip.NewWriter(bufw)
        defer gzipw.Close()
 
        var maskfifo string // filename of mask fifo if we're running bedtools, otherwise ""
@@ -248,12 +249,13 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
        if cmd.mask {
                chrSize := map[string]int{}
 
-               vcffile, err := os.Open(infile)
+               vcffile, err := open(infile)
                if err != nil {
                        return err
                }
                defer vcffile.Close()
                var rdr io.Reader = vcffile
+               rdr = bufio.NewReaderSize(rdr, 8*1024*1024)
                if strings.HasSuffix(infile, ".gz") {
                        rdr, err = gzip.NewReader(vcffile)
                        if err != nil {
@@ -297,7 +299,7 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
                        // Read chromosome sizes from genome file in
                        // case any weren't specified in the VCF
                        // header.
-                       genomeFile, err := os.Open(cmd.genomeFile)
+                       genomeFile, err := open(cmd.genomeFile)
                        if err != nil {
                                return fmt.Errorf("error opening genome file %q: %s", cmd.genomeFile, err)
                        }