from typing import TypeVar, Type, Dict, Union, List
import logging
import importlib
import inspect
import copy
from datapipelines import (
DataPipeline,
DataSink,
DataSource,
CompositeDataTransformer,
DataTransformer,
)
from ..data import Region, Platform
# Generic type parameter for Settings.clear_sinks / Settings.expire_sinks.
T = TypeVar("T")

# NOTE(review): configures the *root* logger at import time — a module-level
# side effect; confirm downstream apps expect cassiopeia to set this up.
logging.basicConfig(
    format="%(asctime)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.WARNING,
)
def create_pipeline(service_configs: Dict, verbose: int = 0) -> DataPipeline:
    """Build the DataPipeline from the ``pipeline`` section of the settings.

    Args:
        service_configs: Mapping of datastore class name -> constructor kwargs.
            Each entry may carry a ``"package"`` key naming the module that
            defines the store (defaults to ``"cassiopeia.datastores"``);
            that key is popped before the kwargs reach the store constructor.
        verbose: 0 = silent; >0 prints each service; >1 also prints what each
            source provides / each sink accepts; >2 also prints transformers.

    Returns:
        The assembled DataPipeline, with ``_cache`` set to the configured
        Cache store, or None when no Cache is configured.
    """
    transformers = []

    # Always use the Riot API transformers
    from ..transformers import __transformers__ as riotapi_transformer
    transformers.extend(riotapi_transformer)

    # Add sources / sinks by name from config
    services = []
    for store_name, config in service_configs.items():
        # Stores are looked up by class name inside their package.
        package = config.pop("package", None)
        if package is None:
            package = "cassiopeia.datastores"
        module = importlib.import_module(name=package)
        store_cls = getattr(module, store_name)
        store = store_cls(**config)
        services.append(store)
        # A datastore package may ship its own transformers alongside it.
        service_transformers = getattr(module, "__transformers__", [])
        transformers.extend(service_transformers)

    from ..datastores import Cache, MerakiAnalyticsCDN, LolWikia

    # Automatically insert the ghost store if it isn't there
    from ..datastores import UnloadedGhostStore
    found = False
    for datastore in services:
        if isinstance(datastore, UnloadedGhostStore):
            found = True
            break
    if not found:
        if any(isinstance(service, Cache) for service in services):
            # Find the cache and insert the ghost store directly after it
            for i, datastore in enumerate(services):
                if isinstance(datastore, Cache):
                    services.insert(i + 1, UnloadedGhostStore())
                    break
        else:
            # Insert the ghost store at the beginning of the pipeline
            services.insert(0, UnloadedGhostStore())

    # NOTE(review): these two stores are always appended last — presumably
    # lowest-priority fallbacks; confirm ordering semantics in DataPipeline.
    services.append(MerakiAnalyticsCDN())
    services.append(LolWikia())
    pipeline = DataPipeline(services, transformers)

    # Manually put the cache on the pipeline.
    for datastore in services:
        if isinstance(datastore, Cache):
            pipeline._cache = datastore
            break
    else:
        # for/else: loop completed without a break, so no Cache is configured.
        pipeline._cache = None

    if verbose > 0:
        for service in services:
            print("Service:", service)
            if verbose > 1:
                if isinstance(service, DataSource):
                    for p in service.provides:
                        print(" Provides:", p)
                if isinstance(service, DataSink):
                    for p in service.accepts:
                        print(" Accepts:", p)
        if verbose > 2:
            for transformer in transformers:
                for t in transformer.transforms.items():
                    print("Transformer:", t)
        print()
    return pipeline
def register_transformer_conversion(transformer: "DataTransformer", from_type, to_type):
    """Find the method on ``transformer`` that converts ``from_type`` to
    ``to_type`` and register it with the transformer's dispatcher.

    A matching method is one whose return annotation is exactly ``to_type``
    and whose parameter annotations include ``from_type``.

    Args:
        transformer: The transformer instance whose methods are searched.
        from_type: The input type of the desired conversion.
        to_type: The output type of the desired conversion.

    Raises:
        RuntimeError: If no method with matching annotations is found.
    """
    methods = inspect.getmembers(transformer, predicate=inspect.ismethod)
    for _name, method in methods:
        annotations = copy.copy(method.__annotations__)
        # Methods without a return annotation (e.g. __init__, plain helpers)
        # cannot be conversion methods; default to None instead of letting
        # pop("return") raise an uncaught KeyError.
        return_type = annotations.pop("return", None)
        # The "context" parameter is dispatch plumbing, not the payload type.
        annotations.pop("context", None)
        try:
            if to_type is return_type and from_type in annotations.values():
                # Register the underlying function and rebind it to the
                # instance so the dispatcher calls it as a bound method.
                transformer.transform.register(from_type, to_type)(
                    method.__func__
                ).__get__(transformer, transformer.__class__)
                break
        except TypeError:
            # Unorderable/unhashable annotation values; not a match.
            continue
    else:
        raise RuntimeError(
            "Could not find method to register: {} to {} in {}.".format(
                from_type, to_type, transformer
            )
        )
def get_default_config():
    """Return a fresh copy of Cassiopeia's default configuration dict.

    A new dict is built on every call so callers may mutate the result
    without affecting later calls.
    """
    pipeline_defaults = {
        "Cache": {},
        "DDragon": {},
        "RiotAPI": {"api_key": "RIOT_API_KEY"},
    }
    logging_defaults = {
        "print_calls": True,
        "print_riot_api_key": False,
        "default": "WARNING",
        "core": "WARNING",
    }
    return {
        "global": {"version_from_match": "patch"},
        "plugins": {},
        "pipeline": pipeline_defaults,
        "logging": logging_defaults,
    }
class Settings(object):
    """Runtime configuration for Cassiopeia.

    Reads the user-supplied settings dict section by section, falling back
    to ``get_default_config()`` for any missing section, and configures the
    "default" and "core" loggers.
    """

    def __init__(self, settings):
        defaults = get_default_config()

        def section(key):
            # Fall back to the packaged defaults when a section is absent.
            return settings.get(key, defaults[key])

        globals_ = section("global")
        # Valid json values are: "version", "patch", and null
        self.__version_from_match = globals_.get(
            "version_from_match", defaults["global"]["version_from_match"]
        )
        self.__plugins = section("plugins")
        self.__pipeline_args = section("pipeline")
        # Built lazily by the `pipeline` property.
        self.__pipeline = None  # type: DataPipeline

        logging_config = section("logging")
        self.__default_print_calls = logging_config.get(
            "print_calls", defaults["logging"]["print_calls"]
        )
        self.__default_print_riot_api_key = logging_config.get(
            "print_riot_api_key", defaults["logging"]["print_riot_api_key"]
        )
        # Apply the configured level to each named logger and its handlers.
        for logger_name in ("default", "core"):
            level = logging_config.get(logger_name, defaults["logging"][logger_name])
            logger = logging.getLogger(logger_name)
            logger.setLevel(level)
            for handler in logger.handlers:
                handler.setLevel(level)
@property
def pipeline(self) -> DataPipeline:
if self.__pipeline is None:
self.__pipeline = create_pipeline(
service_configs=self.__pipeline_args, verbose=0
)
return self.__pipeline
    @property
    def version_from_match(self):
        # Strategy for deriving a static-data version from match data.
        # Valid json values are: "version", "patch", and null (-> None).
        return self.__version_from_match
    @property
    def plugins(self):
        # The raw "plugins" section of the settings dict, stored unprocessed.
        return self.__plugins
[docs] def set_riot_api_key(self, key):
from ..datastores.riotapi import RiotAPI
for sources in self.pipeline._sources:
for source in sources:
if isinstance(source, RiotAPI):
source.set_api_key(key)
[docs] def clear_sinks(self, type: Type[T] = None):
types = {type}
if type is not None:
from ..core.common import CoreData, CassiopeiaObject
if issubclass(type, CassiopeiaObject):
for t in type._data_types:
types.add(t)
types.add(t._dto_type)
elif issubclass(type, CoreData):
types.add(type._dto_type)
for sink in self.pipeline._sinks:
for type in types:
sink.clear(type)
[docs] def expire_sinks(self, type: Type[T] = None):
types = {type}
if type is not None:
from ..core.common import CoreData, CassiopeiaObject
if issubclass(type, CassiopeiaObject):
for t in type._data_types:
types.add(t)
types.add(t._dto_type)
elif issubclass(type, CoreData):
types.add(type._dto_type)
for sink in self.pipeline._sinks:
for type in types:
sink.expire(type)