1 // Copyright (C) The Lightning Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
23 "git.arvados.org/arvados.git/sdk/go/arvados"
24 "github.com/klauspost/pgzip"
25 log "github.com/sirupsen/logrus"
28 type slicecmd struct{}
30 func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
34 fmt.Fprintf(stderr, "%s\n", err)
37 flags := flag.NewFlagSet("", flag.ContinueOnError)
38 flags.SetOutput(stderr)
39 pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
40 pprofdir := flags.String("pprof-dir", "", "write Go profile data to `directory` periodically")
41 runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
42 projectUUID := flags.String("project", "", "project `UUID` for output data")
43 priority := flags.Int("priority", 500, "container request priority")
44 preemptible := flags.Bool("preemptible", true, "request preemptible instance")
45 outputDir := flags.String("output-dir", "./out", "output `directory`")
46 tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)")
47 err = flags.Parse(args)
48 if err == flag.ErrHelp {
51 } else if err != nil {
54 inputDirs := flags.Args()
55 if len(inputDirs) == 0 {
56 err = errors.New("no input dirs specified")
62 log.Println(http.ListenAndServe(*pprof, nil))
66 go writeProfilesPeriodically(*pprofdir)
70 runner := arvadosContainerRunner{
71 Name: "lightning slice",
72 Client: arvados.NewClientFromEnv(),
73 ProjectUUID: *projectUUID,
79 Preemptible: *preemptible,
81 for i := range inputDirs {
82 err = runner.TranslatePaths(&inputDirs[i])
87 runner.Args = append([]string{"slice", "-local=true",
89 "-output-dir", "/mnt/output",
92 output, err = runner.Run()
96 fmt.Fprintln(stdout, output)
100 err = Slice(*tagsPerFile, *outputDir, inputDirs)
107 // Slice reads tags+tiles+genomes from all srcdirs and writes them to
108 // dstdir with (up to) the specified number of tags per file.
109 func Slice(tagsPerFile int, dstdir string, srcdirs []string) error {
111 for _, srcdir := range srcdirs {
112 files, err := allFiles(srcdir, matchGobFile)
116 infiles = append(infiles, files...)
118 // dirNamespace[dir] is an int in [0,len(dirNamespace)), used below to
119 // namespace variant numbers from different dirs.
120 dirNamespace := map[string]tileVariantID{}
121 for _, path := range infiles {
122 dir, _ := filepath.Split(path)
123 if _, ok := dirNamespace[dir]; !ok {
124 dirNamespace[dir] = tileVariantID(len(dirNamespace))
127 namespaces := tileVariantID(len(dirNamespace))
133 bufws []*bufio.Writer
137 countTileVariants int64
139 countReferences int64
142 throttle := throttle{Max: runtime.GOMAXPROCS(0)}
143 for _, infile := range infiles {
145 throttle.Go(func() error {
146 f, err := open(infile)
151 dir, _ := filepath.Split(infile)
152 namespace := dirNamespace[dir]
153 log.Printf("reading %s (namespace %d)", infile, namespace)
154 return DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
155 if err := throttle.Err(); err != nil {
158 if len(ent.TagSet) > 0 {
159 tagsetOnce.Do(func() {
162 fs, bufws, gzws, encs, err = openOutFiles(dstdir, len(ent.TagSet), tagsPerFile)
167 for _, enc := range encs {
168 err = enc.Encode(LibraryEntry{TagSet: tagset})
176 if err := throttle.Err(); err != nil {
179 atomic.AddInt64(&countTileVariants, int64(len(ent.TileVariants)))
180 for _, tv := range ent.TileVariants {
181 tv.Variant = tv.Variant*namespaces + namespace
184 fileno = int(tv.Tag) / tagsPerFile
186 err := encs[fileno].Encode(LibraryEntry{
187 TileVariants: []TileVariant{tv},
193 // Here, each output file gets a
194 // CompactGenome entry for each
195 // genome, even if there are no
196 // variants in the relevant range.
197 // Easier for downstream code.
198 atomic.AddInt64(&countGenomes, int64(len(ent.CompactGenomes)))
199 for _, cg := range ent.CompactGenomes {
200 for i, v := range cg.Variants {
202 cg.Variants[i] = v*namespaces + namespace
205 for i, enc := range encs {
206 start := i * tagsPerFile
207 end := start + tagsPerFile
208 if max := len(cg.Variants)/2 + int(cg.StartTag); end > max {
211 if start < int(cg.StartTag) {
212 start = int(cg.StartTag)
214 var variants []tileVariantID
216 variants = cg.Variants[(start-int(cg.StartTag))*2 : (end-int(cg.StartTag))*2]
218 err := enc.Encode(LibraryEntry{CompactGenomes: []CompactGenome{{
221 StartTag: tagID(start),
222 EndTag: tagID(start + tagsPerFile),
229 // Write all ref seqs to the first
230 // slice. Easier for downstream code.
231 atomic.AddInt64(&countReferences, int64(len(ent.CompactSequences)))
232 if len(ent.CompactSequences) > 0 {
233 for _, cs := range ent.CompactSequences {
234 for _, tseq := range cs.TileSequences {
235 for i, libref := range tseq {
236 tseq[i].Variant = libref.Variant*namespaces + namespace
240 err := encs[0].Encode(LibraryEntry{CompactSequences: ent.CompactSequences})
250 if throttle.Err() != nil {
251 closeOutFiles(fs, bufws, gzws, encs)
252 return throttle.Err()
254 defer log.Printf("Total %d tile variants, %d genomes, %d reference sequences", countTileVariants, countGenomes, countReferences)
255 return closeOutFiles(fs, bufws, gzws, encs)
258 func openOutFiles(dstdir string, tags, tagsPerFile int) (fs []*os.File, bufws []*bufio.Writer, gzws []*pgzip.Writer, encs []*gob.Encoder, err error) {
259 nfiles := (tags + tagsPerFile - 1) / tagsPerFile
260 fs = make([]*os.File, nfiles)
261 bufws = make([]*bufio.Writer, nfiles)
262 gzws = make([]*pgzip.Writer, nfiles)
263 encs = make([]*gob.Encoder, nfiles)
264 for i := 0; i*tagsPerFile < tags; i++ {
265 fs[i], err = os.Create(dstdir + fmt.Sprintf("/library%04d.gob.gz", i))
269 bufws[i] = bufio.NewWriterSize(fs[i], 1<<26)
270 gzws[i] = pgzip.NewWriter(bufws[i])
271 encs[i] = gob.NewEncoder(gzws[i])
276 func closeOutFiles(fs []*os.File, bufws []*bufio.Writer, gzws []*pgzip.Writer, encs []*gob.Encoder) error {
278 for _, gzw := range gzws {
281 if err != nil && firstErr == nil {
286 for _, bufw := range bufws {
289 if err != nil && firstErr == nil {
294 for _, f := range fs {
297 if err != nil && firstErr == nil {