+func init() {
+ flag.StringVar(
+ &azureTestContainer,
+ "test.azure-storage-container-volume",
+ "",
+ "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.")
+}
+
+type azBlob struct { // azBlob is the stub's in-memory record of a single blob.
+ Data []byte // committed content, as served by the "Get Blob" handler
+ Etag string // replaced with a fresh makeEtag() value by the handlers that write the blob
+ Metadata map[string]string // values from x-ms-meta-* request headers, keyed by the lowercased name after the prefix
+ Mtime time.Time // modification time; reported as Last-Modified in RFC1123 format
+ Uncommitted map[string][]byte // blocks received via "Put Block", keyed by decoded block ID, until "Put Block List" commits them
+}
+
+type azStubHandler struct { // azStubHandler is an http.Handler that keeps blobs in memory for tests.
+ sync.Mutex // held by ServeHTTP and PutRaw while they access blobs
+ blobs map[string]*azBlob // stored blobs, keyed by container + "|" + blob name
+ race chan chan struct{} // when non-nil, unlockAndRace receives from it to pause a request with the lock released; newAzStubHandler leaves it nil
+}
+
+// newAzStubHandler returns a handler with an empty, ready-to-use blob map.
+// The race channel is left nil, so unlockAndRace is a no-op until a test
+// assigns one.
+func newAzStubHandler() *azStubHandler {
+	stub := new(azStubHandler)
+	stub.blobs = map[string]*azBlob{}
+	return stub
+}
+
+// TouchWithDate sets the modification time of the blob stored under
+// container/hash to t. If no such blob exists, it does nothing.
+//
+// The handler mutex is held for the duration of the call: ServeHTTP and
+// PutRaw access the same map and blob records under that mutex, so an
+// unlocked map read and Mtime write here would race with them. Because
+// sync.Mutex is not reentrant, this must not be called by code that already
+// holds the handler lock.
+func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
+	h.Lock()
+	defer h.Unlock()
+	blob, ok := h.blobs[container+"|"+hash]
+	if !ok {
+		return
+	}
+	blob.Mtime = t
+}
+
+// PutRaw stores data as the content of container/hash directly in the stub,
+// replacing any existing entry for that key. The new entry gets the current
+// time as its Mtime and empty Metadata and Uncommitted maps; its Etag is left
+// as the zero value.
+func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
+	h.Lock()
+	defer h.Unlock()
+
+	entry := new(azBlob)
+	entry.Data = data
+	entry.Mtime = time.Now()
+	entry.Metadata = map[string]string{}
+	entry.Uncommitted = map[string][]byte{}
+
+	key := container + "|" + hash
+	h.blobs[key] = entry
+}
+
+// unlockAndRace opens a window in which other goroutines may take the
+// handler lock. It expects the lock to be held on entry and holds it again
+// on return. When h.race is nil it returns immediately without touching the
+// lock.
+func (h *azStubHandler) unlockAndRace() {
+	if h.race == nil {
+		return
+	}
+	h.Unlock()
+	// Receiving from h.race tells the test that the race window has
+	// opened. A non-nil channel received here is a request to wait: block
+	// on a send to it until the test is ready to receive. A nil value
+	// (including the zero value from a closed h.race) means continue
+	// right away.
+	resume := <-h.race
+	if resume != nil {
+		resume <- struct{}{}
+	}
+	h.Lock()
+}
+
+var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`) // matches a Range header value of the form "bytes=<first>-<last>", capturing both offsets; any other form does not match
+
+func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
+ h.Lock()
+ defer h.Unlock()
+ // defer log.Printf("azStubHandler: %+v", r)
+
+ path := strings.Split(r.URL.Path, "/")
+ container := path[1]
+ hash := ""
+ if len(path) > 2 {
+ hash = path[2]
+ }
+
+ if err := r.ParseForm(); err != nil {
+ log.Printf("azStubHandler(%+v): %s", r, err)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+
+ body, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ return
+ }
+
+ type blockListRequestBody struct {
+ XMLName xml.Name `xml:"BlockList"`
+ Uncommitted []string
+ }
+
+ blob, blobExists := h.blobs[container+"|"+hash]
+
+ switch {
+ case r.Method == "PUT" && r.Form.Get("comp") == "":
+ // "Put Blob" API
+ if _, ok := h.blobs[container+"|"+hash]; !ok {
+ // Like the real Azure service, we offer a
+ // race window during which other clients can
+ // list/get the new blob before any data is
+ // committed.
+ h.blobs[container+"|"+hash] = &azBlob{
+ Mtime: time.Now(),
+ Uncommitted: make(map[string][]byte),
+ Metadata: make(map[string]string),
+ Etag: makeEtag(),
+ }
+ h.unlockAndRace()
+ }
+ metadata := make(map[string]string)
+ for k, v := range r.Header {
+ if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
+ name := k[len("x-ms-meta-"):]
+ metadata[strings.ToLower(name)] = v[0]
+ }
+ }
+ h.blobs[container+"|"+hash] = &azBlob{
+ Data: body,
+ Mtime: time.Now(),
+ Uncommitted: make(map[string][]byte),
+ Metadata: metadata,
+ Etag: makeEtag(),
+ }
+ rw.WriteHeader(http.StatusCreated)
+ case r.Method == "PUT" && r.Form.Get("comp") == "block":
+ // "Put Block" API
+ if !blobExists {
+ log.Printf("Got block for nonexistent blob: %+v", r)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
+ if err != nil || len(blockID) == 0 {
+ log.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ blob.Uncommitted[string(blockID)] = body
+ rw.WriteHeader(http.StatusCreated)
+ case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
+ // "Put Block List" API
+ bl := &blockListRequestBody{}
+ if err := xml.Unmarshal(body, bl); err != nil {
+ log.Printf("xml Unmarshal: %s", err)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ for _, encBlockID := range bl.Uncommitted {
+ blockID, err := base64.StdEncoding.DecodeString(encBlockID)
+ if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
+ log.Printf("Invalid blockid: %+q", encBlockID)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ blob.Data = blob.Uncommitted[string(blockID)]
+ blob.Etag = makeEtag()
+ blob.Mtime = time.Now()
+ delete(blob.Uncommitted, string(blockID))
+ }
+ rw.WriteHeader(http.StatusCreated)
+ case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
+ // "Set Metadata Headers" API. We don't bother
+ // stubbing "Get Metadata Headers": AzureBlobVolume
+ // sets metadata headers only as a way to bump Etag
+ // and Last-Modified.
+ if !blobExists {
+ log.Printf("Got metadata for nonexistent blob: %+v", r)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ blob.Metadata = make(map[string]string)
+ for k, v := range r.Header {
+ if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
+ name := k[len("x-ms-meta-"):]
+ blob.Metadata[strings.ToLower(name)] = v[0]
+ }
+ }
+ blob.Mtime = time.Now()
+ blob.Etag = makeEtag()
+ case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
+ // "Get Blob Metadata" API
+ if !blobExists {
+ rw.WriteHeader(http.StatusNotFound)
+ return
+ }
+ for k, v := range blob.Metadata {
+ rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v)
+ }
+ return
+ case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
+ // "Get Blob" API
+ if !blobExists {
+ rw.WriteHeader(http.StatusNotFound)
+ return
+ }
+ data := blob.Data
+ if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil {
+ b0, err0 := strconv.Atoi(rangeSpec[1])
+ b1, err1 := strconv.Atoi(rangeSpec[2])
+ if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 {
+ rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data)))
+ rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
+ return
+ }
+ rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data)))
+ rw.WriteHeader(http.StatusPartialContent)
+ data = data[b0 : b1+1]
+ }
+ rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
+ rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
+ if r.Method == "GET" {
+ if _, err := rw.Write(data); err != nil {
+ log.Printf("write %+q: %s", data, err)
+ }
+ }
+ h.unlockAndRace()
+ case r.Method == "DELETE" && hash != "":
+ // "Delete Blob" API
+ if !blobExists {
+ rw.WriteHeader(http.StatusNotFound)
+ return
+ }
+ delete(h.blobs, container+"|"+hash)
+ rw.WriteHeader(http.StatusAccepted)
+ case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
+ // "List Blobs" API
+ prefix := container + "|" + r.Form.Get("prefix")
+ marker := r.Form.Get("marker")
+
+ maxResults := 2
+ if n, err := strconv.Atoi(r.Form.Get("maxresults")); err == nil && n >= 1 && n <= 5000 {
+ maxResults = n
+ }
+
+ resp := storage.BlobListResponse{
+ Marker: marker,
+ NextMarker: "",
+ MaxResults: int64(maxResults),
+ }
+ var hashes sort.StringSlice
+ for k := range h.blobs {
+ if strings.HasPrefix(k, prefix) {
+ hashes = append(hashes, k[len(container)+1:])
+ }
+ }
+ hashes.Sort()
+ for _, hash := range hashes {
+ if len(resp.Blobs) == maxResults {
+ resp.NextMarker = hash
+ break
+ }
+ if len(resp.Blobs) > 0 || marker == "" || marker == hash {
+ blob := h.blobs[container+"|"+hash]
+ bmeta := map[string]string(nil)
+ if r.Form.Get("include") == "metadata" {
+ bmeta = blob.Metadata
+ }
+ b := storage.Blob{
+ Name: hash,
+ Properties: storage.BlobProperties{
+ LastModified: blob.Mtime.Format(time.RFC1123),
+ ContentLength: int64(len(blob.Data)),
+ Etag: blob.Etag,
+ },
+ Metadata: bmeta,
+ }
+ resp.Blobs = append(resp.Blobs, b)
+ }
+ }
+ buf, err := xml.Marshal(resp)
+ if err != nil {
+ log.Print(err)
+ rw.WriteHeader(http.StatusInternalServerError)
+ }
+ rw.Write(buf)
+ default:
+ log.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
+ rw.WriteHeader(http.StatusNotImplemented)
+ }