Fixed bug where block mtimes were stored as strings instead of ints. Closes #2631
[arvados.git] / services / datamanager / experimental / datamanager.py
1 #! /usr/bin/env python
2
3 import arvados
4
5 import argparse
6 import cgi
7 import csv
8 import json
9 import logging
10 import math
11 import pprint
12 import re
13 import threading
14 import urllib2
15
16 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
17 from collections import defaultdict, Counter
18 from functools import partial
19 from operator import itemgetter
20 from SocketServer import ThreadingMixIn
21
22 arv = arvados.api('v1')
23
24 # Adapted from http://stackoverflow.com/questions/4180980/formatting-data-quantity-capacity-as-string
25 byteunits = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
26 def fileSizeFormat(value):
27   exponent = 0 if value == 0 else int(math.log(value, 1024))
28   return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
29                          byteunits[exponent])
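# Illustrative examples of the fixed-width output this produces (worked out by
# hand from the format string above):
#   fileSizeFormat(0)         -> '   0.00 B  '
#   fileSizeFormat(1536)      -> '   1.50 KiB'
#   fileSizeFormat(3 * 2**30) -> '   3.00 GiB'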
30
31 def percentageFloor(x):
32   """Returns a float which is the input rounded down to the nearest 0.01.
33
34   e.g. percentageFloor(0.941354) = 0.94
35   """
36   return math.floor(x*100) / 100.0
37
38
39 def byteSizeFromValidUuid(valid_uuid):
40   return int(valid_uuid.split('+')[1])
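# Illustrative example: the size is the second '+'-delimited field of a block
# locator, e.g. byteSizeFromValidUuid('d41d8cd98f00b204e9800998ecf8427e+1234')
# returns 1234 (the locator here is made up).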
41
42 class maxdict(dict):
43   """A dictionary that holds the largest value entered for each key."""
44   def addValue(self, key, value):
45     dict.__setitem__(self, key, max(dict.get(self, key), value))
46   def addValues(self, kv_pairs):
47     for key,value in kv_pairs:
48       self.addValue(key, value)
49   def addDict(self, d):
50     self.addValues(d.items())
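# Illustrative sketch of maxdict behavior: only the largest value seen for each
# key is kept.
#   m = maxdict()
#   m.addValues([('a', 1), ('a', 3), ('b', 2)])
#   m -> {'a': 3, 'b': 2}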
51
52 class CollectionInfo:
53   DEFAULT_PERSISTER_REPLICATION_LEVEL=2
54   all_by_uuid = {}
55
56   def __init__(self, uuid):
57     if CollectionInfo.all_by_uuid.has_key(uuid):
58       raise ValueError('Collection for uuid "%s" already exists.' % uuid)
59     self.uuid = uuid
60     self.block_uuids = set()  # uuids of keep blocks in this collection
61     self.reader_uuids = set()  # uuids of users who can read this collection
62     self.persister_uuids = set()  # uuids of users who want this collection saved
63     # map from user uuid to replication level they desire
64     self.persister_replication = maxdict()
65
66     # The whole api response in case we need anything else later.
67     self.api_response = []
68     CollectionInfo.all_by_uuid[uuid] = self
69
70   def byteSize(self):
71     return sum(map(byteSizeFromValidUuid, self.block_uuids))
72
73   def __str__(self):
74     return ('CollectionInfo uuid: %s\n'
75             '               %d block(s) containing %s\n'
76             '               reader_uuids: %s\n'
77             '               persister_replication: %s' %
78             (self.uuid,
79              len(self.block_uuids),
80              fileSizeFormat(self.byteSize()),
81              pprint.pformat(self.reader_uuids, indent = 15),
82              pprint.pformat(self.persister_replication, indent = 15)))
83
84   @staticmethod
85   def get(uuid):
86     if not CollectionInfo.all_by_uuid.has_key(uuid):
87       CollectionInfo(uuid)
88     return CollectionInfo.all_by_uuid[uuid]
89
90
91 def extractUuid(candidate):
92   """ Returns a canonical (hash+size) uuid from a valid uuid, or None if candidate is not a valid uuid."""
93   match = re.match('([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate)
94   return match and match.group(1)
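# Illustrative examples (locators are made up): extractUuid() keeps only the
# hash+size portion of a block locator and drops any trailing hints.
#   extractUuid('d41d8cd98f00b204e9800998ecf8427e+0+K@example')
#     -> 'd41d8cd98f00b204e9800998ecf8427e+0'
#   extractUuid('not-a-locator') -> None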
95
96 def checkUserIsAdmin():
97   current_user = arv.users().current().execute()
98
99   if not current_user['is_admin']:
100     log.warning('Current user %s (%s - %s) does not have '
101                 'admin access and will not see much of the data.',
102                 current_user['full_name'],
103                 current_user['email'],
104                 current_user['uuid'])
105     if args.require_admin_user:
106       log.critical('Exiting, rerun with --no-require-admin-user '
107                    'if you wish to continue.')
108       exit(1)
109
110 def buildCollectionsList():
111   if args.uuid:
112     return [args.uuid,]
113   else:
114     collections_list_response = arv.collections().list(limit=args.max_api_results).execute()
115
116     print ('Returned %d of %d collections.' %
117            (len(collections_list_response['items']),
118             collections_list_response['items_available']))
119
120     return [item['uuid'] for item in collections_list_response['items']]
121
122
123 def readCollections(collection_uuids):
124   for collection_uuid in collection_uuids:
125     collection_block_uuids = set()
126     collection_response = arv.collections().get(uuid=collection_uuid).execute()
127     collection_info = CollectionInfo.get(collection_uuid)
128     collection_info.api_response = collection_response
129     manifest_lines = collection_response['manifest_text'].split('\n')
130
131     if args.verbose:
132       print 'Manifest text for %s:' % collection_uuid
133       pprint.pprint(manifest_lines)
134
135     for manifest_line in manifest_lines:
136       if manifest_line:
137         manifest_tokens = manifest_line.split(' ')
138         if args.verbose:
139           print 'manifest tokens: ' + pprint.pformat(manifest_tokens)
140         stream_name = manifest_tokens[0]
141
142         line_block_uuids = set(filter(None,
143                                       [extractUuid(candidate)
144                                        for candidate in manifest_tokens[1:]]))
145         collection_info.block_uuids.update(line_block_uuids)
146
147         # file_tokens = [token
148         #                for token in manifest_tokens[1:]
149         #                if extractUuid(token) is None]
150
151         # # Sort file tokens by start position in case they aren't already
152         # file_tokens.sort(key=lambda file_token: int(file_token.split(':')[0]))
153
154         # if args.verbose:
155         #   print 'line_block_uuids: ' + pprint.pformat(line_block_uuids)
156         #   print 'file_tokens: ' + pprint.pformat(file_tokens)
157
158
159 def readLinks():
160   link_classes = set()
161
162   for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
163     # TODO(misha): We may not be seeing all the links, but since items
164     # available does not return an accurate number, I don't know how
165     # to confirm that we saw all of them.
166     collection_links_response = arv.links().list(where={'head_uuid':collection_uuid}).execute()
167     link_classes.update([link['link_class'] for link in collection_links_response['items']])
168     for link in collection_links_response['items']:
169       if link['link_class'] == 'permission':
170         collection_info.reader_uuids.add(link['tail_uuid'])
171       elif link['link_class'] == 'resources':
172         replication_level = link['properties'].get(
173           'replication',
174           CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL)
175         collection_info.persister_replication.addValue(
176           link['tail_uuid'],
177           replication_level)
178         collection_info.persister_uuids.add(link['tail_uuid'])
179
180   print 'Found the following link classes:'
181   pprint.pprint(link_classes)
182
183 def reportMostPopularCollections():
184   most_popular_collections = sorted(
185     CollectionInfo.all_by_uuid.values(),
186     key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_replication),
187     reverse=True)[:10]
188
189   print 'Most popular Collections:'
190   for collection_info in most_popular_collections:
191     print collection_info
192
193
194 def buildMaps():
195   for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
196     # Add the block holding the manifest itself for all calculations
197     block_uuids = collection_info.block_uuids.union([collection_uuid,])
198     for block_uuid in block_uuids:
199       block_to_collections[block_uuid].add(collection_uuid)
200       block_to_readers[block_uuid].update(collection_info.reader_uuids)
201       block_to_persisters[block_uuid].update(collection_info.persister_uuids)
202       block_to_persister_replication[block_uuid].addDict(
203         collection_info.persister_replication)
204     for reader_uuid in collection_info.reader_uuids:
205       reader_to_collections[reader_uuid].add(collection_uuid)
206       reader_to_blocks[reader_uuid].update(block_uuids)
207     for persister_uuid in collection_info.persister_uuids:
208       persister_to_collections[persister_uuid].add(collection_uuid)
209       persister_to_blocks[persister_uuid].update(block_uuids)
210
211
212 def itemsByValueLength(original):
213   return sorted(original.items(),
214                 key=lambda item:len(item[1]),
215                 reverse=True)
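# Illustrative example: entries are ordered by how many items each value holds.
#   itemsByValueLength({'u1': set([1, 2, 3]), 'u2': set([4])})
#     -> [('u1', set([1, 2, 3])), ('u2', set([4]))]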
216
217
218 def reportBusiestUsers():
219   busiest_readers = itemsByValueLength(reader_to_collections)
220   print 'The busiest readers are:'
221   for reader,collections in busiest_readers:
222     print '%s reading %d collections.' % (reader, len(collections))
223   busiest_persisters = itemsByValueLength(persister_to_collections)
224   print 'The busiest persisters are:'
225   for persister,collections in busiest_persisters:
226     print '%s persisting %d collections.' % (persister, len(collections))
227
228
229 def blockDiskUsage(block_uuid):
230   """Returns the disk usage of a block given its uuid.
231
232   Will return 0 before reading the contents of the keep servers.
233   """
234   return byteSizeFromValidUuid(block_uuid) * block_to_replication[block_uuid]
235
236 def blockPersistedUsage(user_uuid, block_uuid):
237   return (byteSizeFromValidUuid(block_uuid) *
238           block_to_persister_replication[block_uuid].get(user_uuid, 0))
239
240 memo_computeWeightedReplicationCosts = {}
241 def computeWeightedReplicationCosts(replication_levels):
242   """Computes the relative cost of varied replication levels.
243
244   replication_levels: a tuple of integers representing the desired
245   replication level. If n users want a replication level of x then x
246   should appear n times in replication_levels.
247
248   Returns a dictionary from replication level to cost.
249
250   The basic thinking is that the cost of replicating at level x should
251   be shared by everyone who wants replication of level x or higher.
252
253   For example, if we have two users who want 1 copy, one user who
254   wants 3 copies and two users who want 6 copies:
255   the input would be [1, 1, 3, 6, 6] (or any permutation)
256
257   The cost of the first copy is shared by all 5 users, so they each
258   pay 1 copy / 5 users = 0.2.
259   The cost of the second and third copies is shared by 3 users, so they
260   each pay 2 copies / 3 users = 0.67 (plus the above costs)
261   The cost of the fourth, fifth and sixth copies is shared by two
262   users, so they each pay 3 copies / 2 users = 1.5 (plus the above costs)
263
264   Here are some other examples:
265   computeWeightedReplicationCosts([1,]) -> {1:1.0}
266   computeWeightedReplicationCosts([2,]) -> {2:2.0}
267   computeWeightedReplicationCosts([1,1]) -> {1:0.5}
268   computeWeightedReplicationCosts([2,2]) -> {2:1.0}
269   computeWeightedReplicationCosts([1,2]) -> {1:0.5,2:1.5}
270   computeWeightedReplicationCosts([1,3]) -> {1:0.5,3:2.5}
271   computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7}
272   """
273   replication_level_counts = sorted(Counter(replication_levels).items())
274
275   memo_key = str(replication_level_counts)
276
277   if not memo_key in memo_computeWeightedReplicationCosts:
278     last_level = 0
279     current_cost = 0
280     total_interested = float(sum(map(itemgetter(1), replication_level_counts)))
281     cost_for_level = {}
282     for replication_level, count in replication_level_counts:
283       copies_added = replication_level - last_level
284       # compute marginal cost from last level and add it to the last cost
285       current_cost += copies_added / total_interested
286       cost_for_level[replication_level] = current_cost
287       # update invariants
288       last_level = replication_level
289       total_interested -= count
290     memo_computeWeightedReplicationCosts[memo_key] = cost_for_level
291
292   return memo_computeWeightedReplicationCosts[memo_key]
293
294 def blockPersistedWeightedUsage(user_uuid, block_uuid):
295   persister_replication_for_block = block_to_persister_replication[block_uuid]
296   user_replication = persister_replication_for_block[user_uuid]
297   return (
298     byteSizeFromValidUuid(block_uuid) *
299     computeWeightedReplicationCosts(
300       persister_replication_for_block.values())[user_replication])
301
302
303 def computeUserStorageUsage():
304   for user, blocks in reader_to_blocks.items():
305     user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map(
306         byteSizeFromValidUuid,
307         blocks))
308     user_to_usage[user][WEIGHTED_READ_SIZE_COL] = sum(map(
309         lambda block_uuid:(float(byteSizeFromValidUuid(block_uuid))/
310                                  len(block_to_readers[block_uuid])),
311         blocks))
312   for user, blocks in persister_to_blocks.items():
313     user_to_usage[user][UNWEIGHTED_PERSIST_SIZE_COL] = sum(map(
314         partial(blockPersistedUsage, user),
315         blocks))
316     user_to_usage[user][WEIGHTED_PERSIST_SIZE_COL] = sum(map(
317         partial(blockPersistedWeightedUsage, user),
318         blocks))
319
320 def printUserStorageUsage():
321   print ('user: unweighted readable block size, weighted readable block size, '
322          'unweighted persisted block size, weighted persisted block size:')
323   for user, usage in user_to_usage.items():
324     print ('%s: %s %s %s %s' %
325            (user,
326             fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
327             fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
328             fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
329             fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
330
331 def logUserStorageUsage():
332   for user, usage in user_to_usage.items():
333     body = {}
334     # user could actually represent a user or a group. We don't set
335     # the object_type field since we don't know which we have.
336     body['object_uuid'] = user
337     body['event_type'] = args.user_storage_log_event_type
338     properties = {}
339     properties['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
340     properties['read_collections_weighted_bytes'] = (
341       usage[WEIGHTED_READ_SIZE_COL])
342     properties['persisted_collections_total_bytes'] = (
343       usage[UNWEIGHTED_PERSIST_SIZE_COL])
344     properties['persisted_collections_weighted_bytes'] = (
345       usage[WEIGHTED_PERSIST_SIZE_COL])
346     body['properties'] = properties
347     # TODO(misha): Confirm that this will throw an exception if it
348     # fails to create the log entry.
349     arv.logs().create(body=body).execute()
350
351 def getKeepServers():
352   response = arv.keep_disks().list().execute()
353   return [[keep_server['service_host'], keep_server['service_port']]
354           for keep_server in response['items']]
355
356
357 def getKeepBlocks(keep_servers):
358   blocks = []
359   for host,port in keep_servers:
360     response = urllib2.urlopen('http://%s:%d/index' % (host, port))
361     server_blocks = [line.split(' ')
362                      for line in response.read().split('\n')
363                      if line]
364     server_blocks = [(block_id, int(mtime))
365                      for block_id, mtime in server_blocks]
366     blocks.append(server_blocks)
367   return blocks
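# Illustrative sketch of one index line, assuming each line is of the form
# '<locator> <mtime>' (the locator below is made up); note that the mtime is
# parsed into an int (see #2631):
#   'd41d8cd98f00b204e9800998ecf8427e+0 1388747781'
#     -> ('d41d8cd98f00b204e9800998ecf8427e+0', 1388747781)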
368
369 def getKeepStats(keep_servers):
370   MOUNT_COLUMN = 5
371   TOTAL_COLUMN = 1
372   FREE_COLUMN = 3
373   DISK_BLOCK_SIZE = 1024
374   stats = []
375   for host,port in keep_servers:
376     response = urllib2.urlopen('http://%s:%d/status.json' % (host, port))
377
378     parsed_json = json.load(response)
379     df_entries = [line.split()
380                   for line in parsed_json['df'].split('\n')
381                   if line]
382     keep_volumes = [columns
383                     for columns in df_entries
384                     if 'keep' in columns[MOUNT_COLUMN]]
385     total_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(TOTAL_COLUMN),
386                                                   keep_volumes)))
387     free_space =  DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(FREE_COLUMN),
388                                                   keep_volumes)))
389     stats.append([total_space, free_space])
390   return stats
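# Illustrative df line (invented, assuming df-style output in status.json) and
# the 0-indexed columns used above:
#   '/dev/sdb1 976762584 485227612 491534972 50% /mnt/keep-0'
#   MOUNT_COLUMN=5 -> '/mnt/keep-0', TOTAL_COLUMN=1 -> 976762584 1K-blocks,
#   FREE_COLUMN=3 -> 491534972 1K-blocks available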
391
392
393 def computeReplication(keep_blocks):
394   for server_blocks in keep_blocks:
395     for block_uuid, _ in server_blocks:
396       block_to_replication[block_uuid] += 1
397   log.debug('Seeing the following replication levels among blocks: %s',
398             str(set(block_to_replication.values())))
399
400
401 def computeGarbageCollectionCandidates():
402   for server_blocks in keep_blocks:
403     block_to_latest_mtime.addValues(server_blocks)
404   empty_set = set()
405   garbage_collection_priority = sorted(
406     [(block,mtime)
407      for block,mtime in block_to_latest_mtime.items()
408      if len(block_to_persisters.get(block,empty_set)) == 0],
409     key = itemgetter(1))
410   global garbage_collection_report
411   garbage_collection_report = []
412   cumulative_disk_size = 0
413   for block,mtime in garbage_collection_priority:
414     disk_size = blockDiskUsage(block)
415     cumulative_disk_size += disk_size
416     garbage_collection_report.append(
417       (block,
418        mtime,
419        disk_size,
420        cumulative_disk_size,
421        float(free_keep_space + cumulative_disk_size)/total_keep_space))
422
423   print 'The oldest Garbage Collection Candidates: '
424   pprint.pprint(garbage_collection_report[:20])
425
426
427 def outputGarbageCollectionReport(filename):
428   with open(filename, 'wb') as csvfile:
429     gcwriter = csv.writer(csvfile)
430     gcwriter.writerow(['block uuid', 'latest mtime', 'disk size',
431                        'cumulative size', 'disk free'])
432     for line in garbage_collection_report:
433       gcwriter.writerow(line)
434
435
436 def computeGarbageCollectionHistogram():
437   histogram = []
438   last_percentage = -1
439   for _,mtime,_,_,disk_free in garbage_collection_report:
440     curr_percentage = percentageFloor(disk_free)
441     if curr_percentage > last_percentage:
442       histogram.append( (curr_percentage, mtime) )
443     last_percentage = curr_percentage
444
445   log.info('Garbage collection histogram is: %s', histogram)
446
447   return histogram
448
449
450 def logGarbageCollectionHistogram():
451   body = {}
452   # TODO(misha): Decide whether we should specify an object_uuid in
453   # the body and if so, which uuid to use.
454   body['event_type'] = args.block_age_free_space_histogram_log_event_type
455   properties = {}
456   properties['histogram'] = garbage_collection_histogram
457   body['properties'] = properties
458   # TODO(misha): Confirm that this will throw an exception if it
459   # fails to create the log entry.
460   arv.logs().create(body=body).execute()
461
462
463 def detectReplicationProblems():
464   blocks_not_in_any_collections.update(
465     set(block_to_replication.keys()).difference(block_to_collections.keys()))
466   underreplicated_persisted_blocks.update(
467     [uuid
468      for uuid, persister_replication in block_to_persister_replication.items()
469      if len(persister_replication) > 0 and
470      block_to_replication[uuid] < max(persister_replication.values())])
471   overreplicated_persisted_blocks.update(
472     [uuid
473      for uuid, persister_replication in block_to_persister_replication.items()
474      if len(persister_replication) > 0 and
475      block_to_replication[uuid] > max(persister_replication.values())])
476
477   log.info('Found %d blocks not in any collections, e.g. %s...',
478            len(blocks_not_in_any_collections),
479            ','.join(list(blocks_not_in_any_collections)[:5]))
480   log.info('Found %d underreplicated blocks, e.g. %s...',
481            len(underreplicated_persisted_blocks),
482            ','.join(list(underreplicated_persisted_blocks)[:5]))
483   log.info('Found %d overreplicated blocks, e.g. %s...',
484            len(overreplicated_persisted_blocks),
485            ','.join(list(overreplicated_persisted_blocks)[:5]))
486
487   # TODO:
488   #  Read blocks sorted by mtime
489   #  Cache window vs % free space
490   #  Collections which candidates will appear in
491   #  Youngest underreplicated read blocks that appear in collections.
492   #  Report Collections that have blocks which are missing from (or
493   #   underreplicated in) keep.
494
495
496 # This is the main flow here
497
498 parser = argparse.ArgumentParser(description='Report on keep disks.')
499 """The command line argument parser we use.
500
501 We only use it in the __main__ block, but leave it outside the block
502 in case another package wants to use it or customize it by specifying
503 it as a parent to their commandline parser.
504 """
505 parser.add_argument('-m',
506                     '--max-api-results',
507                     type=int,
508                     default=5000,
509                     help=('The max results to get at once.'))
510 parser.add_argument('-p',
511                     '--port',
512                     type=int,
513                     default=9090,
514                     help=('The port number to serve on. 0 means no server.'))
515 parser.add_argument('-v',
516                     '--verbose',
517                     help='increase output verbosity',
518                     action='store_true')
519 parser.add_argument('-u',
520                     '--uuid',
521                     help='uuid of specific collection to process')
522 parser.add_argument('--require-admin-user',
523                     action='store_true',
524                     default=True,
525                     help='Fail if the user is not an admin [default]')
526 parser.add_argument('--no-require-admin-user',
527                     dest='require_admin_user',
528                     action='store_false',
529                     help=('Allow users without admin permissions with '
530                           'only a warning.'))
531 parser.add_argument('--log-to-workbench',
532                     action='store_true',
533                     default=False,
534                     help='Log findings to workbench')
535 parser.add_argument('--no-log-to-workbench',
536                     dest='log_to_workbench',
537                     action='store_false',
538                     help='Don\'t log findings to workbench [default]')
539 parser.add_argument('--user-storage-log-event-type',
540                     default='user-storage-report',
541                     help=('The event type to set when logging user '
542                           'storage usage to workbench.'))
543 parser.add_argument('--block-age-free-space-histogram-log-event-type',
544                     default='block-age-free-space-histogram',
545                     help=('The event type to set when logging the block '
546                           'age vs. free space histogram to workbench.'))
547 parser.add_argument('--garbage-collection-file',
548                     default='',
549                     help=('The file to write a garbage collection report, or '
550                           'leave empty for no report.'))
551
552 args = None
553
554 # TODO(misha): Think about moving some of this to the __main__ block.
555 log = logging.getLogger('arvados.services.datamanager')
556 stderr_handler = logging.StreamHandler()
557 log.setLevel(logging.INFO)
558 stderr_handler.setFormatter(
559   logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
560 log.addHandler(stderr_handler)
561
562 # Global Data - don't try this at home
563 collection_uuids = []
564
565 # These maps all map from uuids to a set of uuids
566 block_to_collections = defaultdict(set)  # keep blocks
567 reader_to_collections = defaultdict(set)  # collection(s) for which the user has read access
568 persister_to_collections = defaultdict(set)  # collection(s) which the user has persisted
569 block_to_readers = defaultdict(set)
570 block_to_persisters = defaultdict(set)
571 block_to_persister_replication = defaultdict(maxdict)
572 reader_to_blocks = defaultdict(set)
573 persister_to_blocks = defaultdict(set)
574
575 UNWEIGHTED_READ_SIZE_COL = 0
576 WEIGHTED_READ_SIZE_COL = 1
577 UNWEIGHTED_PERSIST_SIZE_COL = 2
578 WEIGHTED_PERSIST_SIZE_COL = 3
579 NUM_COLS = 4
580 user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)
581
582 keep_servers = []
583 keep_blocks = []
584 keep_stats = []
585 total_keep_space = 0
586 free_keep_space =  0
587
588 block_to_replication = defaultdict(lambda: 0)
589 block_to_latest_mtime = maxdict()
590
591 garbage_collection_report = []
592 """A list of non-persisted blocks, sorted by increasing mtime
593
594 Each entry is of the form (block uuid, latest mtime, disk size,
595 cumulative disk size, disk free)
596
597 * block uuid: The id of the block we want to delete
598 * latest mtime: The latest mtime of the block across all keep servers.
599 * disk size: The total disk space used by this block (block size
600 multiplied by current replication level)
601 * cumulative disk size: The sum of this block's disk size and all the
602 blocks listed above it
603 * disk free: The proportion of our disk space that would be free if we
604 deleted this block and all the above. So this is (free disk space +
605 cumulative disk size) / total disk capacity
606 """
607
608 garbage_collection_histogram = []
609 """ Shows the tradeoff of keep block age vs keep disk free space.
610
611 Each entry is of the form (Disk Proportion, mtime).
612
613 An entry of the form (0.52, 1388747781) means that if we deleted the
614 oldest non-persisted blocks until we had 52% of the disk free, then
615 all blocks with an mtime greater than 1388747781 would be preserved.
616 """
617
618 # Stuff to report on
619 blocks_not_in_any_collections = set()
620 underreplicated_persisted_blocks = set()
621 overreplicated_persisted_blocks = set()
622
623 all_data_loaded = False
624
625 def loadAllData():
626   checkUserIsAdmin()
627
628   log.info('Building Collection List')
629   global collection_uuids
630   collection_uuids = filter(None, [extractUuid(candidate)
631                                    for candidate in buildCollectionsList()])
632
633   log.info('Reading Collections')
634   readCollections(collection_uuids)
635
636   if args.verbose:
637     pprint.pprint(CollectionInfo.all_by_uuid)
638
639   log.info('Reading Links')
640   readLinks()
641
642   reportMostPopularCollections()
643
644   log.info('Building Maps')
645   buildMaps()
646
647   reportBusiestUsers()
648
649   log.info('Getting Keep Servers')
650   global keep_servers
651   keep_servers = getKeepServers()
652
653   print keep_servers
654
655   log.info('Getting Blocks from each Keep Server.')
656   global keep_blocks
657   keep_blocks = getKeepBlocks(keep_servers)
658
659   log.info('Getting Stats from each Keep Server.')
660   global keep_stats, total_keep_space, free_keep_space
661   keep_stats = getKeepStats(keep_servers)
662
663   total_keep_space = sum(map(itemgetter(0), keep_stats))
664   free_keep_space = sum(map(itemgetter(1), keep_stats))
665
666   # TODO(misha): Delete this hack when the keep servers are fixed!
667   # This hack deals with the fact that keep servers report each other's disks.
668   total_keep_space /= len(keep_stats)
669   free_keep_space /= len(keep_stats)
670
671   log.info('Total disk space: %s, Free disk space: %s (%d%%).' %
672            (fileSizeFormat(total_keep_space),
673             fileSizeFormat(free_keep_space),
674             100*free_keep_space/total_keep_space))
675
676   computeReplication(keep_blocks)
677
678   log.info('average replication level is %f',
679            (float(sum(block_to_replication.values())) /
680             len(block_to_replication)))
681
682   computeGarbageCollectionCandidates()
683
684   if args.garbage_collection_file:
685     log.info('Writing garbage collection report to %s',
686              args.garbage_collection_file)
687     outputGarbageCollectionReport(args.garbage_collection_file)
688
689   global garbage_collection_histogram
690   garbage_collection_histogram = computeGarbageCollectionHistogram()
691
692   if args.log_to_workbench:
693     logGarbageCollectionHistogram()
694
695   detectReplicationProblems()
696
697   computeUserStorageUsage()
698   printUserStorageUsage()
699   if args.log_to_workbench:
700     logUserStorageUsage()
701
702   global all_data_loaded
703   all_data_loaded = True
704
705
706 class DataManagerHandler(BaseHTTPRequestHandler):
707   USER_PATH = 'user'
708   COLLECTION_PATH = 'collection'
709   BLOCK_PATH = 'block'
710
711   def userLink(self, uuid):
712     return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
713             {'uuid': uuid,
714              'path': DataManagerHandler.USER_PATH})
715
716   def collectionLink(self, uuid):
717     return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
718             {'uuid': uuid,
719              'path': DataManagerHandler.COLLECTION_PATH})
720
721   def blockLink(self, uuid):
722     return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
723             {'uuid': uuid,
724              'path': DataManagerHandler.BLOCK_PATH})
725
726   def writeTop(self, title):
727     self.wfile.write('<HTML><HEAD><TITLE>%s</TITLE></HEAD>\n<BODY>' % title)
728
729   def writeBottom(self):
730     self.wfile.write('</BODY></HTML>\n')
731
732   def writeHomePage(self):
733     self.send_response(200)
734     self.end_headers()
735     self.writeTop('Home')
736     self.wfile.write('<TABLE>')
737     self.wfile.write('<TR><TH>user'
738                      '<TH>unweighted readable block size'
739                      '<TH>weighted readable block size'
740                      '<TH>unweighted persisted block size'
741                      '<TH>weighted persisted block size</TR>\n')
742     for user, usage in user_to_usage.items():
743       self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
744                        (self.userLink(user),
745                         fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
746                         fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
747                         fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
748                         fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
749     self.wfile.write('</TABLE>\n')
750     self.writeBottom()
751
752   def userExists(self, uuid):
753     # Currently this will return false for a user who exists but
754     # doesn't appear on any manifests.
755     # TODO(misha): Figure out if we need to fix this.
756     return user_to_usage.has_key(uuid)
757
758   def writeUserPage(self, uuid):
759     if not self.userExists(uuid):
760       self.send_error(404,
761                       'User (%s) Not Found.' % cgi.escape(uuid, quote=False))
762     else:
763       # Here we assume that since a user exists, their uuid doesn't need
764       # to be html escaped.
765       self.send_response(200)
766       self.end_headers()
767       self.writeTop('User %s' % uuid)
768       self.wfile.write('<TABLE>')
769       self.wfile.write('<TR><TH>user'
770                        '<TH>unweighted readable block size'
771                        '<TH>weighted readable block size'
772                        '<TH>unweighted persisted block size'
773                        '<TH>weighted persisted block size</TR>\n')
774       usage = user_to_usage[uuid]
775       self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
776                        (self.userLink(uuid),
777                         fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
778                         fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
779                         fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
780                         fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
781       self.wfile.write('</TABLE>\n')
782       self.wfile.write('<P>Persisting Collections: %s\n' %
783                        ', '.join(map(self.collectionLink,
784                                      persister_to_collections[uuid])))
785       self.wfile.write('<P>Reading Collections: %s\n' %
786                        ', '.join(map(self.collectionLink,
787                                      reader_to_collections[uuid])))
788       self.writeBottom()
789
790   def collectionExists(self, uuid):
791     return CollectionInfo.all_by_uuid.has_key(uuid)
792
793   def writeCollectionPage(self, uuid):
794     if not self.collectionExists(uuid):
795       self.send_error(404,
796                       'Collection (%s) Not Found.' % cgi.escape(uuid, quote=False))
797     else:
798       collection = CollectionInfo.get(uuid)
799       # Here we assume that since a collection exists, its id doesn't
800       # need to be html escaped.
801       self.send_response(200)
802       self.end_headers()
803       self.writeTop('Collection %s' % uuid)
804       self.wfile.write('<H1>Collection %s</H1>\n' % uuid)
805       self.wfile.write('<P>Total size %s (not factoring in replication).\n' %
806                        fileSizeFormat(collection.byteSize()))
807       self.wfile.write('<P>Readers: %s\n' %
808                        ', '.join(map(self.userLink, collection.reader_uuids)))
809
810       if len(collection.persister_replication) == 0:
811         self.wfile.write('<P>No persisters\n')
812       else:
813         replication_to_users = defaultdict(set)
814         for user,replication in collection.persister_replication.items():
815           replication_to_users[replication].add(user)
816         replication_levels = sorted(replication_to_users.keys())
817
818         self.wfile.write('<P>%d persisters in %d replication level(s) maxing '
819                          'out at %dx replication:\n' %
820                          (len(collection.persister_replication),
821                           len(replication_levels),
822                           replication_levels[-1]))
823
824         # TODO(misha): This code is used twice, let's move it to a method.
825         self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
826                          '<TH>'.join(['Replication Level ' + str(x)
827                                       for x in replication_levels]))
828         self.wfile.write('<TR>\n')
829         for replication_level in replication_levels:
830           users = replication_to_users[replication_level]
831           self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(
832               map(self.userLink, users)))
833         self.wfile.write('</TR></TABLE>\n')
834
835       replication_to_blocks = defaultdict(set)
836       for block in collection.block_uuids:
837         replication_to_blocks[block_to_replication[block]].add(block)
838       replication_levels = sorted(replication_to_blocks.keys())
839       self.wfile.write('<P>%d blocks in %d replication level(s):\n' %
840                        (len(collection.block_uuids), len(replication_levels)))
841       self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
842                        '<TH>'.join(['Replication Level ' + str(x)
843                                     for x in replication_levels]))
844       self.wfile.write('<TR>\n')
845       for replication_level in replication_levels:
846         blocks = replication_to_blocks[replication_level]
847         self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(blocks))
848       self.wfile.write('</TR></TABLE>\n')
849
850
851   def do_GET(self):
852     if not all_data_loaded:
853       self.send_error(503,
854                       'Sorry, but I am still loading all the data I need.')
855     else:
856       # Remove the leading '/' and process the request path
857       split_path = self.path[1:].split('/')
858       request_type = split_path[0]
859       log.debug('path (%s) split as %s with request_type %s' % (self.path,
860                                                                 split_path,
861                                                                 request_type))
862       if request_type == '':
863         self.writeHomePage()
864       elif request_type == DataManagerHandler.USER_PATH:
865         self.writeUserPage(split_path[1])
866       elif request_type == DataManagerHandler.COLLECTION_PATH:
867         self.writeCollectionPage(split_path[1])
868       else:
869         self.send_error(404, 'Unrecognized request path.')
870     return
871
872 class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
873   """Handle requests in a separate thread."""
874
875
876 if __name__ == '__main__':
877   args = parser.parse_args()
878
879   if args.port == 0:
880     loadAllData()
881   else:
882     loader = threading.Thread(target = loadAllData, name = 'loader')
883     loader.start()
884
885     server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
886     server.serve_forever()