diff --git a/packages/db/elastic_migrations/migrate_from_elastic.py b/packages/db/elastic_migrations/migrate_from_elastic.py index 3e171df3a..587a34efa 100755 --- a/packages/db/elastic_migrations/migrate_from_elastic.py +++ b/packages/db/elastic_migrations/migrate_from_elastic.py @@ -22,7 +22,8 @@ ES_SCAN_SIZE = os.getenv('ES_SCAN_SIZE', 1000) ES_SCROLL_TIME = os.getenv('ES_SCROLL_TIME', '2m') ES_INDEX = os.getenv('ES_INDEX', 'pages_alias') -CUT_OFF_DATE = os.getenv('CUT_OFF_DATE', '2000-01-01') +START_TIME = os.getenv('START_TIME', '2000-01-01') +END_TIME = os.getenv('END_TIME', '2100-01-01') # ISO 8601 format DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' @@ -35,7 +36,7 @@ async def assert_data(db_conn, es_client, users, uploaded_files): for user in users: user_id = user['id'] number_of_docs_in_postgres = await db_conn.fetchval( - f'SELECT COUNT(*) FROM omnivore.library_item WHERE user_id = \'{user_id}\'') + f'SELECT COUNT(1) FROM omnivore.library_item WHERE user_id = \'{user_id}\'') query = { 'size': 0, @@ -226,7 +227,8 @@ async def insert_recommendations(db_conn, recommendations): async def insert_into_postgres(insert_query, db_conn, records): await db_conn.executemany(insert_query, records, timeout=60) # cool down for PG_COOLDOWN_TIME seconds - await asyncio.sleep(float(PG_COOLDOWN_TIME)) + if float(PG_COOLDOWN_TIME) > 0: + await asyncio.sleep(float(PG_COOLDOWN_TIME)) def remove_null_bytes(val): @@ -236,7 +238,7 @@ def remove_null_bytes(val): async def main(): - print('Starting migration', CUT_OFF_DATE) + print('Starting migration', START_TIME, END_TIME) # postgres connection db_conn = await asyncpg.connect(user=PG_USER, password=PG_PASSWORD, @@ -249,6 +251,9 @@ async def main(): try: print(await es_client.info()) + # disable update_library_item_modtime trigger + await db_conn.execute('ALTER TABLE omnivore.library_item DISABLE TRIGGER update_library_item_modtime') + print('Getting list of users from postgres') users = await db_conn.fetch('SELECT id FROM omnivore.user') @@ 
-269,7 +274,8 @@ async def main(): { 'range': { 'updatedAt': { - 'gte': CUT_OFF_DATE + 'gte': START_TIME, + 'lte': END_TIME, } } }, @@ -457,6 +463,9 @@ async def main(): if len(recommendations) > 0: await insert_recommendations(db_conn, recommendations) + # enable update_library_item_modtime trigger + await db_conn.execute('ALTER TABLE omnivore.library_item ENABLE TRIGGER update_library_item_modtime') + print('Migration complete', migrated_at) await assert_data(db_conn, es_client, users, uploaded_files)