Update digest-score to run new model

Jackson Harper
2024-06-20 15:49:12 +08:00
parent 99c95194d0
commit 17e5978792
9 changed files with 638 additions and 465 deletions

View File

@ -0,0 +1,13 @@
FROM python:3.8-slim
WORKDIR /app
ENV GRPC_PYTHON_BUILD_SYSTEM_OPENSSL "1"
ENV GRPC_PYTHON_BUILD_SYSTEM_ZLIB "1"
COPY . /app
RUN pip install --no-cache-dir -r requirements.txt
EXPOSE 5000
CMD ["python", "serve.py"]

View File

@ -1,8 +1,5 @@
-import psycopg2
 import logging
 from flask import Flask, request, jsonify
-from pydantic import BaseModel, ConfigDict, ValidationError, conlist
 from typing import List
@ -10,58 +7,23 @@ import os
 import sys
 import json
 import pytz
+import pickle
 import numpy as np
 import pandas as pd
 import joblib
-from datetime import datetime, timedelta
+from urllib.parse import urlparse
 from datetime import datetime
 import dateutil.parser
 from google.cloud import storage
+from features.user_history import FEATURE_COLUMNS

 app = Flask(__name__)
 logging.basicConfig(level=logging.INFO, stream=sys.stdout)

-TRAIN_FEATURES = [
-    "item_has_thumbnail",
-    "item_has_site_icon",
-    'user_30d_interactions_author_count',
-    'user_30d_interactions_site_count',
-    'user_30d_interactions_subscription_count',
-    'user_30d_interactions_author_rate',
-    'user_30d_interactions_site_rate',
-    'user_30d_interactions_subscription_rate',
-    'global_30d_interactions_site_count',
-    'global_30d_interactions_author_count',
-    'global_30d_interactions_subscription_count',
-    'global_30d_interactions_site_rate',
-    'global_30d_interactions_author_rate',
-    'global_30d_interactions_subscription_rate'
-]
-
-DB_PARAMS = {
-    'dbname': os.getenv('DB_NAME') or 'omnivore',
-    'user': os.getenv('DB_USER'),
-    'password': os.getenv('DB_PASSWORD'),
-    'host': os.getenv('DB_HOST') or 'localhost',
-    'port': os.getenv('DB_PORT') or '5432'
-}
-
-USER_FEATURES = {
-    "site": "user_30d_interactions_site",
-    "author": "user_30d_interactions_author",
-    "subscription": "user_30d_interactions_subscription",
-}
-
-GLOBAL_FEATURES = {
-    "site": "global_30d_interactions_site",
-    "author": "global_30d_interactions_author",
-    "subscription": "global_30d_interactions_subscription",
-}
+USER_HISTORY_PATH = 'user_features.pkl'
+MODEL_PIPELINE_PATH = 'predict_read_pipeline-v002.pkl'

 def download_from_gcs(bucket_name, gcs_path, destination_path):
     storage_client = storage.Client()
@ -70,94 +32,44 @@ def download_from_gcs(bucket_name, gcs_path, destination_path):
     blob.download_to_filename(destination_path)

-def load_pipeline():
-    bucket_name = os.getenv('GCS_BUCKET')
-    pipeline_gcs_path = os.getenv('PIPELINE_GCS_PATH')
-    download_from_gcs(bucket_name, pipeline_gcs_path, '/tmp/pipeline.pkl')
-    pipeline = joblib.load('/tmp/pipeline.pkl')
+def load_pipeline(path):
+    pipeline = joblib.load(path)
     return pipeline

-def load_pipeline_local():
-    pipeline = joblib.load('predict_user_clicked_random_forest_pipeline-v001.pkl')
-    return pipeline
+def load_tables_from_pickle(path):
+    with open(path, 'rb') as handle:
+        tables = pickle.load(handle)
+    return tables

-def fetch_user_features(name, feature_name):
-    conn = psycopg2.connect(**DB_PARAMS)
-    cur = conn.cursor()
-    query = f"SELECT user_id, {name}, interactions, interaction_rate FROM {feature_name}"
-    cur.execute(query)
-    data = cur.fetchall()
-    cur.close()
-    conn.close()
-    columns = [
-        "user_id",
-        name,
-        "interactions",
-        "interaction_rate"
-    ]
-    rate_feature_name = f"{feature_name}_rate"
-    count_feature_name = f"{feature_name}_count"
-    df_loaded = pd.DataFrame(data, columns=columns)
-    df_loaded = df_loaded.rename(columns={"interactions": count_feature_name}, errors="raise")
-    df_loaded = df_loaded.rename(columns={"interaction_rate": rate_feature_name}, errors="raise")
-    df_loaded[rate_feature_name] = df_loaded[rate_feature_name].fillna(0)
-    df_loaded[count_feature_name] = df_loaded[count_feature_name].fillna(0)
-    return df_loaded
-
-def fetch_global_features(name, feature_name):
-    conn = psycopg2.connect(**DB_PARAMS)
-    cur = conn.cursor()
-    query = f"SELECT {name}, interactions, interaction_rate FROM {feature_name}"
-    cur.execute(query)
-    data = cur.fetchall()
-    cur.close()
-    conn.close()
-    columns = [
-        name,
-        "interactions",
-        "interaction_rate"
-    ]
-    rate_feature_name = f"{feature_name}_rate"
-    count_feature_name = f"{feature_name}_count"
-    df_loaded = pd.DataFrame(data, columns=columns)
-    df_loaded = df_loaded.rename(columns={"interactions": count_feature_name}, errors="raise")
-    df_loaded = df_loaded.rename(columns={"interaction_rate": rate_feature_name}, errors="raise")
-    df_loaded[rate_feature_name] = df_loaded[rate_feature_name].fillna(0)
-    df_loaded[count_feature_name] = df_loaded[count_feature_name].fillna(0)
-    return df_loaded
-
-def load_user_features():
+def load_user_features(path):
     result = {}
-    for view_name in USER_FEATURES.keys():
-        key_name = USER_FEATURES[view_name]
-        result[key_name] = fetch_user_features(view_name, key_name)
-        app.logger.info(f"loaded {len(result[key_name])} features for {key_name}")
+    tables = load_tables_from_pickle(path)
+    for table_name in tables.keys():
+        result[table_name] = tables[table_name].to_pandas()
     return result

-def load_global_features():
+def dataframe_to_dict(df):
     result = {}
-    for view_name in GLOBAL_FEATURES.keys():
-        key_name = GLOBAL_FEATURES[view_name]
-        result[key_name] = fetch_global_features(view_name, key_name)
-        app.logger.info(f"loaded {len(result[key_name])} features for {key_name}")
+    for index, row in df.iterrows():
+        user_id = row['user_id']
+        if user_id not in result:
+            result[user_id] = []
+        result[user_id].append(row.to_dict())
     return result

+def merge_dicts(dict1, dict2):
+    for key, value in dict2.items():
+        if key in dict1:
+            dict1[key].extend(value)
+        else:
+            dict1[key] = value
+    return dict1
+
 def compute_score(user_id, item_features):
     interaction_score = compute_interaction_score(user_id, item_features)
     return {
@ -166,86 +78,55 @@ def compute_score(user_id, item_features):
     }

-def compute_time_bonus_score(item_features):
-    saved_at = item_features['saved_at']
-    current_time = datetime.now(pytz.utc)
-    time_diff_hours = (current_time - saved_at).total_seconds() / 3600
-    max_diff_hours = 3 * 24
-    if time_diff_hours >= max_diff_hours:
-        return 0.0
-    else:
-        return max(0.0, min(1.0, 1 - (time_diff_hours / max_diff_hours)))
-
 def compute_interaction_score(user_id, item_features):
-    print('item_features', item_features)
+    original_url_host = urlparse(item_features.get('original_url')).netloc
     df_test = pd.DataFrame([{
         'user_id': user_id,
         'author': item_features.get('author'),
         'site': item_features.get('site'),
         'subscription': item_features.get('subscription'),
+        'original_url_host': original_url_host,
         'item_has_thumbnail': 1 if item_features.get('has_thumbnail') else 0,
         "item_has_site_icon": 1 if item_features.get('has_site_icon') else 0,
+        'item_word_count': item_features.get('words_count'),
+        'is_subscription': 1 if item_features.get('is_subscription') else 0,
+        'is_newsletter': 1 if item_features.get('is_newsletter') else 0,
+        'is_feed': 1 if item_features.get('is_feed') else 0,
+        'days_since_subscribed': item_features.get('days_since_subscribed'),
+        'subscription_count': item_features.get('subscription_count'),
+        'subscription_auto_add_to_library': item_features.get('subscription_auto_add_to_library'),
+        'subscription_fetch_content': item_features.get('subscription_fetch_content'),
+        'has_author': 1 if item_features.get('author') else 0,
+        'inbox_folder': 1 if item_features.get('folder') == 'inbox' else 0,
     }])

-    for name in USER_FEATURES.keys():
-        feature_name = USER_FEATURES[name]
-        df_feature = user_features[feature_name]
-        df_test = df_test.merge(df_feature, on=['user_id', name], how='left')
-        df_test[f"{feature_name}_rate"] = df_test[f"{feature_name}_rate"].fillna(0)
-        df_test[f"{feature_name}_count"] = df_test[f"{feature_name}_count"].fillna(0)
-
-    for name in GLOBAL_FEATURES.keys():
-        feature_name = GLOBAL_FEATURES[name]
-        df_feature = global_features[feature_name]
-        df_test = df_test.merge(df_feature, on=name, how='left')
-        df_test[f"{feature_name}_rate"] = df_test[f"{feature_name}_rate"].fillna(0)
-        df_test[f"{feature_name}_count"] = df_test[f"{feature_name}_count"].fillna(0)
-
-    df_predict = df_test[TRAIN_FEATURES]
-    # Print out the columns with values, so we can know how sparse our data is
-    #scored_columns = df_predict.columns[(df_predict.notnull() & (df_predict != 0)).any()].tolist()
-    #print("scored columns", scored_columns)
+    for name, df in user_features.items():
+        df = df[df['user_id'] == user_id]
+        if 'author' in name:
+            merge_keys = ['user_id', 'author']
+        elif 'site' in name:
+            merge_keys = ['user_id', 'site']
+        elif 'subscription' in name:
+            merge_keys = ['user_id', 'subscription']
+        elif 'original_url_host' in name:
+            merge_keys = ['user_id', 'original_url_host']
+        else:
+            print("skipping feature: ", name)
+            continue
+        df_test = pd.merge(df_test, df, on=merge_keys, how='left')
+
+    df_test = df_test.fillna(0)
+    df_predict = df_test[FEATURE_COLUMNS]

     interaction_score = pipeline.predict_proba(df_predict)
+    print('score', interaction_score, 'item_features', df_test[df_test != 0].stack())
     return interaction_score[0][1]

-def get_library_item(library_item_id):
-    conn = psycopg2.connect(**DB_PARAMS)
-    cur = conn.cursor()
-    query = """
-    SELECT
-        li.title,
-        li.author,
-        li.saved_at,
-        li.site_name as site,
-        li.item_language as language,
-        li.subscription,
-        li.word_count,
-        li.directionality,
-        CASE WHEN li.thumbnail IS NOT NULL then 1 else 0 END as has_thumbnail,
-        CASE WHEN li.site_icon IS NOT NULL then 1 else 0 END as has_site_icon
-    FROM omnivore.library_item li
-    WHERE li.id = %s
-    """
-    cur.execute(query, (library_item_id,))
-    data = cur.fetchone()
-    columns = [desc[0] for desc in cur.description]
-    cur.close()
-    conn.close()
-    if data:
-        item_dict = dict(zip(columns, data))
-        return item_dict
-    else:
-        return None
-
 @app.route('/_ah/health', methods=['GET'])
 def ready():
     return jsonify({'OK': 'yes'}), 200
@ -254,29 +135,17 @@ def ready():

 @app.route('/users/<user_id>/features', methods=['GET'])
 def get_user_features(user_id):
     result = {}
-    df_user = pd.DataFrame([{
-        'user_id': user_id,
-    }])
-    for name in USER_FEATURES.keys():
-        feature_name = USER_FEATURES[name]
-        rate_feature_name = f"{feature_name}_rate"
-        count_feature_name = f"{feature_name}_count"
-        df_feature = user_features[feature_name]
-        df_filtered = df_feature[df_feature['user_id'] == user_id]
-        if not df_filtered.empty:
-            rate = df_filtered[[name, rate_feature_name]].dropna().to_dict(orient='records')
-            count = df_filtered[[name, count_feature_name]].dropna().to_dict(orient='records')
-            result[feature_name] = {
-                'rate': rate,
-                'count': count
-            }
-    return jsonify(result), 200
-
-@app.route('/users/<user_id>/library_items/<library_item_id>/score', methods=['GET'])
-def get_library_item_score(user_id, library_item_id):
-    item_features = get_library_item(library_item_id)
-    score = compute_score(user_id, item_features)
-    return jsonify({'score': score})
+    user_data = {}
+    for name, df in user_features.items():
+        df = df[df['user_id'] == user_id]
+        df_dict = dataframe_to_dict(df)
+        user_data = merge_dicts(user_data, df_dict)
+    return jsonify(user_data), 200

 @app.route('/predict', methods=['POST'])
@ -287,7 +156,6 @@ def predict():
     user_id = data.get('user_id')
     item_features = data.get('item_features')
-    item_features['saved_at'] = dateutil.parser.isoparse(item_features['saved_at'])

     if user_id is None:
         return jsonify({'error': 'Missing user_id'}), 400
@ -316,7 +184,6 @@ def batch():
             print('key": ', key)
             print('item: ', item)
             library_item_id = item['library_item_id']
-            item['saved_at'] = dateutil.parser.isoparse(item['saved_at'])
             result[library_item_id] = compute_score(user_id, item)
         return jsonify(result)
@ -325,14 +192,15 @@ def batch():
         return jsonify({'error': str(e)}), 500

-if os.getenv('LOAD_LOCAL_MODEL'):
-    pipeline = load_pipeline_local()
-else:
-    pipeline = load_pipeline()
-
-user_features = load_user_features()
-global_features = load_global_features()
+if os.getenv('LOAD_LOCAL_MODEL') != None:
+    gcs_bucket_name = os.getenv('GCS_BUCKET')
+    download_from_gcs(gcs_bucket_name, f'data/features/user_features.pkl', USER_HISTORY_PATH)
+    download_from_gcs(gcs_bucket_name, f'data/models/predict_read_pipeline-v002.pkl', MODEL_PIPELINE_PATH)
+
+pipeline = load_pipeline(MODEL_PIPELINE_PATH)
+user_features = load_user_features(USER_HISTORY_PATH)
+print('loaded pipeline and user_features', pipeline, user_features)

 if __name__ == '__main__':
     app.run(debug=True, port=5000)
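
Note: below is a minimal smoke-test sketch for the updated scoring service, not part of the commit. The host/port and the payload fields are assumptions inferred from what compute_interaction_score reads out of item_features.

# Hypothetical smoke test for the /predict endpoint (assumes the service
# from serve.py above is listening on localhost:5000).
import requests

payload = {
    "user_id": "user-123",  # placeholder id
    "item_features": {
        "author": "Jane Doe",
        "site": "blog.example.com",
        "subscription": "Example Newsletter",
        "original_url": "https://blog.example.com/post/1",
        "has_thumbnail": True,
        "has_site_icon": True,
        "words_count": 1200,
        "is_subscription": True,
        "is_newsletter": True,
        "is_feed": False,
        "days_since_subscribed": 14,
        "subscription_count": 3,
        "subscription_auto_add_to_library": 1,
        "subscription_fetch_content": 1,
        "folder": "inbox",
    },
}

resp = requests.post("http://localhost:5000/predict", json=payload, timeout=10)
print(resp.status_code, resp.json())  # score comes from pipeline.predict_proba(...)[0][1]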

View File

@ -0,0 +1,31 @@
import psycopg2
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta
import os
from io import BytesIO
import tempfile
import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import storage
from features.extract import extract_and_upload_raw_data
from features.user_history import generate_and_upload_user_history
def main():
    execution_date = os.getenv('EXECUTION_DATE')
    num_days_history = os.getenv('NUM_DAYS_HISTORY')
    gcs_bucket_name = os.getenv('GCS_BUCKET')

    extract_and_upload_raw_data(execution_date, num_days_history, gcs_bucket_name)
    generate_and_upload_user_history(execution_date, gcs_bucket_name)
    print("done")

if __name__ == "__main__":
    main()
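
Note: a hypothetical local invocation of this entrypoint. The date, bucket, and database credentials below are placeholders; in a real deployment they would come from the job's environment, and they must be set before the features modules are imported because DB_PARAMS is read at import time.

# Sketch only: run the feature-generation job locally with placeholder settings.
import os

os.environ['EXECUTION_DATE'] = '2024-06-20'      # placeholder run date
os.environ['NUM_DAYS_HISTORY'] = '28'
os.environ['GCS_BUCKET'] = 'my-digest-bucket'    # placeholder bucket
os.environ['DB_HOST'] = 'localhost'
os.environ['DB_USER'] = 'app'
os.environ['DB_PASSWORD'] = 'secret'             # placeholder credentials

main()  # extract_and_upload_raw_data(...) then generate_and_upload_user_history(...)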

View File

View File

@ -0,0 +1,109 @@
# extract and upload raw data used for feature generation
import psycopg2
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta
import os
from io import BytesIO
import tempfile
import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import storage
from urllib.parse import urlparse  # used by extract_host below
DB_PARAMS = {
    'dbname': os.getenv('DB_NAME') or 'omnivore',
    'user': os.getenv('DB_USER'),
    'password': os.getenv('DB_PASSWORD'),
    'host': os.getenv('DB_HOST') or 'localhost',
    'port': os.getenv('DB_PORT') or '5432'
}

def extract_host(url):
    try:
        return urlparse(url).netloc
    except Exception as e:
        return None

def fetch_raw_data(date_str, num_days_history):
    end_date = pd.to_datetime(date_str)
    start_date = end_date - timedelta(days=num_days_history)
    start_date_str = start_date.strftime('%Y-%m-%d 00:00:00')
    end_date_str = end_date.strftime('%Y-%m-%d 23:59:59')

    conn_str = f"postgresql://{DB_PARAMS['user']}:{DB_PARAMS['password']}@{DB_PARAMS['host']}:{DB_PARAMS['port']}/{DB_PARAMS['dbname']}"
    # conn_str = f"postgresql://{DB_PARAMS['host']}:{DB_PARAMS['port']}/{DB_PARAMS['dbname']}"
    engine = create_engine(conn_str)
    query = text("""
SELECT
li.id as library_item_id,
li.user_id,
li.created_at,
li.archived_at,
li.deleted_at,
CASE WHEN li.folder = 'inbox' then 1 else 0 END as inbox_folder,
li.item_type,
li.item_language AS language,
li.content_reader,
li.word_count as item_word_count,
CASE WHEN li.thumbnail IS NOT NULL then 1 else 0 END as item_has_thumbnail,
CASE WHEN li.site_icon IS NOT NULL then 1 else 0 END as item_has_site_icon,
li.original_url,
li.site_name AS site,
li.author,
li.subscription,
sub.type as subscription_type,
sub.created_at as subscription_start_date,
sub.count as subscription_count,
sub.auto_add_to_library as subscription_auto_add_to_library,
sub.fetch_content as subscription_fetch_content,
sub.folder as subscription_folder,
CASE WHEN li.read_at is not NULL then 1 else 0 END as user_clicked,
CASE WHEN li.reading_progress_bottom_percent > 10 THEN 1 ELSE 0 END AS user_read,
CASE WHEN li.reading_progress_bottom_percent > 50 THEN 1 ELSE 0 END AS user_long_read
FROM omnivore.library_item AS li
LEFT JOIN omnivore.subscriptions sub on li.subscription = sub.name AND sub.user_id = li.user_id
WHERE li.created_at >= :start_date AND li.created_at <= :end_date;
""")
chunk_size = 100000 # Adjust based on available memory and performance needs
with tempfile.TemporaryDirectory() as tmpdir:
parquet_files = []
with engine.connect() as conn:
for i, chunk in enumerate(pd.read_sql(query, conn, params={'start_date': start_date_str, 'end_date': end_date_str}, chunksize=chunk_size)):
chunk['library_item_id'] = chunk['library_item_id'].astype(str)
chunk['user_id'] = chunk['user_id'].astype(str)
chunk['original_url_host'] = chunk['original_url'].apply(extract_host)
parquet_file = os.path.join(tmpdir, f'chunk_{i}.parquet')
chunk.to_parquet(parquet_file)
parquet_files.append(parquet_file)
concatenated_df = pd.concat([pd.read_parquet(file) for file in parquet_files], ignore_index=True)
parquet_buffer = BytesIO()
table = pa.Table.from_pandas(concatenated_df)
pq.write_table(table, parquet_buffer)
parquet_buffer.seek(0)
return parquet_buffer
def upload_raw_databuffer(feather_buffer, execution_date, gcs_bucket_name):
    client = storage.Client()
    bucket = client.bucket(gcs_bucket_name)
    blob = bucket.blob(f'data/raw/library_items_{execution_date}.parquet')
    blob.upload_from_file(feather_buffer, content_type='application/octet-stream')
    print("Data stored successfully.")

def extract_and_upload_raw_data(execution_date, num_days_history, gcs_bucket_name):
    buffer = fetch_raw_data(execution_date, int(num_days_history))
    upload_raw_databuffer(buffer, execution_date, gcs_bucket_name)
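
Note: a small sketch, not part of the commit, for reading back one uploaded raw extract and sanity-checking it. The bucket name and execution date are placeholders; the blob path mirrors upload_raw_databuffer above.

# Download the raw extract written by extract_and_upload_raw_data and inspect it.
import pandas as pd
from google.cloud import storage

bucket_name = "my-digest-bucket"     # placeholder
execution_date = "2024-06-20"        # placeholder
local_path = "/tmp/library_items.parquet"

client = storage.Client()
blob = client.bucket(bucket_name).blob(f"data/raw/library_items_{execution_date}.parquet")
blob.download_to_filename(local_path)

df = pd.read_parquet(local_path)
print(df.shape)
print(df[["user_id", "site", "original_url_host", "user_clicked"]].head())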

View File

@ -0,0 +1,235 @@
# download raw user data, aggregate user history, and upload to GCS
import psycopg2
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta
import os
from io import BytesIO
import tempfile
import pickle
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.feather as feather
from google.cloud import storage
FEATURE_COLUMNS=[
# targets
# 'user_clicked', 'user_read', 'user_long_read',
# item attributes / user setup attributes
'item_word_count','item_has_site_icon', 'is_subscription',
'inbox_folder', 'has_author',
# how the user has setup the subscription
'is_newsletter', 'is_feed', 'days_since_subscribed',
'subscription_count', 'subscription_auto_add_to_library',
'subscription_fetch_content',
# user/item interaction history
'user_original_url_host_saved_count_week_1',
'user_original_url_host_interaction_count_week_1',
'user_original_url_host_rate_week_1',
'user_original_url_host_proportion_week_1',
'user_original_url_host_saved_count_week_2',
'user_original_url_host_interaction_count_week_2',
'user_original_url_host_rate_week_2',
'user_original_url_host_proportion_week_2',
'user_original_url_host_saved_count_week_3',
'user_original_url_host_interaction_count_week_3',
'user_original_url_host_rate_week_3',
'user_original_url_host_proportion_week_3',
'user_original_url_host_saved_count_week_4',
'user_original_url_host_interaction_count_week_4',
'user_original_url_host_rate_week_4',
'user_original_url_host_proportion_week_4',
'user_subscription_saved_count_week_1',
'user_subscription_interaction_count_week_1',
'user_subscription_rate_week_1', 'user_subscription_proportion_week_1',
'user_site_saved_count_week_3', 'user_site_interaction_count_week_3',
'user_site_rate_week_3', 'user_site_proportion_week_3',
'user_site_saved_count_week_2', 'user_site_interaction_count_week_2',
'user_site_rate_week_2', 'user_site_proportion_week_2',
'user_subscription_saved_count_week_2',
'user_subscription_interaction_count_week_2',
'user_subscription_rate_week_2', 'user_subscription_proportion_week_2',
'user_site_saved_count_week_1', 'user_site_interaction_count_week_1',
'user_site_rate_week_1', 'user_site_proportion_week_1',
'user_subscription_saved_count_week_3',
'user_subscription_interaction_count_week_3',
'user_subscription_rate_week_3', 'user_subscription_proportion_week_3',
'user_author_saved_count_week_4',
'user_author_interaction_count_week_4', 'user_author_rate_week_4',
'user_author_proportion_week_4', 'user_author_saved_count_week_1',
'user_author_interaction_count_week_1', 'user_author_rate_week_1',
'user_author_proportion_week_1', 'user_site_saved_count_week_4',
'user_site_interaction_count_week_4', 'user_site_rate_week_4',
'user_site_proportion_week_4', 'user_author_saved_count_week_2',
'user_author_interaction_count_week_2', 'user_author_rate_week_2',
'user_author_proportion_week_2', 'user_author_saved_count_week_3',
'user_author_interaction_count_week_3', 'user_author_rate_week_3',
'user_author_proportion_week_3', 'user_subscription_saved_count_week_4',
'user_subscription_interaction_count_week_4',
'user_subscription_rate_week_4', 'user_subscription_proportion_week_4'
]
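
Note: the weekly interaction-history names above follow a regular user_{dimension}_{metric}_week_{n} pattern, so an equivalent list could be generated as sketched below. This is illustration only; the module keeps the explicit list, and the equivalence assumes no combinations are intentionally excluded.

# Sketch: regenerate the weekly feature names from their naming pattern.
STATIC_COLUMNS = [
    'item_word_count', 'item_has_site_icon', 'is_subscription',
    'inbox_folder', 'has_author',
    'is_newsletter', 'is_feed', 'days_since_subscribed',
    'subscription_count', 'subscription_auto_add_to_library',
    'subscription_fetch_content',
]
DIMENSIONS = ['original_url_host', 'subscription', 'site', 'author']
METRICS = ['saved_count', 'interaction_count', 'rate', 'proportion']
GENERATED = [
    f'user_{dim}_{metric}_week_{week}'
    for dim in DIMENSIONS
    for week in range(1, 5)
    for metric in METRICS
]
# set(STATIC_COLUMNS + GENERATED) should equal set(FEATURE_COLUMNS) up to ordering,
# assuming nothing was deliberately left out of the hand-written list.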
def parquet_to_dataframe(file_path):
    table = pq.read_table(file_path)
    df = table.to_pandas()
    return df

def load_local_raw_library_items():
    local_file_path = '/Users/jacksonh/Downloads/data_raw_library_items_2024-03-01.parquet'
    df = parquet_to_dataframe(local_file_path)
    return df

def load_tables_from_pickle(pickle_file):
    with open(pickle_file, 'rb') as handle:
        tables = pickle.load(handle)
    return tables

def download_raw_library_items(execution_date, gcs_bucket_name):
    local_file_path = 'raw_library_items.parquet'
    client = storage.Client()
    bucket = client.bucket(gcs_bucket_name)
    blob = bucket.blob(f'data/raw/library_items_{execution_date}.parquet')
    blob.download_to_filename(local_file_path)
    df = parquet_to_dataframe(local_file_path)
    os.remove(local_file_path)
    return df

def load_feather_files(feature_directory):
    dataframes = {}
    for file_name in os.listdir(feature_directory):
        if file_name.endswith('.feather'):
            file_path = os.path.join(feature_directory, file_name)
            df_name = os.path.splitext(file_name)[0]  # Use the file name (without extension) as key
            table = feather.read_table(file_path)
            dataframes[df_name] = table
    return dataframes

# def save_tables_to_arrow_ipc(tables, output_file):
#     with pa.OSFile(output_file, 'wb') as sink:
#         with pa.ipc.new_stream(sink, tables[next(iter(tables))].schema) as writer:
#             for name, table in tables.items():
#                 print("NAME:", name, "TABLE", table)
#                 writer.write_table(table)

def save_tables_to_arrow_ipc_with_schemas(tables, output_file):
    with pa.OSFile(output_file, 'wb') as sink:
        with pa.ipc.new_stream(sink, pa.schema([])) as writer:
            for name, table in tables.items():
                metadata = table.schema.metadata or {}
                metadata = {**metadata, b'table_name': name.encode('utf-8')}
                schema = table.schema.add_metadata(metadata)
                print("NAME:", name, "TABLE", table)
                writer.write_table(table.replace_schema_metadata(schema.metadata))

def save_tables_to_pickle(tables, output_file):
    with open(output_file, 'wb') as handle:
        pickle.dump(tables, handle, protocol=pickle.HIGHEST_PROTOCOL)

def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f'File {source_file_name} uploaded to {destination_blob_name} in bucket {bucket_name}.')

def generate_and_upload_user_history(execution_date, gcs_bucket_name):
    df = download_raw_library_items(execution_date, gcs_bucket_name)
    # df = load_local_raw_library_items()
    with tempfile.TemporaryDirectory() as tmpdir:
        user_preferences = aggregate_user_preferences(df, tmpdir)
        dataframes = load_feather_files(tmpdir)
        filename = os.path.join(tmpdir, 'user_features.pkl')
        save_tables_to_pickle(dataframes, filename)

        files = load_tables_from_pickle(filename)
        print("GENERATED FEATURE TABLES:", files.keys())
        for table in files.keys():
            print("TABLE: ", table, "LEN: ", len(files[table]))

        upload_to_gcs(gcs_bucket_name, filename, f'data/features/user_features.pkl')

def compute_dimension_aggregates(df, dimension, bucket_name):
    # Compute initial aggregates to filter out items with less than 2 saved counts
    initial_agg = df.groupby(['user_id', dimension]).size().reset_index(name='count')
    filtered_df = df[df.set_index(['user_id', dimension]).index.isin(initial_agg[initial_agg['count'] >= 2].set_index(['user_id', dimension]).index)]
    agg = filtered_df.groupby(['user_id', dimension]).agg(
        saved_count=(dimension, 'count'),
        interaction_count=('user_clicked', 'sum')
    ).reset_index()
    agg[f'user_{dimension}_rate_{bucket_name}'] = agg['interaction_count'] / agg['saved_count']
    agg[f'user_{dimension}_proportion_{bucket_name}'] = agg.groupby('user_id')['interaction_count'].transform(lambda x: x / x.sum())
    agg = agg.rename(columns={
        'saved_count': f'user_{dimension}_saved_count_{bucket_name}',
        'interaction_count': f'user_{dimension}_interaction_count_{bucket_name}'
    })
    return agg

def calculate_and_save_aggregates(bucket_name, bucket_df, output_dir):
    # Compute aggregates for each dimension
    dimensions = ['author', 'site', 'original_url_host', 'subscription']
    for dimension in dimensions:
        agg_df = compute_dimension_aggregates(bucket_df, dimension, bucket_name)
        # Save the aggregated DataFrame to a Feather file
        filename = os.path.join(output_dir, f'user_{dimension}_{bucket_name}.feather')
        save_aggregated_data(agg_df, filename)
        print(f"Saved aggregated data for {dimension} in {bucket_name} to {filename}")

def save_aggregated_data(df, filename):
    buffer = BytesIO()
    df.to_feather(buffer)
    buffer.seek(0)
    with open(filename, 'wb') as f:
        f.write(buffer.getbuffer())

def aggregate_user_preferences(df, output_dir):
    # Convert 'created_at' to datetime
    df['created_at'] = pd.to_datetime(df['created_at'])
    end_date = df['created_at'].max()

    # Define bucket ranges for the past four weeks
    buckets = {
        'week_4': (end_date - timedelta(weeks=4), end_date - timedelta(weeks=3)),
        'week_3': (end_date - timedelta(weeks=3), end_date - timedelta(weeks=2)),
        'week_2': (end_date - timedelta(weeks=2), end_date - timedelta(weeks=1)),
        'week_1': (end_date - timedelta(weeks=1), end_date)
    }

    # Calculate aggregates for each bucket and save to file
    for bucket_name, (start_date, end_date) in buckets.items():
        bucket_df = df[(df['created_at'] >= start_date) & (df['created_at'] < end_date)]
        calculate_and_save_aggregates(bucket_name, bucket_df, output_dir)

def create_and_upload_user_history(execution_date, num_days_history, gcs_bucket_name):
    buffer = download_raw_library_items(execution_date, gcs_bucket_name)
    buffer = open_raw_library_items()
    upload_raw_databuffer(buffer, execution_date, gcs_bucket_name)
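
Note: a quick way, not part of the commit, to inspect a generated user_features.pkl locally. The pickle holds pyarrow tables keyed by feather file name (e.g. user_author_week_1); the path below is a placeholder.

# Load the pickled feature tables and print their shapes.
import pickle

with open("user_features.pkl", "rb") as handle:
    tables = pickle.load(handle)

for name, table in tables.items():
    df = table.to_pandas()
    print(name, df.shape, list(df.columns)[:6])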

View File

@ -6,3 +6,5 @@ google-cloud-storage
 flask
 pydantic
 sklearn2pmml
+sqlalchemy
+pyarrow

View File

@ -1,20 +1,27 @@
-import psycopg2
 import pandas as pd
-import joblib
-from datetime import datetime
 import os
-from sklearn.model_selection import train_test_split
-from sklearn.preprocessing import StandardScaler
-from sklearn.ensemble import RandomForestClassifier
-from sklearn2pmml import PMMLPipeline, sklearn2pmml
-from sklearn.pipeline import Pipeline
-from sklearn.metrics import accuracy_score, classification_report
 import numpy as np
+from datetime import datetime, timedelta
+from sklearn.linear_model import SGDClassifier
+from sklearn.ensemble import RandomForestClassifier, VotingClassifier
+from sklearn.preprocessing import StandardScaler
+from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix
+from sklearn.utils import shuffle
+from sklearn.model_selection import train_test_split
+from sklearn2pmml import PMMLPipeline, sklearn2pmml
 from google.cloud import storage
 from google.cloud.exceptions import PreconditionFailed
+import pickle
+import pyarrow as pa
+import pyarrow.parquet as pq
+import pyarrow.feather as feather
+from features.user_history import FEATURE_COLUMNS

 DB_PARAMS = {
     'dbname': os.getenv('DB_NAME') or 'omnivore',
@ -24,276 +31,184 @@ DB_PARAMS = {
     'port': os.getenv('DB_PORT') or '5432'
 }

-TRAIN_FEATURES = [
-    # "item_word_count",
-    "item_has_thumbnail",
-    "item_has_site_icon",
-    'user_30d_interactions_author_count',
-    'user_30d_interactions_site_count',
-    'user_30d_interactions_subscription_count',
-    'user_30d_interactions_author_rate',
-    'user_30d_interactions_site_rate',
-    'user_30d_interactions_subscription_rate',
-    'global_30d_interactions_site_count',
-    'global_30d_interactions_author_count',
-    'global_30d_interactions_subscription_count',
-    'global_30d_interactions_site_rate',
-    'global_30d_interactions_author_rate',
-    'global_30d_interactions_subscription_rate'
-]
+def parquet_to_dataframe(file_path):
+    table = pq.read_table(file_path)
+    df = table.to_pandas()
+    return df

-def fetch_data(sample_size):
-    # Connect to the PostgreSQL database
-    conn = psycopg2.connect(**DB_PARAMS)
-    cur = conn.cursor()
-    query = f"""
-    SELECT
-        user_id,
-        created_at,
-        item_folder,
-        item_type,
-        language,
-        content_reader,
-        directionality,
-        item_word_count,
-        item_has_thumbnail,
-        item_has_site_icon,
-        site,
-        author,
-        subscription,
-        item_subscription_type,
-        user_clicked,
-        user_read,
-        user_long_read
-    FROM user_7d_activity LIMIT {sample_size}
-    """
-    cur.execute(query)
-    data = cur.fetchall()
-    cur.close()
-    conn.close()
-    columns = [
-        "user_id",
-        "created_at",
-        "item_folder",
-        "item_type",
-        "language",
-        "content_reader",
-        "directionality",
-        "item_word_count",
-        "item_has_thumbnail",
-        "item_has_site_icon",
-        "site",
-        "author",
-        "subscription",
-        "item_subscription_type",
-        "user_clicked",
-        "user_read",
-        "user_long_read",
-    ]
-    df = pd.DataFrame(data, columns=columns)
-    return df
+def save_to_pickle(object, output_file):
+    with open(output_file, 'wb') as handle:
+        pickle.dump(object, handle, protocol=pickle.HIGHEST_PROTOCOL)

-def add_user_features(df, name, feature_name):
-    conn = psycopg2.connect(**DB_PARAMS)
-    cur = conn.cursor()
-    query = f"SELECT user_id, {name}, interactions, interaction_rate FROM {feature_name}"
-    cur.execute(query)
-    data = cur.fetchall()
-    cur.close()
-    conn.close()
-    columns = [
-        "user_id",
-        name,
-        "interactions",
-        "interaction_rate"
-    ]
-    rate_feature_name = f"{feature_name}_rate"
-    count_feature_name = f"{feature_name}_count"
-    df_loaded = pd.DataFrame(data, columns=columns)
-    df_loaded = df_loaded.rename(columns={"interactions": count_feature_name}, errors="raise")
-    df_loaded = df_loaded.rename(columns={"interaction_rate": rate_feature_name}, errors="raise")
-    df_merged = pd.merge(df, df_loaded[['user_id', name, rate_feature_name, count_feature_name]], on=['user_id',name], how='left')
-    df_merged[rate_feature_name] = df_merged[rate_feature_name].fillna(0)
-    df_merged[count_feature_name] = df_merged[count_feature_name].fillna(0)
-    return df_merged
+def load_tables_from_pickle(pickle_file):
+    with open(pickle_file, 'rb') as handle:
+        tables = pickle.load(handle)
+    return tables

-def add_global_features(df, name, feature_name):
-    conn = psycopg2.connect(**DB_PARAMS)
-    cur = conn.cursor()
-    query = f"SELECT {name}, interactions, interaction_rate FROM {feature_name}"
-    cur.execute(query)
-    data = cur.fetchall()
-    cur.close()
-    conn.close()
-    columns = [
-        name,
-        "interactions",
-        "interaction_rate"
-    ]
-    rate_feature_name = f"{feature_name}_rate"
-    count_feature_name = f"{feature_name}_count"
-    df_loaded = pd.DataFrame(data, columns=columns)
-    df_loaded = df_loaded.rename(columns={"interactions": count_feature_name}, errors="raise")
-    df_loaded = df_loaded.rename(columns={"interaction_rate": rate_feature_name}, errors="raise")
-    df_merged = pd.merge(df, df_loaded[[name, count_feature_name, rate_feature_name]], on=name, how='left')
-    df_merged[rate_feature_name] = df_merged[rate_feature_name].fillna(0)
-    df_merged[count_feature_name] = df_merged[count_feature_name].fillna(0)
-    return df_merged
+def load_dataframes_from_pickle(pickle_file):
+    result = {}
+    tables = load_tables_from_pickle(pickle_file)
+    for table_name in tables.keys():
+        result[table_name] = tables[table_name].to_pandas()
+    return result

-def add_dummy_features(df):
-    known_folder_types = ['inbox', 'following']
-    known_subscription_types = ['NEWSLETTER', 'RSS']
-    # known_item_types = ['ARTICLE', 'BOOK', 'FILE', 'HIGHLIGHTS', 'IMAGE', 'PROFILE', 'TWEET', 'UNKNOWN','VIDEO','WEBSITE']
-    #known_content_reader_types = ['WEB', 'PDF', 'EPUB']
-    # known_directionality_types = ['LTR', 'RTL']
-    folder_dummies = pd.get_dummies(df['item_folder'], columns=known_subscription_types, prefix='item_folder')
-    subscription_type_dummies = pd.get_dummies(df['item_subscription_type'], columns=known_subscription_types, prefix='item_subscription_type')
-    # item_type_dummies = pd.get_dummies(df['item_type'], columns=known_item_types, prefix='item_type')
-    # content_reader_dummies = pd.get_dummies(df['content_reader'], columns=known_content_reader_types, prefix='content_reader')
-    # directionality_dummies = pd.get_dummies(df['directionality'], columns=known_directionality_types, prefix='directionality')
-    # language_dummies = pd.get_dummies(df['language'], prefix='language')
-    # if 'title_topic' in df.columns:
-    #     title_topic_dummies = pd.get_dummies(df['title_topic'], prefix='title_topic')
-    new_feature_names = list(subscription_type_dummies.columns) + list(folder_dummies.columns)
-    print("NEW FEATURE NAMES: ", new_feature_names)
-    # new_feature_names = list(item_type_dummies.columns) + list(content_reader_dummies.columns) + \
-    #     list(directionality_dummies.columns) + list(language_dummies.columns)
-    # if 'title_topic' in df.columns:
-    #     new_feature_names += list(title_topic_dummies.columns)
-    # , title_topic_dummies
-    return pd.concat([df, subscription_type_dummies, folder_dummies], axis=1), new_feature_names
-
-def random_forest_predictor(df, feature_columns, user_interaction):
-    features = df[feature_columns]
-    features = features.fillna(0)
-    target = df[user_interaction]
-
-    X = features
-    y = target.values
-
-    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
-
-    scaler = StandardScaler()
-    rf_classifier = RandomForestClassifier(n_estimators=50, max_depth=10, random_state=42)
-    pipeline = PMMLPipeline([
-        ("scaler", scaler),
-        ("classifier", rf_classifier)
-    ])
-    pipeline.fit(X_train, y_train)
-    y_pred = pipeline.predict(X_test)
-
-    feature_importance = rf_classifier.feature_importances_
-    print("Feature Importance:")
-    for feature, importance in zip(feature_columns, feature_importance):
-        print(f"{feature}: {importance}")
-
-    print("\nClassification Report:")
-    print(classification_report(y_test, y_pred))
-    return pipeline
-
-def save_and_upload_model(pipeline, target_interaction_type, bucket_name):
-    pipeline_file_name = f'predict_{target_interaction_type}_random_forest_pipeline-v001.pkl'
-    joblib.dump(pipeline, pipeline_file_name)
-    if bucket_name:
-        upload_to_gcs(bucket_name, pipeline_file_name, f'models/{pipeline_file_name}')
-    else:
-        print("No GCS credentials so i am not uploading")
+def download_from_gcs(bucket_name, source_blob_name, destination_file_name):
+    client = storage.Client()
+    bucket = client.bucket(bucket_name)
+    blob = bucket.blob(source_blob_name)
+    blob.download_to_filename(destination_file_name)
+    print(f'Blob {source_blob_name} downloaded to {destination_file_name}.')

 def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
-    """Uploads a file to the bucket."""
     storage_client = storage.Client()
     bucket = storage_client.bucket(bucket_name)
     blob = bucket.blob(destination_blob_name)
     blob.upload_from_filename(source_file_name)
     print(f"File {source_file_name} uploaded to {destination_blob_name}.")

-def resample_data(df):
-    print("Initial distribution:\n", df['user_clicked'].value_counts())
-
-    # Separate the majority and minority classes
-    df_majority = df[df['user_clicked'] == False]
-    df_minority = df[df['user_clicked'] == True]
-
-    # Resample the minority class
-    df_minority_oversampled = df_minority.sample(n=len(df_majority), replace=True, random_state=42)
-
-    # Combine the majority class with the oversampled minority class
-    df_balanced = pd.concat([df_majority, df_minority_oversampled])
-
-    # Shuffle the DataFrame to mix the classes
-    df_balanced = df_balanced.sample(frac=1, random_state=42).reset_index(drop=True)
-
-    # Check the new distribution
-    print("Balanced distribution:\n", df_balanced['user_clicked'].value_counts())
-
-    # Display the first few rows of the balanced DataFrame
-    print(df_balanced.head())
-
-    return df_balanced
+def load_and_sample_library_items_from_parquet(raw_file_path, sample_size):
+    df = parquet_to_dataframe(raw_file_path)
+    sampled_df = df.sample(frac=sample_size, random_state=42)
+    return sampled_df
+
+def merge_user_preference_data(sampled_raw_df, feature_dict):
+    # Start with the sampled raw DataFrame
+    merged_df = sampled_raw_df
+    # Iterate through the files in the feature directory
+    for key in feature_dict.keys():
+        user_preference_df = feature_dict[key]
+        # Determine the dimension to join on
+        if 'author' in key:
+            merge_keys = ['user_id', 'author']
+        elif 'site' in key:
+            merge_keys = ['user_id', 'site']
+        elif 'subscription' in key:
+            merge_keys = ['user_id', 'subscription']
+        elif 'original_url_host' in key:
+            merge_keys = ['user_id', 'original_url_host']
+        else:
+            print("skipping feature: ", key)
+            continue  # Skip files that don't match expected patterns
+        # Merge with the current user preference DataFrame
+        merged_df = pd.merge(merged_df, user_preference_df, on=merge_keys, how='left')
+        # Optionally, fill NaNs after each merge step to avoid growing NaNs
+        merged_df = merged_df.fillna(0)
+    return merged_df
+
+def prepare_data(df):
+    df['created_at'] = pd.to_datetime(df['created_at'])
+    df['subscription_start_date'] = pd.to_datetime(df['subscription_start_date'], errors='coerce')
+    df['is_subscription'] = df['subscription'].apply(lambda x: 1 if pd.notna(x) and x != '' else 0)
+    df['has_author'] = df['author'].apply(lambda x: 1 if pd.notna(x) and x != '' else 0)
+    # Calculate the days since subscribed
+    df['days_since_subscribed'] = (df['created_at'] - df['subscription_start_date']).dt.days
+    # Handle cases where subscription_start_date is NaT (Not a Time) or negative
+    df['days_since_subscribed'] = df['days_since_subscribed'].apply(lambda x: x if x >= 0 else 0)
+    df['days_since_subscribed'] = df['days_since_subscribed'].fillna(0).astype(int)
+    df['is_feed'] = df['subscription_type'].apply(lambda x: 1 if x == 'RSS' else 0)
+    df['is_newsletter'] = df['subscription_type'].apply(lambda x: 1 if x == 'NEWSLETTER' else 0)
+    df = df.dropna(subset=['user_clicked'])
+    # Fill NaNs in other columns with 0 (if any remain)
+    df = df.fillna(0)
+    X = df[FEATURE_COLUMNS]  # .drop(columns=['user_id', 'user_clicked'])
+    Y = df['user_clicked']
+    return X, Y
+
+def train_random_forest_model(X, Y):
+    model = RandomForestClassifier(
+        class_weight={0: 1, 1: 10},
+        n_estimators=10,
+        max_depth=10,
+        random_state=42
+    )
+    scaler = StandardScaler()
+    X_scaled = scaler.fit_transform(X)
+    X_train, X_test, Y_train, Y_test = train_test_split(X_scaled, Y, test_size=0.3, random_state=42)
+    pipeline = PMMLPipeline([
+        ("scaler", scaler),
+        ("classifier", model)
+    ])
+    pipeline.fit(X_train, Y_train)
+    Y_pred = pipeline.predict(X_test)
+    print_classification_report(Y_test, Y_pred)
+    print_feature_importance(X, model)
+    return pipeline
+
+def print_feature_importance(X, rf):
+    # Get feature importances
+    importances = rf.feature_importances_
+    # Get the indices of the features sorted by importance
+    indices = np.argsort(importances)[::-1]
+    # Print the feature ranking
+    print("Feature ranking:")
+    for f in range(X.shape[1]):
+        print(f"{f + 1}. feature {indices[f]} ({importances[indices[f]]:.4f}) - {X.columns[indices[f]]}")
+
+def print_classification_report(Y_test, Y_pred):
+    report = classification_report(Y_test, Y_pred, target_names=['Not Clicked', 'Clicked'], output_dict=True)
+    print("Classification Report:")
+    print(f"Accuracy: {report['accuracy']:.4f}")
+    print(f"Precision (Not Clicked): {report['Not Clicked']['precision']:.4f}")
+    print(f"Recall (Not Clicked): {report['Not Clicked']['recall']:.4f}")
+    print(f"F1-Score (Not Clicked): {report['Not Clicked']['f1-score']:.4f}")
+    print(f"Precision (Clicked): {report['Clicked']['precision']:.4f}")
+    print(f"Recall (Clicked): {report['Clicked']['recall']:.4f}")
+    print(f"F1-Score (Clicked): {report['Clicked']['f1-score']:.4f}")

 def main():
-    sample_size = int(os.getenv('SAMPLE_SIZE')) or 1000
-    num_days_history = int(os.getenv('NUM_DAYS_HISTORY')) or 21
-    gcs_bucket = os.getenv('GCS_BUCKET')
-
-    print("about to fetch library data")
-    df = fetch_data(sample_size)
-    print("FETCHED", df)
-    df = add_user_features(df, 'author', 'user_30d_interactions_author')
-    df = add_user_features(df, 'site', 'user_30d_interactions_site')
-    df = add_user_features(df, 'subscription', 'user_30d_interactions_subscription')
-    df = add_global_features(df, 'site', 'global_30d_interactions_site')
-    df = add_global_features(df, 'author', 'global_30d_interactions_author')
-    df = add_global_features(df, 'subscription', 'global_30d_interactions_subscription')
-    df = resample_data(df)
-    print("training RandomForest with number of library_items: ", len(df))
-    pipeline = random_forest_predictor(df, TRAIN_FEATURES, 'user_clicked')
-    print(f"uploading model and scaler to {gcs_bucket}")
-    save_and_upload_model(pipeline, 'user_clicked', gcs_bucket)
+    execution_date = os.getenv('EXECUTION_DATE')
+    num_days_history = os.getenv('NUM_DAYS_HISTORY')
+    gcs_bucket_name = os.getenv('GCS_BUCKET')
+    raw_data_path = f'raw_library_items_${execution_date}.parquet'
+    user_history_path = 'features_user_features.pkl'
+    pipeline_path = 'predict_read_pipeline-v002.pkl'
+
+    download_from_gcs(gcs_bucket_name, f'data/features/user_features.pkl', user_history_path)
+    download_from_gcs(gcs_bucket_name, f'data/raw/library_items_{execution_date}.parquet', raw_data_path)
+
+    sampled_raw_df = load_and_sample_library_items_from_parquet(raw_data_path, 0.10)
+    user_history = load_dataframes_from_pickle(user_history_path)
+
+    merged_df = merge_user_preference_data(sampled_raw_df, user_history)
+
+    print("created merged data", merged_df.columns)
+    X, Y = prepare_data(merged_df)
+    random_forest_pipeline = train_random_forest_model(X, Y)
+    save_to_pickle(random_forest_pipeline, pipeline_path)
+    upload_to_gcs(gcs_bucket_name, pipeline_path, f'data/models/{pipeline_path}')
+    print("done")

 if __name__ == "__main__":
     main()
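
Note: a sketch, not part of the commit, for loading the uploaded pipeline outside the service and scoring one row. The file name matches MODEL_PIPELINE_PATH in serve.py, and the all-zeros row is only a placeholder showing the expected input columns.

# Load the pickled PMMLPipeline and score a single placeholder feature row.
import pickle
import pandas as pd
from features.user_history import FEATURE_COLUMNS

with open("predict_read_pipeline-v002.pkl", "rb") as handle:
    pipeline = pickle.load(handle)

row = pd.DataFrame([{col: 0 for col in FEATURE_COLUMNS}])
proba = pipeline.predict_proba(row)
print("P(clicked):", proba[0][1])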

View File

@ -97,15 +97,15 @@ describe('ArticleSavingRequest API', () => {
     ).to.eql(ArticleSavingRequestStatus.Processing)
   })

-  it('creates a library item in db', async () => {
-    const url = 'https://blog.omnivore.app/1'
-    await graphqlRequest(
-      createArticleSavingRequestMutation('https://blog.omnivore.app/1'),
-      authToken
-    ).expect(200)
-    const item = await findLibraryItemByUrl(url, user.id)
-    expect(item?.readableContent).to.eql('Your link is being saved...')
-  })
+  it('returns an error if the url is invalid', async () => {
+    const res = await graphqlRequest(
+      createArticleSavingRequestMutation('invalid url'),
+      authToken
+    ).expect(200)
+    expect(res.body.data.createArticleSavingRequest.errorCodes).to.eql([
+      CreateArticleSavingRequestErrorCode.BadData,
+    ])
+  })

   it('returns an error if the url is invalid', async () => {