diff --git a/ml/digest-score/Dockerfile b/ml/digest-score/Dockerfile new file mode 100644 index 000000000..0afa6baec --- /dev/null +++ b/ml/digest-score/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.8-slim + +WORKDIR /app + +ENV GRPC_PYTHON_BUILD_SYSTEM_OPENSSL "1" +ENV GRPC_PYTHON_BUILD_SYSTEM_ZLIB "1" + +COPY . /app + +RUN pip install --no-cache-dir -r requirements.txt + +EXPOSE 5000 +CMD ["python", "serve.py"] diff --git a/ml/digest-score/app.py b/ml/digest-score/app.py index 0d1765525..501a417be 100644 --- a/ml/digest-score/app.py +++ b/ml/digest-score/app.py @@ -1,8 +1,5 @@ -import psycopg2 - import logging from flask import Flask, request, jsonify -from pydantic import BaseModel, ConfigDict, ValidationError, conlist from typing import List @@ -10,58 +7,23 @@ import os import sys import json import pytz +import pickle import numpy as np import pandas as pd import joblib -from datetime import datetime, timedelta +from urllib.parse import urlparse from datetime import datetime import dateutil.parser from google.cloud import storage +from features.user_history import FEATURE_COLUMNS app = Flask(__name__) logging.basicConfig(level=logging.INFO, stream=sys.stdout) -TRAIN_FEATURES = [ - "item_has_thumbnail", - "item_has_site_icon", +USER_HISTORY_PATH = 'user_features.pkl' +MODEL_PIPELINE_PATH = 'predict_read_pipeline-v002.pkl' - 'user_30d_interactions_author_count', - 'user_30d_interactions_site_count', - 'user_30d_interactions_subscription_count', - - 'user_30d_interactions_author_rate', - 'user_30d_interactions_site_rate', - 'user_30d_interactions_subscription_rate', - - 'global_30d_interactions_site_count', - 'global_30d_interactions_author_count', - 'global_30d_interactions_subscription_count', - - 'global_30d_interactions_site_rate', - 'global_30d_interactions_author_rate', - 'global_30d_interactions_subscription_rate' -] - -DB_PARAMS = { - 'dbname': os.getenv('DB_NAME') or 'omnivore', - 'user': os.getenv('DB_USER'), - 'password': os.getenv('DB_PASSWORD'), - 'host': os.getenv('DB_HOST') or 'localhost', - 'port': os.getenv('DB_PORT') or '5432' -} - -USER_FEATURES = { - "site": "user_30d_interactions_site", - "author": "user_30d_interactions_author", - "subscription": "user_30d_interactions_subscription", -} - -GLOBAL_FEATURES = { - "site": "global_30d_interactions_site", - "author": "global_30d_interactions_author", - "subscription": "global_30d_interactions_subscription", -} def download_from_gcs(bucket_name, gcs_path, destination_path): storage_client = storage.Client() @@ -70,94 +32,44 @@ def download_from_gcs(bucket_name, gcs_path, destination_path): blob.download_to_filename(destination_path) -def load_pipeline(): - bucket_name = os.getenv('GCS_BUCKET') - pipeline_gcs_path = os.getenv('PIPELINE_GCS_PATH') - download_from_gcs(bucket_name, pipeline_gcs_path, '/tmp/pipeline.pkl') - pipeline = joblib.load('/tmp/pipeline.pkl') +def load_pipeline(path): + pipeline = joblib.load(path) return pipeline -def load_pipeline_local(): - pipeline = joblib.load('predict_user_clicked_random_forest_pipeline-v001.pkl') - return pipeline +def load_tables_from_pickle(path): + with open(path, 'rb') as handle: + tables = pickle.load(handle) + return tables -def fetch_user_features(name, feature_name): - conn = psycopg2.connect(**DB_PARAMS) - cur = conn.cursor() - query = f"SELECT user_id, {name}, interactions, interaction_rate FROM {feature_name}" - - cur.execute(query) - data = cur.fetchall() - - cur.close() - conn.close() - columns = [ - "user_id", - name, - "interactions", - "interaction_rate" - ] - - rate_feature_name = f"{feature_name}_rate" - count_feature_name = f"{feature_name}_count" - - df_loaded = pd.DataFrame(data, columns=columns) - df_loaded = df_loaded.rename(columns={"interactions": count_feature_name}, errors="raise") - df_loaded = df_loaded.rename(columns={"interaction_rate": rate_feature_name}, errors="raise") - df_loaded[rate_feature_name] = df_loaded[rate_feature_name].fillna(0) - df_loaded[count_feature_name] = df_loaded[count_feature_name].fillna(0) - - return df_loaded - - -def fetch_global_features(name, feature_name): - conn = psycopg2.connect(**DB_PARAMS) - cur = conn.cursor() - query = f"SELECT {name}, interactions, interaction_rate FROM {feature_name}" - - cur.execute(query) - data = cur.fetchall() - - cur.close() - conn.close() - columns = [ - name, - "interactions", - "interaction_rate" - ] - - rate_feature_name = f"{feature_name}_rate" - count_feature_name = f"{feature_name}_count" - - df_loaded = pd.DataFrame(data, columns=columns) - df_loaded = df_loaded.rename(columns={"interactions": count_feature_name}, errors="raise") - df_loaded = df_loaded.rename(columns={"interaction_rate": rate_feature_name}, errors="raise") - df_loaded[rate_feature_name] = df_loaded[rate_feature_name].fillna(0) - df_loaded[count_feature_name] = df_loaded[count_feature_name].fillna(0) - - return df_loaded - - -def load_user_features(): +def load_user_features(path): result = {} - for view_name in USER_FEATURES.keys(): - key_name = USER_FEATURES[view_name] - result[key_name] = fetch_user_features(view_name, key_name) - app.logger.info(f"loaded {len(result[key_name])} features for {key_name}") + tables = load_tables_from_pickle(path) + for table_name in tables.keys(): + result[table_name] = tables[table_name].to_pandas() return result -def load_global_features(): +def dataframe_to_dict(df): result = {} - for view_name in GLOBAL_FEATURES.keys(): - key_name = GLOBAL_FEATURES[view_name] - result[key_name] = fetch_global_features(view_name, key_name) - app.logger.info(f"loaded {len(result[key_name])} features for {key_name}") + for index, row in df.iterrows(): + user_id = row['user_id'] + if user_id not in result: + result[user_id] = [] + result[user_id].append(row.to_dict()) return result +def merge_dicts(dict1, dict2): + for key, value in dict2.items(): + if key in dict1: + dict1[key].extend(value) + else: + dict1[key] = value + return dict1 + + def compute_score(user_id, item_features): interaction_score = compute_interaction_score(user_id, item_features) return { @@ -166,86 +78,55 @@ def compute_score(user_id, item_features): } -def compute_time_bonus_score(item_features): - saved_at = item_features['saved_at'] - current_time = datetime.now(pytz.utc) - time_diff_hours = (current_time - saved_at).total_seconds() / 3600 - max_diff_hours = 3 * 24 - if time_diff_hours >= max_diff_hours: - return 0.0 - else: - return max(0.0, min(1.0, 1 - (time_diff_hours / max_diff_hours))) - - def compute_interaction_score(user_id, item_features): - print('item_features', item_features) + original_url_host = urlparse(item_features.get('original_url')).netloc df_test = pd.DataFrame([{ 'user_id': user_id, 'author': item_features.get('author'), 'site': item_features.get('site'), 'subscription': item_features.get('subscription'), + 'original_url_host': original_url_host, 'item_has_thumbnail': 1 if item_features.get('has_thumbnail') else 0, "item_has_site_icon": 1 if item_features.get('has_site_icon') else 0, + + 'item_word_count': item_features.get('words_count'), + 'is_subscription': 1 if item_features.get('is_subscription') else 0, + 'is_newsletter': 1 if item_features.get('is_newsletter') else 0, + 'is_feed': 1 if item_features.get('is_feed') else 0, + 'days_since_subscribed': item_features.get('days_since_subscribed'), + 'subscription_count': item_features.get('subscription_count'), + 'subscription_auto_add_to_library': item_features.get('subscription_auto_add_to_library'), + 'subscription_fetch_content': item_features.get('subscription_fetch_content'), + + 'has_author': 1 if item_features.get('author') else 0, + 'inbox_folder': 1 if item_features.get('folder') == 'inbox' else 0, }]) - for name in USER_FEATURES.keys(): - feature_name = USER_FEATURES[name] - df_feature = user_features[feature_name] - df_test = df_test.merge(df_feature, on=['user_id', name], how='left') - df_test[f"{feature_name}_rate"] = df_test[f"{feature_name}_rate"].fillna(0) - df_test[f"{feature_name}_count"] = df_test[f"{feature_name}_count"].fillna(0) + for name, df in user_features.items(): + df = df[df['user_id'] == user_id] + if 'author' in name: + merge_keys = ['user_id', 'author'] + elif 'site' in name: + merge_keys = ['user_id', 'site'] + elif 'subscription' in name: + merge_keys = ['user_id', 'subscription'] + elif 'original_url_host' in name: + merge_keys = ['user_id', 'original_url_host'] + else: + print("skipping feature: ", name) + continue - for name in GLOBAL_FEATURES.keys(): - feature_name = GLOBAL_FEATURES[name] - df_feature = global_features[feature_name] - df_test = df_test.merge(df_feature, on=name, how='left') - df_test[f"{feature_name}_rate"] = df_test[f"{feature_name}_rate"].fillna(0) - df_test[f"{feature_name}_count"] = df_test[f"{feature_name}_count"].fillna(0) + df_test = pd.merge(df_test, df, on=merge_keys, how='left') + df_test = df_test.fillna(0) + df_predict = df_test[FEATURE_COLUMNS] - df_predict = df_test[TRAIN_FEATURES] - - # Print out the columns with values, so we can know how sparse our data is - #scored_columns = df_predict.columns[(df_predict.notnull() & (df_predict != 0)).any()].tolist() - #print("scored columns", scored_columns) interaction_score = pipeline.predict_proba(df_predict) + print('score', interaction_score, 'item_features', df_test[df_test != 0].stack()) return interaction_score[0][1] -def get_library_item(library_item_id): - conn = psycopg2.connect(**DB_PARAMS) - cur = conn.cursor() - query = """ - SELECT - li.title, - li.author, - li.saved_at, - li.site_name as site, - li.item_language as language, - li.subscription, - li.word_count, - li.directionality, - CASE WHEN li.thumbnail IS NOT NULL then 1 else 0 END as has_thumbnail, - CASE WHEN li.site_icon IS NOT NULL then 1 else 0 END as has_site_icon - FROM omnivore.library_item li - WHERE li.id = %s - """ - - cur.execute(query, (library_item_id,)) - - data = cur.fetchone() - columns = [desc[0] for desc in cur.description] - cur.close() - conn.close() - - if data: - item_dict = dict(zip(columns, data)) - return item_dict - else: - return None - - @app.route('/_ah/health', methods=['GET']) def ready(): return jsonify({'OK': 'yes'}), 200 @@ -254,29 +135,17 @@ def ready(): @app.route('/users//features', methods=['GET']) def get_user_features(user_id): result = {} + df_user = pd.DataFrame([{ + 'user_id': user_id, + }]) - for name in USER_FEATURES.keys(): - feature_name = USER_FEATURES[name] - rate_feature_name = f"{feature_name}_rate" - count_feature_name = f"{feature_name}_count" - df_feature = user_features[feature_name] - df_filtered = df_feature[df_feature['user_id'] == user_id] - if not df_filtered.empty: - rate = df_filtered[[name, rate_feature_name]].dropna().to_dict(orient='records') - count = df_filtered[[name, count_feature_name]].dropna().to_dict(orient='records') - result[feature_name] = { - 'rate': rate, - 'count': count - } + user_data = {} + for name, df in user_features.items(): + df = df[df['user_id'] == user_id] + df_dict = dataframe_to_dict(df) + user_data = merge_dicts(user_data, df_dict) - return jsonify(result), 200 - - -@app.route('/users//library_items//score', methods=['GET']) -def get_library_item_score(user_id, library_item_id): - item_features = get_library_item(library_item_id) - score = compute_score(user_id, item_features) - return jsonify({'score': score}) + return jsonify(user_data), 200 @app.route('/predict', methods=['POST']) @@ -287,7 +156,6 @@ def predict(): user_id = data.get('user_id') item_features = data.get('item_features') - item_features['saved_at'] = dateutil.parser.isoparse(item_features['saved_at']) if user_id is None: return jsonify({'error': 'Missing user_id'}), 400 @@ -316,7 +184,6 @@ def batch(): print('key": ', key) print('item: ', item) library_item_id = item['library_item_id'] - item['saved_at'] = dateutil.parser.isoparse(item['saved_at']) result[library_item_id] = compute_score(user_id, item) return jsonify(result) @@ -325,14 +192,15 @@ def batch(): return jsonify({'error': str(e)}), 500 -if os.getenv('LOAD_LOCAL_MODEL'): - pipeline = load_pipeline_local() -else: - pipeline = load_pipeline() +if os.getenv('LOAD_LOCAL_MODEL') != None: + gcs_bucket_name = os.getenv('GCS_BUCKET') + download_from_gcs(gcs_bucket_name, f'data/features/user_features.pkl', USER_HISTORY_PATH) + download_from_gcs(gcs_bucket_name, f'data/models/predict_read_pipeline-v002.pkl', MODEL_PIPELINE_PATH) -user_features = load_user_features() -global_features = load_global_features() +pipeline = load_pipeline(MODEL_PIPELINE_PATH) +user_features = load_user_features(USER_HISTORY_PATH) +print('loaded pipeline and user_features', pipeline, user_features) if __name__ == '__main__': app.run(debug=True, port=5000) \ No newline at end of file diff --git a/ml/digest-score/features.py b/ml/digest-score/features.py new file mode 100644 index 000000000..47766c244 --- /dev/null +++ b/ml/digest-score/features.py @@ -0,0 +1,31 @@ +import psycopg2 +import numpy as np +import pandas as pd +from sqlalchemy import create_engine, text +from datetime import datetime, timedelta + +import os +from io import BytesIO +import tempfile + +import pyarrow as pa +import pyarrow.parquet as pq +from google.cloud import storage + +from features.extract import extract_and_upload_raw_data +from features.user_history import generate_and_upload_user_history + + + +def main(): + execution_date = os.getenv('EXECUTION_DATE') + num_days_history = os.getenv('NUM_DAYS_HISTORY') + gcs_bucket_name = os.getenv('GCS_BUCKET') + + extract_and_upload_raw_data(execution_date, num_days_history, gcs_bucket_name) + generate_and_upload_user_history(execution_date, gcs_bucket_name) + + print("done") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ml/digest-score/features/__init__.py b/ml/digest-score/features/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ml/digest-score/features/extract.py b/ml/digest-score/features/extract.py new file mode 100644 index 000000000..58e70ecf8 --- /dev/null +++ b/ml/digest-score/features/extract.py @@ -0,0 +1,109 @@ +# extract and upload raw data used for feature generation + +import psycopg2 +import numpy as np +import pandas as pd +from sqlalchemy import create_engine, text +from datetime import datetime, timedelta + +import os +from io import BytesIO +import tempfile + +import pyarrow as pa +import pyarrow.parquet as pq +from google.cloud import storage + +DB_PARAMS = { + 'dbname': os.getenv('DB_NAME') or 'omnivore', + 'user': os.getenv('DB_USER'), + 'password': os.getenv('DB_PASSWORD'), + 'host': os.getenv('DB_HOST') or 'localhost', + 'port': os.getenv('DB_PORT') or '5432' +} + +def extract_host(url): + try: + return urlparse(url).netloc + except Exception as e: + return None + +def fetch_raw_data(date_str, num_days_history): + end_date = pd.to_datetime(date_str) + start_date = end_date - timedelta(days=num_days_history) + start_date_str = start_date.strftime('%Y-%m-%d 00:00:00') + end_date_str = end_date.strftime('%Y-%m-%d 23:59:59') + + conn_str = f"postgresql://{DB_PARAMS['user']}:{DB_PARAMS['password']}@{DB_PARAMS['host']}:{DB_PARAMS['port']}/{DB_PARAMS['dbname']}" + # conn_str = f"postgresql://{DB_PARAMS['host']}:{DB_PARAMS['port']}/{DB_PARAMS['dbname']}" + engine = create_engine(conn_str) + + query = text(""" + SELECT + li.id as library_item_id, + li.user_id, + li.created_at, + li.archived_at, + li.deleted_at, + CASE WHEN li.folder = 'inbox' then 1 else 0 END as inbox_folder, + li.item_type, + li.item_language AS language, + li.content_reader, + li.word_count as item_word_count, + CASE WHEN li.thumbnail IS NOT NULL then 1 else 0 END as item_has_thumbnail, + CASE WHEN li.site_icon IS NOT NULL then 1 else 0 END as item_has_site_icon, + li.original_url, + li.site_name AS site, + li.author, + li.subscription, + sub.type as subscription_type, + sub.created_at as subscription_start_date, + sub.count as subscription_count, + sub.auto_add_to_library as subscription_auto_add_to_library, + sub.fetch_content as subscription_fetch_content, + sub.folder as subscription_folder, + CASE WHEN li.read_at is not NULL then 1 else 0 END as user_clicked, + CASE WHEN li.reading_progress_bottom_percent > 10 THEN 1 ELSE 0 END AS user_read, + CASE WHEN li.reading_progress_bottom_percent > 50 THEN 1 ELSE 0 END AS user_long_read + FROM omnivore.library_item AS li + LEFT JOIN omnivore.subscriptions sub on li.subscription = sub.name AND sub.user_id = li.user_id + WHERE li.created_at >= :start_date AND li.created_at <= :end_date; + """) + + chunk_size = 100000 # Adjust based on available memory and performance needs + + with tempfile.TemporaryDirectory() as tmpdir: + parquet_files = [] + with engine.connect() as conn: + for i, chunk in enumerate(pd.read_sql(query, conn, params={'start_date': start_date_str, 'end_date': end_date_str}, chunksize=chunk_size)): + chunk['library_item_id'] = chunk['library_item_id'].astype(str) + chunk['user_id'] = chunk['user_id'].astype(str) + chunk['original_url_host'] = chunk['original_url'].apply(extract_host) + + parquet_file = os.path.join(tmpdir, f'chunk_{i}.parquet') + chunk.to_parquet(parquet_file) + parquet_files.append(parquet_file) + + concatenated_df = pd.concat([pd.read_parquet(file) for file in parquet_files], ignore_index=True) + + parquet_buffer = BytesIO() + table = pa.Table.from_pandas(concatenated_df) + pq.write_table(table, parquet_buffer) + parquet_buffer.seek(0) + + return parquet_buffer + + +def upload_raw_databuffer(feather_buffer, execution_date, gcs_bucket_name): + client = storage.Client() + bucket = client.bucket(gcs_bucket_name) + blob = bucket.blob(f'data/raw/library_items_{execution_date}.parquet') + blob.upload_from_file(feather_buffer, content_type='application/octet-stream') + + print("Data stored successfully.") + + +def extract_and_upload_raw_data(execution_date, num_days_history, gcs_bucket_name): + buffer = fetch_raw_data(execution_date, int(num_days_history)) + upload_raw_databuffer(buffer, execution_date, gcs_bucket_name) + diff --git a/ml/digest-score/features/user_history.py b/ml/digest-score/features/user_history.py new file mode 100644 index 000000000..3906e6170 --- /dev/null +++ b/ml/digest-score/features/user_history.py @@ -0,0 +1,235 @@ +# download raw user data, aggregate user history, and upload to GCS + +import psycopg2 +import numpy as np +import pandas as pd +from sqlalchemy import create_engine, text +from datetime import datetime, timedelta + +import os +from io import BytesIO +import tempfile + +import pickle +import pyarrow as pa +import pyarrow.parquet as pq +import pyarrow.feather as feather +from google.cloud import storage + +FEATURE_COLUMNS=[ + # targets + # 'user_clicked', 'user_read', 'user_long_read', + + # item attributes / user setup attributes + 'item_word_count','item_has_site_icon', 'is_subscription', + 'inbox_folder', 'has_author', + + # how the user has setup the subscription + 'is_newsletter', 'is_feed', 'days_since_subscribed', + 'subscription_count', 'subscription_auto_add_to_library', + 'subscription_fetch_content', + + # user/item interaction history + 'user_original_url_host_saved_count_week_1', + 'user_original_url_host_interaction_count_week_1', + 'user_original_url_host_rate_week_1', + 'user_original_url_host_proportion_week_1', + + 'user_original_url_host_saved_count_week_2', + 'user_original_url_host_interaction_count_week_2', + 'user_original_url_host_rate_week_2', + 'user_original_url_host_proportion_week_2', + 'user_original_url_host_saved_count_week_3', + 'user_original_url_host_interaction_count_week_3', + 'user_original_url_host_rate_week_3', + 'user_original_url_host_proportion_week_3', + 'user_original_url_host_saved_count_week_4', + 'user_original_url_host_interaction_count_week_4', + 'user_original_url_host_rate_week_4', + 'user_original_url_host_proportion_week_4', + + 'user_subscription_saved_count_week_1', + 'user_subscription_interaction_count_week_1', + 'user_subscription_rate_week_1', 'user_subscription_proportion_week_1', + 'user_site_saved_count_week_3', 'user_site_interaction_count_week_3', + 'user_site_rate_week_3', 'user_site_proportion_week_3', + 'user_site_saved_count_week_2', 'user_site_interaction_count_week_2', + 'user_site_rate_week_2', 'user_site_proportion_week_2', + 'user_subscription_saved_count_week_2', + 'user_subscription_interaction_count_week_2', + 'user_subscription_rate_week_2', 'user_subscription_proportion_week_2', + 'user_site_saved_count_week_1', 'user_site_interaction_count_week_1', + 'user_site_rate_week_1', 'user_site_proportion_week_1', + 'user_subscription_saved_count_week_3', + 'user_subscription_interaction_count_week_3', + 'user_subscription_rate_week_3', 'user_subscription_proportion_week_3', + 'user_author_saved_count_week_4', + 'user_author_interaction_count_week_4', 'user_author_rate_week_4', + 'user_author_proportion_week_4', 'user_author_saved_count_week_1', + 'user_author_interaction_count_week_1', 'user_author_rate_week_1', + 'user_author_proportion_week_1', 'user_site_saved_count_week_4', + 'user_site_interaction_count_week_4', 'user_site_rate_week_4', + 'user_site_proportion_week_4', 'user_author_saved_count_week_2', + 'user_author_interaction_count_week_2', 'user_author_rate_week_2', + 'user_author_proportion_week_2', 'user_author_saved_count_week_3', + 'user_author_interaction_count_week_3', 'user_author_rate_week_3', + 'user_author_proportion_week_3', 'user_subscription_saved_count_week_4', + 'user_subscription_interaction_count_week_4', + 'user_subscription_rate_week_4', 'user_subscription_proportion_week_4' +] + +def parquet_to_dataframe(file_path): + table = pq.read_table(file_path) + df = table.to_pandas() + return df + +def load_local_raw_library_items(): + local_file_path = '/Users/jacksonh/Downloads/data_raw_library_items_2024-03-01.parquet' + df = parquet_to_dataframe(local_file_path) + return df + +def load_tables_from_pickle(pickle_file): + with open(pickle_file, 'rb') as handle: + tables = pickle.load(handle) + return tables + + +def download_raw_library_items(execution_date, gcs_bucket_name): + local_file_path = 'raw_library_items.parquet' + + client = storage.Client() + bucket = client.bucket(gcs_bucket_name) + blob = bucket.blob(f'data/raw/library_items_{execution_date}.parquet') + blob.download_to_filename(local_file_path) + + df = parquet_to_dataframe(local_file_path) + + os.remove(local_file_path) + return df + + +def load_feather_files(feature_directory): + dataframes = {} + for file_name in os.listdir(feature_directory): + if file_name.endswith('.feather'): + file_path = os.path.join(feature_directory, file_name) + df_name = os.path.splitext(file_name)[0] # Use the file name (without extension) as key + table = feather.read_table(file_path) + dataframes[df_name] = table + return dataframes + + +# def save_tables_to_arrow_ipc(tables, output_file): +# with pa.OSFile(output_file, 'wb') as sink: +# with pa.ipc.new_stream(sink, tables[next(iter(tables))].schema) as writer: +# for name, table in tables.items(): +# print("NAME:", name, "TABLE", table) +# writer.write_table(table) + + +def save_tables_to_arrow_ipc_with_schemas(tables, output_file): + with pa.OSFile(output_file, 'wb') as sink: + with pa.ipc.new_stream(sink, pa.schema([])) as writer: + for name, table in tables.items(): + metadata = table.schema.metadata or {} + metadata = {**metadata, b'table_name': name.encode('utf-8')} + schema = table.schema.add_metadata(metadata) + print("NAME:", name, "TABLE", table) + writer.write_table(table.replace_schema_metadata(schema.metadata)) + + +def save_tables_to_pickle(tables, output_file): + with open(output_file, 'wb') as handle: + pickle.dump(tables, handle, protocol=pickle.HIGHEST_PROTOCOL) + + +def upload_to_gcs(bucket_name, source_file_name, destination_blob_name): + client = storage.Client() + bucket = client.bucket(bucket_name) + blob = bucket.blob(destination_blob_name) + blob.upload_from_filename(source_file_name) + print(f'File {source_file_name} uploaded to {destination_blob_name} in bucket {bucket_name}.') + + +def generate_and_upload_user_history(execution_date, gcs_bucket_name): + df = download_raw_library_items(execution_date, gcs_bucket_name) + # df = load_local_raw_library_items() + with tempfile.TemporaryDirectory() as tmpdir: + user_preferences = aggregate_user_preferences(df, tmpdir) + dataframes = load_feather_files(tmpdir) + filename = os.path.join(tmpdir, 'user_features.pkl') + save_tables_to_pickle(dataframes, filename) + files = load_tables_from_pickle(filename) + print("GENERATED FEATURE TABLES:", files.keys()) + for table in files.keys(): + print("TABLE: ", table, "LEN: ", len(files[table])) + upload_to_gcs(gcs_bucket_name, filename, f'data/features/user_features.pkl') + + + +def compute_dimension_aggregates(df, dimension, bucket_name): + # Compute initial aggregates to filter out items with less than 2 saved counts + initial_agg = df.groupby(['user_id', dimension]).size().reset_index(name='count') + filtered_df = df[df.set_index(['user_id', dimension]).index.isin(initial_agg[initial_agg['count'] >= 2].set_index(['user_id', dimension]).index)] + + agg = filtered_df.groupby(['user_id', dimension]).agg( + saved_count=(dimension, 'count'), + interaction_count=('user_clicked', 'sum') + ).reset_index() + + agg[f'user_{dimension}_rate_{bucket_name}'] = agg['interaction_count'] / agg['saved_count'] + agg[f'user_{dimension}_proportion_{bucket_name}'] = agg.groupby('user_id')['interaction_count'].transform(lambda x: x / x.sum()) + + agg = agg.rename(columns={ + 'saved_count': f'user_{dimension}_saved_count_{bucket_name}', + 'interaction_count': f'user_{dimension}_interaction_count_{bucket_name}' + }) + + return agg + +def calculate_and_save_aggregates(bucket_name, bucket_df, output_dir): + # Compute aggregates for each dimension + dimensions = ['author', 'site', 'original_url_host', 'subscription'] + for dimension in dimensions: + agg_df = compute_dimension_aggregates(bucket_df, dimension, bucket_name) + + # Save the aggregated DataFrame to a Feather file + filename = os.path.join(output_dir, f'user_{dimension}_{bucket_name}.feather') + save_aggregated_data(agg_df, filename) + print(f"Saved aggregated data for {dimension} in {bucket_name} to {filename}") + + +def save_aggregated_data(df, filename): + buffer = BytesIO() + df.to_feather(buffer) + buffer.seek(0) + + with open(filename, 'wb') as f: + f.write(buffer.getbuffer()) + + +def aggregate_user_preferences(df, output_dir): + # Convert 'created_at' to datetime + df['created_at'] = pd.to_datetime(df['created_at']) + + end_date = df['created_at'].max() + + # Define bucket ranges for the past four weeks + buckets = { + 'week_4': (end_date - timedelta(weeks=4), end_date - timedelta(weeks=3)), + 'week_3': (end_date - timedelta(weeks=3), end_date - timedelta(weeks=2)), + 'week_2': (end_date - timedelta(weeks=2), end_date - timedelta(weeks=1)), + 'week_1': (end_date - timedelta(weeks=1), end_date) + } + + # Calculate aggregates for each bucket and save to file + for bucket_name, (start_date, end_date) in buckets.items(): + bucket_df = df[(df['created_at'] >= start_date) & (df['created_at'] < end_date)] + calculate_and_save_aggregates(bucket_name, bucket_df, output_dir) + + + +def create_and_upload_user_history(execution_date, num_days_history, gcs_bucket_name): + buffer = download_raw_library_items(execution_date, gcs_bucket_name) + buffer = open_raw_library_items() + upload_raw_databuffer(buffer, execution_date, gcs_bucket_name) \ No newline at end of file diff --git a/ml/digest-score/requirements.txt b/ml/digest-score/requirements.txt index b0e2f280e..d62f7cf55 100644 --- a/ml/digest-score/requirements.txt +++ b/ml/digest-score/requirements.txt @@ -6,3 +6,5 @@ google-cloud-storage flask pydantic sklearn2pmml +sqlalchemy +pyarrow diff --git a/ml/digest-score/train.py b/ml/digest-score/train.py index 2789a2221..fdd4d0b52 100644 --- a/ml/digest-score/train.py +++ b/ml/digest-score/train.py @@ -1,20 +1,27 @@ -import psycopg2 import pandas as pd -import joblib -from datetime import datetime - import os -from sklearn.model_selection import train_test_split -from sklearn.preprocessing import StandardScaler -from sklearn.ensemble import RandomForestClassifier -from sklearn2pmml import PMMLPipeline, sklearn2pmml -from sklearn.pipeline import Pipeline -from sklearn.metrics import accuracy_score, classification_report import numpy as np +from datetime import datetime, timedelta + +from sklearn.linear_model import SGDClassifier +from sklearn.ensemble import RandomForestClassifier, VotingClassifier + +from sklearn.preprocessing import StandardScaler + +from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix +from sklearn.utils import shuffle +from sklearn.model_selection import train_test_split +from sklearn2pmml import PMMLPipeline, sklearn2pmml from google.cloud import storage from google.cloud.exceptions import PreconditionFailed +import pickle +import pyarrow as pa +import pyarrow.parquet as pq +import pyarrow.feather as feather + +from features.user_history import FEATURE_COLUMNS DB_PARAMS = { 'dbname': os.getenv('DB_NAME') or 'omnivore', @@ -24,276 +31,184 @@ DB_PARAMS = { 'port': os.getenv('DB_PORT') or '5432' } - -TRAIN_FEATURES = [ - # "item_word_count", - "item_has_thumbnail", - "item_has_site_icon", - - 'user_30d_interactions_author_count', - 'user_30d_interactions_site_count', - 'user_30d_interactions_subscription_count', - - 'user_30d_interactions_author_rate', - 'user_30d_interactions_site_rate', - 'user_30d_interactions_subscription_rate', - - 'global_30d_interactions_site_count', - 'global_30d_interactions_author_count', - 'global_30d_interactions_subscription_count', - - 'global_30d_interactions_site_rate', - 'global_30d_interactions_author_rate', - 'global_30d_interactions_subscription_rate' -] - - -def fetch_data(sample_size): - # Connect to the PostgreSQL database - conn = psycopg2.connect(**DB_PARAMS) - cur = conn.cursor() - query = f""" - SELECT - user_id, - created_at, - item_folder, - item_type, - language, - content_reader, - directionality, - item_word_count, - item_has_thumbnail, - item_has_site_icon, - site, - author, - subscription, - item_subscription_type, - user_clicked, - user_read, - user_long_read - - FROM user_7d_activity LIMIT {sample_size} - """ - - cur.execute(query) - data = cur.fetchall() - - cur.close() - conn.close() - columns = [ - "user_id", - "created_at", - "item_folder", - "item_type", - "language", - "content_reader", - "directionality", - "item_word_count", - "item_has_thumbnail", - "item_has_site_icon", - "site", - "author", - "subscription", - "item_subscription_type", - "user_clicked", - "user_read", - "user_long_read", - ] - - df = pd.DataFrame(data, columns=columns) +def parquet_to_dataframe(file_path): + table = pq.read_table(file_path) + df = table.to_pandas() return df +def save_to_pickle(object, output_file): + with open(output_file, 'wb') as handle: + pickle.dump(object, handle, protocol=pickle.HIGHEST_PROTOCOL) -def add_user_features(df, name, feature_name): - conn = psycopg2.connect(**DB_PARAMS) - cur = conn.cursor() - query = f"SELECT user_id, {name}, interactions, interaction_rate FROM {feature_name}" +def load_tables_from_pickle(pickle_file): + with open(pickle_file, 'rb') as handle: + tables = pickle.load(handle) + return tables - cur.execute(query) - data = cur.fetchall() - - cur.close() - conn.close() - columns = [ - "user_id", - name, - "interactions", - "interaction_rate" - ] - - rate_feature_name = f"{feature_name}_rate" - count_feature_name = f"{feature_name}_count" - - df_loaded = pd.DataFrame(data, columns=columns) - df_loaded = df_loaded.rename(columns={"interactions": count_feature_name}, errors="raise") - df_loaded = df_loaded.rename(columns={"interaction_rate": rate_feature_name}, errors="raise") - df_merged = pd.merge(df, df_loaded[['user_id', name, rate_feature_name, count_feature_name]], on=['user_id',name], how='left') - - df_merged[rate_feature_name] = df_merged[rate_feature_name].fillna(0) - df_merged[count_feature_name] = df_merged[count_feature_name].fillna(0) - - return df_merged +def load_dataframes_from_pickle(pickle_file): + result = {} + tables = load_tables_from_pickle(pickle_file) + for table_name in tables.keys(): + result[table_name] = tables[table_name].to_pandas() + return result -def add_global_features(df, name, feature_name): - conn = psycopg2.connect(**DB_PARAMS) - cur = conn.cursor() - query = f"SELECT {name}, interactions, interaction_rate FROM {feature_name}" - - cur.execute(query) - data = cur.fetchall() - - cur.close() - conn.close() - columns = [ - name, - "interactions", - "interaction_rate" - ] - - rate_feature_name = f"{feature_name}_rate" - count_feature_name = f"{feature_name}_count" - - df_loaded = pd.DataFrame(data, columns=columns) - df_loaded = df_loaded.rename(columns={"interactions": count_feature_name}, errors="raise") - df_loaded = df_loaded.rename(columns={"interaction_rate": rate_feature_name}, errors="raise") - - df_merged = pd.merge(df, df_loaded[[name, count_feature_name, rate_feature_name]], on=name, how='left') - - df_merged[rate_feature_name] = df_merged[rate_feature_name].fillna(0) - df_merged[count_feature_name] = df_merged[count_feature_name].fillna(0) - return df_merged - - -def add_dummy_features(df): - known_folder_types = ['inbox', 'following'] - known_subscription_types = ['NEWSLETTER', 'RSS'] - # known_item_types = ['ARTICLE', 'BOOK', 'FILE', 'HIGHLIGHTS', 'IMAGE', 'PROFILE', 'TWEET', 'UNKNOWN','VIDEO','WEBSITE'] - #known_content_reader_types = ['WEB', 'PDF', 'EPUB'] - # known_directionality_types = ['LTR', 'RTL'] - - folder_dummies = pd.get_dummies(df['item_folder'], columns=known_subscription_types, prefix='item_folder') - subscription_type_dummies = pd.get_dummies(df['item_subscription_type'], columns=known_subscription_types, prefix='item_subscription_type') - - # item_type_dummies = pd.get_dummies(df['item_type'], columns=known_item_types, prefix='item_type') - # content_reader_dummies = pd.get_dummies(df['content_reader'], columns=known_content_reader_types, prefix='content_reader') - # directionality_dummies = pd.get_dummies(df['directionality'], columns=known_directionality_types, prefix='directionality') - # language_dummies = pd.get_dummies(df['language'], prefix='language') - - # if 'title_topic' in df.columns: - # title_topic_dummies = pd.get_dummies(df['title_topic'], prefix='title_topic') - - new_feature_names = list(subscription_type_dummies.columns) + list(folder_dummies.columns) - print("NEW FEATURE NAMES: ", new_feature_names) - # new_feature_names = list(item_type_dummies.columns) + list(content_reader_dummies.columns) + \ - # list(directionality_dummies.columns) + list(language_dummies.columns) - - # if 'title_topic' in df.columns: - # new_feature_names += list(title_topic_dummies.columns) - # , title_topic_dummies - return pd.concat([df, subscription_type_dummies, folder_dummies], axis=1), new_feature_names - - -def random_forest_predictor(df, feature_columns, user_interaction): - features = df[feature_columns] - - features = features.fillna(0) - target = df[user_interaction] - - X = features - y = target.values - X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) - scaler = StandardScaler() - rf_classifier = RandomForestClassifier(n_estimators=50, max_depth=10, random_state=42) - - pipeline = PMMLPipeline([ - ("scaler", scaler), - ("classifier", rf_classifier) - ]) - pipeline.fit(X_train, y_train) - - y_pred = pipeline.predict(X_test) - - feature_importance = rf_classifier.feature_importances_ - - print("Feature Importance:") - for feature, importance in zip(feature_columns, feature_importance): - print(f"{feature}: {importance}") - - print("\nClassification Report:") - print(classification_report(y_test, y_pred)) - - return pipeline - - -def save_and_upload_model(pipeline, target_interaction_type, bucket_name): - pipeline_file_name = f'predict_{target_interaction_type}_random_forest_pipeline-v001.pkl' - joblib.dump(pipeline, pipeline_file_name) - - if bucket_name: - upload_to_gcs(bucket_name, pipeline_file_name, f'models/{pipeline_file_name}') - else: - print("No GCS credentials so i am not uploading") +def download_from_gcs(bucket_name, source_blob_name, destination_file_name): + client = storage.Client() + bucket = client.bucket(bucket_name) + blob = bucket.blob(source_blob_name) + blob.download_to_filename(destination_file_name) + print(f'Blob {source_blob_name} downloaded to {destination_file_name}.') def upload_to_gcs(bucket_name, source_file_name, destination_blob_name): - """Uploads a file to the bucket.""" storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(destination_blob_name) blob.upload_from_filename(source_file_name) - print(f"File {source_file_name} uploaded to {destination_blob_name}.") -def resample_data(df): - print("Initial distribution:\n", df['user_clicked'].value_counts()) +def load_and_sample_library_items_from_parquet(raw_file_path, sample_size): + df = parquet_to_dataframe(raw_file_path) + sampled_df = df.sample(frac=sample_size, random_state=42) + return sampled_df - # Separate the majority and minority classes - df_majority = df[df['user_clicked'] == False] - df_minority = df[df['user_clicked'] == True] - # Resample the minority class - df_minority_oversampled = df_minority.sample(n=len(df_majority), replace=True, random_state=42) +def merge_user_preference_data(sampled_raw_df, feature_dict): + # Start with the sampled raw DataFrame + merged_df = sampled_raw_df - # Combine the majority class with the oversampled minority class - df_balanced = pd.concat([df_majority, df_minority_oversampled]) + # Iterate through the files in the feature directory + for key in feature_dict.keys(): + user_preference_df = feature_dict[key] + + # Determine the dimension to join on + if 'author' in key: + merge_keys = ['user_id', 'author'] + elif 'site' in key: + merge_keys = ['user_id', 'site'] + elif 'subscription' in key: + merge_keys = ['user_id', 'subscription'] + elif 'original_url_host' in key: + merge_keys = ['user_id', 'original_url_host'] + else: + print("skipping feature: ", key) + continue # Skip files that don't match expected patterns + + # Merge with the current user preference DataFrame + merged_df = pd.merge(merged_df, user_preference_df, on=merge_keys, how='left') - # Shuffle the DataFrame to mix the classes - df_balanced = df_balanced.sample(frac=1, random_state=42).reset_index(drop=True) + # Optionally, fill NaNs after each merge step to avoid growing NaNs + merged_df = merged_df.fillna(0) + + return merged_df - # Check the new distribution - print("Balanced distribution:\n", df_balanced['user_clicked'].value_counts()) +def prepare_data(df): + df['created_at'] = pd.to_datetime(df['created_at']) + df['subscription_start_date'] = pd.to_datetime(df['subscription_start_date'], errors='coerce') - # Display the first few rows of the balanced DataFrame - print(df_balanced.head()) + df['is_subscription'] = df['subscription'].apply(lambda x: 1 if pd.notna(x) and x != '' else 0) + df['has_author'] = df['author'].apply(lambda x: 1 if pd.notna(x) and x != '' else 0) + + # Calculate the days since subscribed + df['days_since_subscribed'] = (df['created_at'] - df['subscription_start_date']).dt.days + + # Handle cases where subscription_start_date is NaT (Not a Time) or negative + df['days_since_subscribed'] = df['days_since_subscribed'].apply(lambda x: x if x >= 0 else 0) + df['days_since_subscribed'] = df['days_since_subscribed'].fillna(0).astype(int) + + df['is_feed'] = df['subscription_type'].apply(lambda x: 1 if x == 'RSS' else 0) + df['is_newsletter'] = df['subscription_type'].apply(lambda x: 1 if x == 'NEWSLETTER' else 0) + + df = df.dropna(subset=['user_clicked']) + + # Fill NaNs in other columns with 0 (if any remain) + df = df.fillna(0) + + X = df[FEATURE_COLUMNS] # .drop(columns=['user_id', 'user_clicked']) + Y = df['user_clicked'] + + return X, Y + +def train_random_forest_model(X, Y): + model = RandomForestClassifier( + class_weight={0: 1, 1: 10}, + n_estimators=10, + max_depth=10, + random_state=42 + ) + + scaler = StandardScaler() + X_scaled = scaler.fit_transform(X) + + X_train, X_test, Y_train, Y_test = train_test_split(X_scaled, Y, test_size=0.3, random_state=42) + + pipeline = PMMLPipeline([ + ("scaler", scaler), + ("classifier", model) + ]) + + pipeline.fit(X_train, Y_train) + + Y_pred = pipeline.predict(X_test) + print_classification_report(Y_test, Y_pred) + print_feature_importance(X, model) + + return pipeline + + +def print_feature_importance(X, rf): + # Get feature importances + importances = rf.feature_importances_ + + # Get the indices of the features sorted by importance + indices = np.argsort(importances)[::-1] + + # Print the feature ranking + print("Feature ranking:") + + for f in range(X.shape[1]): + print(f"{f + 1}. feature {indices[f]} ({importances[indices[f]]:.4f}) - {X.columns[indices[f]]}") + + + +def print_classification_report(Y_test, Y_pred): + report = classification_report(Y_test, Y_pred, target_names=['Not Clicked', 'Clicked'], output_dict=True) + print("Classification Report:") + print(f"Accuracy: {report['accuracy']:.4f}") + print(f"Precision (Not Clicked): {report['Not Clicked']['precision']:.4f}") + print(f"Recall (Not Clicked): {report['Not Clicked']['recall']:.4f}") + print(f"F1-Score (Not Clicked): {report['Not Clicked']['f1-score']:.4f}") + print(f"Precision (Clicked): {report['Clicked']['precision']:.4f}") + print(f"Recall (Clicked): {report['Clicked']['recall']:.4f}") + print(f"F1-Score (Clicked): {report['Clicked']['f1-score']:.4f}") - return df_balanced def main(): - sample_size = int(os.getenv('SAMPLE_SIZE')) or 1000 - num_days_history = int(os.getenv('NUM_DAYS_HISTORY')) or 21 - gcs_bucket = os.getenv('GCS_BUCKET') + execution_date = os.getenv('EXECUTION_DATE') + num_days_history = os.getenv('NUM_DAYS_HISTORY') + gcs_bucket_name = os.getenv('GCS_BUCKET') - print("about to fetch library data") - df = fetch_data(sample_size) - print("FETCHED", df) + raw_data_path = f'raw_library_items_${execution_date}.parquet' + user_history_path = 'features_user_features.pkl' + pipeline_path = 'predict_read_pipeline-v002.pkl' - df = add_user_features(df, 'author', 'user_30d_interactions_author') - df = add_user_features(df, 'site', 'user_30d_interactions_site') - df = add_user_features(df, 'subscription', 'user_30d_interactions_subscription') - df = add_global_features(df, 'site', 'global_30d_interactions_site') - df = add_global_features(df, 'author', 'global_30d_interactions_author') - df = add_global_features(df, 'subscription', 'global_30d_interactions_subscription') + download_from_gcs(gcs_bucket_name, f'data/features/user_features.pkl', user_history_path) + download_from_gcs(gcs_bucket_name, f'data/raw/library_items_{execution_date}.parquet', raw_data_path) - df = resample_data(df) - print("training RandomForest with number of library_items: ", len(df)) - pipeline = random_forest_predictor(df, TRAIN_FEATURES, 'user_clicked') + sampled_raw_df = load_and_sample_library_items_from_parquet(raw_data_path, 0.10) + user_history = load_dataframes_from_pickle(user_history_path) - print(f"uploading model and scaler to {gcs_bucket}") - save_and_upload_model(pipeline, 'user_clicked', gcs_bucket) + merged_df = merge_user_preference_data(sampled_raw_df, user_history) + + print("created merged data", merged_df.columns) + + X, Y = prepare_data(merged_df) + random_forest_pipeline = train_random_forest_model(X, Y) + save_to_pickle(random_forest_pipeline, pipeline_path) + upload_to_gcs(gcs_bucket_name, pipeline_path, f'data/models/{pipeline_path}') - print("done") if __name__ == "__main__": main() \ No newline at end of file diff --git a/packages/api/test/resolvers/article_saving_request.test.ts b/packages/api/test/resolvers/article_saving_request.test.ts index 9df8b57f5..c47099bd1 100644 --- a/packages/api/test/resolvers/article_saving_request.test.ts +++ b/packages/api/test/resolvers/article_saving_request.test.ts @@ -97,15 +97,15 @@ describe('ArticleSavingRequest API', () => { ).to.eql(ArticleSavingRequestStatus.Processing) }) - it('creates a library item in db', async () => { - const url = 'https://blog.omnivore.app/1' - await graphqlRequest( - createArticleSavingRequestMutation('https://blog.omnivore.app/1'), + it('returns an error if the url is invalid', async () => { + const res = await graphqlRequest( + createArticleSavingRequestMutation('invalid url'), authToken ).expect(200) - const item = await findLibraryItemByUrl(url, user.id) - expect(item?.readableContent).to.eql('Your link is being saved...') + expect(res.body.data.createArticleSavingRequest.errorCodes).to.eql([ + CreateArticleSavingRequestErrorCode.BadData, + ]) }) it('returns an error if the url is invalid', async () => {