From 663ccdae4113a0dce4d2bc43cc7addc9818a057e Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Sat, 23 Sep 2023 15:36:56 +0800 Subject: [PATCH] try to insert library item one by one if batch insert failed --- .../migrate_from_elastic.py | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/packages/db/elastic_migrations/migrate_from_elastic.py b/packages/db/elastic_migrations/migrate_from_elastic.py index 0b3d972d2..50ae7831d 100755 --- a/packages/db/elastic_migrations/migrate_from_elastic.py +++ b/packages/db/elastic_migrations/migrate_from_elastic.py @@ -150,7 +150,7 @@ async def insert_library_items(db_conn, library_items): original_content = EXCLUDED.original_content, deleted_at = EXCLUDED.deleted_at ''' - print('Inserting library items into postgres') + print(f'Inserting {len(library_items)} library items into postgres') await insert_into_postgres(insert_query, db_conn, library_items) print(f'Inserted {len(library_items)} library items') @@ -185,7 +185,7 @@ async def insert_highlights(db_conn, highlights): color = EXCLUDED.color, html = EXCLUDED.html ''' - print('Inserting highlights into postgres') + print(f'Inserting {len(highlights)} highlights into postgres') await insert_into_postgres(insert_query, db_conn, highlights) print(f'Inserted {len(highlights)} highlights') @@ -202,7 +202,7 @@ async def insert_labels(db_conn, labels): WHERE l.id = $1 AND ($2 IS NULL OR li.id = $2) AND ($3 IS NULL OR h.id = $3) ON CONFLICT (label_id, library_item_id, highlight_id) DO NOTHING ''' - print('Inserting labels into postgres') + print(f'Inserting {len(labels)} labels into postgres') await insert_into_postgres(insert_query, db_conn, labels) print(f'Inserted {len(labels)} labels') @@ -221,13 +221,21 @@ async def insert_recommendations(db_conn, recommendations): note = EXCLUDED.note, created_at = EXCLUDED.created_at ''' - print('Inserting recommendations into postgres') + print(f'Inserting {len(recommendations)} recommendations into postgres') await insert_into_postgres(insert_query, db_conn, recommendations) print(f'Inserted {len(recommendations)} recommendations') async def insert_into_postgres(insert_query, db_conn, records): - await db_conn.executemany(insert_query, records, timeout=int(PG_TIMEOUT)) + try: + await db_conn.executemany(insert_query, records, timeout=int(PG_TIMEOUT)) + except Exception as err: + print('Insert into postgres ERROR:', err) + # excute insert query one by one + for record in records: + print('Inserting record', record) + await db_conn.execute(insert_query, *record, timeout=int(PG_TIMEOUT)) + # cool down for PG_COOLDOWN_TIME seconds if float(PG_COOLDOWN_TIME) > 0: await asyncio.sleep(float(PG_COOLDOWN_TIME)) @@ -342,16 +350,13 @@ async def main(): reading_progress_percent = source.get('readingProgressPercent', 0) reading_progress_anchor = source.get('readingProgressAnchorIndex', 0) content = source['content'] + description = source.get('description', None) + # skip item if content is larger than 1MB if len(content) > 1000000: print('Skipping item', doc['_id'], 'because content is larger than 1MB') continue - # truncate description to 1MB characters - description = source.get('description', None) - if description is not None and len(description) > 1000000: - description = description[:1000000] - library_item = ( id, get_uuid(source['userId']),