From 41e8ee3acabe625e0961527cd355fa8375b7de8b Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Tue, 19 Sep 2023 16:37:21 +0800 Subject: [PATCH] finish migration script --- .../migrate_from_elastic.py | 140 ++++++++++++++---- .../db/migrations/0120.do.entity_labels.sql | 3 +- 2 files changed, 110 insertions(+), 33 deletions(-) diff --git a/packages/db/elastic_migrations/migrate_from_elastic.py b/packages/db/elastic_migrations/migrate_from_elastic.py index 204edd6bc..8e0a93302 100644 --- a/packages/db/elastic_migrations/migrate_from_elastic.py +++ b/packages/db/elastic_migrations/migrate_from_elastic.py @@ -17,7 +17,7 @@ PG_DB = os.getenv('PG_DB', 'omnivore') ES_URL = os.getenv('ES_URL', 'http://localhost:9200') ES_USERNAME = os.getenv('ES_USERNAME', 'elastic') ES_PASSWORD = os.getenv('ES_PASSWORD', 'password') -ES_SCAN_SIZE = os.getenv('ES_SCAN_SIZE', 100) +ES_SCAN_SIZE = os.getenv('ES_SCAN_SIZE', 10000) ES_INDEX = os.getenv('ES_INDEX', 'pages_alias') CUT_OFF_DATE = os.getenv('CUT_OFF_DATE', '2000-01-01') @@ -31,7 +31,6 @@ def get_uuid(val): return val except ValueError: id = convert_string_to_uuid(val) - print('Converted string to uuid', val, id) return id @@ -121,8 +120,43 @@ async def insert_highlights(db_conn, highlights): print(f'Inserted {len(highlights)} highlights') +async def insert_labels(db_conn, labels): + insert_query = """ + INSERT INTO omnivore.entity_labels ( + label_id, library_item_id, highlight_id + ) + SELECT $1, $2, $3 + FROM omnivore.labels l + LEFT JOIN omnivore.library_item li ON li.id = $2 + LEFT JOIN omnivore.highlight h ON h.id = $3 + WHERE l.id = $1 AND ($2 IS NULL OR li.id = $2) AND ($3 IS NULL OR h.id = $3) + ON CONFLICT (label_id, library_item_id, highlight_id) DO NOTHING + """ + print('Inserting labels into postgres') + await insert_into_postgres(insert_query, db_conn, labels) + print(f'Inserted {len(labels)} labels') + + +async def insert_recommendations(db_conn, recommendations): + insert_query = """ + INSERT INTO omnivore.recommendation ( + library_item_id, recommender_id, group_id, note, created_at + ) + SELECT $1, $2, $3, $4, $5 + FROM omnivore.library_item li + INNER JOIN omnivore.user u ON u.id = $2 + INNER JOIN omnivore.group g ON g.id = $3 + WHERE li.id = $1 + ON CONFLICT (library_item_id, recommender_id, group_id) DO UPDATE SET + note = EXCLUDED.note, + created_at = EXCLUDED.created_at + """ + print('Inserting recommendations into postgres') + await insert_into_postgres(insert_query, db_conn, recommendations) + print(f'Inserted {len(recommendations)} recommendations') + + async def insert_into_postgres(insert_query, db_conn, records): - print('Inserting data into postgres') await db_conn.executemany(insert_query, records, timeout=60) @@ -134,6 +168,8 @@ def remove_null_bytes(val): async def main(): print('Starting migration') + migrated_at = CUT_OFF_DATE + print('Cut off date', CUT_OFF_DATE) # postgres connection db_conn = await asyncpg.connect(user=PG_USER, password=PG_PASSWORD, @@ -203,6 +239,8 @@ async def main(): } ] } + # get current time + migrated_at = datetime.utcnow().strftime(DATE_FORMAT) # Scan API for larger library docs = async_scan(es_client, index=ES_INDEX, query=query, preserve_order=True, size=ES_SCAN_SIZE, @@ -216,29 +254,6 @@ async def main(): # convert library items to postgres format source = doc['_source'] subscription = source['subscription'] if 'subscription' in source else source.get('rssFeedUrl', None) - # convert highlights to postgres format - if 'highlights' in source: - for highlight in source['highlights']: - highlights.append(( - get_uuid(highlight['id']), - get_uuid(highlight['userId']), - highlight.get('quote', None), - highlight.get('prefix', None), - highlight.get('suffix', None), - highlight.get('patch', None), - highlight.get('annotation', None), - convert_string_to_datetime(highlight['createdAt']), - convert_string_to_datetime(highlight.get('updatedAt', None)), - convert_string_to_datetime(highlight.get('sharedAt', None)), - highlight.get('shortId', None), - id, - highlight.get('highlightPositionPercent', 0), - highlight.get('highlightPositionAnchorIndex', 0), - highlight.get('type', 'HIGHLIGHT'), - highlight.get('color', None), - highlight.get('html', None), - )) - page_type = source['pageType'] content_reader = 'WEB' if 'uploadFileId' in source: @@ -278,22 +293,85 @@ async def main(): remove_null_bytes(source.get('originalHtml', None)), ) library_items.append(library_item) + + # convert labels to postgres format + if 'labels' in source: + for label in source['labels']: + labels.append(( + get_uuid(label['id']), + id, + None, + )) + + # convert highlights to postgres format + if 'highlights' in source: + for highlight in source['highlights']: + highlight_id = get_uuid(highlight['id']) + highlights.append(( + highlight_id, + get_uuid(highlight['userId']), + highlight.get('quote', None), + highlight.get('prefix', None), + highlight.get('suffix', None), + highlight.get('patch', None), + highlight.get('annotation', None), + convert_string_to_datetime(highlight['createdAt']), + convert_string_to_datetime(highlight.get('updatedAt', None)), + convert_string_to_datetime(highlight.get('sharedAt', None)), + highlight.get('shortId', None), + id, + highlight.get('highlightPositionPercent', 0), + highlight.get('highlightPositionAnchorIndex', 0), + highlight.get('type', 'HIGHLIGHT'), + highlight.get('color', None), + highlight.get('html', None), + )) + + if 'labels' in highlight: + for label in highlight['labels']: + labels.append(( + get_uuid(label['id']), + None, + highlight_id, + )) + + # convert recommendations to postgres format + if 'recommendations' in source: + for recommendation in source['recommendations']: + recommendations.append(( + id, + get_uuid(recommendation['user']['userId']), + get_uuid(recommendation['id']), + recommendation.get('note', None), + convert_string_to_datetime(recommendation['recommendedAt']), + )) + # copy to postgres every ES_SCAN_SIZE records if i % ES_SCAN_SIZE == 0: await insert_library_items(db_conn, library_items) + print('Copied', i, 'records to postgres') + library_items = [] if len(highlights) > 0: await insert_highlights(db_conn, highlights) highlights = [] - print('Copied', i, 'records to postgres') - library_items = [] + if len(labels) > 0: + await insert_labels(db_conn, labels) + labels = [] + if len(recommendations) > 0: + await insert_recommendations(db_conn, recommendations) + recommendations = [] # copy remaining records to postgres if len(library_items) > 0: await insert_library_items(db_conn, library_items) - if len(highlights) > 0: - await insert_highlights(db_conn, highlights) print('Copied', i, 'records to postgres') + if len(highlights) > 0: + await insert_highlights(db_conn, highlights) + if len(labels) > 0: + await insert_labels(db_conn, labels) + if len(recommendations) > 0: + await insert_recommendations(db_conn, recommendations) - print('Migration complete') + print('Migration complete', migrated_at) except Exception as err: print('Migration error', err) finally: diff --git a/packages/db/migrations/0120.do.entity_labels.sql b/packages/db/migrations/0120.do.entity_labels.sql index 2bfa8a499..62298d7d2 100755 --- a/packages/db/migrations/0120.do.entity_labels.sql +++ b/packages/db/migrations/0120.do.entity_labels.sql @@ -9,8 +9,7 @@ CREATE TABLE omnivore.entity_labels ( library_item_id uuid REFERENCES omnivore.library_item(id) ON DELETE CASCADE, highlight_id uuid REFERENCES omnivore.highlight(id) ON DELETE CASCADE, label_id uuid NOT NULL REFERENCES omnivore.labels(id) ON DELETE CASCADE, - unique(library_item_id, label_id), - unique(highlight_id, label_id) + unique(label_id, library_item_id, highlight_id) ); GRANT SELECT, INSERT, DELETE ON omnivore.entity_labels TO omnivore_user;