From cd842393f6cb761b72595e1a82e8e8b1c99d7e7a Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Wed, 29 May 2024 14:24:59 +0800 Subject: [PATCH 1/3] Add initial digest-score service --- ml/digest-score/README.md | 72 ++++++ ml/digest-score/app.py | 341 ++++++++++++++++++++++++++++ ml/digest-score/create-features.sql | 301 ++++++++++++++++++++++++ ml/digest-score/requirements.txt | 8 + ml/digest-score/train.py | 299 ++++++++++++++++++++++++ 5 files changed, 1021 insertions(+) create mode 100644 ml/digest-score/README.md create mode 100644 ml/digest-score/app.py create mode 100644 ml/digest-score/create-features.sql create mode 100644 ml/digest-score/requirements.txt create mode 100644 ml/digest-score/train.py diff --git a/ml/digest-score/README.md b/ml/digest-score/README.md new file mode 100644 index 000000000..4a424e2a9 --- /dev/null +++ b/ml/digest-score/README.md @@ -0,0 +1,72 @@ +# digest-score + +The digest-score is a combination of two scores: `interaction_score` which is how likely it is the user opens the library_item and `time_bonus_score` which is how fresh the article is. In the future we might remove the `time_bonus_score` and move that to the mixing step. + +## Creating features + +Currently the features are materialized views created in the public schema. Before moving to production we should create a `features` schema. To create all the materialized views locally run the `create-features.sql` file with the psql command: + +`psql omnivore -f create-features.sql` + +## Training + +Currently we create a small random forest model, when running locally this will be saved to disk during training. The reason we use such a simple model is to focus on feature development. The model is mostly a guide to understand whether or not the features are relevant. 
+ +To train the data run the following command: + +`NUM_DAYS_HISTORY=1000 SAMPLE_SIZE=100 python train.py` + +This will create a file: `predict_user_clicked_random_forest_pipeline-v001.pkl` + +## Running the service + +Now that there is a model created, you can run the service using the following command: + +`LOAD_LOCAL_MODEL=true python app.py` + +To test the model make a curl request using your user id, for example: + +### A single prediction + +``` +curl -d '{ "user_id": "2da52794-0dd2-11ef-9855-5f368b90f676", "item_features": { "site": "Omnivore Blog", "title": "this is a title", "author": "Tiago Forte", "subscription": "this is a subscriptionsdfsdfsdf", "has_thumbnail": true, "has_site_icon": true, "saved_at": "2024-05-27T04:20:47Z" }}' -H 'Content-Type: application/json' localhost:5000/predict +``` + +### A batch prediction + +curl -d '{ "user_id": "2da52794-0dd2-11ef-9855-5f368b90f676", "items": { "134f883e-efd8-11ee-ae98-532a6874855a": { "library_item_id": "134f883e-efd8-11ee-ae98-532a6874855a", "site": "TikTok", "title": "this is a title", "author": "Tiago Forte", "subscription": "this is a subscriptionsdfsdfsdf", "has_thumbnail": true, "has_site_icon": true, "saved_at": "2024-05-27T04:20:47Z" }} }' -H 'Content-Type: application/json' localhost:5000/batch + +### Make a prediction for a given user and library_item (for debugging only) + +``` +curl localhost:5000/users/2da52794-0dd2-11ef-9855-5f368b90f676/library_items/134f883e-efd8-11ee-ae98-532a6874855a/score +{ + "score": { + "interaction_score": 1.0, + "score": 0.8, + "time_bonus_score": 0.0 + } +} +``` + +### Print the user's profile data (for debugging only) + +``` +curl localhost:5000/users/2da52794-0dd2-11ef-9855-5f368b90f676/features +{ + "user_30d_interactions_site": { + "count": [ + { + "site": "TikTok", + "user_30d_interactions_site_count": 3 + } + ], + "rate": [ + { + "site": "TikTok", + "user_30d_interactions_site_rate": 0.75 + } + ] + } +} +``` diff --git a/ml/digest-score/app.py 
b/ml/digest-score/app.py new file mode 100644 index 000000000..1ba2a26e4 --- /dev/null +++ b/ml/digest-score/app.py @@ -0,0 +1,341 @@ +import psycopg2 + +import logging +from flask import Flask, request, jsonify +from pydantic import BaseModel, ConfigDict, ValidationError, conlist + +from typing import List + +import os +import sys +import json +import pytz +import numpy as np +import pandas as pd +import joblib +from datetime import datetime, timedelta +from datetime import datetime +import dateutil.parser +from google.cloud import storage + +app = Flask(__name__) +logging.basicConfig(level=logging.INFO, stream=sys.stdout) + + +TRAIN_FEATURES = [ + "item_has_thumbnail", + "item_has_site_icon", + + 'user_30d_interactions_author_count', + 'user_30d_interactions_site_count', + 'user_30d_interactions_subscription_count', + + 'user_30d_interactions_author_rate', + 'user_30d_interactions_site_rate', + 'user_30d_interactions_subscription_rate', + + 'global_30d_interactions_site_count', + 'global_30d_interactions_author_count', + 'global_30d_interactions_subscription_count', + + 'global_30d_interactions_site_rate', + 'global_30d_interactions_author_rate', + 'global_30d_interactions_subscription_rate' +] + +DB_PARAMS = { + 'dbname': os.getenv('DB_NAME') or 'omnivore', + 'user': os.getenv('DB_USER'), + 'password': os.getenv('DB_PASSWORD'), + 'host': os.getenv('DB_HOST') or 'localhost', + 'port': os.getenv('DB_PORT') or '5432' +} + +USER_FEATURES = { + "site": "user_30d_interactions_site", + "author": "user_30d_interactions_author", + "subscription": "user_30d_interactions_subscription", +} + +GLOBAL_FEATURES = { + "site": "global_30d_interactions_site", + "author": "global_30d_interactions_author", + "subscription": "global_30d_interactions_subscription", +} + +def download_from_gcs(bucket_name, gcs_path, destination_path): + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(gcs_path) + 
blob.download_to_filename(destination_path) + + +def load_pipeline(): + bucket_name = os.getenv('GCS_BUCKET') + pipeline_gcs_path = os.getenv('PIPELINE_GCS_PATH') + download_from_gcs(bucket_name, pipeline_gcs_path, '/tmp/pipeline.pkl') + pipeline = joblib.load('/tmp/pipeline.pkl') + return pipeline + + +def load_pipeline_local(): + pipeline = joblib.load('predict_user_clicked_random_forest_pipeline-v001.pkl') + return pipeline + + +def fetch_user_features(name, feature_name): + conn = psycopg2.connect(**DB_PARAMS) + cur = conn.cursor() + query = f"SELECT user_id, {name}, interactions, interaction_rate FROM {feature_name}" + + cur.execute(query) + data = cur.fetchall() + + cur.close() + conn.close() + columns = [ + "user_id", + name, + "interactions", + "interaction_rate" + ] + + rate_feature_name = f"{feature_name}_rate" + count_feature_name = f"{feature_name}_count" + + df_loaded = pd.DataFrame(data, columns=columns) + df_loaded = df_loaded.rename(columns={"interactions": count_feature_name}, errors="raise") + df_loaded = df_loaded.rename(columns={"interaction_rate": rate_feature_name}, errors="raise") + df_loaded[rate_feature_name] = df_loaded[rate_feature_name].fillna(0) + df_loaded[count_feature_name] = df_loaded[count_feature_name].fillna(0) + + return df_loaded + + +def fetch_global_features(name, feature_name): + conn = psycopg2.connect(**DB_PARAMS) + cur = conn.cursor() + query = f"SELECT {name}, interactions, interaction_rate FROM {feature_name}" + + cur.execute(query) + data = cur.fetchall() + + cur.close() + conn.close() + columns = [ + name, + "interactions", + "interaction_rate" + ] + + rate_feature_name = f"{feature_name}_rate" + count_feature_name = f"{feature_name}_count" + + df_loaded = pd.DataFrame(data, columns=columns) + df_loaded = df_loaded.rename(columns={"interactions": count_feature_name}, errors="raise") + df_loaded = df_loaded.rename(columns={"interaction_rate": rate_feature_name}, errors="raise") + df_loaded[rate_feature_name] = 
df_loaded[rate_feature_name].fillna(0) + df_loaded[count_feature_name] = df_loaded[count_feature_name].fillna(0) + + return df_loaded + + +def load_user_features(): + result = {} + for view_name in USER_FEATURES.keys(): + key_name = USER_FEATURES[view_name] + result[key_name] = fetch_user_features(view_name, key_name) + app.logger.info(f"loaded {len(result[key_name])} features for {key_name}") + return result + + +def load_global_features(): + result = {} + for view_name in GLOBAL_FEATURES.keys(): + key_name = GLOBAL_FEATURES[view_name] + result[key_name] = fetch_global_features(view_name, key_name) + app.logger.info(f"loaded {len(result[key_name])} features for {key_name}") + return result + + +def compute_score(user_id, item_features): + alpha = 0.2 + interaction_score = compute_interaction_score(user_id, item_features) + time_bonus_score = compute_time_bonus_score(item_features) + return { + 'score': (1 - alpha) * interaction_score + alpha * time_bonus_score, + 'interaction_score': interaction_score, + 'time_bonus_score': time_bonus_score + } + + +def compute_time_bonus_score(item_features): + saved_at = item_features['saved_at'] + current_time = datetime.now(pytz.utc) + time_diff_hours = (current_time - saved_at).total_seconds() / 3600 + max_diff_hours = 3 * 24 + if time_diff_hours >= max_diff_hours: + return 0.0 + else: + return max(0.0, min(1.0, 1 - (time_diff_hours / max_diff_hours))) + + +def compute_interaction_score(user_id, item_features): + print('item_features', item_features) + df_test = pd.DataFrame([{ + 'user_id': user_id, + 'author': item_features.get('author'), + 'site': item_features.get('site'), + 'subscription': item_features.get('subscription'), + + 'item_has_thumbnail': 1 if item_features.get('has_thumbnail') else 0, + "item_has_site_icon": 1 if item_features.get('has_site_icon') else 0, + }]) + + for name in USER_FEATURES.keys(): + feature_name = USER_FEATURES[name] + df_feature = user_features[feature_name] + df_test = 
df_test.merge(df_feature, on=['user_id', name], how='left') + df_test[f"{feature_name}_rate"] = df_test[f"{feature_name}_rate"].fillna(0) + df_test[f"{feature_name}_count"] = df_test[f"{feature_name}_count"].fillna(0) + + for name in GLOBAL_FEATURES.keys(): + feature_name = GLOBAL_FEATURES[name] + df_feature = global_features[feature_name] + df_test = df_test.merge(df_feature, on=name, how='left') + df_test[f"{feature_name}_rate"] = df_test[f"{feature_name}_rate"].fillna(0) + df_test[f"{feature_name}_count"] = df_test[f"{feature_name}_count"].fillna(0) + + df_predict = df_test[TRAIN_FEATURES] + + # Print out the columns with values, so we can know how sparse our data is + #scored_columns = df_predict.columns[(df_predict.notnull() & (df_predict != 0)).any()].tolist() + #print("scored columns", scored_columns) + interaction_score = pipeline.predict_proba(df_predict) + + return interaction_score[0][1] + + +def get_library_item(library_item_id): + conn = psycopg2.connect(**DB_PARAMS) + cur = conn.cursor() + query = """ + SELECT + li.title, + li.author, + li.saved_at, + li.site_name as site, + li.item_language as language, + li.subscription, + li.word_count, + li.directionality, + CASE WHEN li.thumbnail IS NOT NULL then 1 else 0 END as has_thumbnail, + CASE WHEN li.site_icon IS NOT NULL then 1 else 0 END as has_site_icon + FROM omnivore.library_item li + WHERE li.id = %s + """ + + cur.execute(query, (library_item_id,)) + + data = cur.fetchone() + columns = [desc[0] for desc in cur.description] + cur.close() + conn.close() + + if data: + item_dict = dict(zip(columns, data)) + return item_dict + else: + return None + + +@app.route('/_ah/health', methods=['GET']) +def ready(): + return jsonify({'OK': 'yes'}), 200 + + +@app.route('/users//features', methods=['GET']) +def get_user_features(user_id): + result = {} + + for name in USER_FEATURES.keys(): + feature_name = USER_FEATURES[name] + rate_feature_name = f"{feature_name}_rate" + count_feature_name = 
f"{feature_name}_count" + df_feature = user_features[feature_name] + df_filtered = df_feature[df_feature['user_id'] == user_id] + if not df_filtered.empty: + rate = df_filtered[[name, rate_feature_name]].dropna().to_dict(orient='records') + count = df_filtered[[name, count_feature_name]].dropna().to_dict(orient='records') + result[feature_name] = { + 'rate': rate, + 'count': count + } + + return jsonify(result), 200 + + +@app.route('/users//library_items//score', methods=['GET']) +def get_library_item_score(user_id, library_item_id): + item_features = get_library_item(library_item_id) + score = compute_score(user_id, item_features) + return jsonify({'score': score}) + + +@app.route('/predict', methods=['POST']) +def predict(): + try: + data = request.get_json() + app.logger.info(f"predict scoring request: {data}") + + user_id = data.get('user_id') + item_features = data.get('item_features') + item_features['saved_at'] = dateutil.parser.isoparse(item_features['saved_at']) + + if user_id is None: + return jsonify({'error': 'Missing user_id'}), 400 + + score = compute_score(user_id, item_features) + return jsonify({'score': score}) + except Exception as e: + app.logger.error(f"exception in predict endpoint: {request.get_json()}\n{e}") + return jsonify({'error': str(e)}), 500 + + +@app.route('/batch', methods=['POST']) +def batch(): + try: + result = {} + data = request.get_json() + app.logger.info(f"batch scoring request: {data}") + + user_id = data.get('user_id') + items = data.get('items') + + if user_id is None: + return jsonify({'error': 'Missing user_id'}), 400 + + for key, item in items.items(): + print('key": ', key) + print('item: ', item) + library_item_id = item['library_item_id'] + item['saved_at'] = dateutil.parser.isoparse(item['saved_at']) + result[library_item_id] = compute_score(user_id, item) + + return jsonify(result) + except Exception as e: + app.logger.error(f"exception in batch endpoint: {request.get_json()}\n{e}") + return jsonify({'error': 
str(e)}), 500 + + +if os.getenv('LOAD_LOCAL_MODEL'): + pipeline = load_pipeline_local() +else: + pipeline = load_pipeline() + +user_features = load_user_features() +global_features = load_global_features() + + +if __name__ == '__main__': + app.run(debug=True, port=5000) \ No newline at end of file diff --git a/ml/digest-score/create-features.sql b/ml/digest-score/create-features.sql new file mode 100644 index 000000000..cf90e679d --- /dev/null +++ b/ml/digest-score/create-features.sql @@ -0,0 +1,301 @@ + +DROP MATERIALIZED VIEW IF EXISTS user_30d_interactions_site ; +CREATE MATERIALIZED VIEW user_30d_interactions_site AS +WITH interactions AS ( + SELECT + li.user_id, + li.site_name AS site, + COUNT(*) AS interactions + FROM + omnivore.library_item li + WHERE + li.read_at IS NOT NULL AND + li.created_at >= NOW() - INTERVAL '30 DAYS' AND + li.created_at < NOW() + GROUP BY + li.user_id, li.site_name + HAVING COUNT(*) > 2 +), +total_items AS ( + SELECT + li.user_id, + li.site_name AS site, + COUNT(*) AS total_items + FROM + omnivore.library_item li + WHERE + li.created_at >= NOW() - INTERVAL '30 DAYS' AND + li.created_at < NOW() + GROUP BY + li.user_id, li.site_name +) +SELECT + i.user_id, + i.site, + i.interactions, + t.total_items, + (i.interactions::float / t.total_items) AS interaction_rate +FROM + interactions i +JOIN + total_items t ON i.user_id = t.user_id AND i.site = t.site; + + +DROP MATERIALIZED VIEW IF EXISTS user_30d_interactions_subscription ; +CREATE MATERIALIZED VIEW user_30d_interactions_subscription AS +WITH interactions AS ( + SELECT + li.user_id, + li.subscription, + COUNT(*) AS interactions + FROM + omnivore.library_item li + WHERE + li.read_at IS NOT NULL AND + li.created_at >= NOW() - INTERVAL '30 DAYS' AND + li.created_at < NOW() + GROUP BY + li.user_id, li.subscription + HAVING COUNT(*) > 2 +), +total_items AS ( + SELECT + li.user_id, + li.subscription, + COUNT(*) AS total_items + FROM + omnivore.library_item li + WHERE + li.created_at >= NOW() 
- INTERVAL '30 DAYS' AND + li.created_at < NOW() + GROUP BY + li.user_id, li.subscription +) +SELECT + i.user_id, + i.subscription, + i.interactions, + t.total_items, + (i.interactions::float / t.total_items) AS interaction_rate +FROM + interactions i +JOIN + total_items t ON i.user_id = t.user_id AND i.subscription = t.subscription; + + +DROP MATERIALIZED VIEW IF EXISTS user_30d_interactions_author ; +CREATE MATERIALIZED VIEW user_30d_interactions_author AS +WITH interactions AS ( + SELECT + li.user_id, + li.author, + COUNT(*) AS interactions + FROM + omnivore.library_item li + WHERE + li.read_at IS NOT NULL AND + li.created_at >= NOW() - INTERVAL '30 DAYS' AND + li.created_at < NOW() + GROUP BY + li.user_id, li.author + HAVING COUNT(*) > 2 +), +total_items AS ( + SELECT + li.user_id, + li.author, + COUNT(*) AS total_items + FROM + omnivore.library_item li + WHERE + li.created_at >= NOW() - INTERVAL '30 DAYS' AND + li.created_at < NOW() + GROUP BY + li.user_id, li.author +) +SELECT + i.user_id, + i.author, + i.interactions, + t.total_items, + (i.interactions::float / t.total_items) AS interaction_rate +FROM + interactions i +JOIN + total_items t ON i.user_id = t.user_id AND i.author = t.author; + + + +DROP MATERIALIZED VIEW IF EXISTS global_30d_interactions_site; +CREATE MATERIALIZED VIEW global_30d_interactions_site AS +WITH interactions AS ( + SELECT + li.site_name AS site, + COUNT(*) AS interactions + FROM + omnivore.library_item li + WHERE + li.read_at IS NOT NULL AND + li.site_name IS NOT NULL AND + li.created_at >= NOW() - INTERVAL '30 DAYS' AND + li.created_at < NOW() + GROUP BY + li.site_name + HAVING COUNT(*) > 3 +), +total_items AS ( + SELECT + li.site_name AS site, + COUNT(*) AS total_items + FROM + omnivore.library_item li + WHERE + li.site_name IS NOT NULL AND + li.created_at >= NOW() - INTERVAL '30 DAYS' AND + li.created_at < NOW() + GROUP BY + li.site_name +) +SELECT + i.site, + i.interactions, + t.total_items, + (i.interactions::float / 
t.total_items) AS interaction_rate +FROM + interactions i +JOIN + total_items t ON i.site = t.site; + + +DROP MATERIALIZED VIEW IF EXISTS global_30d_interactions_subscription ; +CREATE MATERIALIZED VIEW global_30d_interactions_subscription AS +SELECT + subscription, + COUNT(*) AS interactions +FROM + omnivore.library_item li +WHERE + li.read_at is not null AND + li.subscription is not NULL AND + li.created_at >= NOW() - INTERVAL '30 DAYS' AND + li.created_at < NOW() +GROUP BY + li.subscription + HAVING COUNT(*) > 3; + + +DROP MATERIALIZED VIEW IF EXISTS global_30d_interactions_subscription; +CREATE MATERIALIZED VIEW global_30d_interactions_subscription AS +WITH interactions AS ( + SELECT + li.subscription, + COUNT(*) AS interactions + FROM + omnivore.library_item li + WHERE + li.read_at IS NOT NULL AND + li.subscription IS NOT NULL AND + li.created_at >= NOW() - INTERVAL '30 DAYS' AND + li.created_at < NOW() + GROUP BY + li.subscription + HAVING COUNT(*) > 3 +), +total_items AS ( + SELECT + li.subscription, + COUNT(*) AS total_items + FROM + omnivore.library_item li + WHERE + li.subscription IS NOT NULL AND + li.created_at >= NOW() - INTERVAL '30 DAYS' AND + li.created_at < NOW() + GROUP BY + li.subscription +) +SELECT + i.subscription, + i.interactions, + t.total_items, + (i.interactions::float / t.total_items) AS interaction_rate +FROM + interactions i +JOIN + total_items t ON i.subscription = t.subscription; + + +DROP MATERIALIZED VIEW IF EXISTS global_30d_interactions_author; +CREATE MATERIALIZED VIEW global_30d_interactions_author AS +WITH interactions AS ( + SELECT + li.author, + COUNT(*) AS interactions + FROM + omnivore.library_item li + WHERE + li.read_at IS NOT NULL AND + li.author IS NOT NULL AND + li.created_at >= NOW() - INTERVAL '30 DAYS' AND + li.created_at < NOW() + GROUP BY + li.author + HAVING COUNT(*) > 3 +), +total_items AS ( + SELECT + li.author, + COUNT(*) AS total_items + FROM + omnivore.library_item li + WHERE + li.author IS NOT NULL AND + 
li.created_at >= NOW() - INTERVAL '30 DAYS' AND + li.created_at < NOW() + GROUP BY + li.author +) +SELECT + i.author, + i.interactions, + t.total_items, + (i.interactions::float / t.total_items) AS interaction_rate +FROM + interactions i +JOIN + total_items t ON i.author = t.author; + + +DROP MATERIALIZED VIEW IF EXISTS user_7d_activity ; +CREATE MATERIALIZED VIEW user_7d_activity AS +SELECT + li.id as library_item_id, + li.user_id, + li.created_at, + + li.folder as item_folder, + + li.item_type, + li.item_language AS language, + li.content_reader, + li.directionality, + li.word_count as item_word_count, + + CASE WHEN li.thumbnail IS NOT NULL then 1 else 0 END as item_has_thumbnail, + CASE WHEN li.site_icon IS NOT NULL then 1 else 0 END as item_has_site_icon, + + li.site_name AS site, + li.author, + li.subscription, + sub.type as item_subscription_type, + + CASE WHEN li.read_at is not NULL then 1 else 0 END as user_clicked, + CASE WHEN li.reading_progress_bottom_percent > 10 THEN 1 ELSE 0 END AS user_read, + CASE WHEN li.reading_progress_bottom_percent > 50 THEN 1 ELSE 0 END AS user_long_read + + FROM omnivore.library_item AS li + LEFT JOIN omnivore.subscriptions sub on li.subscription = sub.name AND sub.user_id = li.user_id +WHERE + li.created_at >= NOW() - INTERVAL '21 days' AND + li.created_at < NOW() + ; \ No newline at end of file diff --git a/ml/digest-score/requirements.txt b/ml/digest-score/requirements.txt new file mode 100644 index 000000000..b0e2f280e --- /dev/null +++ b/ml/digest-score/requirements.txt @@ -0,0 +1,8 @@ +psycopg2-binary +pandas +scikit-learn +joblib +google-cloud-storage +flask +pydantic +sklearn2pmml diff --git a/ml/digest-score/train.py b/ml/digest-score/train.py new file mode 100644 index 000000000..2789a2221 --- /dev/null +++ b/ml/digest-score/train.py @@ -0,0 +1,299 @@ +import psycopg2 +import pandas as pd +import joblib +from datetime import datetime + +import os +from sklearn.model_selection import train_test_split +from 
sklearn.preprocessing import StandardScaler +from sklearn.ensemble import RandomForestClassifier +from sklearn2pmml import PMMLPipeline, sklearn2pmml +from sklearn.pipeline import Pipeline +from sklearn.metrics import accuracy_score, classification_report +import numpy as np + +from google.cloud import storage +from google.cloud.exceptions import PreconditionFailed + + +DB_PARAMS = { + 'dbname': os.getenv('DB_NAME') or 'omnivore', + 'user': os.getenv('DB_USER'), + 'password': os.getenv('DB_PASSWORD'), + 'host': os.getenv('DB_HOST') or 'localhost', + 'port': os.getenv('DB_PORT') or '5432' +} + + +TRAIN_FEATURES = [ + # "item_word_count", + "item_has_thumbnail", + "item_has_site_icon", + + 'user_30d_interactions_author_count', + 'user_30d_interactions_site_count', + 'user_30d_interactions_subscription_count', + + 'user_30d_interactions_author_rate', + 'user_30d_interactions_site_rate', + 'user_30d_interactions_subscription_rate', + + 'global_30d_interactions_site_count', + 'global_30d_interactions_author_count', + 'global_30d_interactions_subscription_count', + + 'global_30d_interactions_site_rate', + 'global_30d_interactions_author_rate', + 'global_30d_interactions_subscription_rate' +] + + +def fetch_data(sample_size): + # Connect to the PostgreSQL database + conn = psycopg2.connect(**DB_PARAMS) + cur = conn.cursor() + query = f""" + SELECT + user_id, + created_at, + item_folder, + item_type, + language, + content_reader, + directionality, + item_word_count, + item_has_thumbnail, + item_has_site_icon, + site, + author, + subscription, + item_subscription_type, + user_clicked, + user_read, + user_long_read + + FROM user_7d_activity LIMIT {sample_size} + """ + + cur.execute(query) + data = cur.fetchall() + + cur.close() + conn.close() + columns = [ + "user_id", + "created_at", + "item_folder", + "item_type", + "language", + "content_reader", + "directionality", + "item_word_count", + "item_has_thumbnail", + "item_has_site_icon", + "site", + "author", + 
"subscription", + "item_subscription_type", + "user_clicked", + "user_read", + "user_long_read", + ] + + df = pd.DataFrame(data, columns=columns) + return df + + +def add_user_features(df, name, feature_name): + conn = psycopg2.connect(**DB_PARAMS) + cur = conn.cursor() + query = f"SELECT user_id, {name}, interactions, interaction_rate FROM {feature_name}" + + cur.execute(query) + data = cur.fetchall() + + cur.close() + conn.close() + columns = [ + "user_id", + name, + "interactions", + "interaction_rate" + ] + + rate_feature_name = f"{feature_name}_rate" + count_feature_name = f"{feature_name}_count" + + df_loaded = pd.DataFrame(data, columns=columns) + df_loaded = df_loaded.rename(columns={"interactions": count_feature_name}, errors="raise") + df_loaded = df_loaded.rename(columns={"interaction_rate": rate_feature_name}, errors="raise") + df_merged = pd.merge(df, df_loaded[['user_id', name, rate_feature_name, count_feature_name]], on=['user_id',name], how='left') + + df_merged[rate_feature_name] = df_merged[rate_feature_name].fillna(0) + df_merged[count_feature_name] = df_merged[count_feature_name].fillna(0) + + return df_merged + + +def add_global_features(df, name, feature_name): + conn = psycopg2.connect(**DB_PARAMS) + cur = conn.cursor() + query = f"SELECT {name}, interactions, interaction_rate FROM {feature_name}" + + cur.execute(query) + data = cur.fetchall() + + cur.close() + conn.close() + columns = [ + name, + "interactions", + "interaction_rate" + ] + + rate_feature_name = f"{feature_name}_rate" + count_feature_name = f"{feature_name}_count" + + df_loaded = pd.DataFrame(data, columns=columns) + df_loaded = df_loaded.rename(columns={"interactions": count_feature_name}, errors="raise") + df_loaded = df_loaded.rename(columns={"interaction_rate": rate_feature_name}, errors="raise") + + df_merged = pd.merge(df, df_loaded[[name, count_feature_name, rate_feature_name]], on=name, how='left') + + df_merged[rate_feature_name] = 
df_merged[rate_feature_name].fillna(0) + df_merged[count_feature_name] = df_merged[count_feature_name].fillna(0) + return df_merged + + +def add_dummy_features(df): + known_folder_types = ['inbox', 'following'] + known_subscription_types = ['NEWSLETTER', 'RSS'] + # known_item_types = ['ARTICLE', 'BOOK', 'FILE', 'HIGHLIGHTS', 'IMAGE', 'PROFILE', 'TWEET', 'UNKNOWN','VIDEO','WEBSITE'] + #known_content_reader_types = ['WEB', 'PDF', 'EPUB'] + # known_directionality_types = ['LTR', 'RTL'] + + folder_dummies = pd.get_dummies(df['item_folder'], columns=known_subscription_types, prefix='item_folder') + subscription_type_dummies = pd.get_dummies(df['item_subscription_type'], columns=known_subscription_types, prefix='item_subscription_type') + + # item_type_dummies = pd.get_dummies(df['item_type'], columns=known_item_types, prefix='item_type') + # content_reader_dummies = pd.get_dummies(df['content_reader'], columns=known_content_reader_types, prefix='content_reader') + # directionality_dummies = pd.get_dummies(df['directionality'], columns=known_directionality_types, prefix='directionality') + # language_dummies = pd.get_dummies(df['language'], prefix='language') + + # if 'title_topic' in df.columns: + # title_topic_dummies = pd.get_dummies(df['title_topic'], prefix='title_topic') + + new_feature_names = list(subscription_type_dummies.columns) + list(folder_dummies.columns) + print("NEW FEATURE NAMES: ", new_feature_names) + # new_feature_names = list(item_type_dummies.columns) + list(content_reader_dummies.columns) + \ + # list(directionality_dummies.columns) + list(language_dummies.columns) + + # if 'title_topic' in df.columns: + # new_feature_names += list(title_topic_dummies.columns) + # , title_topic_dummies + return pd.concat([df, subscription_type_dummies, folder_dummies], axis=1), new_feature_names + + +def random_forest_predictor(df, feature_columns, user_interaction): + features = df[feature_columns] + + features = features.fillna(0) + target = df[user_interaction] 
+ + X = features + y = target.values + X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) + scaler = StandardScaler() + rf_classifier = RandomForestClassifier(n_estimators=50, max_depth=10, random_state=42) + + pipeline = PMMLPipeline([ + ("scaler", scaler), + ("classifier", rf_classifier) + ]) + pipeline.fit(X_train, y_train) + + y_pred = pipeline.predict(X_test) + + feature_importance = rf_classifier.feature_importances_ + + print("Feature Importance:") + for feature, importance in zip(feature_columns, feature_importance): + print(f"{feature}: {importance}") + + print("\nClassification Report:") + print(classification_report(y_test, y_pred)) + + return pipeline + + +def save_and_upload_model(pipeline, target_interaction_type, bucket_name): + pipeline_file_name = f'predict_{target_interaction_type}_random_forest_pipeline-v001.pkl' + joblib.dump(pipeline, pipeline_file_name) + + if bucket_name: + upload_to_gcs(bucket_name, pipeline_file_name, f'models/{pipeline_file_name}') + else: + print("No GCS credentials so i am not uploading") + + +def upload_to_gcs(bucket_name, source_file_name, destination_blob_name): + """Uploads a file to the bucket.""" + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(destination_blob_name) + blob.upload_from_filename(source_file_name) + + print(f"File {source_file_name} uploaded to {destination_blob_name}.") + + +def resample_data(df): + print("Initial distribution:\n", df['user_clicked'].value_counts()) + + # Separate the majority and minority classes + df_majority = df[df['user_clicked'] == False] + df_minority = df[df['user_clicked'] == True] + + # Resample the minority class + df_minority_oversampled = df_minority.sample(n=len(df_majority), replace=True, random_state=42) + + # Combine the majority class with the oversampled minority class + df_balanced = pd.concat([df_majority, df_minority_oversampled]) + + # Shuffle the DataFrame to mix the 
classes + df_balanced = df_balanced.sample(frac=1, random_state=42).reset_index(drop=True) + + # Check the new distribution + print("Balanced distribution:\n", df_balanced['user_clicked'].value_counts()) + + # Display the first few rows of the balanced DataFrame + print(df_balanced.head()) + + return df_balanced + +def main(): + sample_size = int(os.getenv('SAMPLE_SIZE')) or 1000 + num_days_history = int(os.getenv('NUM_DAYS_HISTORY')) or 21 + gcs_bucket = os.getenv('GCS_BUCKET') + + print("about to fetch library data") + df = fetch_data(sample_size) + print("FETCHED", df) + + df = add_user_features(df, 'author', 'user_30d_interactions_author') + df = add_user_features(df, 'site', 'user_30d_interactions_site') + df = add_user_features(df, 'subscription', 'user_30d_interactions_subscription') + df = add_global_features(df, 'site', 'global_30d_interactions_site') + df = add_global_features(df, 'author', 'global_30d_interactions_author') + df = add_global_features(df, 'subscription', 'global_30d_interactions_subscription') + + df = resample_data(df) + print("training RandomForest with number of library_items: ", len(df)) + pipeline = random_forest_predictor(df, TRAIN_FEATURES, 'user_clicked') + + print(f"uploading model and scaler to {gcs_bucket}") + save_and_upload_model(pipeline, 'user_clicked', gcs_bucket) + + print("done") + +if __name__ == "__main__": + main() \ No newline at end of file From a302a97b836e04c7cff5e4272fe6cf88658f0318 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Wed, 29 May 2024 15:17:05 +0800 Subject: [PATCH 2/3] Remove the time_bonus_score as these scores are precomputed --- ml/digest-score/app.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/ml/digest-score/app.py b/ml/digest-score/app.py index 1ba2a26e4..0d1765525 100644 --- a/ml/digest-score/app.py +++ b/ml/digest-score/app.py @@ -159,13 +159,10 @@ def load_global_features(): def compute_score(user_id, item_features): - alpha = 0.2 interaction_score = 
compute_interaction_score(user_id, item_features) - time_bonus_score = compute_time_bonus_score(item_features) return { - 'score': (1 - alpha) * interaction_score + alpha * time_bonus_score, + 'score': interaction_score, 'interaction_score': interaction_score, - 'time_bonus_score': time_bonus_score } From c74429125146487e97393baa21342ba8273bd1a4 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Wed, 29 May 2024 15:23:46 +0800 Subject: [PATCH 3/3] Update docs --- ml/digest-score/README.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ml/digest-score/README.md b/ml/digest-score/README.md index 4a424e2a9..257063d68 100644 --- a/ml/digest-score/README.md +++ b/ml/digest-score/README.md @@ -1,6 +1,6 @@ # digest-score -The digest-score is a combination of two scores: `interaction_score` which is how likely it is the user opens the library_item and `time_bonus_score` which is how fresh the article is. In the future we might remove the `time_bonus_score` and move that to the mixing step. +The digest-score is the result of calculating the user's `interaction_score` which is how likely it is the user opens the library_item. ## Creating features @@ -42,9 +42,8 @@ curl -d '{ "user_id": "2da52794-0dd2-11ef-9855-5f368b90f676", "items": { "134f88 curl localhost:5000/users/2da52794-0dd2-11ef-9855-5f368b90f676/library_items/134f883e-efd8-11ee-ae98-532a6874855a/score { "score": { - "interaction_score": 1.0, "score": 0.8, - "time_bonus_score": 0.0 + "interaction_score": 1.0 } } ```