finish migration script

This commit is contained in:
Hongbo Wu
2023-09-19 16:37:21 +08:00
parent 68e381deb2
commit 41e8ee3aca
2 changed files with 110 additions and 33 deletions

View File

@ -17,7 +17,7 @@ PG_DB = os.getenv('PG_DB', 'omnivore')
ES_URL = os.getenv('ES_URL', 'http://localhost:9200')
ES_USERNAME = os.getenv('ES_USERNAME', 'elastic')
ES_PASSWORD = os.getenv('ES_PASSWORD', 'password')
ES_SCAN_SIZE = os.getenv('ES_SCAN_SIZE', 100)
ES_SCAN_SIZE = os.getenv('ES_SCAN_SIZE', 10000)
ES_INDEX = os.getenv('ES_INDEX', 'pages_alias')
CUT_OFF_DATE = os.getenv('CUT_OFF_DATE', '2000-01-01')
@ -31,7 +31,6 @@ def get_uuid(val):
return val
except ValueError:
id = convert_string_to_uuid(val)
print('Converted string to uuid', val, id)
return id
@ -121,8 +120,43 @@ async def insert_highlights(db_conn, highlights):
print(f'Inserted {len(highlights)} highlights')
async def insert_labels(db_conn, labels):
insert_query = """
INSERT INTO omnivore.entity_labels (
label_id, library_item_id, highlight_id
)
SELECT $1, $2, $3
FROM omnivore.labels l
LEFT JOIN omnivore.library_item li ON li.id = $2
LEFT JOIN omnivore.highlight h ON h.id = $3
WHERE l.id = $1 AND ($2 IS NULL OR li.id = $2) AND ($3 IS NULL OR h.id = $3)
ON CONFLICT (label_id, library_item_id, highlight_id) DO NOTHING
"""
print('Inserting labels into postgres')
await insert_into_postgres(insert_query, db_conn, labels)
print(f'Inserted {len(labels)} labels')
async def insert_recommendations(db_conn, recommendations):
insert_query = """
INSERT INTO omnivore.recommendation (
library_item_id, recommender_id, group_id, note, created_at
)
SELECT $1, $2, $3, $4, $5
FROM omnivore.library_item li
INNER JOIN omnivore.user u ON u.id = $2
INNER JOIN omnivore.group g ON g.id = $3
WHERE li.id = $1
ON CONFLICT (library_item_id, recommender_id, group_id) DO UPDATE SET
note = EXCLUDED.note,
created_at = EXCLUDED.created_at
"""
print('Inserting recommendations into postgres')
await insert_into_postgres(insert_query, db_conn, recommendations)
print(f'Inserted {len(recommendations)} recommendations')
async def insert_into_postgres(insert_query, db_conn, records):
print('Inserting data into postgres')
await db_conn.executemany(insert_query, records, timeout=60)
@ -134,6 +168,8 @@ def remove_null_bytes(val):
async def main():
print('Starting migration')
migrated_at = CUT_OFF_DATE
print('Cut off date', CUT_OFF_DATE)
# postgres connection
db_conn = await asyncpg.connect(user=PG_USER, password=PG_PASSWORD,
@ -203,6 +239,8 @@ async def main():
}
]
}
# get current time
migrated_at = datetime.utcnow().strftime(DATE_FORMAT)
# Scan API for larger library
docs = async_scan(es_client, index=ES_INDEX, query=query,
preserve_order=True, size=ES_SCAN_SIZE,
@ -216,29 +254,6 @@ async def main():
# convert library items to postgres format
source = doc['_source']
subscription = source['subscription'] if 'subscription' in source else source.get('rssFeedUrl', None)
# convert highlights to postgres format
if 'highlights' in source:
for highlight in source['highlights']:
highlights.append((
get_uuid(highlight['id']),
get_uuid(highlight['userId']),
highlight.get('quote', None),
highlight.get('prefix', None),
highlight.get('suffix', None),
highlight.get('patch', None),
highlight.get('annotation', None),
convert_string_to_datetime(highlight['createdAt']),
convert_string_to_datetime(highlight.get('updatedAt', None)),
convert_string_to_datetime(highlight.get('sharedAt', None)),
highlight.get('shortId', None),
id,
highlight.get('highlightPositionPercent', 0),
highlight.get('highlightPositionAnchorIndex', 0),
highlight.get('type', 'HIGHLIGHT'),
highlight.get('color', None),
highlight.get('html', None),
))
page_type = source['pageType']
content_reader = 'WEB'
if 'uploadFileId' in source:
@ -278,22 +293,85 @@ async def main():
remove_null_bytes(source.get('originalHtml', None)),
)
library_items.append(library_item)
# convert labels to postgres format
if 'labels' in source:
for label in source['labels']:
labels.append((
get_uuid(label['id']),
id,
None,
))
# convert highlights to postgres format
if 'highlights' in source:
for highlight in source['highlights']:
highlight_id = get_uuid(highlight['id'])
highlights.append((
highlight_id,
get_uuid(highlight['userId']),
highlight.get('quote', None),
highlight.get('prefix', None),
highlight.get('suffix', None),
highlight.get('patch', None),
highlight.get('annotation', None),
convert_string_to_datetime(highlight['createdAt']),
convert_string_to_datetime(highlight.get('updatedAt', None)),
convert_string_to_datetime(highlight.get('sharedAt', None)),
highlight.get('shortId', None),
id,
highlight.get('highlightPositionPercent', 0),
highlight.get('highlightPositionAnchorIndex', 0),
highlight.get('type', 'HIGHLIGHT'),
highlight.get('color', None),
highlight.get('html', None),
))
if 'labels' in highlight:
for label in highlight['labels']:
labels.append((
get_uuid(label['id']),
None,
highlight_id,
))
# convert recommendations to postgres format
if 'recommendations' in source:
for recommendation in source['recommendations']:
recommendations.append((
id,
get_uuid(recommendation['user']['userId']),
get_uuid(recommendation['id']),
recommendation.get('note', None),
convert_string_to_datetime(recommendation['recommendedAt']),
))
# copy to postgres every ES_SCAN_SIZE records
if i % ES_SCAN_SIZE == 0:
await insert_library_items(db_conn, library_items)
print('Copied', i, 'records to postgres')
library_items = []
if len(highlights) > 0:
await insert_highlights(db_conn, highlights)
highlights = []
print('Copied', i, 'records to postgres')
library_items = []
if len(labels) > 0:
await insert_labels(db_conn, labels)
labels = []
if len(recommendations) > 0:
await insert_recommendations(db_conn, recommendations)
recommendations = []
# copy remaining records to postgres
if len(library_items) > 0:
await insert_library_items(db_conn, library_items)
if len(highlights) > 0:
await insert_highlights(db_conn, highlights)
print('Copied', i, 'records to postgres')
if len(highlights) > 0:
await insert_highlights(db_conn, highlights)
if len(labels) > 0:
await insert_labels(db_conn, labels)
if len(recommendations) > 0:
await insert_recommendations(db_conn, recommendations)
print('Migration complete')
print('Migration complete', migrated_at)
except Exception as err:
print('Migration error', err)
finally:

View File

@ -9,8 +9,7 @@ CREATE TABLE omnivore.entity_labels (
library_item_id uuid REFERENCES omnivore.library_item(id) ON DELETE CASCADE,
highlight_id uuid REFERENCES omnivore.highlight(id) ON DELETE CASCADE,
label_id uuid NOT NULL REFERENCES omnivore.labels(id) ON DELETE CASCADE,
unique(library_item_id, label_id),
unique(highlight_id, label_id)
unique(label_id, library_item_id, highlight_id)
);
GRANT SELECT, INSERT, DELETE ON omnivore.entity_labels TO omnivore_user;