From e1fe4237dee4bc09fb06babd4b3c6f0303d684c3 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Thu, 4 Jul 2024 17:49:02 +0800 Subject: [PATCH 1/9] V3 scoring model, add prom monitoring --- ml/digest-score/app.py | 49 ++++++++++--- ml/digest-score/features.py | 6 +- ml/digest-score/features/user_history.py | 83 +++++++--------------- ml/digest-score/requirements.txt | 5 +- ml/digest-score/train.py | 89 +++++------------------- 5 files changed, 90 insertions(+), 142 deletions(-) diff --git a/ml/digest-score/app.py b/ml/digest-score/app.py index 8d1000b40..f6a894cc9 100644 --- a/ml/digest-score/app.py +++ b/ml/digest-score/app.py @@ -1,5 +1,6 @@ import logging from flask import Flask, request, jsonify +from prometheus_client import start_http_server, Histogram, Summary, Counter, generate_latest from typing import List from timeit import default_timer as timer @@ -17,17 +18,31 @@ from datetime import datetime import dateutil.parser from google.cloud import storage from features.user_history import FEATURE_COLUMNS +from treeinterpreter import treeinterpreter as ti app = Flask(__name__) logging.basicConfig(level=logging.INFO, stream=sys.stdout) - USER_HISTORY_PATH = 'user_features.pkl' -MODEL_PIPELINE_PATH = 'predict_read_pipeline-v002.pkl' +MODEL_PIPELINE_PATH = 'predict_read_model-v003.pkl' pipeline = None user_features = None +# these buckets are used for reporting scores, we want to make sure +# there is decent diversity in the returned scores. 
+score_bucket_ranges = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] +score_buckets = { + f'score_bucket_{int(b * 10)}': Counter(f'inference_score_bucket_{int(b * 10)}', f'Number of scores in the range {b - 0.1:.1f} to {b:.1f}') + for b in score_bucket_ranges +} + +def observe_score(score): + for b in score_bucket_ranges: + if b - 0.1 < score <= b: + score_buckets[f'score_bucket_{int(b * 10)}'].inc() + break + def download_from_gcs(bucket_name, gcs_path, destination_path): storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) @@ -72,14 +87,20 @@ def merge_dicts(dict1, dict2): dict1[key] = value return dict1 +def predict_proba_wrapper(X): + return pipeline.predict_proba(X) + + def refresh_data(): start = timer() global pipeline + global explainer global user_features - if os.getenv('LOAD_LOCAL_MODEL') != None: + if os.getenv('LOAD_LOCAL_MODEL') == None: + print(f"loading data from {os.getenv('GCS_BUCKET')}") gcs_bucket_name = os.getenv('GCS_BUCKET') - download_from_gcs(gcs_bucket_name, f'data/features/user_features.pkl', USER_HISTORY_PATH) - download_from_gcs(gcs_bucket_name, f'data/models/predict_read_pipeline-v002.pkl', MODEL_PIPELINE_PATH) + download_from_gcs(gcs_bucket_name, f'data/features/{USER_HISTORY_PATH}', USER_HISTORY_PATH) + download_from_gcs(gcs_bucket_name, f'data/models/{MODEL_PIPELINE_PATH}', MODEL_PIPELINE_PATH) pipeline = load_pipeline(MODEL_PIPELINE_PATH) user_features = load_user_features(USER_HISTORY_PATH) end = timer() @@ -90,6 +111,7 @@ def refresh_data(): def compute_score(user_id, item_features): interaction_score = compute_interaction_score(user_id, item_features) + observe_score(interaction_score) return { 'score': interaction_score, 'interaction_score': interaction_score, @@ -97,6 +119,7 @@ def compute_score(user_id, item_features): def compute_interaction_score(user_id, item_features): + start = timer() original_url_host = urlparse(item_features.get('original_url')).netloc df_test = pd.DataFrame([{ 'user_id': 
user_id, @@ -134,21 +157,27 @@ def compute_interaction_score(user_id, item_features): else: print("skipping feature: ", name) continue - df_test = pd.merge(df_test, df, on=merge_keys, how='left') df_test = df_test.fillna(0) df_predict = df_test[FEATURE_COLUMNS] + end = timer() + print('time to compute score (in seconds):', end - start) interaction_score = pipeline.predict_proba(df_predict) - print('score', interaction_score, 'item_features', df_test[df_test != 0].stack()) + print("INTERACTION SCORE: ", interaction_score) + print('item_features:\n', df_predict[df_predict != 0].stack()) - return interaction_score[0][1] + return np.float64(interaction_score[0][1]) @app.route('/_ah/health', methods=['GET']) def ready(): return jsonify({'OK': 'yes'}), 200 +@app.route('/metrics') +def metrics(): + return generate_latest(), 200, {'Content-Type': 'text/plain; charset=utf-8'} + @app.route('/refresh', methods=['GET']) def refresh(): @@ -158,6 +187,7 @@ def refresh(): @app.route('/users//features', methods=['GET']) def get_user_features(user_id): + print("user_features", user_features) result = {} df_user = pd.DataFrame([{ 'user_id': user_id, @@ -193,6 +223,7 @@ def predict(): @app.route('/batch', methods=['POST']) def batch(): + start = timer() try: result = {} data = request.get_json() @@ -210,6 +241,8 @@ def batch(): library_item_id = item['library_item_id'] result[library_item_id] = compute_score(user_id, item) + end = timer() + print(f'time to compute batch of {len(items)} items (in seconds): {end - start}') return jsonify(result) except Exception as e: app.logger.error(f"exception in batch endpoint: {request.get_json()}\n{e}") diff --git a/ml/digest-score/features.py b/ml/digest-score/features.py index 47766c244..ff7a202a5 100644 --- a/ml/digest-score/features.py +++ b/ml/digest-score/features.py @@ -15,13 +15,17 @@ from google.cloud import storage from features.extract import extract_and_upload_raw_data from features.user_history import generate_and_upload_user_history 
+from datetime import datetime, timezone def main(): - execution_date = os.getenv('EXECUTION_DATE') num_days_history = os.getenv('NUM_DAYS_HISTORY') gcs_bucket_name = os.getenv('GCS_BUCKET') + current_date_utc = datetime.now(timezone.utc) + execution_date = current_date_utc.strftime("%Y-%m-%d") + print(f'updating features using execution date: {execution_date}') + extract_and_upload_raw_data(execution_date, num_days_history, gcs_bucket_name) generate_and_upload_user_history(execution_date, gcs_bucket_name) diff --git a/ml/digest-score/features/user_history.py b/ml/digest-score/features/user_history.py index 289ecc64d..9eed6eacc 100644 --- a/ml/digest-score/features/user_history.py +++ b/ml/digest-score/features/user_history.py @@ -17,65 +17,30 @@ import pyarrow.feather as feather from google.cloud import storage FEATURE_COLUMNS=[ - # targets - # 'user_clicked', 'user_read', 'user_long_read', - - # item attributes / user setup attributes - 'item_word_count','item_has_site_icon', 'is_subscription', - 'inbox_folder', 'has_author', - - # how the user has setup the subscription - 'is_newsletter', 'is_feed', 'days_since_subscribed', - 'subscription_count', 'subscription_auto_add_to_library', - 'subscription_fetch_content', - - # user/item interaction history - 'user_original_url_host_saved_count_week_1', - 'user_original_url_host_interaction_count_week_1', - 'user_original_url_host_rate_week_1', - 'user_original_url_host_proportion_week_1', - - 'user_original_url_host_saved_count_week_2', - 'user_original_url_host_interaction_count_week_2', - 'user_original_url_host_rate_week_2', - 'user_original_url_host_proportion_week_2', - 'user_original_url_host_saved_count_week_3', - 'user_original_url_host_interaction_count_week_3', - 'user_original_url_host_rate_week_3', - 'user_original_url_host_proportion_week_3', - 'user_original_url_host_saved_count_week_4', - 'user_original_url_host_interaction_count_week_4', - 'user_original_url_host_rate_week_4', - 
'user_original_url_host_proportion_week_4', - - 'user_subscription_saved_count_week_1', - 'user_subscription_interaction_count_week_1', - 'user_subscription_rate_week_1', 'user_subscription_proportion_week_1', - 'user_site_saved_count_week_3', 'user_site_interaction_count_week_3', - 'user_site_rate_week_3', 'user_site_proportion_week_3', - 'user_site_saved_count_week_2', 'user_site_interaction_count_week_2', - 'user_site_rate_week_2', 'user_site_proportion_week_2', - 'user_subscription_saved_count_week_2', - 'user_subscription_interaction_count_week_2', - 'user_subscription_rate_week_2', 'user_subscription_proportion_week_2', - 'user_site_saved_count_week_1', 'user_site_interaction_count_week_1', - 'user_site_rate_week_1', 'user_site_proportion_week_1', - 'user_subscription_saved_count_week_3', - 'user_subscription_interaction_count_week_3', - 'user_subscription_rate_week_3', 'user_subscription_proportion_week_3', - 'user_author_saved_count_week_4', - 'user_author_interaction_count_week_4', 'user_author_rate_week_4', - 'user_author_proportion_week_4', 'user_author_saved_count_week_1', - 'user_author_interaction_count_week_1', 'user_author_rate_week_1', - 'user_author_proportion_week_1', 'user_site_saved_count_week_4', - 'user_site_interaction_count_week_4', 'user_site_rate_week_4', - 'user_site_proportion_week_4', 'user_author_saved_count_week_2', - 'user_author_interaction_count_week_2', 'user_author_rate_week_2', - 'user_author_proportion_week_2', 'user_author_saved_count_week_3', - 'user_author_interaction_count_week_3', 'user_author_rate_week_3', - 'user_author_proportion_week_3', 'user_subscription_saved_count_week_4', - 'user_subscription_interaction_count_week_4', - 'user_subscription_rate_week_4', 'user_subscription_proportion_week_4' + 'user_subscription_rate_week_1', + 'user_subscription_proportion_week_1', + 'user_site_rate_week_3', + 'user_site_proportion_week_3', + 'user_site_rate_week_2', + 'user_site_proportion_week_2', + 
'user_subscription_rate_week_2', + 'user_subscription_proportion_week_2', + 'user_site_rate_week_1', + 'user_site_proportion_week_1', + 'user_subscription_rate_week_3', + 'user_subscription_proportion_week_3', + 'user_author_rate_week_4', + 'user_author_proportion_week_4', + 'user_author_rate_week_1', + 'user_author_proportion_week_1', + 'user_site_rate_week_4', + 'user_site_proportion_week_4', + 'user_author_rate_week_2', + 'user_author_proportion_week_2', + 'user_author_rate_week_3', + 'user_author_proportion_week_3', + 'user_subscription_rate_week_4', + 'user_subscription_proportion_week_4' ] def parquet_to_dataframe(file_path): diff --git a/ml/digest-score/requirements.txt b/ml/digest-score/requirements.txt index d62f7cf55..193b5bb11 100644 --- a/ml/digest-score/requirements.txt +++ b/ml/digest-score/requirements.txt @@ -6,5 +6,8 @@ google-cloud-storage flask pydantic sklearn2pmml -sqlalchemy +sqlalchemy pyarrow +prometheus_client +treeinterpreter +xgboost==2.1.0 diff --git a/ml/digest-score/train.py b/ml/digest-score/train.py index db8deff84..3a3191554 100644 --- a/ml/digest-score/train.py +++ b/ml/digest-score/train.py @@ -1,17 +1,11 @@ -import pandas as pd import os +import pandas as pd import numpy as np from datetime import datetime, timedelta -from sklearn.linear_model import SGDClassifier -from sklearn.ensemble import RandomForestClassifier, VotingClassifier - -from sklearn.preprocessing import StandardScaler - -from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix -from sklearn.utils import shuffle +import xgboost as xgb +from sklearn.metrics import classification_report from sklearn.model_selection import train_test_split -from sklearn2pmml import PMMLPipeline, sklearn2pmml from google.cloud import storage from google.cloud.exceptions import PreconditionFailed @@ -23,13 +17,6 @@ import pyarrow.feather as feather from features.user_history import FEATURE_COLUMNS -DB_PARAMS = { - 
'dbname': os.getenv('DB_NAME') or 'omnivore', - 'user': os.getenv('DB_USER'), - 'password': os.getenv('DB_PASSWORD'), - 'host': os.getenv('DB_HOST') or 'localhost', - 'port': os.getenv('DB_PORT') or '5432' -} def parquet_to_dataframe(file_path): table = pq.read_table(file_path) @@ -122,58 +109,15 @@ def prepare_data(df): return X, Y -def train_random_forest_model(X, Y): - model = RandomForestClassifier( - class_weight={0: 1, 1: 10}, - n_estimators=10, - max_depth=10, - random_state=42 - ) - scaler = StandardScaler() - X_scaled = scaler.fit_transform(X) +def train_xgb_model(X, Y): + X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=42) + model = xgb.XGBClassifier(max_depth=7, n_estimators=5) + model.fit(X_train, y_train) - X_train, X_test, Y_train, Y_test = train_test_split(X_scaled, Y, test_size=0.3, random_state=42) - - pipeline = PMMLPipeline([ - ("scaler", scaler), - ("classifier", model) - ]) - - pipeline.fit(X_train, Y_train) - - Y_pred = pipeline.predict(X_test) - print_classification_report(Y_test, Y_pred) - print_feature_importance(X, model) - - return pipeline - - -def print_feature_importance(X, rf): - # Get feature importances - importances = rf.feature_importances_ - - # Get the indices of the features sorted by importance - indices = np.argsort(importances)[::-1] - - # Print the feature ranking - print("Feature ranking:") - - for f in range(X.shape[1]): - print(f"{f + 1}. 
feature {indices[f]} ({importances[indices[f]]:.4f}) - {X.columns[indices[f]]}") - - - -def print_classification_report(Y_test, Y_pred): - report = classification_report(Y_test, Y_pred, target_names=['Not Clicked', 'Clicked'], output_dict=True) - print("Classification Report:") - print(f"Accuracy: {report['accuracy']:.4f}") - print(f"Precision (Not Clicked): {report['Not Clicked']['precision']:.4f}") - print(f"Recall (Not Clicked): {report['Not Clicked']['recall']:.4f}") - print(f"F1-Score (Not Clicked): {report['Not Clicked']['f1-score']:.4f}") - print(f"Precision (Clicked): {report['Clicked']['precision']:.4f}") - print(f"Recall (Clicked): {report['Clicked']['recall']:.4f}") - print(f"F1-Score (Clicked): {report['Clicked']['f1-score']:.4f}") + y_pred = model.predict(X_test) + print(classification_report(y_test, y_pred)) + return model def main(): @@ -181,24 +125,23 @@ def main(): num_days_history = os.getenv('NUM_DAYS_HISTORY') gcs_bucket_name = os.getenv('GCS_BUCKET') - raw_data_path = f'raw_library_items_${execution_date}.parquet' + raw_data_path = f'raw_library_items_{execution_date}.parquet' user_history_path = 'features_user_features.pkl' pipeline_path = 'predict_read_pipeline-v002.pkl' + model_path = 'predict_read_model-v003.pkl' download_from_gcs(gcs_bucket_name, f'data/features/user_features.pkl', user_history_path) download_from_gcs(gcs_bucket_name, f'data/raw/library_items_{execution_date}.parquet', raw_data_path) - sampled_raw_df = load_and_sample_library_items_from_parquet(raw_data_path, 0.10) + sampled_raw_df = load_and_sample_library_items_from_parquet(raw_data_path, 0.95) user_history = load_dataframes_from_pickle(user_history_path) merged_df = merge_user_preference_data(sampled_raw_df, user_history) - print("created merged data", merged_df.columns) - X, Y = prepare_data(merged_df) - random_forest_pipeline = train_random_forest_model(X, Y) - save_to_pickle(random_forest_pipeline, pipeline_path) - upload_to_gcs(gcs_bucket_name, pipeline_path, 
f'data/models/{pipeline_path}') + xgb_model = train_xgb_model(X, Y) + save_to_pickle(xgb_model, model_path) + upload_to_gcs(gcs_bucket_name, model_path, f'data/models/{model_path}') if __name__ == "__main__": From 0a012fd492064f01e5cd9b004b996714b2bf1c54 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Thu, 4 Jul 2024 18:47:00 +0800 Subject: [PATCH 2/9] Remove unused --- ml/digest-score/app.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ml/digest-score/app.py b/ml/digest-score/app.py index f6a894cc9..c6af8e147 100644 --- a/ml/digest-score/app.py +++ b/ml/digest-score/app.py @@ -8,7 +8,6 @@ from timeit import default_timer as timer import os import sys import json -import pytz import pickle import numpy as np import pandas as pd @@ -18,7 +17,7 @@ from datetime import datetime import dateutil.parser from google.cloud import storage from features.user_history import FEATURE_COLUMNS -from treeinterpreter import treeinterpreter as ti + app = Flask(__name__) logging.basicConfig(level=logging.INFO, stream=sys.stdout) From d715d1d56d2b6fbe06f95ec3705d45504845e2eb Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Thu, 4 Jul 2024 18:47:26 +0800 Subject: [PATCH 3/9] Remove treeinterpreter --- ml/digest-score/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ml/digest-score/requirements.txt b/ml/digest-score/requirements.txt index 193b5bb11..b9ca8e817 100644 --- a/ml/digest-score/requirements.txt +++ b/ml/digest-score/requirements.txt @@ -8,6 +8,6 @@ pydantic sklearn2pmml sqlalchemy pyarrow +requests prometheus_client -treeinterpreter xgboost==2.1.0 From 65f5ff88eaff0cee474784dfb739bfae4cbea918 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Thu, 4 Jul 2024 18:51:07 +0800 Subject: [PATCH 4/9] Refresh after features updated --- ml/digest-score/features.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/ml/digest-score/features.py b/ml/digest-score/features.py index ff7a202a5..21d961ed6 100644 
--- a/ml/digest-score/features.py +++ b/ml/digest-score/features.py @@ -7,6 +7,7 @@ from datetime import datetime, timedelta import os from io import BytesIO import tempfile +import requests import pyarrow as pa import pyarrow.parquet as pq @@ -17,8 +18,21 @@ from features.user_history import generate_and_upload_user_history from datetime import datetime, timezone +def call_refresh_api(api): + try: + response = requests.get(api, timeout=10) + if response.status_code == 200: + print("scoring service refreshed") + else: + print(f"failed to refresh scoring service: {response.status_code}") + except requests.exceptions.Timeout: + print(f"The request timed out after {timeout} seconds") + except requests.exceptions.RequestException as e: + print(f"An error occurred while refreshing scoring service: {e}") + def main(): + score_service = os.getenv("SCORING_SERVICE_URL") num_days_history = os.getenv('NUM_DAYS_HISTORY') gcs_bucket_name = os.getenv('GCS_BUCKET') @@ -29,6 +43,9 @@ def main(): extract_and_upload_raw_data(execution_date, num_days_history, gcs_bucket_name) generate_and_upload_user_history(execution_date, gcs_bucket_name) + if score_service: + call_refresh_api(score_service) + print("done") if __name__ == "__main__": From 39df17fce046c247f716f96e34b081c470bc8070 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Fri, 5 Jul 2024 12:41:08 +0800 Subject: [PATCH 5/9] Add a RWLock to the user features storage for refreshing --- ml/digest-score/app.py | 98 +++++++++++++++++++++++++++++------------- 1 file changed, 67 insertions(+), 31 deletions(-) diff --git a/ml/digest-score/app.py b/ml/digest-score/app.py index c6af8e147..0f4111937 100644 --- a/ml/digest-score/app.py +++ b/ml/digest-score/app.py @@ -18,6 +18,24 @@ import dateutil.parser from google.cloud import storage from features.user_history import FEATURE_COLUMNS +import concurrent.futures +from threading import Lock, RLock +from collections import ChainMap +import copy + +class ThreadSafeUserFeatures: + def 
__init__(self): + self._data = {} + self._lock = RLock() + + def get(self): + with self._lock: + return dict(self._data) + + def update(self, new_features): + with self._lock: + self._data.update(new_features) + app = Flask(__name__) logging.basicConfig(level=logging.INFO, stream=sys.stdout) @@ -26,7 +44,8 @@ USER_HISTORY_PATH = 'user_features.pkl' MODEL_PIPELINE_PATH = 'predict_read_model-v003.pkl' pipeline = None -user_features = None +user_features_store = ThreadSafeUserFeatures() + # these buckets are used for reporting scores, we want to make sure # there is decent diversity in the returned scores. @@ -93,23 +112,23 @@ def predict_proba_wrapper(X): def refresh_data(): start = timer() global pipeline - global explainer - global user_features if os.getenv('LOAD_LOCAL_MODEL') == None: - print(f"loading data from {os.getenv('GCS_BUCKET')}") + app.logger.info(f"loading data from {os.getenv('GCS_BUCKET')}") gcs_bucket_name = os.getenv('GCS_BUCKET') download_from_gcs(gcs_bucket_name, f'data/features/{USER_HISTORY_PATH}', USER_HISTORY_PATH) download_from_gcs(gcs_bucket_name, f'data/models/{MODEL_PIPELINE_PATH}', MODEL_PIPELINE_PATH) pipeline = load_pipeline(MODEL_PIPELINE_PATH) - user_features = load_user_features(USER_HISTORY_PATH) - end = timer() - print('time to refresh data (in seconds):', end - start) - print('loaded pipeline:', pipeline) - print('loaded number of user_features:', len(user_features)) + + new_features = load_user_features(USER_HISTORY_PATH) + user_features_store.update(new_features) + + app.logger.info(f'time to refresh data (in seconds): {timer() - start}') + app.logger.info(f'loaded pipeline: {pipeline}') + app.logger.info(f'loaded number of user_features: {len(new_features)}') -def compute_score(user_id, item_features): - interaction_score = compute_interaction_score(user_id, item_features) +def compute_score(user_id, item_features, user_features): + interaction_score = compute_interaction_score(user_id, item_features, user_features) 
observe_score(interaction_score) return { 'score': interaction_score, @@ -117,7 +136,7 @@ def compute_score(user_id, item_features): } -def compute_interaction_score(user_id, item_features): +def compute_interaction_score(user_id, item_features, user_features): start = timer() original_url_host = urlparse(item_features.get('original_url')).netloc df_test = pd.DataFrame([{ @@ -160,15 +179,36 @@ def compute_interaction_score(user_id, item_features): df_test = df_test.fillna(0) df_predict = df_test[FEATURE_COLUMNS] - end = timer() - print('time to compute score (in seconds):', end - start) interaction_score = pipeline.predict_proba(df_predict) - print("INTERACTION SCORE: ", interaction_score) - print('item_features:\n', df_predict[df_predict != 0].stack()) + app.logger.info(f'INTERACTION SCORE: {interaction_score}') + app.logger.info(f'item_features:\n{df_predict[df_predict != 0].stack()}') + app.logger.info(f'time to compute score (in seconds): {timer() - start}') return np.float64(interaction_score[0][1]) +def process_parallel_item(user_id, key, item, user_features): + library_item_id = item['library_item_id'] + return library_item_id, compute_score(user_id, item, user_features) + +def parallel_compute_scores(user_id, items, max_workers=None): + user_features = user_features_store.get() + result = {} + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_item = {executor.submit(process_parallel_item, user_id, key, item, user_features): (key, item) + for key, item in items.items()} + + for future in concurrent.futures.as_completed(future_to_item): + key, item = future_to_item[future] + try: + library_item_id, score = future.result() + result[library_item_id] = score + except Exception as exc: + app.logger.error(f'Item {key} generated an exception: {exc}') + return result + + + @app.route('/_ah/health', methods=['GET']) def ready(): return jsonify({'OK': 'yes'}), 200 @@ -186,12 +226,13 @@ def refresh(): 
@app.route('/users//features', methods=['GET']) def get_user_features(user_id): - print("user_features", user_features) result = {} df_user = pd.DataFrame([{ 'user_id': user_id, }]) + user_features = user_features_store.get() + user_data = {} for name, df in user_features.items(): df = df[df['user_id'] == user_id] @@ -213,7 +254,8 @@ def predict(): if user_id is None: return jsonify({'error': 'Missing user_id'}), 400 - score = compute_score(user_id, item_features) + user_features = user_features_store.get() + score = compute_score(user_id, item_features, user_features) return jsonify({'score': score}) except Exception as e: app.logger.error(f"exception in predict endpoint: {request.get_json()}\n{e}") @@ -224,24 +266,18 @@ def predict(): def batch(): start = timer() try: - result = {} data = request.get_json() app.logger.info(f"batch scoring request: {data}") - user_id = data.get('user_id') items = data.get('items') + user_id = data.get('user_id') + if user_id == None: + return jsonify({'error': 'no user_id supplied'}), 400 + if len(items) > 101: + return jsonify({'error': f'too many items: {len(items)}'}), 400 + result = parallel_compute_scores(user_id, items) - if user_id is None: - return jsonify({'error': 'Missing user_id'}), 400 - - for key, item in items.items(): - print('key": ', key) - print('item: ', item) - library_item_id = item['library_item_id'] - result[library_item_id] = compute_score(user_id, item) - - end = timer() - print(f'time to compute batch of {len(items)} items (in seconds): {end - start}') + app.logger.info(f'time to compute batch of {len(items)} items (in seconds): {timer() - start}') return jsonify(result) except Exception as e: app.logger.error(f"exception in batch endpoint: {request.get_json()}\n{e}") From a7ca8442209cee9ba48e0254265148025fdc3539 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Fri, 5 Jul 2024 14:59:16 +0800 Subject: [PATCH 6/9] Add authorization to digest-score --- ml/digest-score/app.py | 15 ++++++---- 
ml/digest-score/auth.py | 45 ++++++++++++++++++++++++++++++ ml/digest-score/requirements.txt | 1 + packages/api/src/services/score.ts | 3 ++ 4 files changed, 59 insertions(+), 5 deletions(-) create mode 100644 ml/digest-score/auth.py diff --git a/ml/digest-score/app.py b/ml/digest-score/app.py index 0f4111937..f3c4dcd11 100644 --- a/ml/digest-score/app.py +++ b/ml/digest-score/app.py @@ -16,13 +16,16 @@ from urllib.parse import urlparse from datetime import datetime import dateutil.parser from google.cloud import storage -from features.user_history import FEATURE_COLUMNS import concurrent.futures from threading import Lock, RLock from collections import ChainMap import copy +from features.user_history import FEATURE_COLUMNS +from auth import user_token_required, admin_token_required + + class ThreadSafeUserFeatures: def __init__(self): self._data = {} @@ -219,12 +222,14 @@ def metrics(): @app.route('/refresh', methods=['GET']) +@admin_token_required def refresh(): refresh_data() return jsonify({'OK': 'yes'}), 200 @app.route('/users//features', methods=['GET']) +@admin_token_required def get_user_features(user_id): result = {} df_user = pd.DataFrame([{ @@ -243,12 +248,13 @@ def get_user_features(user_id): @app.route('/predict', methods=['POST']) +@user_token_required def predict(): try: data = request.get_json() app.logger.info(f"predict scoring request: {data}") - user_id = data.get('user_id') + user_id = request.user_id item_features = data.get('item_features') if user_id is None: @@ -263,14 +269,13 @@ def predict(): @app.route('/batch', methods=['POST']) +@user_token_required def batch(): start = timer() try: data = request.get_json() - app.logger.info(f"batch scoring request: {data}") - items = data.get('items') - user_id = data.get('user_id') + user_id = request.user_id if user_id == None: return jsonify({'error': 'no user_id supplied'}), 400 if len(items) > 101: diff --git a/ml/digest-score/auth.py b/ml/digest-score/auth.py new file mode 100644 index 
000000000..7dbfcf5ca
--- /dev/null
+++ b/ml/digest-score/auth.py
@@ -0,0 +1,45 @@
+import os
+import jwt
+from flask import request, jsonify
+from functools import wraps
+
+SECRET_KEY = os.getenv('JWT_SECRET')
+ADMIN_SECRET_KEY = os.getenv('JWT_ADMIN_SECRET_KEY')
+
+def user_token_required(f):
+    @wraps(f)
+    def decorated(*args, **kwargs):
+        token = None
+        if 'Authorization' in request.headers:
+            print("request.headers['Authorization'].split(\" \")[1]", request.headers['Authorization'].split(" ")[1])
+            token = request.headers['Authorization'].split(" ")[1]
+        if not token:
+            return jsonify({'message': 'Token is missing!'}), 401
+        try:
+            data = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
+            request.user_id = data['uid']
+        except jwt.ExpiredSignatureError:
+            return jsonify({'message': 'Token has expired!'}), 401
+        except jwt.InvalidTokenError:
+            return jsonify({'message': 'Token is invalid!'}), 401
+        return f(*args, **kwargs)
+    return decorated
+
+def admin_token_required(f):
+    @wraps(f)
+    def decorated(*args, **kwargs):
+        token = None
+        if 'Authorization' in request.headers:
+            token = request.headers['Authorization'].split(" ")[1]
+        if not token:
+            return jsonify({'message': 'Token is missing!'}), 401
+        try:
+            data = jwt.decode(token, ADMIN_SECRET_KEY, algorithms=["HS256"])
+            if data['role'] != 'admin':
+                return jsonify({'message': 'Admin token required!'}), 403
+        except jwt.ExpiredSignatureError:
+            return jsonify({'message': 'Token has expired!'}), 401
+        except jwt.InvalidTokenError:
+            return jsonify({'message': 'Token is invalid!'}), 401
+        return f(*args, **kwargs)
+    return decorated
\ No newline at end of file
diff --git a/ml/digest-score/requirements.txt b/ml/digest-score/requirements.txt
index b9ca8e817..b7fbe1b94 100644
--- a/ml/digest-score/requirements.txt
+++ b/ml/digest-score/requirements.txt
@@ -9,5 +9,6 @@ sklearn2pmml
 sqlalchemy
 pyarrow
 requests
+PyJWT
 prometheus_client
 xgboost==2.1.0
diff --git a/packages/api/src/services/score.ts 
b/packages/api/src/services/score.ts index e91384a8c..71a744e04 100644 --- a/packages/api/src/services/score.ts +++ b/packages/api/src/services/score.ts @@ -3,6 +3,7 @@ import client from 'prom-client' import { env } from '../env' import { registerMetric } from '../prometheus' import { logError } from '../utils/logger' +import { createWebAuthToken } from '../routers/auth/jwt_helpers' export interface Feature { library_item_id?: string @@ -77,10 +78,12 @@ class ScoreClientImpl implements ScoreClient { async getScores(data: ScoreApiRequestBody): Promise { const start = Date.now() + const authToken = createWebAuthToken(data.user_id) try { const response = await axios.post(this.apiUrl, data, { headers: { + Authorization: `Bearer ${authToken}`, 'Content-Type': 'application/json', }, timeout: 5000, From 6a201018b170f601bb63afaa744f9d72a5417c74 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Fri, 5 Jul 2024 15:20:45 +0800 Subject: [PATCH 7/9] Fix promise --- ml/digest-score/app.py | 3 +++ packages/api/src/services/score.ts | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/ml/digest-score/app.py b/ml/digest-score/app.py index f3c4dcd11..6bcaad920 100644 --- a/ml/digest-score/app.py +++ b/ml/digest-score/app.py @@ -182,7 +182,10 @@ def compute_interaction_score(user_id, item_features, user_features): df_test = df_test.fillna(0) df_predict = df_test[FEATURE_COLUMNS] + infer_start = timer() interaction_score = pipeline.predict_proba(df_predict) + app.logger.info(f'time to call infer (in seconds): {timer() - infer_start}') + app.logger.info(f'INTERACTION SCORE: {interaction_score}') app.logger.info(f'item_features:\n{df_predict[df_predict != 0].stack()}') app.logger.info(f'time to compute score (in seconds): {timer() - start}') diff --git a/packages/api/src/services/score.ts b/packages/api/src/services/score.ts index 71a744e04..250dfb271 100644 --- a/packages/api/src/services/score.ts +++ b/packages/api/src/services/score.ts @@ -78,7 +78,7 @@ class 
ScoreClientImpl implements ScoreClient { async getScores(data: ScoreApiRequestBody): Promise { const start = Date.now() - const authToken = createWebAuthToken(data.user_id) + const authToken = await createWebAuthToken(data.user_id) try { const response = await axios.post(this.apiUrl, data, { From 3fed16d936ff794770cafcdd21f6f4c1206e65c9 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Fri, 5 Jul 2024 15:30:20 +0800 Subject: [PATCH 8/9] Add auth to refresh call --- ml/digest-score/auth.py | 13 +++++++++++++ ml/digest-score/features.py | 7 ++++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/ml/digest-score/auth.py b/ml/digest-score/auth.py index 7dbfcf5ca..d4876bb0d 100644 --- a/ml/digest-score/auth.py +++ b/ml/digest-score/auth.py @@ -2,10 +2,23 @@ import os import jwt from flask import request, jsonify from functools import wraps +from datetime import datetime, timedelta + SECRET_KEY = os.getenv('JWT_SECRET') ADMIN_SECRET_KEY = os.getenv('JWT_ADMIN_SECRET_KEY') +def generate_admin_token(): + expiration_time = datetime.utcnow() + timedelta(minutes=5) + payload = { + 'role': 'admin', + 'exp': expiration_time + } + + token = jwt.encode(payload, ADMIN_SECRET_KEY, algorithm="HS256") + return token + + def user_token_required(f): @wraps(f) def decorated(*args, **kwargs): diff --git a/ml/digest-score/features.py b/ml/digest-score/features.py index 21d961ed6..32e6af60c 100644 --- a/ml/digest-score/features.py +++ b/ml/digest-score/features.py @@ -17,10 +17,15 @@ from features.extract import extract_and_upload_raw_data from features.user_history import generate_and_upload_user_history from datetime import datetime, timezone +from auth import generate_admin_token + def call_refresh_api(api): + headers = { + 'Authorization': f'Bearer {generate_admin_token()}' + } try: - response = requests.get(api, timeout=10) + response = requests.get(api, headers=headers, timeout=10) if response.status_code == 200: print("scoring service refreshed") else: From 
59852a156553509c5dc036e2da30c1704b2e6980 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Fri, 5 Jul 2024 16:43:29 +0800 Subject: [PATCH 9/9] Throw if we cant sign the request --- packages/api/src/services/score.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/api/src/services/score.ts b/packages/api/src/services/score.ts index 250dfb271..9737277d2 100644 --- a/packages/api/src/services/score.ts +++ b/packages/api/src/services/score.ts @@ -78,9 +78,12 @@ class ScoreClientImpl implements ScoreClient { async getScores(data: ScoreApiRequestBody): Promise { const start = Date.now() - const authToken = await createWebAuthToken(data.user_id) try { + const authToken = await createWebAuthToken(data.user_id) + if (!authToken) { + throw Error('could not create auth token') + } const response = await axios.post(this.apiUrl, data, { headers: { Authorization: `Bearer ${authToken}`,