)
type annotatecmd struct {
+ dropTiles []bool
variantHash bool
maxTileSize int
tag2tagid map[string]tagID
}
func (cmd *annotatecmd) annotateSequence(throttle *throttle, outch chan<- string, tilelib *tileLibrary, taglen int, refname, seqname string, reftiles []tileLibRef, refnamecol bool) error {
+ refnamefield := ""
+ if refnamecol {
+ refnamefield = "," + trimFilenameForLabel(refname)
+ }
var refseq []byte
// tilestart[123] is the index into refseq
// where the tile for tag 123 was placed.
tileend[libref.Tag] = len(refseq)
}
log.Infof("seq %s len(refseq) %d len(tilestart) %d", seqname, len(refseq), len(tilestart))
+ // outtag is tag's index in the subset of tags that aren't
+ // dropped. If there are 10M tags and half are dropped by
+ // dropTiles, tag ranges from 0 to 10M-1 and outtag ranges
+ // from 0 to 5M-1.
+ //
+ // IOW, in the matrix built by cgs2array(), {tag} is
+ // represented by columns {outtag}*2 and {outtag}*2+1.
+ outcol := -1
for tag, tvs := range tilelib.variant {
+ if len(cmd.dropTiles) > tag && cmd.dropTiles[tag] {
+ continue
+ }
tag := tagID(tag)
+ outcol++
+ // Must shadow outcol var to use safely in goroutine below.
+ outcol := outcol
refstart, ok := tilestart[tag]
if !ok {
// Tag didn't place on this
// anyway, but we don't output
// the annotations that would
// result.)
+ // outch <- fmt.Sprintf("%d,%d,-1%s\n", tag, outcol, refnamefield)
continue
}
for variant := 1; variant <= len(tvs); variant++ {
} else {
varid = fmt.Sprintf("%d", variant)
}
- refnamefield := ""
- if refnamecol {
- refnamefield = "," + trimFilenameForLabel(refname)
- }
- outch <- fmt.Sprintf("%d,%s%s,%s:g.%s\n", tag, varid, refnamefield, seqname, diff.String())
+ outch <- fmt.Sprintf("%d,%d,%s%s,%s:g.%s\n", tag, outcol, varid, refnamefield, seqname, diff.String())
}
}()
}
package lightning
import (
+ "bufio"
"context"
"encoding/json"
"errors"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/keepclient"
+ "github.com/klauspost/pgzip"
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/blake2b"
"golang.org/x/net/websocket"
return coll.UUID, nil
}
+// zopen returns a reader for the given file, using the arvados API
+// instead of arv-mount/fuse where applicable, and transparently
+// decompressing the input if fnm ends with ".gz".
+func zopen(fnm string) (io.ReadCloser, error) {
+ f, err := open(fnm)
+ if err != nil || !strings.HasSuffix(fnm, ".gz") {
+ return f, err
+ }
+ rdr, err := pgzip.NewReader(bufio.NewReaderSize(f, 4*1024*1024))
+ if err != nil {
+ f.Close()
+ return nil, err
+ }
+ return gzipr{rdr, f}, nil
+}
+
+// gzipr wraps a ReadCloser and a Closer, presenting a single Close()
+// method that closes both wrapped objects.
+type gzipr struct {
+ io.ReadCloser
+ io.Closer
+}
+
+func (gr gzipr) Close() error {
+ e1 := gr.ReadCloser.Close()
+ e2 := gr.Closer.Close()
+ if e1 != nil {
+ return e1
+ }
+ return e2
+}
+
var (
arvadosClientFromEnv = arvados.NewClientFromEnv()
siteFS arvados.CustomFileSystem
import (
"bufio"
+ "bytes"
"context"
"errors"
"flag"
"os"
"path"
"sort"
+ "strconv"
"strings"
"git.arvados.org/arvados.git/sdk/go/arvados"
annotationsFilename := flags.String("output-annotations", "", "output `file` for tile variant annotations csv")
librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create csv `file` mapping column# to tag# and variant#")
labelsFilename := flags.String("output-labels", "", "output `file` for genome labels csv")
+ regionsFilename := flags.String("regions", "", "only output columns/annotations that intersect regions in specified bed `file`")
onehot := flags.Bool("one-hot", false, "recode tile variants as one-hot")
chunks := flags.Int("chunks", 1, "split output into `N` numpy files")
cmd.filter.Flags(flags)
KeepCache: 1,
APIAccess: true,
}
- err = runner.TranslatePaths(inputFilename)
+ err = runner.TranslatePaths(inputFilename, regionsFilename)
if err != nil {
return 1
}
"-output-annotations", "/mnt/output/annotations.csv",
"-output-onehot2tilevar", "/mnt/output/onehot2tilevar.csv",
"-output-labels", "/mnt/output/labels.csv",
+ "-regions", *regionsFilename,
"-max-variants", fmt.Sprintf("%d", cmd.filter.MaxVariants),
"-min-coverage", fmt.Sprintf("%f", cmd.filter.MinCoverage),
"-max-tag", fmt.Sprintf("%d", cmd.filter.MaxTag),
log.Info("tidying")
tilelib.Tidy()
- if *annotationsFilename != "" {
- log.Infof("writing annotations")
- var annow io.WriteCloser
- annow, err = os.OpenFile(*annotationsFilename, os.O_CREATE|os.O_WRONLY, 0666)
- if err != nil {
- return 1
- }
- defer annow.Close()
- err = (&annotatecmd{maxTileSize: 5000}).exportTileDiffs(annow, tilelib)
- if err != nil {
- return 1
- }
- err = annow.Close()
- if err != nil {
- return 1
- }
- }
-
log.Info("building lowqual map")
lowqual := lowqual(tilelib)
names := cgnames(tilelib)
}
}
+ log.Info("determining which tiles intersect given regions")
+ dropTiles, err := chooseTiles(tilelib, *regionsFilename)
+ if err != nil {
+ return 1
+ }
+
+ if *annotationsFilename != "" {
+ log.Info("writing annotations")
+ var annow io.WriteCloser
+ annow, err = os.OpenFile(*annotationsFilename, os.O_CREATE|os.O_WRONLY, 0666)
+ if err != nil {
+ return 1
+ }
+ defer annow.Close()
+ err = (&annotatecmd{maxTileSize: 5000, dropTiles: dropTiles}).exportTileDiffs(annow, tilelib)
+ if err != nil {
+ return 1
+ }
+ err = annow.Close()
+ if err != nil {
+ return 1
+ }
+ }
+
chunksize := (len(tilelib.variant) + *chunks - 1) / *chunks
for chunk := 0; chunk < *chunks; chunk++ {
- log.Infof("preparing chunk %d of %d", chunk+1, *chunks+1)
+ log.Infof("preparing chunk %d of %d", chunk+1, *chunks)
tagstart := chunk * chunksize
tagend := tagstart + chunksize
if tagend > len(tilelib.variant) {
tagend = len(tilelib.variant)
}
- out, rows, cols := cgs2array(tilelib, names, lowqual, tagstart, tagend)
+ out, rows, cols := cgs2array(tilelib, names, lowqual, dropTiles, tagstart, tagend)
var npw *gonpy.NpyWriter
var output io.WriteCloser
return
}
-func cgs2array(tilelib *tileLibrary, names []string, lowqual []map[tileVariantID]bool, tagstart, tagend int) (data []int16, rows, cols int) {
+func cgs2array(tilelib *tileLibrary, names []string, lowqual []map[tileVariantID]bool, dropTiles []bool, tagstart, tagend int) (data []int16, rows, cols int) {
rows = len(tilelib.compactGenomes)
- cols = (tagend - tagstart) * 2
+ for tag := tagstart; tag < tagend; tag++ {
+ if len(dropTiles) <= tag || !dropTiles[tag] {
+ cols += 2
+ }
+ }
data = make([]int16, rows*cols)
for row, name := range names {
cg := tilelib.compactGenomes[name]
- if tagstart*2 >= len(cg) {
+ outidx := 0
+ for tag := tagstart; tag < tagend && tag*2+1 < len(cg); tag++ {
+ if len(dropTiles) > tag && dropTiles[tag] {
+ continue
+ }
+ for phase := 0; phase < 2; phase++ {
+ v := cg[tag*2+phase]
+ if v > 0 && lowqual[tag][v] {
+ data[row*cols+outidx] = -1
+ } else {
+ data[row*cols+outidx] = int16(v)
+ }
+ outidx++
+ }
+ }
+ }
+ return
+}
+
+func chooseTiles(tilelib *tileLibrary, regionsFilename string) (drop []bool, err error) {
+ if regionsFilename == "" {
+ return
+ }
+ rfile, err := zopen(regionsFilename)
+ if err != nil {
+ return
+ }
+ defer rfile.Close()
+ regions, err := ioutil.ReadAll(rfile)
+ if err != nil {
+ return
+ }
+
+ log.Print("chooseTiles: building mask")
+ mask := &mask{}
+ for _, line := range bytes.Split(regions, []byte{'\n'}) {
+ if bytes.HasPrefix(line, []byte{'#'}) {
continue
}
- cg = cg[tagstart*2:]
- if cols < len(cg) {
- cg = cg[:cols]
+ fields := bytes.Split(line, []byte{'\t'})
+ if len(fields) < 3 {
+ continue
}
- for i, v := range cg {
- if v > 0 && lowqual[tagstart+i/2][v] {
- data[row*cols+i] = -1
+ refseqname := string(fields[0])
+ if strings.HasPrefix(refseqname, "chr") {
+ refseqname = refseqname[3:]
+ }
+ start, err1 := strconv.Atoi(string(fields[1]))
+ end, err2 := strconv.Atoi(string(fields[2]))
+ if err1 == nil && err2 == nil {
+ // BED
+ } else {
+ start, err1 = strconv.Atoi(string(fields[3]))
+ end, err2 = strconv.Atoi(string(fields[4]))
+ if err1 == nil && err2 == nil {
+ // GFF/GTF
+ end++
} else {
- data[row*cols+i] = int16(v)
+ err = fmt.Errorf("cannot parse input line as BED or GFF/GTF: %q", line)
+ return
}
}
+ mask.Add(refseqname, start, end)
+ }
+ log.Print("chooseTiles: mask.Freeze")
+ mask.Freeze()
+
+ tagset := tilelib.taglib.Tags()
+ if len(tagset) == 0 {
+ err = errors.New("cannot choose tiles by region in a library without tags")
+ return
+ }
+ taglen := len(tagset[0])
+
+ log.Print("chooseTiles: check ref tiles")
+ // Find position+size of each reference tile, and if it
+ // intersects any of the desired regions, set drop[tag]=false.
+ //
+ // (Note it doesn't quite work to do the more obvious thing --
+ // start with drop=false and change to true when ref tiles
+ // intersect target regions -- because that would give us
+ // drop=false for tiles that don't appear at all in the
+ // reference.)
+ //
+ // TODO: (optionally?) don't drop tags for which some tile
+ // variants are spanning tiles, i.e., where the reference tile
+ // does not intersect the desired regions, but a spanning tile
+ // from a genome does.
+ drop = make([]bool, len(tilelib.variant))
+ for i := range drop {
+ drop[i] = true
}
+ for refname, refseqs := range tilelib.refseqs {
+ for refseqname, reftiles := range refseqs {
+ if strings.HasPrefix(refseqname, "chr") {
+ refseqname = refseqname[3:]
+ }
+ tileend := 0
+ for _, libref := range reftiles {
+ if libref.Variant < 1 {
+ err = fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refname, refseqname, libref.Tag)
+ return
+ }
+ seq := tilelib.TileVariantSequence(libref)
+ if len(seq) < taglen {
+ err = fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refname, refseqname, libref.Tag, libref.Variant, len(seq), taglen)
+ return
+ }
+ tilestart := tileend
+ tileend = tilestart + len(seq) - taglen
+ if mask.Check(refseqname, tilestart, tileend) {
+ drop[libref.Tag] = false
+ }
+ }
+ }
+ }
+
+ log.Print("chooseTiles: done")
return
}
func (s *exportSuite) TestFastaToNumpy(c *check.C) {
tmpdir := c.MkDir()
+ err := ioutil.WriteFile(tmpdir+"/chr1-12-100.bed", []byte("chr1\t12\t100\ttest.1\n"), 0644)
+ c.Check(err, check.IsNil)
+
var buffer bytes.Buffer
exited := (&importer{}).RunCommand("import", []string{"-local=true", "-tag-library", "testdata/tags", "-output-tiles", "-save-incomplete-tiles", "testdata/a.1.fasta", "testdata/tinyref.fasta"}, &bytes.Buffer{}, &buffer, os.Stderr)
c.Assert(exited, check.Equals, 0)
var output bytes.Buffer
- exited = (&exportNumpy{}).RunCommand("export-numpy", []string{"-local=true", "-output-annotations", tmpdir + "/annotations.csv"}, &buffer, &output, os.Stderr)
+ exited = (&exportNumpy{}).RunCommand("export-numpy", []string{"-local=true", "-output-annotations", tmpdir + "/annotations.csv", "-regions", tmpdir + "/chr1-12-100.bed"}, &buffer, &output, os.Stderr)
c.Check(exited, check.Equals, 0)
npy, err := gonpy.NewReader(&output)
c.Assert(err, check.IsNil)
variants, err := npy.GetInt16()
c.Assert(err, check.IsNil)
- for i := 0; i < 4; i += 2 {
+ c.Check(variants, check.HasLen, 6)
+ for i := 0; i < 4 && i < len(variants); i += 2 {
if variants[i] == 1 {
c.Check(variants[i+1], check.Equals, int16(2), check.Commentf("i=%d, v=%v", i, variants))
} else {
c.Check(variants[i], check.Equals, int16(2), check.Commentf("i=%d, v=%v", i, variants))
}
}
- for i := 4; i < 9; i += 2 {
+ for i := 4; i < 6 && i < len(variants); i += 2 {
c.Check(variants[i], check.Equals, int16(1), check.Commentf("i=%d, v=%v", i, variants))
}
annotations, err := ioutil.ReadFile(tmpdir + "/annotations.csv")
c.Check(err, check.IsNil)
- c.Check(string(annotations), check.Matches, `(?ms).*1,2,chr1:g.84_85insACTGCGATCTGA\n.*`)
- c.Check(string(annotations), check.Matches, `(?ms).*1,1,chr1:g.87_96delinsGCATCTGCA\n.*`)
+ c.Logf("%s", string(annotations))
+ c.Check(string(annotations), check.Matches, `(?ms)(.*\n)?1,1,2,chr1:g.84_85insACTGCGATCTGA\n.*`)
+ c.Check(string(annotations), check.Matches, `(?ms)(.*\n)?1,1,1,chr1:g.87_96delinsGCATCTGCA\n.*`)
}
func sortUints(variants []int16) {
--- /dev/null
+package lightning
+
+import (
+ "sort"
+)
+
+type interval struct {
+ start int
+ end int
+}
+
+type intervalTreeNode struct {
+ interval interval
+ maxend int
+}
+
+type intervalTree []intervalTreeNode
+
+type mask struct {
+ intervals map[string][]interval
+ itrees map[string]intervalTree
+ frozen bool
+}
+
+func (m *mask) Add(seqname string, start, end int) {
+ if m.intervals == nil {
+ m.intervals = map[string][]interval{}
+ }
+ m.intervals[seqname] = append(m.intervals[seqname], interval{start, end})
+}
+
+func (m *mask) Freeze() {
+ m.itrees = map[string]intervalTree{}
+ for seqname, intervals := range m.intervals {
+ m.itrees[seqname] = m.freeze(intervals)
+ }
+ m.frozen = true
+}
+
+func (m *mask) Check(seqname string, start, end int) bool {
+ if !m.frozen {
+ panic("bug: (*mask)Check() called before Freeze()")
+ }
+ return m.itrees[seqname].check(0, interval{start, end})
+}
+
+func (m *mask) freeze(in []interval) intervalTree {
+ if len(in) == 0 {
+ return nil
+ }
+ sort.Slice(in, func(i, j int) bool {
+ return in[i].start < in[j].start
+ })
+ itreesize := 1
+ for itreesize < len(in) {
+ itreesize = itreesize * 2
+ }
+ itree := make(intervalTree, itreesize)
+ itree.importSlice(0, in)
+ for i := len(in); i < itreesize; i++ {
+ itree[i].maxend = -1
+ }
+ return itree
+}
+
+func (itree intervalTree) check(root int, q interval) bool {
+ return root < len(itree) &&
+ itree[root].maxend >= q.start &&
+ ((itree[root].interval.start <= q.end && itree[root].interval.end >= q.start) ||
+ itree.check(root*2+1, q) ||
+ itree.check(root*2+2, q))
+}
+
+func (itree intervalTree) importSlice(root int, in []interval) int {
+ mid := len(in) / 2
+ node := intervalTreeNode{interval: in[mid], maxend: in[mid].end}
+ if mid > 0 {
+ end := itree.importSlice(root*2+1, in[0:mid])
+ if end > node.maxend {
+ node.maxend = end
+ }
+ }
+ if mid+1 < len(in) {
+ end := itree.importSlice(root*2+2, in[mid+1:])
+ if end > node.maxend {
+ node.maxend = end
+ }
+ }
+ itree[root] = node
+ return node.maxend
+}
--- /dev/null
+package lightning
+
+import (
+ "math/rand"
+ "testing"
+
+ "gopkg.in/check.v1"
+)
+
+type maskSuite struct{}
+
+var _ = check.Suite(&maskSuite{})
+
+func (s *maskSuite) TestMask(c *check.C) {
+ m := mask{}
+ for i := 0; i < 1000000; i++ {
+ start := rand.Int() % 100000
+ end := rand.Int()%100000 + start
+ if start <= 9000 && end >= 8000 ||
+ start <= 8 && end >= 4 ||
+ start <= 1 {
+ continue
+ }
+ m.Add("chr1", start, end)
+ }
+ m.Add("chr1", 1200, 3400)
+ m.Add("chr1", 5600, 7800)
+ m.Add("chr1", 5300, 7900)
+ m.Add("chr1", 9900, 9900)
+ m.Add("chr1", 1, 1)
+ m.Add("chr1", 0, 0)
+ m.Add("chr1", 2, 2)
+ m.Add("chr1", 9, 9)
+ m.Freeze()
+ c.Check(m.Check("chr1", 1, 1), check.Equals, true)
+ c.Check(m.Check("chr1", 4, 8), check.Equals, false)
+ c.Check(m.Check("chr1", 7800, 8000), check.Equals, true)
+ c.Check(m.Check("chr1", 8000, 9000), check.Equals, false)
+ c.Check(m.Check("chr1999", 1, 1), check.Equals, false)
+}
+
+func BenchmarkMask1000(b *testing.B) {
+ benchmarkMask(b, 1000)
+}
+
+func BenchmarkMask10000(b *testing.B) {
+ benchmarkMask(b, 10000)
+}
+
+func BenchmarkMask100000(b *testing.B) {
+ benchmarkMask(b, 100000)
+}
+
+func BenchmarkMask1000000(b *testing.B) {
+ benchmarkMask(b, 1000000)
+}
+
+func benchmarkMask(b *testing.B, size int) {
+ m := mask{}
+ for i := 0; i < size; i++ {
+ start := rand.Int() % 10000000
+ end := rand.Int()%300 + start
+ m.Add("chrB", start, end)
+ }
+ m.Freeze()
+ for n := 0; n < b.N; n++ {
+ start := rand.Int() % 10000000
+ end := rand.Int()%300 + start
+ m.Check("chrB", start, end)
+ }
+}
tilelib.Tidy()
log.Print("converting cgs to array")
- data, rows, cols := cgs2array(tilelib, cgnames(tilelib), lowqual(tilelib), 0, len(tilelib.variant))
+ data, rows, cols := cgs2array(tilelib, cgnames(tilelib), lowqual(tilelib), nil, 0, len(tilelib.variant))
if *onehot {
log.Printf("recode one-hot: %d rows, %d cols", rows, cols)
data, _, cols = recodeOnehot(data, cols)
c.Check(annotateout.Len() > 0, check.Equals, true)
sorted := sortLines(annotateout.String())
c.Logf("%s", sorted)
- c.Check(sorted, check.Equals, sortLines(`0,8d4fe9a63921b,chr1:g.161A>T
-0,8d4fe9a63921b,chr1:g.178A>T
-0,8d4fe9a63921b,chr1:g.1_3delinsGGC
-0,8d4fe9a63921b,chr1:g.222_224del
-0,ba4263ca4199c,chr1:g.1_3delinsGGC
-0,ba4263ca4199c,chr1:g.222_224del
-0,ba4263ca4199c,chr1:g.41_42delinsAA
-1,139890345dbb8,chr1:g.302_305delinsAAAA
-4,cbfca15d241d3,chr2:g.125_127delinsAAA
-4,cbfca15d241d3,chr2:g.1_3delinsAAA
-4,f5fafe9450b02,chr2:g.241_245delinsAAAAA
-4,f5fafe9450b02,chr2:g.291C>A
-4,fe9a71a42adb4,chr2:g.125_127delinsAAA
-6,e36dce85efbef,chr2:g.471_472delinsAA
-6,f81388b184f4a,chr2:g.470_472del
+ c.Check(sorted, check.Equals, sortLines(`0,0,8d4fe9a63921b,chr1:g.161A>T
+0,0,8d4fe9a63921b,chr1:g.178A>T
+0,0,8d4fe9a63921b,chr1:g.1_3delinsGGC
+0,0,8d4fe9a63921b,chr1:g.222_224del
+0,0,ba4263ca4199c,chr1:g.1_3delinsGGC
+0,0,ba4263ca4199c,chr1:g.222_224del
+0,0,ba4263ca4199c,chr1:g.41_42delinsAA
+1,1,139890345dbb8,chr1:g.302_305delinsAAAA
+4,4,cbfca15d241d3,chr2:g.125_127delinsAAA
+4,4,cbfca15d241d3,chr2:g.1_3delinsAAA
+4,4,f5fafe9450b02,chr2:g.241_245delinsAAAAA
+4,4,f5fafe9450b02,chr2:g.291C>A
+4,4,fe9a71a42adb4,chr2:g.125_127delinsAAA
+6,6,e36dce85efbef,chr2:g.471_472delinsAA
+6,6,f81388b184f4a,chr2:g.470_472del
`))
}