From: Tom Clegg Date: Wed, 2 Dec 2020 20:47:44 +0000 (-0500) Subject: Improve import and vcf2fasta performance. X-Git-Url: https://git.arvados.org/lightning.git/commitdiff_plain/de10f554682eecf3926b409c70f9685901d11272 Improve import and vcf2fasta performance. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/arvados.go b/arvados.go index 5f78a36408..a79160d785 100644 --- a/arvados.go +++ b/arvados.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "io/ioutil" "net/url" "os" @@ -184,12 +185,14 @@ type arvadosContainerRunner struct { Name string OutputName string ProjectUUID string + APIAccess bool VCPUs int RAM int64 Prog string // if empty, run /proc/self/exe Args []string Mounts map[string]map[string]interface{} Priority int + KeepCache int // cache buffers per VCPU (0 for default) } func (runner *arvadosContainerRunner) Run() (string, error) { @@ -229,10 +232,15 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e if priority < 1 { priority = 500 } + keepCache := runner.KeepCache + if keepCache < 1 { + keepCache = 2 + } rc := arvados.RuntimeConstraints{ + API: &runner.APIAccess, VCPUs: runner.VCPUs, RAM: runner.RAM, - KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs), + KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs), } outname := &runner.OutputName if *outname == "" { @@ -449,3 +457,43 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { log.Printf("stored lightning binary in new collection %s", coll.UUID) return coll.UUID, nil } + +var arvadosClientFromEnv = arvados.NewClientFromEnv() + +func open(fnm string) (io.ReadCloser, error) { + if os.Getenv("ARVADOS_API_HOST") == "" { + return os.Open(fnm) + } + m := collectionInPathRe.FindStringSubmatch(fnm) + if m == nil { + return os.Open(fnm) + } + uuid := m[2] + mnt := "/mnt/" + uuid + "/" + if !strings.HasPrefix(fnm, mnt) { + return os.Open(fnm) + } + + log.Infof("reading %q from %s using Arvados client library", fnm[len(mnt):], uuid) + ac, err := arvadosclient.New(arvadosClientFromEnv) + if err != nil { + return nil, err + } + ac.Client = arvados.DefaultSecureClient + kc := keepclient.New(ac) + // Don't use keepclient's default short timeouts. + kc.HTTPClient = arvados.DefaultSecureClient + // Don't cache more than one block for this file. + kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 1} + + var coll arvados.Collection + err = arvadosClientFromEnv.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+uuid, nil, arvados.GetOptions{Select: []string{"uuid", "manifest_text"}}) + if err != nil { + return nil, err + } + fs, err := coll.FileSystem(arvadosClientFromEnv, kc) + if err != nil { + return nil, err + } + return fs.Open(fnm[len(mnt):]) +} diff --git a/cmd.go b/cmd.go index 92bd5a29d7..5789fe4604 100644 --- a/cmd.go +++ b/cmd.go @@ -21,7 +21,6 @@ var ( "ref2genome": &ref2genome{}, "vcf2fasta": &vcf2fasta{}, "import": &importer{}, - "import-stats-plot": &importstatsplot{}, "annotate": &annotatecmd{}, "export": &exporter{}, "export-numpy": &exportNumpy{}, @@ -57,7 +56,7 @@ func (cmd *buildDockerImage) RunCommand(prog string, args []string, stdin io.Rea RUN DEBIAN_FRONTEND=noninteractive \ apt-get update && \ apt-get dist-upgrade -y && \ - apt-get install -y --no-install-recommends bcftools bedtools samtools python2 python3-sklearn python3-matplotlib && \ + apt-get install -y --no-install-recommends bcftools bedtools samtools python2 python3-sklearn python3-matplotlib ca-certificates && \ apt-get clean `), 0644) if err != nil { diff --git a/go.mod b/go.mod index ffbfc042c0..bc812fedcd 100644 --- a/go.mod +++ b/go.mod @@ -11,8 +11,9 @@ require ( github.com/gonum/internal v0.0.0-20181124074243-f884aa714029 // indirect github.com/james-bowman/nlp v0.0.0-20200417075118-1e2772e0e1e5 github.com/james-bowman/sparse v0.0.0-20200514124614-ae250424e52d // indirect + github.com/klauspost/compress v1.11.3 // indirect + github.com/klauspost/pgzip v1.2.5 github.com/kshedden/gonpy v0.0.0-20190510000443-66c21fac4672 - github.com/lucasb-eyer/go-colorful v1.0.3 github.com/mattn/go-isatty v0.0.12 github.com/prometheus/client_golang v1.6.0 // indirect github.com/prometheus/common v0.10.0 // indirect @@ -23,7 +24,6 @@ require ( golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2 golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 // indirect gonum.org/v1/gonum v0.8.1 - gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 gopkg.in/yaml.v2 v2.3.0 // indirect ) diff --git a/go.sum b/go.sum index f3617dfc0f..b9ad92e6f8 100644 --- a/go.sum +++ b/go.sum @@ -63,6 +63,7 @@ github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -116,6 +117,10 @@ github.com/karalabe/xgo v0.0.0-20191115072854-c5ccff8648a7/go.mod h1:iYGcTYIPUvE github.com/kevinburke/ssh_config v0.0.0-20171013211458-802051befeb5/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.11.3 h1:dB4Bn0tN3wdCzQxnS8r06kV74qN/TAfaIS0bVE8h3jc= +github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE= +github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8= @@ -129,8 +134,6 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kshedden/gonpy v0.0.0-20190510000443-66c21fac4672 h1:LQLnybCU54zB8Gj8c1DPeZEheIAn3eZ8Cc9fYqM4ac8= github.com/kshedden/gonpy v0.0.0-20190510000443-66c21fac4672/go.mod h1:+uEXxXG0RlfBPqG1tq5QN/F2jRlcuY0dExSONLpEwcA= github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/lucasb-eyer/go-colorful v1.0.3 h1:QIbQXiugsb+q10B+MI+7DI1oQLdmnep86tWFlaaUAac= -github.com/lucasb-eyer/go-colorful v1.0.3/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/marstr/guid v1.1.1-0.20170427235115-8bdf7d1a087c/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= diff --git a/import.go b/import.go index cea24f2785..f4e0cd0008 100644 --- a/import.go +++ b/import.go @@ -10,6 +10,7 @@ import ( "flag" "fmt" "io" + "io/ioutil" "net/http" _ "net/http/pprof" "os" @@ -23,13 +24,8 @@ import ( "sync/atomic" "time" - "git.arvados.org/arvados.git/sdk/go/arvados" - "github.com/lucasb-eyer/go-colorful" + "github.com/klauspost/pgzip" log "github.com/sirupsen/logrus" - "gonum.org/v1/plot" - "gonum.org/v1/plot/plotter" - "gonum.org/v1/plot/vg" - "gonum.org/v1/plot/vg/draw" ) type importer struct { @@ -132,12 +128,12 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std } defer outf.Close() if strings.HasSuffix(cmd.outputFile, ".gz") { - outw = gzip.NewWriter(outf) + outw = pgzip.NewWriter(outf) } else { outw = outf } } - bufw := bufio.NewWriter(outw) + bufw := bufio.NewWriterSize(outw, 64*1024*1024) cmd.encoder = gob.NewEncoder(bufw) tilelib := &tileLibrary{taglib: taglib, retainNoCalls: cmd.saveIncompleteTiles, skipOOO: cmd.skipOOO} @@ -179,14 +175,15 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error { // possibly even an in-place update. return errors.New("cannot specify output file in container mode: not implemented") } - client := arvados.NewClientFromEnv() runner := arvadosContainerRunner{ Name: "lightning import", - Client: client, + Client: arvadosClientFromEnv, ProjectUUID: cmd.projectUUID, - RAM: 80000000000, - VCPUs: 32, + APIAccess: true, + RAM: 300000000000, + VCPUs: 64, Priority: cmd.priority, + KeepCache: 1, } err := runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile) if err != nil { @@ -207,6 +204,7 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error { runner.Args = []string{"import", "-local=true", "-loglevel=" + cmd.loglevel, + "-pprof=:6061", fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO), fmt.Sprintf("-output-tiles=%v", cmd.outputTiles), fmt.Sprintf("-save-incomplete-tiles=%v", cmd.saveIncompleteTiles), @@ -233,13 +231,14 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error { func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []importStats, error) { var input io.ReadCloser - input, err := os.Open(infile) + input, err := open(infile) if err != nil { return nil, nil, err } defer input.Close() + input = ioutil.NopCloser(bufio.NewReaderSize(input, 8*1024*1024)) if strings.HasSuffix(infile, ".gz") { - input, err = gzip.NewReader(input) + input, err = pgzip.NewReader(input) if err != nil { return nil, nil, err } @@ -250,14 +249,14 @@ func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, [] func (cmd *importer) loadTagLibrary() (*tagLibrary, error) { log.Printf("tag library %s load starting", cmd.tagLibraryFile) - f, err := os.Open(cmd.tagLibraryFile) + f, err := open(cmd.tagLibraryFile) if err != nil { return nil, err } defer f.Close() - var rdr io.ReadCloser = f + rdr := ioutil.NopCloser(bufio.NewReaderSize(f, 64*1024*1024)) if strings.HasSuffix(cmd.tagLibraryFile, ".gz") { - rdr, err = gzip.NewReader(f) + rdr, err = gzip.NewReader(rdr) if err != nil { return nil, fmt.Errorf("%s: gzip: %s", cmd.tagLibraryFile, err) } @@ -426,7 +425,7 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { go close(todo) var tileJobs sync.WaitGroup var running int64 - for i := 0; i < runtime.NumCPU()*9/8+1; i++ { + for i := 0; i < runtime.GOMAXPROCS(-1)*2; i++ { tileJobs.Add(1) atomic.AddInt64(&running, 1) go func() { @@ -544,73 +543,3 @@ func flatten(variants [][]tileVariantID) []tileVariantID { } return flat } - -type importstatsplot struct{} - -func (cmd *importstatsplot) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { - err := cmd.Plot(stdin, stdout) - if err != nil { - log.Errorf("%s", err) - return 1 - } - return 0 -} - -func (cmd *importstatsplot) Plot(stdin io.Reader, stdout io.Writer) error { - var stats []importStats - err := json.NewDecoder(stdin).Decode(&stats) - if err != nil { - return err - } - - p, err := plot.New() - if err != nil { - return err - } - p.Title.Text = "coverage preserved by import (excl X<0.65)" - p.X.Label.Text = "input base calls ÷ sequence length" - p.Y.Label.Text = "output base calls ÷ input base calls" - p.Add(plotter.NewGrid()) - - data := map[string]plotter.XYs{} - for _, stat := range stats { - data[stat.InputLabel] = append(data[stat.InputLabel], plotter.XY{ - X: float64(stat.InputCoverage) / float64(stat.InputLength), - Y: float64(stat.TileCoverage) / float64(stat.InputCoverage), - }) - } - - labels := []string{} - for label := range data { - labels = append(labels, label) - } - sort.Strings(labels) - palette, err := colorful.SoftPalette(len(labels)) - if err != nil { - return err - } - nextInPalette := 0 - for idx, label := range labels { - s, err := plotter.NewScatter(data[label]) - if err != nil { - return err - } - s.GlyphStyle.Color = palette[idx] - s.GlyphStyle.Radius = vg.Millimeter / 2 - s.GlyphStyle.Shape = draw.CrossGlyph{} - nextInPalette += 7 - p.Add(s) - if false { - p.Legend.Add(label, s) - } - } - p.X.Min = 0.65 - p.X.Max = 1 - - w, err := p.WriterTo(8*vg.Inch, 6*vg.Inch, "svg") - if err != nil { - return err - } - _, err = w.WriteTo(stdout) - return err -} diff --git a/taglib.go b/taglib.go index f92a91f375..527884c847 100644 --- a/taglib.go +++ b/taglib.go @@ -69,6 +69,10 @@ func (taglib *tagLibrary) Len() int { return len(taglib.tagmap) } +func (taglib *tagLibrary) TagLen() int { + return taglib.keylen +} + var ( twobit = func() []tagmapKey { r := make([]tagmapKey, 256) diff --git a/tilelib.go b/tilelib.go index 522159dede..926c0324a2 100644 --- a/tilelib.go +++ b/tilelib.go @@ -253,7 +253,6 @@ type importStats struct { InputLabel string InputLength int InputCoverage int - TileCoverage int PathLength int DroppedOutOfOrderTiles int } @@ -283,15 +282,15 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro todo <- jobT{seqlabel, fasta} }() type foundtag struct { - pos int - tagid tagID - taglen int + pos int + tagid tagID } found := make([]foundtag, 2000000) path := make([]tileLibRef, 2000000) totalFoundTags := 0 totalPathLen := 0 skippedSequences := 0 + taglen := tilelib.taglib.TagLen() var stats []importStats for job := range todo { if len(job.fasta) == 0 { @@ -304,15 +303,16 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro found = found[:0] tilelib.taglib.FindAll(job.fasta, func(tagid tagID, pos, taglen int) { - found = append(found, foundtag{pos: pos, tagid: tagid, taglen: taglen}) + found = append(found, foundtag{pos: pos, tagid: tagid}) }) totalFoundTags += len(found) + if len(found) == 0 { + log.Warnf("%s %s no tags found", filelabel, job.label) + } - basesOut := 0 skipped := 0 - path = path[:0] - last := foundtag{tagid: -1} if tilelib.skipOOO { + log.Infof("%s %s keeping longest increasing subsequence", filelabel, job.label) keep := longestIncreasingSubsequence(len(found), func(i int) int { return int(found[i].tagid) }) for i, x := range keep { found[i] = found[x] @@ -320,56 +320,44 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro skipped = len(found) - len(keep) found = found[:len(keep)] } + + log.Infof("%s %s getting %d librefs", filelabel, job.label, len(found)) + throttle := &throttle{Max: runtime.NumCPU()} + path = path[:len(found)] for i, f := range found { - log.Tracef("%s %s found[%d] == %#v", filelabel, job.label, i, f) - if last.tagid < 0 { - // first tag in sequence - last = foundtag{tagid: f.tagid} - continue - } - libref := tilelib.getRef(last.tagid, job.fasta[last.pos:f.pos+f.taglen]) - path = append(path, libref) - if libref.Variant > 0 { - // Count output coverage from - // the end of the previous tag - // (if any) to the end of the - // current tag, IOW don't - // double-count coverage for - // the previous tag. - basesOut += countBases(job.fasta[last.pos+last.taglen : f.pos+f.taglen]) - } else { - // If we dropped this tile - // (because !retainNoCalls), - // set taglen=0 so the - // overlapping tag is counted - // toward coverage on the - // following tile. - f.taglen = 0 - } - last = f - } - if last.tagid < 0 { - log.Warnf("%s %s no tags found", filelabel, job.label) - } else { - libref := tilelib.getRef(last.tagid, job.fasta[last.pos:]) - path = append(path, libref) - if libref.Variant > 0 { - basesOut += countBases(job.fasta[last.pos+last.taglen:]) - } + i, f := i, f + throttle.Acquire() + go func() { + defer throttle.Release() + var startpos, endpos int + if i == 0 { + startpos = 0 + } else { + startpos = f.pos + } + if i == len(found)-1 { + endpos = len(job.fasta) + } else { + endpos = found[i+1].pos + taglen + } + path[i] = tilelib.getRef(f.tagid, job.fasta[startpos:endpos]) + }() } + throttle.Wait() + + log.Infof("%s %s copying path", filelabel, job.label) pathcopy := make([]tileLibRef, len(path)) copy(pathcopy, path) ret[job.label] = pathcopy basesIn := countBases(job.fasta) - log.Infof("%s %s fasta in %d coverage in %d coverage out %d path len %d skipped %d", filelabel, job.label, len(job.fasta), basesIn, basesOut, len(path), skipped) + log.Infof("%s %s fasta in %d coverage in %d path len %d skipped-out-of-order %d", filelabel, job.label, len(job.fasta), basesIn, len(path), skipped) stats = append(stats, importStats{ InputFile: filelabel, InputLabel: job.label, InputLength: len(job.fasta), InputCoverage: basesIn, - TileCoverage: basesOut, PathLength: len(path), DroppedOutOfOrderTiles: skipped, }) @@ -397,36 +385,42 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef { } } seqhash := blake2b.Sum256(seq) + var vlock sync.Locker + tilelib.mtx.RLock() - if int(tag) < len(tilelib.variant) { + if len(tilelib.vlock) > int(tag) { + vlock = tilelib.vlock[tag] + } + tilelib.mtx.RUnlock() + + if vlock != nil { + vlock.Lock() for i, varhash := range tilelib.variant[tag] { if varhash == seqhash { - tilelib.mtx.RUnlock() + vlock.Unlock() return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)} } } - } - var vlock sync.Locker - if len(tilelib.vlock) > int(tag) { - vlock = tilelib.vlock[tag] - } - tilelib.mtx.RUnlock() - if vlock == nil { + vlock.Unlock() + } else { tilelib.mtx.Lock() if tilelib.variant == nil && tilelib.taglib != nil { tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len()) tilelib.vlock = make([]sync.Locker, tilelib.taglib.Len()) for i := range tilelib.vlock { - tilelib.vlock[i] = &sync.Mutex{} + tilelib.vlock[i] = new(sync.Mutex) } } if int(tag) >= len(tilelib.variant) { - oldlen := len(tilelib.variant) + oldlen := len(tilelib.vlock) + for i := 0; i < oldlen; i++ { + tilelib.vlock[i].Lock() + } // If we haven't seen the tag library yet (as // in a merge), tilelib.taglib.Len() is // zero. We can still behave correctly, we - // just need to expand the tilelib.variant - // slice as needed. + // just need to expand the tilelib.variant and + // tilelib.vlock slices as needed. if int(tag) >= cap(tilelib.variant) { // Allocate 2x capacity. newslice := make([][][blake2b.Size256]byte, int(tag)+1, (int(tag)+1)*2) @@ -441,34 +435,37 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef { tilelib.variant = tilelib.variant[:int(tag)+1] tilelib.vlock = tilelib.vlock[:int(tag)+1] } - for i := oldlen; i < len(tilelib.variant); i++ { - tilelib.vlock[i] = &sync.Mutex{} + for i := oldlen; i < len(tilelib.vlock); i++ { + tilelib.vlock[i] = new(sync.Mutex) + } + for i := 0; i < oldlen; i++ { + tilelib.vlock[i].Unlock() } } vlock = tilelib.vlock[tag] tilelib.mtx.Unlock() } - tilelib.mtx.RLock() vlock.Lock() for i, varhash := range tilelib.variant[tag] { if varhash == seqhash { vlock.Unlock() - tilelib.mtx.RUnlock() return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)} } } atomic.AddInt64(&tilelib.variants, 1) tilelib.variant[tag] = append(tilelib.variant[tag], seqhash) + variant := tileVariantID(len(tilelib.variant[tag])) + vlock.Unlock() + if tilelib.retainTileSequences && !dropSeq { + tilelib.mtx.Lock() if tilelib.seq == nil { tilelib.seq = map[[blake2b.Size256]byte][]byte{} } tilelib.seq[seqhash] = append([]byte(nil), seq...) + tilelib.mtx.Unlock() } - variant := tileVariantID(len(tilelib.variant[tag])) - vlock.Unlock() - tilelib.mtx.RUnlock() if tilelib.encoder != nil { saveSeq := seq diff --git a/vcf2fasta.go b/vcf2fasta.go index cb43ee9567..7f8df2b5ae 100644 --- a/vcf2fasta.go +++ b/vcf2fasta.go @@ -22,7 +22,7 @@ import ( "sync" "syscall" - "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/klauspost/pgzip" log "github.com/sirupsen/logrus" ) @@ -107,14 +107,15 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st cmd.vcpus = 32 } } - client := arvados.NewClientFromEnv() runner := arvadosContainerRunner{ Name: "lightning vcf2fasta", - Client: client, + Client: arvadosClientFromEnv, ProjectUUID: cmd.projectUUID, RAM: 2<<30 + int64(cmd.vcpus)<<28, VCPUs: cmd.vcpus, Priority: *priority, + KeepCache: 2, + APIAccess: true, Mounts: map[string]map[string]interface{}{ "/gvcf_regions.py": map[string]interface{}{ "kind": "text", @@ -238,7 +239,7 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { } defer outf.Close() bufw := bufio.NewWriterSize(outf, 8*1024*1024) - gzipw := gzip.NewWriter(bufw) + gzipw := pgzip.NewWriter(bufw) defer gzipw.Close() var maskfifo string // filename of mask fifo if we're running bedtools, otherwise "" @@ -248,12 +249,13 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { if cmd.mask { chrSize := map[string]int{} - vcffile, err := os.Open(infile) + vcffile, err := open(infile) if err != nil { return err } defer vcffile.Close() var rdr io.Reader = vcffile + rdr = bufio.NewReaderSize(rdr, 8*1024*1024) if strings.HasSuffix(infile, ".gz") { rdr, err = gzip.NewReader(vcffile) if err != nil { @@ -297,7 +299,7 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error { // Read chromosome sizes from genome file in // case any weren't specified in the VCF // header. - genomeFile, err := os.Open(cmd.genomeFile) + genomeFile, err := open(cmd.genomeFile) if err != nil { return fmt.Errorf("error opening genome file %q: %s", cmd.genomeFile, err) }