From 69b71af4136fdaeeb5c2afbc559208dc5f428c48 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Fri, 13 Nov 2020 02:43:37 -0500 Subject: [PATCH] Gzip gob files. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- annotate.go | 2 +- export.go | 8 ++++---- exportnumpy.go | 3 ++- filter.go | 3 ++- gob.go | 22 +++++++++++++++------- import.go | 27 +++++++++++++++++++-------- merge.go | 36 ++++++++++++++++++++++-------------- pca.go | 3 ++- stats.go | 20 ++++++++------------ tilelib.go | 4 ++-- 10 files changed, 77 insertions(+), 51 deletions(-) diff --git a/annotate.go b/annotate.go index 34d1c97af5..ff4ab88717 100644 --- a/annotate.go +++ b/annotate.go @@ -112,7 +112,7 @@ func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, retainNoCalls: true, retainTileSequences: true, } - err = tilelib.LoadGob(context.Background(), input, nil) + err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil) if err != nil { return 1 } diff --git a/export.go b/export.go index 79e75a794d..b6b160ddb7 100644 --- a/export.go +++ b/export.go @@ -134,7 +134,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std tilelib := tileLibrary{ retainNoCalls: true, } - err = tilelib.LoadGob(context.Background(), input, func(cg CompactGenome) { + err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), func(cg CompactGenome) { if *pick != "" && *pick != cg.Name { return } @@ -188,7 +188,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std bedbufw = bufio.NewWriter(bedout) } - err = cmd.export(bufw, bedout, input, tilelib.taglib.keylen, refseq, cgs) + err = cmd.export(bufw, bedout, input, strings.HasSuffix(*inputFilename, ".gz"), tilelib.taglib.keylen, refseq, cgs) if err != nil { return 1 } @@ -217,7 +217,7 @@ func (cmd *exporter) RunCommand(prog string, args []string, stdin io.Reader, std return 0 } -func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, taglen int, refseq map[string][]tileLibRef, cgs []CompactGenome) error { +func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, gz bool, taglen int, refseq map[string][]tileLibRef, cgs []CompactGenome) error { need := map[tileLibRef]bool{} var seqnames []string for seqname, librefs := range refseq { @@ -242,7 +242,7 @@ func (cmd *exporter) export(out, bedout io.Writer, librdr io.Reader, taglen int, log.Infof("export: loading %d tile variants", len(need)) tileVariant := map[tileLibRef]TileVariant{} - err := DecodeLibrary(librdr, func(ent *LibraryEntry) error { + err := DecodeLibrary(librdr, gz, func(ent *LibraryEntry) error { for _, tv := range ent.TileVariants { libref := tileLibRef{Tag: tv.Tag, Variant: tv.Variant} if need[libref] { diff --git a/exportnumpy.go b/exportnumpy.go index d67ac6fee7..7ffbd32ea3 100644 --- a/exportnumpy.go +++ b/exportnumpy.go @@ -12,6 +12,7 @@ import ( _ "net/http/pprof" "os" "sort" + "strings" "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/kshedden/gonpy" @@ -106,7 +107,7 @@ func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, retainTileSequences: true, compactGenomes: map[string][]tileVariantID{}, } - err = tilelib.LoadGob(context.Background(), input, nil) + err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil) if err != nil { return 1 } diff --git a/filter.go b/filter.go index a4ffce8e7a..5165268154 100644 --- a/filter.go +++ b/filter.go @@ -11,6 +11,7 @@ import ( "net/http" _ "net/http/pprof" "os" + "strings" "git.arvados.org/arvados.git/sdk/go/arvados" log "github.com/sirupsen/logrus" @@ -165,7 +166,7 @@ func (cmd *filtercmd) RunCommand(prog string, args []string, stdin io.Reader, st defer infile.Close() } log.Print("reading") - cgs, err := ReadCompactGenomes(infile) + cgs, err := ReadCompactGenomes(infile, strings.HasSuffix(*inputFilename, ".gz")) if err != nil { return 1 } diff --git a/gob.go b/gob.go index a8401d8af8..39b15ab4ab 100644 --- a/gob.go +++ b/gob.go @@ -2,8 +2,10 @@ package main import ( "bufio" + "compress/gzip" "encoding/gob" "io" + "io/ioutil" _ "net/http/pprof" "golang.org/x/crypto/blake2b" @@ -33,18 +35,25 @@ type LibraryEntry struct { TileVariants []TileVariant } -func ReadCompactGenomes(rdr io.Reader) ([]CompactGenome, error) { +func ReadCompactGenomes(rdr io.Reader, gz bool) ([]CompactGenome, error) { var ret []CompactGenome - err := DecodeLibrary(rdr, func(ent *LibraryEntry) error { + err := DecodeLibrary(rdr, gz, func(ent *LibraryEntry) error { ret = append(ret, ent.CompactGenomes...) return nil }) return ret, err } -func DecodeLibrary(rdr io.Reader, cb func(*LibraryEntry) error) error { - dec := gob.NewDecoder(bufio.NewReaderSize(rdr, 1<<26)) +func DecodeLibrary(rdr io.Reader, gz bool, cb func(*LibraryEntry) error) error { + zrdr := ioutil.NopCloser(rdr) var err error + if gz { + zrdr, err = gzip.NewReader(bufio.NewReaderSize(rdr, 1<<26)) + if err != nil { + return err + } + } + dec := gob.NewDecoder(zrdr) for err == nil { var ent LibraryEntry err = dec.Decode(&ent) @@ -52,9 +61,8 @@ func DecodeLibrary(rdr io.Reader, cb func(*LibraryEntry) error) error { err = cb(&ent) } } - if err == io.EOF { - return nil - } else { + if err != io.EOF { return err } + return zrdr.Close() } diff --git a/import.go b/import.go index f6ca80fed6..629fccac2f 100644 --- a/import.go +++ b/import.go @@ -112,7 +112,7 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std } } if cmd.outputFile == "-" { - cmd.outputFile = "/mnt/output/library.gob" + cmd.outputFile = "/mnt/output/library.gob.gz" } else { // Not yet implemented, but this should write // the collection to an existing collection, @@ -136,7 +136,7 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std if err != nil { return 1 } - fmt.Fprintln(stdout, output+"/library.gob") + fmt.Fprintln(stdout, output+"/library.gob.gz") return 0 } @@ -150,17 +150,22 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std return 1 } - var output io.WriteCloser + var outw, outf io.WriteCloser if cmd.outputFile == "-" { - output = nopCloser{stdout} + outw = nopCloser{stdout} } else { - output, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777) + outf, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777) if err != nil { return 1 } - defer output.Close() + defer outf.Close() + if strings.HasSuffix(cmd.outputFile, ".gz") { + outw = gzip.NewWriter(outf) + } else { + outw = outf + } } - bufw := bufio.NewWriter(output) + bufw := bufio.NewWriter(outw) cmd.encoder = gob.NewEncoder(bufw) tilelib := &tileLibrary{taglib: taglib, retainNoCalls: cmd.saveIncompleteTiles, skipOOO: cmd.skipOOO} @@ -182,10 +187,16 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std if err != nil { return 1 } - err = output.Close() + err = outw.Close() if err != nil { return 1 } + if outf != nil && outf != outw { + err = outf.Close() + if err != nil { + return 1 + } + } return 0 } diff --git a/merge.go b/merge.go index 1807393b59..27b32308bf 100644 --- a/merge.go +++ b/merge.go @@ -2,6 +2,7 @@ package main import ( "bufio" + "compress/gzip" "context" "encoding/gob" "errors" @@ -12,6 +13,7 @@ import ( "net/http" _ "net/http/pprof" "os" + "strings" "sync" "git.arvados.org/arvados.git/sdk/go/arvados" @@ -79,35 +81,47 @@ func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdou } } runner.Args = append([]string{"merge", "-local=true", - "-o", "/mnt/output/library.gob", + "-o", "/mnt/output/library.gob.gz", }, cmd.inputs...) var output string output, err = runner.Run() if err != nil { return 1 } - fmt.Fprintln(stdout, output+"/library.gob") + fmt.Fprintln(stdout, output+"/library.gob.gz") return 0 } + var outf, outw io.WriteCloser if *outputFilename == "-" { - cmd.output = nopCloser{stdout} + outw = nopCloser{stdout} } else { - cmd.output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777) + outf, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777) if err != nil { return 1 } - defer cmd.output.Close() + defer outf.Close() + if strings.HasSuffix(*outputFilename, ".gz") { + outw = gzip.NewWriter(outf) + } else { + outw = outf + } } - + cmd.output = outw err = cmd.doMerge() if err != nil { return 1 } - err = cmd.output.Close() + err = outw.Close() if err != nil { return 1 } + if outf != nil && outf != outw { + err = outf.Close() + if err != nil { + return 1 + } + } return 0 } @@ -153,18 +167,12 @@ func (cmd *merger) doMerge() error { go func(input string) { defer wg.Done() log.Printf("%s: reading", input) - err := cmd.tilelib.LoadGob(ctx, infile, nil) + err := cmd.tilelib.LoadGob(ctx, infile, strings.HasSuffix(input, ".gz"), nil) if err != nil { cmd.setError(fmt.Errorf("%s: load failed: %w", input, err)) cancel() return } - err = infile.Close() - if err != nil { - cmd.setError(fmt.Errorf("%s: error closing input file: %w", input, err)) - cancel() - return - } log.Printf("%s: done", input) }(input) } diff --git a/pca.go b/pca.go index 65c69eb6e3..1120a3b68f 100644 --- a/pca.go +++ b/pca.go @@ -11,6 +11,7 @@ import ( "net/http" _ "net/http/pprof" "os" + "strings" "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/james-bowman/nlp" @@ -142,7 +143,7 @@ func (cmd *goPCA) RunCommand(prog string, args []string, stdin io.Reader, stdout retainNoCalls: true, compactGenomes: map[string][]tileVariantID{}, } - err = tilelib.LoadGob(context.Background(), input, nil) + err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil) if err != nil { return 1 } diff --git a/stats.go b/stats.go index 8da2a69ad7..460eef47fb 100644 --- a/stats.go +++ b/stats.go @@ -2,7 +2,6 @@ package main import ( "bufio" - "encoding/gob" "encoding/json" "errors" "flag" @@ -12,6 +11,7 @@ import ( "net/http" _ "net/http/pprof" "os" + "strings" "git.arvados.org/arvados.git/sdk/go/arvados" log "github.com/sirupsen/logrus" @@ -101,7 +101,7 @@ func (cmd *statscmd) RunCommand(prog string, args []string, stdin io.Reader, std } bufw := bufio.NewWriter(output) - err = cmd.doStats(input, bufw) + err = cmd.doStats(input, strings.HasSuffix(*inputFilename, ".gz"), bufw) if err != nil { return 1 } @@ -116,7 +116,7 @@ func (cmd *statscmd) RunCommand(prog string, args []string, stdin io.Reader, std return 0 } -func (cmd *statscmd) doStats(input io.Reader, output io.Writer) error { +func (cmd *statscmd) doStats(input io.Reader, gz bool, output io.Writer) error { var ret struct { Genomes int CalledBases []int64 @@ -131,15 +131,7 @@ func (cmd *statscmd) doStats(input io.Reader, output io.Writer) error { var tagSet [][]byte var tagPlacements []int tileVariantCalls := map[tileLibRef]int{} - dec := gob.NewDecoder(bufio.NewReaderSize(input, 1<<26)) - for { - var ent LibraryEntry - err := dec.Decode(&ent) - if err == io.EOF { - break - } else if err != nil { - return fmt.Errorf("gob decode: %w", err) - } + err := DecodeLibrary(input, gz, func(ent *LibraryEntry) error { ret.Genomes += len(ent.CompactGenomes) ret.TileVariants += len(ent.TileVariants) if len(ent.TagSet) > 0 { @@ -186,6 +178,10 @@ func (cmd *statscmd) doStats(input io.Reader, output io.Writer) error { } ret.CalledBases = append(ret.CalledBases, calledBases) } + return nil + }) + if err != nil { + return err } for id, p := range tagPlacements { for len(ret.TagsPlacedNTimes) <= p { diff --git a/tilelib.go b/tilelib.go index b1550f67eb..5ad12e7228 100644 --- a/tilelib.go +++ b/tilelib.go @@ -210,11 +210,11 @@ func (tilelib *tileLibrary) loadCompactSequences(cseqs []CompactSequence, varian // match. // // If onLoadGenome is non-nil, call it on each CompactGenome entry. -func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, onLoadGenome func(CompactGenome)) error { +func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, gz bool, onLoadGenome func(CompactGenome)) error { cgs := []CompactGenome{} cseqs := []CompactSequence{} variantmap := map[tileLibRef]tileVariantID{} - err := DecodeLibrary(rdr, func(ent *LibraryEntry) error { + err := DecodeLibrary(rdr, gz, func(ent *LibraryEntry) error { if ctx.Err() != nil { return ctx.Err() } -- 2.30.2