1 // Copyright (C) The Lightning Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
21 "git.arvados.org/arvados.git/sdk/go/arvados"
22 "github.com/klauspost/pgzip"
23 log "github.com/sirupsen/logrus"
// slicecmd is the receiver type for the RunCommand method defined below.
// It is an empty struct, so it carries no state of its own.
26 type slicecmd struct{}
// RunCommand handles the "slice" subcommand. It parses flags from args, and
// the code visible here both prepares an arvadosContainerRunner whose
// arguments re-invoke "slice -local=true" and calls Slice on the input and
// output directories.
//
// NOTE(review): the conditions that choose between those two paths and the
// integer values returned are not visible in this excerpt — presumably the
// -local flag decides; confirm against the full function.
28 func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
// Report the error as a single line on the caller-supplied stderr.
32 fmt.Fprintf(stderr, "%s\n", err)
// ContinueOnError: Parse returns its error to the caller instead of
// terminating the process.
35 flags := flag.NewFlagSet("", flag.ContinueOnError)
// Usage text and parse-error messages go to the caller-supplied stderr.
36 flags.SetOutput(stderr)
37 pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
38 runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
39 projectUUID := flags.String("project", "", "project `UUID` for output data")
40 priority := flags.Int("priority", 500, "container request priority")
41 inputDir := flags.String("input-dir", "./in", "input `directory`")
42 outputDir := flags.String("output-dir", "./out", "output `directory`")
43 tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)")
44 err = flags.Parse(args)
// flag.ErrHelp means help was requested (-h/-help); it is distinguished
// from genuine parse errors, which take the else-if branch.
45 if err == flag.ErrHelp {
48 } else if err != nil {
// A nil handler selects http.DefaultServeMux. ListenAndServe only returns
// on failure, and that error is what gets logged here.
54 log.Println(http.ListenAndServe(*pprof, nil))
// Describe the container request used to run this command remotely.
// NOTE(review): presumably only reached when -local is false — confirm.
59 runner := arvadosContainerRunner{
60 Name: "lightning slice",
61 Client: arvados.NewClientFromEnv(),
62 ProjectUUID: *projectUUID,
// TranslatePaths receives a pointer to the flag value, so it is able to
// rewrite *inputDir in place — presumably to a path valid inside the
// container; verify against arvadosContainerRunner.
69 err = runner.TranslatePaths(inputDir)
// The container re-runs this same subcommand with -local=true, reading from
// the (possibly rewritten) input dir and writing to a fixed /mnt/output.
73 runner.Args = []string{"slice", "-local=true",
75 "-input-dir", *inputDir,
76 "-output-dir", "/mnt/output",
79 output, err = runner.Run()
// Print whatever the runner returned as its output on stdout.
83 fmt.Fprintln(stdout, output)
// Note the argument order: Slice takes the destination directory first.
87 err = Slice(*outputDir, *inputDir, *tagsPerFile)
94 // Read tags+tiles+genomes from srcdir, write to dstdir with (up to)
95 // the specified number of tags per file.
//
// Output file i receives the tile variants whose tag falls in
// [i*tagsPerFile, (i+1)*tagsPerFile), one CompactGenome entry per genome
// (even when the genome has no variants in that range), and a TagSet entry.
// All reference sequences go to output file 0 only.
//
// Slice returns the throttle's error if one was recorded (after closing the
// output files and discarding any close error); otherwise it returns the
// result of closing the output files.
96 func Slice(dstdir, srcdir string, tagsPerFile int) error {
// allGobFiles is defined elsewhere — presumably it lists the library files
// found under srcdir; verify.
97 infiles, err := allGobFiles(srcdir)
106 bufws []*bufio.Writer
// Running totals, updated with atomic.AddInt64 below and logged at the end.
110 countTileVariants int64
112 countReferences int64
// GOMAXPROCS(0) reads the current setting without changing it.
// Note: from here on the local variable shadows the throttle type name.
115 throttle := throttle{Max: runtime.GOMAXPROCS(0)}
116 for _, path := range infiles {
// Presumably paired with an acquire that is not visible in this excerpt.
120 defer throttle.Release()
127 log.Printf("reading %s", path)
// The second argument is true for paths ending in ".gz" — presumably it
// tells DecodeLibrary the input is gzip-compressed. The callback receives
// decoded LibraryEntry values (presumably once per entry; confirm).
128 err = DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error {
// Check whether an error has already been recorded on the throttle.
129 if err := throttle.Err(); err != nil {
132 if len(ent.TagSet) > 0 {
// Assuming tagsetOnce is a sync.Once, the output-file setup below is done a
// single time, sized from the first non-empty tag set encountered.
133 tagsetOnce.Do(func() {
136 fs, bufws, gzws, encs, err = openOutFiles(dstdir, len(ent.TagSet), tagsPerFile)
// Every output file gets its own TagSet entry.
141 for _, enc := range encs {
142 err = enc.Encode(LibraryEntry{TagSet: tagset})
150 if err := throttle.Err(); err != nil {
153 atomic.AddInt64(&countTileVariants, int64(len(ent.TileVariants)))
// Route each tile variant, as its own LibraryEntry, to the output file whose
// tag range contains the variant's tag.
154 for _, tv := range ent.TileVariants {
155 err := encs[int(tv.Tag)/tagsPerFile].Encode(LibraryEntry{
156 TileVariants: []TileVariant{tv},
162 // Here, each output file gets a
163 // CompactGenome entry for each
164 // genome, even if there are no
165 // variants in the relevant range.
166 // Easier for downstream code.
167 atomic.AddInt64(&countGenomes, int64(len(ent.CompactGenomes)))
168 for _, cg := range ent.CompactGenomes {
169 for i, enc := range encs {
// [start, end) is the tag range nominally covered by output file i.
170 start := i * tagsPerFile
171 end := start + tagsPerFile
// max is one past the last tag this genome has data for: cg.Variants is
// indexed with two entries per tag, beginning at cg.StartTag. The body of
// this if is not visible here — presumably it clamps end to max.
172 if max := len(cg.Variants)/2 + int(cg.StartTag); end > max {
// Never start before the genome's first tag.
175 if start < int(cg.StartTag) {
176 start = int(cg.StartTag)
178 var variants []tileVariantID
// Convert tag positions into indexes of cg.Variants: subtract StartTag, then
// multiply by two because each tag occupies two entries.
180 variants = cg.Variants[(start-int(cg.StartTag))*2 : (end-int(cg.StartTag))*2]
182 err := enc.Encode(LibraryEntry{CompactGenomes: []CompactGenome{{
185 StartTag: tagID(start),
// NOTE(review): EndTag is derived from the possibly-adjusted start, not from
// the clamped end, so it can exceed (i+1)*tagsPerFile — confirm intended.
186 EndTag: tagID(start + tagsPerFile),
193 // Write all ref seqs to the first
194 // slice. Easier for downstream code.
195 atomic.AddInt64(&countReferences, int64(len(ent.CompactSequences)))
196 if len(ent.CompactSequences) > 0 {
197 err := encs[0].Encode(LibraryEntry{CompactSequences: ent.CompactSequences})
// On failure, still close the output files (the close result is discarded)
// and report the throttle's error rather than any close error.
208 if throttle.Err() != nil {
209 closeOutFiles(fs, bufws, gzws, encs)
210 return throttle.Err()
// Deferred so the totals are logged after closeOutFiles on the next line has
// run. The counter arguments are evaluated here, at the defer statement.
212 defer log.Printf("Total %d tile variants, %d genomes, %d reference sequences", countTileVariants, countGenomes, countReferences)
213 return closeOutFiles(fs, bufws, gzws, encs)
// openOutFiles creates ceil(tags/tagsPerFile) output files in dstdir, named
// library0000.gob.gz, library0001.gob.gz, and so on. It returns four parallel
// slices indexed by file number: the *os.File, a buffered writer on that
// file, a pgzip writer on the buffered writer, and a gob encoder on the
// pgzip writer.
//
// NOTE(review): the handling of an os.Create error and the final return are
// not visible in this excerpt. No guard on tagsPerFile is visible either; a
// zero value would panic on the division below — confirm callers prevent it.
216 func openOutFiles(dstdir string, tags, tagsPerFile int) (fs []*os.File, bufws []*bufio.Writer, gzws []*pgzip.Writer, encs []*gob.Encoder, err error) {
// Ceiling division: the number of files needed to cover all tags.
217 nfiles := (tags + tagsPerFile - 1) / tagsPerFile
218 fs = make([]*os.File, nfiles)
219 bufws = make([]*bufio.Writer, nfiles)
220 gzws = make([]*pgzip.Writer, nfiles)
221 encs = make([]*gob.Encoder, nfiles)
// For positive tagsPerFile this condition holds exactly for i < nfiles, so
// every slot of the slices above is visited.
222 for i := 0; i*tagsPerFile < tags; i++ {
223 fs[i], err = os.Create(dstdir + fmt.Sprintf("/library%04d.gob.gz", i))
// Write chain for each file: gob encoder -> pgzip -> bufio -> file.
// The bufio buffer is 1<<26 bytes (64 MiB) per file.
227 bufws[i] = bufio.NewWriterSize(fs[i], 1<<26)
228 gzws[i] = pgzip.NewWriter(bufws[i])
229 encs[i] = gob.NewEncoder(gzws[i])
234 func closeOutFiles(fs []*os.File, bufws []*bufio.Writer, gzws []*pgzip.Writer, encs []*gob.Encoder) error {
236 for _, gzw := range gzws {
239 if err != nil && firstErr == nil {
244 for _, bufw := range bufws {
247 if err != nil && firstErr == nil {
252 for _, f := range fs {
255 if err != nil && firstErr == nil {