Merge branch '1968-monitor-disk-usage'
authorMisha Zatsman <misha@curoverse.com>
Wed, 14 May 2014 00:58:43 +0000 (00:58 +0000)
committerMisha Zatsman <misha@curoverse.com>
Wed, 14 May 2014 16:05:02 +0000 (16:05 +0000)
Lots of changes to datamanager. refs #2719

services/datamanager/experimental/datamanager.py [new file with mode: 0755]
services/datamanager/experimental/datamanager_test.py [new file with mode: 0755]

diff --git a/services/datamanager/experimental/datamanager.py b/services/datamanager/experimental/datamanager.py
new file mode 100755 (executable)
index 0000000..8207bdc
--- /dev/null
@@ -0,0 +1,887 @@
+#! /usr/bin/env python
+
+import arvados
+
+import argparse
+import cgi
+import csv
+import json
+import logging
+import math
+import pprint
+import re
+import threading
+import urllib2
+
+from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
+from collections import defaultdict, Counter
+from functools import partial
+from operator import itemgetter
+from SocketServer import ThreadingMixIn
+
+arv = arvados.api('v1')
+
# Adapted from http://stackoverflow.com/questions/4180980/formatting-data-quantity-capacity-as-string
byteunits = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
def fileSizeFormat(value):
  """Returns a human readable string for a byte count, e.g. '   1.00 KiB'.

  value: a non-negative number of bytes.

  The result is a fixed-width field: a right-aligned number with two
  decimal places followed by a left-aligned unit suffix.
  """
  # log(0) is undefined, so map 0 bytes straight to the 'B' bucket.
  # Clamp the exponent so absurdly large values can't index past the
  # last known unit.
  exponent = 0 if value == 0 else min(int(math.log(value, 1024)),
                                      len(byteunits) - 1)
  return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
                         byteunits[exponent])
+
def percentageFloor(x):
  """Returns a float which is the input rounded down to the nearest 0.01.

  e.g. percentageFloor(0.941354) = 0.94
  """
  return math.floor(x*100) / 100.0
+
+
def byteSizeFromValidUuid(valid_uuid):
  """Returns the byte size encoded in a canonical block uuid (hash+size)."""
  size_token = valid_uuid.split('+')[1]
  return int(size_token)
+
class maxdict(dict):
  """A dictionary that holds the largest value entered for each key."""

  def addValue(self, key, value):
    """Stores value under key unless a larger value is already there."""
    # Default the lookup to the incoming value so the first insert does
    # not compare against None (which is unorderable on Python 3; on
    # Python 2 the behavior is unchanged).
    dict.__setitem__(self, key, max(dict.get(self, key, value), value))

  def addValues(self, kv_pairs):
    """Applies addValue to every (key, value) pair in kv_pairs."""
    for key,value in kv_pairs:
      self.addValue(key, value)

  def addDict(self, d):
    """Applies addValue to every entry of dictionary d."""
    self.addValues(d.items())
+
class CollectionInfo:
  """Holds the blocks, readers and persisters of a single collection.

  Instances register themselves in the class-level all_by_uuid map;
  use CollectionInfo.get() rather than constructing duplicates.
  """
  DEFAULT_PERSISTER_REPLICATION_LEVEL=2
  all_by_uuid = {}

  def __init__(self, uuid):
    # 'in' instead of the Python-2-only dict.has_key().
    if uuid in CollectionInfo.all_by_uuid:
      raise ValueError('Collection for uuid "%s" already exists.' % uuid)
    self.uuid = uuid
    self.block_uuids = set()  # uuids of keep blocks in this collection
    self.reader_uuids = set()  # uuids of users who can read this collection
    self.persister_uuids = set()  # uuids of users who want this collection saved
    # map from user uuid to replication level they desire
    self.persister_replication = maxdict()

    # The whole api response in case we need anything else later.
    self.api_response = []
    CollectionInfo.all_by_uuid[uuid] = self

  def byteSize(self):
    """Returns the total size in bytes of this collection's blocks
    (not counting replication)."""
    return sum(map(byteSizeFromValidUuid, self.block_uuids))

  def __str__(self):
    return ('CollectionInfo uuid: %s\n'
            '               %d block(s) containing %s\n'
            '               reader_uuids: %s\n'
            '               persister_replication: %s' %
            (self.uuid,
             len(self.block_uuids),
             fileSizeFormat(self.byteSize()),
             pprint.pformat(self.reader_uuids, indent = 15),
             pprint.pformat(self.persister_replication, indent = 15)))

  @staticmethod
  def get(uuid):
    """Returns the CollectionInfo for uuid, creating it if needed."""
    if uuid not in CollectionInfo.all_by_uuid:
      CollectionInfo(uuid)
    return CollectionInfo.all_by_uuid[uuid]
+
+
def extractUuid(candidate):
  """ Returns a canonical (hash+size) uuid from a valid uuid, or None if candidate is not a valid uuid."""
  # Raw string so the regex escapes are explicit rather than relying on
  # Python passing unrecognized backslash escapes through unchanged.
  match = re.match(r'([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate)
  return match and match.group(1)
+
def checkUserIsAdmin():
  """Warns if the current API user is not an admin.

  Exits with status 1 when args.require_admin_user is set, since a
  non-admin user cannot see most of the data this report needs.
  """
  current_user = arv.users().current().execute()

  if not current_user['is_admin']:
    # Non-admins only see their own collections and links, so any
    # report produced would be badly incomplete.
    log.warning('Current user %s (%s - %s) does not have '
                'admin access and will not see much of the data.',
                current_user['full_name'],
                current_user['email'],
                current_user['uuid'])
    if args.require_admin_user:
      log.critical('Exiting, rerun with --no-require-admin-user '
                   'if you wish to continue.')
      exit(1)
+
def buildCollectionsList():
  """Returns the list of collection uuids to process.

  Honors the --uuid flag (a single collection); otherwise fetches up
  to --max-api-results collections from the API server.
  """
  if args.uuid:
    return [args.uuid,]
  else:
    collections_list_response = arv.collections().list(limit=args.max_api_results).execute()

    print ('Returned %d of %d collections.' %
           (len(collections_list_response['items']),
            collections_list_response['items_available']))

    return [item['uuid'] for item in collections_list_response['items']]
+
+
def readCollections(collection_uuids):
  """Fetches each collection and records its keep block uuids.

  For every uuid in collection_uuids: retrieves the collection from
  the API server, stores the raw response on its CollectionInfo, and
  adds every block locator found in the manifest to that
  CollectionInfo's block_uuids set.
  """
  for collection_uuid in collection_uuids:
    collection_block_uuids = set()
    collection_response = arv.collections().get(uuid=collection_uuid).execute()
    collection_info = CollectionInfo.get(collection_uuid)
    collection_info.api_response = collection_response
    manifest_lines = collection_response['manifest_text'].split('\n')

    if args.verbose:
      print 'Manifest text for %s:' % collection_uuid
      pprint.pprint(manifest_lines)

    for manifest_line in manifest_lines:
      if manifest_line:
        manifest_tokens = manifest_line.split(' ')
        if args.verbose:
          print 'manifest tokens: ' + pprint.pformat(manifest_tokens)
        # The first token of a manifest line is the stream name; the
        # remaining tokens are block locators and file entries.
        stream_name = manifest_tokens[0]

        # extractUuid returns None for tokens that are not block
        # locators (i.e. file entries), so filter(None, ...) keeps
        # only the blocks.
        line_block_uuids = set(filter(None,
                                      [extractUuid(candidate)
                                       for candidate in manifest_tokens[1:]]))
        collection_info.block_uuids.update(line_block_uuids)

        # file_tokens = [token
        #                for token in manifest_tokens[1:]
        #                if extractUuid(token) is None]

        # # Sort file tokens by start position in case they aren't already
        # file_tokens.sort(key=lambda file_token: int(file_token.split(':')[0]))

        # if args.verbose:
        #   print 'line_block_uuids: ' + pprint.pformat(line_block_uuids)
        #   print 'file_tokens: ' + pprint.pformat(file_tokens)
+
+
def readLinks():
  """Fetches the links pointing at each collection and records readers
  and persisters.

  'permission' links mark users who may read the collection;
  'resources' links mark users who want it persisted, at the
  replication level given in the link's properties (or the default).
  """
  link_classes = set()

  for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
    # TODO(misha): We may not be seeing all the links, but since items
    # available does not return an accurate number, I don't know how
    # to confirm that we saw all of them.
    collection_links_response = arv.links().list(where={'head_uuid':collection_uuid}).execute()
    link_classes.update([link['link_class'] for link in collection_links_response['items']])
    for link in collection_links_response['items']:
      if link['link_class'] == 'permission':
        collection_info.reader_uuids.add(link['tail_uuid'])
      elif link['link_class'] == 'resources':
        # Fall back to the default level when the link doesn't name one.
        replication_level = link['properties'].get(
          'replication',
          CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL)
        collection_info.persister_replication.addValue(
          link['tail_uuid'],
          replication_level)
        collection_info.persister_uuids.add(link['tail_uuid'])

  print 'Found the following link classes:'
  pprint.pprint(link_classes)
+
def reportMostPopularCollections():
  """Prints the ten most popular collections.

  Popularity counts each persister ten times as heavily as each
  reader.
  """
  most_popular_collections = sorted(
    CollectionInfo.all_by_uuid.values(),
    key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_replication),
    reverse=True)[:10]

  print 'Most popular Collections:'
  for collection_info in most_popular_collections:
    print collection_info
+
+
def buildMaps():
  """Populates the global block/reader/persister cross-reference maps
  from every CollectionInfo gathered so far."""
  for coll_uuid, info in CollectionInfo.all_by_uuid.items():
    # The manifest itself is stored as a keep block, so include the
    # collection uuid in all the per-block bookkeeping.
    all_blocks = info.block_uuids.union([coll_uuid,])
    for block in all_blocks:
      block_to_collections[block].add(coll_uuid)
      block_to_readers[block].update(info.reader_uuids)
      block_to_persisters[block].update(info.persister_uuids)
      block_to_persister_replication[block].addDict(info.persister_replication)
    for reader in info.reader_uuids:
      reader_to_collections[reader].add(coll_uuid)
      reader_to_blocks[reader].update(all_blocks)
    for persister in info.persister_uuids:
      persister_to_collections[persister].add(coll_uuid)
      persister_to_blocks[persister].update(all_blocks)
+
+
def itemsByValueLength(original):
  """Returns original's items sorted so the longest values come first."""
  def value_length(item):
    return len(item[1])
  return sorted(original.items(), key=value_length, reverse=True)
+
+
+def reportBusiestUsers():
+  busiest_readers = itemsByValueLength(reader_to_collections)
+  print 'The busiest readers are:'
+  for reader,collections in busiest_readers:
+    print '%s reading %d collections.' % (reader, len(collections))
+  busiest_persisters = itemsByValueLength(persister_to_collections)
+  print 'The busiest persisters are:'
+  for persister,collections in busiest_persisters:
+    print '%s reading %d collections.' % (persister, len(collections))
+
+
def blockDiskUsage(block_uuid):
  """Returns the disk usage of a block given its uuid.

  Usage is the block's size times its observed replication level
  (global block_to_replication).
  Will return 0 before reading the contents of the keep servers.
  """
  return byteSizeFromValidUuid(block_uuid) * block_to_replication[block_uuid]
+
def blockPersistedUsage(user_uuid, block_uuid):
  """Returns the bytes this user asked to persist for this block:
  block size times the replication level they requested, or 0 if they
  never asked for this block to be persisted."""
  return (byteSizeFromValidUuid(block_uuid) *
          block_to_persister_replication[block_uuid].get(user_uuid, 0))
+
memo_computeWeightedReplicationCosts = {}
def computeWeightedReplicationCosts(replication_levels):
  """Computes the relative cost of varied replication levels.

  replication_levels: a tuple of integers representing the desired
  replication level. If n users want a replication level of x then x
  should appear n times in replication_levels.

  Returns a dictionary from replication level to cost.

  The basic thinking is that the cost of replicating at level x should
  be shared by everyone who wants replication of level x or higher.

  For example, if we have two users who want 1 copy, one user who
  wants 3 copies and two users who want 6 copies:
  the input would be [1, 1, 3, 6, 6] (or any permutation)

  The cost of the first copy is shared by all 5 users, so they each
  pay 1 copy / 5 users = 0.2.
  The cost of the second and third copies shared by 3 users, so they
  each pay 2 copies / 3 users = 0.67 (plus the above costs)
  The cost of the fourth, fifth and sixth copies is shared by two
  users, so they each pay 3 copies / 2 users = 1.5 (plus the above costs)

  Here are some other examples:
  computeWeightedReplicationCosts([1,]) -> {1:1.0}
  computeWeightedReplicationCosts([2,]) -> {2:2.0}
  computeWeightedReplicationCosts([1,1]) -> {1:0.5}
  computeWeightedReplicationCosts([2,2]) -> {2:1.0}
  computeWeightedReplicationCosts([1,2]) -> {1:0.5,2:1.5}
  computeWeightedReplicationCosts([1,3]) -> {1:0.5,3:2.5}
  computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7}
  """
  # Sorted (level, count) pairs double as a deterministic memo key.
  replication_level_counts = sorted(Counter(replication_levels).items())

  memo_key = str(replication_level_counts)

  if memo_key not in memo_computeWeightedReplicationCosts:
    last_level = 0
    current_cost = 0
    total_interested = float(sum(map(itemgetter(1), replication_level_counts)))
    cost_for_level = {}
    for replication_level, count in replication_level_counts:
      copies_added = replication_level - last_level
      # compute marginal cost from last level and add it to the last cost
      current_cost += copies_added / total_interested
      cost_for_level[replication_level] = current_cost
      # update invariants
      last_level = replication_level
      total_interested -= count
    memo_computeWeightedReplicationCosts[memo_key] = cost_for_level

  return memo_computeWeightedReplicationCosts[memo_key]
+
def blockPersistedWeightedUsage(user_uuid, block_uuid):
  """Returns this user's share of the disk cost of persisting a block.

  The cost of each replication level is split among all the users who
  requested at least that level (see computeWeightedReplicationCosts).
  NOTE(review): indexing with [user_uuid] (not .get) assumes user_uuid
  actually persists this block -- confirm callers guarantee that.
  """
  persister_replication_for_block = block_to_persister_replication[block_uuid]
  user_replication = persister_replication_for_block[user_uuid]
  return (
    byteSizeFromValidUuid(block_uuid) *
    computeWeightedReplicationCosts(
      persister_replication_for_block.values())[user_replication])
+
+
def computeUserStorageUsage():
  """Fills the global user_to_usage rows with per-user read/persist
  byte totals, both unweighted and weighted."""
  for user, blocks in reader_to_blocks.items():
    usage = user_to_usage[user]
    usage[UNWEIGHTED_READ_SIZE_COL] = sum(
        byteSizeFromValidUuid(block) for block in blocks)
    # Weighted read: each block's size is split evenly among its readers.
    usage[WEIGHTED_READ_SIZE_COL] = sum(
        float(byteSizeFromValidUuid(block)) / len(block_to_readers[block])
        for block in blocks)
  for user, blocks in persister_to_blocks.items():
    usage = user_to_usage[user]
    usage[UNWEIGHTED_PERSIST_SIZE_COL] = sum(
        blockPersistedUsage(user, block) for block in blocks)
    usage[WEIGHTED_PERSIST_SIZE_COL] = sum(
        blockPersistedWeightedUsage(user, block) for block in blocks)
+
def printUserStorageUsage():
  """Prints one line per user with the four usage totals computed by
  computeUserStorageUsage()."""
  print ('user: unweighted readable block size, weighted readable block size, '
         'unweighted persisted block size, weighted persisted block size:')
  for user, usage in user_to_usage.items():
    print ('%s: %s %s %s %s' %
           (user,
            fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
            fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
            fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
            fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
+
def logUserStorageUsage():
  """Writes one log entry per user to the API server recording that
  user's four storage usage totals."""
  for user, usage in user_to_usage.items():
    body = {}
    # user could actually represent a user or a group. We don't set
    # the object_type field since we don't know which we have.
    body['object_uuid'] = user
    body['event_type'] = args.user_storage_log_event_type
    properties = {}
    properties['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
    properties['read_collections_weighted_bytes'] = (
      usage[WEIGHTED_READ_SIZE_COL])
    properties['persisted_collections_total_bytes'] = (
      usage[UNWEIGHTED_PERSIST_SIZE_COL])
    properties['persisted_collections_weighted_bytes'] = (
      usage[WEIGHTED_PERSIST_SIZE_COL])
    body['properties'] = properties
    # TODO(misha): Confirm that this will throw an exception if it
    # fails to create the log entry.
    arv.logs().create(body=body).execute()
+
def getKeepServers():
  """Returns [host, port] pairs for every keep disk known to the API."""
  keep_disks = arv.keep_disks().list().execute()['items']
  servers = []
  for disk in keep_disks:
    servers.append([disk['service_host'], disk['service_port']])
  return servers
+
+
def getKeepBlocks(keep_servers):
  """Fetches /index from each keep server.

  Returns a list, parallel to keep_servers, of lists of
  (block uuid, mtime) tuples -- one per block on that server.
  """
  blocks = []
  for host,port in keep_servers:
    response = urllib2.urlopen('http://%s:%d/index' % (host, port))
    # Each non-empty index line is '<block_id> <mtime>'.
    server_blocks = [line.split(' ')
                     for line in response.read().split('\n')
                     if line]
    server_blocks = [(block_id, int(mtime))
                     for block_id, mtime in server_blocks]
    blocks.append(server_blocks)
  return blocks
+
def getKeepStats(keep_servers):
  """Fetches /status.json from each keep server and parses disk usage.

  Returns a list, parallel to keep_servers, of
  [total space in bytes, free space in bytes] pairs summed over each
  server's keep-mounted volumes.
  """
  # Column offsets into a line of `df` output, and df's block size.
  MOUNT_COLUMN = 5
  TOTAL_COLUMN = 1
  FREE_COLUMN = 3
  DISK_BLOCK_SIZE = 1024
  stats = []
  for host,port in keep_servers:
    response = urllib2.urlopen('http://%s:%d/status.json' % (host, port))

    parsed_json = json.load(response)
    df_entries = [line.split()
                  for line in parsed_json['df'].split('\n')
                  if line]
    # Only count volumes whose mount point mentions 'keep'.
    keep_volumes = [columns
                    for columns in df_entries
                    if 'keep' in columns[MOUNT_COLUMN]]
    total_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(TOTAL_COLUMN),
                                                  keep_volumes)))
    free_space =  DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(FREE_COLUMN),
                                                  keep_volumes)))
    stats.append([total_space, free_space])
  return stats
+
+
def computeReplication(keep_blocks):
  """Counts how many keep servers hold each block.

  keep_blocks: output of getKeepBlocks() -- one (uuid, mtime) list per
  server. Increments the global block_to_replication once per server
  that holds each block.
  """
  for server_blocks in keep_blocks:
    for block_uuid, _ in server_blocks:
      block_to_replication[block_uuid] += 1
  log.debug('Seeing the following replication levels among blocks: %s',
            str(set(block_to_replication.values())))
+
+
def computeGarbageCollectionCandidates():
  """Builds the global garbage_collection_report.

  Reads the global keep_blocks. Blocks that no user persists are
  sorted by their latest mtime (oldest first), so deleting them in
  report order frees space while destroying the least recently
  written data first.
  """
  for server_blocks in keep_blocks:
    block_to_latest_mtime.addValues(server_blocks)
  empty_set = set()
  garbage_collection_priority = sorted(
    [(block,mtime)
     for block,mtime in block_to_latest_mtime.items()
     if len(block_to_persisters.get(block,empty_set)) == 0],
    key = itemgetter(1))
  global garbage_collection_report
  garbage_collection_report = []
  cumulative_disk_size = 0
  for block,mtime in garbage_collection_priority:
    disk_size = blockDiskUsage(block)
    cumulative_disk_size += disk_size
    garbage_collection_report.append(
      (block,
       mtime,
       disk_size,
       cumulative_disk_size,
       # Proportion of disk that would be free after deleting this
       # block and every block listed before it.
       float(free_keep_space + cumulative_disk_size)/total_keep_space))

  print 'The oldest Garbage Collection Candidates: '
  pprint.pprint(garbage_collection_report[:20])
+
+
def outputGarbageCollectionReport(filename):
  """Writes the global garbage_collection_report to filename as CSV."""
  # NOTE(review): 'wb' is the Python 2 idiom for csv output; a Python 3
  # port would need mode 'w' with newline=''.
  with open(filename, 'wb') as csvfile:
    gcwriter = csv.writer(csvfile)
    gcwriter.writerow(['block uuid', 'latest mtime', 'disk size',
                       'cumulative size', 'disk free'])
    for line in garbage_collection_report:
      gcwriter.writerow(line)
+
def computeGarbageCollectionHistogram():
  """Collapses the global garbage_collection_report into a list of
  (mtime, disk free proportion) steps.

  An entry is appended each time the reclaimable-space proportion,
  rounded down to the nearest 0.01, increases.
  """
  # TODO(misha): Modify this to allow users to specify the number of
  # histogram buckets through a flag.
  histogram = []
  previous_percentage = -1
  for entry in garbage_collection_report:
    mtime, disk_free = entry[1], entry[4]
    current_percentage = percentageFloor(disk_free)
    if current_percentage > previous_percentage:
      histogram.append( (mtime, current_percentage) )
    previous_percentage = current_percentage

  log.info('Garbage collection histogram is: %s', histogram)

  return histogram
+
+
def logGarbageCollectionHistogram():
  """Writes the global garbage_collection_histogram to the API server
  as a single log entry."""
  body = {}
  # TODO(misha): Decide whether we should specify an object_uuid in
  # the body and if so, which uuid to use.
  body['event_type'] = args.block_age_free_space_histogram_log_event_type
  properties = {}
  properties['histogram'] = garbage_collection_histogram
  body['properties'] = properties
  # TODO(misha): Confirm that this will throw an exception if it
  # fails to create the log entry.
  arv.logs().create(body=body).execute()
+
+
def detectReplicationProblems():
  """Populates the three global problem sets from the replication data.

  - blocks_not_in_any_collections: blocks on keep servers that no
    known collection references.
  - underreplicated_persisted_blocks: blocks with fewer copies than
    the highest level any persister requested.
  - overreplicated_persisted_blocks: blocks with more copies than
    requested.
  """
  blocks_not_in_any_collections.update(
    set(block_to_replication.keys()).difference(block_to_collections.keys()))
  underreplicated_persisted_blocks.update(
    [uuid
     for uuid, persister_replication in block_to_persister_replication.items()
     if len(persister_replication) > 0 and
     block_to_replication[uuid] < max(persister_replication.values())])
  overreplicated_persisted_blocks.update(
    [uuid
     for uuid, persister_replication in block_to_persister_replication.items()
     if len(persister_replication) > 0 and
     block_to_replication[uuid] > max(persister_replication.values())])

  log.info('Found %d blocks not in any collections, e.g. %s...',
           len(blocks_not_in_any_collections),
           ','.join(list(blocks_not_in_any_collections)[:5]))
  log.info('Found %d underreplicated blocks, e.g. %s...',
           len(underreplicated_persisted_blocks),
           ','.join(list(underreplicated_persisted_blocks)[:5]))
  log.info('Found %d overreplicated blocks, e.g. %s...',
           len(overreplicated_persisted_blocks),
           ','.join(list(overreplicated_persisted_blocks)[:5]))

  # TODO:
  #  Read blocks sorted by mtime
  #  Cache window vs % free space
  #  Collections which candidates will appear in
  #  Youngest underreplicated read blocks that appear in collections.
  #  Report Collections that have blocks which are missing from (or
  #   underreplicated in) keep.
+
+
# This is the main flow here

parser = argparse.ArgumentParser(description='Report on keep disks.')
"""The command line argument parser we use.

We only use it in the __main__ block, but leave it outside the block
in case another package wants to use it or customize it by specifying
it as a parent to their commandline parser.
"""
parser.add_argument('-m',
                    '--max-api-results',
                    type=int,
                    default=5000,
                    help=('The max results to get at once.'))
parser.add_argument('-p',
                    '--port',
                    type=int,
                    default=9090,
                    help=('The port number to serve on. 0 means no server.'))
parser.add_argument('-v',
                    '--verbose',
                    help='increase output verbosity',
                    action='store_true')
parser.add_argument('-u',
                    '--uuid',
                    help='uuid of specific collection to process')
# --require-admin-user / --no-require-admin-user are an on/off pair
# sharing one dest; the same pattern is used for --log-to-workbench.
parser.add_argument('--require-admin-user',
                    action='store_true',
                    default=True,
                    help='Fail if the user is not an admin [default]')
parser.add_argument('--no-require-admin-user',
                    dest='require_admin_user',
                    action='store_false',
                    help=('Allow users without admin permissions with '
                          'only a warning.'))
parser.add_argument('--log-to-workbench',
                    action='store_true',
                    default=False,
                    help='Log findings to workbench')
parser.add_argument('--no-log-to-workbench',
                    dest='log_to_workbench',
                    action='store_false',
                    help='Don\'t log findings to workbench [default]')
parser.add_argument('--user-storage-log-event-type',
                    default='user-storage-report',
                    help=('The event type to set when logging user '
                          'storage usage to workbench.'))
parser.add_argument('--block-age-free-space-histogram-log-event-type',
                    default='block-age-free-space-histogram',
                    help=('The event type to set when logging user '
                          'storage usage to workbench.'))
parser.add_argument('--garbage-collection-file',
                    default='',
                    help=('The file to write a garbage collection report, or '
                          'leave empty for no report.'))
+
# Parsed command line arguments; populated in the __main__ block.
args = None

# TODO(misha): Think about moving some of this to the __main__ block.
log = logging.getLogger('arvados.services.datamanager')
stderr_handler = logging.StreamHandler()
log.setLevel(logging.INFO)
stderr_handler.setFormatter(
  logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
log.addHandler(stderr_handler)

# Global Data - don't try this at home
collection_uuids = []

# These maps all map from uuids to a set of uuids
block_to_collections = defaultdict(set)  # keep blocks
reader_to_collections = defaultdict(set)  # collection(s) for which the user has read access
persister_to_collections = defaultdict(set)  # collection(s) which the user has persisted
block_to_readers = defaultdict(set)  # users with read access to each block
block_to_persisters = defaultdict(set)  # users persisting each block
# map from block uuid to a maxdict of persister uuid -> requested level
block_to_persister_replication = defaultdict(maxdict)
reader_to_blocks = defaultdict(set)  # blocks readable by each user
persister_to_blocks = defaultdict(set)  # blocks persisted by each user

# Column indices into the per-user rows stored in user_to_usage.
UNWEIGHTED_READ_SIZE_COL = 0
WEIGHTED_READ_SIZE_COL = 1
UNWEIGHTED_PERSIST_SIZE_COL = 2
WEIGHTED_PERSIST_SIZE_COL = 3
NUM_COLS = 4
user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)

# Filled in by loadAllData() from the keep servers themselves.
keep_servers = []
keep_blocks = []
keep_stats = []
total_keep_space = 0
free_keep_space =  0

# How many keep servers hold each block.
block_to_replication = defaultdict(lambda: 0)
# Newest mtime seen for each block across all keep servers.
block_to_latest_mtime = maxdict()

garbage_collection_report = []
"""A list of non-persisted blocks, sorted by increasing mtime

Each entry is of the form (block uuid, latest mtime, disk size,
cumulative size)

* block uuid: The id of the block we want to delete
* latest mtime: The latest mtime of the block across all keep servers.
* disk size: The total disk space used by this block (block size
multiplied by current replication level)
* cumulative disk size: The sum of this block's disk size and all the
blocks listed above it
* disk free: The proportion of our disk space that would be free if we
deleted this block and all the above. So this is (free disk space +
cumulative disk size) / total disk capacity
"""

garbage_collection_histogram = []
""" Shows the tradeoff of keep block age vs keep disk free space.

Each entry is of the form (mtime, Disk Proportion).

An entry of the form (1388747781, 0.52) means that if we deleted the
oldest non-persisted blocks until we had 52% of the disk free, then
all blocks with an mtime greater than 1388747781 would be preserved.
"""

# Stuff to report on
blocks_not_in_any_collections = set()
underreplicated_persisted_blocks = set()
overreplicated_persisted_blocks = set()

# Set by loadAllData() when the pipeline finishes; the web handler
# serves 503s until then.
all_data_loaded = False
+
def loadAllData():
  """Runs the whole data-gathering pipeline, filling the module globals.

  Fetches collections, links and keep server data, computes usage and
  garbage collection reports, and finally sets all_data_loaded so the
  web handler knows it can serve real pages.
  """
  checkUserIsAdmin()

  log.info('Building Collection List')
  global collection_uuids
  collection_uuids = filter(None, [extractUuid(candidate)
                                   for candidate in buildCollectionsList()])

  log.info('Reading Collections')
  readCollections(collection_uuids)

  if args.verbose:
    pprint.pprint(CollectionInfo.all_by_uuid)

  log.info('Reading Links')
  readLinks()

  reportMostPopularCollections()

  log.info('Building Maps')
  buildMaps()

  reportBusiestUsers()

  log.info('Getting Keep Servers')
  global keep_servers
  keep_servers = getKeepServers()

  print keep_servers

  log.info('Getting Blocks from each Keep Server.')
  global keep_blocks
  keep_blocks = getKeepBlocks(keep_servers)

  log.info('Getting Stats from each Keep Server.')
  global keep_stats, total_keep_space, free_keep_space
  keep_stats = getKeepStats(keep_servers)

  total_keep_space = sum(map(itemgetter(0), keep_stats))
  free_keep_space = sum(map(itemgetter(1), keep_stats))

  # TODO(misha): Delete this hack when the keep servers are fixed!
  # This hack deals with the fact that keep servers report each other's disks.
  # NOTE(review): divides by len(keep_stats) -- assumes at least one
  # keep server responded; confirm before running against an empty cluster.
  total_keep_space /= len(keep_stats)
  free_keep_space /= len(keep_stats)

  log.info('Total disk space: %s, Free disk space: %s (%d%%).' %
           (fileSizeFormat(total_keep_space),
            fileSizeFormat(free_keep_space),
            100*free_keep_space/total_keep_space))

  computeReplication(keep_blocks)

  log.info('average replication level is %f',
           (float(sum(block_to_replication.values())) /
            len(block_to_replication)))

  computeGarbageCollectionCandidates()

  if args.garbage_collection_file:
    log.info('Writing garbage Collection report to %s',
             args.garbage_collection_file)
    outputGarbageCollectionReport(args.garbage_collection_file)

  global garbage_collection_histogram
  garbage_collection_histogram = computeGarbageCollectionHistogram()

  if args.log_to_workbench:
    logGarbageCollectionHistogram()

  detectReplicationProblems()

  computeUserStorageUsage()
  printUserStorageUsage()
  if args.log_to_workbench:
    logUserStorageUsage()

  global all_data_loaded
  all_data_loaded = True
+
+
class DataManagerHandler(BaseHTTPRequestHandler):
  """Renders the collected usage data as simple HTML pages.

  URL scheme:
    /                  per-user usage summary table
    /user/<uuid>       one user's usage and collections
    /collection/<uuid> one collection's size, readers and persisters

  All pages are built from the module-level maps populated by
  loadAllData(); requests arriving before that finishes get a 503.
  """
  USER_PATH = 'user'
  COLLECTION_PATH = 'collection'
  BLOCK_PATH = 'block'

  def userLink(self, uuid):
    """Returns an HTML link to the page for user uuid."""
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            {'uuid': uuid,
             'path': DataManagerHandler.USER_PATH})

  def collectionLink(self, uuid):
    """Returns an HTML link to the page for collection uuid."""
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            {'uuid': uuid,
             'path': DataManagerHandler.COLLECTION_PATH})

  def blockLink(self, uuid):
    """Returns an HTML link to the page for block uuid."""
    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
            {'uuid': uuid,
             'path': DataManagerHandler.BLOCK_PATH})

  def writeTop(self, title):
    """Writes the opening HTML boilerplate for a page."""
    self.wfile.write('<HTML><HEAD><TITLE>%s</TITLE></HEAD>\n<BODY>' % title)

  def writeBottom(self):
    """Closes the HTML document opened by writeTop()."""
    self.wfile.write('</BODY></HTML>\n')

  def writeHomePage(self):
    """Writes the per-user usage summary table."""
    self.send_response(200)
    self.end_headers()
    self.writeTop('Home')
    self.wfile.write('<TABLE>')
    self.wfile.write('<TR><TH>user'
                     '<TH>unweighted readable block size'
                     '<TH>weighted readable block size'
                     '<TH>unweighted persisted block size'
                     '<TH>weighted persisted block size</TR>\n')
    for user, usage in user_to_usage.items():
      self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
                       (self.userLink(user),
                        fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
                        fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
                        fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
                        fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
    self.wfile.write('</TABLE>\n')
    self.writeBottom()

  def userExists(self, uuid):
    """Returns True if we have usage data for this user."""
    # Currently this will return false for a user who exists but
    # doesn't appear on any manifests.
    # TODO(misha): Figure out if we need to fix this.
    # 'in' replaces the Python-2-only dict.has_key().
    return uuid in user_to_usage

  def writeUserPage(self, uuid):
    """Writes the usage page for one user, or a 404."""
    if not self.userExists(uuid):
      self.send_error(404,
                      'User (%s) Not Found.' % cgi.escape(uuid, quote=False))
    else:
      # Here we assume that since a user exists, they don't need to be
      # html escaped.
      self.send_response(200)
      self.end_headers()
      self.writeTop('User %s' % uuid)
      self.wfile.write('<TABLE>')
      self.wfile.write('<TR><TH>user'
                       '<TH>unweighted readable block size'
                       '<TH>weighted readable block size'
                       '<TH>unweighted persisted block size'
                       '<TH>weighted persisted block size</TR>\n')
      usage = user_to_usage[uuid]
      self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
                       (self.userLink(uuid),
                        fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
                        fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
                        fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
                        fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
      self.wfile.write('</TABLE>\n')
      self.wfile.write('<P>Persisting Collections: %s\n' %
                       ', '.join(map(self.collectionLink,
                                     persister_to_collections[uuid])))
      self.wfile.write('<P>Reading Collections: %s\n' %
                       ', '.join(map(self.collectionLink,
                                     reader_to_collections[uuid])))
      self.writeBottom()

  def collectionExists(self, uuid):
    """Returns True if we fetched a collection with this uuid."""
    # 'in' replaces the Python-2-only dict.has_key().
    return uuid in CollectionInfo.all_by_uuid

  def writeCollectionPage(self, uuid):
    """Writes the detail page for one collection, or a 404."""
    if not self.collectionExists(uuid):
      self.send_error(404,
                      'Collection (%s) Not Found.' % cgi.escape(uuid, quote=False))
    else:
      collection = CollectionInfo.get(uuid)
      # Here we assume that since a collection exists, its id doesn't
      # need to be html escaped.
      self.send_response(200)
      self.end_headers()
      self.writeTop('Collection %s' % uuid)
      self.wfile.write('<H1>Collection %s</H1>\n' % uuid)
      self.wfile.write('<P>Total size %s (not factoring in replication).\n' %
                       fileSizeFormat(collection.byteSize()))
      self.wfile.write('<P>Readers: %s\n' %
                       ', '.join(map(self.userLink, collection.reader_uuids)))

      if len(collection.persister_replication) == 0:
        self.wfile.write('<P>No persisters\n')
      else:
        # Group persisters by the replication level they requested so
        # each level gets its own table column.
        replication_to_users = defaultdict(set)
        for user,replication in collection.persister_replication.items():
          replication_to_users[replication].add(user)
        replication_levels = sorted(replication_to_users.keys())

        self.wfile.write('<P>%d persisters in %d replication level(s) maxing '
                         'out at %dx replication:\n' %
                         (len(collection.persister_replication),
                          len(replication_levels),
                          replication_levels[-1]))

        # TODO(misha): This code is used twice, let's move it to a method.
        self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
                         '<TH>'.join(['Replication Level ' + str(x)
                                      for x in replication_levels]))
        self.wfile.write('<TR>\n')
        for replication_level in replication_levels:
          users = replication_to_users[replication_level]
          self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(
              map(self.userLink, users)))
        self.wfile.write('</TR></TABLE>\n')

      # Same presentation for blocks, grouped by observed replication.
      replication_to_blocks = defaultdict(set)
      for block in collection.block_uuids:
        replication_to_blocks[block_to_replication[block]].add(block)
      replication_levels = sorted(replication_to_blocks.keys())
      self.wfile.write('<P>%d blocks in %d replication level(s):\n' %
                       (len(collection.block_uuids), len(replication_levels)))
      self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
                       '<TH>'.join(['Replication Level ' + str(x)
                                    for x in replication_levels]))
      self.wfile.write('<TR>\n')
      for replication_level in replication_levels:
        blocks = replication_to_blocks[replication_level]
        self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(blocks))
      self.wfile.write('</TR></TABLE>\n')


  def do_GET(self):
    """Routes GET requests to the home, user or collection page."""
    if not all_data_loaded:
      self.send_error(503,
                      'Sorry, but I am still loading all the data I need.')
    else:
      # Removing leading '/' and process request path
      split_path = self.path[1:].split('/')
      request_type = split_path[0]
      log.debug('path (%s) split as %s with request_type %s' % (self.path,
                                                                split_path,
                                                                request_type))
      if request_type == '':
        self.writeHomePage()
      elif request_type == DataManagerHandler.USER_PATH:
        self.writeUserPage(split_path[1])
      elif request_type == DataManagerHandler.COLLECTION_PATH:
        self.writeCollectionPage(split_path[1])
      else:
        self.send_error(404, 'Unrecognized request path.')
    return
+
# ThreadingMixIn gives each incoming request its own thread, so one
# slow page render doesn't block other clients.
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
  """Handle requests in a separate thread."""
+
+
if __name__ == '__main__':
  args = parser.parse_args()

  if args.port == 0:
    # No web server requested; just load the data and report to stdout.
    loadAllData()
  else:
    # Load in the background so the server can answer immediately
    # (with 503s until all_data_loaded is set).
    loader = threading.Thread(target = loadAllData, name = 'loader')
    loader.start()

    server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
    server.serve_forever()
diff --git a/services/datamanager/experimental/datamanager_test.py b/services/datamanager/experimental/datamanager_test.py
new file mode 100755 (executable)
index 0000000..0842c16
--- /dev/null
@@ -0,0 +1,41 @@
+#! /usr/bin/env python
+
+import datamanager
+import unittest
+
class TestComputeWeightedReplicationCosts(unittest.TestCase):
  """Checks computeWeightedReplicationCosts against the worked examples
  in its docstring."""
  def test_obvious(self):
    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,]),
                     {1:1.0})

  def test_simple(self):
    self.assertEqual(datamanager.computeWeightedReplicationCosts([2,]),
                     {2:2.0})

  def test_even_split(self):
    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,1]),
                     {1:0.5})

  def test_even_split_bigger(self):
    self.assertEqual(datamanager.computeWeightedReplicationCosts([2,2]),
                     {2:1.0})

  def test_uneven_split(self):
    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,2]),
                     {1:0.5, 2:1.5})

  def test_uneven_split_bigger(self):
    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,3]),
                     {1:0.5, 3:2.5})

  def test_uneven_split_jumble(self):
    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,3,6,6,10]),
                     {1:0.2, 3:0.7, 6:1.7, 10:5.7})

  def test_documentation_example(self):
    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,1,3,6,6]),
                     {1:0.2, 3: 0.2 + 2.0 / 3, 6: 0.2 + 2.0 / 3 + 1.5})
+
+
if __name__ == '__main__':
  # Run the whole suite when executed directly.
  unittest.main()