392 lines
13 KiB
Python
392 lines
13 KiB
Python
import numpy as np
|
|
import torch
|
|
import torch.nn as nn
|
|
from scipy import sparse
|
|
from sklearn.metrics.pairwise import cosine_similarity
|
|
from sklearn.preprocessing import MinMaxScaler
|
|
from torch.utils.data import DataLoader
|
|
from tqdm.auto import tqdm
|
|
from typing import Union
|
|
|
|
from pygrex.data_reader import UserItemRatingDataset, DataReader
|
|
from pygrex.utils import EMFLoss
|
|
from .py_torch_model import PyTorchModel
|
|
from .recommender_model import RecommenderModel
|
|
|
|
|
|
class EMFModel(RecommenderModel):
    """Explainable matrix factorisation trained with per-rating SGD on NumPy arrays.

    A rating is predicted as the dot product of a user vector and an item
    vector. On top of the squared error and the L2 penalty, each update adds a
    term weighted by ``explainability_matrix[user, item]`` that pulls the two
    vectors towards each other.
    """

    def __init__(
        self,
        learning_rate: float,
        reg_term: float,
        expl_reg_term: float,
        positive_threshold: float,
        latent_dim: int,
        epochs: int,
        knn: int,
    ):
        """Store the hyper-parameters; no training state exists until ``fit``.

        Args:
            learning_rate: Step size applied to every SGD update.
            reg_term: Weight of the L2 penalty on the embeddings.
            expl_reg_term: Weight of the explainability penalty.
            positive_threshold: Minimum rating for an interaction to count
                towards the explainability scores.
            latent_dim: Number of latent factors per user / item.
            epochs: Number of passes over the rating table.
            knn: Number of nearest neighbours used per user.
        """
        self.latent_dim = latent_dim
        self.learning_rate = learning_rate
        self.epochs = epochs

        # Populated by fit().
        self.dataset = None
        self.data = None
        self.embedding_user = None
        self.embedding_item = None
        self.optimizer = None

        self.reg_term = reg_term
        self.expl_reg_term = expl_reg_term
        self.positive_threshold = positive_threshold
        self.knn = knn

        # Populated by compute_explainability().
        self.explainability_matrix = None
        self.sim_users = {}

        # Not referenced by any method of this class; kept so the public
        # attributes of existing instances do not change.
        self.affine_output = nn.Linear(in_features=self.latent_dim, out_features=1)
        self.criterion = EMFLoss()

    def fit(self, data: DataReader) -> None:
        """Train the user and item embeddings on ``data.dataset``.

        The rating table is reshuffled at the start of every epoch and the
        shuffled frame is stored back on ``self.dataset``.

        Args:
            data: Reader exposing ``dataset`` (columns ``userId``, ``itemId``,
                ``rating``), ``num_user`` and ``num_item``.
        """
        self.data = data
        self.dataset = data.dataset
        assert self.data is not None
        assert self.dataset is not None

        num_users = self.data.num_user
        num_items = self.data.num_item

        init_high = 0.5 / self.latent_dim
        self.embedding_user = np.random.uniform(
            low=0, high=init_high, size=(num_users, self.latent_dim)
        )
        self.embedding_item = np.random.uniform(
            low=0, high=init_high, size=(num_items, self.latent_dim)
        )

        self.compute_explainability()
        assert self.explainability_matrix is not None

        # Local aliases of the arrays that are updated in place below.
        user_factors = self.embedding_user
        item_factors = self.embedding_item
        expl_matrix = self.explainability_matrix

        with tqdm(total=self.epochs) as progress:
            for _ in range(self.epochs):
                self.dataset = self.dataset.sample(frac=1)

                # Plain column lists avoid building a Series per row.
                user_ids = self.dataset["userId"].tolist()
                item_ids = self.dataset["itemId"].tolist()
                ratings = self.dataset["rating"].tolist()

                squared_errors = []
                for raw_user, raw_item, rating in zip(user_ids, item_ids, ratings):
                    user_id = int(raw_user)
                    item_id = int(raw_item)

                    e_ui = rating - self.predict(user_id, item_id)
                    squared_errors.append(e_ui**2)

                    p_u = user_factors[user_id, :]
                    q_i = item_factors[item_id, :]
                    expl_weight = self.expl_reg_term * expl_matrix[user_id, item_id]

                    # The explainability penalty shrinks the gap between p_u
                    # and q_i, so its (sub)gradient w.r.t. p_u is
                    # sign(p_u - q_i) and w.r.t. q_i is sign(q_i - p_u). Both
                    # are subtracted because the deltas are added to the
                    # embeddings below.
                    gap_sign = np.sign(p_u - q_i)

                    # Both deltas are computed before either row is modified.
                    delta_u = 2 * e_ui * q_i
                    delta_u -= self.reg_term * p_u
                    delta_u -= expl_weight * gap_sign

                    delta_v = 2 * e_ui * p_u
                    delta_v -= self.reg_term * q_i
                    delta_v += expl_weight * gap_sign

                    user_factors[user_id, :] += self.learning_rate * delta_u
                    item_factors[item_id, :] += self.learning_rate * delta_v

                progress.update(1)

                # An epoch without ratings reports NaN instead of dividing by zero.
                if squared_errors:
                    mse = sum(squared_errors) / len(squared_errors)
                else:
                    mse = float("nan")
                progress.set_postfix({"MSE": mse})

    def compute_explainability(self):
        """Build ``self.sim_users`` and ``self.explainability_matrix``.

        For each user the ``knn`` most cosine-similar other users are found on
        the user x item rating pivot (missing ratings filled with 0). The
        score of ``(user, item)`` is the sum of the ratings those neighbours
        gave the item, counting only ratings ``>= positive_threshold``. The
        matrix is then passed through ``MinMaxScaler``.

        Row positions of the pivot are used as user ids and ``itemId`` values
        as column indices.
        NOTE(review): this presumes ids are contiguous and zero-based - confirm
        against DataReader.
        """
        assert self.dataset is not None
        assert self.data is not None

        pivot = self.dataset.pivot(index="userId", columns="itemId", values="rating")
        sim_matrix = cosine_similarity(sparse.csr_matrix(pivot.fillna(0)))

        # Push every user's self-similarity below all other entries so that a
        # user is never picked as its own neighbour.
        np.fill_diagonal(sim_matrix, sim_matrix.min() - 1)

        num_users = self.data.num_user
        positive = self.dataset[self.dataset["rating"] >= self.positive_threshold]

        # Rebuilt on every call so entries from an earlier fit cannot linger.
        self.sim_users = {}
        expl = np.zeros((num_users, self.data.num_item))

        for i in range(num_users):
            neighbours = (-sim_matrix[i, :]).argsort()[: self.knn]
            self.sim_users[i] = neighbours

            neighbour_ratings = positive[positive["userId"].isin(neighbours)]
            item_sums = neighbour_ratings.groupby(by="itemId")["rating"].sum()

            expl[i, item_sums.index.to_numpy().astype(int)] = item_sums.to_numpy()

        self.explainability_matrix = MinMaxScaler().fit_transform(expl)

    def predict(
        self, user_id: Union[int, str], item_id: Union[int, str]
    ) -> Union[float, list]:
        """Predict ratings as dot products of user and item embeddings.

        Args:
            user_id: One id, or a list / tuple / NumPy array of ids.
            item_id: One id, or a list / tuple / NumPy array of ids.

        Returns:
            A single score when both arguments are scalars. Otherwise a list:
            flat when either side holds exactly one id, else nested with one
            row per user and one column per item.
        """
        assert self.embedding_user is not None, "fit() must be called before predict()"
        assert self.embedding_item is not None, "fit() must be called before predict()"

        users = user_id
        items = item_id

        # Normalise array-likes to lists; a 0-d array becomes a plain scalar.
        if isinstance(users, np.ndarray):
            users = users.tolist()
        elif isinstance(users, tuple):
            users = list(users)
        if isinstance(items, np.ndarray):
            items = items.tolist()
        elif isinstance(items, tuple):
            items = list(items)

        if not isinstance(users, list) and not isinstance(items, list):
            return np.dot(
                self.embedding_user[int(users), :],
                self.embedding_item[int(items), :],
            )

        user_list = users if isinstance(users, list) else [users]
        item_list = items if isinstance(items, list) else [items]

        # Explicit integer dtype keeps empty id lists valid as indices.
        user_idx = np.asarray([int(u) for u in user_list], dtype=np.intp)
        item_idx = np.asarray([int(i) for i in item_list], dtype=np.intp)

        # One matrix product replaces the per-pair dot products.
        scores = self.embedding_user[user_idx, :] @ self.embedding_item[item_idx, :].T

        if len(user_list) == 1 or len(item_list) == 1:
            scores = scores.flatten()

        return scores.tolist()

    def user_embedding(self):
        """Return the user embedding array (``None`` before ``fit``)."""
        return self.embedding_user

    def item_embedding(self):
        """Return the item embedding array (``None`` before ``fit``)."""
        return self.embedding_item
|
|
|
|
|
|
class EMFTorchModel(PyTorchModel):
    """Explainable matrix factorisation trained with mini-batch SGD in PyTorch.

    A rating is predicted by passing the element-wise product of a user and an
    item embedding through a linear layer with one output. Training minimises
    ``EMFLoss``, which is given the predictions, the targets, both embedding
    batches, and the explainability scores of the batch.
    """

    def __init__(
        self,
        learning_rate: float,
        reg_term: float,
        expl_reg_term: float,
        positive_threshold: float,
        momentum: float,
        weight_decay: float,
        latent_dim: int,
        epochs: int,
        batch_size: int,
        knn: int,
        cuda: bool,
        optimizer_name: str,
        device_id=None,
    ):
        """Store the hyper-parameters and create the output layer and loss.

        Args:
            learning_rate: Learning rate of the SGD optimizer.
            reg_term: Regularisation weight handed to the loss.
            expl_reg_term: Explainability weight handed to the loss.
            positive_threshold: Minimum rating for an interaction to count
                towards the explainability scores.
            momentum: Momentum of the SGD optimizer.
            weight_decay: Weight decay of the SGD optimizer.
            latent_dim: Embedding dimension.
            epochs: Number of training epochs.
            batch_size: Mini-batch size of the training loader.
            knn: Number of nearest neighbours used per user.
            cuda: When true, batches are moved to CUDA before each step.
            optimizer_name: Forwarded to the base class.
            device_id: Forwarded to the base class.
        """
        super().__init__(
            learning_rate=learning_rate,
            latent_dim=latent_dim,
            epochs=epochs,
            batch_size=batch_size,
            cuda=cuda,
            optimizer_name=optimizer_name,
            device_id=device_id,
        )

        self.reg_term = reg_term
        self.expl_reg_term = expl_reg_term
        self.positive_threshold = positive_threshold
        self.momentum = momentum
        self.weight_decay = weight_decay
        self.knn = knn

        # Populated by compute_explainability().
        self.explainability_matrix = None
        self.sim_users = {}

        self.affine_output = nn.Linear(in_features=self.latent_dim, out_features=1)
        self.criterion = EMFLoss()

    def fit(self, data: DataReader) -> None:
        """Create fresh embeddings and train them for ``self.epochs`` epochs.

        Args:
            data: Reader exposing ``dataset`` (columns ``userId``, ``itemId``,
                ``rating``), ``num_user`` and ``num_item``.
        """
        self.data = data
        self.dataset = data.dataset
        assert self.data is not None

        self.embedding_user = nn.Embedding(
            num_embeddings=self.data.num_user, embedding_dim=self.latent_dim
        )
        self.embedding_item = nn.Embedding(
            num_embeddings=self.data.num_item, embedding_dim=self.latent_dim
        )

        self.compute_explainability()

        # NOTE(review): optimizer_name is forwarded to the base class, but SGD
        # is always used here - confirm that this is intended.
        self.optimizer = torch.optim.SGD(
            self.parameters(),
            lr=self.learning_rate,
            momentum=self.momentum,
            weight_decay=self.weight_decay,
        )

        # The dataset is not modified during training, so the loader is built
        # once; shuffle=True still yields a new order on every pass.
        train_loader = self.instance_a_train_loader(self.batch_size)

        with tqdm(total=self.epochs) as progress:
            for _ in range(self.epochs):
                loss = self.train_an_epoch(train_loader)
                progress.update(1)
                progress.set_postfix({"loss": loss})

    def compute_explainability(self):
        """Build ``self.sim_users`` and ``self.explainability_matrix``.

        For each user the ``knn`` most cosine-similar other users are found on
        the user x item rating pivot (missing ratings filled with 0). The
        score of ``(user, item)`` is the sum of the ratings those neighbours
        gave the item, counting only ratings ``>= positive_threshold``. The
        matrix is passed through ``MinMaxScaler`` and stored as a tensor.

        Row positions of the pivot are used as user ids and ``itemId`` values
        as column indices.
        NOTE(review): this presumes ids are contiguous and zero-based - confirm
        against DataReader.
        """
        assert self.dataset is not None
        assert self.data is not None

        pivot = self.dataset.pivot(index="userId", columns="itemId", values="rating")
        sim_matrix = cosine_similarity(sparse.csr_matrix(pivot.fillna(0)))

        # Push every user's self-similarity below all other entries so that a
        # user is never picked as its own neighbour.
        np.fill_diagonal(sim_matrix, sim_matrix.min() - 1)

        num_users = self.data.num_user
        positive = self.dataset[self.dataset["rating"] >= self.positive_threshold]

        # Rebuilt on every call so entries from an earlier fit cannot linger.
        self.sim_users = {}
        expl = np.zeros((num_users, self.data.num_item))

        for i in range(num_users):
            neighbours = (-sim_matrix[i, :]).argsort()[: self.knn]
            self.sim_users[i] = neighbours

            neighbour_ratings = positive[positive["userId"].isin(neighbours)]
            item_sums = neighbour_ratings.groupby(by="itemId")["rating"].sum()

            expl[i, item_sums.index.to_numpy().astype(int)] = item_sums.to_numpy()

        expl = MinMaxScaler().fit_transform(expl)
        self.explainability_matrix = torch.from_numpy(expl)

    def instance_a_train_loader(self, batch_size):
        """Return a shuffling ``DataLoader`` over (user, item, rating) triples.

        Args:
            batch_size: Number of interactions per mini-batch.
        """
        assert self.dataset is not None
        triples = UserItemRatingDataset(
            user_tensor=torch.LongTensor(self.dataset.userId.values),
            item_tensor=torch.LongTensor(self.dataset.itemId.values),
            target_tensor=torch.FloatTensor(self.dataset.rating.values),
        )
        return DataLoader(triples, batch_size=batch_size, shuffle=True)

    def train_an_epoch(self, train_loader):
        """Run one pass over ``train_loader`` and return the mean batch loss.

        Returns 0.0 when the loader yields no batches.
        """
        self.train()
        batch_count = 0
        total_loss = 0
        for batch in train_loader:
            assert isinstance(batch[0], torch.LongTensor)
            user, item, rating = batch[0], batch[1], batch[2]
            total_loss += self.train_single_batch(user, item, rating.float())
            batch_count += 1

        # An empty loader would otherwise divide by zero.
        if batch_count == 0:
            return 0.0
        return total_loss / batch_count

    def train_single_batch(self, users, items, ratings):
        """Run one optimisation step and return the loss as a Python float.

        Args:
            users: Tensor of user indices.
            items: Tensor of item indices.
            ratings: Tensor of target ratings.
        """
        if self.cuda is True:
            users, items, ratings = users.cuda(), items.cuda(), ratings.cuda()

        assert self.optimizer is not None
        self.optimizer.zero_grad()

        ratings_pred = self(users, items)

        assert self.embedding_user is not None
        user_embeddings = self.embedding_user(users)
        assert self.embedding_item is not None
        item_embeddings = self.embedding_item(items)

        assert self.explainability_matrix is not None
        # The matrix comes from torch.from_numpy, i.e. it lives on the CPU,
        # which cannot be indexed with index tensors on another device. Move
        # it once to wherever the batch lives and keep it there.
        if self.explainability_matrix.device != users.device:
            self.explainability_matrix = self.explainability_matrix.to(users.device)

        loss = self.criterion(
            ratings_pred=ratings_pred,
            ratings=ratings,
            u=user_embeddings,
            v=item_embeddings,
            reg_term=self.reg_term,
            expl=self.explainability_matrix[users, items],
            expl_reg_term=self.expl_reg_term,
        )
        loss.backward()
        self.optimizer.step()

        return loss.item()

    def forward(self, user_indices, item_indices):
        """Predict ratings for paired user and item index tensors.

        The element-wise product of the two embeddings is passed through
        ``affine_output``, so the result has a trailing dimension of size 1.
        NOTE(review): confirm that EMFLoss expects that trailing dimension
        when it compares predictions with the targets.
        """
        assert self.embedding_user is not None
        user_embeddings = self.embedding_user(user_indices)
        assert self.embedding_item is not None
        item_embeddings = self.embedding_item(item_indices)

        interaction = torch.mul(user_embeddings, item_embeddings)
        return self.affine_output(interaction)
|