Write hgvs-based numpy matrix.
authorTom Clegg <tom@tomclegg.ca>
Mon, 14 Jun 2021 03:09:54 +0000 (23:09 -0400)
committerTom Clegg <tom@tomclegg.ca>
Mon, 14 Jun 2021 03:09:54 +0000 (23:09 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

annotate.go
arvados.go
export_test.go
exportnumpy.go
exportnumpy_test.go
pipeline_test.go

index b792119b20f84fd281639642a00d4d053a935d77..b1f20df98b72d83f9bda88888fa20791a46b4ea0 100644 (file)
@@ -23,10 +23,11 @@ import (
 )
 
 type annotatecmd struct {
-       dropTiles   []bool
-       variantHash bool
-       maxTileSize int
-       tag2tagid   map[string]tagID
+       dropTiles        []bool
+       variantHash      bool
+       maxTileSize      int
+       tag2tagid        map[string]tagID
+       reportAnnotation func(tag tagID, outcol int, variant tileVariantID, refname string, seqname string, pdi hgvs.Variant)
 }
 
 func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
@@ -242,13 +243,10 @@ func (cmd *annotatecmd) annotateSequence(throttle *throttle, outch chan<- string
                outcol := outcol
                refstart, ok := tilestart[tag]
                if !ok {
-                       // Tag didn't place on this
-                       // reference sequence. (It
-                       // might place on the same
-                       // chromosome in a genome
-                       // anyway, but we don't output
-                       // the annotations that would
-                       // result.)
+                       // Tag didn't place on this reference
+                       // sequence. (It might place on the same
+                       // chromosome in a genome anyway, but we don't
+                       // output the annotations that would result.)
                        // outch <- fmt.Sprintf("%d,%d,-1%s\n", tag, outcol, refnamefield)
                        continue
                }
@@ -299,6 +297,9 @@ func (cmd *annotatecmd) annotateSequence(throttle *throttle, outch chan<- string
                                                varid = fmt.Sprintf("%d", variant)
                                        }
                                        outch <- fmt.Sprintf("%d,%d,%s%s,%s:g.%s\n", tag, outcol, varid, refnamefield, seqname, diff.String())
+                                       if cmd.reportAnnotation != nil {
+                                               cmd.reportAnnotation(tag, outcol, variant, refname, seqname, diff)
+                                       }
                                }
                        }()
                }
index 811d512e65339bbfcade4f93121256d809b07be5..6d76aded264124ff33d6bee7c24ef39346325145 100644 (file)
@@ -11,7 +11,6 @@ import (
        "net/url"
        "os"
        "regexp"
-       "runtime"
        "strings"
        "sync"
        "time"
@@ -282,6 +281,7 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e
        subscribedUUID := ""
        defer func() {
                if subscribedUUID != "" {
+                       log.Printf("unsubscribe container UUID: %s", subscribedUUID)
                        client.Unsubscribe(logch, subscribedUUID)
                }
        }()
@@ -305,8 +305,10 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e
                        fmt.Fprint(os.Stderr, neednewline)
                        neednewline = ""
                        if subscribedUUID != "" {
+                               log.Printf("unsubscribe container UUID: %s", subscribedUUID)
                                client.Unsubscribe(logch, subscribedUUID)
                        }
+                       log.Printf("subscribe container UUID: %s", cr.ContainerUUID)
                        client.Subscribe(logch, cr.ContainerUUID)
                        subscribedUUID = cr.ContainerUUID
                }
@@ -504,6 +506,7 @@ func (gr gzipr) Close() error {
 
 var (
        arvadosClientFromEnv = arvados.NewClientFromEnv()
+       keepClient           *keepclient.KeepClient
        siteFS               arvados.CustomFileSystem
        siteFSMtx            sync.Mutex
 )
@@ -531,13 +534,13 @@ func open(fnm string) (io.ReadCloser, error) {
                        return nil, err
                }
                ac.Client = arvados.DefaultSecureClient
-               kc := keepclient.New(ac)
+               keepClient = keepclient.New(ac)
                // Don't use keepclient's default short timeouts.
-               kc.HTTPClient = arvados.DefaultSecureClient
-               // Guess max concurrent readers, hope to avoid cache
-               // thrashing.
-               kc.BlockCache = &keepclient.BlockCache{MaxBlocks: runtime.NumCPU() * 3}
-               siteFS = arvadosClientFromEnv.SiteFileSystem(kc)
+               keepClient.HTTPClient = arvados.DefaultSecureClient
+               keepClient.BlockCache = &keepclient.BlockCache{MaxBlocks: 4}
+               siteFS = arvadosClientFromEnv.SiteFileSystem(keepClient)
+       } else {
+               keepClient.BlockCache.MaxBlocks++
        }
 
        log.Infof("reading %q from %s using Arvados client", fnm[len(mnt):], uuid)
index 2a020b9eba272198a95733be3b31392b02015a24..7261f99ea3fb843e187826a82848fb3186d2b8fd 100644 (file)
@@ -31,7 +31,7 @@ func (s *exportSuite) TestFastaToHGVS(c *check.C) {
                "-ref=testdata/ref.fasta",
        }, &buffer, &output, os.Stderr)
        c.Check(exited, check.Equals, 0)
-       c.Check(output.String(), check.Equals, `chr1.1_3delinsGGC       1       0
+       c.Check(sortLines(output.String()), check.Equals, sortLines(`chr1.1_3delinsGGC  1       0
 chr1.41_42delinsAA     1       0
 chr1.161A>T    1       0
 chr1.178A>T    1       0
@@ -44,7 +44,7 @@ chr2.258_269delinsAA  1       0
 chr2.315C>A    1       0
 chr2.470_472del        1       0
 chr2.471_472delinsAA   1       0
-`)
+`))
        labels, err := ioutil.ReadFile(tmpdir + "/labels.csv")
        c.Check(err, check.IsNil)
        c.Check(string(labels), check.Equals, `0,"input1","-"
index fb4df0801db7982cd63a2503f0957dc91df18d68..8e2eeed230ab64285b287b180e7535e8609b0acc 100644 (file)
@@ -12,12 +12,14 @@ import (
        "net/http"
        _ "net/http/pprof"
        "os"
-       "path"
        "sort"
        "strconv"
        "strings"
+       "sync"
+       "sync/atomic"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/arvados/lightning/hgvs"
        "github.com/kshedden/gonpy"
        "github.com/sirupsen/logrus"
        log "github.com/sirupsen/logrus"
@@ -41,7 +43,7 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
        projectUUID := flags.String("project", "", "project `UUID` for output data")
        priority := flags.Int("priority", 500, "container request priority")
        inputFilename := flags.String("i", "-", "input `file`")
-       outputFilename := flags.String("o", "-", "output `file`")
+       outputDir := flags.String("output-dir", "/tmp", "output `directory`")
        annotationsFilename := flags.String("output-annotations", "", "output `file` for tile variant annotations csv")
        librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create csv `file` mapping column# to tag# and variant#")
        labelsFilename := flags.String("output-labels", "", "output `file` for genome labels csv")
@@ -65,10 +67,6 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
        }
 
        if !*runlocal {
-               if *outputFilename != "-" {
-                       err = errors.New("cannot specify output file in container mode: not implemented")
-                       return 1
-               }
                runner := arvadosContainerRunner{
                        Name:        "lightning export-numpy",
                        Client:      arvados.NewClientFromEnv(),
@@ -84,9 +82,10 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
                        return 1
                }
                runner.Args = []string{"export-numpy", "-local=true",
+                       "-pprof", ":6000",
                        fmt.Sprintf("-one-hot=%v", *onehot),
                        "-i", *inputFilename,
-                       "-o", "/mnt/output/matrix.npy",
+                       "-output-dir", "/mnt/output",
                        "-output-annotations", "/mnt/output/annotations.csv",
                        "-output-onehot2tilevar", "/mnt/output/onehot2tilevar.csv",
                        "-output-labels", "/mnt/output/labels.csv",
@@ -148,7 +147,7 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
                        return 1
                }
                defer f.Close()
-               _, outBasename := path.Split(*outputFilename)
+               outBasename := "matrix.npy"
                for i, name := range names {
                        _, err = fmt.Fprintf(f, "%d,%q,%q\n", i, trimFilenameForLabel(name), outBasename)
                        if err != nil {
@@ -169,6 +168,7 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
                return 1
        }
 
+       annotation2tvs := map[string]map[hgvs.Variant][]tileLibRef{}
        if *annotationsFilename != "" {
                log.Info("writing annotations")
                var annow io.WriteCloser
@@ -177,7 +177,19 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
                        return 1
                }
                defer annow.Close()
-               err = (&annotatecmd{maxTileSize: 5000, dropTiles: dropTiles}).exportTileDiffs(annow, tilelib)
+               var mtx sync.Mutex
+               err = (&annotatecmd{
+                       maxTileSize: 5000,
+                       dropTiles:   dropTiles,
+                       reportAnnotation: func(tag tagID, _ int, variant tileVariantID, refname string, seqname string, pdi hgvs.Variant) {
+                               mtx.Lock()
+                               defer mtx.Unlock()
+                               if annotation2tvs[seqname] == nil {
+                                       annotation2tvs[seqname] = map[hgvs.Variant][]tileLibRef{}
+                               }
+                               annotation2tvs[seqname][pdi] = append(annotation2tvs[seqname][pdi], tileLibRef{Tag: tag, Variant: variant})
+                       },
+               }).exportTileDiffs(annow, tilelib)
                if err != nil {
                        return 1
                }
@@ -187,6 +199,92 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
                }
        }
 
+       var lastErr atomic.Value
+       var wg sync.WaitGroup
+       for seqname, pdivars := range annotation2tvs {
+               seqname, pdivars := seqname, pdivars
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       log.Infof("choosing hgvs columns for seq %s", seqname)
+                       var pdis []hgvs.Variant
+                       for pdi, librefs := range pdivars {
+                               // Include this HGVS column if it was
+                               // seen in a variant of any
+                               // non-dropped tile.
+                               for _, libref := range librefs {
+                                       if int(libref.Tag) >= len(dropTiles) || !dropTiles[libref.Tag] {
+                                               pdis = append(pdis, pdi)
+                                               break
+                                       }
+                               }
+                       }
+                       sort.Slice(pdis, func(i, j int) bool {
+                               if cmp := pdis[i].Position - pdis[j].Position; cmp != 0 {
+                                       return cmp < 0
+                               } else if pdis[i].Ref != pdis[j].Ref {
+                                       return pdis[i].Ref < pdis[j].Ref
+                               } else {
+                                       return pdis[i].New < pdis[j].New
+                               }
+                       })
+                       log.Infof("writing column labels for seq %s", seqname)
+                       var buf bytes.Buffer
+                       for _, pdi := range pdis {
+                               fmt.Fprintf(&buf, "%s:g.%s\n", seqname, pdi.String())
+                       }
+                       err := ioutil.WriteFile(*outputDir+"/"+seqname+".columns.csv", buf.Bytes(), 0777)
+                       if err != nil {
+                               lastErr.Store(err)
+                               return
+                       }
+
+                       log.Infof("building hgvs matrix for seq %s", seqname)
+                       data := make([]int8, len(names)*len(pdis)*2)
+                       for row, name := range names {
+                               cg := tilelib.compactGenomes[name]
+                               rowstart := row * len(pdis) * 2
+                               for col, pdi := range pdis {
+                                       for _, libref := range pdivars[pdi] {
+                                               if len(cg) <= int(libref.Tag)*2+1 {
+                                                       continue
+                                               }
+                                               for phase := 0; phase < 2; phase++ {
+                                                       if cg[int(libref.Tag)*2+phase] == libref.Variant {
+                                                               data[rowstart+col*2+phase] = 1
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+                       log.Infof("writing hgvs numpy for seq %s", seqname)
+                       f, err := os.OpenFile(*outputDir+"/"+seqname+".npy", os.O_CREATE|os.O_WRONLY, 0777)
+                       if err != nil {
+                               lastErr.Store(err)
+                               return
+                       }
+                       defer f.Close()
+                       npw, err := gonpy.NewWriter(f)
+                       if err != nil {
+                               lastErr.Store(err)
+                               return
+                       }
+                       npw.Shape = []int{len(names), len(pdis) * 2}
+                       npw.WriteInt8(data)
+                       // gonpy closes f and ignores errors, doh.
+                       // err = f.Close()
+                       // if err != nil {
+                       //      lastErr.Store(err)
+                       //      return
+                       // }
+               }()
+       }
+       wg.Wait()
+       if e, ok := lastErr.Load().(error); ok {
+               err = e
+               return 1
+       }
+
        chunksize := (len(tilelib.variant) + *chunks - 1) / *chunks
        for chunk := 0; chunk < *chunks; chunk++ {
                log.Infof("preparing chunk %d of %d", chunk+1, *chunks)
@@ -199,23 +297,15 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
 
                var npw *gonpy.NpyWriter
                var output io.WriteCloser
-               fnm := *outputFilename
-               if fnm == "-" {
-                       output = nopCloser{stdout}
-               } else {
-                       if *chunks > 1 {
-                               if strings.HasSuffix(fnm, ".npy") {
-                                       fnm = fmt.Sprintf("%s.%d.npy", fnm[:len(fnm)-4], chunk)
-                               } else {
-                                       fnm = fmt.Sprintf("%s.%d", fnm, chunk)
-                               }
-                       }
-                       output, err = os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0777)
-                       if err != nil {
-                               return 1
-                       }
-                       defer output.Close()
+               fnm := *outputDir + "/matrix.npy"
+               if *chunks > 1 {
+                       fnm = fmt.Sprintf("%s/matrix.%d.npy", *outputDir, chunk)
+               }
+               output, err = os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0777)
+               if err != nil {
+                       return 1
                }
+               defer output.Close()
                bufw := bufio.NewWriter(output)
                npw, err = gonpy.NewWriter(nopCloser{bufw})
                if err != nil {
index 5011df01b44fd72b19125f4dbc6186bcb77e8291..44b0c6609d6cd334f6a0aad94cddb8b95c1bb3a1 100644 (file)
@@ -22,10 +22,12 @@ func (s *exportNumpySuite) TestFastaToNumpy(c *check.C) {
        var buffer bytes.Buffer
        exited := (&importer{}).RunCommand("import", []string{"-local=true", "-tag-library", "testdata/tags", "-output-tiles", "-save-incomplete-tiles", "testdata/a.1.fasta", "testdata/tinyref.fasta"}, &bytes.Buffer{}, &buffer, os.Stderr)
        c.Assert(exited, check.Equals, 0)
-       var output bytes.Buffer
-       exited = (&exportNumpy{}).RunCommand("export-numpy", []string{"-local=true", "-output-annotations", tmpdir + "/annotations.csv", "-regions", tmpdir + "/chr1-12-100.bed"}, &buffer, &output, os.Stderr)
+       exited = (&exportNumpy{}).RunCommand("export-numpy", []string{"-local=true", "-output-dir", tmpdir, "-output-annotations", tmpdir + "/annotations.csv", "-regions", tmpdir + "/chr1-12-100.bed"}, &buffer, os.Stderr, os.Stderr)
        c.Check(exited, check.Equals, 0)
-       npy, err := gonpy.NewReader(&output)
+       f, err := os.Open(tmpdir + "/matrix.npy")
+       c.Assert(err, check.IsNil)
+       defer f.Close()
+       npy, err := gonpy.NewReader(f)
        c.Assert(err, check.IsNil)
        variants, err := npy.GetInt16()
        c.Assert(err, check.IsNil)
index 6735cd802221f4998c91723973e2cfce097b6611..67fff89fb6717405858e37858cf6720a74313fcf 100644 (file)
@@ -90,7 +90,7 @@ func (s *pipelineSuite) TestImportMerge(c *check.C) {
        c.Check(code, check.Equals, 0)
        c.Check(hgvsout.Len() > 0, check.Equals, true)
        c.Logf("%s", hgvsout.String())
-       c.Check(hgvsout.String(), check.Equals, `chr1:g.1_3delinsGGC    .
+       c.Check(sortLines(hgvsout.String()), check.Equals, sortLines(`chr1:g.1_3delinsGGC       .
 chr1:g.[41_42delinsAA];[41=]   .
 chr1:g.[161=];[161A>T] .
 chr1:g.[178=];[178A>T] .
@@ -103,14 +103,14 @@ chr2:g.[258_269delinsAA];[258=]   .
 chr2:g.[315C>A];[315=] .
 chr2:g.[470_472del];[470=]     .
 chr2:g.[471=];[471_472delinsAA]        .
-`)
+`))
 
        vcfout := &bytes.Buffer{}
        code = (&exporter{}).RunCommand("lightning export", []string{"-local", "-ref", "testdata/ref.fasta", "-output-format", "vcf", "-i", tmpdir + "/merged.gob", "-output-bed", tmpdir + "/export.bed"}, bytes.NewReader(nil), vcfout, os.Stderr)
        c.Check(code, check.Equals, 0)
        c.Check(vcfout.Len() > 0, check.Equals, true)
        c.Logf("%s", vcfout.String())
-       c.Check(vcfout.String(), check.Equals, `chr1    1       NNN     GGC     1/1     0/0
+       c.Check(sortLines(vcfout.String()), check.Equals, sortLines(`chr1       1       NNN     GGC     1/1     0/0
 chr1   41      TT      AA      1/0     0/0
 chr1   161     A       T       0/1     0/0
 chr1   178     A       T       0/1     0/0
@@ -123,11 +123,11 @@ chr2      258     CCTTGTATTTTT    AA      1/0     0/0
 chr2   315     C       A       1/0     0/0
 chr2   469     GTGG    G       1/0     0/0
 chr2   471     GG      AA      0/1     0/0
-`)
+`))
        bedout, err := ioutil.ReadFile(tmpdir + "/export.bed")
        c.Check(err, check.IsNil)
        c.Logf("%s", string(bedout))
-       c.Check(string(bedout), check.Equals, `chr1 0 248 0 1000 . 0 224
+       c.Check(sortLines(string(bedout)), check.Equals, sortLines(`chr1 0 248 0 1000 . 0 224
 chr1 224 372 1 1000 . 248 348
 chr1 348 496 2 1000 . 372 472
 chr1 472 572 3 1000 . 496 572
@@ -135,7 +135,7 @@ chr2 0 248 4 1000 . 0 224
 chr2 224 372 5 750 . 248 348
 chr2 348 496 6 1000 . 372 472
 chr2 472 572 7 1000 . 496 572
-`)
+`))
 
        annotateout := &bytes.Buffer{}
        code = (&annotatecmd{}).RunCommand("lightning annotate", []string{"-local", "-variant-hash=true", "-i", tmpdir + "/merged.gob"}, bytes.NewReader(nil), annotateout, os.Stderr)