import (
"bufio"
"encoding/gob"
+ "errors"
"flag"
"fmt"
"io"
"net/http"
_ "net/http/pprof"
"os"
+ "path/filepath"
"runtime"
"strings"
"sync"
runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
projectUUID := flags.String("project", "", "project `UUID` for output data")
priority := flags.Int("priority", 500, "container request priority")
- inputDir := flags.String("input-dir", "./in", "input `directory`")
outputDir := flags.String("output-dir", "./out", "output `directory`")
tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)")
err = flags.Parse(args)
} else if err != nil {
return 2
}
+ inputDirs := flags.Args()
+ if len(inputDirs) == 0 {
+ err = errors.New("no input dirs specified")
+ return 2
+ }
if *pprof != "" {
go func() {
KeepCache: 50,
APIAccess: true,
}
- err = runner.TranslatePaths(inputDir)
- if err != nil {
- return 1
+ for i := range inputDirs {
+ err = runner.TranslatePaths(&inputDirs[i])
+ if err != nil {
+ return 1
+ }
}
- runner.Args = []string{"slice", "-local=true",
+ runner.Args = append([]string{"slice", "-local=true",
"-pprof", ":6060",
- "-input-dir", *inputDir,
"-output-dir", "/mnt/output",
- }
+ }, inputDirs...)
var output string
output, err = runner.Run()
if err != nil {
return 0
}
- err = Slice(*outputDir, *inputDir, *tagsPerFile)
+ err = Slice(*tagsPerFile, *outputDir, inputDirs)
if err != nil {
return 1
}
// Read tags+tiles+genomes from srcdir, write to dstdir with (up to)
// the specified number of tags per file.
-func Slice(dstdir, srcdir string, tagsPerFile int) error {
- infiles, err := allGobFiles(srcdir)
- if err != nil {
- return err
+func Slice(tagsPerFile int, dstdir string, srcdirs []string) error {
+ var infiles []string
+ for _, srcdir := range srcdirs {
+ files, err := allGobFiles(srcdir)
+ if err != nil {
+ return err
+ }
+ infiles = append(infiles, files...)
}
+ // dirNamespace[dir] is an int in [0,len(dirNamespace)), used below to
+ // namespace variant numbers from different dirs.
+ dirNamespace := map[string]int{}
+ for _, path := range infiles {
+ dir, _ := filepath.Split(path)
+ if _, ok := dirNamespace[dir]; !ok {
+ dirNamespace[dir] = len(dirNamespace)
+ }
+ }
+ namespaces := len(dirNamespace)
var (
tagset [][]byte
return
}
defer f.Close()
- log.Printf("reading %s", path)
+ dir, _ := filepath.Split(path)
+ namespace := dirNamespace[dir]
+ log.Printf("reading %s (namespace %d)", path, namespace)
err = DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error {
if err := throttle.Err(); err != nil {
return err
}
atomic.AddInt64(&countTileVariants, int64(len(ent.TileVariants)))
for _, tv := range ent.TileVariants {
+ tv.Variant = tileVariantID(int(tv.Variant)*namespaces + namespace)
err := encs[int(tv.Tag)/tagsPerFile].Encode(LibraryEntry{
TileVariants: []TileVariant{tv},
})
// Easier for downstream code.
atomic.AddInt64(&countGenomes, int64(len(ent.CompactGenomes)))
for _, cg := range ent.CompactGenomes {
+ for i, v := range cg.Variants {
+ if v > 0 {
+ cg.Variants[i] = tileVariantID(int(v)*namespaces + namespace)
+ }
+ }
for i, enc := range encs {
start := i * tagsPerFile
end := start + tagsPerFile
// slice. Easier for downstream code.
atomic.AddInt64(&countReferences, int64(len(ent.CompactSequences)))
if len(ent.CompactSequences) > 0 {
+ for _, cs := range ent.CompactSequences {
+ for _, tseq := range cs.TileSequences {
+ for i, libref := range tseq {
+ tseq[i].Variant = tileVariantID(int(libref.Variant)*namespaces + namespace)
+ }
+ }
+ }
err := encs[0].Encode(LibraryEntry{CompactSequences: ent.CompactSequences})
if err != nil {
return err
func (s *sliceSuite) TestImportAndSlice(c *check.C) {
tmpdir := c.MkDir()
+ err := os.Mkdir(tmpdir+"/lib1", 0777)
+ c.Assert(err, check.IsNil)
+ err = os.Mkdir(tmpdir+"/lib2", 0777)
+ c.Assert(err, check.IsNil)
+ err = os.Mkdir(tmpdir+"/lib3", 0777)
+ c.Assert(err, check.IsNil)
+ cwd, err := os.Getwd()
+ c.Assert(err, check.IsNil)
+ err = os.Symlink(cwd+"/testdata/pipeline1", tmpdir+"/pipeline1")
+ c.Assert(err, check.IsNil)
+ err = os.Symlink(cwd+"/testdata/pipeline1", tmpdir+"/pipeline1dup")
+ c.Assert(err, check.IsNil)
- err := ioutil.WriteFile(tmpdir+"/chr1-12-100.bed", []byte("chr1\t12\t100\ttest.1\n"), 0644)
+ err = ioutil.WriteFile(tmpdir+"/chr1-12-100.bed", []byte("chr1\t12\t100\ttest.1\n"), 0644)
c.Check(err, check.IsNil)
c.Log("=== import testdata/ref ===")
"-tag-library", "testdata/tags",
"-output-tiles",
"-save-incomplete-tiles",
- "-o", tmpdir + "/library1.gob",
+ "-o", tmpdir + "/lib1/library1.gob",
"testdata/ref.fasta",
}, nil, os.Stderr, os.Stderr)
c.Assert(exited, check.Equals, 0)
"-local=true",
"-tag-library", "testdata/tags",
"-output-tiles",
- "-o", tmpdir + "/library2.gob",
- "testdata/pipeline1",
+ "-o", tmpdir + "/lib2/library2.gob",
+ tmpdir + "/pipeline1",
}, nil, os.Stderr, os.Stderr)
c.Assert(exited, check.Equals, 0)
- c.Log("=== merge ===")
- exited = (&merger{}).RunCommand("merge", []string{
+ c.Log("=== import pipeline1dup ===")
+ exited = (&importer{}).RunCommand("import", []string{
"-local=true",
- "-o", tmpdir + "/library.gob",
- tmpdir + "/library1.gob",
- tmpdir + "/library2.gob",
+ "-tag-library", "testdata/tags",
+ "-output-tiles",
+ "-o", tmpdir + "/lib3/library3.gob",
+ tmpdir + "/pipeline1dup",
}, nil, os.Stderr, os.Stderr)
c.Assert(exited, check.Equals, 0)
- input := tmpdir + "/library.gob"
slicedir := c.MkDir()
c.Log("=== slice ===")
exited = (&slicecmd{}).RunCommand("slice", []string{
"-local=true",
- "-input-dir=" + input,
"-output-dir=" + slicedir,
"-tags-per-file=2",
+ tmpdir + "/lib1",
+ tmpdir + "/lib2",
+ tmpdir + "/lib3",
}, nil, os.Stderr, os.Stderr)
c.Check(exited, check.Equals, 0)
out, _ := exec.Command("find", slicedir, "-ls").CombinedOutput()
defer f.Close()
npy, err := gonpy.NewReader(f)
c.Assert(err, check.IsNil)
- c.Check(npy.Shape, check.DeepEquals, []int{2, 4})
+ c.Check(npy.Shape, check.DeepEquals, []int{4, 4})
variants, err := npy.GetInt16()
- c.Check(variants, check.DeepEquals, []int16{3, 2, 1, 2, -1, -1, 1, 1})
+ c.Check(variants, check.DeepEquals, []int16{3, 2, 1, 2, -1, -1, 1, 1, 3, 2, 1, 2, -1, -1, 1, 1})
annotations, err := ioutil.ReadFile(npydir + "/matrix.0000.annotations.csv")
c.Assert(err, check.IsNil)
"runtime"
"sort"
"strings"
- "sync"
"sync/atomic"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/kshedden/gonpy"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
+ "golang.org/x/crypto/blake2b"
)
type sliceNumpy struct {
}
}
log.Info("loading reference tiles from all slices")
- throttle := throttle{Max: runtime.GOMAXPROCS(0)}
+ throttle1 := throttle{Max: runtime.GOMAXPROCS(0)}
for _, infile := range infiles {
infile := infile
- throttle.Go(func() error {
+ throttle1.Go(func() error {
defer log.Infof("%s: done", infile)
f, err := open(infile)
if err != nil {
})
})
}
- throttle.Wait()
+ throttle1.Wait()
log.Info("reconstructing reference sequences")
for seqname, cseq := range refseq {
seqname, cseq := seqname, cseq
- throttle.Go(func() error {
+ throttle1.Go(func() error {
defer log.Printf("... %s done", seqname)
pos := 0
for _, libref := range cseq {
return nil
})
}
- throttle.Wait()
+ throttle1.Wait()
log.Info("TODO: determining which tiles intersect given regions")
var done int64
for infileIdx, infile := range infiles {
infileIdx, infile := infileIdx, infile
- throttle.Go(func() error {
+ throttle1.Go(func() error {
seq := make(map[tagID][]TileVariant, 50000)
cgs := make(map[string]CompactGenome, len(cgnames))
f, err := open(infile)
err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
for _, tv := range ent.TileVariants {
variants := seq[tv.Tag]
+ if len(variants) == 0 {
+ variants = make([]TileVariant, 100)
+ }
for len(variants) <= int(tv.Variant) {
variants = append(variants, TileVariant{})
}
// TODO: filters
- log.Infof("renumbering variants for tags %d-%d", tagstart, tagend)
+ log.Infof("renumber/dedup variants for tags %d-%d", tagstart, tagend)
variantRemap := make([][]tileVariantID, tagend-tagstart)
- var wg sync.WaitGroup
+ throttle2 := throttle{Max: runtime.GOMAXPROCS(0)}
for tag, variants := range seq {
tag, variants := tag, variants
- wg.Add(1)
+ throttle2.Acquire()
go func() {
- defer wg.Done()
- count := make([]tileVariantID, len(variants))
+ defer throttle2.Release()
+ count := make(map[[blake2b.Size256]byte]int, len(variants))
for _, cg := range cgs {
idx := (tag - tagstart) * 2
if int(idx) < len(cg.Variants) {
- count[cg.Variants[idx]]++
- count[cg.Variants[idx+1]]++
+ count[variants[cg.Variants[idx]].Blake2b]++
+ count[variants[cg.Variants[idx+1]].Blake2b]++
}
}
- ranked := make([]tileVariantID, len(variants))
- for i := range ranked {
- ranked[i] = tileVariantID(i)
+ // hash[i] will be the hash of
+ // the variant(s) that should
+ // be at rank i (0-based).
+ hash := make([][blake2b.Size256]byte, 0, len(count))
+ for b := range count {
+ hash = append(hash, b)
}
- sort.Slice(ranked, func(i, j int) bool {
- if i == 0 {
- return true // leave ranked[0] at position 0, unused
- } else if j == 0 {
- return false // ditto
- }
- if cri, crj := count[ranked[i]], count[ranked[j]]; cri != crj {
- return cri > crj
+ sort.Slice(hash, func(i, j int) bool {
+ bi, bj := &hash[i], &hash[j]
+ if ci, cj := count[*bi], count[*bj]; ci != cj {
+ return ci > cj
} else {
- return bytes.Compare(variants[ranked[i]].Blake2b[:], variants[ranked[j]].Blake2b[:]) < 0
+ return bytes.Compare((*bi)[:], (*bj)[:]) < 0
}
})
- remap := make([]tileVariantID, len(ranked))
- for i, r := range ranked {
- remap[r] = tileVariantID(i)
+ // rank[b] will be the 1-based
+ // new variant number for
+ // variants whose hash is b.
+ rank := make(map[[blake2b.Size256]byte]tileVariantID, len(hash))
+ for i, h := range hash {
+ rank[h] = tileVariantID(i + 1)
+ }
+ // remap[v] will be the new
+ // variant number for original
+ // variant number v.
+ remap := make([]tileVariantID, len(variants))
+ for i, tv := range variants {
+ remap[i] = rank[tv.Blake2b]
}
variantRemap[tag-tagstart] = remap
}()
}
- wg.Wait()
+ throttle2.Wait()
annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx)
log.Infof("writing %s", annotationsFilename)
return nil
})
}
- if err = throttle.Wait(); err != nil {
+ if err = throttle1.Wait(); err != nil {
return 1
}
return 0