# Source code for freshpointsync.update._update

import asyncio
import collections.abc
import enum
import functools
import inspect
import logging
import sys
from typing import (
    Any,
    Callable,
    Coroutine,
    Dict,
    Generic,
    Iterable,
    Iterator,
    List,
    NamedTuple,
    Optional,
    Tuple,
    TypeVar,
    Union,
    cast,
)

if sys.version_info >= (3, 10):
    from typing import TypeAlias
else:
    from typing_extensions import TypeAlias

if sys.version_info >= (3, 11):
    from typing import NamedTuple
else:
    from typing_extensions import NamedTuple


from ..product._product import DiffValueTuple, Product
from ..runner._runner import CallableRunner

logger = logging.getLogger('freshpointsync.update')
"""Logger for the `freshpointsync.update` package."""


# Generic placeholder type. It is used in this module to tie the type of
# a caller-supplied default value to the return type of a lookup helper.
T = TypeVar('T')


class ProductUpdateEvent(enum.Enum):
    """An enumeration of different types of product updates.

    This enum is used to represent various kinds of changes or updates that
    can occur to a product on the product page. Each member represents
    a specific type of update event for easy identification and handling.

    The value of every member is the lowercase form of its name, so a member
    can also be looked up by value, e.g.
    ``ProductUpdateEvent('product_added')``.
    """

    # --- product lifecycle -------------------------------------------------
    PRODUCT_ADDED = 'product_added'
    """Indicates that a new product has been listed on the product page."""
    PRODUCT_UPDATED = 'product_updated'
    """Indicates any update to a product's information and state."""

    # --- specific kinds of updates -----------------------------------------
    QUANTITY_UPDATED = 'quantity_updated'
    """Indicates that the stock quantity of a product has been updated."""
    PRICE_UPDATED = 'price_updated'
    """Indicates that the pricing of a product has changed, including both
    full price and current sale price.
    """
    OTHER_UPDATED = 'other_updated'
    """Indicates an update to the product's metadata, for example, a change of
    the illustration picture URL.
    """

    # --- product lifecycle (end) -------------------------------------------
    PRODUCT_REMOVED = 'product_removed'
    """Indicates that a product has been removed from the product page."""
class ProductUpdateContext(collections.abc.Mapping):
    """A read-only, mapping-like wrapper around product update event data.

    Instances are passed to event handlers when a product update event is
    posted. All context data is reachable through the standard mapping
    protocol (``ctx[key]``, ``len(ctx)``, iteration, ``in``), and the most
    commonly used values are additionally exposed as properties: the event
    type (``event``), the product states (``product_new``, ``product_old``)
    and a set of product attributes that are resolved from the context data
    first and from the product states second.
    """

    def __init__(self, __kwargs: Dict[Union[str, Any], Any]) -> None:
        """Wrap the given context data.

        Args:
            __kwargs (dict): The context data. The dictionary is stored by
                reference, not copied. Pass it positionally.
        """
        super().__init__()
        self.__kwargs = __kwargs

    def __str__(self) -> str:
        """Return the string representation of the wrapped dictionary."""
        return str(self.__kwargs)

    def __repr__(self) -> str:
        """Return a string resembling the call that created the object."""
        return f'{self.__class__.__name__}({self.__kwargs})'

    def __getitem__(self, key: object) -> object:
        """Return the value stored under the given key.

        Args:
            key (Any): The key to look up in the context data.

        Returns:
            Any: The value associated with the specified key.

        Raises:
            KeyError: If the key is not present in the context data.
        """
        return self.__kwargs[key]

    def __iter__(self) -> Iterator[Any]:
        """Return an iterator over the keys of the context data.

        Returns:
            Iterator[Any]: An iterator over the keys of the context data.
        """
        return iter(self.__kwargs)

    def __len__(self) -> int:
        """Return the number of items in the context data.

        Returns:
            int: The number of items in the context data.
        """
        return len(self.__kwargs)

    def __get_product_attr(self, attr: str, default: T) -> T:
        """Resolve a product-related attribute.

        The lookup order is: an explicit entry in the context data, then
        the new product state, then the old product state.

        Args:
            attr (str): The name of the context key / product attribute.
            default (T): The value to return if the attribute cannot be
                resolved from any of the sources.

        Returns:
            T: The resolved value, or the default value.
        """
        if attr in self.__kwargs:
            return self.__kwargs[attr]  # type: ignore
        # prefer the most recent product state over the previous one
        for product in (self.product_new, self.product_old):
            if product:
                return getattr(product, attr)  # type: ignore
        return default

    @property
    def event(self) -> ProductUpdateEvent:
        """The type of product update event that occurred.

        Raises:
            KeyError: If the context data contains no ``'event'`` entry.
        """
        return self.__kwargs['event']  # type: ignore

    @property
    def timestamp(self) -> float:
        """The timestamp of the product update event.
        If not available, defaults to 0.
        """
        return self.__get_product_attr('timestamp', 0)

    @property
    def product_id(self) -> int:
        """ID of the product being updated.
        If not available, defaults to 0.
        """
        return self.__get_product_attr('id_', 0)

    @property
    def product_name(self) -> str:
        """Name of the product being updated.
        If not available, defaults to an empty string.
        """
        return self.__get_product_attr('name', '')

    @property
    def product_name_lowercase_ascii(self) -> str:
        """Lowercase ASCII representation of the product name.
        If not available, defaults to an empty string.
        """
        return self.__get_product_attr('name_lowercase_ascii', '')

    @property
    def location_id(self) -> int:
        """ID of the product location.
        If not available, defaults to 0.
        """
        return self.__get_product_attr('location_id', 0)

    @property
    def location(self) -> str:
        """Name of the product location.
        If not available, defaults to an empty string.
        """
        return self.__get_product_attr('location', '')

    @property
    def location_lowercase_ascii(self) -> str:
        """Lowercase ASCII representation of the location name.
        If not available, defaults to an empty string.
        """
        return self.__get_product_attr('location_lowercase_ascii', '')

    @property
    def product_new(self) -> Optional[Product]:
        """New state of the product after the update.
        If not available, defaults to None.
        """
        # ``get`` keeps the documented None default when the key is missing
        return self.__kwargs.get('product_new')  # type: ignore

    @property
    def product_old(self) -> Optional[Product]:
        """Previous state of the product before the update.
        If not available, defaults to None.
        """
        # ``get`` keeps the documented None default when the key is missing
        return self.__kwargs.get('product_old')  # type: ignore

    def asdict(self) -> Dict[Union[str, object], object]:
        """Convert the context data to a dictionary.

        Returns:
            dict[str, Any]: A shallow copy of the wrapped context data.
        """
        return self.__kwargs.copy()
class HandlerValidator:
    """A utility class for validating handler functions or methods based on
    their signature and whether they are asynchronous or synchronous.

    A handler is considered valid if it is callable and its call signature
    consists of exactly one parameter (the event context).
    """

    @classmethod
    def _is_valid_is_async(cls, handler: object) -> Tuple[bool, bool]:
        """Determine if a given handler is valid based on its signature and
        whether it is an asynchronous coroutine or a synchronous function or
        method.

        Args:
            handler: The handler to be validated. This can be any callable
                (a function, a method, a `functools.partial` object, or
                an object with a `__call__` method).

        Returns:
            tuple(bool, bool): A tuple of two boolean values, where
                the first value indicates whether the handler is valid (True)
                or not (False) and the second value indicates whether
                the handler is asynchronous (True) or synchronous (False).
                If the inspection itself fails, a warning is logged and
                ``(False, False)`` is returned.
        """
        if not callable(handler):
            return False, False
        # The coroutine check must look through partial wrappers, but the
        # signature check must not: arguments bound by a partial change how
        # many arguments the handler still expects when it is called.
        target = handler
        while isinstance(target, functools.partial):
            target = target.func
        try:
            is_valid_sig = len(inspect.signature(handler).parameters) == 1
            is_async = inspect.iscoroutinefunction(target)
            if not is_async:
                # callable objects: inspect their ``__call__`` method
                call_method = getattr(target, '__call__', None)  # noqa: B004
                is_async = inspect.iscoroutinefunction(call_method)
            return is_valid_sig, is_async
        except Exception as e:
            logger.warning('Failed to validate handler "%s": %s', handler, e)
            return False, False

    @classmethod
    def is_valid_async_handler(cls, handler: object) -> bool:
        """Check if a handler is an asynchronous coroutine function with
        a valid signature (accepts exactly one argument).

        Args:
            handler: The handler to be validated.

        Returns:
            bool: True if the handler is valid and asynchronous,
                otherwise, False.
        """
        is_valid, is_async = cls._is_valid_is_async(handler)
        return is_valid and is_async

    @classmethod
    def is_valid_sync_handler(cls, handler: object) -> bool:
        """Check if a handler is a synchronous function or method with
        a valid signature (accepts exactly one argument).

        Args:
            handler: The handler to be validated.

        Returns:
            bool: True if the handler is valid and synchronous,
                otherwise, False.
        """
        is_valid, is_async = cls._is_valid_is_async(handler)
        return is_valid and not is_async

    @classmethod
    def is_valid_handler(cls, handler: object) -> bool:
        """Check if a handler has a valid signature and is a valid function
        or method, regardless of whether it is synchronous or asynchronous.

        Args:
            handler: The handler to be validated.

        Returns:
            bool: True if the handler is valid, otherwise, False.
        """
        is_valid, _ = cls._is_valid_is_async(handler)
        return is_valid
def is_valid_handler(handler: object) -> bool:
    """Tell whether an object can be used as an event handler.

    This is a module-level shortcut for
    `HandlerValidator.is_valid_handler`; both synchronous and asynchronous
    handlers are accepted.

    Args:
        handler (Any): The handler to be validated.

    Returns:
        bool: True if the handler is valid, otherwise, False.
    """
    verdict = HandlerValidator.is_valid_handler(handler)
    return verdict
# --- handler callable types ------------------------------------------------

SyncHandler: TypeAlias = Callable[[ProductUpdateContext], None]
"""Type alias for a synchronous handler function. Expects a single argument of
type `ProductUpdateContext` and returns `None`.
"""

AsyncHandler: TypeAlias = Callable[
    [ProductUpdateContext], Coroutine[Any, Any, None]
]
"""Type alias for an asynchronous handler function. Expects a single argument
of type `ProductUpdateContext` and returns a coroutine that resolves to `None`.
"""

Handler: TypeAlias = Union[SyncHandler, AsyncHandler]
"""Type alias for a handler function or method. Can be either a synchronous
callable that accepts a single argument of type `ProductUpdateContext` and
returns `None`, or an asynchronous callable that accepts a single argument of
type `ProductUpdateContext` and returns a coroutine that resolves to `None`.
"""


# --- handler bookkeeping records -------------------------------------------


class HandlerExecParamsTuple(NamedTuple):
    """A named tuple for storing handler execution parameters.

    The field order is ``(run_safe, run_blocking, done_callback)``; code in
    this module constructs instances positionally, so the order matters.
    """

    run_safe: bool
    """A flag indicating whether the handler should be executed in a "safe"
    manner, meaning any exceptions raised by the handler will be caught,
    logged, and suppressed.
    """
    run_blocking: bool
    """A flag indicating whether a synchronous handler should be executed in
    a blocking manner, i.e., called directly without using a separate
    `ThreadPoolExecutor`. This flag is ignored for asynchronous handlers.
    """
    done_callback: Optional[Callable[[asyncio.Future], Any]]
    """An optional callback function to be called when the handler completes
    its execution. Depending of the type of the handler, the callback receives
    an `asyncio.Task` or `asyncio.Future` object as its argument, which
    represents the eventual result of the handler's execution.
    """


# Constrained to exactly the two handler kinds so that a `HandlerDataTuple`
# is parametrized with either a synchronous or an asynchronous handler type.
HandlerType = TypeVar('HandlerType', SyncHandler, AsyncHandler)
"""A `TypeVar` variable for a handler function or method."""


class HandlerDataTuple(NamedTuple, Generic[HandlerType]):
    """A named tuple for storing handler data.

    The field order is ``(handler, exec_params)``; code in this module
    constructs instances positionally, so the order matters.
    """

    handler: HandlerType
    """The handler function to be executed after a certain event occurs."""
    exec_params: HandlerExecParamsTuple
    """The hanlder function execution parameters."""


SyncHandlerList: TypeAlias = List[HandlerDataTuple[SyncHandler]]
"""A list of `HandlerDataTuple` records of synchronous handlers."""

AsyncHandlerList: TypeAlias = List[HandlerDataTuple[AsyncHandler]]
"""A list of `HandlerDataTuple` records of asynchronous handlers."""
class ProductUpdateEventPublisher:
    """A publisher and a subscription manager for product update events.

    This class maintains a registry of event handlers for different types of
    product update events. It allows for subscribing or unsubscribing handlers
    to specific events and facilitates broadcasting events to all relevant
    handlers.

    Synchronous and asynchronous handlers are kept in two separate
    registries (``sync_subscribers`` and ``async_subscribers``), each mapping
    an event to the list of handlers subscribed to it.
    """

    def __init__(self) -> None:
        self.context: dict[Any, Any] = {}  # global persistent context
        self.runner = CallableRunner()
        self.sync_subscribers: dict[ProductUpdateEvent, SyncHandlerList] = {}
        self.async_subscribers: dict[ProductUpdateEvent, AsyncHandlerList] = {}

    @staticmethod
    def _is_in_subscribers_list(
        handler: Handler, subscribers: List[HandlerDataTuple]
    ) -> bool:
        """Check if a handler is already present in the list of subscribers.

        Handlers are compared with ``==`` rather than by identity, because
        a bound method is a new object every time it is accessed, while two
        accesses of the same method on the same object compare equal.

        Args:
            handler: The handler function or method to check.
            subscribers: A list of `HandlerDataTuple` objects representing
                the current subscribers.

        Returns:
            bool: True if the handler is found in the list of subscribers,
                otherwise, False.
        """
        return any(existing.handler == handler for existing in subscribers)

    @staticmethod
    def _remove_from_subscribers_list(
        handler: Handler, subscribers: List[HandlerDataTuple]
    ) -> None:
        """Remove a handler from the list of subscribers.

        The first record whose handler compares equal (``==``) to the given
        handler is deleted from the list in place. If the handler is not
        found in the list, no action is taken.

        Args:
            handler: The handler function or method to remove.
            subscribers: A list of `HandlerDataTuple` objects representing
                the current subscribers.
        """
        for i, existing in enumerate(subscribers):
            if existing.handler == handler:
                del subscribers[i]
                break  # the list was modified, stop iterating over it

    @classmethod
    def _unsubscribe_handler(
        cls,
        handler: Handler,
        event: ProductUpdateEvent,
        registry: Dict[ProductUpdateEvent, Any],
    ) -> None:
        """Remove a single handler from one event of one registry.

        Other handlers subscribed to the event are left untouched. If
        the event has no handlers left afterwards, its entry is removed from
        the registry so that the registry only contains events with at least
        one subscriber.

        Args:
            handler: The handler function or method to remove.
            event: The event to remove the handler from.
            registry: The subscribers registry (synchronous or asynchronous)
                to remove the handler from.
        """
        subscribers = registry.get(event)
        if subscribers is None:
            return
        cls._remove_from_subscribers_list(handler, subscribers)
        if not subscribers:
            del registry[event]

    def _subscribe_sync(
        self,
        event: ProductUpdateEvent,
        handler: Handler,
        run_safe: bool,
        run_blocking: bool,
        handler_done_callback: Optional[Callable[[asyncio.Future], Any]],
    ) -> None:
        """Subscribe a synchronous handler to a specific product update event.

        The handler is appended to the event's list of synchronous
        subscribers together with its execution parameters. A handler that
        is already subscribed to the event is not added again and its stored
        execution parameters are left unchanged.

        Args:
            event: The product update event to subscribe the handler to.
            handler: The synchronous handler to be subscribed.
            run_safe: Indicates whether the handler should be called in
                a "safe" manner, with exceptions caught and handled.
            run_blocking: Indicates whether the handler should be called in
                a blocking manner, i.e., called directly without using
                an executor. If False, the handler is executed in
                a non-blocking manner in a separate thread.
            handler_done_callback: An optional callback function to be called
                when the handler completes its execution.
        """
        subscribers = self.sync_subscribers.setdefault(event, [])
        if not self._is_in_subscribers_list(handler, subscribers):
            handler = cast(SyncHandler, handler)
            params = HandlerExecParamsTuple(
                run_safe, run_blocking, handler_done_callback
            )
            subscribers.append(HandlerDataTuple(handler, params))

    def _subscribe_async(
        self,
        event: ProductUpdateEvent,
        handler: Handler,
        run_safe: bool,
        handler_done_callback: Optional[Callable[[asyncio.Future], Any]],
    ) -> None:
        """Subscribe an asynchronous handler to a specific product update
        event.

        The handler is appended to the event's list of asynchronous
        subscribers together with its execution parameters. A handler that
        is already subscribed to the event is not added again and its stored
        execution parameters are left unchanged.

        Args:
            event: The product update event to subscribe the handler to.
            handler: The asynchronous handler to be subscribed.
            run_safe: Indicates whether the handler should be called in
                a "safe" manner, with exceptions caught and handled.
            handler_done_callback: An optional callback function to be called
                when the handler completes its execution.
        """
        subscribers = self.async_subscribers.setdefault(event, [])
        if not self._is_in_subscribers_list(handler, subscribers):
            handler = cast(AsyncHandler, handler)
            # ``run_blocking`` has no meaning for asynchronous handlers
            params = HandlerExecParamsTuple(
                run_safe, False, handler_done_callback
            )
            subscribers.append(HandlerDataTuple(handler, params))

    @staticmethod
    def _validate_event(
        event: Union[ProductUpdateEvent, Iterable[ProductUpdateEvent], None],
    ) -> List[ProductUpdateEvent]:
        """Validate the provided event(s) and return a list of event objects.

        Args:
            event (Union[ProductUpdateEvent, Iterable[ProductUpdateEvent], None], optional):
                The event or events to validate. None stands for all events.
                Items of an iterable are converted with
                ``ProductUpdateEvent(item)``.

        Raises:
            TypeError: If the event is not a valid type (this includes
                a plain string).
            ValueError: If an item of an iterable is not a valid
                `ProductUpdateEvent` member or value.

        Returns:
            list[ProductUpdateEvent]: A list of validated event objects.
        """
        if event is None:
            return list(ProductUpdateEvent)
        if isinstance(event, ProductUpdateEvent):
            return [event]
        # a string is iterable, but iterating over its characters is never
        # what the caller meant
        if isinstance(event, Iterable) and not isinstance(event, str):  # type: ignore
            return [ProductUpdateEvent(e) for e in event]
        raise TypeError(
            f'Event must be a ProductUpdateEvent or an iterable of '
            f'ProductUpdateEvent, got {type(event).__name__} instead.'
        )

    def subscribe(
        self,
        handler: Handler,
        event: Union[
            ProductUpdateEvent, Iterable[ProductUpdateEvent], None
        ] = None,
        run_safe: bool = True,
        run_blocking: bool = True,
        handler_done_callback: Optional[Callable[[asyncio.Future], Any]] = None,
    ) -> None:
        """Subscribe a handler to specific product update event(s).

        The handler will be invoked when the event is posted, with the event
        context passed as an argument. The handler can be a synchronous or
        an asynchronous function, method, or any callable object that accepts
        exactly one argument (a `ProductUpdateContext` object) and returns
        `None` or a coroutine that resolves to `None`.

        Args:
            handler (Handler): The function or callable to invoke for
                the event(s).
            event (Union[ProductUpdateEvent, Iterable[ProductUpdateEvent], None], optional):
                The type of product update event(s) to subscribe to. If None,
                the handler will be subscribed to all events.
            run_safe (bool, optional): If True, exceptions raised by
                the handler are caught and logged. If False, exceptions are
                propagated and must be handled by the caller.
                Defaults to True.
            run_blocking (bool, optional): If True, synchronous handlers are
                executed in a blocking manner, i.e., called directly without
                using an executor. If False, synchronous handlers are
                executed in a non-blocking manner in a separate thread.
                Ignored for asynchronous handlers. Defaults to True.
            handler_done_callback (Optional[Callable[[asyncio.Future], Any]]):
                Optional function to be called when the handler completes
                execution. Depending on the type of the handler, the callback
                receives an `asyncio.Task` or `asyncio.Future` object as its
                argument, which represents the eventual result of the handler
                execution. Defaults to None.

        Raises:
            TypeError: If the handler does not have a valid signature or
                the event is not a valid type.
        """
        events = self._validate_event(event)
        is_valid, is_async = HandlerValidator._is_valid_is_async(handler)
        if not is_valid:
            raise TypeError('Handler signature is invalid.')
        callback = handler_done_callback
        if is_async:
            for event_ in events:
                self._subscribe_async(event_, handler, run_safe, callback)
        else:
            for event_ in events:
                self._subscribe_sync(
                    event_, handler, run_safe, run_blocking, callback
                )

    def unsubscribe(
        self,
        handler: Optional[Handler] = None,
        event: Union[
            ProductUpdateEvent, Iterable[ProductUpdateEvent], None
        ] = None,
    ) -> None:
        """Unsubscribe a handler from specific product update event(s), or
        all handlers if no specific handler is provided.

        The unsubscribed handler will no longer be invoked when the event is
        posted. Other handlers subscribed to the same event(s) are not
        affected.

        Args:
            handler (Handler): The handler to be unsubscribed from
                the event(s). If None, all handlers for the event(s) are
                unsubscribed.
            event (Union[ProductUpdateEvent, Iterable[ProductUpdateEvent], None], optional):
                The type of product update event(s) to unsubscribe from.
                If None, the handler(s) will be unsubscribed from all events.

        Raises:
            TypeError: If the event is not a valid type.
        """
        events = self._validate_event(event)
        for e in events:
            if handler is None:
                # drop every subscriber of the event
                self.async_subscribers.pop(e, None)
                self.sync_subscribers.pop(e, None)
                continue
            # drop the given handler only, keep the remaining subscribers
            self._unsubscribe_handler(handler, e, self.async_subscribers)
            self._unsubscribe_handler(handler, e, self.sync_subscribers)

    def is_subscribed(
        self,
        handler: Optional[Handler] = None,
        event: Union[
            ProductUpdateEvent, Iterable[ProductUpdateEvent], None
        ] = None,
    ) -> bool:
        """Check if there are any subscribers for the given event(s).

        Args:
            handler (Optional[Handler], optional): The handler to check for
                subscription. If None, any subscribed handler counts.
            event (Union[ProductUpdateEvent, Iterable[ProductUpdateEvent], None], optional):
                The type of product update event(s) to check for subscribers.
                If None, all events are checked.

        Raises:
            TypeError: If the event is not a valid type.

        Returns:
            bool: True if the handler (or, with no handler given, any
                handler) is subscribed to at least one of the events,
                False otherwise.
        """
        for e in self._validate_event(event):
            async_subs = self.async_subscribers.get(e, [])
            sync_subs = self.sync_subscribers.get(e, [])
            if handler is None:
                # an event with an empty subscribers list has no subscribers
                if async_subs or sync_subs:
                    return True
                continue
            if self._is_in_subscribers_list(handler, async_subs):
                return True
            if self._is_in_subscribers_list(handler, sync_subs):
                return True
        return False

    def get_context(
        self,
        event: ProductUpdateEvent,
        product_new: Optional[Product],
        product_old: Optional[Product],
        **kwargs: Any,
    ) -> ProductUpdateContext:
        """Construct a `ProductUpdateContext` instance from the provided
        product and additional parameters, merging the global context data
        with the specific product information and any additional keyword
        arguments (the latter takes precedence).

        The ``'event'``, ``'product_new'`` and ``'product_old'`` entries are
        always set from the corresponding arguments, overriding same-named
        entries of the global context.

        Args:
            event (ProductUpdateEvent): The type of product update event to
                be broadcasted.
            product_new (Optional[Product]): The updated product instance,
                or None if not applicable.
            product_old (Optional[Product]): The original product instance
                before the update, or None if not applicable.
            **kwargs (Any): Additional context parameters to be included.

        Returns:
            ProductUpdateContext: A context object populated with the global
                context data and the provided product information and
                parameters.
        """
        context_kwargs = {**self.context, **kwargs}
        context_kwargs['event'] = event
        context_kwargs['product_new'] = product_new
        context_kwargs['product_old'] = product_old
        return ProductUpdateContext(context_kwargs)

    def post(
        self,
        event: ProductUpdateEvent,
        product_new: Optional[Product],
        product_old: Optional[Product],
        **kwargs: Any,
    ) -> None:
        """Notify registered handlers about a specific product update event.

        This method constructs a `ProductUpdateContext` with new and old
        product details, along with any additional keyword arguments. It then
        broadcasts the event to all relevant handlers registered for
        the event type: asynchronous handlers are scheduled first,
        synchronous handlers second. All handlers receive the same context
        object. No context is built if the event has no subscribers.

        Args:
            event (ProductUpdateEvent): The type of product update event to
                be broadcasted.
            product_new (Optional[Product]): The updated product instance,
                or None if there's no new product.
            product_old (Optional[Product]): The original product instance
                before update, or None if unavailable.
            **kwargs (Any): Additional keyword arguments to be included in
                the event context. If any of the keyword arguments overlaps
                with the context data, it takes precedence.

        Raises:
            ValueError: If the event is not a valid `ProductUpdateEvent`
                member or value.
        """
        event = ProductUpdateEvent(event)
        # Snapshots: a handler may subscribe or unsubscribe while the event
        # is being dispatched, which must not disturb the iteration.
        async_subs = tuple(self.async_subscribers.get(event, ()))
        sync_subs = tuple(self.sync_subscribers.get(event, ()))
        if not async_subs and not sync_subs:
            return
        context = self.get_context(event, product_new, product_old, **kwargs)
        for async_sub in async_subs:
            self.runner.run_async(
                async_sub.handler,
                context,
                run_safe=async_sub.exec_params.run_safe,
                done_callback=async_sub.exec_params.done_callback,
            )
        for sync_sub in sync_subs:
            self.runner.run_sync(
                sync_sub.handler,
                context,
                run_safe=sync_sub.exec_params.run_safe,
                run_blocking=sync_sub.exec_params.run_blocking,
                done_callback=sync_sub.exec_params.done_callback,
            )
class ProductCacheUpdater:
    """Product cache manager. Updates the cache and notifies subscribers about
    product updates.

    This class is responsible for maintaining a cache of product data and
    using an event publisher to notify interested parties of changes to
    product information. It provides methods to create, delete, and update
    products in the cache and to trigger appropriate events for each action.
    """

    # product fields whose changes are reported with a dedicated event;
    # a change of any other field is reported as ``OTHER_UPDATED``
    _QUANTITY_FIELDS = frozenset({'quantity'})
    _PRICE_FIELDS = frozenset({'price_full', 'price_curr'})

    def __init__(
        self,
        products: Dict[int, Product],
        publisher: ProductUpdateEventPublisher,
    ) -> None:
        """Initialize a `ProductCacheUpdater` instance.

        Args:
            products (dict[int, Product]): A dictionary of products, where
                the keys are product IDs and the values are `Product`
                objects. The dictionary is stored by reference and modified
                in place by the updater.
            publisher (ProductUpdateEventPublisher): An instance of
                the `ProductUpdateEventPublisher` class to call subscribed
                handlers when product updates occur.
        """
        self.products = products
        self._publisher = publisher

    @staticmethod
    def get_location_id(products: Iterable[Product]) -> Optional[int]:
        """Get the location ID from the given products.

        Args:
            products (Iterable[Product]): A collection of products.

        Returns:
            Optional[int]: The location ID of the first product whose
                location ID is not None, or None if there is no such product.
        """
        for product in products:
            if product.location_id is not None:
                return product.location_id
        return None

    def create_product(self, product: Product, **kwargs: Any) -> None:
        """Add a new product to the cache and post a `PRODUCT_ADDED` event.

        Note that this method assumes that the product is not present in
        the cache already.

        Args:
            product (Product): the product to add to the cache.
            **kwargs (Any): Additional keyword arguments to include in
                the update event context.
        """
        self.products[product.id_] = product
        self._publisher.post(
            event=ProductUpdateEvent.PRODUCT_ADDED,
            product_new=product,
            product_old=None,
            **kwargs,
        )

    def delete_product(self, product: Product, **kwargs: Any) -> None:
        """Remove a product from the cache and post a `PRODUCT_REMOVED`
        event.

        Note that this method assumes that the product is present in
        the cache.

        Args:
            product (Product): the product to remove from the cache.
            **kwargs (Any): Additional keyword arguments to include in
                the update event context.

        Raises:
            KeyError: If the product ID is not present in the cache.
        """
        del self.products[product.id_]
        self._publisher.post(
            event=ProductUpdateEvent.PRODUCT_REMOVED,
            product_new=None,
            product_old=product,
            **kwargs,
        )

    def update_product(
        self,
        product: Product,
        product_cached: Product,
        diff: Dict[str, DiffValueTuple],
        **kwargs: Any,
    ) -> None:
        """Update a product in the cache and post events reflecting
        the specific changes occurred.

        This method determines what aspects of the product have changed
        (e.g., stock level, price, etc.) and posts the appropriate events for
        each type of update. The events are always posted in the same order:
        `PRODUCT_UPDATED` first, followed by `QUANTITY_UPDATED`,
        `PRICE_UPDATED` and `OTHER_UPDATED`, each only if applicable.

        Note that this method assumes that the product is different from
        the cached product.

        Args:
            product (Product): The new product state after the update.
            product_cached (Product): The original product state before
                the update.
            diff (dict[str, DiffValueTuple]): A dictionary of differences
                between the new and old product states, keyed by the name of
                the changed field. The dictionary is only read, never
                modified.
            **kwargs (Any): Additional keyword arguments to include in
                the update event context.
        """
        self.products[product.id_] = product
        # a list (not a set) keeps the order of the posted events stable
        events: List[ProductUpdateEvent] = [ProductUpdateEvent.PRODUCT_UPDATED]
        changed = set(diff)
        if changed & self._QUANTITY_FIELDS:
            events.append(ProductUpdateEvent.QUANTITY_UPDATED)
        if changed & self._PRICE_FIELDS:
            events.append(ProductUpdateEvent.PRICE_UPDATED)
        if changed - self._QUANTITY_FIELDS - self._PRICE_FIELDS:
            # some other changes are present in the diff
            events.append(ProductUpdateEvent.OTHER_UPDATED)
        for event in events:
            self._publisher.post(
                event=event,
                product_new=product,
                product_old=product_cached,
                **kwargs,
            )

    async def update(
        self,
        products: Iterable[Product],
        await_handlers: bool = False,
        **kwargs: Any,
    ) -> None:
        """Process a batch of product updates, adding, deleting, or updating
        products as necessary.

        The given products are treated as the complete new content of
        the cache: cached products missing from them are deleted, products
        not cached yet are created, and cached products that differ from
        their new state (the ``timestamp`` field is not compared) are
        updated. The corresponding events are posted along the way.

        Args:
            products (Iterable[Product]): products to process.
            await_handlers: If True, waits for all triggered event handlers
                to complete.
            **kwargs (Any): Additional keyword arguments to include in
                the update event context.
        """
        products_mapping = {p.id_: p for p in products}
        # log the product cache update with the location ID
        location_id = self.get_location_id(products_mapping.values()) or '?'
        logger.info('Updating product cache for location ID "%s"', location_id)
        # check if any product is not listed anymore
        # (iterate over a copy, the cache is modified inside the loop)
        for id_number, product in self.products.copy().items():
            if id_number not in products_mapping:
                self.delete_product(product, **kwargs)
        # update the products (or create if not previously listed)
        for id_number, product in products_mapping.items():
            product_cached = self.products.get(id_number)
            if product_cached is None:
                self.create_product(product, **kwargs)
            else:
                diff = product_cached.diff(product, exclude={'timestamp'})
                if diff:
                    self.update_product(product, product_cached, diff, **kwargs)
        # optionally await the triggered handlers
        if await_handlers:
            await self._publisher.runner.await_all()

    def update_silently(self, products: Iterable[Product]) -> None:
        """Update the product cache without triggering any events.

        This method clears the cache dictionary and subsequently populates it
        with the given products, effectively replacing the entire cache
        contents while maintaining the dictionary object itself.

        Args:
            products (Iterable[Product]): products to populate the cache
                with.
        """
        products_mapping = {p.id_: p for p in products}
        # log the product cache update with the location ID
        location_id = self.get_location_id(products_mapping.values()) or '?'
        logger.info(
            'Updating product cache for location ID "%s" silently', location_id
        )
        self.products.clear()
        self.products.update(products_mapping)