Add initial digest-score service

Jackson Harper
2024-05-29 14:24:59 +08:00
parent e30157bffa
commit cd842393f6
5 changed files with 1021 additions and 0 deletions

ml/digest-score/README.md Normal file

@@ -0,0 +1,72 @@
# digest-score
The digest-score is a combination of two scores: `interaction_score`, an estimate of how likely the user is to open the library_item, and `time_bonus_score`, a measure of how fresh the article is. In the future we might remove the `time_bonus_score` and move that logic to the mixing step.
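As a rough sketch of how the two are combined (mirroring `compute_score` in `app.py`, where the blend weight `alpha` is 0.2):
```
# Sketch of the blend used by compute_score in app.py (alpha = 0.2 there).
def digest_score(interaction_score, time_bonus_score, alpha=0.2):
    # Mostly the model's predicted click likelihood, plus a freshness bonus.
    return (1 - alpha) * interaction_score + alpha * time_bonus_score
```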
## Creating features
Currently the features are materialized views created in the public schema. Before moving to production we should create a dedicated `features` schema. To create all the materialized views locally, run the `create_feature_views.sql` file with psql:
```
psql omnivore -f create_feature_views.sql
```
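Note that materialized views are point-in-time snapshots: re-run the file, or `REFRESH MATERIALIZED VIEW <view_name>`, to pick up new interaction data.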
## Training
Currently we create a small random forest model; when running locally it will be saved to disk during training. The reason we use such a simple model is to focus on feature development: the model is mostly a guide to understanding whether or not the features are relevant.
To train the model, run the following command:
```
NUM_DAYS_HISTORY=1000 SAMPLE_SIZE=100 python train.py
```
This will create the file `predict_user_clicked_random_forest_pipeline-v001.pkl`.
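To sanity-check the saved pipeline before serving it, you can load it back with `joblib`. A minimal sketch (the step name `classifier` matches the pipeline built in `train.py`):
```
import joblib

# Load the pickled pipeline produced by train.py.
pipeline = joblib.load('predict_user_clicked_random_forest_pipeline-v001.pkl')
# The random forest step exposes per-feature importances.
print(pipeline.named_steps['classifier'].feature_importances_)
```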
## Running the service
Now that a model has been created, you can run the service with the following command:
```
LOAD_LOCAL_MODEL=true python app.py
```
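The service reads its database connection from the `DB_NAME`, `DB_USER`, `DB_PASSWORD`, `DB_HOST`, and `DB_PORT` environment variables, defaulting to the local `omnivore` database. Without `LOAD_LOCAL_MODEL`, it downloads the pipeline from the bucket named by `GCS_BUCKET` at `PIPELINE_GCS_PATH` instead.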
To test the model, make a curl request using your user id, for example:
### A single prediction
```
curl -d '{ "user_id": "2da52794-0dd2-11ef-9855-5f368b90f676", "item_features": { "site": "Omnivore Blog", "title": "this is a title", "author": "Tiago Forte", "subscription": "this is a subscription", "has_thumbnail": true, "has_site_icon": true, "saved_at": "2024-05-27T04:20:47Z" }}' -H 'Content-Type: application/json' localhost:5000/predict
```
### A batch prediction
```
curl -d '{ "user_id": "2da52794-0dd2-11ef-9855-5f368b90f676", "items": { "134f883e-efd8-11ee-ae98-532a6874855a": { "library_item_id": "134f883e-efd8-11ee-ae98-532a6874855a", "site": "TikTok", "title": "this is a title", "author": "Tiago Forte", "subscription": "this is a subscription", "has_thumbnail": true, "has_site_icon": true, "saved_at": "2024-05-27T04:20:47Z" }} }' -H 'Content-Type: application/json' localhost:5000/batch
```
### Make a prediction for a given user and library_item (for debugging only)
```
curl localhost:5000/users/2da52794-0dd2-11ef-9855-5f368b90f676/library_items/134f883e-efd8-11ee-ae98-532a6874855a/score
{
"score": {
"interaction_score": 1.0,
"score": 0.8,
"time_bonus_score": 0.0
}
}
```
### Print the user's profile data (for debugging only)
```
curl localhost:5000/users/2da52794-0dd2-11ef-9855-5f368b90f676/features
{
"user_30d_interactions_site": {
"count": [
{
"site": "TikTok",
"user_30d_interactions_site_count": 3
}
],
"rate": [
{
"site": "TikTok",
"user_30d_interactions_site_rate": 0.75
}
]
}
}
```

ml/digest-score/app.py Normal file

@@ -0,0 +1,341 @@
import psycopg2
import logging
from flask import Flask, request, jsonify
from pydantic import BaseModel, ConfigDict, ValidationError, conlist
from typing import List
import os
import sys
import json
import pytz
import numpy as np
import pandas as pd
import joblib
from datetime import datetime
import dateutil.parser
from google.cloud import storage
app = Flask(__name__)
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
TRAIN_FEATURES = [
"item_has_thumbnail",
"item_has_site_icon",
'user_30d_interactions_author_count',
'user_30d_interactions_site_count',
'user_30d_interactions_subscription_count',
'user_30d_interactions_author_rate',
'user_30d_interactions_site_rate',
'user_30d_interactions_subscription_rate',
'global_30d_interactions_site_count',
'global_30d_interactions_author_count',
'global_30d_interactions_subscription_count',
'global_30d_interactions_site_rate',
'global_30d_interactions_author_rate',
'global_30d_interactions_subscription_rate'
]
DB_PARAMS = {
'dbname': os.getenv('DB_NAME') or 'omnivore',
'user': os.getenv('DB_USER'),
'password': os.getenv('DB_PASSWORD'),
'host': os.getenv('DB_HOST') or 'localhost',
'port': os.getenv('DB_PORT') or '5432'
}
USER_FEATURES = {
"site": "user_30d_interactions_site",
"author": "user_30d_interactions_author",
"subscription": "user_30d_interactions_subscription",
}
GLOBAL_FEATURES = {
"site": "global_30d_interactions_site",
"author": "global_30d_interactions_author",
"subscription": "global_30d_interactions_subscription",
}
def download_from_gcs(bucket_name, gcs_path, destination_path):
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(gcs_path)
blob.download_to_filename(destination_path)
def load_pipeline():
bucket_name = os.getenv('GCS_BUCKET')
pipeline_gcs_path = os.getenv('PIPELINE_GCS_PATH')
download_from_gcs(bucket_name, pipeline_gcs_path, '/tmp/pipeline.pkl')
pipeline = joblib.load('/tmp/pipeline.pkl')
return pipeline
def load_pipeline_local():
pipeline = joblib.load('predict_user_clicked_random_forest_pipeline-v001.pkl')
return pipeline
def fetch_user_features(name, feature_name):
conn = psycopg2.connect(**DB_PARAMS)
cur = conn.cursor()
query = f"SELECT user_id, {name}, interactions, interaction_rate FROM {feature_name}"
cur.execute(query)
data = cur.fetchall()
cur.close()
conn.close()
columns = [
"user_id",
name,
"interactions",
"interaction_rate"
]
rate_feature_name = f"{feature_name}_rate"
count_feature_name = f"{feature_name}_count"
df_loaded = pd.DataFrame(data, columns=columns)
df_loaded = df_loaded.rename(columns={"interactions": count_feature_name}, errors="raise")
df_loaded = df_loaded.rename(columns={"interaction_rate": rate_feature_name}, errors="raise")
df_loaded[rate_feature_name] = df_loaded[rate_feature_name].fillna(0)
df_loaded[count_feature_name] = df_loaded[count_feature_name].fillna(0)
return df_loaded
def fetch_global_features(name, feature_name):
conn = psycopg2.connect(**DB_PARAMS)
cur = conn.cursor()
query = f"SELECT {name}, interactions, interaction_rate FROM {feature_name}"
cur.execute(query)
data = cur.fetchall()
cur.close()
conn.close()
columns = [
name,
"interactions",
"interaction_rate"
]
rate_feature_name = f"{feature_name}_rate"
count_feature_name = f"{feature_name}_count"
df_loaded = pd.DataFrame(data, columns=columns)
df_loaded = df_loaded.rename(columns={"interactions": count_feature_name}, errors="raise")
df_loaded = df_loaded.rename(columns={"interaction_rate": rate_feature_name}, errors="raise")
df_loaded[rate_feature_name] = df_loaded[rate_feature_name].fillna(0)
df_loaded[count_feature_name] = df_loaded[count_feature_name].fillna(0)
return df_loaded
def load_user_features():
result = {}
for view_name in USER_FEATURES.keys():
key_name = USER_FEATURES[view_name]
result[key_name] = fetch_user_features(view_name, key_name)
app.logger.info(f"loaded {len(result[key_name])} features for {key_name}")
return result
def load_global_features():
result = {}
for view_name in GLOBAL_FEATURES.keys():
key_name = GLOBAL_FEATURES[view_name]
result[key_name] = fetch_global_features(view_name, key_name)
app.logger.info(f"loaded {len(result[key_name])} features for {key_name}")
return result
def compute_score(user_id, item_features):
alpha = 0.2
interaction_score = compute_interaction_score(user_id, item_features)
time_bonus_score = compute_time_bonus_score(item_features)
return {
'score': (1 - alpha) * interaction_score + alpha * time_bonus_score,
'interaction_score': interaction_score,
'time_bonus_score': time_bonus_score
}
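# Freshness bonus: linear decay from 1.0 at save time to 0.0 at 72 hours old.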
def compute_time_bonus_score(item_features):
saved_at = item_features['saved_at']
current_time = datetime.now(pytz.utc)
time_diff_hours = (current_time - saved_at).total_seconds() / 3600
max_diff_hours = 3 * 24
if time_diff_hours >= max_diff_hours:
return 0.0
else:
return max(0.0, min(1.0, 1 - (time_diff_hours / max_diff_hours)))
def compute_interaction_score(user_id, item_features):
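    # Build a one-row frame for this (user, item) pair, then left-join the
    # user and global interaction features cached at startup.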
print('item_features', item_features)
df_test = pd.DataFrame([{
'user_id': user_id,
'author': item_features.get('author'),
'site': item_features.get('site'),
'subscription': item_features.get('subscription'),
'item_has_thumbnail': 1 if item_features.get('has_thumbnail') else 0,
"item_has_site_icon": 1 if item_features.get('has_site_icon') else 0,
}])
for name in USER_FEATURES.keys():
feature_name = USER_FEATURES[name]
df_feature = user_features[feature_name]
df_test = df_test.merge(df_feature, on=['user_id', name], how='left')
df_test[f"{feature_name}_rate"] = df_test[f"{feature_name}_rate"].fillna(0)
df_test[f"{feature_name}_count"] = df_test[f"{feature_name}_count"].fillna(0)
for name in GLOBAL_FEATURES.keys():
feature_name = GLOBAL_FEATURES[name]
df_feature = global_features[feature_name]
df_test = df_test.merge(df_feature, on=name, how='left')
df_test[f"{feature_name}_rate"] = df_test[f"{feature_name}_rate"].fillna(0)
df_test[f"{feature_name}_count"] = df_test[f"{feature_name}_count"].fillna(0)
df_predict = df_test[TRAIN_FEATURES]
# Print out the columns with values, so we can know how sparse our data is
#scored_columns = df_predict.columns[(df_predict.notnull() & (df_predict != 0)).any()].tolist()
#print("scored columns", scored_columns)
interaction_score = pipeline.predict_proba(df_predict)
return interaction_score[0][1]
def get_library_item(library_item_id):
conn = psycopg2.connect(**DB_PARAMS)
cur = conn.cursor()
query = """
SELECT
li.title,
li.author,
li.saved_at,
li.site_name as site,
li.item_language as language,
li.subscription,
li.word_count,
li.directionality,
CASE WHEN li.thumbnail IS NOT NULL then 1 else 0 END as has_thumbnail,
CASE WHEN li.site_icon IS NOT NULL then 1 else 0 END as has_site_icon
FROM omnivore.library_item li
WHERE li.id = %s
"""
cur.execute(query, (library_item_id,))
data = cur.fetchone()
columns = [desc[0] for desc in cur.description]
cur.close()
conn.close()
if data:
item_dict = dict(zip(columns, data))
return item_dict
else:
return None
@app.route('/_ah/health', methods=['GET'])
def ready():
return jsonify({'OK': 'yes'}), 200
@app.route('/users/<user_id>/features', methods=['GET'])
def get_user_features(user_id):
result = {}
for name in USER_FEATURES.keys():
feature_name = USER_FEATURES[name]
rate_feature_name = f"{feature_name}_rate"
count_feature_name = f"{feature_name}_count"
df_feature = user_features[feature_name]
df_filtered = df_feature[df_feature['user_id'] == user_id]
if not df_filtered.empty:
rate = df_filtered[[name, rate_feature_name]].dropna().to_dict(orient='records')
count = df_filtered[[name, count_feature_name]].dropna().to_dict(orient='records')
result[feature_name] = {
'rate': rate,
'count': count
}
return jsonify(result), 200
@app.route('/users/<user_id>/library_items/<library_item_id>/score', methods=['GET'])
def get_library_item_score(user_id, library_item_id):
    item_features = get_library_item(library_item_id)
    if item_features is None:
        return jsonify({'error': 'library_item not found'}), 404
    score = compute_score(user_id, item_features)
    return jsonify({'score': score})
@app.route('/predict', methods=['POST'])
def predict():
try:
data = request.get_json()
app.logger.info(f"predict scoring request: {data}")
user_id = data.get('user_id')
        item_features = data.get('item_features')
        if user_id is None or item_features is None:
            return jsonify({'error': 'Missing user_id or item_features'}), 400
        # Parse saved_at into a timezone-aware datetime before scoring
        item_features['saved_at'] = dateutil.parser.isoparse(item_features['saved_at'])
        score = compute_score(user_id, item_features)
return jsonify({'score': score})
except Exception as e:
app.logger.error(f"exception in predict endpoint: {request.get_json()}\n{e}")
return jsonify({'error': str(e)}), 500
@app.route('/batch', methods=['POST'])
def batch():
try:
result = {}
data = request.get_json()
app.logger.info(f"batch scoring request: {data}")
user_id = data.get('user_id')
items = data.get('items')
if user_id is None:
return jsonify({'error': 'Missing user_id'}), 400
for key, item in items.items():
            print('key: ', key)
print('item: ', item)
library_item_id = item['library_item_id']
item['saved_at'] = dateutil.parser.isoparse(item['saved_at'])
result[library_item_id] = compute_score(user_id, item)
return jsonify(result)
except Exception as e:
app.logger.error(f"exception in batch endpoint: {request.get_json()}\n{e}")
return jsonify({'error': str(e)}), 500
if os.getenv('LOAD_LOCAL_MODEL'):
pipeline = load_pipeline_local()
else:
pipeline = load_pipeline()
user_features = load_user_features()
global_features = load_global_features()
if __name__ == '__main__':
app.run(debug=True, port=5000)

ml/digest-score/create_feature_views.sql Normal file

@@ -0,0 +1,301 @@
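-- Feature views for the digest-score service. Each *_30d_interactions_* view
-- counts reads (read_at IS NOT NULL) per key over the trailing 30 days and
-- divides by total saved items for that key to get an interaction_rate.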
DROP MATERIALIZED VIEW IF EXISTS user_30d_interactions_site ;
CREATE MATERIALIZED VIEW user_30d_interactions_site AS
WITH interactions AS (
SELECT
li.user_id,
li.site_name AS site,
COUNT(*) AS interactions
FROM
omnivore.library_item li
WHERE
li.read_at IS NOT NULL AND
li.created_at >= NOW() - INTERVAL '30 DAYS' AND
li.created_at < NOW()
GROUP BY
li.user_id, li.site_name
HAVING COUNT(*) > 2
),
total_items AS (
SELECT
li.user_id,
li.site_name AS site,
COUNT(*) AS total_items
FROM
omnivore.library_item li
WHERE
li.created_at >= NOW() - INTERVAL '30 DAYS' AND
li.created_at < NOW()
GROUP BY
li.user_id, li.site_name
)
SELECT
i.user_id,
i.site,
i.interactions,
t.total_items,
(i.interactions::float / t.total_items) AS interaction_rate
FROM
interactions i
JOIN
total_items t ON i.user_id = t.user_id AND i.site = t.site;
DROP MATERIALIZED VIEW IF EXISTS user_30d_interactions_subscription ;
CREATE MATERIALIZED VIEW user_30d_interactions_subscription AS
WITH interactions AS (
SELECT
li.user_id,
li.subscription,
COUNT(*) AS interactions
FROM
omnivore.library_item li
WHERE
li.read_at IS NOT NULL AND
li.created_at >= NOW() - INTERVAL '30 DAYS' AND
li.created_at < NOW()
GROUP BY
li.user_id, li.subscription
HAVING COUNT(*) > 2
),
total_items AS (
SELECT
li.user_id,
li.subscription,
COUNT(*) AS total_items
FROM
omnivore.library_item li
WHERE
li.created_at >= NOW() - INTERVAL '30 DAYS' AND
li.created_at < NOW()
GROUP BY
li.user_id, li.subscription
)
SELECT
i.user_id,
i.subscription,
i.interactions,
t.total_items,
(i.interactions::float / t.total_items) AS interaction_rate
FROM
interactions i
JOIN
total_items t ON i.user_id = t.user_id AND i.subscription = t.subscription;
DROP MATERIALIZED VIEW IF EXISTS user_30d_interactions_author ;
CREATE MATERIALIZED VIEW user_30d_interactions_author AS
WITH interactions AS (
SELECT
li.user_id,
li.author,
COUNT(*) AS interactions
FROM
omnivore.library_item li
WHERE
li.read_at IS NOT NULL AND
li.created_at >= NOW() - INTERVAL '30 DAYS' AND
li.created_at < NOW()
GROUP BY
li.user_id, li.author
HAVING COUNT(*) > 2
),
total_items AS (
SELECT
li.user_id,
li.author,
COUNT(*) AS total_items
FROM
omnivore.library_item li
WHERE
li.created_at >= NOW() - INTERVAL '30 DAYS' AND
li.created_at < NOW()
GROUP BY
li.user_id, li.author
)
SELECT
i.user_id,
i.author,
i.interactions,
t.total_items,
(i.interactions::float / t.total_items) AS interaction_rate
FROM
interactions i
JOIN
total_items t ON i.user_id = t.user_id AND i.author = t.author;
DROP MATERIALIZED VIEW IF EXISTS global_30d_interactions_site;
CREATE MATERIALIZED VIEW global_30d_interactions_site AS
WITH interactions AS (
SELECT
li.site_name AS site,
COUNT(*) AS interactions
FROM
omnivore.library_item li
WHERE
li.read_at IS NOT NULL AND
li.site_name IS NOT NULL AND
li.created_at >= NOW() - INTERVAL '30 DAYS' AND
li.created_at < NOW()
GROUP BY
li.site_name
HAVING COUNT(*) > 3
),
total_items AS (
SELECT
li.site_name AS site,
COUNT(*) AS total_items
FROM
omnivore.library_item li
WHERE
li.site_name IS NOT NULL AND
li.created_at >= NOW() - INTERVAL '30 DAYS' AND
li.created_at < NOW()
GROUP BY
li.site_name
)
SELECT
i.site,
i.interactions,
t.total_items,
(i.interactions::float / t.total_items) AS interaction_rate
FROM
interactions i
JOIN
total_items t ON i.site = t.site;
DROP MATERIALIZED VIEW IF EXISTS global_30d_interactions_subscription;
CREATE MATERIALIZED VIEW global_30d_interactions_subscription AS
WITH interactions AS (
SELECT
li.subscription,
COUNT(*) AS interactions
FROM
omnivore.library_item li
WHERE
li.read_at IS NOT NULL AND
li.subscription IS NOT NULL AND
li.created_at >= NOW() - INTERVAL '30 DAYS' AND
li.created_at < NOW()
GROUP BY
li.subscription
HAVING COUNT(*) > 3
),
total_items AS (
SELECT
li.subscription,
COUNT(*) AS total_items
FROM
omnivore.library_item li
WHERE
li.subscription IS NOT NULL AND
li.created_at >= NOW() - INTERVAL '30 DAYS' AND
li.created_at < NOW()
GROUP BY
li.subscription
)
SELECT
i.subscription,
i.interactions,
t.total_items,
(i.interactions::float / t.total_items) AS interaction_rate
FROM
interactions i
JOIN
total_items t ON i.subscription = t.subscription;
DROP MATERIALIZED VIEW IF EXISTS global_30d_interactions_author;
CREATE MATERIALIZED VIEW global_30d_interactions_author AS
WITH interactions AS (
SELECT
li.author,
COUNT(*) AS interactions
FROM
omnivore.library_item li
WHERE
li.read_at IS NOT NULL AND
li.author IS NOT NULL AND
li.created_at >= NOW() - INTERVAL '30 DAYS' AND
li.created_at < NOW()
GROUP BY
li.author
HAVING COUNT(*) > 3
),
total_items AS (
SELECT
li.author,
COUNT(*) AS total_items
FROM
omnivore.library_item li
WHERE
li.author IS NOT NULL AND
li.created_at >= NOW() - INTERVAL '30 DAYS' AND
li.created_at < NOW()
GROUP BY
li.author
)
SELECT
i.author,
i.interactions,
t.total_items,
(i.interactions::float / t.total_items) AS interaction_rate
FROM
interactions i
JOIN
total_items t ON i.author = t.author;
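-- Training examples for train.py. NOTE: despite the _7d_ name, the window
-- below is currently 21 days.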
DROP MATERIALIZED VIEW IF EXISTS user_7d_activity ;
CREATE MATERIALIZED VIEW user_7d_activity AS
SELECT
li.id as library_item_id,
li.user_id,
li.created_at,
li.folder as item_folder,
li.item_type,
li.item_language AS language,
li.content_reader,
li.directionality,
li.word_count as item_word_count,
CASE WHEN li.thumbnail IS NOT NULL then 1 else 0 END as item_has_thumbnail,
CASE WHEN li.site_icon IS NOT NULL then 1 else 0 END as item_has_site_icon,
li.site_name AS site,
li.author,
li.subscription,
sub.type as item_subscription_type,
CASE WHEN li.read_at is not NULL then 1 else 0 END as user_clicked,
CASE WHEN li.reading_progress_bottom_percent > 10 THEN 1 ELSE 0 END AS user_read,
CASE WHEN li.reading_progress_bottom_percent > 50 THEN 1 ELSE 0 END AS user_long_read
FROM omnivore.library_item AS li
LEFT JOIN omnivore.subscriptions sub on li.subscription = sub.name AND sub.user_id = li.user_id
WHERE
li.created_at >= NOW() - INTERVAL '21 days' AND
li.created_at < NOW()
;

ml/digest-score/requirements.txt Normal file

@@ -0,0 +1,8 @@
psycopg2-binary
pandas
scikit-learn
joblib
google-cloud-storage
flask
pydantic
sklearn2pmml

ml/digest-score/train.py Normal file

@@ -0,0 +1,299 @@
import psycopg2
import pandas as pd
import joblib
from datetime import datetime
import os
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn2pmml import PMMLPipeline, sklearn2pmml
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, classification_report
import numpy as np
from google.cloud import storage
from google.cloud.exceptions import PreconditionFailed
DB_PARAMS = {
'dbname': os.getenv('DB_NAME') or 'omnivore',
'user': os.getenv('DB_USER'),
'password': os.getenv('DB_PASSWORD'),
'host': os.getenv('DB_HOST') or 'localhost',
'port': os.getenv('DB_PORT') or '5432'
}
TRAIN_FEATURES = [
# "item_word_count",
"item_has_thumbnail",
"item_has_site_icon",
'user_30d_interactions_author_count',
'user_30d_interactions_site_count',
'user_30d_interactions_subscription_count',
'user_30d_interactions_author_rate',
'user_30d_interactions_site_rate',
'user_30d_interactions_subscription_rate',
'global_30d_interactions_site_count',
'global_30d_interactions_author_count',
'global_30d_interactions_subscription_count',
'global_30d_interactions_site_rate',
'global_30d_interactions_author_rate',
'global_30d_interactions_subscription_rate'
]
def fetch_data(sample_size):
# Connect to the PostgreSQL database
conn = psycopg2.connect(**DB_PARAMS)
cur = conn.cursor()
query = f"""
SELECT
user_id,
created_at,
item_folder,
item_type,
language,
content_reader,
directionality,
item_word_count,
item_has_thumbnail,
item_has_site_icon,
site,
author,
subscription,
item_subscription_type,
user_clicked,
user_read,
user_long_read
FROM user_7d_activity LIMIT {sample_size}
"""
cur.execute(query)
data = cur.fetchall()
cur.close()
conn.close()
columns = [
"user_id",
"created_at",
"item_folder",
"item_type",
"language",
"content_reader",
"directionality",
"item_word_count",
"item_has_thumbnail",
"item_has_site_icon",
"site",
"author",
"subscription",
"item_subscription_type",
"user_clicked",
"user_read",
"user_long_read",
]
df = pd.DataFrame(data, columns=columns)
return df
def add_user_features(df, name, feature_name):
conn = psycopg2.connect(**DB_PARAMS)
cur = conn.cursor()
query = f"SELECT user_id, {name}, interactions, interaction_rate FROM {feature_name}"
cur.execute(query)
data = cur.fetchall()
cur.close()
conn.close()
columns = [
"user_id",
name,
"interactions",
"interaction_rate"
]
rate_feature_name = f"{feature_name}_rate"
count_feature_name = f"{feature_name}_count"
df_loaded = pd.DataFrame(data, columns=columns)
df_loaded = df_loaded.rename(columns={"interactions": count_feature_name}, errors="raise")
df_loaded = df_loaded.rename(columns={"interaction_rate": rate_feature_name}, errors="raise")
df_merged = pd.merge(df, df_loaded[['user_id', name, rate_feature_name, count_feature_name]], on=['user_id',name], how='left')
df_merged[rate_feature_name] = df_merged[rate_feature_name].fillna(0)
df_merged[count_feature_name] = df_merged[count_feature_name].fillna(0)
return df_merged
def add_global_features(df, name, feature_name):
conn = psycopg2.connect(**DB_PARAMS)
cur = conn.cursor()
query = f"SELECT {name}, interactions, interaction_rate FROM {feature_name}"
cur.execute(query)
data = cur.fetchall()
cur.close()
conn.close()
columns = [
name,
"interactions",
"interaction_rate"
]
rate_feature_name = f"{feature_name}_rate"
count_feature_name = f"{feature_name}_count"
df_loaded = pd.DataFrame(data, columns=columns)
df_loaded = df_loaded.rename(columns={"interactions": count_feature_name}, errors="raise")
df_loaded = df_loaded.rename(columns={"interaction_rate": rate_feature_name}, errors="raise")
df_merged = pd.merge(df, df_loaded[[name, count_feature_name, rate_feature_name]], on=name, how='left')
df_merged[rate_feature_name] = df_merged[rate_feature_name].fillna(0)
df_merged[count_feature_name] = df_merged[count_feature_name].fillna(0)
return df_merged
def add_dummy_features(df):
known_folder_types = ['inbox', 'following']
known_subscription_types = ['NEWSLETTER', 'RSS']
# known_item_types = ['ARTICLE', 'BOOK', 'FILE', 'HIGHLIGHTS', 'IMAGE', 'PROFILE', 'TWEET', 'UNKNOWN','VIDEO','WEBSITE']
#known_content_reader_types = ['WEB', 'PDF', 'EPUB']
# known_directionality_types = ['LTR', 'RTL']
    # Note: pd.get_dummies ignores the `columns` argument for Series input, so
    # the known_* lists above are reference only.
    folder_dummies = pd.get_dummies(df['item_folder'], prefix='item_folder')
    subscription_type_dummies = pd.get_dummies(df['item_subscription_type'], prefix='item_subscription_type')
# item_type_dummies = pd.get_dummies(df['item_type'], columns=known_item_types, prefix='item_type')
# content_reader_dummies = pd.get_dummies(df['content_reader'], columns=known_content_reader_types, prefix='content_reader')
# directionality_dummies = pd.get_dummies(df['directionality'], columns=known_directionality_types, prefix='directionality')
# language_dummies = pd.get_dummies(df['language'], prefix='language')
# if 'title_topic' in df.columns:
# title_topic_dummies = pd.get_dummies(df['title_topic'], prefix='title_topic')
new_feature_names = list(subscription_type_dummies.columns) + list(folder_dummies.columns)
print("NEW FEATURE NAMES: ", new_feature_names)
# new_feature_names = list(item_type_dummies.columns) + list(content_reader_dummies.columns) + \
# list(directionality_dummies.columns) + list(language_dummies.columns)
# if 'title_topic' in df.columns:
# new_feature_names += list(title_topic_dummies.columns)
# , title_topic_dummies
return pd.concat([df, subscription_type_dummies, folder_dummies], axis=1), new_feature_names
def random_forest_predictor(df, feature_columns, user_interaction):
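    # Fit a StandardScaler + RandomForestClassifier inside a PMMLPipeline,
    # then report feature importances and a held-out classification report.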
features = df[feature_columns]
features = features.fillna(0)
target = df[user_interaction]
X = features
y = target.values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
scaler = StandardScaler()
rf_classifier = RandomForestClassifier(n_estimators=50, max_depth=10, random_state=42)
pipeline = PMMLPipeline([
("scaler", scaler),
("classifier", rf_classifier)
])
pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)
feature_importance = rf_classifier.feature_importances_
print("Feature Importance:")
for feature, importance in zip(feature_columns, feature_importance):
print(f"{feature}: {importance}")
print("\nClassification Report:")
print(classification_report(y_test, y_pred))
return pipeline
def save_and_upload_model(pipeline, target_interaction_type, bucket_name):
pipeline_file_name = f'predict_{target_interaction_type}_random_forest_pipeline-v001.pkl'
joblib.dump(pipeline, pipeline_file_name)
if bucket_name:
upload_to_gcs(bucket_name, pipeline_file_name, f'models/{pipeline_file_name}')
    else:
        print("No GCS bucket configured, so the model was only saved locally")
def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
"""Uploads a file to the bucket."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(f"File {source_file_name} uploaded to {destination_blob_name}.")
def resample_data(df):
print("Initial distribution:\n", df['user_clicked'].value_counts())
# Separate the majority and minority classes
df_majority = df[df['user_clicked'] == False]
df_minority = df[df['user_clicked'] == True]
# Resample the minority class
df_minority_oversampled = df_minority.sample(n=len(df_majority), replace=True, random_state=42)
# Combine the majority class with the oversampled minority class
df_balanced = pd.concat([df_majority, df_minority_oversampled])
# Shuffle the DataFrame to mix the classes
df_balanced = df_balanced.sample(frac=1, random_state=42).reset_index(drop=True)
# Check the new distribution
print("Balanced distribution:\n", df_balanced['user_clicked'].value_counts())
# Display the first few rows of the balanced DataFrame
print(df_balanced.head())
return df_balanced
def main():
    sample_size = int(os.getenv('SAMPLE_SIZE') or 1000)
    num_days_history = int(os.getenv('NUM_DAYS_HISTORY') or 21)
gcs_bucket = os.getenv('GCS_BUCKET')
print("about to fetch library data")
df = fetch_data(sample_size)
print("FETCHED", df)
df = add_user_features(df, 'author', 'user_30d_interactions_author')
df = add_user_features(df, 'site', 'user_30d_interactions_site')
df = add_user_features(df, 'subscription', 'user_30d_interactions_subscription')
df = add_global_features(df, 'site', 'global_30d_interactions_site')
df = add_global_features(df, 'author', 'global_30d_interactions_author')
df = add_global_features(df, 'subscription', 'global_30d_interactions_subscription')
df = resample_data(df)
print("training RandomForest with number of library_items: ", len(df))
pipeline = random_forest_predictor(df, TRAIN_FEATURES, 'user_clicked')
print(f"uploading model and scaler to {gcs_bucket}")
save_and_upload_model(pipeline, 'user_clicked', gcs_bucket)
print("done")
if __name__ == "__main__":
main()