import logging
from typing import Dict, List, Optional, Union

import numpy as np

from pygrex.data_reader.data_reader import DataReader
from pygrex.models.recommender_model import RecommenderModel
from pygrex.utils.aggregation_strategy import AggregationStrategy, ScoreAggregator
from pygrex.utils.scale import Scale

logger = logging.getLogger(__name__)


class GroupRecommender:
    """
    A class to represent a group recommender system that follows the workflow:

    1. Setup and Candidate Selection
    2. Individual Preference Collection
    3. Score Aggregation
    4. Final Recommendation List
    """

    def __init__(self, data: DataReader):
        """Initialize the group recommender with data.

        Args:
            data: The dataset containing user-item interactions.
        """
        self.data = data
        self._group_predictions = None
        self._members = None
        self._item_pool = None
        self._model = None
        self._aggregation_strategy = None
        self._score_aggregator = None
        self._aggregated_scores = None
        # Lazily filled by get_top_recommendation(); reset on every setup.
        self._top_recommendation = None

    def setup_recommendation(
        self,
        model: RecommenderModel,
        members: List[Union[str, int]],
        data: DataReader,
        aggregation_strategy: AggregationStrategy,  # type: ignore
        most_respected_person: Optional[Union[str, int]] = None,
    ) -> None:
        """
        Setup and Candidate Selection: Initialize the group recommendation process.

        Runs the whole pipeline: builds the candidate item pool, collects each
        member's predictions and aggregates them into group scores. Calling this
        again replaces all results of a previous call, including the cached top
        recommendation.

        Args:
            model: The recommendation model to use.
            members: Non-empty list of user IDs representing the group members.
            data: DataReader whose ``dataset["itemId"]`` column supplies the
                candidate item IDs. Interaction filtering and predictions use
                ``self.data`` (the reader passed to the constructor).
            aggregation_strategy: Strategy for aggregating individual predictions.
            most_respected_person: User ID of the most respected person
                (required for the MRP strategy).

        Raises:
            ValueError: If ``members`` is empty.
        """
        if not members:
            raise ValueError("members must be a non-empty list of user IDs")

        self._members = members
        self._model = model
        self._aggregation_strategy = aggregation_strategy
        # Without this reset, get_top_recommendation() would keep returning the
        # result computed for a previous group/model/strategy.
        self._top_recommendation = None

        self._score_aggregator = ScoreAggregator(
            most_respected_person=most_respected_person
        )

        # All item IDs present in the dataset are the initial candidates.
        item_ids = data.dataset["itemId"].unique()

        # Keep only items that no group member has interacted with.
        self._item_pool = self.get_non_interacted_items_for_recommendation(
            self.data,
            item_ids,  # type: ignore
            members,  # type: ignore
        )

        # Restrict the pool to IDs the model can score. This prevents
        # out-of-bounds errors when the model was trained with a different
        # number of items than the current dataset contains.
        max_item_id = self._get_max_valid_item_id(model)
        item_pool_int = self._item_pool.astype(int)
        valid_mask = (item_pool_int >= 0) & (item_pool_int < max_item_id)
        self._item_pool = item_pool_int[valid_mask]

        # Individual Preference Collection.
        self._group_predictions = self._generate_group_predictions()

        # Score Aggregation.
        self._aggregated_scores = self._aggregate_group_scores()

    def _generate_group_predictions(self) -> Dict[Union[str, int], Dict[int, float]]:
        """
        Individual Preference Collection: Generate predictions for all group members.

        Returns:
            A dictionary with user IDs as keys and their predictions
            (original item ID -> scaled score) as values.

        Raises:
            ValueError: If setup_recommendation has not been run.
        """
        if not self._members or self._model is None or self._item_pool is None:
            raise ValueError(
                "You must call setup_recommendation before generating predictions"
            )

        return {
            member: self.generate_recommendation(
                self._model,
                member,
                self._item_pool,  # type: ignore
                self.data,  # type: ignore
            )
            for member in self._members
        }

    def _aggregate_group_scores(self) -> Dict[int, float]:
        """
        Score Aggregation: Aggregate individual predictions into collective scores.

        Returns:
            Dictionary mapping item IDs to aggregated scores, ordered by
            descending score.

        Raises:
            ValueError: If setup_recommendation has not been run.
        """
        if (
            self._group_predictions is None
            or self._score_aggregator is None
            or self._aggregation_strategy is None
        ):
            raise ValueError(
                "You must call setup_recommendation before aggregating scores"
            )

        # Borda Count works on per-member rankings rather than raw scores.
        rankings = None
        if self._aggregation_strategy == AggregationStrategy.BORDA_COUNT:
            rankings = self._create_rankings_from_predictions()

        aggregated_scores = self._score_aggregator.aggregate_scores(
            evaluations=self._group_predictions,  # type: ignore
            strategy=self._aggregation_strategy,
            rankings=rankings,  # type: ignore
        )

        # Dicts preserve insertion order, so this yields a descending ranking.
        return dict(  # type: ignore
            sorted(aggregated_scores.items(), key=lambda x: x[1], reverse=True)
        )

    def _create_rankings_from_predictions(self) -> Dict[Union[str, int], List[int]]:
        """
        Create rankings from predictions for Borda Count aggregation.

        Returns:
            Dictionary mapping user IDs to lists of item IDs ordered from the
            highest to the lowest predicted score.

        Raises:
            ValueError: If group predictions are not available.
        """
        if self._group_predictions is None:
            raise ValueError("Group predictions not available")

        return {
            user_id: [
                item_id
                for item_id, _ in sorted(
                    predictions.items(), key=lambda x: x[1], reverse=True
                )
            ]
            for user_id, predictions in self._group_predictions.items()
        }

    def _get_max_valid_item_id(self, model: RecommenderModel) -> int:
        """
        Get the exclusive upper bound for item IDs the given model can score.

        Args:
            model: The recommendation model.

        Returns:
            Upper bound such that valid IDs lie in ``[0, max_item_id)``. The
            bound is taken from ``model.model.item_factors`` when present, then
            from ``model.total_items``, and otherwise from ``self.data.num_item``.
        """
        # Wrapped models (e.g. implicit factorization) expose item_factors.
        if hasattr(model, "model") and model.model is not None:
            if hasattr(model.model, "item_factors"):
                return model.model.item_factors.shape[0]

        # Some models record the item count when fitted.
        if hasattr(model, "total_items") and model.total_items is not None:
            return model.total_items

        return self.data.num_item

    def get_non_interacted_items_for_recommendation(
        self,
        data: DataReader,
        item_ids: List[Union[str, int]],
        members: List[Union[str, int]],
    ) -> np.ndarray:
        """
        Return the item IDs that none of the specified group members interacted with.

        Filters out items already seen by any group member so recommendations
        focus on new items. Members unknown to ``data`` (for which
        ``get_new_user_id`` returns None) are ignored.

        Args:
            data: The original dataset containing user-item interactions.
            item_ids: All candidate item IDs; assumed to be unique.
            members: A list of user IDs representing the group.

        Returns:
            np.ndarray: Sorted item IDs that no group member has interacted with.
        """
        # The dataset stores remapped (consecutive) user IDs.
        consecutive_member_ids = [data.get_new_user_id(int(m)) for m in members]
        consecutive_member_ids = [m for m in consecutive_member_ids if m is not None]

        interacted_item_ids = data.dataset.loc[
            data.dataset.userId.isin(consecutive_member_ids), "itemId"
        ].unique()

        # Both inputs come from .unique(), so assume_unique=True is safe.
        return np.setdiff1d(item_ids, interacted_item_ids, assume_unique=True)

    def generate_recommendation(
        self,
        model: RecommenderModel,
        member: Union[str, int],
        item_pool: List[Union[str, int]],
        data: DataReader,
    ) -> Dict[int, float]:
        """
        Generate scaled predictions for one user over the given item pool.

        Args:
            model: A recommendation model that implements the RecommenderModel interface.
            member: The original ID of the user.
            item_pool: Item IDs (model/internal IDs) to predict scores for.
            data: The dataset containing user-item interactions, used for ID mapping.

        Returns:
            A dictionary mapping original item IDs to predictions rescaled to
            [1, 5], ordered by descending score. Empty if the user is unknown
            or no item in the pool is valid for the model.

        Raises:
            TypeError: If ``model.predict`` returns neither a list nor an ndarray.
            ValueError: If the number of predictions does not match the number
                of items in the pool.
        """
        new_member_id = data.get_new_user_id(int(member))
        if new_member_id is None:
            return {}  # Unknown user: no predictions.

        # Second layer of protection in case the pool was not filtered earlier.
        max_valid_item_id = self._get_max_valid_item_id(model)
        if isinstance(item_pool, np.ndarray):
            item_pool = item_pool.astype(int)
            item_pool = item_pool[(item_pool >= 0) & (item_pool < max_valid_item_id)]
        elif isinstance(item_pool, list):
            item_pool = [
                int(item) for item in item_pool if 0 <= int(item) < max_valid_item_id
            ]

        if len(item_pool) == 0:
            logger.warning(
                "No valid items found for user %s. Returning empty predictions.",
                new_member_id,
            )
            return {}

        raw_predictions = model.predict(new_member_id, item_pool)  # type: ignore
        if not isinstance(raw_predictions, (list, np.ndarray)):
            raise TypeError(
                f"Model's predict function returned an unexpected type: {type(raw_predictions)}"
            )

        raw_predictions = np.array(raw_predictions)
        # Some models return a (1, n_items) matrix for a single user; flatten it
        # so each item is paired with exactly one score below.
        if raw_predictions.ndim == 2 and raw_predictions.shape[0] == 1:
            raw_predictions = raw_predictions.ravel()
        # zip() silently truncates on a length mismatch, which would attach
        # scores to the wrong items; fail loudly instead.
        if len(raw_predictions) != len(item_pool):
            raise ValueError(
                "Mismatch between predictions and item IDs. Check the model's predict function."
            )

        # Normalize predictions to the 1-5 range.
        scaled_linear = Scale.linear(raw_predictions, target_min=1, target_max=5)

        # Key the scores by original item ID; items without a mapping are dropped.
        predictions = {}
        for item, scaled_pred in zip(item_pool, scaled_linear):
            item_original_id = data.get_original_item_id(int(item))
            if item_original_id is not None:
                predictions[int(item_original_id)] = scaled_pred  # type: ignore

        return dict(sorted(predictions.items(), key=lambda kv: kv[1], reverse=True))

    def get_group_recommendations(
        self, top_k: Optional[int] = None
    ) -> Union[int, List[int]]:
        """
        Final Recommendation List: Get recommendations for the group based on aggregated scores.

        Args:
            top_k: The number of recommendations to return.
                If None, returns all recommendations sorted by score.
                If 1, returns only the top recommendation as a single item ID
                (None when there are no candidates).
                Otherwise returns the top ``top_k`` item IDs as a list; values
                <= 0 yield an empty list.

        Returns:
            If top_k is 1, a single item ID. Otherwise, a list of item IDs.

        Raises:
            ValueError: If setup_recommendation has not been run.
        """
        if self._aggregated_scores is None:
            raise ValueError(
                "You must call setup_recommendation before getting recommendations"
            )

        # _aggregated_scores is already ordered by descending score.
        ranked_ids = list(self._aggregated_scores)

        if top_k is None:
            return ranked_ids
        if top_k == 1:
            return ranked_ids[0] if ranked_ids else None  # type: ignore
        # Clamp negatives: a raw slice [:-1] would return all but the last item.
        return ranked_ids[: max(top_k, 0)]

    def get_top_recommendation(self) -> int:
        """
        Get the top recommendation for the group.

        The result is cached until the next call to setup_recommendation.

        Returns:
            The item ID with the highest aggregated score across all group members.
        """
        if self._top_recommendation is None:
            self._top_recommendation = self.get_group_recommendations(top_k=1)
        return self._top_recommendation  # type: ignore

    def get_recommendation_scores(self) -> Dict[int, float]:
        """
        Get the aggregated scores for all items across the group.

        Returns:
            A copy of the mapping from item IDs to aggregated scores.

        Raises:
            ValueError: If setup_recommendation has not been run.
        """
        if self._aggregated_scores is None:
            raise ValueError(
                "You must call setup_recommendation before getting recommendation scores"
            )
        return self._aggregated_scores.copy()

    def get_aggregation_strategy(self) -> Optional[AggregationStrategy]:
        """
        Get the current aggregation strategy.

        Returns:
            The aggregation strategy being used, or None if not set.
        """
        return self._aggregation_strategy

    def get_group_members(self) -> Optional[List[Union[str, int]]]:
        """
        Get the current group members.

        Returns:
            A copy of the list of group member IDs, or None if not set.
        """
        return self._members.copy() if self._members else None

    def get_individual_predictions(
        self,
    ) -> Optional[Dict[Union[str, int], Dict[int, float]]]:
        """
        Get the individual predictions for all group members.

        Returns:
            Dictionary mapping user IDs to copies of their individual
            predictions, or None if not available. The inner dicts are copied
            so callers cannot mutate the recommender's internal state.
        """
        if not self._group_predictions:
            return None
        return {
            member: dict(predictions)
            for member, predictions in self._group_predictions.items()
        }