Try to insert library items one by one if the batch insert fails

This commit is contained in:
Hongbo Wu
2023-09-23 15:36:56 +08:00
parent 323be5376a
commit 663ccdae41

View File

@ -150,7 +150,7 @@ async def insert_library_items(db_conn, library_items):
original_content = EXCLUDED.original_content,
deleted_at = EXCLUDED.deleted_at
'''
print('Inserting library items into postgres')
print(f'Inserting {len(library_items)} library items into postgres')
await insert_into_postgres(insert_query, db_conn, library_items)
print(f'Inserted {len(library_items)} library items')
@ -185,7 +185,7 @@ async def insert_highlights(db_conn, highlights):
color = EXCLUDED.color,
html = EXCLUDED.html
'''
print('Inserting highlights into postgres')
print(f'Inserting {len(highlights)} highlights into postgres')
await insert_into_postgres(insert_query, db_conn, highlights)
print(f'Inserted {len(highlights)} highlights')
@ -202,7 +202,7 @@ async def insert_labels(db_conn, labels):
WHERE l.id = $1 AND ($2 IS NULL OR li.id = $2) AND ($3 IS NULL OR h.id = $3)
ON CONFLICT (label_id, library_item_id, highlight_id) DO NOTHING
'''
print('Inserting labels into postgres')
print(f'Inserting {len(labels)} labels into postgres')
await insert_into_postgres(insert_query, db_conn, labels)
print(f'Inserted {len(labels)} labels')
@ -221,13 +221,21 @@ async def insert_recommendations(db_conn, recommendations):
note = EXCLUDED.note,
created_at = EXCLUDED.created_at
'''
print('Inserting recommendations into postgres')
print(f'Inserting {len(recommendations)} recommendations into postgres')
await insert_into_postgres(insert_query, db_conn, recommendations)
print(f'Inserted {len(recommendations)} recommendations')
async def insert_into_postgres(insert_query, db_conn, records):
    """Insert ``records`` with ``insert_query``, falling back to row-by-row.

    The whole batch is first sent with a single ``executemany`` call. If that
    call raises, the error is printed and every record is then executed
    individually, so the record being processed is printed right before its
    own insert is attempted.

    Args:
        insert_query: SQL statement with positional placeholders.
        db_conn: connection object exposing awaitable ``executemany`` and
            ``execute`` methods that accept a ``timeout`` keyword.
        records: sequence of argument tuples, one per row.

    Raises:
        Exception: whatever ``db_conn.execute`` raises for a record during
            the fallback pass; it is not caught here, so the remaining
            records are not attempted.
    """
    # Convert the configured values once instead of on every use.
    timeout = int(PG_TIMEOUT)
    cooldown = float(PG_COOLDOWN_TIME)

    try:
        # The batch is attempted exactly once, and only inside the guard, so
        # a failure always reaches the row-by-row fallback below.
        await db_conn.executemany(insert_query, records, timeout=timeout)
    except Exception as err:
        print('Insert into postgres ERROR:', err)
        # execute the insert query one by one
        for record in records:
            print('Inserting record', record)
            await db_conn.execute(insert_query, *record, timeout=timeout)

    # cool down for PG_COOLDOWN_TIME seconds
    # NOTE(review): the cooldown is applied once per call, after either the
    # batch or the fallback path — confirm this matches the intended nesting.
    if cooldown > 0:
        await asyncio.sleep(cooldown)
@ -342,16 +350,13 @@ async def main():
reading_progress_percent = source.get('readingProgressPercent', 0)
reading_progress_anchor = source.get('readingProgressAnchorIndex', 0)
content = source['content']
description = source.get('description', None)
# skip item if content is larger than 1MB
if len(content) > 1000000:
print('Skipping item', doc['_id'], 'because content is larger than 1MB')
continue
# truncate description to 1MB characters
description = source.get('description', None)
if description is not None and len(description) > 1000000:
description = description[:1000000]
library_item = (
id,
get_uuid(source['userId']),