diff --git a/ml/digest-score/app.py b/ml/digest-score/app.py index 8d1000b40..6bcaad920 100644 --- a/ml/digest-score/app.py +++ b/ml/digest-score/app.py @@ -1,5 +1,6 @@ import logging from flask import Flask, request, jsonify +from prometheus_client import start_http_server, Histogram, Summary, Counter, generate_latest from typing import List from timeit import default_timer as timer @@ -7,7 +8,6 @@ from timeit import default_timer as timer import os import sys import json -import pytz import pickle import numpy as np import pandas as pd @@ -16,17 +16,53 @@ from urllib.parse import urlparse from datetime import datetime import dateutil.parser from google.cloud import storage + +import concurrent.futures +from threading import Lock, RLock +from collections import ChainMap +import copy + from features.user_history import FEATURE_COLUMNS +from auth import user_token_required, admin_token_required + + +class ThreadSafeUserFeatures: + def __init__(self): + self._data = {} + self._lock = RLock() + + def get(self): + with self._lock: + return dict(self._data) + + def update(self, new_features): + with self._lock: + self._data.update(new_features) + app = Flask(__name__) logging.basicConfig(level=logging.INFO, stream=sys.stdout) - USER_HISTORY_PATH = 'user_features.pkl' -MODEL_PIPELINE_PATH = 'predict_read_pipeline-v002.pkl' +MODEL_PIPELINE_PATH = 'predict_read_model-v003.pkl' pipeline = None -user_features = None +user_features_store = ThreadSafeUserFeatures() + + +# these buckets are used for reporting scores, we want to make sure +# there is decent diversity in the returned scores. +score_bucket_ranges = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] +score_buckets = { + f'score_bucket_{int(b * 10)}': Counter(f'inference_score_bucket_{int(b * 10)}', f'Number of scores in the range {b - 0.1:.1f} to {b:.1f}') + for b in score_bucket_ranges +} + +def observe_score(score): + for b in score_bucket_ranges: + if b - 0.1 < score <= b: + score_buckets[f'score_bucket_{int(b * 10)}'].inc() + break def download_from_gcs(bucket_name, gcs_path, destination_path): storage_client = storage.Client() @@ -72,31 +108,39 @@ def merge_dicts(dict1, dict2): dict1[key] = value return dict1 +def predict_proba_wrapper(X): + return pipeline.predict_proba(X) + + def refresh_data(): start = timer() global pipeline - global user_features - if os.getenv('LOAD_LOCAL_MODEL') != None: + if os.getenv('LOAD_LOCAL_MODEL') == None: + app.logger.info(f"loading data from {os.getenv('GCS_BUCKET')}") gcs_bucket_name = os.getenv('GCS_BUCKET') - download_from_gcs(gcs_bucket_name, f'data/features/user_features.pkl', USER_HISTORY_PATH) - download_from_gcs(gcs_bucket_name, f'data/models/predict_read_pipeline-v002.pkl', MODEL_PIPELINE_PATH) + download_from_gcs(gcs_bucket_name, f'data/features/{USER_HISTORY_PATH}', USER_HISTORY_PATH) + download_from_gcs(gcs_bucket_name, f'data/models/{MODEL_PIPELINE_PATH}', MODEL_PIPELINE_PATH) pipeline = load_pipeline(MODEL_PIPELINE_PATH) - user_features = load_user_features(USER_HISTORY_PATH) - end = timer() - print('time to refresh data (in seconds):', end - start) - print('loaded pipeline:', pipeline) - print('loaded number of user_features:', len(user_features)) + + new_features = load_user_features(USER_HISTORY_PATH) + user_features_store.update(new_features) + + app.logger.info(f'time to refresh data (in seconds): {timer() - start}') + app.logger.info(f'loaded pipeline: {pipeline}') + app.logger.info(f'loaded number of user_features: {len(new_features)}') -def compute_score(user_id, item_features): - interaction_score = compute_interaction_score(user_id, item_features) +def compute_score(user_id, item_features, user_features): + interaction_score = compute_interaction_score(user_id, item_features, user_features) + observe_score(interaction_score) return { 'score': interaction_score, 'interaction_score': interaction_score, } -def compute_interaction_score(user_id, item_features): +def compute_interaction_score(user_id, item_features, user_features): + start = timer() original_url_host = urlparse(item_features.get('original_url')).netloc df_test = pd.DataFrame([{ 'user_id': user_id, @@ -134,35 +178,69 @@ def compute_interaction_score(user_id, item_features): else: print("skipping feature: ", name) continue - df_test = pd.merge(df_test, df, on=merge_keys, how='left') df_test = df_test.fillna(0) df_predict = df_test[FEATURE_COLUMNS] + infer_start = timer() interaction_score = pipeline.predict_proba(df_predict) - print('score', interaction_score, 'item_features', df_test[df_test != 0].stack()) + app.logger.info(f'time to call infer (in seconds): {timer() - infer_start}') + + app.logger.info(f'INTERACTION SCORE: {interaction_score}') + app.logger.info(f'item_features:\n{df_predict[df_predict != 0].stack()}') + app.logger.info(f'time to compute score (in seconds): {timer() - start}') + + return np.float64(interaction_score[0][1]) + + +def process_parallel_item(user_id, key, item, user_features): + library_item_id = item['library_item_id'] + return library_item_id, compute_score(user_id, item, user_features) + +def parallel_compute_scores(user_id, items, max_workers=None): + user_features = user_features_store.get() + result = {} + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_item = {executor.submit(process_parallel_item, user_id, key, item, user_features): (key, item) + for key, item in items.items()} + + for future in concurrent.futures.as_completed(future_to_item): + key, item = future_to_item[future] + try: + library_item_id, score = future.result() + result[library_item_id] = score + except Exception as exc: + app.logger.error(f'Item {key} generated an exception: {exc}') + return result - return interaction_score[0][1] @app.route('/_ah/health', methods=['GET']) def ready(): return jsonify({'OK': 'yes'}), 200 +@app.route('/metrics') +def metrics(): + return generate_latest(), 200, {'Content-Type': 'text/plain; charset=utf-8'} + @app.route('/refresh', methods=['GET']) +@admin_token_required def refresh(): refresh_data() return jsonify({'OK': 'yes'}), 200 @app.route('/users//features', methods=['GET']) +@admin_token_required def get_user_features(user_id): result = {} df_user = pd.DataFrame([{ 'user_id': user_id, }]) + user_features = user_features_store.get() + user_data = {} for name, df in user_features.items(): df = df[df['user_id'] == user_id] @@ -173,18 +251,20 @@ def get_user_features(user_id): @app.route('/predict', methods=['POST']) +@user_token_required def predict(): try: data = request.get_json() app.logger.info(f"predict scoring request: {data}") - user_id = data.get('user_id') + user_id = request.user_id item_features = data.get('item_features') if user_id is None: return jsonify({'error': 'Missing user_id'}), 400 - score = compute_score(user_id, item_features) + user_features = user_features_store.get() + score = compute_score(user_id, item_features, user_features) return jsonify({'score': score}) except Exception as e: app.logger.error(f"exception in predict endpoint: {request.get_json()}\n{e}") @@ -192,24 +272,20 @@ def predict(): @app.route('/batch', methods=['POST']) +@user_token_required def batch(): + start = timer() try: - result = {} data = request.get_json() - app.logger.info(f"batch scoring request: {data}") - - user_id = data.get('user_id') items = data.get('items') + user_id = request.user_id + if user_id == None: + return jsonify({'error': 'no user_id supplied'}), 400 + if len(items) > 101: + return jsonify({'error': f'too many items: {len(items)}'}), 400 + result = parallel_compute_scores(user_id, items) - if user_id is None: - return jsonify({'error': 'Missing user_id'}), 400 - - for key, item in items.items(): - print('key": ', key) - print('item: ', item) - library_item_id = item['library_item_id'] - result[library_item_id] = compute_score(user_id, item) - + app.logger.info(f'time to compute batch of {len(items)} items (in seconds): {timer() - start}') return jsonify(result) except Exception as e: app.logger.error(f"exception in batch endpoint: {request.get_json()}\n{e}") diff --git a/ml/digest-score/auth.py b/ml/digest-score/auth.py new file mode 100644 index 000000000..d4876bb0d --- /dev/null +++ b/ml/digest-score/auth.py @@ -0,0 +1,58 @@ +import os +import jwt +from flask import request, jsonify +from functools import wraps +from datetime import datetime, timedelta + + +SECRET_KEY = os.getenv('JWT_SECRET') +ADMIN_SECRET_KEY = os.getenv('JWT_ADMIN_SECRET_KEY') + +def generate_admin_token(): + expiration_time = datetime.utcnow() + timedelta(minutes=5) + payload = { + 'role': 'admin', + 'exp': expiration_time + } + + token = jwt.encode(payload, ADMIN_SECRET_KEY, algorithm="HS256") + return token + + +def user_token_required(f): + @wraps(f) + def decorated(*args, **kwargs): + token = None + if 'Authorization' in request.headers: + print("request.headers['Authorization'].split(" ")[1]", request.headers['Authorization'].split(" ")[1]) + token = request.headers['Authorization'].split(" ")[1] + if not token: + return jsonify({'message': 'Token is missing!'}), 401 + try: + data = jwt.decode(token, SECRET_KEY, algorithms=["HS256"]) + request.user_id = data['uid'] + except jwt.ExpiredSignatureError: + return jsonify({'message': 'Token has expired!'}), 401 + except jwt.InvalidTokenError: + return jsonify({'message': 'Token is invalid!'}), 401 + return f(*args, **kwargs) + return decorated + +def admin_token_required(f): + @wraps(f) + def decorated(*args, **kwargs): + token = None + if 'Authorization' in request.headers: + token = request.headers['Authorization'].split(" ")[1] + if not token: + return jsonify({'message': 'Token is missing!'}), 401 + try: + data = jwt.decode(token, ADMIN_SECRET_KEY, algorithms=["HS256"]) + if data['role'] != 'admin': + return jsonify({'message': 'Admin token required!'}), 403 + except jwt.ExpiredSignatureError: + return jsonify({'message': 'Token has expired!'}), 401 + except jwt.InvalidTokenError: + return jsonify({'message': 'Token is invalid!'}), 401 + return f(*args, **kwargs) + return decorated \ No newline at end of file diff --git a/ml/digest-score/features.py b/ml/digest-score/features.py index 47766c244..32e6af60c 100644 --- a/ml/digest-score/features.py +++ b/ml/digest-score/features.py @@ -7,6 +7,7 @@ from datetime import datetime, timedelta import os from io import BytesIO import tempfile +import requests import pyarrow as pa import pyarrow.parquet as pq @@ -15,16 +16,41 @@ from google.cloud import storage from features.extract import extract_and_upload_raw_data from features.user_history import generate_and_upload_user_history +from datetime import datetime, timezone +from auth import generate_admin_token + + +def call_refresh_api(api): + headers = { + 'Authorization': f'Bearer {generate_admin_token()}' + } + try: + response = requests.get(api, headers=headers, timeout=10) + if response.status_code == 200: + print("scoring service refreshed") + else: + print(f"failed to refresh scoring service: {response.status_code}") + except requests.exceptions.Timeout: + print(f"The request timed out after {timeout} seconds") + except requests.exceptions.RequestException as e: + print(f"An error occurred while refreshing scoring service: {e}") def main(): - execution_date = os.getenv('EXECUTION_DATE') + score_service = os.getenv("SCORING_SERVICE_URL") num_days_history = os.getenv('NUM_DAYS_HISTORY') gcs_bucket_name = os.getenv('GCS_BUCKET') + current_date_utc = datetime.now(timezone.utc) + execution_date = current_date_utc.strftime("%Y-%m-%d") + print(f'updating features using execution date: {execution_date}') + extract_and_upload_raw_data(execution_date, num_days_history, gcs_bucket_name) generate_and_upload_user_history(execution_date, gcs_bucket_name) + if score_service: + call_refresh_api(score_service) + print("done") if __name__ == "__main__": diff --git a/ml/digest-score/features/user_history.py b/ml/digest-score/features/user_history.py index 289ecc64d..9eed6eacc 100644 --- a/ml/digest-score/features/user_history.py +++ b/ml/digest-score/features/user_history.py @@ -17,65 +17,30 @@ import pyarrow.feather as feather from google.cloud import storage FEATURE_COLUMNS=[ - # targets - # 'user_clicked', 'user_read', 'user_long_read', - - # item attributes / user setup attributes - 'item_word_count','item_has_site_icon', 'is_subscription', - 'inbox_folder', 'has_author', - - # how the user has setup the subscription - 'is_newsletter', 'is_feed', 'days_since_subscribed', - 'subscription_count', 'subscription_auto_add_to_library', - 'subscription_fetch_content', - - # user/item interaction history - 'user_original_url_host_saved_count_week_1', - 'user_original_url_host_interaction_count_week_1', - 'user_original_url_host_rate_week_1', - 'user_original_url_host_proportion_week_1', - - 'user_original_url_host_saved_count_week_2', - 'user_original_url_host_interaction_count_week_2', - 'user_original_url_host_rate_week_2', - 'user_original_url_host_proportion_week_2', - 'user_original_url_host_saved_count_week_3', - 'user_original_url_host_interaction_count_week_3', - 'user_original_url_host_rate_week_3', - 'user_original_url_host_proportion_week_3', - 'user_original_url_host_saved_count_week_4', - 'user_original_url_host_interaction_count_week_4', - 'user_original_url_host_rate_week_4', - 'user_original_url_host_proportion_week_4', - - 'user_subscription_saved_count_week_1', - 'user_subscription_interaction_count_week_1', - 'user_subscription_rate_week_1', 'user_subscription_proportion_week_1', - 'user_site_saved_count_week_3', 'user_site_interaction_count_week_3', - 'user_site_rate_week_3', 'user_site_proportion_week_3', - 'user_site_saved_count_week_2', 'user_site_interaction_count_week_2', - 'user_site_rate_week_2', 'user_site_proportion_week_2', - 'user_subscription_saved_count_week_2', - 'user_subscription_interaction_count_week_2', - 'user_subscription_rate_week_2', 'user_subscription_proportion_week_2', - 'user_site_saved_count_week_1', 'user_site_interaction_count_week_1', - 'user_site_rate_week_1', 'user_site_proportion_week_1', - 'user_subscription_saved_count_week_3', - 'user_subscription_interaction_count_week_3', - 'user_subscription_rate_week_3', 'user_subscription_proportion_week_3', - 'user_author_saved_count_week_4', - 'user_author_interaction_count_week_4', 'user_author_rate_week_4', - 'user_author_proportion_week_4', 'user_author_saved_count_week_1', - 'user_author_interaction_count_week_1', 'user_author_rate_week_1', - 'user_author_proportion_week_1', 'user_site_saved_count_week_4', - 'user_site_interaction_count_week_4', 'user_site_rate_week_4', - 'user_site_proportion_week_4', 'user_author_saved_count_week_2', - 'user_author_interaction_count_week_2', 'user_author_rate_week_2', - 'user_author_proportion_week_2', 'user_author_saved_count_week_3', - 'user_author_interaction_count_week_3', 'user_author_rate_week_3', - 'user_author_proportion_week_3', 'user_subscription_saved_count_week_4', - 'user_subscription_interaction_count_week_4', - 'user_subscription_rate_week_4', 'user_subscription_proportion_week_4' + 'user_subscription_rate_week_1', + 'user_subscription_proportion_week_1', + 'user_site_rate_week_3', + 'user_site_proportion_week_3', + 'user_site_rate_week_2', + 'user_site_proportion_week_2', + 'user_subscription_rate_week_2', + 'user_subscription_proportion_week_2', + 'user_site_rate_week_1', + 'user_site_proportion_week_1', + 'user_subscription_rate_week_3', + 'user_subscription_proportion_week_3', + 'user_author_rate_week_4', + 'user_author_proportion_week_4', + 'user_author_rate_week_1', + 'user_author_proportion_week_1', + 'user_site_rate_week_4', + 'user_site_proportion_week_4', + 'user_author_rate_week_2', + 'user_author_proportion_week_2', + 'user_author_rate_week_3', + 'user_author_proportion_week_3', + 'user_subscription_rate_week_4', + 'user_subscription_proportion_week_4' ] def parquet_to_dataframe(file_path): diff --git a/ml/digest-score/requirements.txt b/ml/digest-score/requirements.txt index d62f7cf55..b7fbe1b94 100644 --- a/ml/digest-score/requirements.txt +++ b/ml/digest-score/requirements.txt @@ -6,5 +6,9 @@ google-cloud-storage flask pydantic sklearn2pmml -sqlalchemy +sqlalchemy pyarrow +requests +PyJWT +prometheus_client +xgboost==2.1.0 diff --git a/ml/digest-score/train.py b/ml/digest-score/train.py index db8deff84..3a3191554 100644 --- a/ml/digest-score/train.py +++ b/ml/digest-score/train.py @@ -1,17 +1,11 @@ -import pandas as pd import os +import pandas as pd import numpy as np from datetime import datetime, timedelta -from sklearn.linear_model import SGDClassifier -from sklearn.ensemble import RandomForestClassifier, VotingClassifier - -from sklearn.preprocessing import StandardScaler - -from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix -from sklearn.utils import shuffle +import xgboost as xgb +from sklearn.metrics import classification_report from sklearn.model_selection import train_test_split -from sklearn2pmml import PMMLPipeline, sklearn2pmml from google.cloud import storage from google.cloud.exceptions import PreconditionFailed @@ -23,13 +17,6 @@ import pyarrow.feather as feather from features.user_history import FEATURE_COLUMNS -DB_PARAMS = { - 'dbname': os.getenv('DB_NAME') or 'omnivore', - 'user': os.getenv('DB_USER'), - 'password': os.getenv('DB_PASSWORD'), - 'host': os.getenv('DB_HOST') or 'localhost', - 'port': os.getenv('DB_PORT') or '5432' -} def parquet_to_dataframe(file_path): table = pq.read_table(file_path) @@ -122,58 +109,15 @@ def prepare_data(df): return X, Y -def train_random_forest_model(X, Y): - model = RandomForestClassifier( - class_weight={0: 1, 1: 10}, - n_estimators=10, - max_depth=10, - random_state=42 - ) - scaler = StandardScaler() - X_scaled = scaler.fit_transform(X) +def train_xgb_model(X, Y): + X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=42) + model = xgb.XGBClassifier(max_depth=7, n_estimators=5) + model.fit(X_train, y_train) - X_train, X_test, Y_train, Y_test = train_test_split(X_scaled, Y, test_size=0.3, random_state=42) - - pipeline = PMMLPipeline([ - ("scaler", scaler), - ("classifier", model) - ]) - - pipeline.fit(X_train, Y_train) - - Y_pred = pipeline.predict(X_test) - print_classification_report(Y_test, Y_pred) - print_feature_importance(X, model) - - return pipeline - - -def print_feature_importance(X, rf): - # Get feature importances - importances = rf.feature_importances_ - - # Get the indices of the features sorted by importance - indices = np.argsort(importances)[::-1] - - # Print the feature ranking - print("Feature ranking:") - - for f in range(X.shape[1]): - print(f"{f + 1}. feature {indices[f]} ({importances[indices[f]]:.4f}) - {X.columns[indices[f]]}") - - - -def print_classification_report(Y_test, Y_pred): - report = classification_report(Y_test, Y_pred, target_names=['Not Clicked', 'Clicked'], output_dict=True) - print("Classification Report:") - print(f"Accuracy: {report['accuracy']:.4f}") - print(f"Precision (Not Clicked): {report['Not Clicked']['precision']:.4f}") - print(f"Recall (Not Clicked): {report['Not Clicked']['recall']:.4f}") - print(f"F1-Score (Not Clicked): {report['Not Clicked']['f1-score']:.4f}") - print(f"Precision (Clicked): {report['Clicked']['precision']:.4f}") - print(f"Recall (Clicked): {report['Clicked']['recall']:.4f}") - print(f"F1-Score (Clicked): {report['Clicked']['f1-score']:.4f}") + y_pred = model.predict(X_test) + print(classification_report(y_test, y_pred)) + return model def main(): @@ -181,24 +125,23 @@ def main(): num_days_history = os.getenv('NUM_DAYS_HISTORY') gcs_bucket_name = os.getenv('GCS_BUCKET') - raw_data_path = f'raw_library_items_${execution_date}.parquet' + raw_data_path = f'raw_library_items_{execution_date}.parquet' user_history_path = 'features_user_features.pkl' pipeline_path = 'predict_read_pipeline-v002.pkl' + model_path = 'predict_read_model-v003.pkl' download_from_gcs(gcs_bucket_name, f'data/features/user_features.pkl', user_history_path) download_from_gcs(gcs_bucket_name, f'data/raw/library_items_{execution_date}.parquet', raw_data_path) - sampled_raw_df = load_and_sample_library_items_from_parquet(raw_data_path, 0.10) + sampled_raw_df = load_and_sample_library_items_from_parquet(raw_data_path, 0.95) user_history = load_dataframes_from_pickle(user_history_path) merged_df = merge_user_preference_data(sampled_raw_df, user_history) - print("created merged data", merged_df.columns) - X, Y = prepare_data(merged_df) - random_forest_pipeline = train_random_forest_model(X, Y) - save_to_pickle(random_forest_pipeline, pipeline_path) - upload_to_gcs(gcs_bucket_name, pipeline_path, f'data/models/{pipeline_path}') + xgb_model = train_xgb_model(X, Y) + save_to_pickle(xgb_model, model_path) + upload_to_gcs(gcs_bucket_name, model_path, f'data/models/{model_path}') if __name__ == "__main__": diff --git a/packages/api/src/services/score.ts b/packages/api/src/services/score.ts index a601e63c1..78e04c568 100644 --- a/packages/api/src/services/score.ts +++ b/packages/api/src/services/score.ts @@ -3,6 +3,7 @@ import client from 'prom-client' import { env } from '../env' import { registerMetric } from '../prometheus' import { logError } from '../utils/logger' +import { createWebAuthToken } from '../routers/auth/jwt_helpers' export interface Feature { library_item_id?: string @@ -79,8 +80,13 @@ class ScoreClientImpl implements ScoreClient { const start = Date.now() try { + const authToken = await createWebAuthToken(data.user_id) + if (!authToken) { + throw Error('could not create auth token') + } const response = await axios.post(this.apiUrl, data, { headers: { + Authorization: `Bearer ${authToken}`, 'Content-Type': 'application/json', }, timeout: 20000, // 20 seconds