From: Tom Clegg Date: Mon, 13 Sep 2021 15:23:23 +0000 (-0400) Subject: Generate annotations for slices. X-Git-Url: https://git.arvados.org/lightning.git/commitdiff_plain/a25b70629a19a866bff4ebcff08503abaa7ba2bc Generate annotations for slices. refs #17996 Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/slicenumpy.go b/slicenumpy.go index 601b35a960..55b3c0a140 100644 --- a/slicenumpy.go +++ b/slicenumpy.go @@ -6,6 +6,7 @@ package lightning import ( "bufio" + "bytes" "flag" "fmt" "io" @@ -17,6 +18,7 @@ import ( "strings" "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/arvados/lightning/hgvs" "github.com/kshedden/gonpy" "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" @@ -41,6 +43,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s priority := flags.Int("priority", 500, "container request priority") inputDir := flags.String("input-dir", "./in", "input `directory`") outputDir := flags.String("output-dir", "./out", "output `directory`") + ref := flags.String("ref", "", "reference name (if blank, choose last one that appears in input)") regionsFilename := flags.String("regions", "", "only output columns/annotations that intersect regions in specified bed `file`") expandRegions := flags.Int("expand-regions", 0, "expand specified regions by `N` base pairs on each side`") cmd.filter.Flags(flags) @@ -63,7 +66,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s Name: "lightning slice-numpy", Client: arvados.NewClientFromEnv(), ProjectUUID: *projectUUID, - RAM: 150000000000, + RAM: 250000000000, VCPUs: 32, Priority: *priority, KeepCache: 1, @@ -101,14 +104,20 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s sort.Strings(infiles) var cgnames []string - refseqs := map[string]map[string][]tileLibRef{} + var refseq map[string][]tileLibRef in0, err := open(infiles[0]) if err != nil { return 1 } + taglen := -1 DecodeLibrary(in0, strings.HasSuffix(infiles[0], ".gz"), func(ent *LibraryEntry) error { + if len(ent.TagSet) > 0 { + taglen = len(ent.TagSet[0]) + } for _, cseq := range ent.CompactSequences { - refseqs[cseq.Name] = cseq.TileSequences + if cseq.Name == *ref || *ref == "" { + refseq = cseq.TileSequences + } } for _, cg := range ent.CompactGenomes { cgnames = append(cgnames, cg.Name) @@ -119,6 +128,14 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s return 1 } in0.Close() + if refseq == nil { + err = fmt.Errorf("%s: reference sequence not found", infiles[0]) + return 1 + } + if taglen < 0 { + err = fmt.Errorf("tagset not found") + return 1 + } sort.Strings(cgnames) { @@ -145,12 +162,15 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s } log.Info("building list of reference tiles to load") // TODO: more efficient if we had saved all ref tiles in slice0 - reftiledata := map[tileLibRef]*[]byte{} - for _, ref := range refseqs { - for _, cseq := range ref { - for _, libref := range cseq { - reftiledata[libref] = new([]byte) - } + type reftileinfo struct { + seqname string // chr1 + pos int // distance from start of chr1 to start of tile + tiledata []byte // acgtggcaa... + } + reftile := map[tagID]*reftileinfo{} + for seqname, cseq := range refseq { + for _, libref := range cseq { + reftile[libref.Tag] = &reftileinfo{seqname: seqname} } } log.Info("loading reference tiles from all slices") @@ -166,9 +186,8 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s defer f.Close() return DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error { for _, tv := range ent.TileVariants { - libref := tileLibRef{tv.Tag, tv.Variant} - if dst, ok := reftiledata[libref]; ok { - *dst = tv.Sequence + if dst, ok := reftile[tv.Tag]; ok { + dst.tiledata = tv.Sequence } } return nil @@ -177,6 +196,22 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s } throttle.Wait() + log.Info("reconstructing reference sequences") + for seqname, cseq := range refseq { + seqname, cseq := seqname, cseq + throttle.Go(func() error { + defer log.Printf("... %s done", seqname) + pos := 0 + for _, libref := range cseq { + rt := reftile[libref.Tag] + rt.pos = pos + pos += len(rt.tiledata) - taglen + } + return nil + }) + } + throttle.Wait() + log.Info("TODO: determining which tiles intersect given regions") log.Info("generating annotations and numpy matrix for each slice") @@ -184,7 +219,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s infileIdx, infile := infileIdx, infile throttle.Go(func() error { defer log.Infof("%s: done", infile) - seq := map[tileLibRef][]byte{} + seq := map[tagID][][]byte{} cgs := make(map[string]CompactGenome, len(cgnames)) f, err := open(infile) if err != nil { @@ -193,7 +228,12 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s defer f.Close() err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error { for _, tv := range ent.TileVariants { - seq[tileLibRef{tv.Tag, tv.Variant}] = tv.Sequence + variants := seq[tv.Tag] + for len(variants) <= int(tv.Variant) { + variants = append(variants, nil) + } + variants[int(tv.Variant)] = tv.Sequence + seq[tv.Tag] = variants } for _, cg := range ent.CompactGenomes { cgs[cg.Name] = cg @@ -203,6 +243,8 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s if err != nil { return err } + tagstart := cgs[cgnames[0]].StartTag + tagend := cgs[cgnames[0]].EndTag log.Infof("TODO: %s: filtering", infile) log.Infof("TODO: %s: tidying", infile) @@ -210,30 +252,53 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx) log.Infof("%s: writing annotations to %s", infile, annotationsFilename) - annow, err := os.Create(annotationsFilename) + annof, err := os.Create(annotationsFilename) if err != nil { return err } - // for libref, seq := range seq { - // // TODO: diff from ref. - // } - err = annow.Close() + annow := bufio.NewWriterSize(annof, 1<<20) + for tag, variants := range seq { + rt, ok := reftile[tag] + if !ok { + // Reference does not use any + // variant of this tile. + // TODO: log this? mention it + // in annotations? + continue + } + outcol := tag - tagID(tagstart) + reftilestr := strings.ToUpper(string(rt.tiledata)) + for v, seq := range variants { + if len(seq) == 0 || !bytes.HasSuffix(rt.tiledata, seq[len(seq)-taglen:]) { + continue + } + if lendiff := len(rt.tiledata) - len(seq); lendiff < -1000 || lendiff > 1000 { + continue + } + diffs, _ := hgvs.Diff(reftilestr, strings.ToUpper(string(seq)), 0) + for _, diff := range diffs { + diff.Position += rt.pos + fmt.Fprintf(annow, "%d,%d,%d,%s:g.%s\n", tag, outcol, v, rt.seqname, diff.String()) + } + } + } + err = annow.Flush() + if err != nil { + return err + } + err = annof.Close() if err != nil { return err } log.Infof("%s: preparing numpy", infile) - tagstart := cgs[cgnames[0]].StartTag - tagend := cgs[cgnames[0]].EndTag rows := len(cgnames) cols := 2 * int(tagend-tagstart) out := make([]int16, rows*cols) for row, name := range cgnames { out := out[row*cols:] for col, v := range cgs[name].Variants { - if v == 0 { - // out[col] = 0 - } else if _, ok := seq[tileLibRef{tagstart + tagID(col/2), v}]; ok { + if variants, ok := seq[tagstart+tagID(col/2)]; ok && len(variants) > int(v) && len(variants[v]) > 0 { out[col] = int16(v) } else { out[col] = -1