"git.curoverse.com/arvados.git/sdk/go/manifest"
"io"
"log"
+ "os"
+ "path/filepath"
"strings"
"sync"
)
finish <- errors
}
-// CollectionWriter makes implements creating new Keep collections by opening files
+// CollectionWriter implements creating new Keep collections by opening files
// and writing to them.
type CollectionWriter struct {
IKeepClient
m.mtx.Lock()
defer m.mtx.Unlock()
for _, v := range m.Streams {
+ if len(v.FileStreamSegments) == 0 {
+ continue
+ }
k := v.StreamName
if k == "." {
buf.WriteString(".")
k = strings.Replace(k, "\n", "", -1)
buf.WriteString("./" + k)
}
- for _, b := range v.Blocks {
- buf.WriteString(" ")
- buf.WriteString(b)
+ if len(v.Blocks) > 0 {
+ for _, b := range v.Blocks {
+ buf.WriteString(" ")
+ buf.WriteString(b)
+ }
+ } else {
+ buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0")
}
for _, f := range v.FileStreamSegments {
buf.WriteString(" ")
return buf.String(), nil
}
+type WalkUpload struct {
+ kc IKeepClient
+ stripPrefix string
+ streamMap map[string]*CollectionFileWriter
+ status *log.Logger
+}
+
// WalkFunc walks a directory tree, uploads each file found and adds it to the
// CollectionWriter.
-func (m *CollectionWriter) WalkFunc(path string,
- info os.FileInfo,
- err error,
- stripPrefix string,
- streamMap map[string]*manifest.ManifestStream,
- status log.Logger) error {
+func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
if info.IsDir() {
return nil
}
var dir string
- if len(path) > (len(stripPrefix) + len(info.Name()) + 1) {
- dir = path[len(stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+ if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
+ dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
}
if dir == "" {
dir = "."
fn := path[(len(path) - len(info.Name())):]
- if streamMap[dir] == nil {
- streamMap[dir] = &CollectionFileWriter{
- m.IKeepClient,
+ if m.streamMap[dir] == nil {
+ m.streamMap[dir] = &CollectionFileWriter{
+ m.kc,
&manifest.ManifestStream{StreamName: dir},
0,
0,
make(chan *Block),
make(chan []error),
""}
- go streamMap[dir].goUpload()
+ go m.streamMap[dir].goUpload()
}
- fileWriter := streamMap[dir]
+ fileWriter := m.streamMap[dir]
// Reset the CollectionFileWriter for a new file
fileWriter.NewFile(fn)
}
defer file.Close()
- status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+ m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
- var count int64
- count, err = io.Copy(fileWriter, file)
+ _, err = io.Copy(fileWriter, file)
if err != nil {
return err
}
return nil
}
-func (cw *CollectionWriter) WriteTree(root string, status log.Logger) (manifest string, err error) {
- streamMap := make(map[string]*ManifestStreamWriter)
- err = filepath.Walk(root, func(path string, info os.FileInfo, err error) {
- return cw.WalkFunc(path,
- info,
- err,
- root,
- streamMap,
- status)
- })
+func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
+ streamMap := make(map[string]*CollectionFileWriter)
+ wu := &WalkUpload{cw.IKeepClient, root, streamMap, status}
+ err = filepath.Walk(root, wu.WalkFunc)
if err != nil {
return "", err
}
cw.mtx.Unlock()
- return mw.ManifestText()
+ return cw.ManifestText()
}