from __future__ import annotations
import asyncio
import logging
from concurrent.futures import ProcessPoolExecutor
from functools import cached_property
from pydantic import BaseModel, ConfigDict, Field
from pydantic.alias_generators import to_camel
from typing import Any, Callable, Iterable, Optional, NamedTuple, Union
from ..client._client import ProductDataFetchClient
from ..parser._parser import (
ProductPageHTMLParser,
ProductFinder,
hash_text,
normalize_text
)
from ..product._product import Product
from ..update._update import (
SafeAsyncTaskRunner,
ProductUpdateEvent,
ProductUpdateEventPublisher,
ProductCacheUpdater,
Handler
)
# Module-level logger registered under the 'freshpointsync.page' name.
logger = logging.getLogger('freshpointsync.page')
"""Logger for the `freshpointsync.page` module."""
[docs]
class FetchInfo(NamedTuple):
    """Outcome of one attempt to fetch a product page.

    Bundles the fetched text, its hash, and a flag telling whether the text
    differs from what the page has stored.
    """

    contents: Optional[str]
    """Raw page contents as fetched, or None when nothing was fetched."""
    contents_hash: Optional[str]
    """SHA-256 hash of the contents, or None when nothing was fetched."""
    is_updated: bool
    """True when the fetched contents differ from the stored ones."""
[docs]
class ProductPageData(BaseModel):
    """Serializable state of a single product page.

    Field names are also exposed under camelCase aliases, and the model can
    be populated using either the field names or the aliases.
    """

    model_config = ConfigDict(
        alias_generator=to_camel,
        populate_by_name=True,
    )

    location_id: int = Field(frozen=True)
    """ID of the product location the page belongs to."""
    html_hash: str = Field(default='')
    """SHA-256 hash of the page HTML; empty until a hash is stored."""
    products: dict[int, Product] = Field(
        default_factory=dict, repr=False, frozen=True
    )
    """Products listed on the page, stored in a dictionary."""

    @cached_property
    def url(self) -> str:
        """URL of the product page, derived from the location ID."""
        return ProductDataFetchClient.get_page_url(self.location_id)

    @property  # not cached: the products may be filled in after creation
    def location(self) -> str:
        """Name of the product location.

        The name is taken from the first product in the products dictionary.
        An empty string is returned while the dictionary is empty.
        """
        first_product = next(iter(self.products.values()), None)
        if first_product is None:
            return ''
        return first_product.location

    @property  # not cached because "location" is not cached either
    def location_lowercase_ascii(self) -> str:
        """Location name passed through ``normalize_text``."""
        return normalize_text(self.location)

    @property
    def product_names(self) -> list[str]:
        """Names of the products on the page, skipping empty names."""
        names = (product.name for product in self.products.values())
        return [name for name in names if name]

    @property
    def product_categories(self) -> list[str]:
        """Distinct product categories in order of first appearance.

        Empty categories are skipped.
        """
        categories = (product.category for product in self.products.values())
        # dict keys keep insertion order and drop repeated categories
        return list(dict.fromkeys(cat for cat in categories if cat))
[docs]
class ProductPage:
"""Product page object that provides methods for fetching, updating, and
managing product data on the page. May be used as an asynchronous context
manager.
"""
def __init__(
    self,
    location_id: Optional[int] = None,
    data: Optional[ProductPageData] = None,
    client: Optional[ProductDataFetchClient] = None
) -> None:
    """Create a product page bound to a single location.

    Either ``location_id`` or ``data`` must be given. When both are given,
    they must refer to the same location.

    Args:
        location_id (Optional[int], optional): ID of the product location
            (product page). Defaults to None.
        data (Optional[ProductPageData], optional): Existing page data to
            use as the initial cached state. Defaults to None.
        client (Optional[ProductDataFetchClient], optional): Client used
            for fetching the page. A new client is created when omitted.
            Defaults to None.

    Raises:
        ValueError: If neither ``location_id`` nor ``data`` is provided,
            or if they refer to different locations.
    """
    page_data = self._validate_data(location_id, data)
    if not client:
        client = ProductDataFetchClient()
    publisher = ProductUpdateEventPublisher()
    self._data = page_data
    self._client = client
    self._publisher = publisher
    self._runner = SafeAsyncTaskRunner(executor=None)
    self._update_forever_task: Optional[asyncio.Task] = None
    # The updater is handed the very dictionary held by the data model
    # together with the publisher created above.
    self._updater = ProductCacheUpdater(page_data.products, publisher)
def __str__(self) -> str:
    """Return the page URL as the readable form of the page."""
    page_data = self._data
    return page_data.url
def __repr__(self) -> str:
    """Return a constructor-like representation of the page.

    The text consists of the class name and the location ID of the page.
    """
    cls_name = self.__class__.__name__
    return f'{cls_name}(location_id={self._data.location_id})'
async def __aenter__(self):
    """Enter the asynchronous context: start the session, return the page."""
    await self.start_session()
    return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Leave the asynchronous context.

    Closes the session, then cancels the running update handlers and the
    update forever task. The exception details are ignored and nothing is
    returned, so an exception raised inside the context propagates.
    """
    await self.close_session()
    await self.cancel_update_handlers()
    await self.cancel_update_forever()
@staticmethod
def _validate_data(
    location_id: Optional[int] = None,
    data: Optional[ProductPageData] = None
) -> ProductPageData:
    """Resolve the data model a page should start with.

    Args:
        location_id (Optional[int], optional): ID of the product location.
            Defaults to None.
        data (Optional[ProductPageData], optional): Data model of the
            product page. Defaults to None.

    Raises:
        ValueError: If both arguments are None, or if an explicitly given
            location ID differs from the location ID of the data model.

    Returns:
        ProductPageData: The given data model, or a new empty model
            created for ``location_id`` when no model is given.
    """
    if data is not None:
        has_conflict = (
            location_id is not None and location_id != data.location_id
        )
        if has_conflict:
            raise ValueError('Location ID mismatch')
        return data
    if location_id is None:
        raise ValueError('Location ID is required')
    return ProductPageData(location_id=location_id)
@property
def data(self) -> ProductPageData:
    """Data model holding the state of the page."""
    # handed out without copying; the original author relies on the
    # frozen fields of the model here
    page_data = self._data
    return page_data
@property
def context(self) -> dict[Any, Any]:
    """Context dictionary kept by the update event publisher of the page."""
    publisher = self._publisher
    return publisher.context
@property
def client(self) -> ProductDataFetchClient:
    """Client the page currently uses for fetching product data."""
    current_client = self._client
    return current_client
[docs]
async def set_client(self, client: ProductDataFetchClient) -> None:
    """Replace the product data fetch client of the page.

    The method is asynchronous because the session of the current client
    is closed first if that session is still open.

    Args:
        client (ProductDataFetchClient): The new product data fetch client.
    """
    previous = self._client
    if not previous.is_session_closed:
        await previous.close_session()
    self._client = client
[docs]
def subscribe_for_update(
    self,
    handler: Handler,
    event: Union[
        ProductUpdateEvent, Iterable[ProductUpdateEvent], None
    ] = None,
    call_safe: bool = True,
    handler_done_callback: Optional[Callable[[asyncio.Future], Any]] = None
) -> None:
    """Register a handler for product update event(s) of this page.

    The subscription is delegated to the update event publisher of
    the page. A handler is a function, method, or other callable,
    synchronous or asynchronous, that takes exactly one argument
    (a `ProductUpdateContext` object) and returns `None` or a coroutine
    resolving to `None`. It is invoked with the event context whenever
    a subscribed event is posted.

    Args:
        handler (Handler): The callable to invoke for the event(s).
        event (Union[ProductUpdateEvent, Iterable[ProductUpdateEvent],\
        None], optional): The event type(s) to subscribe to. If None,
            the handler is subscribed to all events.
        call_safe (bool, optional): If True, exceptions raised by
            the handler are caught and logged. If False, they propagate
            and must be handled by the caller. Defaults to True.
        handler_done_callback (Optional[Callable[[asyncio.Future], Any]]):
            Optional callable invoked when the handler finishes. It
            receives the `asyncio.Task` or `asyncio.Future` object that
            represents the handler execution. Defaults to None.

    Raises:
        TypeError: If the handler does not have a valid signature.
    """
    publisher = self._publisher
    publisher.subscribe(handler, event, call_safe, handler_done_callback)
[docs]
def unsubscribe_from_update(
    self,
    handler: Optional[Handler] = None,
    event: Union[
        ProductUpdateEvent, Iterable[ProductUpdateEvent], None
    ] = None
) -> None:
    """Remove a handler from product update event(s) of this page.

    The request is delegated to the update event publisher of the page.
    An unsubscribed handler is no longer invoked when the event is posted.

    Args:
        handler (Optional[Handler], optional): The handler to unsubscribe.
            If None, all handlers of the event(s) are unsubscribed.
        event (Union[ProductUpdateEvent, Iterable[ProductUpdateEvent],\
        None], optional): The event type(s) to unsubscribe from. If None,
            the handler(s) are unsubscribed from all events.
    """
    publisher = self._publisher
    publisher.unsubscribe(handler, event)
[docs]
def is_subscribed_for_update(
    self,
    handler: Optional[Handler] = None,
    event: Union[
        ProductUpdateEvent, Iterable[ProductUpdateEvent], None
    ] = None
) -> bool:
    """Tell whether the page has subscribers for the given event(s).

    The answer comes from the update event publisher of the page.

    Args:
        handler (Optional[Handler], optional): The handler to look for.
            If None, all handlers are considered.
        event (Union[ProductUpdateEvent, Iterable[ProductUpdateEvent],\
        None], optional): The event type(s) to check. If None, all events
            are considered.

    Returns:
        bool: True if there are subscribers for the event, False otherwise.
    """
    publisher = self._publisher
    return publisher.is_subscribed(handler, event)
[docs]
async def start_session(self) -> None:
    """Ask the fetch client to start its client session."""
    fetch_client = self._client
    await fetch_client.start_session()
[docs]
async def close_session(self) -> None:
    """Close the client session and stop the background work of the page.

    After the fetch client has closed its session, all tasks tracked by
    the task runner are cancelled and so is the update forever task.
    """
    fetch_client, runner = self._client, self._runner
    await fetch_client.close_session()
    await runner.cancel_all()
    await self.cancel_update_forever()
async def _fetch_contents(self) -> FetchInfo:
    """Fetch the page contents and compare them with the stored hash.

    The fetch is scheduled through the task runner of the page. If it is
    cancelled or yields None, an empty result flagged as not updated is
    returned. The stored hash is deliberately left untouched here, because
    fetching is not supposed to modify the inner state of the page.

    Returns:
        FetchInfo: Named tuple with the fetched contents, their hash, and
            a flag telling whether the hash differs from the stored one.
    """
    nothing_fetched = FetchInfo(None, None, False)
    try:
        page_html = await self._runner.run_async(
            self._client.fetch, self._data.location_id
        )
    except asyncio.CancelledError:
        return nothing_fetched
    if page_html is None:
        return nothing_fetched
    digest = hash_text(page_html)
    has_changed = digest != self.data.html_hash
    return FetchInfo(page_html, digest, has_changed)
@staticmethod
def _parse_contents_blocking(contents: str) -> list[Product]:
    """Parse the page contents synchronously and collect the products.

    Args:
        contents (str): The HTML contents of the product page.

    Returns:
        list[Product]: List of the products the HTML parser extracted
            from the contents.
    """
    parser = ProductPageHTMLParser(contents)
    return list(parser.products)
async def _parse_contents(self, contents: str) -> list[Product]:
    """Parse the page contents without blocking the event loop.

    The blocking parsing function is handed over to the task runner of
    the page. Empty contents, a cancelled run, and an empty runner result
    all produce an empty list.

    Args:
        contents (str): The HTML contents of the product page.

    Returns:
        list[Product]: List of product data extracted from the contents.
    """
    if not contents:
        return []
    try:
        parsed = await self._runner.run_sync(
            self._parse_contents_blocking, contents
        )
    except asyncio.CancelledError:
        return []
    return parsed if parsed else []
[docs]
async def fetch(self) -> list[Product]:
    """Fetch the page and return the products found on it.

    The internal state of the page is not modified and no event handlers
    are triggered. When the fetch reports no change, the currently cached
    products are returned instead of parsing anything.

    Returns:
        list[Product]: List of product data extracted from the contents.
    """
    info = await self._fetch_contents()
    if not info.is_updated:
        return list(self._data.products.values())
    assert info.contents is not None, 'Invalid contents'
    return await self._parse_contents(info.contents)
def _update_silently(
    self, html_hash: str, products: Iterable[Product]
) -> None:
    """Store already fetched data on the page without triggering any event
    handlers.

    The hash is written to the data model first, then the products are
    passed to the silent update of the cache updater.

    Args:
        html_hash (str): The SHA-256 hash of the HTML contents.
        products (Iterable[Product]): Iterable of product data to update.
    """
    page_data = self.data
    page_data.html_hash = html_hash
    self._updater.update_silently(products)
[docs]
async def update_silently(self) -> None:
    """Fetch the page, parse the products, and store them on the page
    without triggering any event handlers.

    Nothing is parsed or stored when the fetch reports no change.
    """
    info = await self._fetch_contents()
    if not info.is_updated:
        return
    assert info.contents is not None, 'Invalid contents'
    assert info.contents_hash is not None, 'Invalid hash'
    parsed = await self._parse_contents(info.contents)
    self._update_silently(info.contents_hash, parsed)
async def _update(
    self,
    html_hash: str,
    products: Iterable[Product],
    await_handlers: bool = False,
    **kwargs: Any
) -> None:
    """Store already fetched data on the page through the cache updater.

    The hash is written to the data model first, then the products are
    passed to the update of the cache updater together with
    ``await_handlers`` and any extra keyword arguments.

    Args:
        html_hash (str): The SHA-256 hash of the HTML contents.
        products (Iterable[Product]): Iterable of product data to update.
        await_handlers (bool, optional): If True, the method will wait for
            all event handlers to complete execution. Defaults to False.
        **kwargs (Any): Extra keyword arguments forwarded to the updater.
    """
    page_data = self.data
    page_data.html_hash = html_hash
    await self._updater.update(products, await_handlers, **kwargs)
[docs]
async def update(
    self, await_handlers: bool = False, **kwargs: Any
) -> None:
    """Fetch the page, parse the products, store them on the page, and
    trigger event handlers.

    Nothing is parsed or stored when the fetch reports no change.

    Args:
        await_handlers (bool, optional): If True, the method will wait for
            all event handlers to complete execution. Defaults to False.
        **kwargs (Any): Extra keyword arguments forwarded to the internal
            update step.
    """
    info = await self._fetch_contents()
    if not info.is_updated:
        return
    assert info.contents is not None, 'Invalid contents'
    assert info.contents_hash is not None, 'Invalid hash'
    parsed = await self._parse_contents(info.contents)
    await self._update(info.contents_hash, parsed, await_handlers, **kwargs)
[docs]
async def update_forever(
    self,
    interval: float = 10.0,
    await_handlers: bool = False,
    **kwargs: Any
) -> None:
    """Keep updating the product page at regular intervals.

    The coroutine runs indefinitely. A cancellation raised while an update
    is awaited ends the loop and the coroutine returns normally.
    The sleep between updates is not guarded, so a cancellation arriving
    during the sleep propagates to the caller.

    Args:
        interval (float, optional): The time interval in seconds between
            updates. Defaults to 10.0.
        await_handlers (bool, optional): If True, the method will wait for
            all event handlers to complete execution. Defaults to False.
        **kwargs (Any): Extra keyword arguments forwarded to ``update``.
    """
    keep_running = True
    while keep_running:
        try:
            await self.update(await_handlers, **kwargs)
        except asyncio.CancelledError:
            keep_running = False
        else:
            await asyncio.sleep(interval)
[docs]
def init_update_forever_task(
    self,
    interval: float = 10.0,
    await_handlers: bool = False,
    **kwargs: Any
) -> asyncio.Task:
    """Start the update forever task unless it is already running.

    This method is not a coroutine. It wraps the `update_forever`
    coroutine in a task using `asyncio.create_task` and stores the task on
    the page, where `cancel_update_forever` can cancel it. If a stored
    task exists and has not finished, that task is returned unchanged.

    Args:
        interval (float, optional): The time interval in seconds between
            updates. Defaults to 10.0.
        await_handlers (bool, optional): If True, the method will wait for
            all event handlers to complete execution. Defaults to False.
        **kwargs (Any): Extra keyword arguments forwarded to
            `update_forever`.

    Returns:
        asyncio.Task: The running task, either reused or newly created.
    """
    current = self._update_forever_task
    if current is not None and not current.done():
        return current
    fresh = asyncio.create_task(
        self.update_forever(interval, await_handlers, **kwargs)
    )
    self._update_forever_task = fresh
    return fresh
[docs]
async def await_update_handlers(self) -> None:
    """Wait until the task runner of the page has finished all its tasks."""
    runner = self._runner
    await runner.await_all()
[docs]
async def cancel_update_handlers(self) -> None:
    """Cancel all tasks tracked by the task runner of the page."""
    runner = self._runner
    await runner.cancel_all()
[docs]
async def cancel_update_forever(self) -> None:
    """Cancel the update forever task if there is one.

    A task that is still running is cancelled and awaited, and the
    resulting cancellation error is suppressed. The stored task reference
    is cleared afterwards.
    """
    task = self._update_forever_task
    if not task:
        return
    if not task.done():
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
    self._update_forever_task = None
def _find_product_by_id(
    self,
    constraint: Optional[Callable[[Product], bool]] = None,
    **attributes
) -> Optional[Product]:
    """Look a product up by its ID and check it against the criteria.

    The ID is read from the ``product_id`` entry of ``attributes``, which
    must be present. The product stored under that ID is returned only if
    it also matches the constraint and all the given attributes.

    Args:
        constraint (Optional[Callable[[Product], bool]], optional): Optional
            constraint function to filter products. Defaults to None.
        **attributes: Product attributes to match; must contain
            ``product_id``.

    Returns:
        Optional[Product]: The product object if found, None otherwise.
    """
    candidate = self.data.products.get(attributes['product_id'])
    if candidate is None:
        return None
    is_match = ProductFinder.product_matches(
        candidate, constraint, **attributes
    )
    return candidate if is_match else None
[docs]
def find_product(
    self,
    constraint: Optional[Callable[[Product], bool]] = None,
    **attributes
) -> Optional[Product]:
    """Find one product on the page that matches the given criteria.

    Attributes describe the product state and should correspond to fields
    of the product data model, such as `product_id`, `name`, `category`,
    etc. A constraint function can express additional or more complex
    conditions. When `product_id` is among the attributes, the product is
    looked up directly by its ID instead of searching all the products.

    Args:
        constraint (Optional[Callable[[Product], bool]], optional): Optional
            function that takes a `Product` instance and returns a boolean
            telling whether the instance meets the constraint.
        **attributes: Product attributes to match.

    Returns:
        Optional[Product]: The product object if found, None otherwise.
    """
    if 'product_id' not in attributes:
        return ProductFinder.find_product(
            self.data.products.values(), constraint, **attributes
        )
    # direct dictionary lookup instead of a scan over all products
    return self._find_product_by_id(constraint, **attributes)
[docs]
def find_products(
    self,
    constraint: Optional[Callable[[Product], bool]] = None,
    **attributes
) -> list[Product]:
    """Find all products on the page that match the given criteria.

    Attributes describe the product state and should correspond to fields
    of the product data model, such as `product_id`, `name`, `category`,
    etc. A constraint function can express additional or more complex
    conditions. When `product_id` is among the attributes, the product is
    looked up directly by its ID, so the result holds at most one item.

    Args:
        constraint (Optional[Callable[[Product], bool]], optional): Optional
            function that takes a `Product` instance and returns a boolean
            telling whether the instance meets the constraint.
        **attributes: Product attributes to match.

    Returns:
        list[Product]: List of product objects that match the criteria.
    """
    if 'product_id' not in attributes:
        return ProductFinder.find_products(
            self.data.products.values(), constraint, **attributes
        )
    # direct dictionary lookup instead of a scan over all products
    match = self._find_product_by_id(constraint, **attributes)
    return [] if match is None else [match]
[docs]
class ProductPageHubData(BaseModel):
    """Serializable state of a product page hub.

    Field names are also exposed under camelCase aliases, and the model can
    be populated using either the field names or the aliases.
    """

    model_config = ConfigDict(
        alias_generator=to_camel,
        populate_by_name=True,
    )

    pages: dict[int, ProductPageData] = Field(
        default_factory=dict, repr=False, frozen=True
    )
    """Data models of the pages in the hub, stored in a dictionary."""
[docs]
class ProductPageHub:
"""Product page hub object that provides methods for managing multiple
product pages at once. Each page retains its own state and can be accessed
individually. Page data updates are done in parallel using asyncio tasks to
optimize performance. May be used as an asynchronous context manager.
"""
def __init__(
    self,
    data: Optional[ProductPageHubData] = None,
    client: Optional[ProductDataFetchClient] = None,
    enable_multiprocessing: bool = False
) -> None:
    """Create a product page hub.

    A page object sharing the client of the hub is created for every page
    data model found in ``data``.

    Args:
        data (Optional[ProductPageHubData], optional): Data model of
            the product page hub to be used as the initial cached state.
            An empty model is created when omitted. Defaults to None.
        client (Optional[ProductDataFetchClient], optional): Client for
            fetching product data. A new client is created when omitted.
            Defaults to None.
        enable_multiprocessing (bool, optional): If True, the task runner
            of the hub receives a `ProcessPoolExecutor`, so product data
            is parsed in worker processes instead of the default
            `ThreadPoolExecutor`. While this may improve startup
            performance, it should be used with caution. See
            the `concurrent.futures` documentation for more information.
            Defaults to False.
    """
    if not client:
        client = ProductDataFetchClient()
    if not data:
        data = ProductPageHubData()
    self._client = client
    self._data = data
    self._pages: dict[int, ProductPage] = {}
    for page_id, page_data in data.pages.items():
        self._pages[page_id] = ProductPage(data=page_data, client=client)
    self._publisher = ProductUpdateEventPublisher()
    if enable_multiprocessing:
        executor = ProcessPoolExecutor()
    else:
        executor = None
    self._runner = SafeAsyncTaskRunner(executor=executor)
    self._update_forever_task: Optional[asyncio.Task] = None
def __str__(self) -> str:
    """Return a readable description listing the IDs of the hub pages."""
    page_ids = ", ".join(map(str, self._pages))
    return f'ProductPageHub for pages: {page_ids}'
async def __aenter__(self):
    """Enter the asynchronous context: start the session, return the hub."""
    await self.start_session()
    return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Leave the asynchronous context.

    Closes the session and then waits for the update handlers. The
    exception details are ignored and nothing is returned, so an exception
    raised inside the context propagates.
    """
    await self.close_session()
    await self.await_update_handlers()
@property
def data(self) -> ProductPageHubData:
    """Data model holding the state of the hub."""
    # handed out without copying; the original author relies on the
    # frozen fields of the model here
    hub_data = self._data
    return hub_data
@property
def client(self) -> ProductDataFetchClient:
    """Client the hub currently uses for fetching product data."""
    current_client = self._client
    return current_client
[docs]
async def set_client(self, client: ProductDataFetchClient) -> None:
    """Replace the product data fetch client of the hub and its pages.

    The method is asynchronous because the session of the current client
    is closed first if that session is still open. The new client is then
    assigned to the hub and to every registered page.

    Args:
        client (ProductDataFetchClient): The new product data fetch client.
    """
    previous = self._client
    if not previous.is_session_closed:
        await previous.close_session()
    self._client = client
    for registered_page in self._pages.values():
        registered_page._client = client
[docs]
def subscribe_for_update(
    self,
    handler: Handler,
    event: Union[
        ProductUpdateEvent, Iterable[ProductUpdateEvent], None
    ] = None,
    call_safe: bool = True,
    handler_done_callback: Optional[Callable[[asyncio.Future], Any]] = None
) -> None:
    """Register a handler for product update event(s) on all hub pages.

    The handler is subscribed on every registered page and also recorded
    in the publisher of the hub. A handler is a function, method, or other
    callable, synchronous or asynchronous, that takes exactly one argument
    (a `ProductUpdateContext` object) and returns `None` or a coroutine
    resolving to `None`. It is invoked with the event context whenever
    a subscribed event is posted.

    Args:
        handler (Handler): The callable to invoke for the event(s).
        event (Union[ProductUpdateEvent, Iterable[ProductUpdateEvent],\
        None], optional): The event type(s) to subscribe to. If None,
            the handler is subscribed to all events.
        call_safe (bool, optional): If True, exceptions raised by
            the handler are caught and logged. If False, they propagate
            and must be handled by the caller. Defaults to True.
        handler_done_callback (Optional[Callable[[asyncio.Future], Any]]):
            Optional callable invoked when the handler finishes. It
            receives the `asyncio.Task` or `asyncio.Future` object that
            represents the handler execution. Defaults to None.

    Raises:
        TypeError: If the handler does not have a valid signature.
    """
    subscription = (handler, event, call_safe, handler_done_callback)
    # The hub-level subscription is not invoked directly upon page
    # updates; it is the record that page registration copies handlers
    # from.
    self._publisher.subscribe(*subscription)
    for registered_page in self._pages.values():
        registered_page.subscribe_for_update(*subscription)
[docs]
def unsubscribe_from_update(
    self,
    handler: Optional[Handler] = None,
    event: Union[
        ProductUpdateEvent, Iterable[ProductUpdateEvent], None
    ] = None
) -> None:
    """Remove a handler from product update event(s) on all hub pages.

    The handler is unsubscribed from the publisher of the hub and from
    every registered page. An unsubscribed handler is no longer invoked
    when the event is posted.

    Args:
        handler (Optional[Handler], optional): The handler to unsubscribe.
            If None, all handlers of the event(s) are unsubscribed.
        event (Union[ProductUpdateEvent, Iterable[ProductUpdateEvent],\
        None], optional): The event type(s) to unsubscribe from. If None,
            the handler(s) are unsubscribed from all events.
    """
    self._publisher.unsubscribe(handler, event)
    for registered_page in self._pages.values():
        registered_page.unsubscribe_from_update(handler, event)
[docs]
def is_subscribed_for_update(
    self,
    handler: Optional[Handler] = None,
    event: Union[
        ProductUpdateEvent, Iterable[ProductUpdateEvent], None
    ] = None
) -> bool:
    """Tell whether the hub or any of its pages has subscribers for
    the given event(s).

    The publisher of the hub is asked first, then the registered pages
    one by one until a subscription is found.

    Args:
        handler (Optional[Handler], optional): The handler to look for.
            If None, all handlers are considered.
        event (Union[ProductUpdateEvent, Iterable[ProductUpdateEvent],\
        None], optional): The event type(s) to check. If None, all events
            are considered.

    Returns:
        bool: True if there are subscribers for the event, False otherwise.
    """
    if self._publisher.is_subscribed(handler, event):
        return True
    return any(
        registered_page.is_subscribed_for_update(handler, event)
        for registered_page in self._pages.values()
    )
[docs]
def set_context(self, key: Any, value: Any) -> None:
    """Store a context key-value pair on the hub and on all its pages.

    Args:
        key (Any): Context key.
        value (Any): Context value.
    """
    contexts = [self._publisher.context]
    contexts.extend(page.context for page in self._pages.values())
    for context in contexts:
        context[key] = value
[docs]
def del_context(self, key: Any) -> None:
    """Remove a context key from the hub and from all its pages.

    A context that does not contain the key is left as it is.

    Args:
        key (Any): Context key.
    """
    contexts = [self._publisher.context]
    contexts.extend(page.context for page in self._pages.values())
    for context in contexts:
        context.pop(key, None)
[docs]
async def start_session(self) -> None:
    """Ask the fetch client of the hub to start its client session."""
    fetch_client = self._client
    await fetch_client.start_session()
[docs]
async def close_session(self) -> None:
    """Close the client session and stop the background work of the hub.

    After the fetch client has closed its session, the update handlers
    are cancelled, the executor of the task runner (if any) is shut down
    with ``wait=True``, and the update forever task of every registered
    page is cancelled.
    """
    await self._client.close_session()
    await self.cancel_update_handlers()
    executor = self._runner.executor
    if executor:
        executor.shutdown(wait=True)
    for registered_page in self._pages.values():
        await registered_page.cancel_update_forever()
async def _register_page(
    self,
    page: ProductPage,
    update_contents: bool,
    trigger_handlers: bool = False
) -> None:
    """Register a product page in the hub.

    The page and its data model are stored under the location ID of
    the page. Every handler recorded in the publisher of the hub is
    subscribed on the page, and the context of the hub is copied into
    the context of the page. Optionally, the page is updated afterwards.

    Args:
        page (ProductPage): The product page object to register.
        update_contents (bool): If True, the page contents are fetched and
            updated. If False, the page contents are not fetched.
        trigger_handlers (bool, optional): If True, the page is updated
            with its regular update, which triggers event handlers.
            If False, the silent update is used. Only relevant when
            ``update_contents`` is True. Defaults to False.
    """
    location_id = page.data.location_id
    self._data.pages[location_id] = page.data
    self._pages[location_id] = page
    # add common handlers
    pub = self._publisher
    for subscribers in (pub.sync_subscribers, pub.async_subscribers):
        assert isinstance(subscribers, dict), 'Invalid subscribers type'
        for event, handlers_list in subscribers.items():
            for handler_data in handlers_list:
                page.subscribe_for_update(
                    handler_data.handler,
                    event,
                    handler_data.exec_params.call_safe,
                    handler_data.exec_params.done_callback
                )
    # add common context; items() is required because iterating over
    # the context dictionary itself yields only the keys
    for key, value in pub.context.items():
        page.context[key] = value
    # add page contents (optional)
    if update_contents:
        if trigger_handlers:
            await page.update()
        else:
            await page.update_silently()
def _unregister_page(self, location_id: int) -> None:
    """Remove a product page and its data model from the hub.

    Args:
        location_id (int): ID of the product location.

    Raises:
        KeyError: If no page is registered under ``location_id``.
    """
    del self._data.pages[location_id]
    del self._pages[location_id]
[docs]
async def new_page(
    self,
    location_id: int,
    fetch_contents: bool = False,
    trigger_handlers: bool = False
) -> ProductPage:
    """Create a product page for a location and register it in the hub.

    The new page uses the client of the hub.

    Args:
        location_id (int): ID of the product location.
        fetch_contents (bool, optional): If True, the page contents are
            fetched and updated. If False, the page contents are empty.
            Defaults to False.
        trigger_handlers (bool, optional): If True, the event handlers are
            triggered after the page is updated. Defaults to False.

    Returns:
        ProductPage: The newly created product page object.
    """
    created = ProductPage(location_id=location_id, client=self._client)
    await self._register_page(created, fetch_contents, trigger_handlers)
    return created
[docs]
async def add_page(
    self,
    page: ProductPage,
    update_contents: bool = False,
    trigger_handlers: bool = False
) -> None:
    """Add an existing product page to the hub.

    The page keeps its own state. If its client differs from the client
    of the hub, the page is switched to the client of the hub before it
    is registered.

    Args:
        page (ProductPage): The product page object to add.
        update_contents (bool, optional): If True, the page contents are
            fetched and updated. If False, the page contents remain as is.
            Defaults to False.
        trigger_handlers (bool, optional): If True, the event handlers are
            triggered after the page is updated. Defaults to False.
    """
    hub_client = self._client
    if page.client != hub_client:
        await page.set_client(hub_client)
    await self._register_page(page, update_contents, trigger_handlers)
[docs]
def get_page(self, location_id: int) -> ProductPage:
    """Get a registered product page by location ID.

    Args:
        location_id (int): ID of the product location.

    Raises:
        KeyError: If the page is not found.

    Returns:
        ProductPage: The product page object.
    """
    try:
        return self._pages[location_id]
    except KeyError:
        # The lookup error carries no extra information, so it is
        # suppressed to keep the traceback down to the descriptive error.
        raise KeyError(f'Page not found: {location_id}') from None
[docs]
def get_pages(self) -> dict[int, ProductPage]:
    """Get all registered product pages.

    Returns:
        dict[int, ProductPage]: A new dictionary of the product page
            objects with location IDs as keys. The dictionary is a shallow
            copy, so the page objects themselves are shared with the hub.
    """
    return dict(self._pages)
[docs]
async def remove_page(
    self,
    location_id: int,
    await_handlers: bool = False
) -> ProductPage:
    """Remove a product page from the hub and return it.

    The page is unregistered, its event handlers are awaited or cancelled,
    and it receives a new client of its own. The method acts similarly to
    the dictionary `pop` method.

    Args:
        location_id (int): ID of the product location.
        await_handlers (bool, optional): If True, the method waits for
            all event handlers bound to the page to complete execution.
            If False, the handlers are cancelled. Defaults to False.

    Raises:
        KeyError: If the page is not found.

    Returns:
        ProductPage: The removed product page object.
    """
    removed = self.get_page(location_id)
    self._unregister_page(location_id)
    if await_handlers:
        finish_handlers = removed.await_update_handlers
    else:
        finish_handlers = removed.cancel_update_handlers
    await finish_handlers()
    # the removed page must not keep using the client of the hub
    removed._client = ProductDataFetchClient()
    return removed
[docs]
async def scan(
    self, start: int = 0, stop: int = 500, step: int = 1
) -> None:
    """Scan a range of location IDs for product pages.

    An empty page is registered for every location ID in the range that
    is not in the hub yet. All pages of the hub are then updated silently,
    and every page that has no products afterwards is unregistered.

    Args:
        start (int, optional): Start location ID. Defaults to 0.
        stop (int, optional): Stop location ID. Defaults to 500.
        step (int, optional): Step size for location IDs. Defaults to 1.
    """
    candidates = (
        loc for loc in range(start, stop, step) if loc not in self._pages
    )
    for loc in candidates:
        await self.new_page(
            location_id=loc,
            fetch_contents=False,
            trigger_handlers=False
        )
    await self.update_silently()
    # collect the IDs first: pages cannot be unregistered while
    # the dictionary of pages is being iterated
    without_products = []
    for loc, registered_page in self._pages.items():
        if not registered_page.data.products:
            without_products.append(loc)
    for loc in without_products:
        self._unregister_page(loc)
async def _fetch_contents(self) -> dict[int, FetchInfo]:
    """Fetch the contents of all product pages in the hub.

    The fetch of every page is scheduled through the task runner of
    the hub and all the fetches are awaited together.

    Returns:
        dict[int, FetchInfo]: Dictionary of fetched contents, hashes, and
            update flags for each page, with location IDs as keys.
    """
    runner = self._runner
    pending = [
        runner.run_async(registered_page._fetch_contents)
        for registered_page in self._pages.values()
    ]
    fetched: list[FetchInfo] = await asyncio.gather(*pending)
    return {
        page_id: info for page_id, info in zip(self._pages.keys(), fetched)
    }
def _filter_updated_contents(
    self, pages_fetch_info: dict[int, FetchInfo]
) -> dict[int, FetchInfo]:
    """Keep only the fetch results of the pages whose contents changed.

    Args:
        pages_fetch_info (dict[int, FetchInfo]): Dictionary of fetched
            contents, hashes, and update flags for each page.

    Returns:
        dict[int, FetchInfo]: New dictionary with the entries flagged as
            updated, in their original order.
    """
    updated: dict[int, FetchInfo] = {}
    for page_id, info in pages_fetch_info.items():
        if info.is_updated:
            updated[page_id] = info
    return updated
async def _parse_contents(
    self, pages_fetch_info: dict[int, FetchInfo]
) -> dict[int, list[Product]]:
    """Parse the fetched contents of product pages and extract product data.

    The blocking parsing function of every page is scheduled through
    the hub runner and the results are awaited together.

    Args:
        pages_fetch_info (dict[int, FetchInfo]): Dictionary of fetched
            contents, hashes, and update flags for each page.

    Returns:
        dict[int, list[Product]]: Dictionary of parsed product data for
            each successfully parsed page. Pages whose parsing failed and
            pages that are no longer registered in the hub are left out.
    """
    # For some reason, when multiprocessing is enabled, the runner fails
    # to run the parsing function with run_safe=True (in this case it is
    # wrapped in a safe runner function inside of the runner). Something
    # is not picklable, but it is not known what it is.
    run_safe = not isinstance(self._runner.executor, ProcessPoolExecutor)
    page_ids: list[int] = []
    tasks: list[asyncio.Future] = []
    for page_id, page_fetch_info in pages_fetch_info.items():
        page = self._pages.get(page_id)
        if page is None:
            # the page was unregistered after its contents were fetched
            continue
        contents = page_fetch_info.contents or ''
        task = self._runner.run_sync(
            page._parse_contents_blocking, contents, run_safe=run_safe
        )
        page_ids.append(page_id)
        tasks.append(task)
    results = await asyncio.gather(*tasks, return_exceptions=True)
    parsed: dict[int, list[Product]] = {}
    for page_id, result in zip(page_ids, results):
        if isinstance(result, list):
            parsed[page_id] = result
            continue
        # A failed parse must not be reported as an empty product list:
        # the callers store the returned products together with the new
        # contents hash, so the failure would look like an empty page.
        # Leaving the page out keeps its current state untouched.
        # (This is the same logger object as the module-level `logger`.)
        logging.getLogger('freshpointsync.page').warning(
            'Failed to parse the contents of page %s: %r', page_id, result
        )
    return parsed
[docs]
async def update_silently(self) -> None:
    """Fetch the contents of all product pages in the hub, extract the
    product data, and update the internal state of the pages without
    triggering any event handlers.

    Only pages whose fetched contents are flagged as updated are parsed
    and updated. Pages that were unregistered from the hub while
    the contents were being fetched or parsed are skipped.

    Raises:
        AssertionError: If an updated page has no contents hash.
    """
    pages_fetch_info = await self._fetch_contents()
    pages_fetch_info = self._filter_updated_contents(pages_fetch_info)
    pages_products = await self._parse_contents(pages_fetch_info)
    for page_id, page_products in pages_products.items():
        page = self._pages.get(page_id)
        if page is None:
            # unregistered while the fetch / parse was awaited
            continue
        page_html_hash = pages_fetch_info[page_id].contents_hash
        if page_html_hash is None:
            # raised explicitly instead of via `assert` so that the check
            # is not stripped when Python runs with the -O flag
            raise AssertionError('Invalid hash')
        page._update_silently(page_html_hash, page_products)
[docs]
async def update(
    self,
    await_handlers: bool = False,
    **kwargs: Any
) -> None:
    """Fetch the contents of all product pages in the hub, extract the
    product data, update the internal state of the pages, and trigger
    event handlers.

    Only pages whose fetched contents are flagged as updated are parsed
    and updated. Pages that were unregistered from the hub while
    the contents were being fetched or parsed are skipped.

    Args:
        await_handlers (bool, optional): If True, the method will wait for
            all event handlers to complete execution. Defaults to False.
        **kwargs: Additional keyword arguments passed along with
            each page update to the `run_async` call of the hub runner.

    Raises:
        AssertionError: If an updated page has no contents hash.
    """
    pages_fetch_info = await self._fetch_contents()
    pages_fetch_info = self._filter_updated_contents(pages_fetch_info)
    pages_products = await self._parse_contents(pages_fetch_info)
    tasks: list[asyncio.Task] = []
    for page_id, page_products in pages_products.items():
        page = self._pages.get(page_id)
        if page is None:
            # unregistered while the fetch / parse was awaited
            continue
        page_html_hash = pages_fetch_info[page_id].contents_hash
        if page_html_hash is None:
            # raised explicitly instead of via `assert` so that the check
            # is not stripped when Python runs with the -O flag
            raise AssertionError('Invalid hash')
        task = self._runner.run_async(
            page._update,
            page_html_hash,
            page_products,
            await_handlers,
            **kwargs
        )
        tasks.append(task)
    await asyncio.gather(*tasks)
[docs]
async def update_forever(
    self,
    interval: float = 10.0,
    await_handlers: bool = False,
    **kwargs: Any
) -> None:
    """Update all product pages in the hub at regular intervals.

    This method is a coroutine that runs indefinitely: it calls `update`,
    sleeps for `interval` seconds, and repeats. It only stops when it is
    cancelled or when `update` raises.

    Args:
        interval (float, optional): The time interval in seconds between
            updates. Defaults to 10.0.
        await_handlers (bool, optional): If True, the method will wait for
            all event handlers to complete execution. Defaults to False.
        **kwargs: Additional keyword arguments passed to `update`.

    Raises:
        asyncio.CancelledError: If the coroutine is cancelled, regardless
            of whether it is updating or sleeping at that moment.
    """
    # Cancellation is deliberately not caught here. Swallowing it only
    # while `update` is running would make the outcome depend on timing
    # (a cancellation during the sleep propagates anyway) and would hide
    # the cancellation from the task machinery awaiting this coroutine.
    while True:
        await self.update(await_handlers, **kwargs)
        await asyncio.sleep(interval)
[docs]
async def init_update_forever_tasks(
    self,
    interval: float = 10.0,
    await_handlers: bool = False,
    **kwargs: Any
) -> asyncio.Task:
    """Initialize the update forever task for all product pages in the hub.

    If a task is already stored and has not finished yet, that task is
    returned and nothing new is started. Otherwise a new task is created
    from the `update_forever` coroutine with the `asyncio.create_task`
    function. The task is stored internally and can be cancelled with
    the `cancel_update_forever` method. Note that the task is created
    for the hub, not for individual pages.

    Although this method is declared as a coroutine and must be awaited,
    it does not wait for anything itself; it returns as soon as the task
    is scheduled.

    Args:
        interval (float, optional): The time interval in seconds between
            updates. Defaults to 10.0.
        await_handlers (bool, optional): If True, the method will wait for
            all event handlers to complete execution. Defaults to False.
        **kwargs: Additional keyword arguments passed to `update_forever`.

    Returns:
        asyncio.Task: The running update forever task of the hub.
    """
    current = self._update_forever_task
    if current is not None and not current.done():
        # a previous task is still running, reuse it
        return current
    fresh = asyncio.create_task(
        self.update_forever(interval, await_handlers, **kwargs)
    )
    self._update_forever_task = fresh
    return fresh
[docs]
async def await_update_handlers(self) -> None:
    """Wait until the event handlers of every page in the hub have
    completed execution.
    """
    pending = []
    for page in self._pages.values():
        pending.append(page.await_update_handlers())
    await asyncio.gather(*pending)
[docs]
async def cancel_update_handlers(self) -> None:
    """Cancel the running event handlers of every page in the hub."""
    pending = []
    for page in self._pages.values():
        pending.append(page.cancel_update_handlers())
    await asyncio.gather(*pending)
[docs]
async def cancel_update_forever(self) -> None:
    """Cancel the update forever task if there is one.

    The stored task is cancelled and awaited until it has actually
    stopped, and the internal reference to it is then cleared. Does
    nothing when no task is stored.
    """
    running = self._update_forever_task
    if not running:
        return
    running.cancel()
    try:
        await running
    except asyncio.CancelledError:
        # expected outcome of awaiting a cancelled task
        pass
    self._update_forever_task = None