|
|
@ -1,5 +1,6 @@ |
|
|
|
"Fanout cache automatically shards keys and values." |
|
|
|
|
|
|
|
import contextlib as cl |
|
|
|
import itertools as it |
|
|
|
import operator |
|
|
|
import os.path as op |
|
|
@ -69,9 +70,46 @@ class FanoutCache(object): |
|
|
|
|
|
|
|
|
|
|
|
def __getattr__(self, name): |
|
|
|
safe_names = {'timeout', 'disk'} |
|
|
|
valid_name = name in DEFAULT_SETTINGS or name in safe_names |
|
|
|
assert valid_name, 'cannot access {} in cache shard'.format(name) |
|
|
|
return getattr(self._shards[0], name) |
|
|
|
|
|
|
|
|
|
|
|
@cl.contextmanager |
|
|
|
def transact(self, retry=True): |
|
|
|
"""Context manager to perform a transaction by locking the cache. |
|
|
|
|
|
|
|
While the cache is locked, no other write operation is permitted. |
|
|
|
Transactions should therefore be as short as possible. Read and write |
|
|
|
operations performed in a transaction are atomic. Read operations may |
|
|
|
occur concurrent to a transaction. |
|
|
|
|
|
|
|
Transactions may be nested and may not be shared between threads. |
|
|
|
|
|
|
|
Blocks until transactions are held on all cache shards by retrying as |
|
|
|
necessary. |
|
|
|
|
|
|
|
>>> cache = FanoutCache() |
|
|
|
>>> with cache.transact(): # Atomically increment two keys. |
|
|
|
... _ = cache.incr('total', 123.4) |
|
|
|
... _ = cache.incr('count', 1) |
|
|
|
>>> with cache.transact(): # Atomically calculate average. |
|
|
|
... average = cache['total'] / cache['count'] |
|
|
|
>>> average |
|
|
|
123.4 |
|
|
|
|
|
|
|
:return: context manager for use in `with` statement |
|
|
|
|
|
|
|
""" |
|
|
|
assert retry, 'retry must be True in FanoutCache' |
|
|
|
with cl.ExitStack() as stack: |
|
|
|
for shard in self._shards: |
|
|
|
shard_transaction = shard.transact(retry=True) |
|
|
|
stack.enter_context(shard_transaction) |
|
|
|
yield |
|
|
|
|
|
|
|
|
|
|
|
def set(self, key, value, expire=None, read=False, tag=None, retry=False): |
|
|
|
"""Set `key` and `value` item in cache. |
|
|
|
|
|
|
|