Export labels.csv with numpy array.
[lightning.git] / merge.go
index 2e32ddb03e51313dae0d0eb5fbb9aa0e377db186..904db8e4d27f5f0b6cc2adff2c2613e850a89ee9 100644 (file)
--- a/merge.go
+++ b/merge.go
@@ -2,7 +2,8 @@ package main
 
 import (
        "bufio"
-       "bytes"
+       "compress/gzip"
+       "context"
        "encoding/gob"
        "errors"
        "flag"
@@ -12,6 +13,7 @@ import (
        "net/http"
        _ "net/http/pprof"
        "os"
+       "strings"
        "sync"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -25,7 +27,6 @@ type merger struct {
        tagSet  [][]byte
        tilelib *tileLibrary
        mapped  map[string]map[tileLibRef]tileVariantID
-       todo    []liftCompactGenome
        mtxTags sync.Mutex
        errs    chan error
 }
@@ -66,7 +67,7 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou
                        return 1
                }
                runner := arvadosContainerRunner{
-                       Name:        "lightning filter",
+                       Name:        "lightning merge",
                        Client:      arvados.NewClientFromEnv(),
                        ProjectUUID: *projectUUID,
                        RAM:         64000000000,
@@ -80,97 +81,48 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou
                        }
                }
                runner.Args = append([]string{"merge", "-local=true",
-                       "-o", "/mnt/output/library.gob",
+                       "-o", "/mnt/output/library.gob.gz",
                }, cmd.inputs...)
                var output string
                output, err = runner.Run()
                if err != nil {
                        return 1
                }
-               fmt.Fprintln(stdout, output+"/library.gob")
+               fmt.Fprintln(stdout, output+"/library.gob.gz")
                return 0
        }
 
+       var outf, outw io.WriteCloser
        if *outputFilename == "-" {
-               cmd.output = nopCloser{stdout}
+               outw = nopCloser{stdout}
        } else {
-               cmd.output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
+               outf, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
                if err != nil {
                        return 1
                }
-               defer cmd.output.Close()
+               defer outf.Close()
+               if strings.HasSuffix(*outputFilename, ".gz") {
+                       outw = gzip.NewWriter(outf)
+               } else {
+                       outw = outf
+               }
        }
-
+       cmd.output = outw
        err = cmd.doMerge()
        if err != nil {
                return 1
        }
-       err = cmd.output.Close()
+       err = outw.Close()
        if err != nil {
                return 1
        }
-       return 0
-}
-
-func (cmd *merger) mergeLibraryEntry(ent *LibraryEntry, src string) error {
-       mapped := cmd.mapped[src]
-       if len(cmd.errs) > 0 {
-               return errors.New("stopping after error in other goroutine")
-       }
-       if len(ent.TagSet) > 0 {
-               // We don't need the tagset to do a merge, but if it
-               // does appear in the input, we (a) output it once,
-               // and (b) do a sanity check, erroring out if the
-               // inputs have different tagsets.
-               cmd.mtxTags.Lock()
-               defer cmd.mtxTags.Unlock()
-               if len(cmd.tagSet) == 0 {
-                       cmd.tagSet = ent.TagSet
-                       if cmd.tilelib.encoder != nil {
-                               go cmd.tilelib.encoder.Encode(LibraryEntry{
-                                       TagSet: cmd.tagSet,
-                               })
-                       }
-               } else if len(cmd.tagSet) != len(ent.TagSet) {
-                       return fmt.Errorf("cannot merge libraries with differing tagsets")
-               } else {
-                       for i := range ent.TagSet {
-                               if !bytes.Equal(ent.TagSet[i], cmd.tagSet[i]) {
-                                       return fmt.Errorf("cannot merge libraries with differing tagsets")
-                               }
-                       }
-               }
-       }
-       for _, tv := range ent.TileVariants {
-               // Assign a new variant ID (unique across all inputs)
-               // for each input variant.
-               mapped[tileLibRef{tag: tv.Tag, variant: tv.Variant}] = cmd.tilelib.getRef(tv.Tag, tv.Sequence).variant
-       }
-       for _, cg := range ent.CompactGenomes {
-               cmd.todo = append(cmd.todo, liftCompactGenome{cg, mapped})
-       }
-       return nil
-}
-
-type liftCompactGenome struct {
-       CompactGenome
-       mapped map[tileLibRef]tileVariantID
-}
-
-// Translate old variant IDs to new (mapped) variant IDs.
-func (cg liftCompactGenome) lift() error {
-       for i, variant := range cg.Variants {
-               if variant == 0 {
-                       continue
-               }
-               tag := tagID(i / 2)
-               newvariant, ok := cg.mapped[tileLibRef{tag: tag, variant: variant}]
-               if !ok {
-                       return fmt.Errorf("oops: ent.CompactGenomes[] (%q) refs tag %d variant %d which is not in library", cg.Name, tag, variant)
+       if outf != nil && outf != outw {
+               err = outf.Close()
+               if err != nil {
+                       return 1
                }
-               cg.Variants[tag] = newvariant
        }
-       return nil
+       return 0
 }
 
 func (cmd *merger) setError(err error) {
@@ -182,10 +134,15 @@ func (cmd *merger) setError(err error) {
 
 func (cmd *merger) doMerge() error {
        w := bufio.NewWriter(cmd.output)
+       encoder := gob.NewEncoder(w)
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
        cmd.errs = make(chan error, 1)
        cmd.tilelib = &tileLibrary{
-               encoder:        gob.NewEncoder(w),
-               includeNoCalls: true,
+               encoder:       encoder,
+               retainNoCalls: true,
        }
 
        cmd.mapped = map[string]map[tileLibRef]tileVariantID{}
@@ -210,16 +167,10 @@ func (cmd *merger) doMerge() error {
                go func(input string) {
                        defer wg.Done()
                        log.Printf("%s: reading", input)
-                       err := DecodeLibrary(infile, func(ent *LibraryEntry) error {
-                               return cmd.mergeLibraryEntry(ent, input)
-                       })
-                       if err != nil {
-                               cmd.setError(fmt.Errorf("%s: decode: %w", input, err))
-                               return
-                       }
-                       err = infile.Close()
+                       err := cmd.tilelib.LoadGob(ctx, infile, strings.HasSuffix(input, ".gz"), nil)
                        if err != nil {
-                               cmd.setError(fmt.Errorf("%s: close: %w", input, err))
+                               cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
+                               cancel()
                                return
                        }
                        log.Printf("%s: done", input)
@@ -230,24 +181,8 @@ func (cmd *merger) doMerge() error {
        if err := <-cmd.errs; err != nil {
                return err
        }
-
-       var cgs []CompactGenome
-       for _, cg := range cmd.todo {
-               err := cg.lift()
-               if err != nil {
-                       return err
-               }
-               cgs = append(cgs, cg.CompactGenome)
-       }
-       err := cmd.tilelib.encoder.Encode(LibraryEntry{
-               CompactGenomes: cgs,
-       })
-       if err != nil {
-               return err
-       }
-
        log.Print("flushing")
-       err = w.Flush()
+       err := w.Flush()
        if err != nil {
                return err
        }