From 896220cbd21811433f6db068f559677927e56757 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Sun, 13 Jun 2021 23:09:54 -0400 Subject: [PATCH] Write hgvs-based numpy matrix. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- annotate.go | 23 ++++---- arvados.go | 17 +++--- export_test.go | 4 +- exportnumpy.go | 140 ++++++++++++++++++++++++++++++++++++-------- exportnumpy_test.go | 8 ++- pipeline_test.go | 12 ++-- 6 files changed, 150 insertions(+), 54 deletions(-) diff --git a/annotate.go b/annotate.go index b792119b20..b1f20df98b 100644 --- a/annotate.go +++ b/annotate.go @@ -23,10 +23,11 @@ import ( ) type annotatecmd struct { - dropTiles []bool - variantHash bool - maxTileSize int - tag2tagid map[string]tagID + dropTiles []bool + variantHash bool + maxTileSize int + tag2tagid map[string]tagID + reportAnnotation func(tag tagID, outcol int, variant tileVariantID, refname string, seqname string, pdi hgvs.Variant) } func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { @@ -242,13 +243,10 @@ func (cmd *annotatecmd) annotateSequence(throttle *throttle, outch chan<- string outcol := outcol refstart, ok := tilestart[tag] if !ok { - // Tag didn't place on this - // reference sequence. (It - // might place on the same - // chromosome in a genome - // anyway, but we don't output - // the annotations that would - // result.) + // Tag didn't place on this reference + // sequence. (It might place on the same + // chromosome in a genome anyway, but we don't + // output the annotations that would result.) // outch <- fmt.Sprintf("%d,%d,-1%s\n", tag, outcol, refnamefield) continue } @@ -299,6 +297,9 @@ func (cmd *annotatecmd) annotateSequence(throttle *throttle, outch chan<- string varid = fmt.Sprintf("%d", variant) } outch <- fmt.Sprintf("%d,%d,%s%s,%s:g.%s\n", tag, outcol, varid, refnamefield, seqname, diff.String()) + if cmd.reportAnnotation != nil { + cmd.reportAnnotation(tag, outcol, variant, refname, seqname, diff) + } } }() } diff --git a/arvados.go b/arvados.go index 811d512e65..6d76aded26 100644 --- a/arvados.go +++ b/arvados.go @@ -11,7 +11,6 @@ import ( "net/url" "os" "regexp" - "runtime" "strings" "sync" "time" @@ -282,6 +281,7 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e subscribedUUID := "" defer func() { if subscribedUUID != "" { + log.Printf("unsubscribe container UUID: %s", subscribedUUID) client.Unsubscribe(logch, subscribedUUID) } }() @@ -305,8 +305,10 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e fmt.Fprint(os.Stderr, neednewline) neednewline = "" if subscribedUUID != "" { + log.Printf("unsubscribe container UUID: %s", subscribedUUID) client.Unsubscribe(logch, subscribedUUID) } + log.Printf("subscribe container UUID: %s", cr.ContainerUUID) client.Subscribe(logch, cr.ContainerUUID) subscribedUUID = cr.ContainerUUID } @@ -504,6 +506,7 @@ func (gr gzipr) Close() error { var ( arvadosClientFromEnv = arvados.NewClientFromEnv() + keepClient *keepclient.KeepClient siteFS arvados.CustomFileSystem siteFSMtx sync.Mutex ) @@ -531,13 +534,13 @@ func open(fnm string) (io.ReadCloser, error) { return nil, err } ac.Client = arvados.DefaultSecureClient - kc := keepclient.New(ac) + keepClient = keepclient.New(ac) // Don't use keepclient's default short timeouts. - kc.HTTPClient = arvados.DefaultSecureClient - // Guess max concurrent readers, hope to avoid cache - // thrashing. - kc.BlockCache = &keepclient.BlockCache{MaxBlocks: runtime.NumCPU() * 3} - siteFS = arvadosClientFromEnv.SiteFileSystem(kc) + keepClient.HTTPClient = arvados.DefaultSecureClient + keepClient.BlockCache = &keepclient.BlockCache{MaxBlocks: 4} + siteFS = arvadosClientFromEnv.SiteFileSystem(keepClient) + } else { + keepClient.BlockCache.MaxBlocks++ } log.Infof("reading %q from %s using Arvados client", fnm[len(mnt):], uuid) diff --git a/export_test.go b/export_test.go index 2a020b9eba..7261f99ea3 100644 --- a/export_test.go +++ b/export_test.go @@ -31,7 +31,7 @@ func (s *exportSuite) TestFastaToHGVS(c *check.C) { "-ref=testdata/ref.fasta", }, &buffer, &output, os.Stderr) c.Check(exited, check.Equals, 0) - c.Check(output.String(), check.Equals, `chr1.1_3delinsGGC 1 0 + c.Check(sortLines(output.String()), check.Equals, sortLines(`chr1.1_3delinsGGC 1 0 chr1.41_42delinsAA 1 0 chr1.161A>T 1 0 chr1.178A>T 1 0 @@ -44,7 +44,7 @@ chr2.258_269delinsAA 1 0 chr2.315C>A 1 0 chr2.470_472del 1 0 chr2.471_472delinsAA 1 0 -`) +`)) labels, err := ioutil.ReadFile(tmpdir + "/labels.csv") c.Check(err, check.IsNil) c.Check(string(labels), check.Equals, `0,"input1","-" diff --git a/exportnumpy.go b/exportnumpy.go index fb4df0801d..8e2eeed230 100644 --- a/exportnumpy.go +++ b/exportnumpy.go @@ -12,12 +12,14 @@ import ( "net/http" _ "net/http/pprof" "os" - "path" "sort" "strconv" "strings" + "sync" + "sync/atomic" "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/arvados/lightning/hgvs" "github.com/kshedden/gonpy" "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" @@ -41,7 +43,7 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, projectUUID := flags.String("project", "", "project `UUID` for output data") priority := flags.Int("priority", 500, "container request priority") inputFilename := flags.String("i", "-", "input `file`") - outputFilename := flags.String("o", "-", "output `file`") + outputDir := flags.String("output-dir", "/tmp", "output `directory`") annotationsFilename := flags.String("output-annotations", "", "output `file` for tile variant annotations csv") librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create csv `file` mapping column# to tag# and variant#") labelsFilename := flags.String("output-labels", "", "output `file` for genome labels csv") @@ -65,10 +67,6 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, } if !*runlocal { - if *outputFilename != "-" { - err = errors.New("cannot specify output file in container mode: not implemented") - return 1 - } runner := arvadosContainerRunner{ Name: "lightning export-numpy", Client: arvados.NewClientFromEnv(), @@ -84,9 +82,10 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, return 1 } runner.Args = []string{"export-numpy", "-local=true", + "-pprof", ":6000", fmt.Sprintf("-one-hot=%v", *onehot), "-i", *inputFilename, - "-o", "/mnt/output/matrix.npy", + "-output-dir", "/mnt/output", "-output-annotations", "/mnt/output/annotations.csv", "-output-onehot2tilevar", "/mnt/output/onehot2tilevar.csv", "-output-labels", "/mnt/output/labels.csv", @@ -148,7 +147,7 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, return 1 } defer f.Close() - _, outBasename := path.Split(*outputFilename) + outBasename := "matrix.npy" for i, name := range names { _, err = fmt.Fprintf(f, "%d,%q,%q\n", i, trimFilenameForLabel(name), outBasename) if err != nil { @@ -169,6 +168,7 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, return 1 } + annotation2tvs := map[string]map[hgvs.Variant][]tileLibRef{} if *annotationsFilename != "" { log.Info("writing annotations") var annow io.WriteCloser @@ -177,7 +177,19 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, return 1 } defer annow.Close() - err = (&annotatecmd{maxTileSize: 5000, dropTiles: dropTiles}).exportTileDiffs(annow, tilelib) + var mtx sync.Mutex + err = (&annotatecmd{ + maxTileSize: 5000, + dropTiles: dropTiles, + reportAnnotation: func(tag tagID, _ int, variant tileVariantID, refname string, seqname string, pdi hgvs.Variant) { + mtx.Lock() + defer mtx.Unlock() + if annotation2tvs[seqname] == nil { + annotation2tvs[seqname] = map[hgvs.Variant][]tileLibRef{} + } + annotation2tvs[seqname][pdi] = append(annotation2tvs[seqname][pdi], tileLibRef{Tag: tag, Variant: variant}) + }, + }).exportTileDiffs(annow, tilelib) if err != nil { return 1 } @@ -187,6 +199,92 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, } } + var lastErr atomic.Value + var wg sync.WaitGroup + for seqname, pdivars := range annotation2tvs { + seqname, pdivars := seqname, pdivars + wg.Add(1) + go func() { + defer wg.Done() + log.Infof("choosing hgvs columns for seq %s", seqname) + var pdis []hgvs.Variant + for pdi, librefs := range pdivars { + // Include this HGVS column if it was + // seen in a variant of any + // non-dropped tile. + for _, libref := range librefs { + if int(libref.Tag) >= len(dropTiles) || !dropTiles[libref.Tag] { + pdis = append(pdis, pdi) + break + } + } + } + sort.Slice(pdis, func(i, j int) bool { + if cmp := pdis[i].Position - pdis[j].Position; cmp != 0 { + return cmp < 0 + } else if pdis[i].Ref != pdis[j].Ref { + return pdis[i].Ref < pdis[j].Ref + } else { + return pdis[i].New < pdis[j].New + } + }) + log.Infof("writing column labels for seq %s", seqname) + var buf bytes.Buffer + for _, pdi := range pdis { + fmt.Fprintf(&buf, "%s:g.%s\n", seqname, pdi.String()) + } + err := ioutil.WriteFile(*outputDir+"/"+seqname+".columns.csv", buf.Bytes(), 0777) + if err != nil { + lastErr.Store(err) + return + } + + log.Infof("building hgvs matrix for seq %s", seqname) + data := make([]int8, len(names)*len(pdis)*2) + for row, name := range names { + cg := tilelib.compactGenomes[name] + rowstart := row * len(pdis) * 2 + for col, pdi := range pdis { + for _, libref := range pdivars[pdi] { + if len(cg) <= int(libref.Tag)*2+1 { + continue + } + for phase := 0; phase < 2; phase++ { + if cg[int(libref.Tag)*2+phase] == libref.Variant { + data[rowstart+col*2+phase] = 1 + } + } + } + } + } + log.Infof("writing hgvs numpy for seq %s", seqname) + f, err := os.OpenFile(*outputDir+"/"+seqname+".npy", os.O_CREATE|os.O_WRONLY, 0777) + if err != nil { + lastErr.Store(err) + return + } + defer f.Close() + npw, err := gonpy.NewWriter(f) + if err != nil { + lastErr.Store(err) + return + } + npw.Shape = []int{len(names), len(pdis) * 2} + npw.WriteInt8(data) + // gonpy closes f and ignores errors, doh. + // err = f.Close() + // if err != nil { + // lastErr.Store(err) + // return + // } + }() + } + wg.Wait() + if e, ok := lastErr.Load().(error); ok { + err = e + return 1 + } + chunksize := (len(tilelib.variant) + *chunks - 1) / *chunks for chunk := 0; chunk < *chunks; chunk++ { log.Infof("preparing chunk %d of %d", chunk+1, *chunks) @@ -199,23 +297,15 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, var npw *gonpy.NpyWriter var output io.WriteCloser - fnm := *outputFilename - if fnm == "-" { - output = nopCloser{stdout} - } else { - if *chunks > 1 { - if strings.HasSuffix(fnm, ".npy") { - fnm = fmt.Sprintf("%s.%d.npy", fnm[:len(fnm)-4], chunk) - } else { - fnm = fmt.Sprintf("%s.%d", fnm, chunk) - } - } - output, err = os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0777) - if err != nil { - return 1 - } - defer output.Close() + fnm := *outputDir + "/matrix.npy" + if *chunks > 1 { + fnm = fmt.Sprintf("%s/matrix.%d.npy", *outputDir, chunk) + } + output, err = os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0777) + if err != nil { + return 1 } + defer output.Close() bufw := bufio.NewWriter(output) npw, err = gonpy.NewWriter(nopCloser{bufw}) if err != nil { diff --git a/exportnumpy_test.go b/exportnumpy_test.go index 5011df01b4..44b0c6609d 100644 --- a/exportnumpy_test.go +++ b/exportnumpy_test.go @@ -22,10 +22,12 @@ func (s *exportNumpySuite) TestFastaToNumpy(c *check.C) { var buffer bytes.Buffer exited := (&importer{}).RunCommand("import", []string{"-local=true", "-tag-library", "testdata/tags", "-output-tiles", "-save-incomplete-tiles", "testdata/a.1.fasta", "testdata/tinyref.fasta"}, &bytes.Buffer{}, &buffer, os.Stderr) c.Assert(exited, check.Equals, 0) - var output bytes.Buffer - exited = (&exportNumpy{}).RunCommand("export-numpy", []string{"-local=true", "-output-annotations", tmpdir + "/annotations.csv", "-regions", tmpdir + "/chr1-12-100.bed"}, &buffer, &output, os.Stderr) + exited = (&exportNumpy{}).RunCommand("export-numpy", []string{"-local=true", "-output-dir", tmpdir, "-output-annotations", tmpdir + "/annotations.csv", "-regions", tmpdir + "/chr1-12-100.bed"}, &buffer, os.Stderr, os.Stderr) c.Check(exited, check.Equals, 0) - npy, err := gonpy.NewReader(&output) + f, err := os.Open(tmpdir + "/matrix.npy") + c.Assert(err, check.IsNil) + defer f.Close() + npy, err := gonpy.NewReader(f) c.Assert(err, check.IsNil) variants, err := npy.GetInt16() c.Assert(err, check.IsNil) diff --git a/pipeline_test.go b/pipeline_test.go index 6735cd8022..67fff89fb6 100644 --- a/pipeline_test.go +++ b/pipeline_test.go @@ -90,7 +90,7 @@ func (s *pipelineSuite) TestImportMerge(c *check.C) { c.Check(code, check.Equals, 0) c.Check(hgvsout.Len() > 0, check.Equals, true) c.Logf("%s", hgvsout.String()) - c.Check(hgvsout.String(), check.Equals, `chr1:g.1_3delinsGGC . + c.Check(sortLines(hgvsout.String()), check.Equals, sortLines(`chr1:g.1_3delinsGGC . chr1:g.[41_42delinsAA];[41=] . chr1:g.[161=];[161A>T] . chr1:g.[178=];[178A>T] . @@ -103,14 +103,14 @@ chr2:g.[258_269delinsAA];[258=] . chr2:g.[315C>A];[315=] . chr2:g.[470_472del];[470=] . chr2:g.[471=];[471_472delinsAA] . -`) +`)) vcfout := &bytes.Buffer{} code = (&exporter{}).RunCommand("lightning export", []string{"-local", "-ref", "testdata/ref.fasta", "-output-format", "vcf", "-i", tmpdir + "/merged.gob", "-output-bed", tmpdir + "/export.bed"}, bytes.NewReader(nil), vcfout, os.Stderr) c.Check(code, check.Equals, 0) c.Check(vcfout.Len() > 0, check.Equals, true) c.Logf("%s", vcfout.String()) - c.Check(vcfout.String(), check.Equals, `chr1 1 NNN GGC 1/1 0/0 + c.Check(sortLines(vcfout.String()), check.Equals, sortLines(`chr1 1 NNN GGC 1/1 0/0 chr1 41 TT AA 1/0 0/0 chr1 161 A T 0/1 0/0 chr1 178 A T 0/1 0/0 @@ -123,11 +123,11 @@ chr2 258 CCTTGTATTTTT AA 1/0 0/0 chr2 315 C A 1/0 0/0 chr2 469 GTGG G 1/0 0/0 chr2 471 GG AA 0/1 0/0 -`) +`)) bedout, err := ioutil.ReadFile(tmpdir + "/export.bed") c.Check(err, check.IsNil) c.Logf("%s", string(bedout)) - c.Check(string(bedout), check.Equals, `chr1 0 248 0 1000 . 0 224 + c.Check(sortLines(string(bedout)), check.Equals, sortLines(`chr1 0 248 0 1000 . 0 224 chr1 224 372 1 1000 . 248 348 chr1 348 496 2 1000 . 372 472 chr1 472 572 3 1000 . 496 572 @@ -135,7 +135,7 @@ chr2 0 248 4 1000 . 0 224 chr2 224 372 5 750 . 248 348 chr2 348 496 6 1000 . 372 472 chr2 472 572 7 1000 . 496 572 -`) +`)) annotateout := &bytes.Buffer{} code = (&annotatecmd{}).RunCommand("lightning annotate", []string{"-local", "-variant-hash=true", "-i", tmpdir + "/merged.gob"}, bytes.NewReader(nil), annotateout, os.Stderr) -- 2.30.2