From 30299451e290f999306373ff6a7ddfe58f6cfd28 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Thu, 23 Sep 2021 15:52:42 -0400 Subject: [PATCH] =?utf8?q?Accept=20multiple=20input=20libraries=20for=20sl?= =?utf8?q?ice=E2=86=92slicenumpy.?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit refs #17966 Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- slice.go | 62 +++++++++++++++++++++++++++++++++--------- slice_test.go | 40 +++++++++++++++++++--------- slicenumpy.go | 74 ++++++++++++++++++++++++++++++--------------------- 3 files changed, 119 insertions(+), 57 deletions(-) diff --git a/slice.go b/slice.go index 10e5e01bc2..be5bb87904 100644 --- a/slice.go +++ b/slice.go @@ -7,12 +7,14 @@ package lightning import ( "bufio" "encoding/gob" + "errors" "flag" "fmt" "io" "net/http" _ "net/http/pprof" "os" + "path/filepath" "runtime" "strings" "sync" @@ -38,7 +40,6 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)") projectUUID := flags.String("project", "", "project `UUID` for output data") priority := flags.Int("priority", 500, "container request priority") - inputDir := flags.String("input-dir", "./in", "input `directory`") outputDir := flags.String("output-dir", "./out", "output `directory`") tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)") err = flags.Parse(args) @@ -48,6 +49,11 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std } else if err != nil { return 2 } + inputDirs := flags.Args() + if len(inputDirs) == 0 { + err = errors.New("no input dirs specified") + return 2 + } if *pprof != "" { go func() { @@ -66,15 +72,16 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std KeepCache: 50, APIAccess: true, } - err = runner.TranslatePaths(inputDir) - if err != nil { - return 1 + for i := range inputDirs { + err = runner.TranslatePaths(&inputDirs[i]) + if err != nil { + return 1 + } } - runner.Args = []string{"slice", "-local=true", + runner.Args = append([]string{"slice", "-local=true", "-pprof", ":6060", - "-input-dir", *inputDir, "-output-dir", "/mnt/output", - } + }, inputDirs...) var output string output, err = runner.Run() if err != nil { @@ -84,7 +91,7 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std return 0 } - err = Slice(*outputDir, *inputDir, *tagsPerFile) + err = Slice(*tagsPerFile, *outputDir, inputDirs) if err != nil { return 1 } @@ -93,11 +100,25 @@ func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, std // Read tags+tiles+genomes from srcdir, write to dstdir with (up to) // the specified number of tags per file. -func Slice(dstdir, srcdir string, tagsPerFile int) error { - infiles, err := allGobFiles(srcdir) - if err != nil { - return err +func Slice(tagsPerFile int, dstdir string, srcdirs []string) error { + var infiles []string + for _, srcdir := range srcdirs { + files, err := allGobFiles(srcdir) + if err != nil { + return err + } + infiles = append(infiles, files...) } + // dirNamespace[dir] is an int in [0,len(dirNamespace)), used below to + // namespace variant numbers from different dirs. + dirNamespace := map[string]int{} + for _, path := range infiles { + dir, _ := filepath.Split(path) + if _, ok := dirNamespace[dir]; !ok { + dirNamespace[dir] = len(dirNamespace) + } + } + namespaces := len(dirNamespace) var ( tagset [][]byte @@ -124,7 +145,9 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error { return } defer f.Close() - log.Printf("reading %s", path) + dir, _ := filepath.Split(path) + namespace := dirNamespace[dir] + log.Printf("reading %s (namespace %d)", path, namespace) err = DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error { if err := throttle.Err(); err != nil { return err @@ -152,6 +175,7 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error { } atomic.AddInt64(&countTileVariants, int64(len(ent.TileVariants))) for _, tv := range ent.TileVariants { + tv.Variant = tileVariantID(int(tv.Variant)*namespaces + namespace) err := encs[int(tv.Tag)/tagsPerFile].Encode(LibraryEntry{ TileVariants: []TileVariant{tv}, }) @@ -166,6 +190,11 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error { // Easier for downstream code. atomic.AddInt64(&countGenomes, int64(len(ent.CompactGenomes))) for _, cg := range ent.CompactGenomes { + for i, v := range cg.Variants { + if v > 0 { + cg.Variants[i] = tileVariantID(int(v)*namespaces + namespace) + } + } for i, enc := range encs { start := i * tagsPerFile end := start + tagsPerFile @@ -194,6 +223,13 @@ func Slice(dstdir, srcdir string, tagsPerFile int) error { // slice. Easier for downstream code. atomic.AddInt64(&countReferences, int64(len(ent.CompactSequences))) if len(ent.CompactSequences) > 0 { + for _, cs := range ent.CompactSequences { + for _, tseq := range cs.TileSequences { + for i, libref := range tseq { + tseq[i].Variant = tileVariantID(int(libref.Variant)*namespaces + namespace) + } + } + } err := encs[0].Encode(LibraryEntry{CompactSequences: ent.CompactSequences}) if err != nil { return err diff --git a/slice_test.go b/slice_test.go index c73d3d7dc0..532b7bb2e5 100644 --- a/slice_test.go +++ b/slice_test.go @@ -19,8 +19,20 @@ var _ = check.Suite(&sliceSuite{}) func (s *sliceSuite) TestImportAndSlice(c *check.C) { tmpdir := c.MkDir() + err := os.Mkdir(tmpdir+"/lib1", 0777) + c.Assert(err, check.IsNil) + err = os.Mkdir(tmpdir+"/lib2", 0777) + c.Assert(err, check.IsNil) + err = os.Mkdir(tmpdir+"/lib3", 0777) + c.Assert(err, check.IsNil) + cwd, err := os.Getwd() + c.Assert(err, check.IsNil) + err = os.Symlink(cwd+"/testdata/pipeline1", tmpdir+"/pipeline1") + c.Assert(err, check.IsNil) + err = os.Symlink(cwd+"/testdata/pipeline1", tmpdir+"/pipeline1dup") + c.Assert(err, check.IsNil) - err := ioutil.WriteFile(tmpdir+"/chr1-12-100.bed", []byte("chr1\t12\t100\ttest.1\n"), 0644) + err = ioutil.WriteFile(tmpdir+"/chr1-12-100.bed", []byte("chr1\t12\t100\ttest.1\n"), 0644) c.Check(err, check.IsNil) c.Log("=== import testdata/ref ===") @@ -29,7 +41,7 @@ func (s *sliceSuite) TestImportAndSlice(c *check.C) { "-tag-library", "testdata/tags", "-output-tiles", "-save-incomplete-tiles", - "-o", tmpdir + "/library1.gob", + "-o", tmpdir + "/lib1/library1.gob", "testdata/ref.fasta", }, nil, os.Stderr, os.Stderr) c.Assert(exited, check.Equals, 0) @@ -39,29 +51,31 @@ func (s *sliceSuite) TestImportAndSlice(c *check.C) { "-local=true", "-tag-library", "testdata/tags", "-output-tiles", - "-o", tmpdir + "/library2.gob", - "testdata/pipeline1", + "-o", tmpdir + "/lib2/library2.gob", + tmpdir + "/pipeline1", }, nil, os.Stderr, os.Stderr) c.Assert(exited, check.Equals, 0) - c.Log("=== merge ===") - exited = (&merger{}).RunCommand("merge", []string{ + c.Log("=== import pipeline1dup ===") + exited = (&importer{}).RunCommand("import", []string{ "-local=true", - "-o", tmpdir + "/library.gob", - tmpdir + "/library1.gob", - tmpdir + "/library2.gob", + "-tag-library", "testdata/tags", + "-output-tiles", + "-o", tmpdir + "/lib3/library3.gob", + tmpdir + "/pipeline1dup", }, nil, os.Stderr, os.Stderr) c.Assert(exited, check.Equals, 0) - input := tmpdir + "/library.gob" slicedir := c.MkDir() c.Log("=== slice ===") exited = (&slicecmd{}).RunCommand("slice", []string{ "-local=true", - "-input-dir=" + input, "-output-dir=" + slicedir, "-tags-per-file=2", + tmpdir + "/lib1", + tmpdir + "/lib2", + tmpdir + "/lib3", }, nil, os.Stderr, os.Stderr) c.Check(exited, check.Equals, 0) out, _ := exec.Command("find", slicedir, "-ls").CombinedOutput() @@ -83,9 +97,9 @@ func (s *sliceSuite) TestImportAndSlice(c *check.C) { defer f.Close() npy, err := gonpy.NewReader(f) c.Assert(err, check.IsNil) - c.Check(npy.Shape, check.DeepEquals, []int{2, 4}) + c.Check(npy.Shape, check.DeepEquals, []int{4, 4}) variants, err := npy.GetInt16() - c.Check(variants, check.DeepEquals, []int16{3, 2, 1, 2, -1, -1, 1, 1}) + c.Check(variants, check.DeepEquals, []int16{3, 2, 1, 2, -1, -1, 1, 1, 3, 2, 1, 2, -1, -1, 1, 1}) annotations, err := ioutil.ReadFile(npydir + "/matrix.0000.annotations.csv") c.Assert(err, check.IsNil) diff --git a/slicenumpy.go b/slicenumpy.go index 2fc2fd6149..a8d217fefa 100644 --- a/slicenumpy.go +++ b/slicenumpy.go @@ -16,7 +16,6 @@ import ( "runtime" "sort" "strings" - "sync" "sync/atomic" "git.arvados.org/arvados.git/sdk/go/arvados" @@ -24,6 +23,7 @@ import ( "github.com/kshedden/gonpy" "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" + "golang.org/x/crypto/blake2b" ) type sliceNumpy struct { @@ -177,10 +177,10 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s } } log.Info("loading reference tiles from all slices") - throttle := throttle{Max: runtime.GOMAXPROCS(0)} + throttle1 := throttle{Max: runtime.GOMAXPROCS(0)} for _, infile := range infiles { infile := infile - throttle.Go(func() error { + throttle1.Go(func() error { defer log.Infof("%s: done", infile) f, err := open(infile) if err != nil { @@ -197,12 +197,12 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s }) }) } - throttle.Wait() + throttle1.Wait() log.Info("reconstructing reference sequences") for seqname, cseq := range refseq { seqname, cseq := seqname, cseq - throttle.Go(func() error { + throttle1.Go(func() error { defer log.Printf("... %s done", seqname) pos := 0 for _, libref := range cseq { @@ -213,7 +213,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s return nil }) } - throttle.Wait() + throttle1.Wait() log.Info("TODO: determining which tiles intersect given regions") @@ -221,7 +221,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s var done int64 for infileIdx, infile := range infiles { infileIdx, infile := infileIdx, infile - throttle.Go(func() error { + throttle1.Go(func() error { seq := make(map[tagID][]TileVariant, 50000) cgs := make(map[string]CompactGenome, len(cgnames)) f, err := open(infile) @@ -233,6 +233,9 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error { for _, tv := range ent.TileVariants { variants := seq[tv.Tag] + if len(variants) == 0 { + variants = make([]TileVariant, 100) + } for len(variants) <= int(tv.Variant) { variants = append(variants, TileVariant{}) } @@ -252,46 +255,55 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s // TODO: filters - log.Infof("renumbering variants for tags %d-%d", tagstart, tagend) + log.Infof("renumber/dedup variants for tags %d-%d", tagstart, tagend) variantRemap := make([][]tileVariantID, tagend-tagstart) - var wg sync.WaitGroup + throttle2 := throttle{Max: runtime.GOMAXPROCS(0)} for tag, variants := range seq { tag, variants := tag, variants - wg.Add(1) + throttle2.Acquire() go func() { - defer wg.Done() - count := make([]tileVariantID, len(variants)) + defer throttle2.Release() + count := make(map[[blake2b.Size256]byte]int, len(variants)) for _, cg := range cgs { idx := (tag - tagstart) * 2 if int(idx) < len(cg.Variants) { - count[cg.Variants[idx]]++ - count[cg.Variants[idx+1]]++ + count[variants[cg.Variants[idx]].Blake2b]++ + count[variants[cg.Variants[idx+1]].Blake2b]++ } } - ranked := make([]tileVariantID, len(variants)) - for i := range ranked { - ranked[i] = tileVariantID(i) + // hash[i] will be the hash of + // the variant(s) that should + // be at rank i (0-based). + hash := make([][blake2b.Size256]byte, 0, len(count)) + for b := range count { + hash = append(hash, b) } - sort.Slice(ranked, func(i, j int) bool { - if i == 0 { - return true // leave ranked[0] at position 0, unused - } else if j == 0 { - return false // ditto - } - if cri, crj := count[ranked[i]], count[ranked[j]]; cri != crj { - return cri > crj + sort.Slice(hash, func(i, j int) bool { + bi, bj := &hash[i], &hash[j] + if ci, cj := count[*bi], count[*bj]; ci != cj { + return ci > cj } else { - return bytes.Compare(variants[ranked[i]].Blake2b[:], variants[ranked[j]].Blake2b[:]) < 0 + return bytes.Compare((*bi)[:], (*bj)[:]) < 0 } }) - remap := make([]tileVariantID, len(ranked)) - for i, r := range ranked { - remap[r] = tileVariantID(i) + // rank[b] will be the 1-based + // new variant number for + // variants whose hash is b. + rank := make(map[[blake2b.Size256]byte]tileVariantID, len(hash)) + for i, h := range hash { + rank[h] = tileVariantID(i + 1) + } + // remap[v] will be the new + // variant number for original + // variant number v. + remap := make([]tileVariantID, len(variants)) + for i, tv := range variants { + remap[i] = rank[tv.Blake2b] } variantRemap[tag-tagstart] = remap }() } - wg.Wait() + throttle2.Wait() annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx) log.Infof("writing %s", annotationsFilename) @@ -380,7 +392,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s return nil }) } - if err = throttle.Wait(); err != nil { + if err = throttle1.Wait(); err != nil { return 1 } return 0 -- 2.30.2