Skip to content

DiskCache

from pulsefire.caches import DiskCache 

Bases: BaseCache

Disk Cache.

Requires the `diskcache` package to be installed. This cache lives on disk, which means cheaper storage but slower access.

Example:

DiskCache() # Cache on tmp
DiskCache("folder") # Cache on folder/

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| directory | str \| None | Cache directory, uses tmp if None. | None |
| shards | int | Number of shards to distribute writes. | 8 |
| serializer | | Serializer package supporting loads and dumps. | pickle |
Source code in pulsefire/caches.py
class DiskCache(BaseCache):
    """Disk Cache.

    Requires `diskcache` installed. This cache lives on disk, meaning cheaper storage but slower access.

    Example:
    ```python
    DiskCache() # Cache on tmp
    DiskCache("folder") # Cache on folder/
    ```

    Parameters:
        directory: Cache directory, uses tmp if None.
        shards: Number of shards to distribute writes.
        serializer: Serializer package supporting `loads` and `dumps`.
    """

    def __init__(self, directory: str | None = None, shards: int = 8, serializer=pickle) -> None:
        import diskcache
        self.directory = directory
        self.serializer = serializer
        self.cache = diskcache.FanoutCache(directory, shards)

    @sync_to_async()
    def get[T](self, key: str) -> T:
        value = self.cache.get(key)
        if value is None:
            raise KeyError(key)
        return self.serializer.loads(value)

    @sync_to_async()
    def set(self, key: str, value: Any, ttl: float) -> None:
        if ttl <= 0:
            return
        if math.isinf(ttl):
            ttl = None
        self.cache.set(key, self.serializer.dumps(value), ttl)

    @sync_to_async()
    def clear(self) -> None:
        self.cache.clear()
Attributes
directory instance-attribute
directory = directory
serializer instance-attribute
serializer = serializer
cache instance-attribute
cache = FanoutCache(directory, shards)
Functions
__init__
__init__(
    directory: str | None = None, shards: int = 8, serializer=pickle
) -> None
Source code in pulsefire/caches.py
def __init__(self, directory: str | None = None, shards: int = 8, serializer=pickle) -> None:
    """Store the settings and open a `diskcache.FanoutCache`.

    Parameters:
        directory: Cache directory, uses tmp if None.
        shards: Number of shards to distribute writes.
        serializer: Serializer package supporting `loads` and `dumps`.
    """
    # Imported here rather than at module level, so `diskcache` is only
    # needed once a DiskCache is actually constructed.
    import diskcache

    self.directory = directory
    self.serializer = serializer
    self.cache = diskcache.FanoutCache(directory=directory, shards=shards)
get
get(key: str) -> T
Source code in pulsefire/caches.py
@sync_to_async()
def get[T](self, key: str) -> T:
    value = self.cache.get(key)
    if value is None:
        raise KeyError(key)
    return self.serializer.loads(value)
set
set(key: str, value: Any, ttl: float) -> None
Source code in pulsefire/caches.py
@sync_to_async()
def set(self, key: str, value: Any, ttl: float) -> None:
    """Serialize `value` and store it under `key` for `ttl` seconds.

    A `ttl` of zero or less stores nothing. An infinite `ttl` is passed
    to the underlying cache as `None` (no expiry in `diskcache`).
    """
    if ttl <= 0:
        return
    expire = None if math.isinf(ttl) else ttl
    payload = self.serializer.dumps(value)
    self.cache.set(key, payload, expire=expire)
clear
clear() -> None
Source code in pulsefire/caches.py
@sync_to_async()
def clear(self) -> None:
    """Clear the underlying cache."""
    backend = self.cache
    backend.clear()