Source code for countrydata.spec

# -*- coding: utf-8 -*-

"""Specification base definition for all data fields."""

import abc
import json
import os
import random

from countrydata.data import get_random_a2_code
from countrydata.util.property import abstractproperty, classproperty
from countrydata.data.consts import COUNTRYDATA_FILES_DIRECTORY

class CountryDataEntity(abc.ABC):
    """Abstract specification of a single country data field.

    Subclasses supply the abstract class properties declared below; this
    base class derives validation assertions from them and resolves the
    JSON files that hold the field values.
    """

    # Optional constraints on ``len(str(value))``; ``None`` disables the check.
    length = None
    max_length = None
    min_length = None

    @abstractproperty
    @classproperty
    def readable(self):
        """Field name used in assertion messages and valid field names."""
        raise NotImplementedError()

    @abstractproperty
    @classproperty
    def ids(self):
        """Sequence of field identifiers; the first one names index files."""
        raise NotImplementedError()

    @abstractproperty
    @classproperty
    def field_path(self):
        """String of the form ``<categories>:<groups>``.

        Categories are separated by ``/`` and groups by ``.``.
        """
        raise NotImplementedError()

    @abstractproperty
    @classproperty
    def type(self):
        """Python type that values of this field must be instances of."""
        raise NotImplementedError()

    @abstractproperty
    @classproperty
    def null(self):
        """When equal to ``False``, ``None`` values are rejected."""
        raise NotImplementedError()

    @abstractproperty
    @classproperty
    def assertions(self):
        """Sequence of ``(check, message)`` pairs.

        ``check`` is called with the value and must return a truthy result
        for valid values. ``message`` is either a ``%``-format string that
        receives the value, or a callable that receives the value and
        returns the message.
        """
        raise NotImplementedError()

    @abstractproperty
    @classproperty
    def data(self):
        """Mapping with a ``"provider"`` key and an optional ``"source"``."""
        raise NotImplementedError()

    @classproperty
    def data_provider(self):
        """Return the ``"provider"`` entry of :attr:`data`."""
        return self.data["provider"]

    @classproperty
    def data_source(self):
        """Return the ``"source"`` entry of :attr:`data`.

        Falls back to the ``"provider"`` entry when no source is declared.
        """
        data = self.data
        # Conditional (not ``dict.get``) so "provider" is only looked up
        # when it is actually needed.
        return data["source"] if "source" in data else data["provider"]

    @classmethod
    def _properties_assertions(cls):
        """Build the assertions implied by the class properties.

        Returns:
            list: New list of ``(check, message)`` pairs derived from
            ``type``, ``null``, ``length``, ``min_length`` and
            ``max_length``. ``cls.assertions`` is not modified.
        """
        # NOTE(review): the type check also rejects ``None`` when ``null``
        # is true -- confirm whether nullable fields should skip it.
        derived = [
            (
                lambda value: isinstance(value, cls.type),
                # ``str(<class 'x'>)`` -> "x". The original looked the type
                # up with ``getattr(cls, cls.type)``, which raised TypeError
                # instead of producing the message.
                lambda value: "%s for %s is not of type '%s'" % (
                    cls.readable,
                    str(value),
                    str(cls.type).split("'")[1],
                ),
            ),
        ]

        # Equality (not identity / truthiness) kept on purpose so that the
        # accepted values of ``null`` stay exactly as before.
        if cls.null == False:  # noqa: E712
            derived.append(
                (
                    lambda value: value is not None,
                    lambda value: "%s for %s is null" % (
                        cls.readable, str(value)),
                )
            )

        if getattr(cls, "length", None) is not None:
            derived.append(
                (
                    lambda value: len(str(value)) == cls.length,
                    lambda value: "%s for %s is not %d characters long" % (
                        cls.readable, str(value), cls.length),
                )
            )

        if getattr(cls, "min_length", None) is not None:
            derived.append(
                (
                    lambda value: len(str(value)) >= cls.min_length,
                    lambda value: (
                        "%s for %s is lower than %d characters long" % (
                            cls.readable, str(value), cls.min_length)
                    ),
                )
            )

        if getattr(cls, "max_length", None) is not None:
            derived.append(
                (
                    lambda value: len(str(value)) <= cls.max_length,
                    lambda value: (
                        "%s for %s is greater than %d characters long" % (
                            cls.readable, str(value), cls.max_length)
                    ),
                )
            )

        return derived

    @classmethod
    def _insert_properties_assertions(cls):
        """Append the property-derived assertions to ``cls.assertions``.

        Kept for backward compatibility. :meth:`test` and :meth:`pytest` no
        longer call it, because every call makes ``cls.assertions`` grow.
        """
        for assertion in cls._properties_assertions():
            cls.assertions.append(assertion)

    @classmethod
    def _all_assertions(cls):
        """Return the declared assertions followed by the derived ones."""
        return list(cls.assertions) + cls._properties_assertions()

    @staticmethod
    def _render_message(msg_error, value):
        """Render an assertion message given as format string or callable."""
        if isinstance(msg_error, str):
            return msg_error % value
        return msg_error(value)

    @classmethod
    def test(cls, value):
        """Validate ``value`` against every assertion of this field.

        Args:
            value: Value to validate.

        Raises:
            AssertionError: On the first failing assertion, carrying the
                rendered message of that assertion.
        """
        for assert_func, msg_error in cls._all_assertions():
            try:
                passed = assert_func(value)
            except AssertionError:
                # A check that raises AssertionError itself counts as failed
                # and is reported with the message of its assertion pair.
                passed = False
            # Explicit raise instead of ``assert`` so that validation is not
            # stripped when Python runs with ``-O``.
            if not passed:
                raise AssertionError(cls._render_message(msg_error, value))

    @classmethod
    def pytest(cls, value):
        """Validate ``value`` using plain ``assert`` statements.

        Args:
            value: Value to validate.

        Raises:
            AssertionError: On the first failing assertion.
        """
        for assert_func, msg_error in cls._all_assertions():
            assert assert_func(value), cls._render_message(msg_error, value)

    @classproperty
    def category_dirpath(cls):
        """Directory under the data files root for this field's categories."""
        return os.path.join(
            COUNTRYDATA_FILES_DIRECTORY, *cls.categories_chain)

    @classproperty
    def categories_chain(cls):
        """List of categories: ``field_path`` before ``:`` split on ``/``."""
        return cls.field_path.split(":")[0].split("/")

    @classproperty
    def groups_chain(cls):
        """First ``.``-separated item of ``field_path`` after ``:``, as list."""
        # NOTE(review): ``[:1]`` keeps only the first element; ``[:-1]`` (all
        # but the last) may have been intended -- confirm before changing.
        return cls.field_path.split(":")[1].split(".")[:1]

    @classmethod
    def _load_index(cls, from_id, to_id):
        """Load the ``_index/<from_id>-<to_id>.json`` file as a Python object."""
        index_filepath = os.path.join(
            COUNTRYDATA_FILES_DIRECTORY,
            "_index",
            "%s-%s.json" % (from_id, to_id),
        )
        # Explicit encoding so that reading does not depend on the locale.
        with open(index_filepath, "r", encoding="utf-8") as f:
            return json.load(f)

    @classmethod
    def filepath(cls, value):
        """Return the path of the JSON file that stores data for ``value``.

        Args:
            value: Identifier value. Used directly (lowercased) when ``"a2"``
                is one of ``cls.ids``; otherwise translated through the
                ``<ids[0]>-a2`` index file.

        Returns:
            str: Path inside :attr:`category_dirpath`.

        Raises:
            KeyError: If ``value`` is not present in the index file.
            OSError: If the index file cannot be opened.
        """
        if "a2" in cls.ids:
            a2_code = value
        else:
            a2_code = cls._load_index(cls.ids[0], "a2")[str(value)]
        return os.path.join(
            cls.category_dirpath, "%s.json" % a2_code.lower())

    @classmethod
    def random_value(cls):
        """Return a random valid value for this field.

        Returns:
            The result of ``get_random_a2_code()`` when ``"a2"`` is one of
            ``cls.ids``; otherwise a random value of the ``a2-<ids[0]>``
            index file converted with ``cls.type``.
        """
        if "a2" in cls.ids:
            return get_random_a2_code()
        indexes = cls._load_index("a2", cls.ids[0])
        return cls.type(random.choice(list(indexes.values())))

    @classproperty
    def valid_field_names(cls):
        """List of names accepted for this field.

        For each identifier in ``cls.ids`` the list contains its lowercase
        and uppercase forms followed by ``cls.readable`` and the class name
        (so the last two are repeated once per identifier).
        """
        response = []
        for field_id in cls.ids:
            response.extend([
                field_id.lower(),
                field_id.upper(),
                cls.readable,
                cls.__name__,
            ])
        return response