Gzip gob files.
authorTom Clegg <tom@tomclegg.ca>
Fri, 13 Nov 2020 07:43:37 +0000 (02:43 -0500)
committerTom Clegg <tom@tomclegg.ca>
Fri, 13 Nov 2020 07:43:37 +0000 (02:43 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

annotate.go
export.go
exportnumpy.go
filter.go
gob.go
import.go
merge.go
pca.go
stats.go
tilelib.go

index 34d1c97af586acc4f2def992b261e03f7d0d0b5b..ff4ab887175cfb64c43af6ae6afd93da35399ec8 100644 (file)
@@ -112,7 +112,7 @@ func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader,
                retainNoCalls:       true,
                retainTileSequences: true,
        }
-       err = tilelib.LoadGob(context.Background(), input, nil)
+       err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil)
        if err != nil {
                return 1
        }
index 79e75a794d98359ee4f0893d3c77499f9c0ddc29..b6b160ddb79311fe3a5964b25e2ca0efe482abdc 100644 (file)
--- a/export.go
+++ b/export.go
@@ -134,7 +134,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
        tilelib := tileLibrary{
                retainNoCalls: true,
        }
-       err = tilelib.LoadGob(context.Background(), input, func(cg CompactGenome) {
+       err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), func(cg CompactGenome) {
                if *pick != "" && *pick != cg.Name {
                        return
                }
@@ -188,7 +188,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
                bedbufw = bufio.NewWriter(bedout)
        }
 
-       err = cmd.export(bufw, bedout, input, tilelib.taglib.keylen, refseq, cgs)
+       err = cmd.export(bufw, bedout, input, strings.HasSuffix(*inputFilename, ".gz"), tilelib.taglib.keylen, refseq, cgs)
        if err != nil {
                return 1
        }
@@ -217,7 +217,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std
        return 0
 }
 
-func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, taglen int, refseq map[string][]tileLibRef, cgs []CompactGenome) error {
+func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, gz bool, taglen int, refseq map[string][]tileLibRef, cgs []CompactGenome) error {
        need := map[tileLibRef]bool{}
        var seqnames []string
        for seqname, librefs := range refseq {
@@ -242,7 +242,7 @@ func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, taglen int,
 
        log.Infof("export: loading %d tile variants", len(need))
        tileVariant := map[tileLibRef]TileVariant{}
-       err := DecodeLibrary(librdr, func(ent *LibraryEntry) error {
+       err := DecodeLibrary(librdr, gz, func(ent *LibraryEntry) error {
                for _, tv := range ent.TileVariants {
                        libref := tileLibRef{Tag: tv.Tag, Variant: tv.Variant}
                        if need[libref] {
index d67ac6fee73d1842015b55afa36c0b92b64c77c5..7ffbd32ea31a7fde89a815d3eeee708f4b176c4a 100644 (file)
@@ -12,6 +12,7 @@ import (
        _ "net/http/pprof"
        "os"
        "sort"
+       "strings"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/kshedden/gonpy"
@@ -106,7 +107,7 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader,
                retainTileSequences: true,
                compactGenomes:      map[string][]tileVariantID{},
        }
-       err = tilelib.LoadGob(context.Background(), input, nil)
+       err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil)
        if err != nil {
                return 1
        }
index a4ffce8e7aa4d0c6ec355d7625a77ef6c52ff87d..51652681543df2921caee8ec790f4065ee146542 100644 (file)
--- a/filter.go
+++ b/filter.go
@@ -11,6 +11,7 @@ import (
        "net/http"
        _ "net/http/pprof"
        "os"
+       "strings"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        log "github.com/sirupsen/logrus"
@@ -165,7 +166,7 @@ func (cmd *filtercmd) RunCommand(prog string, args []string, stdin io.Reader, st
                defer infile.Close()
        }
        log.Print("reading")
-       cgs, err := ReadCompactGenomes(infile)
+       cgs, err := ReadCompactGenomes(infile, strings.HasSuffix(*inputFilename, ".gz"))
        if err != nil {
                return 1
        }
diff --git a/gob.go b/gob.go
index a8401d8af8627f45882c8fae82704595495baeaa..39b15ab4aba904ba02c67bf06fddee15f0f4bab8 100644 (file)
--- a/gob.go
+++ b/gob.go
@@ -2,8 +2,10 @@ package main
 
 import (
        "bufio"
+       "compress/gzip"
        "encoding/gob"
        "io"
+       "io/ioutil"
        _ "net/http/pprof"
 
        "golang.org/x/crypto/blake2b"
@@ -33,18 +35,25 @@ type LibraryEntry struct {
        TileVariants     []TileVariant
 }
 
-func ReadCompactGenomes(rdr io.Reader) ([]CompactGenome, error) {
+func ReadCompactGenomes(rdr io.Reader, gz bool) ([]CompactGenome, error) {
        var ret []CompactGenome
-       err := DecodeLibrary(rdr, func(ent *LibraryEntry) error {
+       err := DecodeLibrary(rdr, gz, func(ent *LibraryEntry) error {
                ret = append(ret, ent.CompactGenomes...)
                return nil
        })
        return ret, err
 }
 
-func DecodeLibrary(rdr io.Reader, cb func(*LibraryEntry) error) error {
-       dec := gob.NewDecoder(bufio.NewReaderSize(rdr, 1<<26))
+func DecodeLibrary(rdr io.Reader, gz bool, cb func(*LibraryEntry) error) error {
+       zrdr := ioutil.NopCloser(rdr)
        var err error
+       if gz {
+               zrdr, err = gzip.NewReader(bufio.NewReaderSize(rdr, 1<<26))
+               if err != nil {
+                       return err
+               }
+       }
+       dec := gob.NewDecoder(zrdr)
        for err == nil {
                var ent LibraryEntry
                err = dec.Decode(&ent)
@@ -52,9 +61,8 @@ func DecodeLibrary(rdr io.Reader, cb func(*LibraryEntry) error) error {
                        err = cb(&ent)
                }
        }
-       if err == io.EOF {
-               return nil
-       } else {
+       if err != io.EOF {
                return err
        }
+       return zrdr.Close()
 }
index f6ca80fed6e9aa5b461f1c7f31d42563de8b1862..629fccac2fc049646a58c23fbbbd11a14292ef52 100644 (file)
--- a/import.go
+++ b/import.go
@@ -112,7 +112,7 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
                        }
                }
                if cmd.outputFile == "-" {
-                       cmd.outputFile = "/mnt/output/library.gob"
+                       cmd.outputFile = "/mnt/output/library.gob.gz"
                } else {
                        // Not yet implemented, but this should write
                        // the collection to an existing collection,
@@ -136,7 +136,7 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
                if err != nil {
                        return 1
                }
-               fmt.Fprintln(stdout, output+"/library.gob")
+               fmt.Fprintln(stdout, output+"/library.gob.gz")
                return 0
        }
 
@@ -150,17 +150,22 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
                return 1
        }
 
-       var output io.WriteCloser
+       var outw, outf io.WriteCloser
        if cmd.outputFile == "-" {
-               output = nopCloser{stdout}
+               outw = nopCloser{stdout}
        } else {
-               output, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777)
+               outf, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777)
                if err != nil {
                        return 1
                }
-               defer output.Close()
+               defer outf.Close()
+               if strings.HasSuffix(cmd.outputFile, ".gz") {
+                       outw = gzip.NewWriter(outf)
+               } else {
+                       outw = outf
+               }
        }
-       bufw := bufio.NewWriter(output)
+       bufw := bufio.NewWriter(outw)
        cmd.encoder = gob.NewEncoder(bufw)
 
        tilelib := &tileLibrary{taglib: taglib, retainNoCalls: cmd.saveIncompleteTiles, skipOOO: cmd.skipOOO}
@@ -182,10 +187,16 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std
        if err != nil {
                return 1
        }
-       err = output.Close()
+       err = outw.Close()
        if err != nil {
                return 1
        }
+       if outf != nil && outf != outw {
+               err = outf.Close()
+               if err != nil {
+                       return 1
+               }
+       }
        return 0
 }
 
index 1807393b593b7d2c4e59ac3b138e3532b977391b..27b32308bf91cd69ae6e41aa2c45c9d4d1ea7ef9 100644 (file)
--- a/merge.go
+++ b/merge.go
@@ -2,6 +2,7 @@ package main
 
 import (
        "bufio"
+       "compress/gzip"
        "context"
        "encoding/gob"
        "errors"
@@ -12,6 +13,7 @@ import (
        "net/http"
        _ "net/http/pprof"
        "os"
+       "strings"
        "sync"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -79,35 +81,47 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou
                        }
                }
                runner.Args = append([]string{"merge", "-local=true",
-                       "-o", "/mnt/output/library.gob",
+                       "-o", "/mnt/output/library.gob.gz",
                }, cmd.inputs...)
                var output string
                output, err = runner.Run()
                if err != nil {
                        return 1
                }
-               fmt.Fprintln(stdout, output+"/library.gob")
+               fmt.Fprintln(stdout, output+"/library.gob.gz")
                return 0
        }
 
+       var outf, outw io.WriteCloser
        if *outputFilename == "-" {
-               cmd.output = nopCloser{stdout}
+               outw = nopCloser{stdout}
        } else {
-               cmd.output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
+               outf, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
                if err != nil {
                        return 1
                }
-               defer cmd.output.Close()
+               defer outf.Close()
+               if strings.HasSuffix(*outputFilename, ".gz") {
+                       outw = gzip.NewWriter(outf)
+               } else {
+                       outw = outf
+               }
        }
-
+       cmd.output = outw
        err = cmd.doMerge()
        if err != nil {
                return 1
        }
-       err = cmd.output.Close()
+       err = outw.Close()
        if err != nil {
                return 1
        }
+       if outf != nil && outf != outw {
+               err = outf.Close()
+               if err != nil {
+                       return 1
+               }
+       }
        return 0
 }
 
@@ -153,18 +167,12 @@ func (cmd *merger) doMerge() error {
                go func(input string) {
                        defer wg.Done()
                        log.Printf("%s: reading", input)
-                       err := cmd.tilelib.LoadGob(ctx, infile, nil)
+                       err := cmd.tilelib.LoadGob(ctx, infile, strings.HasSuffix(input, ".gz"), nil)
                        if err != nil {
                                cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
                                cancel()
                                return
                        }
-                       err = infile.Close()
-                       if err != nil {
-                               cmd.setError(fmt.Errorf("%s: error closing input file: %w", input, err))
-                               cancel()
-                               return
-                       }
                        log.Printf("%s: done", input)
                }(input)
        }
diff --git a/pca.go b/pca.go
index 65c69eb6e3a495657fad6284230fc015058e47e4..1120a3b68fbcf9935404fb026ef90bacc3aeb780 100644 (file)
--- a/pca.go
+++ b/pca.go
@@ -11,6 +11,7 @@ import (
        "net/http"
        _ "net/http/pprof"
        "os"
+       "strings"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/james-bowman/nlp"
@@ -142,7 +143,7 @@ func (cmd *goPCA) RunCommand(prog string, args []string, stdin io.Reader, stdout
                retainNoCalls:  true,
                compactGenomes: map[string][]tileVariantID{},
        }
-       err = tilelib.LoadGob(context.Background(), input, nil)
+       err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil)
        if err != nil {
                return 1
        }
index 8da2a69ad75afb3ebfe72e28aa2f3d59c80bd792..460eef47fb29b865b1c4dd05158b96a80ad0a1dc 100644 (file)
--- a/stats.go
+++ b/stats.go
@@ -2,7 +2,6 @@ package main
 
 import (
        "bufio"
-       "encoding/gob"
        "encoding/json"
        "errors"
        "flag"
@@ -12,6 +11,7 @@ import (
        "net/http"
        _ "net/http/pprof"
        "os"
+       "strings"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        log "github.com/sirupsen/logrus"
@@ -101,7 +101,7 @@ func (cmd *statscmd) RunCommand(prog string, args []string, stdin io.Reader, std
        }
 
        bufw := bufio.NewWriter(output)
-       err = cmd.doStats(input, bufw)
+       err = cmd.doStats(input, strings.HasSuffix(*inputFilename, ".gz"), bufw)
        if err != nil {
                return 1
        }
@@ -116,7 +116,7 @@ func (cmd *statscmd) RunCommand(prog string, args []string, stdin io.Reader, std
        return 0
 }
 
-func (cmd *statscmd) doStats(input io.Reader, output io.Writer) error {
+func (cmd *statscmd) doStats(input io.Reader, gz bool, output io.Writer) error {
        var ret struct {
                Genomes          int
                CalledBases      []int64
@@ -131,15 +131,7 @@ func (cmd *statscmd) doStats(input io.Reader, output io.Writer) error {
        var tagSet [][]byte
        var tagPlacements []int
        tileVariantCalls := map[tileLibRef]int{}
-       dec := gob.NewDecoder(bufio.NewReaderSize(input, 1<<26))
-       for {
-               var ent LibraryEntry
-               err := dec.Decode(&ent)
-               if err == io.EOF {
-                       break
-               } else if err != nil {
-                       return fmt.Errorf("gob decode: %w", err)
-               }
+       err := DecodeLibrary(input, gz, func(ent *LibraryEntry) error {
                ret.Genomes += len(ent.CompactGenomes)
                ret.TileVariants += len(ent.TileVariants)
                if len(ent.TagSet) > 0 {
@@ -186,6 +178,10 @@ func (cmd *statscmd) doStats(input io.Reader, output io.Writer) error {
                        }
                        ret.CalledBases = append(ret.CalledBases, calledBases)
                }
+               return nil
+       })
+       if err != nil {
+               return err
        }
        for id, p := range tagPlacements {
                for len(ret.TagsPlacedNTimes) <= p {
index b1550f67eb64141b0cfe328a4da23ad93d184215..5ad12e7228667024e562e12b8a6030f7fefac05a 100644 (file)
@@ -210,11 +210,11 @@ func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, varian
 // match.
 //
 // If onLoadGenome is non-nil, call it on each CompactGenome entry.
-func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, onLoadGenome func(CompactGenome)) error {
+func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, gz bool, onLoadGenome func(CompactGenome)) error {
        cgs := []CompactGenome{}
        cseqs := []CompactSequence{}
        variantmap := map[tileLibRef]tileVariantID{}
-       err := DecodeLibrary(rdr, func(ent *LibraryEntry) error {
+       err := DecodeLibrary(rdr, gz, func(ent *LibraryEntry) error {
                if ctx.Err() != nil {
                        return ctx.Err()
                }