import asyncio
import aiohttp
import certifi
import logging
import ssl
from typing import Any, Optional, Union
logger = logging.getLogger("freshpointsync.client")
"""Logger for the `freshpointsync.client` module."""
[docs]
class ProductDataFetchClient:
"""Asynchronous utility for fetching contents of a specified FreshPoint.cz
web page.
This class wraps an `aiohttp.ClientSession` object and provides additional
features like retries, timeouts, logging, and comprehensive error handling.
"""
BASE_URL = "https://my.freshpoint.cz"
"""The base URL of the FreshPoint.cz website."""
def __init__(
self,
timeout: Optional[Union[aiohttp.ClientTimeout, int, float]] = None,
max_retries: int = 5
) -> None:
self._timeout = self._check_client_timeout(timeout)
self._max_retries = self._check_max_retries(max_retries)
self._client_session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
"""Asynchronous context manager entry."""
await self.start_session()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Asynchronous context manager exit."""
await self.close_session()
[docs]
@classmethod
def get_page_url(cls, location_id: int) -> str:
"""Generate a page URL for a given location ID.
Args:
location_id (int): The ID of the location, as it appears in
the FreshPoint.cz web page URL. For example, in
https://my.freshpoint.cz/device/product-list/296,
the ID is 296.
Returns:
str: The full page URL for the given location ID.
"""
return f'{cls.BASE_URL}/device/product-list/{location_id}'
@property
def session(self) -> Optional[aiohttp.ClientSession]:
"""The `aiohttp` client session object used for fetching data."""
return self._client_session
@property
def is_session_closed(self) -> bool:
"""Check if the client session is closed.
Returns:
bool: True if the client session is closed, False otherwise.
"""
if not self._client_session:
return True
return self._client_session.closed
[docs]
async def set_session(self, session: aiohttp.ClientSession) -> None:
"""Set the client session object. If the previous session
is not closed, it is closed before setting the new one.
Args:
session (aiohttp.ClientSession): The client session to set.
"""
if not self.is_session_closed:
await self.close_session()
self._client_session = session
@property
def timeout(self) -> aiohttp.ClientTimeout:
"""Client request timeout."""
return self._timeout
@staticmethod
def _check_client_timeout(timeout: Any) -> aiohttp.ClientTimeout:
"""Check and validate the client timeout value.
Args:
timeout (Any): The timeout value to be checked.
Returns:
aiohttp.ClientTimeout: The validated client timeout object.
Raises:
ValueError: If the timeout value is negative or invalid.
"""
if timeout is None:
return aiohttp.ClientTimeout()
if isinstance(timeout, aiohttp.ClientTimeout):
return timeout
try:
timeout = float(timeout)
if timeout < 0:
raise ValueError("Timeout cannot be negative.")
return aiohttp.ClientTimeout(total=timeout)
except ValueError as err:
raise ValueError(f'Invalid timeout argument "{timeout}".') from err
[docs]
def set_timeout(
self, timeout: Optional[Union[aiohttp.ClientTimeout, int, float]]
) -> None:
"""Set the client request timeout.
Args:
timeout (Optional[Union[aiohttp.ClientTimeout, int, float]]):
The timeout value. It can be an `aiohttp.ClientTimeout` object,
an integer or a float representing the total timeout in
seconds, or None for the default aiohttp client timeout.
Raises:
ValueError: If the timeout value is negative or invalid.
"""
self._timeout = self._check_client_timeout(timeout)
@property
def max_retries(self) -> int:
"""The maximum number of retries for fetching data."""
return self._max_retries
@staticmethod
def _check_max_retries(max_retries: Any) -> int:
"""Check if the given max_retries value is valid.
Args:
max_retries (Any): The number of max retries.
Returns:
int: The validated max_retries value.
Raises:
TypeError: If the max_retries value is not an integer.
ValueError: If the max_retries value is less than 1.
"""
if not isinstance(max_retries, int):
raise TypeError("The number of max retries must be an integer.")
if max_retries < 1:
raise ValueError("The number of max retries must be positive.")
return max_retries
[docs]
def set_max_retries(self, max_retries: int) -> None:
"""Set the maximum number of retries for the fetching data.
Args:
max_retries (int): The maximum number of retries.
Raises:
TypeError: If the max_retries value is not an integer.
ValueError: If the max_retries value is less than 1.
"""
self._max_retries = self._check_max_retries(max_retries)
[docs]
async def start_session(self) -> None:
"""Start an aiohttp client session if one is not already started."""
if self.is_session_closed:
logger.info('Starting new client session for "%s".', self.BASE_URL)
session = aiohttp.ClientSession(base_url=self.BASE_URL)
self._client_session = session
logger.debug(
'Successfully started client session for "%s".', self.BASE_URL
)
else:
logger.debug(
'Client session for "%s" is already started.', self.BASE_URL
)
[docs]
async def close_session(self) -> None:
"""Close the aiohttp client session if one is open."""
if self.is_session_closed:
logger.debug(
'Client session for "%s" is already closed.', self.BASE_URL
)
else:
logger.info('Closing client session for "%s".', self.BASE_URL)
await self._client_session.close() # type: ignore
logger.debug(
'Successfully closed client session for "%s".', self.BASE_URL
)
def _check_fetch_args(
self, location_id: Any, timeout: Any, max_retries: Any
) -> tuple[aiohttp.ClientSession, str, aiohttp.ClientTimeout, int]:
"""Check and validate the arguments for fetching data. Note that this
method may raise exceptions via the checks for timeout and max_retries.
Args:
location_id (Any): The ID of the location.
timeout (Any): The timeout value for the request.
max_retries (Any): The maximum number of retries for the request.
Returns:
tuple[aiohttp.ClientSession, str, aiohttp.ClientTimeout, int]:
A tuple containing the client session, relative URL, timeout,
and max retries.
"""
if not self._client_session or self._client_session.closed:
raise ValueError('Client session is not initialized or is closed.')
else:
session = self._client_session
if timeout is None:
timeout = self._timeout
else:
timeout = self._check_client_timeout(timeout)
if max_retries is None:
max_retries = self._max_retries
else:
max_retries = self._check_max_retries(max_retries)
relative_url = f'/device/product-list/{location_id}'
return session, relative_url, timeout, max_retries
async def _fetch_once(
self,
session: aiohttp.ClientSession,
ssl_context: ssl.SSLContext,
relative_url: str,
timeout: aiohttp.ClientTimeout
) -> Optional[str]:
"""Fetch data from the specified URL using the provided session and
timeout.
Args:
session (aiohttp.ClientSession): The client session to use for
the request.
ssl_context (ssl.SSLContext): The SSL context to use
for the request.
relative_url (str): The relative URL to fetch data from.
timeout (aiohttp.ClientTimeout): The timeout for the request.
Returns:
Optional[str]: The fetched data as a string,
or None if an error occurred.
"""
try:
async with session.get(
relative_url, ssl_context=ssl_context, timeout=timeout
) as response:
if response.status == 200:
logger.debug(
'Successfully fetched data from "%s"', response.url
)
return await response.text()
else:
logger.error(
'Error occurred while fetching data from "%s": '
'HTTP Status %s', response.url, response.status
)
except asyncio.TimeoutError:
logger.warning(
'Timeout occurred when fetching data from "%s%s"',
self.BASE_URL, relative_url
)
except Exception as exc:
exc_type = exc.__class__.__name__
logger.error(
'Exception "%s" occurred while fetching data from "%s%s": %s',
exc_type, self.BASE_URL, relative_url, str(exc)
)
return None
[docs]
async def fetch(
self,
location_id: Union[int, str],
timeout: Optional[Union[aiohttp.ClientTimeout, int, float]] = None,
max_retries: Optional[int] = None
) -> Optional[str]:
"""Fetch HTML data from a FreshPoint.cz web page.
Args:
location_id (Union[int, str]): The ID the FreshPoint location.
timeout (Optional[Union[aiohttp.ClientTimeout, int, float]]):\
The timeout for the request. If None, the default timeout is used.
max_retries (Optional[int]): The maximum number of retries.
If None, the default number of retries is used.
Returns:
Optional[str]: The fetched data as a string,\
or None if the fetch failed.
"""
args = self._check_fetch_args(location_id, timeout, max_retries)
session, relative_url, timeout, max_retries = args
ssl_context = ssl.create_default_context(cafile=certifi.where())
attempt = 0
while attempt < max_retries:
logger.info(
'Fetching data from "%s%s" (attempt %s of %s)',
self.BASE_URL, relative_url, attempt + 1, max_retries
)
text = await self._fetch_once(
session, ssl_context, relative_url, timeout
)
if text is not None:
return text
attempt += 1
if attempt < max_retries:
wait_time: int = 2 ** attempt # exponential backoff
logger.debug('Retrying in %i seconds...', wait_time)
await asyncio.sleep(wait_time)
return None