"runtime"
"sort"
"strings"
- "sync"
"sync/atomic"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/kshedden/gonpy"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
+ "golang.org/x/crypto/blake2b"
)
type sliceNumpy struct {
}
}
log.Info("loading reference tiles from all slices")
- throttle := throttle{Max: runtime.GOMAXPROCS(0)}
+ throttle1 := throttle{Max: runtime.GOMAXPROCS(0)}
for _, infile := range infiles {
infile := infile
- throttle.Go(func() error {
+ throttle1.Go(func() error {
defer log.Infof("%s: done", infile)
f, err := open(infile)
if err != nil {
})
})
}
- throttle.Wait()
+ throttle1.Wait()
log.Info("reconstructing reference sequences")
for seqname, cseq := range refseq {
seqname, cseq := seqname, cseq
- throttle.Go(func() error {
+ throttle1.Go(func() error {
defer log.Printf("... %s done", seqname)
pos := 0
for _, libref := range cseq {
return nil
})
}
- throttle.Wait()
+ throttle1.Wait()
log.Info("TODO: determining which tiles intersect given regions")
var done int64
for infileIdx, infile := range infiles {
infileIdx, infile := infileIdx, infile
- throttle.Go(func() error {
+ throttle1.Go(func() error {
seq := make(map[tagID][]TileVariant, 50000)
cgs := make(map[string]CompactGenome, len(cgnames))
f, err := open(infile)
err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
for _, tv := range ent.TileVariants {
variants := seq[tv.Tag]
+ if len(variants) == 0 {
+ variants = make([]TileVariant, 100)
+ }
for len(variants) <= int(tv.Variant) {
variants = append(variants, TileVariant{})
}
// TODO: filters
- log.Infof("renumbering variants for tags %d-%d", tagstart, tagend)
+ log.Infof("renumber/dedup variants for tags %d-%d", tagstart, tagend)
variantRemap := make([][]tileVariantID, tagend-tagstart)
- var wg sync.WaitGroup
+ throttle2 := throttle{Max: runtime.GOMAXPROCS(0)}
for tag, variants := range seq {
tag, variants := tag, variants
- wg.Add(1)
+ throttle2.Acquire()
go func() {
- defer wg.Done()
- count := make([]tileVariantID, len(variants))
+ defer throttle2.Release()
+ count := make(map[[blake2b.Size256]byte]int, len(variants))
for _, cg := range cgs {
idx := (tag - tagstart) * 2
if int(idx) < len(cg.Variants) {
- count[cg.Variants[idx]]++
- count[cg.Variants[idx+1]]++
+ count[variants[cg.Variants[idx]].Blake2b]++
+ count[variants[cg.Variants[idx+1]].Blake2b]++
}
}
- ranked := make([]tileVariantID, len(variants))
- for i := range ranked {
- ranked[i] = tileVariantID(i)
+ // hash[i] will be the hash of
+ // the variant(s) that should
+ // be at rank i (0-based).
+ hash := make([][blake2b.Size256]byte, 0, len(count))
+ for b := range count {
+ hash = append(hash, b)
}
- sort.Slice(ranked, func(i, j int) bool {
- if i == 0 {
- return true // leave ranked[0] at position 0, unused
- } else if j == 0 {
- return false // ditto
- }
- if cri, crj := count[ranked[i]], count[ranked[j]]; cri != crj {
- return cri > crj
+ sort.Slice(hash, func(i, j int) bool {
+ bi, bj := &hash[i], &hash[j]
+ if ci, cj := count[*bi], count[*bj]; ci != cj {
+ return ci > cj
} else {
- return bytes.Compare(variants[ranked[i]].Blake2b[:], variants[ranked[j]].Blake2b[:]) < 0
+ return bytes.Compare((*bi)[:], (*bj)[:]) < 0
}
})
- remap := make([]tileVariantID, len(ranked))
- for i, r := range ranked {
- remap[r] = tileVariantID(i)
+ // rank[b] will be the 1-based
+ // new variant number for
+ // variants whose hash is b.
+ rank := make(map[[blake2b.Size256]byte]tileVariantID, len(hash))
+ for i, h := range hash {
+ rank[h] = tileVariantID(i + 1)
+ }
+ // remap[v] will be the new
+ // variant number for original
+ // variant number v.
+ remap := make([]tileVariantID, len(variants))
+ for i, tv := range variants {
+ remap[i] = rank[tv.Blake2b]
}
variantRemap[tag-tagstart] = remap
}()
}
- wg.Wait()
+ throttle2.Wait()
annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx)
log.Infof("writing %s", annotationsFilename)
return nil
})
}
- if err = throttle.Wait(); err != nil {
+ if err = throttle1.Wait(); err != nil {
return 1
}
return 0