import asyncio
import logging
import ssl
from typing import Optional, Tuple, Type, TypeVar, Union
import aiohttp
import certifi
logger = logging.getLogger('freshpointsync.client')
"""Logger for the `freshpointsync.client` package."""
TClient = TypeVar('TClient', bound='ProductDataFetchClient')
[docs]
class ProductDataFetchClient:
"""Asynchronous utility for fetching contents of a specified FreshPoint.cz
web page.
This class wraps an `aiohttp.ClientSession` object and provides additional
features like retries, timeouts, logging, and comprehensive error handling.
"""
BASE_URL = 'https://my.freshpoint.cz'
"""The base URL of the FreshPoint.cz website."""
def __init__(self) -> None:
self._timeout = self._check_timeout(timeout=None)
self._max_retries = self._check_max_retries(max_retries=5)
self._client_session: Optional[aiohttp.ClientSession] = None
self._ssl_context: Optional[ssl.SSLContext] = None
async def __aenter__(self: TClient) -> TClient:
"""Asynchronous context manager entry."""
await self.start_session()
return self
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[object],
) -> None:
"""Asynchronous context manager exit."""
await self.close_session()
[docs]
@classmethod
def get_page_url(cls, location_id: int) -> str:
"""Generate a page URL for a given location ID.
Args:
location_id (int): The ID of the location, as it appears in
the FreshPoint.cz web page URL. For example, in
https://my.freshpoint.cz/device/product-list/296,
the ID is 296.
Returns:
str: The full page URL for the given location ID.
"""
return f'{cls.BASE_URL}/device/product-list/{location_id}'
@staticmethod
def _check_timeout(timeout: object) -> aiohttp.ClientTimeout:
"""Validate the given timeout value and convert it to an
`aiohttp.ClientTimeout` object.
Args:
timeout (Any): The timeout value to be checked.
Returns:
aiohttp.ClientTimeout: The validated client timeout object. If the
timeout value is None, the default `aiohttp.ClientTimeout`
object is returned. If the timeout value is already an instance
of `aiohttp.ClientTimeout`, it is returned as is.
Raises:
ValueError: If the timeout value is invalid.
"""
if timeout is None:
return aiohttp.ClientTimeout()
if isinstance(timeout, (int, float)):
return aiohttp.ClientTimeout(total=float(timeout))
if isinstance(timeout, aiohttp.ClientTimeout):
return timeout
raise ValueError(f'Invalid timeout argument "{timeout}".')
@property
def timeout(self) -> aiohttp.ClientTimeout:
"""Client request timeout."""
return self._timeout
[docs]
def set_timeout(
self, timeout: Optional[Union[aiohttp.ClientTimeout, int, float]]
) -> None:
"""Set the client request timeout.
Args:
timeout (Optional[Union[aiohttp.ClientTimeout, int, float]]):
The timeout value. It can be an `aiohttp.ClientTimeout` object,
an integer or a float representing the total timeout in seconds,
or None for the default `aiohttp.ClientTimeout` timeout.
Raises:
ValueError: If the timeout value is negative or invalid.
"""
self._timeout = self._check_timeout(timeout)
@staticmethod
def _check_max_retries(max_retries: object) -> int:
"""Check if the given max_retries value is valid.
Args:
max_retries (Any): The number of max retries.
Returns:
int: The validated value.
Raises:
ValueError: If the value is not an integer or is less than 1.
"""
if not isinstance(max_retries, int):
raise ValueError('The number of max retries must be an integer.')
if max_retries < 0:
raise ValueError('The number of max retries cannot be negative.')
return max_retries
@property
def max_retries(self) -> int:
"""The maximum number of retries for fetching data."""
return self._max_retries
[docs]
def set_max_retries(self, max_retries: int) -> None:
"""Set the maximum number of retries for the fetching data.
Args:
max_retries (int): The maximum number of retries.
Raises:
TypeError: If the max_retries value is not an integer.
ValueError: If the max_retries value is less than 1.
"""
self._max_retries = self._check_max_retries(max_retries)
@property
def session(self) -> Optional[aiohttp.ClientSession]:
"""The `aiohttp.ClientSession` object used for fetching data."""
return self._client_session
@property
def is_session_closed(self) -> bool:
"""Check if the client session is closed.
Returns:
bool: True if the client session is closed, False otherwise.
"""
return not self._client_session or self._client_session.closed
[docs]
async def start_session(self, force: bool = False) -> None:
"""Start a new `aiohttp` client session and create an SSL context.
If a session is already started, a new session is not created unless
the `force` parameter is set to `True`. If the SSL context is already
created, it is not recreated.
Args:
force (bool, optional): If True, forcefully close the existing
session and start a new one. If no session is started,
this parameter has no effect. Defaults to False.
"""
if not self.is_session_closed and force:
logger.info(
'Closing existing client session for "%s".', self.BASE_URL
)
await self.close_session()
if self.is_session_closed:
logger.info('Starting new client session for "%s".', self.BASE_URL)
self._client_session = aiohttp.ClientSession(base_url=self.BASE_URL)
logger.debug(
'Successfully started client session for "%s".', self.BASE_URL
)
else:
logger.debug(
'Client session for "%s" is already started.', self.BASE_URL
)
if self._ssl_context is None:
logger.debug('Creating SSL context for "%s".', self.BASE_URL)
context = ssl.create_default_context(cafile=certifi.where())
self._ssl_context = context
[docs]
async def set_session(self, session: aiohttp.ClientSession) -> None:
"""Set the client session object. If the previous session
is not closed, it is closed before setting the new one.
Args:
session (aiohttp.ClientSession): The client session to set.
"""
if not isinstance(session, aiohttp.ClientSession):
raise TypeError(
'The session must be an aiohttp.ClientSession object.'
)
if not self.is_session_closed:
await self.close_session()
self._client_session = session
[docs]
async def close_session(self) -> None:
"""Close the `aiohttp` client session if one is open.
If the session is already closed, this method has no effect on
the client session object. If an SSL context has been created,
it is also cleared after closing the session.
"""
if self._client_session:
logger.info('Closing client session for "%s".', self.BASE_URL)
if not self._client_session.closed:
await self._client_session.close()
self._client_session = None
logger.debug(
'Successfully closed client session for "%s".', self.BASE_URL
)
else:
logger.debug(
'Client session for "%s" is already closed.', self.BASE_URL
)
if self._ssl_context is not None:
logger.debug('Clearing SSL context for "%s".', self.BASE_URL)
self._ssl_context = None
def _check_fetch_args(
self, location_id: object, timeout: object, max_retries: object
) -> Tuple[aiohttp.ClientSession, str, aiohttp.ClientTimeout, int]:
"""Check and validate the arguments for fetching data. Note that this
method may raise exceptions via the checks for timeout and max_retries.
Args:
location_id (Any): The ID of the location.
timeout (Any): The timeout value for the request.
max_retries (Any): The maximum number of retries for the request.
Returns:
tuple[aiohttp.ClientSession, str, aiohttp.ClientTimeout, int]:
A tuple containing the client session, relative URL, timeout,
and max retries.
"""
if not self._client_session or self._client_session.closed:
raise ValueError('Client session is not initialized or is closed.')
else:
session = self._client_session
if timeout is None:
timeout = self._timeout
else:
timeout = self._check_timeout(timeout)
if max_retries is None:
max_retries = self._max_retries
else:
max_retries = self._check_max_retries(max_retries)
relative_url = f'/device/product-list/{location_id}'
return session, relative_url, timeout, max_retries
async def _fetch_once(
self,
session: aiohttp.ClientSession,
ssl_context: ssl.SSLContext,
relative_url: str,
timeout: aiohttp.ClientTimeout,
) -> Optional[str]:
"""Fetch data from the specified URL using the provided session and
timeout.
Args:
session (aiohttp.ClientSession): The client session to use for
the request.
ssl_context (ssl.SSLContext): The SSL context to use
for the request.
relative_url (str): The relative URL to fetch data from.
timeout (aiohttp.ClientTimeout): The timeout for the request.
Returns:
Optional[str]: The fetched data as a string,
or None if an error occurred.
"""
try:
async with session.get(
relative_url,
ssl=ssl_context,
timeout=timeout,
allow_redirects=False,
) as response:
logger.debug(
'Server response for "%s": %s %s',
response.url,
response.status,
response.reason,
)
if response.status == 302:
return '' # inexistent page, no need to retry
if response.status != 200:
return None # fetch failed, should retry
return await response.text()
except asyncio.TimeoutError:
logger.warning(
'Timeout occurred while fetching data from "%s%s"',
self.BASE_URL,
relative_url,
)
except aiohttp.ClientConnectionError as exc:
logger.warning(
'Connection error occurred while fetching data from "%s%s": %s',
self.BASE_URL,
relative_url,
exc,
)
except Exception as exc:
exc_type = exc.__class__.__name__
logger.error(
'Exception "%s" occurred while fetching data from "%s%s": %s',
exc_type,
self.BASE_URL,
relative_url,
exc,
)
return None # fetch failed, should retry
[docs]
async def fetch(
self,
location_id: Union[int, str],
timeout: Optional[Union[aiohttp.ClientTimeout, int, float]] = None,
max_retries: Optional[int] = None,
) -> Optional[str]:
"""Fetch HTML data from a FreshPoint.cz web page.
Args:
location_id (Union[int, str]): The ID the FreshPoint location.
timeout (Optional[Union[aiohttp.ClientTimeout, int, float]]):\
The timeout for the request. If None, the default timeout is used.
max_retries (Optional[int]): The maximum number of retries.
If None, the default number of retries is used.
Returns:
Optional[str]: The fetched data as a string,\
or None if the fetch failed.
"""
assert self._ssl_context is not None
args = self._check_fetch_args(location_id, timeout, max_retries)
session, relative_url, timeout, max_retries = args
attempt = 0
while attempt < max_retries:
logger.info(
'Fetching data from "%s%s" (attempt %s of %s)',
self.BASE_URL,
relative_url,
attempt + 1,
max_retries,
)
text = await self._fetch_once(
session, self._ssl_context, relative_url, timeout
)
if text == '': # noqa: PLC1901 # inexistent page
return None
if text is not None:
return text
attempt += 1
if attempt < max_retries:
wait_time: int = 2**attempt # exponential backoff
logger.debug('Retrying in %i seconds...', wait_time)
await asyncio.sleep(wait_time)
return None