| info: | Simple lru cache for asyncio |
|---|---|
```
pip install async-lru
```

This package is a port of Python's built-in functools.lru_cache function for asyncio. To better handle async behaviour, it also ensures multiple concurrent calls will only result in one call to the wrapped function, with all awaits receiving the result of that call when it completes.
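For example, here is a minimal sketch of that deduplication (the call counter and sleep are illustrative, not part of the API):

```python
import asyncio

from async_lru import alru_cache

calls = 0

@alru_cache(maxsize=32)
async def slow_double(x):
    global calls
    calls += 1                 # count how often the wrapped function really runs
    await asyncio.sleep(0.1)   # simulate slow I/O
    return x * 2

async def main():
    # Ten concurrent awaits for the same key...
    results = await asyncio.gather(*(slow_double(21) for _ in range(10)))
    print(results)  # [42, 42, ..., 42]
    print(calls)    # 1 -- the wrapped function ran only once
    await slow_double.cache_close()

asyncio.run(main())
```

A fuller example, fetching PEPs with aiohttp: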
```python
import asyncio

import aiohttp

from async_lru import alru_cache


@alru_cache(maxsize=32)
async def get_pep(num):
    resource = 'http://www.python.org/dev/peps/pep-%04d/' % num
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(resource) as s:
                return await s.read()
        except aiohttp.ClientError:
            return b'Not Found'


async def main():
    for n in 8, 290, 308, 320, 8, 218, 320, 279, 289, 320, 9991:
        pep = await get_pep(n)
        print(n, len(pep))
    print(get_pep.cache_info())
    # CacheInfo(hits=3, misses=8, maxsize=32, currsize=8)

    # closing is optional, but highly recommended
    await get_pep.cache_close()


asyncio.run(main())
```

TTL (time-to-live in seconds, expiration on timeout) is supported by accepting the ttl configuration parameter (off by default):
```python
@alru_cache(ttl=5)
async def func(arg):
    return arg * 2
```
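A quick sketch of the expiry behaviour (the short TTL and sleep are only for illustration; the counts shown assume an expired entry is recorded as a new miss):

```python
import asyncio

from async_lru import alru_cache

@alru_cache(ttl=1)
async def func(arg):
    return arg * 2

async def main():
    await func(2)             # miss: computed and cached
    await func(2)             # hit: served from the cache
    print(func.cache_info())  # hits=1, misses=1
    await asyncio.sleep(1.5)  # wait for the entry to expire
    await func(2)             # miss again: the entry timed out
    print(func.cache_info())  # hits=1, misses=2
    await func.cache_close()

asyncio.run(main())
```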
To prevent thundering herd issues when many cache entries expire simultaneously, you can add jitter to randomize the TTL for each entry:
```python
@alru_cache(ttl=3600, jitter=1800)
async def func(arg):
    return arg * 2
```

With ttl=3600, jitter=1800, each cache entry will have a random TTL between 3600 and 5400 seconds, spreading out invalidations over time.
The library supports explicit invalidation of a specific function call via cache_invalidate():
```python
@alru_cache(ttl=5)
async def func(arg1, arg2):
    return arg1 + arg2

func.cache_invalidate(1, arg2=2)
```

The method returns True if the corresponding set of arguments was already cached, False otherwise.
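For instance, continuing the snippet above:

```python
async def main():
    await func(1, arg2=2)
    print(func.cache_invalidate(1, arg2=2))  # True: the entry existed and was dropped
    print(func.cache_invalidate(3, arg2=4))  # False: nothing was cached for these arguments
```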
Event Loop Affinity: alru_cache enforces that a cache instance is used with only one event loop. If you attempt to use a cached function from a different event loop than the one where it was first called, a RuntimeError is raised:

```
RuntimeError: alru_cache is not safe to use across event loops: this cache
instance was first used with a different event loop.
Use separate cache instances per event loop.
```
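A minimal sketch of how this can be triggered (each asyncio.run call creates a fresh event loop):

```python
import asyncio

from async_lru import alru_cache

@alru_cache(maxsize=32)
async def f(x):
    return x

asyncio.run(f(1))  # first use binds this cache instance to loop #1
asyncio.run(f(1))  # new loop -> RuntimeError per the affinity rule above
```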
For typical asyncio applications that use a single event loop, this binding happens automatically and requires no configuration. If your application uses multiple event loops, create a separate cache instance per loop:
```python
import threading

from async_lru import alru_cache

_local = threading.local()

def get_cached_fetcher():
    # One cache instance per thread (and therefore per event loop)
    if not hasattr(_local, 'fetcher'):
        @alru_cache(maxsize=100)
        async def fetch_data(key):
            ...
        _local.fetcher = fetch_data
    return _local.fetcher
```
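Each thread that runs its own loop then gets its own cache; for example (a sketch reusing get_cached_fetcher from above):

```python
import asyncio
import threading

def worker():
    fetcher = get_cached_fetcher()    # this thread's own cache instance
    asyncio.run(fetcher('some-key'))  # runs in this thread's event loop

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```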
You can also reuse the logic of an already decorated function in a new loop by accessing __wrapped__:

```python
@alru_cache(maxsize=32)
async def my_task(x):
    ...

# In loop 1:
# my_task() uses the default cache instance.

# In loop 2 (or a new thread): create a fresh
# cache instance around the same logic.
cached_task_loop2 = alru_cache(maxsize=32)(my_task.__wrapped__)
await cached_task_loop2(x)
```
async-lru uses CodSpeed for performance regression testing. To run the benchmarks locally:

```
pip install -r requirements-dev.txt
pytest --codspeed benchmark.py
```

The benchmark suite covers both bounded (with maxsize) and unbounded (no maxsize) cache configurations. Scenarios include the following (a sketch of a cache-hit benchmark follows the list):
- Cache hit
- Cache miss
- Cache fill/eviction (cycling through more keys than maxsize)
- Cache clear
- TTL expiry
- Cache invalidation
- Cache info retrieval
- Concurrent cache hits
- Baseline (uncached async function)
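As an illustration only, a cache-hit benchmark could look roughly like this (hypothetical names; the actual suite lives in benchmark.py, and this sketch assumes pytest-codspeed's benchmark marker):

```python
import asyncio

import pytest

from async_lru import alru_cache

@alru_cache(maxsize=128)
async def cached(x):
    return x

@pytest.mark.benchmark  # measured by pytest-codspeed when run with --codspeed
def test_cache_hit():
    async def run():
        await cached(1)          # prime the cache (one miss)
        for _ in range(1000):
            await cached(1)      # repeated hits
        await cached.cache_close()
    asyncio.run(run())
```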
On CI, benchmarks are run automatically via GitHub Actions on Python 3.13, and results are uploaded to CodSpeed (if a CODSPEED_TOKEN is configured). You can view performance history and detect regressions on the CodSpeed dashboard.
The library was donated by Ocean S.A. Thanks to the company for its contribution.