Merge pull request #4149 from omnivore-app/feat/ml-score-v3
V3 scoring model, add prom monitoring
This commit is contained in:
@ -1,5 +1,6 @@
|
|||||||
import logging
|
import logging
|
||||||
from flask import Flask, request, jsonify
|
from flask import Flask, request, jsonify
|
||||||
|
from prometheus_client import start_http_server, Histogram, Summary, Counter, generate_latest
|
||||||
|
|
||||||
from typing import List
|
from typing import List
|
||||||
from timeit import default_timer as timer
|
from timeit import default_timer as timer
|
||||||
@ -7,7 +8,6 @@ from timeit import default_timer as timer
|
|||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import json
|
import json
|
||||||
import pytz
|
|
||||||
import pickle
|
import pickle
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
@ -16,17 +16,53 @@ from urllib.parse import urlparse
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import dateutil.parser
|
import dateutil.parser
|
||||||
from google.cloud import storage
|
from google.cloud import storage
|
||||||
|
|
||||||
|
import concurrent.futures
|
||||||
|
from threading import Lock, RLock
|
||||||
|
from collections import ChainMap
|
||||||
|
import copy
|
||||||
|
|
||||||
from features.user_history import FEATURE_COLUMNS
|
from features.user_history import FEATURE_COLUMNS
|
||||||
|
from auth import user_token_required, admin_token_required
|
||||||
|
|
||||||
|
|
||||||
|
class ThreadSafeUserFeatures:
|
||||||
|
def __init__(self):
|
||||||
|
self._data = {}
|
||||||
|
self._lock = RLock()
|
||||||
|
|
||||||
|
def get(self):
|
||||||
|
with self._lock:
|
||||||
|
return dict(self._data)
|
||||||
|
|
||||||
|
def update(self, new_features):
|
||||||
|
with self._lock:
|
||||||
|
self._data.update(new_features)
|
||||||
|
|
||||||
|
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
|
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
|
||||||
|
|
||||||
|
|
||||||
USER_HISTORY_PATH = 'user_features.pkl'
|
USER_HISTORY_PATH = 'user_features.pkl'
|
||||||
MODEL_PIPELINE_PATH = 'predict_read_pipeline-v002.pkl'
|
MODEL_PIPELINE_PATH = 'predict_read_model-v003.pkl'
|
||||||
|
|
||||||
pipeline = None
|
pipeline = None
|
||||||
user_features = None
|
user_features_store = ThreadSafeUserFeatures()
|
||||||
|
|
||||||
|
|
||||||
|
# these buckets are used for reporting scores, we want to make sure
|
||||||
|
# there is decent diversity in the returned scores.
|
||||||
|
score_bucket_ranges = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
|
||||||
|
score_buckets = {
|
||||||
|
f'score_bucket_{int(b * 10)}': Counter(f'inference_score_bucket_{int(b * 10)}', f'Number of scores in the range {b - 0.1:.1f} to {b:.1f}')
|
||||||
|
for b in score_bucket_ranges
|
||||||
|
}
|
||||||
|
|
||||||
|
def observe_score(score):
|
||||||
|
for b in score_bucket_ranges:
|
||||||
|
if b - 0.1 < score <= b:
|
||||||
|
score_buckets[f'score_bucket_{int(b * 10)}'].inc()
|
||||||
|
break
|
||||||
|
|
||||||
def download_from_gcs(bucket_name, gcs_path, destination_path):
|
def download_from_gcs(bucket_name, gcs_path, destination_path):
|
||||||
storage_client = storage.Client()
|
storage_client = storage.Client()
|
||||||
@ -72,31 +108,39 @@ def merge_dicts(dict1, dict2):
|
|||||||
dict1[key] = value
|
dict1[key] = value
|
||||||
return dict1
|
return dict1
|
||||||
|
|
||||||
|
def predict_proba_wrapper(X):
|
||||||
|
return pipeline.predict_proba(X)
|
||||||
|
|
||||||
|
|
||||||
def refresh_data():
|
def refresh_data():
|
||||||
start = timer()
|
start = timer()
|
||||||
global pipeline
|
global pipeline
|
||||||
global user_features
|
if os.getenv('LOAD_LOCAL_MODEL') == None:
|
||||||
if os.getenv('LOAD_LOCAL_MODEL') != None:
|
app.logger.info(f"loading data from {os.getenv('GCS_BUCKET')}")
|
||||||
gcs_bucket_name = os.getenv('GCS_BUCKET')
|
gcs_bucket_name = os.getenv('GCS_BUCKET')
|
||||||
download_from_gcs(gcs_bucket_name, f'data/features/user_features.pkl', USER_HISTORY_PATH)
|
download_from_gcs(gcs_bucket_name, f'data/features/{USER_HISTORY_PATH}', USER_HISTORY_PATH)
|
||||||
download_from_gcs(gcs_bucket_name, f'data/models/predict_read_pipeline-v002.pkl', MODEL_PIPELINE_PATH)
|
download_from_gcs(gcs_bucket_name, f'data/models/{MODEL_PIPELINE_PATH}', MODEL_PIPELINE_PATH)
|
||||||
pipeline = load_pipeline(MODEL_PIPELINE_PATH)
|
pipeline = load_pipeline(MODEL_PIPELINE_PATH)
|
||||||
user_features = load_user_features(USER_HISTORY_PATH)
|
|
||||||
end = timer()
|
new_features = load_user_features(USER_HISTORY_PATH)
|
||||||
print('time to refresh data (in seconds):', end - start)
|
user_features_store.update(new_features)
|
||||||
print('loaded pipeline:', pipeline)
|
|
||||||
print('loaded number of user_features:', len(user_features))
|
app.logger.info(f'time to refresh data (in seconds): {timer() - start}')
|
||||||
|
app.logger.info(f'loaded pipeline: {pipeline}')
|
||||||
|
app.logger.info(f'loaded number of user_features: {len(new_features)}')
|
||||||
|
|
||||||
|
|
||||||
def compute_score(user_id, item_features):
|
def compute_score(user_id, item_features, user_features):
|
||||||
interaction_score = compute_interaction_score(user_id, item_features)
|
interaction_score = compute_interaction_score(user_id, item_features, user_features)
|
||||||
|
observe_score(interaction_score)
|
||||||
return {
|
return {
|
||||||
'score': interaction_score,
|
'score': interaction_score,
|
||||||
'interaction_score': interaction_score,
|
'interaction_score': interaction_score,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def compute_interaction_score(user_id, item_features):
|
def compute_interaction_score(user_id, item_features, user_features):
|
||||||
|
start = timer()
|
||||||
original_url_host = urlparse(item_features.get('original_url')).netloc
|
original_url_host = urlparse(item_features.get('original_url')).netloc
|
||||||
df_test = pd.DataFrame([{
|
df_test = pd.DataFrame([{
|
||||||
'user_id': user_id,
|
'user_id': user_id,
|
||||||
@ -134,35 +178,69 @@ def compute_interaction_score(user_id, item_features):
|
|||||||
else:
|
else:
|
||||||
print("skipping feature: ", name)
|
print("skipping feature: ", name)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
df_test = pd.merge(df_test, df, on=merge_keys, how='left')
|
df_test = pd.merge(df_test, df, on=merge_keys, how='left')
|
||||||
df_test = df_test.fillna(0)
|
df_test = df_test.fillna(0)
|
||||||
df_predict = df_test[FEATURE_COLUMNS]
|
df_predict = df_test[FEATURE_COLUMNS]
|
||||||
|
|
||||||
|
infer_start = timer()
|
||||||
interaction_score = pipeline.predict_proba(df_predict)
|
interaction_score = pipeline.predict_proba(df_predict)
|
||||||
print('score', interaction_score, 'item_features', df_test[df_test != 0].stack())
|
app.logger.info(f'time to call infer (in seconds): {timer() - infer_start}')
|
||||||
|
|
||||||
|
app.logger.info(f'INTERACTION SCORE: {interaction_score}')
|
||||||
|
app.logger.info(f'item_features:\n{df_predict[df_predict != 0].stack()}')
|
||||||
|
app.logger.info(f'time to compute score (in seconds): {timer() - start}')
|
||||||
|
|
||||||
|
return np.float64(interaction_score[0][1])
|
||||||
|
|
||||||
|
|
||||||
|
def process_parallel_item(user_id, key, item, user_features):
|
||||||
|
library_item_id = item['library_item_id']
|
||||||
|
return library_item_id, compute_score(user_id, item, user_features)
|
||||||
|
|
||||||
|
def parallel_compute_scores(user_id, items, max_workers=None):
|
||||||
|
user_features = user_features_store.get()
|
||||||
|
result = {}
|
||||||
|
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||||
|
future_to_item = {executor.submit(process_parallel_item, user_id, key, item, user_features): (key, item)
|
||||||
|
for key, item in items.items()}
|
||||||
|
|
||||||
|
for future in concurrent.futures.as_completed(future_to_item):
|
||||||
|
key, item = future_to_item[future]
|
||||||
|
try:
|
||||||
|
library_item_id, score = future.result()
|
||||||
|
result[library_item_id] = score
|
||||||
|
except Exception as exc:
|
||||||
|
app.logger.error(f'Item {key} generated an exception: {exc}')
|
||||||
|
return result
|
||||||
|
|
||||||
return interaction_score[0][1]
|
|
||||||
|
|
||||||
|
|
||||||
@app.route('/_ah/health', methods=['GET'])
|
@app.route('/_ah/health', methods=['GET'])
|
||||||
def ready():
|
def ready():
|
||||||
return jsonify({'OK': 'yes'}), 200
|
return jsonify({'OK': 'yes'}), 200
|
||||||
|
|
||||||
|
@app.route('/metrics')
|
||||||
|
def metrics():
|
||||||
|
return generate_latest(), 200, {'Content-Type': 'text/plain; charset=utf-8'}
|
||||||
|
|
||||||
|
|
||||||
@app.route('/refresh', methods=['GET'])
|
@app.route('/refresh', methods=['GET'])
|
||||||
|
@admin_token_required
|
||||||
def refresh():
|
def refresh():
|
||||||
refresh_data()
|
refresh_data()
|
||||||
return jsonify({'OK': 'yes'}), 200
|
return jsonify({'OK': 'yes'}), 200
|
||||||
|
|
||||||
|
|
||||||
@app.route('/users/<user_id>/features', methods=['GET'])
|
@app.route('/users/<user_id>/features', methods=['GET'])
|
||||||
|
@admin_token_required
|
||||||
def get_user_features(user_id):
|
def get_user_features(user_id):
|
||||||
result = {}
|
result = {}
|
||||||
df_user = pd.DataFrame([{
|
df_user = pd.DataFrame([{
|
||||||
'user_id': user_id,
|
'user_id': user_id,
|
||||||
}])
|
}])
|
||||||
|
|
||||||
|
user_features = user_features_store.get()
|
||||||
|
|
||||||
user_data = {}
|
user_data = {}
|
||||||
for name, df in user_features.items():
|
for name, df in user_features.items():
|
||||||
df = df[df['user_id'] == user_id]
|
df = df[df['user_id'] == user_id]
|
||||||
@ -173,18 +251,20 @@ def get_user_features(user_id):
|
|||||||
|
|
||||||
|
|
||||||
@app.route('/predict', methods=['POST'])
|
@app.route('/predict', methods=['POST'])
|
||||||
|
@user_token_required
|
||||||
def predict():
|
def predict():
|
||||||
try:
|
try:
|
||||||
data = request.get_json()
|
data = request.get_json()
|
||||||
app.logger.info(f"predict scoring request: {data}")
|
app.logger.info(f"predict scoring request: {data}")
|
||||||
|
|
||||||
user_id = data.get('user_id')
|
user_id = request.user_id
|
||||||
item_features = data.get('item_features')
|
item_features = data.get('item_features')
|
||||||
|
|
||||||
if user_id is None:
|
if user_id is None:
|
||||||
return jsonify({'error': 'Missing user_id'}), 400
|
return jsonify({'error': 'Missing user_id'}), 400
|
||||||
|
|
||||||
score = compute_score(user_id, item_features)
|
user_features = user_features_store.get()
|
||||||
|
score = compute_score(user_id, item_features, user_features)
|
||||||
return jsonify({'score': score})
|
return jsonify({'score': score})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
app.logger.error(f"exception in predict endpoint: {request.get_json()}\n{e}")
|
app.logger.error(f"exception in predict endpoint: {request.get_json()}\n{e}")
|
||||||
@ -192,24 +272,20 @@ def predict():
|
|||||||
|
|
||||||
|
|
||||||
@app.route('/batch', methods=['POST'])
|
@app.route('/batch', methods=['POST'])
|
||||||
|
@user_token_required
|
||||||
def batch():
|
def batch():
|
||||||
|
start = timer()
|
||||||
try:
|
try:
|
||||||
result = {}
|
|
||||||
data = request.get_json()
|
data = request.get_json()
|
||||||
app.logger.info(f"batch scoring request: {data}")
|
|
||||||
|
|
||||||
user_id = data.get('user_id')
|
|
||||||
items = data.get('items')
|
items = data.get('items')
|
||||||
|
user_id = request.user_id
|
||||||
|
if user_id == None:
|
||||||
|
return jsonify({'error': 'no user_id supplied'}), 400
|
||||||
|
if len(items) > 101:
|
||||||
|
return jsonify({'error': f'too many items: {len(items)}'}), 400
|
||||||
|
result = parallel_compute_scores(user_id, items)
|
||||||
|
|
||||||
if user_id is None:
|
app.logger.info(f'time to compute batch of {len(items)} items (in seconds): {timer() - start}')
|
||||||
return jsonify({'error': 'Missing user_id'}), 400
|
|
||||||
|
|
||||||
for key, item in items.items():
|
|
||||||
print('key": ', key)
|
|
||||||
print('item: ', item)
|
|
||||||
library_item_id = item['library_item_id']
|
|
||||||
result[library_item_id] = compute_score(user_id, item)
|
|
||||||
|
|
||||||
return jsonify(result)
|
return jsonify(result)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
app.logger.error(f"exception in batch endpoint: {request.get_json()}\n{e}")
|
app.logger.error(f"exception in batch endpoint: {request.get_json()}\n{e}")
|
||||||
|
|||||||
58
ml/digest-score/auth.py
Normal file
58
ml/digest-score/auth.py
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
import os
|
||||||
|
import jwt
|
||||||
|
from flask import request, jsonify
|
||||||
|
from functools import wraps
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
|
|
||||||
|
SECRET_KEY = os.getenv('JWT_SECRET')
|
||||||
|
ADMIN_SECRET_KEY = os.getenv('JWT_ADMIN_SECRET_KEY')
|
||||||
|
|
||||||
|
def generate_admin_token():
|
||||||
|
expiration_time = datetime.utcnow() + timedelta(minutes=5)
|
||||||
|
payload = {
|
||||||
|
'role': 'admin',
|
||||||
|
'exp': expiration_time
|
||||||
|
}
|
||||||
|
|
||||||
|
token = jwt.encode(payload, ADMIN_SECRET_KEY, algorithm="HS256")
|
||||||
|
return token
|
||||||
|
|
||||||
|
|
||||||
|
def user_token_required(f):
|
||||||
|
@wraps(f)
|
||||||
|
def decorated(*args, **kwargs):
|
||||||
|
token = None
|
||||||
|
if 'Authorization' in request.headers:
|
||||||
|
print("request.headers['Authorization'].split(" ")[1]", request.headers['Authorization'].split(" ")[1])
|
||||||
|
token = request.headers['Authorization'].split(" ")[1]
|
||||||
|
if not token:
|
||||||
|
return jsonify({'message': 'Token is missing!'}), 401
|
||||||
|
try:
|
||||||
|
data = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
|
||||||
|
request.user_id = data['uid']
|
||||||
|
except jwt.ExpiredSignatureError:
|
||||||
|
return jsonify({'message': 'Token has expired!'}), 401
|
||||||
|
except jwt.InvalidTokenError:
|
||||||
|
return jsonify({'message': 'Token is invalid!'}), 401
|
||||||
|
return f(*args, **kwargs)
|
||||||
|
return decorated
|
||||||
|
|
||||||
|
def admin_token_required(f):
|
||||||
|
@wraps(f)
|
||||||
|
def decorated(*args, **kwargs):
|
||||||
|
token = None
|
||||||
|
if 'Authorization' in request.headers:
|
||||||
|
token = request.headers['Authorization'].split(" ")[1]
|
||||||
|
if not token:
|
||||||
|
return jsonify({'message': 'Token is missing!'}), 401
|
||||||
|
try:
|
||||||
|
data = jwt.decode(token, ADMIN_SECRET_KEY, algorithms=["HS256"])
|
||||||
|
if data['role'] != 'admin':
|
||||||
|
return jsonify({'message': 'Admin token required!'}), 403
|
||||||
|
except jwt.ExpiredSignatureError:
|
||||||
|
return jsonify({'message': 'Token has expired!'}), 401
|
||||||
|
except jwt.InvalidTokenError:
|
||||||
|
return jsonify({'message': 'Token is invalid!'}), 401
|
||||||
|
return f(*args, **kwargs)
|
||||||
|
return decorated
|
||||||
@ -7,6 +7,7 @@ from datetime import datetime, timedelta
|
|||||||
import os
|
import os
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
import tempfile
|
import tempfile
|
||||||
|
import requests
|
||||||
|
|
||||||
import pyarrow as pa
|
import pyarrow as pa
|
||||||
import pyarrow.parquet as pq
|
import pyarrow.parquet as pq
|
||||||
@ -15,16 +16,41 @@ from google.cloud import storage
|
|||||||
from features.extract import extract_and_upload_raw_data
|
from features.extract import extract_and_upload_raw_data
|
||||||
from features.user_history import generate_and_upload_user_history
|
from features.user_history import generate_and_upload_user_history
|
||||||
|
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from auth import generate_admin_token
|
||||||
|
|
||||||
|
|
||||||
|
def call_refresh_api(api):
|
||||||
|
headers = {
|
||||||
|
'Authorization': f'Bearer {generate_admin_token()}'
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
response = requests.get(api, headers=headers, timeout=10)
|
||||||
|
if response.status_code == 200:
|
||||||
|
print("scoring service refreshed")
|
||||||
|
else:
|
||||||
|
print(f"failed to refresh scoring service: {response.status_code}")
|
||||||
|
except requests.exceptions.Timeout:
|
||||||
|
print(f"The request timed out after {timeout} seconds")
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
|
print(f"An error occurred while refreshing scoring service: {e}")
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
execution_date = os.getenv('EXECUTION_DATE')
|
score_service = os.getenv("SCORING_SERVICE_URL")
|
||||||
num_days_history = os.getenv('NUM_DAYS_HISTORY')
|
num_days_history = os.getenv('NUM_DAYS_HISTORY')
|
||||||
gcs_bucket_name = os.getenv('GCS_BUCKET')
|
gcs_bucket_name = os.getenv('GCS_BUCKET')
|
||||||
|
|
||||||
|
current_date_utc = datetime.now(timezone.utc)
|
||||||
|
execution_date = current_date_utc.strftime("%Y-%m-%d")
|
||||||
|
print(f'updating features using execution date: {execution_date}')
|
||||||
|
|
||||||
extract_and_upload_raw_data(execution_date, num_days_history, gcs_bucket_name)
|
extract_and_upload_raw_data(execution_date, num_days_history, gcs_bucket_name)
|
||||||
generate_and_upload_user_history(execution_date, gcs_bucket_name)
|
generate_and_upload_user_history(execution_date, gcs_bucket_name)
|
||||||
|
|
||||||
|
if score_service:
|
||||||
|
call_refresh_api(score_service)
|
||||||
|
|
||||||
print("done")
|
print("done")
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@ -17,65 +17,30 @@ import pyarrow.feather as feather
|
|||||||
from google.cloud import storage
|
from google.cloud import storage
|
||||||
|
|
||||||
FEATURE_COLUMNS=[
|
FEATURE_COLUMNS=[
|
||||||
# targets
|
'user_subscription_rate_week_1',
|
||||||
# 'user_clicked', 'user_read', 'user_long_read',
|
'user_subscription_proportion_week_1',
|
||||||
|
'user_site_rate_week_3',
|
||||||
# item attributes / user setup attributes
|
'user_site_proportion_week_3',
|
||||||
'item_word_count','item_has_site_icon', 'is_subscription',
|
'user_site_rate_week_2',
|
||||||
'inbox_folder', 'has_author',
|
'user_site_proportion_week_2',
|
||||||
|
'user_subscription_rate_week_2',
|
||||||
# how the user has setup the subscription
|
'user_subscription_proportion_week_2',
|
||||||
'is_newsletter', 'is_feed', 'days_since_subscribed',
|
'user_site_rate_week_1',
|
||||||
'subscription_count', 'subscription_auto_add_to_library',
|
'user_site_proportion_week_1',
|
||||||
'subscription_fetch_content',
|
'user_subscription_rate_week_3',
|
||||||
|
'user_subscription_proportion_week_3',
|
||||||
# user/item interaction history
|
'user_author_rate_week_4',
|
||||||
'user_original_url_host_saved_count_week_1',
|
'user_author_proportion_week_4',
|
||||||
'user_original_url_host_interaction_count_week_1',
|
'user_author_rate_week_1',
|
||||||
'user_original_url_host_rate_week_1',
|
'user_author_proportion_week_1',
|
||||||
'user_original_url_host_proportion_week_1',
|
'user_site_rate_week_4',
|
||||||
|
'user_site_proportion_week_4',
|
||||||
'user_original_url_host_saved_count_week_2',
|
'user_author_rate_week_2',
|
||||||
'user_original_url_host_interaction_count_week_2',
|
'user_author_proportion_week_2',
|
||||||
'user_original_url_host_rate_week_2',
|
'user_author_rate_week_3',
|
||||||
'user_original_url_host_proportion_week_2',
|
'user_author_proportion_week_3',
|
||||||
'user_original_url_host_saved_count_week_3',
|
'user_subscription_rate_week_4',
|
||||||
'user_original_url_host_interaction_count_week_3',
|
'user_subscription_proportion_week_4'
|
||||||
'user_original_url_host_rate_week_3',
|
|
||||||
'user_original_url_host_proportion_week_3',
|
|
||||||
'user_original_url_host_saved_count_week_4',
|
|
||||||
'user_original_url_host_interaction_count_week_4',
|
|
||||||
'user_original_url_host_rate_week_4',
|
|
||||||
'user_original_url_host_proportion_week_4',
|
|
||||||
|
|
||||||
'user_subscription_saved_count_week_1',
|
|
||||||
'user_subscription_interaction_count_week_1',
|
|
||||||
'user_subscription_rate_week_1', 'user_subscription_proportion_week_1',
|
|
||||||
'user_site_saved_count_week_3', 'user_site_interaction_count_week_3',
|
|
||||||
'user_site_rate_week_3', 'user_site_proportion_week_3',
|
|
||||||
'user_site_saved_count_week_2', 'user_site_interaction_count_week_2',
|
|
||||||
'user_site_rate_week_2', 'user_site_proportion_week_2',
|
|
||||||
'user_subscription_saved_count_week_2',
|
|
||||||
'user_subscription_interaction_count_week_2',
|
|
||||||
'user_subscription_rate_week_2', 'user_subscription_proportion_week_2',
|
|
||||||
'user_site_saved_count_week_1', 'user_site_interaction_count_week_1',
|
|
||||||
'user_site_rate_week_1', 'user_site_proportion_week_1',
|
|
||||||
'user_subscription_saved_count_week_3',
|
|
||||||
'user_subscription_interaction_count_week_3',
|
|
||||||
'user_subscription_rate_week_3', 'user_subscription_proportion_week_3',
|
|
||||||
'user_author_saved_count_week_4',
|
|
||||||
'user_author_interaction_count_week_4', 'user_author_rate_week_4',
|
|
||||||
'user_author_proportion_week_4', 'user_author_saved_count_week_1',
|
|
||||||
'user_author_interaction_count_week_1', 'user_author_rate_week_1',
|
|
||||||
'user_author_proportion_week_1', 'user_site_saved_count_week_4',
|
|
||||||
'user_site_interaction_count_week_4', 'user_site_rate_week_4',
|
|
||||||
'user_site_proportion_week_4', 'user_author_saved_count_week_2',
|
|
||||||
'user_author_interaction_count_week_2', 'user_author_rate_week_2',
|
|
||||||
'user_author_proportion_week_2', 'user_author_saved_count_week_3',
|
|
||||||
'user_author_interaction_count_week_3', 'user_author_rate_week_3',
|
|
||||||
'user_author_proportion_week_3', 'user_subscription_saved_count_week_4',
|
|
||||||
'user_subscription_interaction_count_week_4',
|
|
||||||
'user_subscription_rate_week_4', 'user_subscription_proportion_week_4'
|
|
||||||
]
|
]
|
||||||
|
|
||||||
def parquet_to_dataframe(file_path):
|
def parquet_to_dataframe(file_path):
|
||||||
|
|||||||
@ -6,5 +6,9 @@ google-cloud-storage
|
|||||||
flask
|
flask
|
||||||
pydantic
|
pydantic
|
||||||
sklearn2pmml
|
sklearn2pmml
|
||||||
sqlalchemy
|
sqlalchemy
|
||||||
pyarrow
|
pyarrow
|
||||||
|
requests
|
||||||
|
PyJWT
|
||||||
|
prometheus_client
|
||||||
|
xgboost==2.1.0
|
||||||
|
|||||||
@ -1,17 +1,11 @@
|
|||||||
import pandas as pd
|
|
||||||
import os
|
import os
|
||||||
|
import pandas as pd
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
from sklearn.linear_model import SGDClassifier
|
import xgboost as xgb
|
||||||
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
|
from sklearn.metrics import classification_report
|
||||||
|
|
||||||
from sklearn.preprocessing import StandardScaler
|
|
||||||
|
|
||||||
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix
|
|
||||||
from sklearn.utils import shuffle
|
|
||||||
from sklearn.model_selection import train_test_split
|
from sklearn.model_selection import train_test_split
|
||||||
from sklearn2pmml import PMMLPipeline, sklearn2pmml
|
|
||||||
|
|
||||||
from google.cloud import storage
|
from google.cloud import storage
|
||||||
from google.cloud.exceptions import PreconditionFailed
|
from google.cloud.exceptions import PreconditionFailed
|
||||||
@ -23,13 +17,6 @@ import pyarrow.feather as feather
|
|||||||
|
|
||||||
from features.user_history import FEATURE_COLUMNS
|
from features.user_history import FEATURE_COLUMNS
|
||||||
|
|
||||||
DB_PARAMS = {
|
|
||||||
'dbname': os.getenv('DB_NAME') or 'omnivore',
|
|
||||||
'user': os.getenv('DB_USER'),
|
|
||||||
'password': os.getenv('DB_PASSWORD'),
|
|
||||||
'host': os.getenv('DB_HOST') or 'localhost',
|
|
||||||
'port': os.getenv('DB_PORT') or '5432'
|
|
||||||
}
|
|
||||||
|
|
||||||
def parquet_to_dataframe(file_path):
|
def parquet_to_dataframe(file_path):
|
||||||
table = pq.read_table(file_path)
|
table = pq.read_table(file_path)
|
||||||
@ -122,58 +109,15 @@ def prepare_data(df):
|
|||||||
|
|
||||||
return X, Y
|
return X, Y
|
||||||
|
|
||||||
def train_random_forest_model(X, Y):
|
|
||||||
model = RandomForestClassifier(
|
|
||||||
class_weight={0: 1, 1: 10},
|
|
||||||
n_estimators=10,
|
|
||||||
max_depth=10,
|
|
||||||
random_state=42
|
|
||||||
)
|
|
||||||
|
|
||||||
scaler = StandardScaler()
|
def train_xgb_model(X, Y):
|
||||||
X_scaled = scaler.fit_transform(X)
|
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=42)
|
||||||
|
model = xgb.XGBClassifier(max_depth=7, n_estimators=5)
|
||||||
|
model.fit(X_train, y_train)
|
||||||
|
|
||||||
X_train, X_test, Y_train, Y_test = train_test_split(X_scaled, Y, test_size=0.3, random_state=42)
|
y_pred = model.predict(X_test)
|
||||||
|
print(classification_report(y_test, y_pred))
|
||||||
pipeline = PMMLPipeline([
|
return model
|
||||||
("scaler", scaler),
|
|
||||||
("classifier", model)
|
|
||||||
])
|
|
||||||
|
|
||||||
pipeline.fit(X_train, Y_train)
|
|
||||||
|
|
||||||
Y_pred = pipeline.predict(X_test)
|
|
||||||
print_classification_report(Y_test, Y_pred)
|
|
||||||
print_feature_importance(X, model)
|
|
||||||
|
|
||||||
return pipeline
|
|
||||||
|
|
||||||
|
|
||||||
def print_feature_importance(X, rf):
|
|
||||||
# Get feature importances
|
|
||||||
importances = rf.feature_importances_
|
|
||||||
|
|
||||||
# Get the indices of the features sorted by importance
|
|
||||||
indices = np.argsort(importances)[::-1]
|
|
||||||
|
|
||||||
# Print the feature ranking
|
|
||||||
print("Feature ranking:")
|
|
||||||
|
|
||||||
for f in range(X.shape[1]):
|
|
||||||
print(f"{f + 1}. feature {indices[f]} ({importances[indices[f]]:.4f}) - {X.columns[indices[f]]}")
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def print_classification_report(Y_test, Y_pred):
|
|
||||||
report = classification_report(Y_test, Y_pred, target_names=['Not Clicked', 'Clicked'], output_dict=True)
|
|
||||||
print("Classification Report:")
|
|
||||||
print(f"Accuracy: {report['accuracy']:.4f}")
|
|
||||||
print(f"Precision (Not Clicked): {report['Not Clicked']['precision']:.4f}")
|
|
||||||
print(f"Recall (Not Clicked): {report['Not Clicked']['recall']:.4f}")
|
|
||||||
print(f"F1-Score (Not Clicked): {report['Not Clicked']['f1-score']:.4f}")
|
|
||||||
print(f"Precision (Clicked): {report['Clicked']['precision']:.4f}")
|
|
||||||
print(f"Recall (Clicked): {report['Clicked']['recall']:.4f}")
|
|
||||||
print(f"F1-Score (Clicked): {report['Clicked']['f1-score']:.4f}")
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@ -181,24 +125,23 @@ def main():
|
|||||||
num_days_history = os.getenv('NUM_DAYS_HISTORY')
|
num_days_history = os.getenv('NUM_DAYS_HISTORY')
|
||||||
gcs_bucket_name = os.getenv('GCS_BUCKET')
|
gcs_bucket_name = os.getenv('GCS_BUCKET')
|
||||||
|
|
||||||
raw_data_path = f'raw_library_items_${execution_date}.parquet'
|
raw_data_path = f'raw_library_items_{execution_date}.parquet'
|
||||||
user_history_path = 'features_user_features.pkl'
|
user_history_path = 'features_user_features.pkl'
|
||||||
pipeline_path = 'predict_read_pipeline-v002.pkl'
|
pipeline_path = 'predict_read_pipeline-v002.pkl'
|
||||||
|
model_path = 'predict_read_model-v003.pkl'
|
||||||
|
|
||||||
download_from_gcs(gcs_bucket_name, f'data/features/user_features.pkl', user_history_path)
|
download_from_gcs(gcs_bucket_name, f'data/features/user_features.pkl', user_history_path)
|
||||||
download_from_gcs(gcs_bucket_name, f'data/raw/library_items_{execution_date}.parquet', raw_data_path)
|
download_from_gcs(gcs_bucket_name, f'data/raw/library_items_{execution_date}.parquet', raw_data_path)
|
||||||
|
|
||||||
sampled_raw_df = load_and_sample_library_items_from_parquet(raw_data_path, 0.10)
|
sampled_raw_df = load_and_sample_library_items_from_parquet(raw_data_path, 0.95)
|
||||||
user_history = load_dataframes_from_pickle(user_history_path)
|
user_history = load_dataframes_from_pickle(user_history_path)
|
||||||
|
|
||||||
merged_df = merge_user_preference_data(sampled_raw_df, user_history)
|
merged_df = merge_user_preference_data(sampled_raw_df, user_history)
|
||||||
|
|
||||||
print("created merged data", merged_df.columns)
|
|
||||||
|
|
||||||
X, Y = prepare_data(merged_df)
|
X, Y = prepare_data(merged_df)
|
||||||
random_forest_pipeline = train_random_forest_model(X, Y)
|
xgb_model = train_xgb_model(X, Y)
|
||||||
save_to_pickle(random_forest_pipeline, pipeline_path)
|
save_to_pickle(xgb_model, model_path)
|
||||||
upload_to_gcs(gcs_bucket_name, pipeline_path, f'data/models/{pipeline_path}')
|
upload_to_gcs(gcs_bucket_name, model_path, f'data/models/{model_path}')
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@ -3,6 +3,7 @@ import client from 'prom-client'
|
|||||||
import { env } from '../env'
|
import { env } from '../env'
|
||||||
import { registerMetric } from '../prometheus'
|
import { registerMetric } from '../prometheus'
|
||||||
import { logError } from '../utils/logger'
|
import { logError } from '../utils/logger'
|
||||||
|
import { createWebAuthToken } from '../routers/auth/jwt_helpers'
|
||||||
|
|
||||||
export interface Feature {
|
export interface Feature {
|
||||||
library_item_id?: string
|
library_item_id?: string
|
||||||
@ -79,8 +80,13 @@ class ScoreClientImpl implements ScoreClient {
|
|||||||
const start = Date.now()
|
const start = Date.now()
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
const authToken = await createWebAuthToken(data.user_id)
|
||||||
|
if (!authToken) {
|
||||||
|
throw Error('could not create auth token')
|
||||||
|
}
|
||||||
const response = await axios.post<ScoreApiResponse>(this.apiUrl, data, {
|
const response = await axios.post<ScoreApiResponse>(this.apiUrl, data, {
|
||||||
headers: {
|
headers: {
|
||||||
|
Authorization: `Bearer ${authToken}`,
|
||||||
'Content-Type': 'application/json',
|
'Content-Type': 'application/json',
|
||||||
},
|
},
|
||||||
timeout: 20000, // 20 seconds
|
timeout: 20000, // 20 seconds
|
||||||
|
|||||||
Reference in New Issue
Block a user