Source code for melisa.cache

# Copyright MelisaDev 2022 - Present
# Full MIT License can be found in `LICENSE.txt` at the project root.

from __future__ import annotations

from enum import Enum
from typing import List, Dict, Optional, Any, Union

from .utils.types import UNDEFINED
from .models.guild.guild import Guild, UnavailableGuild
from .models.guild.channel import ChannelType, Channel
from .utils.snowflake import Snowflake


class ChannelsCachingPolicy(Enum):
    """ "Channels caching policy"""

    ALL = "all"
    NONE = "none"
    GUILD_TEXT = 0
    GUILD_VOICE = 2


[docs]class CacheManager: """ """ def __init__( self, *, disabled: bool = False, policies: Dict[str, List[ChannelsCachingPolicy]] = None, auto_unused_attributes: Optional[Dict[Any, List[str]]] = None, ): self.auto_unused_attributes: Dict[Any, List[str]] = ( {} if auto_unused_attributes is not None else auto_unused_attributes ) self._raw_guilds: Dict[str, Any] = {} self._raw_users: Dict[str, Any] = {} self._raw_dm_channels: Dict[str, Any] = {} self._disabled = disabled # Some default values if policies is None: policies = {"channels": [ChannelsCachingPolicy.GUILD_TEXT]} self._policies = policies # We use symlinks to cache guild channels # like we save channel in Guild and save it here # and if you need channel, and you don't know its guild # you can use special method, and it will find it in guild self._channel_symlinks: Dict[str, str] = {}
[docs] def guilds_count(self) -> int: """Cached Guilds Count""" return len(self._raw_guilds)
[docs] def users_count(self) -> int: """Cached Users Count""" return len(self._raw_users)
[docs] def guild_channels_count(self) -> int: """Cached Guild Channels Count""" return len(self._channel_symlinks)
[docs] def total_channels_count(self) -> int: """Total Cached Channel Count""" return len(self._raw_dm_channels) + len(self._channel_symlinks)
def __remove_unused_attributes(self, model, _type): if self.auto_unused_attributes is None: self.auto_unused_attributes = {} unused_attributes = self.auto_unused_attributes.get(_type) if unused_attributes and unused_attributes is not None: unused_attributes = unused_attributes.__dict__.keys() for attr in unused_attributes: model.__delattr__(attr) return model
[docs] def set_guild(self, guild: Optional[Guild] = None): """ Save Guild into cache Parameters ---------- guild: Optional[`~melisa.models.guild.Guild`] Guild to save into cache """ if self._disabled: return if guild is None: return None guild = self.__remove_unused_attributes(guild, Guild) policy = self._policies["channels"] if hasattr(guild, "channels") and ChannelsCachingPolicy.NONE not in policy: channels = guild.channels.values() if ChannelsCachingPolicy.ALL not in policy: policy = [int(x.value) for x in policy] channels = filter( lambda channel: channel.type is not None and int(channel.type) in policy, channels, ) for sym in channels: sym_id = str(sym.id) if self._channel_symlinks.get(sym_id, UNDEFINED) is not UNDEFINED: self._channel_symlinks.pop(sym_id) self._channel_symlinks[sym_id] = str(guild.id) else: if hasattr(guild, "channels"): guild.channels = {} self._raw_guilds.update({str(guild.id): guild}) return guild
[docs] def set_guild_channel(self, channel: Optional[Channel] = None): """ Save Guild into cache Parameters ---------- channel: Optional[`~melisa.models.guild.Channel`] Guild Channel to save into cache """ if self._disabled: return if channel is None: return None channel = self.__remove_unused_attributes( channel, Channel ) # ToDo: add channel type guild = self._raw_guilds.get(str(channel.guild_id), UNDEFINED) channel_id = str(channel.id) if guild != UNDEFINED: if hasattr(guild, "channels"): self._raw_guilds[str(guild.id)].channels.update({channel_id: channel}) self._channel_symlinks.update({channel_id: str(channel.guild_id)}) return channel
[docs] def get_guild_channel(self, channel_id: Union[Snowflake, str, int]): """ Get guild channel from cache Parameters ---------- channel_id: Optional[:class:`~melisa.utils.snowflake.Snowflake`, `str`, `int`] ID of guild channel to get from cache. """ if self._disabled: return None channel_id = str(channel_id) guild_id = self._channel_symlinks.get(channel_id, UNDEFINED) if guild_id == UNDEFINED: return None guild = self.get_guild(guild_id) if guild is None: return None if hasattr(guild, "channels") is False: return None return guild.channels.get(channel_id)
[docs] def get_guild(self, guild_id: Union[Snowflake, str, int]): """ Get guild from cache Parameters ---------- guild_id: Optional[:class:`~melisa.utils.snowflake.Snowflake`, `str`, `int`] ID of guild to get from cache. """ if self._disabled: return None if not isinstance(guild_id, str): guild_id = str(guild_id) return self._raw_guilds.get(guild_id, None)
def _set_none_guilds(self, guilds: List[Dict[str, Any]]) -> None: """ Insert None-Guilds to cache Parameters ---------- guilds: Optional[:class:`~melisa.utils.snowflake.Snowflake`, `str`, `int`] Data of guilds tso insert to the cache """ if self._disabled: return guilds_dict = dict( map( lambda i: (str(i["id"]), UnavailableGuild.from_dict(i)), guilds, ) ) self._raw_guilds.update(guilds_dict) return None
[docs] def remove_guild(self, guild_id: Union[Snowflake, str, int]): """ Remove guild from cache Parameters ---------- guild_id: Optional[:class:`~melisa.utils.snowflake.Snowflake`, `str`, `int`] ID of guild to remove from cache. """ if self._disabled: return if not isinstance(guild_id, str): guild_id = str(guild_id) return self._raw_guilds.pop(guild_id, None)
[docs] def remove_guild_channel(self, channel_id: Union[Snowflake, str, int]): """ Remove guild channel from cache Parameters ---------- channel_id: Optional[:class:`~melisa.utils.snowflake.Snowflake`, `str`, `int`] ID of guild channel to remove from cache. """ if self._disabled: return if not isinstance(channel_id, str): channel_id = str(channel_id) guild_id = str(self._channel_symlinks.pop(channel_id, None)) if guild_id is None: return None guild = self.get_guild(guild_id) if guild is None: return None if hasattr(guild, "channels") is False: return None return guild.channels.pop(channel_id, None)
[docs] def set_guild_channel_last_message_id( self, channel_id: Union[Snowflake, str, int], guild_id: Union[Snowflake, str, int], message_id: Union[Snowflake, str, int], ): """ Set guild channel last message id Parameters ---------- channel_id: Optional[:class:`~melisa.utils.snowflake.Snowflake`, `str`, `int`] ID of channel to set. guild_id: Optional[:class:`~melisa.utils.snowflake.Snowflake`, `str`, `int`] ID of guild to set. message_id: Optional[:class:`~melisa.utils.snowflake.Snowflake`, `str`, `int`] ID of message to set. """ if not isinstance(channel_id, str): channel_id = str(channel_id) if not isinstance(guild_id, str): guild_id = str(guild_id) if not isinstance(message_id, Snowflake): message_id = Snowflake(int(message_id)) guild = self.get_guild(guild_id) if guild is None: return None if hasattr(guild, "channels") is False: return None if guild.channels.get(channel_id) is None: return None channel = self._raw_guilds[guild_id].channels[channel_id] channel.last_message_id = message_id self._raw_guilds[guild_id].channels[channel_id] = channel