diff --git a/ml/digest-score/app.py b/ml/digest-score/app.py
index 8d1000b40..f6a894cc9 100644
--- a/ml/digest-score/app.py
+++ b/ml/digest-score/app.py
@@ -1,5 +1,6 @@
 import logging
 from flask import Flask, request, jsonify
+from prometheus_client import start_http_server, Histogram, Summary, Counter, generate_latest
 from typing import List
 from timeit import default_timer as timer
@@ -17,17 +18,31 @@ from datetime import datetime
 import dateutil.parser
 from google.cloud import storage
 from features.user_history import FEATURE_COLUMNS
+from treeinterpreter import treeinterpreter as ti
 
 
 app = Flask(__name__)
 logging.basicConfig(level=logging.INFO, stream=sys.stdout)
-
 USER_HISTORY_PATH = 'user_features.pkl'
-MODEL_PIPELINE_PATH = 'predict_read_pipeline-v002.pkl'
+MODEL_PIPELINE_PATH = 'predict_read_model-v003.pkl'
 
 pipeline = None
 user_features = None
 
+# these buckets are used for reporting scores, we want to make sure
+# there is decent diversity in the returned scores.
+score_bucket_ranges = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
+score_buckets = {
+    f'score_bucket_{int(b * 10)}': Counter(f'inference_score_bucket_{int(b * 10)}', f'Number of scores in the range {b - 0.1:.1f} to {b:.1f}')
+    for b in score_bucket_ranges
+}
+
+def observe_score(score):
+    for b in score_bucket_ranges:
+        if b - 0.1 < score <= b:
+            score_buckets[f'score_bucket_{int(b * 10)}'].inc()
+            break
+
 def download_from_gcs(bucket_name, gcs_path, destination_path):
     storage_client = storage.Client()
     bucket = storage_client.bucket(bucket_name)
@@ -72,14 +87,20 @@ def merge_dicts(dict1, dict2):
             dict1[key] = value
     return dict1
 
+def predict_proba_wrapper(X):
+    return pipeline.predict_proba(X)
+
+
 def refresh_data():
     start = timer()
     global pipeline
+    global explainer
    global user_features
-    if os.getenv('LOAD_LOCAL_MODEL') != None:
+    if os.getenv('LOAD_LOCAL_MODEL') == None:
+        print(f"loading data from {os.getenv('GCS_BUCKET')}")
         gcs_bucket_name = os.getenv('GCS_BUCKET')
-        download_from_gcs(gcs_bucket_name, f'data/features/user_features.pkl', USER_HISTORY_PATH)
-        download_from_gcs(gcs_bucket_name, f'data/models/predict_read_pipeline-v002.pkl', MODEL_PIPELINE_PATH)
+        download_from_gcs(gcs_bucket_name, f'data/features/{USER_HISTORY_PATH}', USER_HISTORY_PATH)
+        download_from_gcs(gcs_bucket_name, f'data/models/{MODEL_PIPELINE_PATH}', MODEL_PIPELINE_PATH)
     pipeline = load_pipeline(MODEL_PIPELINE_PATH)
     user_features = load_user_features(USER_HISTORY_PATH)
     end = timer()
@@ -90,6 +111,7 @@ def refresh_data():
 
 def compute_score(user_id, item_features):
     interaction_score = compute_interaction_score(user_id, item_features)
+    observe_score(interaction_score)
     return {
         'score': interaction_score,
         'interaction_score': interaction_score,
@@ -97,6 +119,7 @@
 
 def compute_interaction_score(user_id, item_features):
+    start = timer()
     original_url_host = urlparse(item_features.get('original_url')).netloc
     df_test = pd.DataFrame([{
         'user_id': user_id,
@@ -134,21 +157,27 @@ def compute_interaction_score(user_id, item_features):
         else:
             print("skipping feature: ", name)
             continue
-
         df_test = pd.merge(df_test, df, on=merge_keys, how='left')
         df_test = df_test.fillna(0)
 
     df_predict = df_test[FEATURE_COLUMNS]
+    end = timer()
+    print('time to compute score (in seconds):', end - start)
     interaction_score = pipeline.predict_proba(df_predict)
 
-    print('score', interaction_score, 'item_features', df_test[df_test != 0].stack())
+    print("INTERACTION SCORE: ", interaction_score)
+    print('item_features:\n', df_predict[df_predict != 0].stack())
 
-    return interaction_score[0][1]
+    return np.float64(interaction_score[0][1])
 
 
 @app.route('/_ah/health', methods=['GET'])
 def ready():
     return jsonify({'OK': 'yes'}), 200
 
+@app.route('/metrics')
+def metrics():
+    return generate_latest(), 200, {'Content-Type': 'text/plain; charset=utf-8'}
+
 
 @app.route('/refresh', methods=['GET'])
 def refresh():
@@ -158,6 +187,7 @@
 
 @app.route('/users/<user_id>/features', methods=['GET'])
 def get_user_features(user_id):
+    print("user_features", user_features)
     result = {}
     df_user = pd.DataFrame([{
         'user_id': user_id,
@@ -193,6 +223,7 @@ def predict():
 
 @app.route('/batch', methods=['POST'])
 def batch():
+    start = timer()
     try:
         result = {}
         data = request.get_json()
@@ -210,6 +241,8 @@ def batch():
             library_item_id = item['library_item_id']
             result[library_item_id] = compute_score(user_id, item)
 
+        end = timer()
+        print(f'time to compute batch of {len(items)} items (in seconds): {end - start}')
         return jsonify(result)
     except Exception as e:
         app.logger.error(f"exception in batch endpoint: {request.get_json()}\n{e}")
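# Note on the score buckets added above: prometheus_client can express the same
# bucketing with a single Histogram instead of ten hand-rolled Counters (the
# Histogram class is already imported by this change). One subtlety of the
# hand-rolled version: a score of exactly 0.0 satisfies no `b - 0.1 < score <= b`
# range, so it is never counted; a Histogram has no such gap. A minimal sketch,
# illustrative only and not part of this patch:
from prometheus_client import Histogram

inference_score = Histogram(
    'inference_score',
    'Distribution of returned interaction scores',
    buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
)

def observe_score_histogram(score):
    # observe() increments every bucket whose upper bound covers the value,
    # so bucket counts are cumulative rather than per-range like the Counters.
    inference_score.observe(score)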
diff --git a/ml/digest-score/features.py b/ml/digest-score/features.py
index 47766c244..ff7a202a5 100644
--- a/ml/digest-score/features.py
+++ b/ml/digest-score/features.py
@@ -15,13 +15,17 @@
 from google.cloud import storage
 
 from features.extract import extract_and_upload_raw_data
 from features.user_history import generate_and_upload_user_history
+from datetime import datetime, timezone
 
 
 def main():
-    execution_date = os.getenv('EXECUTION_DATE')
     num_days_history = os.getenv('NUM_DAYS_HISTORY')
     gcs_bucket_name = os.getenv('GCS_BUCKET')
 
+    current_date_utc = datetime.now(timezone.utc)
+    execution_date = current_date_utc.strftime("%Y-%m-%d")
+    print(f'updating features using execution date: {execution_date}')
+
     extract_and_upload_raw_data(execution_date, num_days_history, gcs_bucket_name)
     generate_and_upload_user_history(execution_date, gcs_bucket_name)
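# The feature job now derives its own execution date in UTC rather than reading
# EXECUTION_DATE from the environment. A quick check of the new behaviour,
# illustrative only:
from datetime import datetime, timezone

execution_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
print(execution_date)  # e.g. '2024-07-01', independent of the host's local timezone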
diff --git a/ml/digest-score/features/user_history.py b/ml/digest-score/features/user_history.py
index 289ecc64d..9eed6eacc 100644
--- a/ml/digest-score/features/user_history.py
+++ b/ml/digest-score/features/user_history.py
@@ -17,65 +17,30 @@ import pyarrow.feather as feather
 from google.cloud import storage
 
 FEATURE_COLUMNS=[
-    # targets
-    # 'user_clicked', 'user_read', 'user_long_read',
-
-    # item attributes / user setup attributes
-    'item_word_count','item_has_site_icon', 'is_subscription',
-    'inbox_folder', 'has_author',
-
-    # how the user has setup the subscription
-    'is_newsletter', 'is_feed', 'days_since_subscribed',
-    'subscription_count', 'subscription_auto_add_to_library',
-    'subscription_fetch_content',
-
-    # user/item interaction history
-    'user_original_url_host_saved_count_week_1',
-    'user_original_url_host_interaction_count_week_1',
-    'user_original_url_host_rate_week_1',
-    'user_original_url_host_proportion_week_1',
-
-    'user_original_url_host_saved_count_week_2',
-    'user_original_url_host_interaction_count_week_2',
-    'user_original_url_host_rate_week_2',
-    'user_original_url_host_proportion_week_2',
-    'user_original_url_host_saved_count_week_3',
-    'user_original_url_host_interaction_count_week_3',
-    'user_original_url_host_rate_week_3',
-    'user_original_url_host_proportion_week_3',
-    'user_original_url_host_saved_count_week_4',
-    'user_original_url_host_interaction_count_week_4',
-    'user_original_url_host_rate_week_4',
-    'user_original_url_host_proportion_week_4',
-
-    'user_subscription_saved_count_week_1',
-    'user_subscription_interaction_count_week_1',
-    'user_subscription_rate_week_1', 'user_subscription_proportion_week_1',
-    'user_site_saved_count_week_3', 'user_site_interaction_count_week_3',
-    'user_site_rate_week_3', 'user_site_proportion_week_3',
-    'user_site_saved_count_week_2', 'user_site_interaction_count_week_2',
-    'user_site_rate_week_2', 'user_site_proportion_week_2',
-    'user_subscription_saved_count_week_2',
-    'user_subscription_interaction_count_week_2',
-    'user_subscription_rate_week_2', 'user_subscription_proportion_week_2',
-    'user_site_saved_count_week_1', 'user_site_interaction_count_week_1',
-    'user_site_rate_week_1', 'user_site_proportion_week_1',
-    'user_subscription_saved_count_week_3',
-    'user_subscription_interaction_count_week_3',
-    'user_subscription_rate_week_3', 'user_subscription_proportion_week_3',
-    'user_author_saved_count_week_4',
-    'user_author_interaction_count_week_4', 'user_author_rate_week_4',
-    'user_author_proportion_week_4', 'user_author_saved_count_week_1',
-    'user_author_interaction_count_week_1', 'user_author_rate_week_1',
-    'user_author_proportion_week_1', 'user_site_saved_count_week_4',
-    'user_site_interaction_count_week_4', 'user_site_rate_week_4',
-    'user_site_proportion_week_4', 'user_author_saved_count_week_2',
-    'user_author_interaction_count_week_2', 'user_author_rate_week_2',
-    'user_author_proportion_week_2', 'user_author_saved_count_week_3',
-    'user_author_interaction_count_week_3', 'user_author_rate_week_3',
-    'user_author_proportion_week_3', 'user_subscription_saved_count_week_4',
-    'user_subscription_interaction_count_week_4',
-    'user_subscription_rate_week_4', 'user_subscription_proportion_week_4'
+    'user_subscription_rate_week_1',
+    'user_subscription_proportion_week_1',
+    'user_site_rate_week_3',
+    'user_site_proportion_week_3',
+    'user_site_rate_week_2',
+    'user_site_proportion_week_2',
+    'user_subscription_rate_week_2',
+    'user_subscription_proportion_week_2',
+    'user_site_rate_week_1',
+    'user_site_proportion_week_1',
+    'user_subscription_rate_week_3',
+    'user_subscription_proportion_week_3',
+    'user_author_rate_week_4',
+    'user_author_proportion_week_4',
+    'user_author_rate_week_1',
+    'user_author_proportion_week_1',
+    'user_site_rate_week_4',
+    'user_site_proportion_week_4',
+    'user_author_rate_week_2',
+    'user_author_proportion_week_2',
+    'user_author_rate_week_3',
+    'user_author_proportion_week_3',
+    'user_subscription_rate_week_4',
+    'user_subscription_proportion_week_4'
 ]
 
 def parquet_to_dataframe(file_path):
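# FEATURE_COLUMNS now keeps only the 24 rate/proportion features and drops the
# raw saved/interaction counts and the item/subscription attributes, so a model
# pickle trained against the much longer old list will no longer line up with
# the serving dataframe. A load-time guard like this would catch the mismatch
# early (hypothetical helper, not part of the patch):
import pickle

from features.user_history import FEATURE_COLUMNS

def load_model_checked(path):
    with open(path, 'rb') as f:
        model = pickle.load(f)
    # scikit-learn-compatible estimators such as XGBClassifier record the
    # number of features seen during fit as n_features_in_.
    assert model.n_features_in_ == len(FEATURE_COLUMNS), (
        f'model expects {model.n_features_in_} features, '
        f'FEATURE_COLUMNS provides {len(FEATURE_COLUMNS)}'
    )
    return model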
diff --git a/ml/digest-score/requirements.txt b/ml/digest-score/requirements.txt
index d62f7cf55..193b5bb11 100644
--- a/ml/digest-score/requirements.txt
+++ b/ml/digest-score/requirements.txt
@@ -6,5 +6,8 @@ google-cloud-storage
 flask
 pydantic
 sklearn2pmml
-sqlalchemy
+sqlalchemy
 pyarrow
+prometheus_client
+treeinterpreter
+xgboost==2.1.0
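# xgboost is pinned to 2.1.0, presumably because the model now ships as a
# pickle and unpickling under a different xgboost release can fail or change
# behaviour. A defensive version check at model-load time, illustrative only:
import xgboost

EXPECTED_XGB_VERSION = '2.1.0'  # keep in sync with requirements.txt

if xgboost.__version__ != EXPECTED_XGB_VERSION:
    raise RuntimeError(
        f'model pickle was built with xgboost=={EXPECTED_XGB_VERSION}, '
        f'found {xgboost.__version__}'
    )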
diff --git a/ml/digest-score/train.py b/ml/digest-score/train.py
index db8deff84..3a3191554 100644
--- a/ml/digest-score/train.py
+++ b/ml/digest-score/train.py
@@ -1,17 +1,11 @@
-import pandas as pd
 import os
+import pandas as pd
 import numpy as np
 from datetime import datetime, timedelta
-from sklearn.linear_model import SGDClassifier
-from sklearn.ensemble import RandomForestClassifier, VotingClassifier
-
-from sklearn.preprocessing import StandardScaler
-
-from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix
-from sklearn.utils import shuffle
+import xgboost as xgb
+from sklearn.metrics import classification_report
 from sklearn.model_selection import train_test_split
-from sklearn2pmml import PMMLPipeline, sklearn2pmml
 
 from google.cloud import storage
 from google.cloud.exceptions import PreconditionFailed
@@ -23,13 +17,6 @@ import pyarrow.feather as feather
 
 from features.user_history import FEATURE_COLUMNS
 
-DB_PARAMS = {
-    'dbname': os.getenv('DB_NAME') or 'omnivore',
-    'user': os.getenv('DB_USER'),
-    'password': os.getenv('DB_PASSWORD'),
-    'host': os.getenv('DB_HOST') or 'localhost',
-    'port': os.getenv('DB_PORT') or '5432'
-}
 
 def parquet_to_dataframe(file_path):
     table = pq.read_table(file_path)
@@ -122,58 +109,15 @@ def prepare_data(df):
 
     return X, Y
 
-def train_random_forest_model(X, Y):
-    model = RandomForestClassifier(
-        class_weight={0: 1, 1: 10},
-        n_estimators=10,
-        max_depth=10,
-        random_state=42
-    )
-    scaler = StandardScaler()
-    X_scaled = scaler.fit_transform(X)
+def train_xgb_model(X, Y):
+    X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=42)
+    model = xgb.XGBClassifier(max_depth=7, n_estimators=5)
+    model.fit(X_train, y_train)
 
-    X_train, X_test, Y_train, Y_test = train_test_split(X_scaled, Y, test_size=0.3, random_state=42)
-
-    pipeline = PMMLPipeline([
-        ("scaler", scaler),
-        ("classifier", model)
-    ])
-
-    pipeline.fit(X_train, Y_train)
-
-    Y_pred = pipeline.predict(X_test)
-    print_classification_report(Y_test, Y_pred)
-    print_feature_importance(X, model)
-
-    return pipeline
-
-
-def print_feature_importance(X, rf):
-    # Get feature importances
-    importances = rf.feature_importances_
-
-    # Get the indices of the features sorted by importance
-    indices = np.argsort(importances)[::-1]
-
-    # Print the feature ranking
-    print("Feature ranking:")
-
-    for f in range(X.shape[1]):
-        print(f"{f + 1}. feature {indices[f]} ({importances[indices[f]]:.4f}) - {X.columns[indices[f]]}")
-
-
-
-def print_classification_report(Y_test, Y_pred):
-    report = classification_report(Y_test, Y_pred, target_names=['Not Clicked', 'Clicked'], output_dict=True)
-    print("Classification Report:")
-    print(f"Accuracy: {report['accuracy']:.4f}")
-    print(f"Precision (Not Clicked): {report['Not Clicked']['precision']:.4f}")
-    print(f"Recall (Not Clicked): {report['Not Clicked']['recall']:.4f}")
-    print(f"F1-Score (Not Clicked): {report['Not Clicked']['f1-score']:.4f}")
-    print(f"Precision (Clicked): {report['Clicked']['precision']:.4f}")
-    print(f"Recall (Clicked): {report['Clicked']['recall']:.4f}")
-    print(f"F1-Score (Clicked): {report['Clicked']['f1-score']:.4f}")
+    y_pred = model.predict(X_test)
+    print(classification_report(y_test, y_pred))
 
+    return model
 
 def main():
@@ -181,24 +125,23 @@
     num_days_history = os.getenv('NUM_DAYS_HISTORY')
     gcs_bucket_name = os.getenv('GCS_BUCKET')
 
-    raw_data_path = f'raw_library_items_${execution_date}.parquet'
+    raw_data_path = f'raw_library_items_{execution_date}.parquet'
     user_history_path = 'features_user_features.pkl'
     pipeline_path = 'predict_read_pipeline-v002.pkl'
+    model_path = 'predict_read_model-v003.pkl'
 
     download_from_gcs(gcs_bucket_name, f'data/features/user_features.pkl', user_history_path)
     download_from_gcs(gcs_bucket_name, f'data/raw/library_items_{execution_date}.parquet', raw_data_path)
 
-    sampled_raw_df = load_and_sample_library_items_from_parquet(raw_data_path, 0.10)
+    sampled_raw_df = load_and_sample_library_items_from_parquet(raw_data_path, 0.95)
     user_history = load_dataframes_from_pickle(user_history_path)
 
     merged_df = merge_user_preference_data(sampled_raw_df, user_history)
-    print("created merged data", merged_df.columns)
-
     X, Y = prepare_data(merged_df)
 
-    random_forest_pipeline = train_random_forest_model(X, Y)
-    save_to_pickle(random_forest_pipeline, pipeline_path)
-    upload_to_gcs(gcs_bucket_name, pipeline_path, f'data/models/{pipeline_path}')
+    xgb_model = train_xgb_model(X, Y)
+    save_to_pickle(xgb_model, model_path)
+    upload_to_gcs(gcs_bucket_name, model_path, f'data/models/{model_path}')
 
 
 if __name__ == "__main__":
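# The replaced RandomForestClassifier weighted the positive class 10:1 via
# class_weight={0: 1, 1: 10}; train_xgb_model above drops that weighting.
# XGBClassifier's equivalent knob is scale_pos_weight, so a weighted variant
# would look like this (a sketch of an alternative, not what the patch ships):
import xgboost as xgb
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

def train_weighted_xgb_model(X, Y, pos_weight=10):
    X_train, X_test, y_train, y_test = train_test_split(
        X, Y, test_size=0.2, random_state=42)
    # scale_pos_weight multiplies the gradient of positive examples,
    # compensating for class imbalance much like class_weight did.
    model = xgb.XGBClassifier(
        max_depth=7, n_estimators=5, scale_pos_weight=pos_weight)
    model.fit(X_train, y_train)
    print(classification_report(y_test, model.predict(X_test)))
    return model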