17 "git.arvados.org/arvados.git/sdk/go/arvados"
18 log "github.com/sirupsen/logrus"
27 mapped map[string]map[tileLibRef]tileVariantID
28 todo []liftCompactGenome
// RunCommand parses the merge subcommand's flags from args, records the
// remaining positional arguments as cmd.inputs, and then either submits
// the merge to an Arvados container runner or performs it with output
// going to stdout or to the file named by -o.
//
// Diagnostics and flag usage are written to stderr. The int result is
// presumably a process exit status — TODO confirm against the caller.
33 func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
// Report err to the user on stderr.
37 fmt.Fprintf(stderr, "%s\n", err)
// ContinueOnError makes Parse return the error to us instead of
// exiting the process; usage text is directed to stderr.
40 flags := flag.NewFlagSet("", flag.ContinueOnError)
41 flags.SetOutput(stderr)
42 pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
43 runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
44 projectUUID := flags.String("project", "", "project `UUID` for output data")
45 priority := flags.Int("priority", 500, "container request priority")
46 outputFilename := flags.String("o", "-", "output `file`")
47 err = flags.Parse(args)
// A help request (flag.ErrHelp) is handled separately from genuine
// parse failures.
48 if err == flag.ErrHelp {
51 } else if err != nil {
// Everything left after the flags is treated as an input.
55 cmd.inputs = flags.Args()
// Serve on the -pprof address using the default mux (nil handler).
// ListenAndServe only returns on failure, so its error is logged.
59 log.Println(http.ListenAndServe(*pprof, nil))
// Container mode: the output location is fixed inside the container,
// so a user-supplied -o is rejected.
64 if *outputFilename != "-" {
65 err = errors.New("cannot specify output file in container mode: not implemented")
68 runner := arvadosContainerRunner{
// NOTE(review): the name says "filter" although the container is
// started with the "merge" subcommand below — confirm this is not a
// copy/paste leftover.
69 Name: "lightning filter",
70 Client: arvados.NewClientFromEnv(),
71 ProjectUUID: *projectUUID,
// Each input is passed by pointer so the runner can rewrite it in
// place (presumably to a path valid inside the container — TODO
// confirm against TranslatePaths).
76 for i := range cmd.inputs {
77 err = runner.TranslatePaths(&cmd.inputs[i])
// The container re-invokes this command in local mode and writes the
// library to a fixed path under /mnt/output.
82 runner.Args = append([]string{"merge", "-local=true",
83 "-o", "/mnt/output/library.gob",
86 output, err = runner.Run()
// Tell the caller where the result is: the runner's output value
// followed by the fixed file name.
90 fmt.Fprintln(stdout, output+"/library.gob")
// Local mode: "-" means stdout, wrapped in nopCloser so it can be used
// where a Close method is required.
94 if *outputFilename == "-" {
95 cmd.output = nopCloser{stdout}
// NOTE(review): the file is opened without os.O_TRUNC, so writing a
// shorter library over an existing file would leave stale trailing
// bytes, and mode 0777 creates it executable (subject to umask) —
// confirm both are intended.
97 cmd.output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
// The deferred Close covers early returns; the explicit Close below
// captures the error from closing the output on the normal path.
// NOTE(review): on that path Close runs twice — confirm this is
// harmless for both output types.
101 defer cmd.output.Close()
108 err = cmd.output.Close()
// mergeLibraryEntry folds one decoded library entry, read from input
// src, into the merger's state. It checks the entry's tag set (if any)
// against the one already seen, records a merged-library variant ID for
// every tile variant in the entry, and queues the entry's compact
// genomes in cmd.todo together with src's variant mapping.
//
// It returns an error if another goroutine has already reported a
// failure or if the entry's tag set differs from the one seen earlier.
115 func (cmd *merger) mergeLibraryEntry(ent *LibraryEntry, src string) error {
// Mapping from this input's (tag, variant) references to variant IDs
// in the merged library. The per-input map is expected to exist
// already; writing to a nil map would panic.
116 mapped := cmd.mapped[src]
// cmd.errs is a buffered channel, so a non-zero len means an error is
// already waiting to be collected; stop doing work in that case.
117 if len(cmd.errs) > 0 {
118 return errors.New("stopping after error in other goroutine")
120 if len(ent.TagSet) > 0 {
121 // We don't need the tagset to do a merge, but if it
122 // does appear in the input, we (a) output it once,
123 // and (b) do a sanity check, erroring out if the
124 // inputs have different tagsets.
// The unlock is deferred, so cmd.mtxTags (presumably locked just
// before this line — TODO confirm) stays held until the function
// returns, not just until the end of this if block.
126 defer cmd.mtxTags.Unlock()
// First tag set seen: adopt it and, if an encoder is configured,
// write it to the output.
127 if len(cmd.tagSet) == 0 {
128 cmd.tagSet = ent.TagSet
129 if cmd.tilelib.encoder != nil {
// NOTE(review): Encode runs in a separate goroutine and its error
// return is discarded; nothing visible here waits for it to finish —
// confirm a failed or late tag-set write cannot go unnoticed.
130 go cmd.tilelib.encoder.Encode(LibraryEntry{
// Otherwise compare with the recorded tag set: first by length, then
// tag by tag.
134 } else if len(cmd.tagSet) != len(ent.TagSet) {
135 return fmt.Errorf("cannot merge libraries with differing tagsets")
137 for i := range ent.TagSet {
138 if !bytes.Equal(ent.TagSet[i], cmd.tagSet[i]) {
139 return fmt.Errorf("cannot merge libraries with differing tagsets")
144 for _, tv := range ent.TileVariants {
145 // Assign a new variant ID (unique across all inputs)
146 // for each input variant.
147 mapped[tileLibRef{tag: tv.Tag, variant: tv.Variant}] = cmd.tilelib.getRef(tv.Tag, tv.Sequence).variant
// Genomes are not translated here; they are queued with a reference to
// the same map, so mappings added by later entries from this input are
// still visible when the genome is eventually lifted.
// NOTE(review): this method is called from one goroutine per input —
// confirm the append to cmd.todo is synchronized.
149 for _, cg := range ent.CompactGenomes {
150 cmd.todo = append(cmd.todo, liftCompactGenome{cg, mapped})
// liftCompactGenome pairs a compact genome with the variant mapping of
// the input it was read from, so that its variant IDs can be translated
// to merged-library IDs later by lift.
155 type liftCompactGenome struct {
// mapped translates an input-local (tag, variant) reference to the
// variant ID assigned in the merged library.
157 mapped map[tileLibRef]tileVariantID
160 // Translate old variant IDs to new (mapped) variant IDs.
//
// Each variant is looked up in cg.mapped under its (tag, variant)
// reference. A reference that is missing from the mapping aborts the
// translation with an error naming the genome, tag and variant.
// Although the receiver is a value, the writes go through cg.Variants
// (presumably a slice — TODO confirm), so the caller's data is updated.
161 func (cg liftCompactGenome) lift() error {
162 for i, variant := range cg.Variants {
167 newvariant, ok := cg.mapped[tileLibRef{tag: tag, variant: variant}]
169 return fmt.Errorf("oops: ent.CompactGenomes[] (%q) refs tag %d variant %d which is not in library", cg.Name, tag, variant)
// NOTE(review): the value read came from cg.Variants[i], but the
// translated value is stored at index tag. If tag is not always equal
// to i, this overwrites a different slot and leaves slot i
// untranslated — confirm whether cg.Variants[i] was intended.
171 cg.Variants[tag] = newvariant
// setError reports err by offering it to the cmd.errs channel.
//
// The send is written as a select case, presumably paired with a
// default branch so that the call never blocks once the channel's
// buffer already holds an earlier error — TODO confirm.
176 func (cmd *merger) setError(err error) {
178 case cmd.errs <- err:
183 func (cmd *merger) doMerge() error {
184 w := bufio.NewWriter(cmd.output)
185 cmd.errs = make(chan error, 1)
186 cmd.tilelib = &tileLibrary{
187 encoder: gob.NewEncoder(w),
188 includeNoCalls: true,
191 cmd.mapped = map[string]map[tileLibRef]tileVariantID{}
192 for _, input := range cmd.inputs {
193 cmd.mapped[input] = map[tileLibRef]tileVariantID{}
196 var wg sync.WaitGroup
197 for _, input := range cmd.inputs {
198 var infile io.ReadCloser
200 infile = ioutil.NopCloser(cmd.stdin)
203 infile, err = os.Open(input)
210 go func(input string) {
212 log.Printf("%s: reading", input)
213 err := DecodeLibrary(infile, func(ent *LibraryEntry) error {
214 return cmd.mergeLibraryEntry(ent, input)
217 cmd.setError(fmt.Errorf("%s: decode: %w", input, err))
222 cmd.setError(fmt.Errorf("%s: close: %w", input, err))
225 log.Printf("%s: done", input)
230 if err := <-cmd.errs; err != nil {
234 var cgs []CompactGenome
235 for _, cg := range cmd.todo {
240 cgs = append(cgs, cg.CompactGenome)
242 err := cmd.tilelib.encoder.Encode(LibraryEntry{
249 log.Print("flushing")