add end_time to the migrate script
This commit is contained in:
@ -22,7 +22,8 @@ ES_SCAN_SIZE = os.getenv('ES_SCAN_SIZE', 1000)
|
||||
ES_SCROLL_TIME = os.getenv('ES_SCROLL_TIME', '2m')
|
||||
ES_INDEX = os.getenv('ES_INDEX', 'pages_alias')
|
||||
|
||||
CUT_OFF_DATE = os.getenv('CUT_OFF_DATE', '2000-01-01')
|
||||
START_TIME = os.getenv('START_TIME', '2000-01-01')
|
||||
END_TIME = os.getenv('END_TIME', '2100-01-01')
|
||||
# ISO 8601 format
|
||||
DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
|
||||
|
||||
@ -35,7 +36,7 @@ async def assert_data(db_conn, es_client, users, uploaded_files):
|
||||
for user in users:
|
||||
user_id = user['id']
|
||||
number_of_docs_in_postgres = await db_conn.fetchval(
|
||||
f'SELECT COUNT(*) FROM omnivore.library_item WHERE user_id = \'{user_id}\'')
|
||||
f'SELECT COUNT(1) FROM omnivore.library_item WHERE user_id = \'{user_id}\'')
|
||||
|
||||
query = {
|
||||
'size': 0,
|
||||
@ -226,7 +227,8 @@ async def insert_recommendations(db_conn, recommendations):
|
||||
async def insert_into_postgres(insert_query, db_conn, records):
|
||||
await db_conn.executemany(insert_query, records, timeout=60)
|
||||
# cool down for PG_COOLDOWN_TIME seconds
|
||||
await asyncio.sleep(float(PG_COOLDOWN_TIME))
|
||||
if PG_COOLDOWN_TIME > 0:
|
||||
await asyncio.sleep(float(PG_COOLDOWN_TIME))
|
||||
|
||||
|
||||
def remove_null_bytes(val):
|
||||
@ -236,7 +238,7 @@ def remove_null_bytes(val):
|
||||
|
||||
|
||||
async def main():
|
||||
print('Starting migration', CUT_OFF_DATE)
|
||||
print('Starting migration', START_TIME, END_TIME)
|
||||
|
||||
# postgres connection
|
||||
db_conn = await asyncpg.connect(user=PG_USER, password=PG_PASSWORD,
|
||||
@ -249,6 +251,9 @@ async def main():
|
||||
try:
|
||||
print(await es_client.info())
|
||||
|
||||
# disable update_library_item_modtime trigger
|
||||
await db_conn.execute('ALTER TABLE omnivore.library_item DISABLE TRIGGER update_library_item_modtime')
|
||||
|
||||
print('Getting list of users from postgres')
|
||||
users = await db_conn.fetch('SELECT id FROM omnivore.user')
|
||||
|
||||
@ -269,7 +274,8 @@ async def main():
|
||||
{
|
||||
'range': {
|
||||
'updatedAt': {
|
||||
'gte': CUT_OFF_DATE
|
||||
'gte': START_TIME,
|
||||
'lte': END_TIME,
|
||||
}
|
||||
}
|
||||
},
|
||||
@ -457,6 +463,9 @@ async def main():
|
||||
if len(recommendations) > 0:
|
||||
await insert_recommendations(db_conn, recommendations)
|
||||
|
||||
# enable update_library_item_modtime trigger
|
||||
await db_conn.execute('ALTER TABLE omnivore.library_item ENABLE TRIGGER update_library_item_modtime')
|
||||
|
||||
print('Migration complete', migrated_at)
|
||||
|
||||
await assert_data(db_conn, es_client, users, uploaded_files)
|
||||
|
||||
Reference in New Issue
Block a user