"encoding/json"
"errors"
"fmt"
+ "io"
"io/ioutil"
"net/url"
"os"
Name string
OutputName string
ProjectUUID string
+ APIAccess bool
VCPUs int
RAM int64
Prog string // if empty, run /proc/self/exe
Args []string
Mounts map[string]map[string]interface{}
Priority int
+ KeepCache int // cache buffers per VCPU (0 for default)
}
func (runner *arvadosContainerRunner) Run() (string, error) {
if priority < 1 {
priority = 500
}
+ keepCache := runner.KeepCache
+ if keepCache < 1 {
+ keepCache = 2
+ }
rc := arvados.RuntimeConstraints{
+ API: &runner.APIAccess,
VCPUs: runner.VCPUs,
RAM: runner.RAM,
- KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs),
+ KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs),
}
outname := &runner.OutputName
if *outname == "" {
log.Printf("stored lightning binary in new collection %s", coll.UUID)
return coll.UUID, nil
}
+
+var arvadosClientFromEnv = arvados.NewClientFromEnv()
+
+// open opens fnm for reading. When ARVADOS_API_HOST is set and fnm names
+// a file inside an Arvados collection mount ("/mnt/{uuid}/...", as matched
+// by collectionInPathRe, defined elsewhere in this file), the file is read
+// directly via the Arvados client library instead of going through the
+// filesystem mount; in every other case it falls back to plain os.Open.
+func open(fnm string) (io.ReadCloser, error) {
+ if os.Getenv("ARVADOS_API_HOST") == "" {
+ // No Arvados API credentials in the environment: local file.
+ return os.Open(fnm)
+ }
+ m := collectionInPathRe.FindStringSubmatch(fnm)
+ if m == nil {
+ // Path does not look like a collection mount: local file.
+ return os.Open(fnm)
+ }
+ // NOTE(review): assumes capture group 2 of collectionInPathRe is the
+ // collection UUID — confirm against the regexp's definition.
+ uuid := m[2]
+ mnt := "/mnt/" + uuid + "/"
+ if !strings.HasPrefix(fnm, mnt) {
+ return os.Open(fnm)
+ }
+
+ log.Infof("reading %q from %s using Arvados client library", fnm[len(mnt):], uuid)
+ ac, err := arvadosclient.New(arvadosClientFromEnv)
+ if err != nil {
+ return nil, err
+ }
+ ac.Client = arvados.DefaultSecureClient
+ kc := keepclient.New(ac)
+ // Don't use keepclient's default short timeouts.
+ kc.HTTPClient = arvados.DefaultSecureClient
+ // Don't cache more than one block for this file.
+ kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 1}
+
+ // Fetch just enough of the collection record (uuid + manifest) to
+ // build a read-only collection filesystem.
+ var coll arvados.Collection
+ err = arvadosClientFromEnv.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+uuid, nil, arvados.GetOptions{Select: []string{"uuid", "manifest_text"}})
+ if err != nil {
+ return nil, err
+ }
+ fs, err := coll.FileSystem(arvadosClientFromEnv, kc)
+ if err != nil {
+ return nil, err
+ }
+ // Strip the mount prefix: the collection filesystem is rooted at the
+ // collection itself.
+ return fs.Open(fnm[len(mnt):])
+}
"ref2genome": &ref2genome{},
"vcf2fasta": &vcf2fasta{},
"import": &importer{},
- "import-stats-plot": &importstatsplot{},
"annotate": &annotatecmd{},
"export": &exporter{},
"export-numpy": &exportNumpy{},
RUN DEBIAN_FRONTEND=noninteractive \
apt-get update && \
apt-get dist-upgrade -y && \
- apt-get install -y --no-install-recommends bcftools bedtools samtools python2 python3-sklearn python3-matplotlib && \
+ apt-get install -y --no-install-recommends bcftools bedtools samtools python2 python3-sklearn python3-matplotlib ca-certificates && \
apt-get clean
`), 0644)
if err != nil {
github.com/gonum/internal v0.0.0-20181124074243-f884aa714029 // indirect
github.com/james-bowman/nlp v0.0.0-20200417075118-1e2772e0e1e5
github.com/james-bowman/sparse v0.0.0-20200514124614-ae250424e52d // indirect
+ github.com/klauspost/compress v1.11.3 // indirect
+ github.com/klauspost/pgzip v1.2.5
github.com/kshedden/gonpy v0.0.0-20190510000443-66c21fac4672
- github.com/lucasb-eyer/go-colorful v1.0.3
github.com/mattn/go-isatty v0.0.12
github.com/prometheus/client_golang v1.6.0 // indirect
github.com/prometheus/common v0.10.0 // indirect
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 // indirect
gonum.org/v1/gonum v0.8.1
- gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15
gopkg.in/yaml.v2 v2.3.0 // indirect
)
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/kevinburke/ssh_config v0.0.0-20171013211458-802051befeb5/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/klauspost/compress v1.11.3 h1:dB4Bn0tN3wdCzQxnS8r06kV74qN/TAfaIS0bVE8h3jc=
+github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
+github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE=
+github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
github.com/kshedden/gonpy v0.0.0-20190510000443-66c21fac4672 h1:LQLnybCU54zB8Gj8c1DPeZEheIAn3eZ8Cc9fYqM4ac8=
github.com/kshedden/gonpy v0.0.0-20190510000443-66c21fac4672/go.mod h1:+uEXxXG0RlfBPqG1tq5QN/F2jRlcuY0dExSONLpEwcA=
github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
-github.com/lucasb-eyer/go-colorful v1.0.3 h1:QIbQXiugsb+q10B+MI+7DI1oQLdmnep86tWFlaaUAac=
-github.com/lucasb-eyer/go-colorful v1.0.3/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/marstr/guid v1.1.1-0.20170427235115-8bdf7d1a087c/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
"flag"
"fmt"
"io"
+ "io/ioutil"
"net/http"
_ "net/http/pprof"
"os"
"sync/atomic"
"time"
- "git.arvados.org/arvados.git/sdk/go/arvados"
- "github.com/lucasb-eyer/go-colorful"
+ "github.com/klauspost/pgzip"
log "github.com/sirupsen/logrus"
- "gonum.org/v1/plot"
- "gonum.org/v1/plot/plotter"
- "gonum.org/v1/plot/vg"
- "gonum.org/v1/plot/vg/draw"
)
type importer struct {
}
defer outf.Close()
if strings.HasSuffix(cmd.outputFile, ".gz") {
- outw = gzip.NewWriter(outf)
+ outw = pgzip.NewWriter(outf)
} else {
outw = outf
}
}
- bufw := bufio.NewWriter(outw)
+ bufw := bufio.NewWriterSize(outw, 64*1024*1024)
cmd.encoder = gob.NewEncoder(bufw)
tilelib := &tileLibrary{taglib: taglib, retainNoCalls: cmd.saveIncompleteTiles, skipOOO: cmd.skipOOO}
// possibly even an in-place update.
return errors.New("cannot specify output file in container mode: not implemented")
}
- client := arvados.NewClientFromEnv()
runner := arvadosContainerRunner{
Name: "lightning import",
- Client: client,
+ Client: arvadosClientFromEnv,
ProjectUUID: cmd.projectUUID,
- RAM: 80000000000,
- VCPUs: 32,
+ APIAccess: true,
+ RAM: 300000000000,
+ VCPUs: 64,
Priority: cmd.priority,
+ KeepCache: 1,
}
err := runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile)
if err != nil {
runner.Args = []string{"import",
"-local=true",
"-loglevel=" + cmd.loglevel,
+ "-pprof=:6061",
fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO),
fmt.Sprintf("-output-tiles=%v", cmd.outputTiles),
fmt.Sprintf("-save-incomplete-tiles=%v", cmd.saveIncompleteTiles),
func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []importStats, error) {
var input io.ReadCloser
- input, err := os.Open(infile)
+ input, err := open(infile)
if err != nil {
return nil, nil, err
}
defer input.Close()
+ input = ioutil.NopCloser(bufio.NewReaderSize(input, 8*1024*1024))
if strings.HasSuffix(infile, ".gz") {
- input, err = gzip.NewReader(input)
+ input, err = pgzip.NewReader(input)
if err != nil {
return nil, nil, err
}
func (cmd *importer) loadTagLibrary() (*tagLibrary, error) {
log.Printf("tag library %s load starting", cmd.tagLibraryFile)
- f, err := os.Open(cmd.tagLibraryFile)
+ f, err := open(cmd.tagLibraryFile)
if err != nil {
return nil, err
}
defer f.Close()
- var rdr io.ReadCloser = f
+ rdr := ioutil.NopCloser(bufio.NewReaderSize(f, 64*1024*1024))
if strings.HasSuffix(cmd.tagLibraryFile, ".gz") {
- rdr, err = gzip.NewReader(f)
+ rdr, err = gzip.NewReader(rdr)
if err != nil {
return nil, fmt.Errorf("%s: gzip: %s", cmd.tagLibraryFile, err)
}
go close(todo)
var tileJobs sync.WaitGroup
var running int64
- for i := 0; i < runtime.NumCPU()*9/8+1; i++ {
+ for i := 0; i < runtime.GOMAXPROCS(-1)*2; i++ {
tileJobs.Add(1)
atomic.AddInt64(&running, 1)
go func() {
}
return flat
}
-
-type importstatsplot struct{}
-
-func (cmd *importstatsplot) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
- err := cmd.Plot(stdin, stdout)
- if err != nil {
- log.Errorf("%s", err)
- return 1
- }
- return 0
-}
-
-func (cmd *importstatsplot) Plot(stdin io.Reader, stdout io.Writer) error {
- var stats []importStats
- err := json.NewDecoder(stdin).Decode(&stats)
- if err != nil {
- return err
- }
-
- p, err := plot.New()
- if err != nil {
- return err
- }
- p.Title.Text = "coverage preserved by import (excl X<0.65)"
- p.X.Label.Text = "input base calls ÷ sequence length"
- p.Y.Label.Text = "output base calls ÷ input base calls"
- p.Add(plotter.NewGrid())
-
- data := map[string]plotter.XYs{}
- for _, stat := range stats {
- data[stat.InputLabel] = append(data[stat.InputLabel], plotter.XY{
- X: float64(stat.InputCoverage) / float64(stat.InputLength),
- Y: float64(stat.TileCoverage) / float64(stat.InputCoverage),
- })
- }
-
- labels := []string{}
- for label := range data {
- labels = append(labels, label)
- }
- sort.Strings(labels)
- palette, err := colorful.SoftPalette(len(labels))
- if err != nil {
- return err
- }
- nextInPalette := 0
- for idx, label := range labels {
- s, err := plotter.NewScatter(data[label])
- if err != nil {
- return err
- }
- s.GlyphStyle.Color = palette[idx]
- s.GlyphStyle.Radius = vg.Millimeter / 2
- s.GlyphStyle.Shape = draw.CrossGlyph{}
- nextInPalette += 7
- p.Add(s)
- if false {
- p.Legend.Add(label, s)
- }
- }
- p.X.Min = 0.65
- p.X.Max = 1
-
- w, err := p.WriterTo(8*vg.Inch, 6*vg.Inch, "svg")
- if err != nil {
- return err
- }
- _, err = w.WriteTo(stdout)
- return err
-}
return len(taglib.tagmap)
}
+// TagLen returns the tag sequence length (keylen) shared by all tags in
+// this library. NOTE(review): assumes keylen is populated when the library
+// is loaded — confirm in the loader.
+func (taglib *tagLibrary) TagLen() int {
+ return taglib.keylen
+}
+
var (
twobit = func() []tagmapKey {
r := make([]tagmapKey, 256)
InputLabel string
InputLength int
InputCoverage int
- TileCoverage int
PathLength int
DroppedOutOfOrderTiles int
}
todo <- jobT{seqlabel, fasta}
}()
+// foundtag records one tag hit reported by tagLibrary.FindAll while
+// scanning a fasta sequence. The per-hit taglen field was removed: the
+// tag length is uniform per library and is obtained once via TagLen().
type foundtag struct {
- pos int
- tagid tagID
- taglen int
+ pos int // offset in the input sequence where the tag starts
+ tagid tagID // identifier of the matched tag
}
found := make([]foundtag, 2000000)
path := make([]tileLibRef, 2000000)
totalFoundTags := 0
totalPathLen := 0
skippedSequences := 0
+ taglen := tilelib.taglib.TagLen()
var stats []importStats
for job := range todo {
if len(job.fasta) == 0 {
found = found[:0]
tilelib.taglib.FindAll(job.fasta, func(tagid tagID, pos, taglen int) {
- found = append(found, foundtag{pos: pos, tagid: tagid, taglen: taglen})
+ found = append(found, foundtag{pos: pos, tagid: tagid})
})
totalFoundTags += len(found)
+ if len(found) == 0 {
+ log.Warnf("%s %s no tags found", filelabel, job.label)
+ }
- basesOut := 0
skipped := 0
- path = path[:0]
- last := foundtag{tagid: -1}
if tilelib.skipOOO {
+ log.Infof("%s %s keeping longest increasing subsequence", filelabel, job.label)
keep := longestIncreasingSubsequence(len(found), func(i int) int { return int(found[i].tagid) })
for i, x := range keep {
found[i] = found[x]
skipped = len(found) - len(keep)
found = found[:len(keep)]
}
+
+ log.Infof("%s %s getting %d librefs", filelabel, job.label, len(found))
+ throttle := &throttle{Max: runtime.NumCPU()}
+ path = path[:len(found)]
for i, f := range found {
- log.Tracef("%s %s found[%d] == %#v", filelabel, job.label, i, f)
- if last.tagid < 0 {
- // first tag in sequence
- last = foundtag{tagid: f.tagid}
- continue
- }
- libref := tilelib.getRef(last.tagid, job.fasta[last.pos:f.pos+f.taglen])
- path = append(path, libref)
- if libref.Variant > 0 {
- // Count output coverage from
- // the end of the previous tag
- // (if any) to the end of the
- // current tag, IOW don't
- // double-count coverage for
- // the previous tag.
- basesOut += countBases(job.fasta[last.pos+last.taglen : f.pos+f.taglen])
- } else {
- // If we dropped this tile
- // (because !retainNoCalls),
- // set taglen=0 so the
- // overlapping tag is counted
- // toward coverage on the
- // following tile.
- f.taglen = 0
- }
- last = f
- }
- if last.tagid < 0 {
- log.Warnf("%s %s no tags found", filelabel, job.label)
- } else {
- libref := tilelib.getRef(last.tagid, job.fasta[last.pos:])
- path = append(path, libref)
- if libref.Variant > 0 {
- basesOut += countBases(job.fasta[last.pos+last.taglen:])
- }
+ i, f := i, f
+ throttle.Acquire()
+ go func() {
+ defer throttle.Release()
+ var startpos, endpos int
+ if i == 0 {
+ startpos = 0
+ } else {
+ startpos = f.pos
+ }
+ if i == len(found)-1 {
+ endpos = len(job.fasta)
+ } else {
+ endpos = found[i+1].pos + taglen
+ }
+ path[i] = tilelib.getRef(f.tagid, job.fasta[startpos:endpos])
+ }()
}
+ throttle.Wait()
+
+ log.Infof("%s %s copying path", filelabel, job.label)
pathcopy := make([]tileLibRef, len(path))
copy(pathcopy, path)
ret[job.label] = pathcopy
basesIn := countBases(job.fasta)
- log.Infof("%s %s fasta in %d coverage in %d coverage out %d path len %d skipped %d", filelabel, job.label, len(job.fasta), basesIn, basesOut, len(path), skipped)
+ log.Infof("%s %s fasta in %d coverage in %d path len %d skipped-out-of-order %d", filelabel, job.label, len(job.fasta), basesIn, len(path), skipped)
stats = append(stats, importStats{
InputFile: filelabel,
InputLabel: job.label,
InputLength: len(job.fasta),
InputCoverage: basesIn,
- TileCoverage: basesOut,
PathLength: len(path),
DroppedOutOfOrderTiles: skipped,
})
}
}
seqhash := blake2b.Sum256(seq)
+ var vlock sync.Locker
+
tilelib.mtx.RLock()
- if int(tag) < len(tilelib.variant) {
+ if len(tilelib.vlock) > int(tag) {
+ vlock = tilelib.vlock[tag]
+ }
+ tilelib.mtx.RUnlock()
+
+ if vlock != nil {
+ vlock.Lock()
for i, varhash := range tilelib.variant[tag] {
if varhash == seqhash {
- tilelib.mtx.RUnlock()
+ vlock.Unlock()
return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)}
}
}
- }
- var vlock sync.Locker
- if len(tilelib.vlock) > int(tag) {
- vlock = tilelib.vlock[tag]
- }
- tilelib.mtx.RUnlock()
- if vlock == nil {
+ vlock.Unlock()
+ } else {
tilelib.mtx.Lock()
if tilelib.variant == nil && tilelib.taglib != nil {
tilelib.variant = make([][][blake2b.Size256]byte, tilelib.taglib.Len())
tilelib.vlock = make([]sync.Locker, tilelib.taglib.Len())
for i := range tilelib.vlock {
- tilelib.vlock[i] = &sync.Mutex{}
+ tilelib.vlock[i] = new(sync.Mutex)
}
}
if int(tag) >= len(tilelib.variant) {
- oldlen := len(tilelib.variant)
+ oldlen := len(tilelib.vlock)
+ for i := 0; i < oldlen; i++ {
+ tilelib.vlock[i].Lock()
+ }
// If we haven't seen the tag library yet (as
// in a merge), tilelib.taglib.Len() is
// zero. We can still behave correctly, we
- // just need to expand the tilelib.variant
- // slice as needed.
+ // just need to expand the tilelib.variant and
+ // tilelib.vlock slices as needed.
if int(tag) >= cap(tilelib.variant) {
// Allocate 2x capacity.
newslice := make([][][blake2b.Size256]byte, int(tag)+1, (int(tag)+1)*2)
tilelib.variant = tilelib.variant[:int(tag)+1]
tilelib.vlock = tilelib.vlock[:int(tag)+1]
}
- for i := oldlen; i < len(tilelib.variant); i++ {
- tilelib.vlock[i] = &sync.Mutex{}
+ for i := oldlen; i < len(tilelib.vlock); i++ {
+ tilelib.vlock[i] = new(sync.Mutex)
+ }
+ for i := 0; i < oldlen; i++ {
+ tilelib.vlock[i].Unlock()
}
}
vlock = tilelib.vlock[tag]
tilelib.mtx.Unlock()
}
- tilelib.mtx.RLock()
vlock.Lock()
for i, varhash := range tilelib.variant[tag] {
if varhash == seqhash {
vlock.Unlock()
- tilelib.mtx.RUnlock()
return tileLibRef{Tag: tag, Variant: tileVariantID(i + 1)}
}
}
atomic.AddInt64(&tilelib.variants, 1)
tilelib.variant[tag] = append(tilelib.variant[tag], seqhash)
+ variant := tileVariantID(len(tilelib.variant[tag]))
+ vlock.Unlock()
+
if tilelib.retainTileSequences && !dropSeq {
+ tilelib.mtx.Lock()
if tilelib.seq == nil {
tilelib.seq = map[[blake2b.Size256]byte][]byte{}
}
tilelib.seq[seqhash] = append([]byte(nil), seq...)
+ tilelib.mtx.Unlock()
}
- variant := tileVariantID(len(tilelib.variant[tag]))
- vlock.Unlock()
- tilelib.mtx.RUnlock()
if tilelib.encoder != nil {
saveSeq := seq
"sync"
"syscall"
- "git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/klauspost/pgzip"
log "github.com/sirupsen/logrus"
)
cmd.vcpus = 32
}
}
- client := arvados.NewClientFromEnv()
runner := arvadosContainerRunner{
Name: "lightning vcf2fasta",
- Client: client,
+ Client: arvadosClientFromEnv,
ProjectUUID: cmd.projectUUID,
RAM: 2<<30 + int64(cmd.vcpus)<<28,
VCPUs: cmd.vcpus,
Priority: *priority,
+ KeepCache: 2,
+ APIAccess: true,
Mounts: map[string]map[string]interface{}{
"/gvcf_regions.py": map[string]interface{}{
"kind": "text",
}
defer outf.Close()
bufw := bufio.NewWriterSize(outf, 8*1024*1024)
- gzipw := gzip.NewWriter(bufw)
+ gzipw := pgzip.NewWriter(bufw)
defer gzipw.Close()
var maskfifo string // filename of mask fifo if we're running bedtools, otherwise ""
if cmd.mask {
chrSize := map[string]int{}
- vcffile, err := os.Open(infile)
+ vcffile, err := open(infile)
if err != nil {
return err
}
defer vcffile.Close()
var rdr io.Reader = vcffile
+ rdr = bufio.NewReaderSize(rdr, 8*1024*1024)
if strings.HasSuffix(infile, ".gz") {
rdr, err = gzip.NewReader(vcffile)
if err != nil {
// Read chromosome sizes from genome file in
// case any weren't specified in the VCF
// header.
- genomeFile, err := os.Open(cmd.genomeFile)
+ genomeFile, err := open(cmd.genomeFile)
if err != nil {
return fmt.Errorf("error opening genome file %q: %s", cmd.genomeFile, err)
}