import (
"bytes"
"crypto/md5"
+ "errors"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
"io"
- "log"
"os"
"path/filepath"
)
offset int64
*Block
uploader chan *Block
+ finish chan []error
}
type IKeepClient interface {
}
func (m *ManifestStreamWriter) goUpload() {
- select {
- case block, valid := <-m.uploader:
- if !valid {
- return
+ var errors []error
+ uploader := m.uploader
+ finish := m.finish
+ for true {
+ select {
+ case block, valid := <-uploader:
+ if !valid {
+ finish <- errors
+ return
+ }
+ hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+ signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
+ if err != nil {
+ errors = append(errors, err)
+ } else {
+ m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
+ }
}
- hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
- signedHash, _, _ := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
- m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
}
-
}
type ManifestWriter struct {
Streams map[string]*ManifestStreamWriter
}
-type walker struct {
- currentDir string
- m *ManifestWriter
-}
-
-func (w walker) WalkFunc(path string, info os.FileInfo, err error) error {
- log.Print("path ", path, " ", info.Name(), " ", info.IsDir())
-
+func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
if info.IsDir() {
- if path == w.currentDir {
- return nil
- }
- return filepath.Walk(path, walker{path, w.m}.WalkFunc)
+ return nil
+ }
+
+ var dir string
+ if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
+ dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
}
- m := w.m
- dir := path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()))]
fn := path[(len(path) - len(info.Name())):]
if m.Streams[dir] == nil {
&manifest.ManifestStream{StreamName: dir},
0,
nil,
- make(chan *Block)}
+ make(chan *Block),
+ make(chan []error)}
go m.Streams[dir].goUpload()
}
return nil
}
-func (m *ManifestWriter) Finish() {
+func (m *ManifestWriter) Finish() error {
+ var errstring string
for _, v := range m.Streams {
if v.uploader != nil {
if v.Block != nil {
}
close(v.uploader)
v.uploader = nil
+
+ errors := <-v.finish
+ close(v.finish)
+ v.finish = nil
+
+ if errors != nil {
+ for _, r := range errors {
+ errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
+ }
+ }
}
}
+ if errstring != "" {
+ return errors.New(errstring)
+ } else {
+ return nil
+ }
}
func (m *ManifestWriter) ManifestText() string {
func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
- err = filepath.Walk(root, walker{root, &mw}.WalkFunc)
- mw.Finish()
+ err = filepath.Walk(root, mw.WalkFunc)
if err != nil {
return "", err
- } else {
- return mw.ManifestText(), nil
}
+ err = mw.Finish()
+ if err != nil {
+ return "", err
+ }
+
+ return mw.ManifestText(), nil
}