X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/2ce4195e773941a3f625e5e2d5f0a36aa082ecf2..66c140245907441b976f21cccc337d361d92b306:/exportnumpy.go diff --git a/exportnumpy.go b/exportnumpy.go index fb4df0801d..39f228a1b0 100644 --- a/exportnumpy.go +++ b/exportnumpy.go @@ -1,3 +1,7 @@ +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package lightning import ( @@ -12,12 +16,14 @@ import ( "net/http" _ "net/http/pprof" "os" - "path" "sort" "strconv" "strings" + "sync" + "sync/atomic" "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/arvados/lightning/hgvs" "github.com/kshedden/gonpy" "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" @@ -40,8 +46,8 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)") projectUUID := flags.String("project", "", "project `UUID` for output data") priority := flags.Int("priority", 500, "container request priority") - inputFilename := flags.String("i", "-", "input `file`") - outputFilename := flags.String("o", "-", "output `file`") + inputDir := flags.String("input-dir", "./in", "input `directory`") + outputDir := flags.String("output-dir", "./out", "output `directory`") annotationsFilename := flags.String("output-annotations", "", "output `file` for tile variant annotations csv") librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create csv `file` mapping column# to tag# and variant#") labelsFilename := flags.String("output-labels", "", "output `file` for genome labels csv") @@ -65,38 +71,33 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, } if !*runlocal { - if *outputFilename != "-" { - err = errors.New("cannot specify output file in container mode: not implemented") - return 1 - } runner := arvadosContainerRunner{ Name: "lightning export-numpy", Client: arvados.NewClientFromEnv(), ProjectUUID: *projectUUID, - RAM: 750000000000, - VCPUs: 32, + RAM: 500000000000, + VCPUs: 96, Priority: *priority, KeepCache: 1, APIAccess: true, } - err = runner.TranslatePaths(inputFilename, regionsFilename) + err = runner.TranslatePaths(inputDir, regionsFilename) if err != nil { return 1 } runner.Args = []string{"export-numpy", "-local=true", + "-pprof", ":6060", fmt.Sprintf("-one-hot=%v", *onehot), - "-i", *inputFilename, - "-o", "/mnt/output/matrix.npy", + "-input-dir", *inputDir, + "-output-dir", "/mnt/output", "-output-annotations", "/mnt/output/annotations.csv", "-output-onehot2tilevar", "/mnt/output/onehot2tilevar.csv", "-output-labels", "/mnt/output/labels.csv", "-regions", *regionsFilename, "-expand-regions", fmt.Sprintf("%d", *expandRegions), - "-max-variants", fmt.Sprintf("%d", cmd.filter.MaxVariants), - "-min-coverage", fmt.Sprintf("%f", cmd.filter.MinCoverage), - "-max-tag", fmt.Sprintf("%d", cmd.filter.MaxTag), "-chunks", fmt.Sprintf("%d", *chunks), } + runner.Args = append(runner.Args, cmd.filter.Args()...) var output string output, err = runner.Run() if err != nil { @@ -106,27 +107,12 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, return 0 } - var input io.ReadCloser - if *inputFilename == "-" { - input = ioutil.NopCloser(stdin) - } else { - input, err = open(*inputFilename) - if err != nil { - return 1 - } - defer input.Close() - } - input = ioutil.NopCloser(bufio.NewReaderSize(input, 8*1024*1024)) tilelib := &tileLibrary{ retainNoCalls: true, retainTileSequences: true, compactGenomes: map[string][]tileVariantID{}, } - err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil) - if err != nil { - return 1 - } - err = input.Close() + err = tilelib.LoadDir(context.Background(), *inputDir) if err != nil { return 1 } @@ -148,7 +134,7 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, return 1 } defer f.Close() - _, outBasename := path.Split(*outputFilename) + outBasename := "matrix.npy" for i, name := range names { _, err = fmt.Fprintf(f, "%d,%q,%q\n", i, trimFilenameForLabel(name), outBasename) if err != nil { @@ -169,6 +155,7 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, return 1 } + annotation2tvs := map[string]map[hgvs.Variant][]tileLibRef{} if *annotationsFilename != "" { log.Info("writing annotations") var annow io.WriteCloser @@ -177,7 +164,19 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, return 1 } defer annow.Close() - err = (&annotatecmd{maxTileSize: 5000, dropTiles: dropTiles}).exportTileDiffs(annow, tilelib) + var mtx sync.Mutex + err = (&annotatecmd{ + maxTileSize: 5000, + dropTiles: dropTiles, + reportAnnotation: func(tag tagID, _ int, variant tileVariantID, refname string, seqname string, pdi hgvs.Variant) { + mtx.Lock() + defer mtx.Unlock() + if annotation2tvs[seqname] == nil { + annotation2tvs[seqname] = map[hgvs.Variant][]tileLibRef{} + } + annotation2tvs[seqname][pdi] = append(annotation2tvs[seqname][pdi], tileLibRef{Tag: tag, Variant: variant}) + }, + }).exportTileDiffs(annow, tilelib) if err != nil { return 1 } @@ -187,6 +186,96 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, } } + var lastErr atomic.Value + var wg sync.WaitGroup + for seqname, pdivars := range annotation2tvs { + seqname, pdivars := seqname, pdivars + wg.Add(1) + go func() { + defer wg.Done() + log.Infof("choosing hgvs columns for seq %s", seqname) + var pdis []hgvs.Variant + for pdi, librefs := range pdivars { + // Include this HGVS column if it was + // seen in a variant of any + // non-dropped tile. + for _, libref := range librefs { + if int(libref.Tag) >= len(dropTiles) || !dropTiles[libref.Tag] { + pdis = append(pdis, pdi) + break + } + } + } + sort.Slice(pdis, func(i, j int) bool { + if cmp := pdis[i].Position - pdis[j].Position; cmp != 0 { + return cmp < 0 + } else if pdis[i].Ref != pdis[j].Ref { + return pdis[i].Ref < pdis[j].Ref + } else { + return pdis[i].New < pdis[j].New + } + }) + log.Infof("writing column labels for seq %s", seqname) + var buf bytes.Buffer + for _, pdi := range pdis { + fmt.Fprintf(&buf, "%s:g.%s\n", seqname, pdi.String()) + } + err := ioutil.WriteFile(*outputDir+"/"+seqname+".columns.csv", buf.Bytes(), 0777) + if err != nil { + lastErr.Store(err) + return + } + + log.Infof("building hgvs matrix for seq %s", seqname) + data := make([]int8, len(names)*len(pdis)*2) + for row, name := range names { + cg := tilelib.compactGenomes[name] + rowstart := row * len(pdis) * 2 + for col, pdi := range pdis { + for _, libref := range pdivars[pdi] { + if len(cg) <= int(libref.Tag)*2+1 { + continue + } + for phase := 0; phase < 2; phase++ { + if cg[int(libref.Tag)*2+phase] == libref.Variant { + data[rowstart+col*2+phase] = 1 + } + } + } + } + } + log.Infof("writing hgvs numpy for seq %s", seqname) + f, err := os.OpenFile(*outputDir+"/"+seqname+".npy", os.O_CREATE|os.O_WRONLY, 0777) + if err != nil { + lastErr.Store(err) + return + } + defer f.Close() + // gonpy closes our writer and ignores errors. Give it a nopCloser so we can close f properly. + npw, err := gonpy.NewWriter(nopCloser{f}) + if err != nil { + lastErr.Store(err) + return + } + npw.Shape = []int{len(names), len(pdis) * 2} + err = npw.WriteInt8(data) + if err != nil { + lastErr.Store(err) + return + } + err = f.Close() + if err != nil { + lastErr.Store(err) + return + } + }() + } + wg.Wait() + if e, ok := lastErr.Load().(error); ok { + err = e + return 1 + } + chunksize := (len(tilelib.variant) + *chunks - 1) / *chunks for chunk := 0; chunk < *chunks; chunk++ { log.Infof("preparing chunk %d of %d", chunk+1, *chunks) @@ -199,23 +288,15 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, var npw *gonpy.NpyWriter var output io.WriteCloser - fnm := *outputFilename - if fnm == "-" { - output = nopCloser{stdout} - } else { - if *chunks > 1 { - if strings.HasSuffix(fnm, ".npy") { - fnm = fmt.Sprintf("%s.%d.npy", fnm[:len(fnm)-4], chunk) - } else { - fnm = fmt.Sprintf("%s.%d", fnm, chunk) - } - } - output, err = os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0777) - if err != nil { - return 1 - } - defer output.Close() + fnm := *outputDir + "/matrix.npy" + if *chunks > 1 { + fnm = fmt.Sprintf("%s/matrix.%d.npy", *outputDir, chunk) + } + output, err = os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0777) + if err != nil { + return 1 } + defer output.Close() bufw := bufio.NewWriter(output) npw, err = gonpy.NewWriter(nopCloser{bufw}) if err != nil { @@ -282,7 +363,7 @@ func lowqual(tilelib *tileLibrary) (lowqual []map[tileVariantID]bool) { for tag, variants := range tilelib.variant { lq := lowqual[tag] for varidx, hash := range variants { - if len(tilelib.seq[hash]) == 0 { + if len(tilelib.hashSequence(hash)) == 0 { if lq == nil { lq = map[tileVariantID]bool{} lowqual[tag] = lq