Files
2026-05-22 10:02:10 +02:00

180 lines
6.5 KiB
Python

import numpy as np
import pandas as pd
class ModelEvaluator:
    """
    Evaluate top-N recommendation lists against a held-out test set.

    The test set is a dataframe with (at least) the columns
    ['userId', 'itemId']; every row is treated as one positive item for
    that user. Recommendations are dataframes with the columns
    ['userId', 'itemId', 'rank'], where only rows with
    ``rank <= top_n`` take part in the evaluation.
    """

    # Names of the supported rank-discount functions (see ``cal_dcg``).
    disc_functions = ["log", "linear"]

    def __init__(self, test_set, top_n: int = 10, discount_function: str = "log"):
        """
        :param test_set: dataframe, columns = ['userId', 'itemId']
        :param top_n: largest rank that is still evaluated.
        :param discount_function: one of ``disc_functions``.
        :raises AssertionError: if ``discount_function`` is not supported.
        """
        self.test_set = test_set
        self._top_n = top_n
        self._check_discount_function(discount_function)
        self._discount_function = discount_function
        self.num_users = self.test_set.userId.nunique()

    def _check_discount_function(self, discount_function):
        """Reject discount functions that are not listed in ``disc_functions``."""
        # Raised explicitly instead of via ``assert`` so the validation is not
        # stripped when Python runs with -O; the exception type and message are
        # the same as before.
        if discount_function not in self.disc_functions:
            raise AssertionError("Wrong Discount Function.")

    @property
    def top_n(self):
        """Largest rank that is still evaluated."""
        return self._top_n

    @top_n.setter
    def top_n(self, top_n: int):
        self._top_n = top_n

    @property
    def discount_function(self):
        """Name of the discount function used by the DCG computation."""
        return self._discount_function

    @discount_function.setter
    def discount_function(self, discount_function: str):
        self._check_discount_function(discount_function)
        self._discount_function = discount_function

    def cal_hit_ratio(self, recommendations):
        """
        Hit Ratio: for every user in the test set, the number of test items
        found in the top-N recommendations divided by the number of test items
        of that user, averaged over the test-set users.

        :param recommendations: dataframe, columns = ['userId', 'itemId', 'rank']
        :return: hit rate.
        """
        hits_per_user = self.count_positives(self.get_hits(recommendations))
        positives_per_user = self.count_positives(self.test_set)
        # Right join keeps every test-set user; 'positive_true' is the hit
        # count, 'positive' the number of test items of the user.
        per_user = hits_per_user.merge(
            positives_per_user,
            on="userId",
            suffixes=("_true", ""),
            how="right",
        )
        # Users without any hit have no row on the left side of the join.
        per_user = per_user.fillna(0)
        hit_rate = per_user["positive_true"] / per_user["positive"]
        return hit_rate.mean()

    def get_hits(self, recommendations):
        """
        Find which items in the test set have a hit on the recommendations.

        :param recommendations: dataframe, columns = ['userId', 'itemId', 'rank']
        :return: dataframe, the top-N recommendation rows whose
            (userId, itemId) pair also appears in the test set.
        """
        top_n_recommendations = self.filter_to_top_n(recommendations)
        return pd.merge(
            top_n_recommendations, self.test_set, on=["userId", "itemId"]
        )

    def filter_to_top_n(self, dataset):
        """
        Drop the rows with rank > top_n, they are not used for evaluation.

        :param dataset: dataframe, columns = ['userId', 'itemId', 'rank']
        :return: dataframe, columns = ['userId', 'itemId', 'rank']
        """
        return dataset[dataset["rank"] <= self.top_n]

    def cal_ndcg(self, recommendations):
        r"""
        Normalized Discounted Cumulative Gain at N (nDCG@N): the ratio of the
        DCG of the recommendation hits to the ideal DCG (IDCG), averaged over
        the test-set users. Every hit has a constant gain of 1, so with the
        "log" discount function

            DCG = \sum_{i \in hits} \frac{1}{\log_2(rank_i + 1)}

        and with the "linear" discount function

            DCG = \sum_{i \in hits} \frac{1}{rank_i}

        The IDCG is the same sum for a ranking that places the user's test
        items on ranks 1, 2, ... (cut at top_n), and nDCG = DCG / IDCG.

        Ref: Y. Wang, L. Wang, Y. Li, D. He, T.-Y. Liu, and W. Chen.
        A theoretical analysis of NDCG type ranking measures.

        :param recommendations: dataframe, columns = ['userId', 'itemId', 'rank']
        :return: nDCG
        """
        dcg = self.cal_dcg(self.get_hits(recommendations))
        idcg = self.cal_idcg()
        # Left join from the ideal side so test-set users without hits are
        # kept; their missing DCG becomes 0.
        ndcg = idcg.merge(dcg, on="userId", how="left").fillna(0)
        return (ndcg["dcg"] / ndcg["idcg"]).mean()

    def cal_dcg(self, hits):
        """
        Discounted Cumulative Gain per user.

        The input dataframe is not modified.

        :param hits: dataframe, columns = ['userId', 'itemId', 'rank']
        :return: dataframe, columns = ['userId', 'dcg'], one row per user.
        :raises AssertionError: if the configured discount function is unknown.
        """
        # todo: the gain so far is set to a constant.
        ranks = hits["rank"]
        if self.discount_function == "log":
            # log(2) / log(rank + 1) == 1 / log2(rank + 1)
            discounted_gain = np.log(2) / np.log(ranks + 1)
        elif self.discount_function == "linear":
            discounted_gain = 1 / ranks
        else:
            raise AssertionError("Wrong Discount Function.")
        # Pair every gain with its user positionally in a fresh frame, then let
        # groupby produce the userId column: this keeps each sum attached to
        # the right user whatever the row order of ``hits`` is, and does not
        # write a helper column into the caller's dataframe.
        per_row = pd.DataFrame(
            {
                "userId": hits["userId"].to_numpy(),
                "dcg": np.asarray(discounted_gain, dtype=float),
            }
        )
        return per_row.groupby("userId")["dcg"].sum().reset_index()

    def cal_idcg(self):
        """
        The ideal DCG is the DCG for the best ranking possible (i.e. all true
        positives were recommended first).

        :return: dataframe, columns = ['userId', 'idcg'], one row per user.
        """
        # Create a fake ranking for the test set items: the k-th test item of
        # a user gets rank k. cumcount numbers the rows within each user, so
        # the test set does not have to be sorted or grouped by user.
        ideal_ranking = self.test_set[["userId"]].copy()
        ideal_ranking["rank"] = (
            self.test_set.groupby("userId").cumcount().to_numpy() + 1
        )
        # Filter to have at most top-N items.
        ideal_ranking = self.filter_to_top_n(ideal_ranking)
        # The DCG of the ideal ranking is the IDCG.
        return self.cal_dcg(ideal_ranking).rename(columns={"dcg": "idcg"})

    @staticmethod
    def count_positives(dataset):
        """
        Returns the positives count (non-null itemIds) per user.

        :param dataset: dataframe, columns = ['userId', 'itemId', 'rank']
        :return: dataframe, columns = ['userId', 'positive'], ordered by userId.
        """
        # reset_index turns the groupby key into the userId column, so each
        # count stays attached to its own user regardless of the row order.
        positives_per_user = dataset.groupby("userId")["itemId"].count()
        return positives_per_user.rename("positive").reset_index()
# Example usage (kept commented out):
# if __name__ == '__main__':
#     recoms = pd.DataFrame({
#         'userId': [1, 1, 1, 2, 2, 2, 3, 3, 3],
#         'itemId': [1, 2, 3, 4, 1, 2, 2, 3, 4],
#         'rank': [1, 2, 3, 1, 2, 3, 1, 2, 3]
#     })
#     test = pd.DataFrame({
#         'userId': [1, 1, 2, 3],
#         'itemId': [1, 4, 1, 5]
#     })
#     evaluator = ModelEvaluator(test_set=test, top_n=2)
#     assert evaluator.num_users == 3, 'number of users'
#     assert evaluator.top_n == 2, 'number of top n'
#     evaluator.top_n = 3
#     assert evaluator.top_n == 3, 'changing of top n'
#     print(evaluator.cal_hit_ratio(recoms))
#     print(evaluator.cal_ndcg(recoms))