Fix reference assembly: load reference tile sequences by matching variant as well as tag.
[lightning.git] / slicenumpy.go
index 55b3c0a140861f72a436f6739f42344f0ed9680a..dad23420e87802cf156804fc2ffbe19c5847abbd 100644 (file)
@@ -16,6 +16,7 @@ import (
        "runtime"
        "sort"
        "strings"
+       "sync/atomic"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/arvados/lightning/hgvs"
@@ -69,7 +70,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        RAM:         250000000000,
                        VCPUs:       32,
                        Priority:    *priority,
-                       KeepCache:   1,
+                       KeepCache:   2,
                        APIAccess:   true,
                }
                err = runner.TranslatePaths(inputDir, regionsFilename)
@@ -163,6 +164,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
 
        log.Info("building list of reference tiles to load") // TODO: more efficient if we had saved all ref tiles in slice0
        type reftileinfo struct {
+               variant  tileVariantID
                seqname  string // chr1
                pos      int    // distance from start of chr1 to start of tile
                tiledata []byte // acgtggcaa...
@@ -170,11 +172,11 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        reftile := map[tagID]*reftileinfo{}
        for seqname, cseq := range refseq {
                for _, libref := range cseq {
-                       reftile[libref.Tag] = &reftileinfo{seqname: seqname}
+                       reftile[libref.Tag] = &reftileinfo{seqname: seqname, variant: libref.Variant}
                }
        }
        log.Info("loading reference tiles from all slices")
-       throttle := throttle{Max: runtime.NumCPU()}
+       throttle := throttle{Max: runtime.GOMAXPROCS(0)}
        for _, infile := range infiles {
                infile := infile
                throttle.Go(func() error {
@@ -186,7 +188,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        defer f.Close()
                        return DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
                                for _, tv := range ent.TileVariants {
-                                       if dst, ok := reftile[tv.Tag]; ok {
+                                       if dst, ok := reftile[tv.Tag]; ok && dst.variant == tv.Variant {
                                                dst.tiledata = tv.Sequence
                                        }
                                }
@@ -215,10 +217,10 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        log.Info("TODO: determining which tiles intersect given regions")
 
        log.Info("generating annotations and numpy matrix for each slice")
+       var done int64
        for infileIdx, infile := range infiles {
                infileIdx, infile := infileIdx, infile
                throttle.Go(func() error {
-                       defer log.Infof("%s: done", infile)
                        seq := map[tagID][][]byte{}
                        cgs := make(map[string]CompactGenome, len(cgnames))
                        f, err := open(infile)
@@ -226,6 +228,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                return err
                        }
                        defer f.Close()
+                       log.Infof("reading %s", infile)
                        err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
                                for _, tv := range ent.TileVariants {
                                        variants := seq[tv.Tag]
@@ -246,12 +249,11 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        tagstart := cgs[cgnames[0]].StartTag
                        tagend := cgs[cgnames[0]].EndTag
 
-                       log.Infof("TODO: %s: filtering", infile)
-                       log.Infof("TODO: %s: tidying", infile)
-                       log.Infof("TODO: %s: lowqual to -1", infile)
+                       // TODO: filters
+                       // TODO: tidy/renumber
 
                        annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx)
-                       log.Infof("%s: writing annotations to %s", infile, annotationsFilename)
+                       log.Infof("writing %s", annotationsFilename)
                        annof, err := os.Create(annotationsFilename)
                        if err != nil {
                                return err
@@ -312,7 +314,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                                return err
                        }
                        defer output.Close()
-                       bufw := bufio.NewWriter(output)
+                       bufw := bufio.NewWriterSize(output, 1<<26)
                        npw, err := gonpy.NewWriter(nopCloser{bufw})
                        if err != nil {
                                return err
@@ -332,6 +334,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        if err != nil {
                                return err
                        }
+                       log.Infof("%s: done (%d/%d)", infile, int(atomic.AddInt64(&done, 1)), len(infiles))
                        return nil
                })
        }
@@ -340,18 +343,3 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        }
        return 0
 }
-
-func (*sliceNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error {
-       f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666)
-       if err != nil {
-               return err
-       }
-       defer f.Close()
-       for i, libref := range librefs {
-               _, err = fmt.Fprintf(f, "%d,%d,%d\n", i, libref.Tag, libref.Variant)
-               if err != nil {
-                       return err
-               }
-       }
-       return f.Close()
-}