+
+type WalkUpload struct {
+ MaxWriters int
+ kc IKeepClient
+ stripPrefix string
+ streamMap map[string]*CollectionFileWriter
+ status *log.Logger
+ workers chan struct{}
+ mtx sync.Mutex
+}
+
+// WalkFunc walks a directory tree, uploads each file found and adds it to the
+// CollectionWriter.
+func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
+
+ if info.IsDir() {
+ return nil
+ }
+
+ var dir string
+ if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
+ dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+ }
+ if dir == "" {
+ dir = "."
+ }
+
+ fn := path[(len(path) - len(info.Name())):]
+
+ if m.streamMap[dir] == nil {
+ m.streamMap[dir] = &CollectionFileWriter{
+ m.kc,
+ &manifest.ManifestStream{StreamName: dir},
+ 0,
+ 0,
+ nil,
+ make(chan *Block),
+ make(chan []error),
+ ""}
+
+ m.mtx.Lock()
+ if m.workers == nil {
+ if m.MaxWriters < 1 {
+ m.MaxWriters = 2
+ }
+ m.workers = make(chan struct{}, m.MaxWriters)
+ }
+ m.mtx.Unlock()
+
+ go m.streamMap[dir].goUpload(m.workers)
+ }
+
+ fileWriter := m.streamMap[dir]
+
+ // Reset the CollectionFileWriter for a new file
+ fileWriter.NewFile(fn)
+
+ file, err := os.Open(path)
+ if err != nil {
+ return err
+ }
+ defer file.Close()
+
+ m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+
+ _, err = io.Copy(fileWriter, file)
+ if err != nil {
+ return err
+ }
+
+ // Commits the current file. Legal to call this repeatedly.
+ fileWriter.Close()
+
+ return nil
+}
+
+func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
+ streamMap := make(map[string]*CollectionFileWriter)
+ wu := &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
+ err = filepath.Walk(root, wu.WalkFunc)
+
+ if err != nil {
+ return "", err
+ }
+
+ cw.mtx.Lock()
+ for _, st := range streamMap {
+ cw.Streams = append(cw.Streams, st)
+ }
+ cw.mtx.Unlock()
+
+ return cw.ManifestText()
+}