Merge pull request #4002 from omnivore-app/feat/digest-score
Add initial digest-score service
This commit is contained in:
71
ml/digest-score/README.md
Normal file
71
ml/digest-score/README.md
Normal file
@ -0,0 +1,71 @@
|
||||
# digest-score
|
||||
|
||||
The digest-score is the result of calculating the user's `interaction_score`, which estimates how likely it is that the user will open the library_item.
|
||||
|
||||
## Creating features
|
||||
|
||||
Currently the features are materialized views created in the public schema. Before moving to production we should create a `features` schema. To create all the materialized views locally run the `create-features.sql` file with the psql command:
|
||||
|
||||
`psql omnivore -f create-features.sql`
|
||||
|
||||
## Training
|
||||
|
||||
Currently we create a small random forest model, when running locally this will be saved to disk during training. The reason we use such a simple model is to focus on feature development. The model is mostly a guide to understand whether or not the features are relevant.
|
||||
|
||||
To train the data run the following command:
|
||||
|
||||
`NUM_DAYS_HISTORY=1000 SAMPLE_SIZE=100 python train.py`
|
||||
|
||||
This will create a file: `predict_user_clicked_random_forest_pipeline-v001.pkl`
|
||||
|
||||
## Running the service
|
||||
|
||||
Now that there is a model created, you can run the service using the following command:
|
||||
|
||||
`LOAD_LOCAL_MODEL=true python app.py`
|
||||
|
||||
To test the model make a curl request using your user id, for example:
|
||||
|
||||
### A single prediction
|
||||
|
||||
```
|
||||
curl -d '{ "user_id": "2da52794-0dd2-11ef-9855-5f368b90f676", "item_features": { "site": "Omnivore Blog", "title": "this is a title", "author": "Tiago Forte", "subscription": "this is a subscriptionsdfsdfsdf", "has_thumbnail": true, "has_site_icon": true, "saved_at": "2024-05-27T04:20:47Z" }}' -H 'Content-Type: application/json' localhost:5000/predict
|
||||
```
|
||||
|
||||
### A batch prediction
|
||||
|
||||
curl -d '{ "user_id": "2da52794-0dd2-11ef-9855-5f368b90f676", "items": { "134f883e-efd8-11ee-ae98-532a6874855a": { "library_item_id": "134f883e-efd8-11ee-ae98-532a6874855a", "site": "TikTok", "title": "this is a title", "author": "Tiago Forte", "subscription": "this is a subscriptionsdfsdfsdf", "has_thumbnail": true, "has_site_icon": true, "saved_at": "2024-05-27T04:20:47Z" }} }' -H 'Content-Type: application/json' localhost:5000/batch
|
||||
|
||||
### Make a prediction for a given user and library_item (for debugging only)
|
||||
|
||||
```
|
||||
curl localhost:5000/users/2da52794-0dd2-11ef-9855-5f368b90f676/library_items/134f883e-efd8-11ee-ae98-532a6874855a/score
|
||||
{
|
||||
"score": {
|
||||
"score": 0.8,
|
||||
"interaction_score": 1.0
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Print the user's profile data (for debugging only)
|
||||
|
||||
```
|
||||
curl localhost:5000/users/2da52794-0dd2-11ef-9855-5f368b90f676/features
|
||||
{
|
||||
"user_30d_interactions_site": {
|
||||
"count": [
|
||||
{
|
||||
"site": "TikTok",
|
||||
"user_30d_interactions_site_count": 3
|
||||
}
|
||||
],
|
||||
"rate": [
|
||||
{
|
||||
"site": "TikTok",
|
||||
"user_30d_interactions_site_rate": 0.75
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
```
|
||||
338
ml/digest-score/app.py
Normal file
338
ml/digest-score/app.py
Normal file
@ -0,0 +1,338 @@
|
||||
import json
import logging
import os
import sys
from datetime import datetime, timedelta, timezone
from typing import List

import dateutil.parser
import joblib
import numpy as np
import pandas as pd
import psycopg2
import pytz
from flask import Flask, request, jsonify
from google.cloud import storage
from pydantic import BaseModel, ConfigDict, ValidationError, conlist
|
||||
|
||||
app = Flask(__name__)
# Log to stdout so the hosting platform captures service logs.
logging.basicConfig(level=logging.INFO, stream=sys.stdout)


# Feature columns fed to the model at predict time. Must stay in sync
# with the TRAIN_FEATURES list in ml/digest-score/train.py.
TRAIN_FEATURES = [
    "item_has_thumbnail",
    "item_has_site_icon",

    'user_30d_interactions_author_count',
    'user_30d_interactions_site_count',
    'user_30d_interactions_subscription_count',

    'user_30d_interactions_author_rate',
    'user_30d_interactions_site_rate',
    'user_30d_interactions_subscription_rate',

    'global_30d_interactions_site_count',
    'global_30d_interactions_author_count',
    'global_30d_interactions_subscription_count',

    'global_30d_interactions_site_rate',
    'global_30d_interactions_author_rate',
    'global_30d_interactions_subscription_rate'
]

# Postgres connection settings, read from the environment with
# local-development defaults.
DB_PARAMS = {
    'dbname': os.getenv('DB_NAME') or 'omnivore',
    'user': os.getenv('DB_USER'),
    'password': os.getenv('DB_PASSWORD'),
    'host': os.getenv('DB_HOST') or 'localhost',
    'port': os.getenv('DB_PORT') or '5432'
}

# Maps an entity column ("site"/"author"/"subscription") to the per-user
# materialized view that aggregates that user's 30-day interactions.
USER_FEATURES = {
    "site": "user_30d_interactions_site",
    "author": "user_30d_interactions_author",
    "subscription": "user_30d_interactions_subscription",
}

# Same mapping for the global (all-user) materialized views.
GLOBAL_FEATURES = {
    "site": "global_30d_interactions_site",
    "author": "global_30d_interactions_author",
    "subscription": "global_30d_interactions_subscription",
}
|
||||
|
||||
def download_from_gcs(bucket_name, gcs_path, destination_path):
    """Fetch a single object from GCS and write it to a local file."""
    client = storage.Client()
    blob = client.bucket(bucket_name).blob(gcs_path)
    blob.download_to_filename(destination_path)
|
||||
|
||||
|
||||
def load_pipeline():
    """Download the trained pipeline from GCS and unpickle it.

    The bucket and object path come from the GCS_BUCKET and
    PIPELINE_GCS_PATH environment variables.
    """
    local_path = '/tmp/pipeline.pkl'
    download_from_gcs(os.getenv('GCS_BUCKET'), os.getenv('PIPELINE_GCS_PATH'), local_path)
    return joblib.load(local_path)
|
||||
|
||||
|
||||
def load_pipeline_local():
    """Unpickle the pipeline produced by a local `train.py` run."""
    return joblib.load('predict_user_clicked_random_forest_pipeline-v001.pkl')
|
||||
|
||||
|
||||
def fetch_user_features(name, feature_name):
    """Load a per-user feature materialized view into a DataFrame.

    `name` is the entity column (e.g. 'site'); `feature_name` is the
    materialized view to read. Returns columns: user_id, <name>,
    <feature_name>_count, <feature_name>_rate, with NaNs filled with 0.
    """
    conn = psycopg2.connect(**DB_PARAMS)
    try:
        cur = conn.cursor()
        try:
            # name/feature_name come from the module-level feature maps,
            # never from request input, so interpolation is safe here.
            query = f"SELECT user_id, {name}, interactions, interaction_rate FROM {feature_name}"
            cur.execute(query)
            data = cur.fetchall()
        finally:
            cur.close()
    finally:
        # Close the connection even when the query fails; the original
        # leaked the connection on any exception.
        conn.close()

    rate_feature_name = f"{feature_name}_rate"
    count_feature_name = f"{feature_name}_count"

    df_loaded = pd.DataFrame(data, columns=["user_id", name, "interactions", "interaction_rate"])
    df_loaded = df_loaded.rename(
        columns={"interactions": count_feature_name, "interaction_rate": rate_feature_name},
        errors="raise",
    )
    df_loaded[rate_feature_name] = df_loaded[rate_feature_name].fillna(0)
    df_loaded[count_feature_name] = df_loaded[count_feature_name].fillna(0)
    return df_loaded
|
||||
|
||||
|
||||
def fetch_global_features(name, feature_name):
    """Load a global (all-user) feature materialized view into a DataFrame.

    Same shape as fetch_user_features but without a user_id column:
    returns <name>, <feature_name>_count, <feature_name>_rate.
    """
    conn = psycopg2.connect(**DB_PARAMS)
    try:
        cur = conn.cursor()
        try:
            # name/feature_name come from the module-level feature maps,
            # never from request input, so interpolation is safe here.
            query = f"SELECT {name}, interactions, interaction_rate FROM {feature_name}"
            cur.execute(query)
            data = cur.fetchall()
        finally:
            cur.close()
    finally:
        # Close the connection even when the query fails; the original
        # leaked the connection on any exception.
        conn.close()

    rate_feature_name = f"{feature_name}_rate"
    count_feature_name = f"{feature_name}_count"

    df_loaded = pd.DataFrame(data, columns=[name, "interactions", "interaction_rate"])
    df_loaded = df_loaded.rename(
        columns={"interactions": count_feature_name, "interaction_rate": rate_feature_name},
        errors="raise",
    )
    df_loaded[rate_feature_name] = df_loaded[rate_feature_name].fillna(0)
    df_loaded[count_feature_name] = df_loaded[count_feature_name].fillna(0)
    return df_loaded
|
||||
|
||||
|
||||
def load_user_features():
    """Load every per-user feature view into a dict keyed by view name."""
    frames = {}
    for view_name, key_name in USER_FEATURES.items():
        frames[key_name] = fetch_user_features(view_name, key_name)
        app.logger.info(f"loaded {len(frames[key_name])} features for {key_name}")
    return frames
|
||||
|
||||
|
||||
def load_global_features():
    """Load every global feature view into a dict keyed by view name."""
    frames = {}
    for view_name, key_name in GLOBAL_FEATURES.items():
        frames[key_name] = fetch_global_features(view_name, key_name)
        app.logger.info(f"loaded {len(frames[key_name])} features for {key_name}")
    return frames
|
||||
|
||||
|
||||
def compute_score(user_id, item_features):
    """Bundle the model's click probability into the score payload.

    The top-level `score` currently mirrors `interaction_score`.
    """
    probability = compute_interaction_score(user_id, item_features)
    return {'score': probability, 'interaction_score': probability}
|
||||
|
||||
|
||||
def compute_time_bonus_score(item_features):
    """Return a freshness bonus in [0, 1] for a library item.

    The bonus decays linearly from 1.0 (just saved) to 0.0 at 72 hours
    (3 days) after `item_features['saved_at']`, which must be a
    timezone-aware datetime.
    """
    saved_at = item_features['saved_at']
    # Stdlib timezone.utc instead of pytz.utc: the two produce identical
    # aware datetimes, and this drops the third-party dependency.
    current_time = datetime.now(timezone.utc)
    time_diff_hours = (current_time - saved_at).total_seconds() / 3600
    max_diff_hours = 3 * 24  # decay window: 3 days
    if time_diff_hours >= max_diff_hours:
        return 0.0
    # Clamp to [0, 1] to guard against clock skew (saved_at in the future).
    return max(0.0, min(1.0, 1 - (time_diff_hours / max_diff_hours)))
|
||||
|
||||
|
||||
def compute_interaction_score(user_id, item_features):
    """Predict the probability that `user_id` opens an item with the
    given features, using the trained pipeline plus the cached
    `user_features` / `global_features` frames loaded at startup.
    """
    # Replaced a bare debug print with the app logger, consistent with
    # the rest of this module.
    app.logger.debug(f"item_features: {item_features}")
    df_test = pd.DataFrame([{
        'user_id': user_id,
        'author': item_features.get('author'),
        'site': item_features.get('site'),
        'subscription': item_features.get('subscription'),

        'item_has_thumbnail': 1 if item_features.get('has_thumbnail') else 0,
        'item_has_site_icon': 1 if item_features.get('has_site_icon') else 0,
    }])

    # Join the per-user interaction features on (user_id, entity);
    # unseen entities fall back to 0 counts/rates.
    for name, feature_name in USER_FEATURES.items():
        df_feature = user_features[feature_name]
        df_test = df_test.merge(df_feature, on=['user_id', name], how='left')
        df_test[f"{feature_name}_rate"] = df_test[f"{feature_name}_rate"].fillna(0)
        df_test[f"{feature_name}_count"] = df_test[f"{feature_name}_count"].fillna(0)

    # Join the global interaction features on the entity alone.
    for name, feature_name in GLOBAL_FEATURES.items():
        df_feature = global_features[feature_name]
        df_test = df_test.merge(df_feature, on=name, how='left')
        df_test[f"{feature_name}_rate"] = df_test[f"{feature_name}_rate"].fillna(0)
        df_test[f"{feature_name}_count"] = df_test[f"{feature_name}_count"].fillna(0)

    df_predict = df_test[TRAIN_FEATURES]

    # predict_proba returns [[P(no click), P(click)]]; report P(click).
    interaction_score = pipeline.predict_proba(df_predict)
    return interaction_score[0][1]
|
||||
|
||||
|
||||
def get_library_item(library_item_id):
    """Fetch one library_item row as a dict of feature fields, or None
    when the id is unknown.

    has_thumbnail / has_site_icon are returned as 0/1 flags.
    """
    query = """
    SELECT
      li.title,
      li.author,
      li.saved_at,
      li.site_name as site,
      li.item_language as language,
      li.subscription,
      li.word_count,
      li.directionality,
      CASE WHEN li.thumbnail IS NOT NULL then 1 else 0 END as has_thumbnail,
      CASE WHEN li.site_icon IS NOT NULL then 1 else 0 END as has_site_icon
    FROM omnivore.library_item li
    WHERE li.id = %s
    """
    conn = psycopg2.connect(**DB_PARAMS)
    try:
        cur = conn.cursor()
        try:
            cur.execute(query, (library_item_id,))
            data = cur.fetchone()
            columns = [desc[0] for desc in cur.description]
        finally:
            cur.close()
    finally:
        # Close the connection even when execute() raises; the original
        # leaked it on any exception.
        conn.close()

    if data:
        return dict(zip(columns, data))
    return None
|
||||
|
||||
|
||||
@app.route('/_ah/health', methods=['GET'])
def ready():
    """Health-check endpoint polled by the hosting platform."""
    return jsonify({'OK': 'yes'}), 200
|
||||
|
||||
|
||||
@app.route('/users/<user_id>/features', methods=['GET'])
def get_user_features(user_id):
    """Debug endpoint: dump the cached per-user feature rows for a user."""
    result = {}

    for name, feature_name in USER_FEATURES.items():
        df_feature = user_features[feature_name]
        df_user = df_feature[df_feature['user_id'] == user_id]
        if df_user.empty:
            continue
        rate_col = f"{feature_name}_rate"
        count_col = f"{feature_name}_count"
        result[feature_name] = {
            'rate': df_user[[name, rate_col]].dropna().to_dict(orient='records'),
            'count': df_user[[name, count_col]].dropna().to_dict(orient='records'),
        }

    return jsonify(result), 200
|
||||
|
||||
|
||||
@app.route('/users/<user_id>/library_items/<library_item_id>/score', methods=['GET'])
def get_library_item_score(user_id, library_item_id):
    """Debug endpoint: score a stored library item for a user."""
    item_features = get_library_item(library_item_id)
    if item_features is None:
        # get_library_item returns None for unknown ids; the original
        # crashed inside compute_score (a 500) in that case.
        return jsonify({'error': 'library_item not found'}), 404
    score = compute_score(user_id, item_features)
    return jsonify({'score': score})
|
||||
|
||||
|
||||
@app.route('/predict', methods=['POST'])
def predict():
    """Score a single (user, item) pair.

    Body: {"user_id": ..., "item_features": {..., "saved_at": ISO-8601}}.
    Returns {"score": {...}} on success, 400 on missing fields, 500 on
    unexpected failures.
    """
    try:
        data = request.get_json()
        app.logger.info(f"predict scoring request: {data}")

        user_id = data.get('user_id')
        item_features = data.get('item_features')

        # Validate before touching item_features: the original parsed
        # saved_at first, so a request missing item_features returned a
        # 500 instead of a 400.
        if user_id is None:
            return jsonify({'error': 'Missing user_id'}), 400
        if item_features is None:
            return jsonify({'error': 'Missing item_features'}), 400

        item_features['saved_at'] = dateutil.parser.isoparse(item_features['saved_at'])

        score = compute_score(user_id, item_features)
        return jsonify({'score': score})
    except Exception as e:
        app.logger.error(f"exception in predict endpoint: {request.get_json()}\n{e}")
        return jsonify({'error': str(e)}), 500
|
||||
|
||||
|
||||
@app.route('/batch', methods=['POST'])
def batch():
    """Score a batch of items for one user.

    Body: {"user_id": ..., "items": {<id>: {"library_item_id": ...,
    ..., "saved_at": ISO-8601}, ...}}. Returns a map of
    library_item_id -> score payload.
    """
    try:
        result = {}
        data = request.get_json()
        app.logger.info(f"batch scoring request: {data}")

        user_id = data.get('user_id')
        items = data.get('items')

        if user_id is None:
            return jsonify({'error': 'Missing user_id'}), 400
        # The original crashed with a 500 when items was absent.
        if items is None:
            return jsonify({'error': 'Missing items'}), 400

        for key, item in items.items():
            # Replaced bare debug prints (one had a stray quote in its
            # label) with the app logger at debug level.
            app.logger.debug(f"key: {key} item: {item}")
            library_item_id = item['library_item_id']
            item['saved_at'] = dateutil.parser.isoparse(item['saved_at'])
            result[library_item_id] = compute_score(user_id, item)

        return jsonify(result)
    except Exception as e:
        app.logger.error(f"exception in batch endpoint: {request.get_json()}\n{e}")
        return jsonify({'error': str(e)}), 500
|
||||
|
||||
|
||||
# Load the scoring pipeline at import time: from local disk when
# LOAD_LOCAL_MODEL is set (development), otherwise from GCS.
if os.getenv('LOAD_LOCAL_MODEL'):
    pipeline = load_pipeline_local()
else:
    pipeline = load_pipeline()

# Feature frames are loaded once at startup and held in memory; restart
# the service to pick up refreshed materialized views.
user_features = load_user_features()
global_features = load_global_features()


if __name__ == '__main__':
    app.run(debug=True, port=5000)
|
||||
301
ml/digest-score/create-features.sql
Normal file
301
ml/digest-score/create-features.sql
Normal file
@ -0,0 +1,301 @@
|
||||
|
||||
-- Per-user 30-day feature views. For each (user, entity) pair over
-- items created in the last 30 days:
--   interactions     = items the user actually read (read_at set)
--   interaction_rate = interactions / everything saved for that entity
-- Pairs with fewer than 3 reads are dropped (HAVING COUNT(*) > 2).

DROP MATERIALIZED VIEW IF EXISTS user_30d_interactions_site ;
CREATE MATERIALIZED VIEW user_30d_interactions_site AS
WITH interactions AS (
    SELECT
        li.user_id,
        li.site_name AS site,
        COUNT(*) AS interactions
    FROM
        omnivore.library_item li
    WHERE
        li.read_at IS NOT NULL AND
        li.created_at >= NOW() - INTERVAL '30 DAYS' AND
        li.created_at < NOW()
    GROUP BY
        li.user_id, li.site_name
    HAVING COUNT(*) > 2
),
total_items AS (
    SELECT
        li.user_id,
        li.site_name AS site,
        COUNT(*) AS total_items
    FROM
        omnivore.library_item li
    WHERE
        li.created_at >= NOW() - INTERVAL '30 DAYS' AND
        li.created_at < NOW()
    GROUP BY
        li.user_id, li.site_name
)
SELECT
    i.user_id,
    i.site,
    i.interactions,
    t.total_items,
    (i.interactions::float / t.total_items) AS interaction_rate
FROM
    interactions i
JOIN
    total_items t ON i.user_id = t.user_id AND i.site = t.site;


-- Same aggregation keyed on the item's subscription.
DROP MATERIALIZED VIEW IF EXISTS user_30d_interactions_subscription ;
CREATE MATERIALIZED VIEW user_30d_interactions_subscription AS
WITH interactions AS (
    SELECT
        li.user_id,
        li.subscription,
        COUNT(*) AS interactions
    FROM
        omnivore.library_item li
    WHERE
        li.read_at IS NOT NULL AND
        li.created_at >= NOW() - INTERVAL '30 DAYS' AND
        li.created_at < NOW()
    GROUP BY
        li.user_id, li.subscription
    HAVING COUNT(*) > 2
),
total_items AS (
    SELECT
        li.user_id,
        li.subscription,
        COUNT(*) AS total_items
    FROM
        omnivore.library_item li
    WHERE
        li.created_at >= NOW() - INTERVAL '30 DAYS' AND
        li.created_at < NOW()
    GROUP BY
        li.user_id, li.subscription
)
SELECT
    i.user_id,
    i.subscription,
    i.interactions,
    t.total_items,
    (i.interactions::float / t.total_items) AS interaction_rate
FROM
    interactions i
JOIN
    total_items t ON i.user_id = t.user_id AND i.subscription = t.subscription;


-- Same aggregation keyed on the item's author.
DROP MATERIALIZED VIEW IF EXISTS user_30d_interactions_author ;
CREATE MATERIALIZED VIEW user_30d_interactions_author AS
WITH interactions AS (
    SELECT
        li.user_id,
        li.author,
        COUNT(*) AS interactions
    FROM
        omnivore.library_item li
    WHERE
        li.read_at IS NOT NULL AND
        li.created_at >= NOW() - INTERVAL '30 DAYS' AND
        li.created_at < NOW()
    GROUP BY
        li.user_id, li.author
    HAVING COUNT(*) > 2
),
total_items AS (
    SELECT
        li.user_id,
        li.author,
        COUNT(*) AS total_items
    FROM
        omnivore.library_item li
    WHERE
        li.created_at >= NOW() - INTERVAL '30 DAYS' AND
        li.created_at < NOW()
    GROUP BY
        li.user_id, li.author
)
SELECT
    i.user_id,
    i.author,
    i.interactions,
    t.total_items,
    (i.interactions::float / t.total_items) AS interaction_rate
FROM
    interactions i
JOIN
    total_items t ON i.user_id = t.user_id AND i.author = t.author;
|
||||
|
||||
|
||||
|
||||
-- Global (all-user) 30-day feature views: read counts and read rates
-- per site / subscription / author over items created in the last 30
-- days. Entities with fewer than 4 reads are dropped (COUNT(*) > 3).

DROP MATERIALIZED VIEW IF EXISTS global_30d_interactions_site;
CREATE MATERIALIZED VIEW global_30d_interactions_site AS
WITH interactions AS (
    SELECT
        li.site_name AS site,
        COUNT(*) AS interactions
    FROM
        omnivore.library_item li
    WHERE
        li.read_at IS NOT NULL AND
        li.site_name IS NOT NULL AND
        li.created_at >= NOW() - INTERVAL '30 DAYS' AND
        li.created_at < NOW()
    GROUP BY
        li.site_name
    HAVING COUNT(*) > 3
),
total_items AS (
    SELECT
        li.site_name AS site,
        COUNT(*) AS total_items
    FROM
        omnivore.library_item li
    WHERE
        li.site_name IS NOT NULL AND
        li.created_at >= NOW() - INTERVAL '30 DAYS' AND
        li.created_at < NOW()
    GROUP BY
        li.site_name
)
SELECT
    i.site,
    i.interactions,
    t.total_items,
    (i.interactions::float / t.total_items) AS interaction_rate
FROM
    interactions i
JOIN
    total_items t ON i.site = t.site;


-- NOTE: the original file defined global_30d_interactions_subscription
-- twice; the first (count-only) version was dropped and recreated
-- immediately afterwards, so the dead definition has been removed.
DROP MATERIALIZED VIEW IF EXISTS global_30d_interactions_subscription;
CREATE MATERIALIZED VIEW global_30d_interactions_subscription AS
WITH interactions AS (
    SELECT
        li.subscription,
        COUNT(*) AS interactions
    FROM
        omnivore.library_item li
    WHERE
        li.read_at IS NOT NULL AND
        li.subscription IS NOT NULL AND
        li.created_at >= NOW() - INTERVAL '30 DAYS' AND
        li.created_at < NOW()
    GROUP BY
        li.subscription
    HAVING COUNT(*) > 3
),
total_items AS (
    SELECT
        li.subscription,
        COUNT(*) AS total_items
    FROM
        omnivore.library_item li
    WHERE
        li.subscription IS NOT NULL AND
        li.created_at >= NOW() - INTERVAL '30 DAYS' AND
        li.created_at < NOW()
    GROUP BY
        li.subscription
)
SELECT
    i.subscription,
    i.interactions,
    t.total_items,
    (i.interactions::float / t.total_items) AS interaction_rate
FROM
    interactions i
JOIN
    total_items t ON i.subscription = t.subscription;


DROP MATERIALIZED VIEW IF EXISTS global_30d_interactions_author;
CREATE MATERIALIZED VIEW global_30d_interactions_author AS
WITH interactions AS (
    SELECT
        li.author,
        COUNT(*) AS interactions
    FROM
        omnivore.library_item li
    WHERE
        li.read_at IS NOT NULL AND
        li.author IS NOT NULL AND
        li.created_at >= NOW() - INTERVAL '30 DAYS' AND
        li.created_at < NOW()
    GROUP BY
        li.author
    HAVING COUNT(*) > 3
),
total_items AS (
    SELECT
        li.author,
        COUNT(*) AS total_items
    FROM
        omnivore.library_item li
    WHERE
        li.author IS NOT NULL AND
        li.created_at >= NOW() - INTERVAL '30 DAYS' AND
        li.created_at < NOW()
    GROUP BY
        li.author
)
SELECT
    i.author,
    i.interactions,
    t.total_items,
    (i.interactions::float / t.total_items) AS interaction_rate
FROM
    interactions i
JOIN
    total_items t ON i.author = t.author;
|
||||
|
||||
|
||||
-- Training view: one row per library item created in the last 21 days,
-- with item metadata, categorical fields for one-hot encoding, and the
-- click / read / long-read labels derived from reading progress.
-- NOTE(review): the view is named "7d" but the window is 21 days, and
-- train.py reads NUM_DAYS_HISTORY without using it here — confirm
-- which window is intended.
DROP MATERIALIZED VIEW IF EXISTS user_7d_activity ;
CREATE MATERIALIZED VIEW user_7d_activity AS
SELECT
    li.id as library_item_id,
    li.user_id,
    li.created_at,

    li.folder as item_folder,

    li.item_type,
    li.item_language AS language,
    li.content_reader,
    li.directionality,
    li.word_count as item_word_count,

    -- 0/1 flags instead of the raw thumbnail / icon values
    CASE WHEN li.thumbnail IS NOT NULL then 1 else 0 END as item_has_thumbnail,
    CASE WHEN li.site_icon IS NOT NULL then 1 else 0 END as item_has_site_icon,

    li.site_name AS site,
    li.author,
    li.subscription,
    sub.type as item_subscription_type,

    -- labels: clicked = opened at all; read = >10% scrolled;
    -- long read = >50% scrolled
    CASE WHEN li.read_at is not NULL then 1 else 0 END as user_clicked,
    CASE WHEN li.reading_progress_bottom_percent > 10 THEN 1 ELSE 0 END AS user_read,
    CASE WHEN li.reading_progress_bottom_percent > 50 THEN 1 ELSE 0 END AS user_long_read

FROM omnivore.library_item AS li
LEFT JOIN omnivore.subscriptions sub on li.subscription = sub.name AND sub.user_id = li.user_id
WHERE
    li.created_at >= NOW() - INTERVAL '21 days' AND
    li.created_at < NOW()
;
|
||||
8
ml/digest-score/requirements.txt
Normal file
8
ml/digest-score/requirements.txt
Normal file
@ -0,0 +1,8 @@
|
||||
psycopg2-binary
pandas
numpy
python-dateutil
pytz
scikit-learn
joblib
google-cloud-storage
flask
pydantic
sklearn2pmml
|
||||
299
ml/digest-score/train.py
Normal file
299
ml/digest-score/train.py
Normal file
@ -0,0 +1,299 @@
|
||||
import psycopg2
|
||||
import pandas as pd
|
||||
import joblib
|
||||
from datetime import datetime
|
||||
|
||||
import os
|
||||
from sklearn.model_selection import train_test_split
|
||||
from sklearn.preprocessing import StandardScaler
|
||||
from sklearn.ensemble import RandomForestClassifier
|
||||
from sklearn2pmml import PMMLPipeline, sklearn2pmml
|
||||
from sklearn.pipeline import Pipeline
|
||||
from sklearn.metrics import accuracy_score, classification_report
|
||||
import numpy as np
|
||||
|
||||
from google.cloud import storage
|
||||
from google.cloud.exceptions import PreconditionFailed
|
||||
|
||||
|
||||
# Postgres connection settings, read from the environment with
# local-development defaults. Mirrors DB_PARAMS in app.py.
DB_PARAMS = {
    'dbname': os.getenv('DB_NAME') or 'omnivore',
    'user': os.getenv('DB_USER'),
    'password': os.getenv('DB_PASSWORD'),
    'host': os.getenv('DB_HOST') or 'localhost',
    'port': os.getenv('DB_PORT') or '5432'
}


# Feature columns used for training; must stay in sync with
# TRAIN_FEATURES in app.py so the serving-time frame matches the model.
TRAIN_FEATURES = [
    # "item_word_count",
    "item_has_thumbnail",
    "item_has_site_icon",

    'user_30d_interactions_author_count',
    'user_30d_interactions_site_count',
    'user_30d_interactions_subscription_count',

    'user_30d_interactions_author_rate',
    'user_30d_interactions_site_rate',
    'user_30d_interactions_subscription_rate',

    'global_30d_interactions_site_count',
    'global_30d_interactions_author_count',
    'global_30d_interactions_subscription_count',

    'global_30d_interactions_site_rate',
    'global_30d_interactions_author_rate',
    'global_30d_interactions_subscription_rate'
]
|
||||
|
||||
|
||||
def fetch_data(sample_size):
    """Pull up to `sample_size` rows of the user_7d_activity view into a
    DataFrame.
    """
    columns = [
        "user_id",
        "created_at",
        "item_folder",
        "item_type",
        "language",
        "content_reader",
        "directionality",
        "item_word_count",
        "item_has_thumbnail",
        "item_has_site_icon",
        "site",
        "author",
        "subscription",
        "item_subscription_type",
        "user_clicked",
        "user_read",
        "user_long_read",
    ]
    # Deriving the select list from `columns` keeps the two in sync.
    query = f"""
    SELECT
        {', '.join(columns)}
    FROM user_7d_activity LIMIT %s
    """

    conn = psycopg2.connect(**DB_PARAMS)
    try:
        cur = conn.cursor()
        try:
            # LIMIT is bound as a query parameter instead of being
            # interpolated into the SQL string.
            cur.execute(query, (sample_size,))
            data = cur.fetchall()
        finally:
            cur.close()
    finally:
        # Close the connection even on failure (the original leaked it
        # if execute() raised).
        conn.close()

    return pd.DataFrame(data, columns=columns)
|
||||
|
||||
|
||||
def add_user_features(df, name, feature_name):
    """Left-join a per-user feature view onto `df` by (user_id, name).

    Adds `<feature_name>_count` and `<feature_name>_rate` columns;
    unmatched rows get 0 for both.
    """
    conn = psycopg2.connect(**DB_PARAMS)
    try:
        cur = conn.cursor()
        try:
            # name/feature_name are fixed strings supplied by main(),
            # not user input, so interpolation is safe here.
            query = f"SELECT user_id, {name}, interactions, interaction_rate FROM {feature_name}"
            cur.execute(query)
            data = cur.fetchall()
        finally:
            cur.close()
    finally:
        # Close the connection even on failure (the original leaked it
        # if execute() raised).
        conn.close()

    rate_feature_name = f"{feature_name}_rate"
    count_feature_name = f"{feature_name}_count"

    df_loaded = pd.DataFrame(data, columns=["user_id", name, "interactions", "interaction_rate"])
    df_loaded = df_loaded.rename(
        columns={"interactions": count_feature_name, "interaction_rate": rate_feature_name},
        errors="raise",
    )
    df_merged = pd.merge(
        df,
        df_loaded[['user_id', name, rate_feature_name, count_feature_name]],
        on=['user_id', name],
        how='left',
    )

    df_merged[rate_feature_name] = df_merged[rate_feature_name].fillna(0)
    df_merged[count_feature_name] = df_merged[count_feature_name].fillna(0)

    return df_merged
|
||||
|
||||
|
||||
def add_global_features(df, name, feature_name):
    """Left-join a global feature view onto `df` by the entity column.

    Adds `<feature_name>_count` and `<feature_name>_rate` columns;
    unmatched rows get 0 for both.
    """
    conn = psycopg2.connect(**DB_PARAMS)
    try:
        cur = conn.cursor()
        try:
            # name/feature_name are fixed strings supplied by main(),
            # not user input, so interpolation is safe here.
            query = f"SELECT {name}, interactions, interaction_rate FROM {feature_name}"
            cur.execute(query)
            data = cur.fetchall()
        finally:
            cur.close()
    finally:
        # Close the connection even on failure (the original leaked it
        # if execute() raised).
        conn.close()

    rate_feature_name = f"{feature_name}_rate"
    count_feature_name = f"{feature_name}_count"

    df_loaded = pd.DataFrame(data, columns=[name, "interactions", "interaction_rate"])
    df_loaded = df_loaded.rename(
        columns={"interactions": count_feature_name, "interaction_rate": rate_feature_name},
        errors="raise",
    )
    df_merged = pd.merge(
        df,
        df_loaded[[name, count_feature_name, rate_feature_name]],
        on=name,
        how='left',
    )

    df_merged[rate_feature_name] = df_merged[rate_feature_name].fillna(0)
    df_merged[count_feature_name] = df_merged[count_feature_name].fillna(0)
    return df_merged
|
||||
|
||||
|
||||
def add_dummy_features(df):
    """One-hot encode item_folder and item_subscription_type.

    Known values are 'inbox'/'following' for folders and
    'NEWSLETTER'/'RSS' for subscription types, but the encoding uses
    whatever values appear in `df`.

    Returns the frame with dummy columns appended, plus the list of new
    column names.
    """
    # NOTE: pd.get_dummies ignores the `columns` keyword for Series
    # input, so the original `columns=known_subscription_types`
    # arguments (one of which was even the wrong constant, passed for
    # the folder column) were dead and have been removed.
    folder_dummies = pd.get_dummies(df['item_folder'], prefix='item_folder')
    subscription_type_dummies = pd.get_dummies(df['item_subscription_type'], prefix='item_subscription_type')

    new_feature_names = list(subscription_type_dummies.columns) + list(folder_dummies.columns)
    print("NEW FEATURE NAMES: ", new_feature_names)

    return pd.concat([df, subscription_type_dummies, folder_dummies], axis=1), new_feature_names
|
||||
|
||||
|
||||
def random_forest_predictor(df, feature_columns, user_interaction):
    """Train a scaler + random-forest pipeline to predict `user_interaction`.

    Prints per-feature importances and a held-out classification report,
    then returns the fitted PMMLPipeline.
    """
    X = df[feature_columns].fillna(0)
    y = df[user_interaction].values

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    rf_classifier = RandomForestClassifier(n_estimators=50, max_depth=10, random_state=42)
    pipeline = PMMLPipeline([
        ("scaler", StandardScaler()),
        ("classifier", rf_classifier),
    ])
    pipeline.fit(X_train, y_train)

    y_pred = pipeline.predict(X_test)

    print("Feature Importance:")
    for feature, importance in zip(feature_columns, rf_classifier.feature_importances_):
        print(f"{feature}: {importance}")

    print("\nClassification Report:")
    print(classification_report(y_test, y_pred))

    return pipeline
|
||||
|
||||
|
||||
def save_and_upload_model(pipeline, target_interaction_type, bucket_name):
    """Persist the pipeline locally and, when a bucket is configured,
    push it to GCS under models/."""
    pipeline_file_name = f'predict_{target_interaction_type}_random_forest_pipeline-v001.pkl'
    joblib.dump(pipeline, pipeline_file_name)

    if not bucket_name:
        print("No GCS credentials so i am not uploading")
        return
    upload_to_gcs(bucket_name, pipeline_file_name, f'models/{pipeline_file_name}')
|
||||
|
||||
|
||||
def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    client = storage.Client()
    target_blob = client.bucket(bucket_name).blob(destination_blob_name)
    target_blob.upload_from_filename(source_file_name)

    print(f"File {source_file_name} uploaded to {destination_blob_name}.")
|
||||
|
||||
|
||||
def resample_data(df):
    """Balance the click classes by oversampling the minority (clicked)
    rows up to the majority size, then shuffle deterministically."""
    print("Initial distribution:\n", df['user_clicked'].value_counts())

    not_clicked = df[df['user_clicked'] == False]
    clicked = df[df['user_clicked'] == True]

    # Oversample clicked rows (with replacement) to match the majority.
    oversampled = clicked.sample(n=len(not_clicked), replace=True, random_state=42)

    balanced = pd.concat([not_clicked, oversampled])
    # Shuffle so the classes are interleaved for training.
    balanced = balanced.sample(frac=1, random_state=42).reset_index(drop=True)

    print("Balanced distribution:\n", balanced['user_clicked'].value_counts())
    print(balanced.head())

    return balanced
|
||||
|
||||
def main():
    """Fetch training data, join features, balance classes, train the
    model, and upload it."""
    # BUG FIX: the original called int(os.getenv(...)) before applying a
    # default, so an unset SAMPLE_SIZE/NUM_DAYS_HISTORY raised
    # TypeError. Apply the default to the raw string, then convert.
    sample_size = int(os.getenv('SAMPLE_SIZE') or 1000)
    # NOTE(review): num_days_history is read but never used — the 21-day
    # window is hard-coded in the user_7d_activity view. Confirm intent.
    num_days_history = int(os.getenv('NUM_DAYS_HISTORY') or 21)
    gcs_bucket = os.getenv('GCS_BUCKET')

    print("about to fetch library data")
    df = fetch_data(sample_size)
    print("FETCHED", df)

    df = add_user_features(df, 'author', 'user_30d_interactions_author')
    df = add_user_features(df, 'site', 'user_30d_interactions_site')
    df = add_user_features(df, 'subscription', 'user_30d_interactions_subscription')
    df = add_global_features(df, 'site', 'global_30d_interactions_site')
    df = add_global_features(df, 'author', 'global_30d_interactions_author')
    df = add_global_features(df, 'subscription', 'global_30d_interactions_subscription')

    df = resample_data(df)
    print("training RandomForest with number of library_items: ", len(df))
    pipeline = random_forest_predictor(df, TRAIN_FEATURES, 'user_clicked')

    print(f"uploading model and scaler to {gcs_bucket}")
    save_and_upload_model(pipeline, 'user_clicked', gcs_bucket)

    print("done")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user