Files
omnivore/packages/db/elastic_migrations/add_highlight_to_elastic.py
Hongbo Wu 2b70d480d2 Remove article saving request (#493)
* Add state and taskName in elastic page mappings

* Add state and taskName in elastic page interface

* Create page with PROCESSING state before scraping

* Update createArticleRequest API

* Fix tests

* Add default state for pages

* Update createArticle API

* Update save page

* Update save file

* Update saving item description

* Show unable to parse content for failed page

* Fix date parsing

* Search for not failed pages

* Fix tests

* Add test for saveUrl

* Update get article saving request api

* Update get article test

* Add test for articleSavingRequest API

* Add test for failure

* Return new page id if clientRequestId empty

* Update clientRequestId in savePage

* Update clientRequestId in saveFile

* Replace article with slug in articleSavingRequest

* Add slug in articleSavingRequest response

* Deprecate article

* Use slug in web

* Remove article and highlight fragments

* Query article.slug on Prod

* Show unable to parse description for failed page

* Fix a bug having duplicate pages when saving the same url multiple times

* Add state in response

* Rename variables in removeArticle API

* Rename state

* Add state in response in web

* Make state an enum

* Open temporary page by link id

* Use an empty reader view as the background for loading pages

* Progressively load the article page as content is loaded

* Add includePending flag in getArticles API

* Set includePending = true in web

* Add elastic update mappings in migration script

* Add elastic mappings in docker image

* Move index_settings.json to migrate package

* Remove elastic index creation in api

* Move elastic migrations to a separate directory

* Remove index_settings from api docker image

Co-authored-by: Jackson Harper <jacksonh@gmail.com>
2022-04-29 13:41:06 +08:00

189 lines
5.8 KiB
Python

#!/usr/bin/python
import json
import os
import sys

import psycopg2
from elasticsearch import Elasticsearch, NotFoundError
from psycopg2.extras import RealDictCursor
# --- Postgres connection settings (each overridable via the environment) ---
PG_HOST = os.environ.get('PG_HOST', 'localhost')
# NOTE: the default is an int, but a value from the environment is a str.
PG_PORT = os.environ.get('PG_PORT', 5432)
PG_USER = os.environ.get('PG_USER', 'app_user')
PG_PASSWORD = os.environ.get('PG_PASSWORD', 'app_pass')
PG_DB = os.environ.get('PG_DB', 'omnivore')

# --- Elasticsearch connection settings ---
ES_URL = os.environ.get('ES_URL', 'http://localhost:9200')
ES_USERNAME = os.environ.get('ES_USERNAME', 'elastic')
ES_PASSWORD = os.environ.get('ES_PASSWORD', 'password')

# --- Migration parameters ---
# Only highlights whose created_at is later than this timestamp are migrated.
UPDATE_TIME = os.environ.get('UPDATE_TIME', '2019-01-01 00:00:00')
# JSON file whose "mappings" section is pushed to the pages index.
INDEX_SETTINGS = os.environ.get('INDEX_SETTINGS', 'index_settings.json')
# Postgres to_char() pattern for timestamps. It appends a literal "Z" without
# converting time zones.
# NOTE(review): this assumes the session/stored time is UTC — confirm.
DATETIME_FORMAT = 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'
def update_mappings(client: 'Elasticsearch', settings_path=None, index='pages_alias'):
    """Push the ``mappings`` section of the index settings file to an index.

    Loads a JSON settings file and sends its ``mappings`` object to
    ``client.indices.put_mapping`` for ``index``. Any failure is printed and
    terminates the script with exit status 1. Failures include a missing or
    unreadable file, invalid JSON, a missing ``mappings`` key, or an error
    raised by the client.

    Args:
        client: Elasticsearch client used to update the mapping.
        settings_path: Path of the JSON settings file. When None, the
            module-level ``INDEX_SETTINGS`` value is used.
        index: Index or alias whose mapping is updated.

    Raises:
        SystemExit: with code 1 on any failure.
    """
    print('updating mappings')
    # Resolve the default lazily so the function does not need the module
    # constant when an explicit path is supplied.
    path = INDEX_SETTINGS if settings_path is None else settings_path
    try:
        with open(path, 'r', encoding='utf-8') as f:
            settings = json.load(f)
        client.indices.put_mapping(index=index, body=settings['mappings'])
        print('mappings updated')
    except Exception as err:
        print('update mappings ERROR:', err)
        # sys.exit rather than the interactive-only `exit` builtin from `site`.
        sys.exit(1)
def assertData(conn, client: Elasticsearch, pages):
    """Compare per-page highlight counts between Postgres and Elasticsearch.

    Two counts are taken for each page in ``pages``:

    * Postgres: non-deleted rows in ``omnivore.highlight`` whose
      ``elastic_page_id`` matches.
    * Elasticsearch: the length of the ``highlights`` array on the document
      with the same id in ``pages_alias``.

    Matching pages count as successes and differing pages as failures. Pages
    missing from Elasticsearch are skipped. A per-page line and a final
    summary are printed. Any other error is printed and the script exits with
    status 1.

    NOTE(review): the Postgres count here is not filtered by ``UPDATE_TIME``,
    while ``ingest_highlights`` only writes highlights created after it. With
    a non-default ``UPDATE_TIME``, mismatches are therefore expected.

    Args:
        conn: Open psycopg2 connection.
        client: Elasticsearch client.
        pages: Iterable of rows with a ``pageId`` key, as produced by
            ``get_pages_with_highlights``.

    Raises:
        SystemExit: with code 1 on an unexpected error.
    """
    count_sql = '''SELECT COUNT(*) FROM omnivore.highlight
        WHERE elastic_page_id = %s AND deleted = false'''
    try:
        success = 0
        failure = 0
        skip = 0
        # The with-block closes the cursor even if an error interrupts the loop.
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            for page in pages:
                pageId = page['pageId']
                # Parameterized: the page id is never spliced into the SQL text.
                cursor.execute(count_sql, (pageId,))
                countInPostgres = cursor.fetchone()['count']
                try:
                    source = client.get(
                        index='pages_alias',
                        id=pageId,
                        _source=['highlights'])['_source']
                except NotFoundError as err:
                    print('Elasticsearch get ERROR:', err)
                    # if page is not found in elasticsearch, skip testing
                    skip += 1
                    continue
                # A document without a highlights field holds no highlights.
                # Report it as a mismatch instead of aborting the whole check
                # with a KeyError.
                countInElastic = len(source.get('highlights') or [])
                if countInPostgres == countInElastic:
                    success += 1
                    print(f'Page {pageId} OK')
                else:
                    failure += 1
                    print(
                        f'Page {pageId} ERROR: postgres: {countInPostgres}, elastic: {countInElastic}')
        print(
            f'Asserted data, success: {success}, failure: {failure}, skip: {skip}')
    except Exception as err:
        print('Assert data ERROR:', err)
        sys.exit(1)
def ingest_highlights(conn, pages, es_client=None):
    """Copy each page's highlights from Postgres into its Elasticsearch document.

    For every page it selects the non-deleted highlights created after
    ``UPDATE_TIME``, with timestamps rendered by ``to_char`` using
    ``DATETIME_FORMAT``. The rows are passed to ``import_highlights_to_es``,
    which sets the document's ``highlights`` field to them. The total number
    of imported highlights is printed.

    Errors are printed and not re-raised.

    Args:
        conn: Open psycopg2 connection.
        pages: Iterable of rows with a ``pageId`` key.
        es_client: Elasticsearch client to write to. When None, the
            module-level ``client`` created by the script entry point is used.
    """
    query = '''
        SELECT
            id,
            quote,
            prefix,
            to_char(created_at, %(fmt)s) as "createdAt",
            to_char(COALESCE(updated_at, current_timestamp), %(fmt)s) as "updatedAt",
            suffix,
            patch,
            annotation,
            short_id as "shortId",
            user_id as "userId",
            to_char(shared_at, %(fmt)s) as "sharedAt"
        FROM omnivore.highlight
        WHERE
            elastic_page_id = %(page_id)s
            AND deleted = false
            AND created_at > %(update_time)s
    '''
    try:
        # Resolve the global lazily so an explicit client needs no global.
        es = client if es_client is None else es_client
        import_count = 0
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            for page in pages:
                pageId = page['pageId']
                # Values are bound by psycopg2 instead of str.format, so ids
                # and timestamps are quoted correctly and cannot inject SQL.
                cursor.execute(query, {
                    'fmt': DATETIME_FORMAT,
                    'page_id': pageId,
                    'update_time': UPDATE_TIME,
                })
                result = cursor.fetchall()
                import_count += import_highlights_to_es(es, result, pageId)
        print(f'Imported total {import_count} highlights to es')
    except Exception as err:
        print('Ingest highlights ERROR:', err)
def import_highlights_to_es(client, highlights, pageId) -> int:
    """Write one page's highlights onto its Elasticsearch document.

    Sends a partial update to ``pages_alias`` that sets the document's
    ``highlights`` field to ``highlights``.

    Returns:
        ``len(highlights)`` when Elasticsearch reports the result as
        ``'updated'``, and 0 otherwise. An empty list returns 0 without
        calling the client. Any exception from the update is printed and also
        yields 0.
    """
    total = len(highlights)
    print(f'Writing {total} highlights to page {pageId}')
    if not total:
        print('No highlights to import')
        return 0
    try:
        response = client.update(
            index='pages_alias',
            id=pageId,
            body={'doc': {'highlights': highlights}},
        )
        written = total if response['result'] == 'updated' else 0
        print(f'Added {written} highlights to page {pageId}')
        return written
    except Exception as err:
        print('Elasticsearch update ERROR:', err)
        return 0
def get_pages_with_highlights(conn, update_time=None):
    """Return the distinct Elasticsearch page ids that have highlights.

    Selects every distinct non-null ``elastic_page_id`` from
    ``omnivore.highlight`` among non-deleted rows created after
    ``update_time``. Each returned row is a dict with a single ``pageId`` key.

    Args:
        conn: Open psycopg2 connection.
        update_time: Lower bound (exclusive) on ``created_at``. When None,
            the module-level ``UPDATE_TIME`` value is used.

    Returns:
        A list of ``{'pageId': ...}`` rows.

    Raises:
        SystemExit: with code 1 if the query fails. The later migration steps
            iterate the result, so there is nothing useful to continue with.
    """
    since = UPDATE_TIME if update_time is None else update_time
    query = '''
        SELECT DISTINCT
            elastic_page_id as "pageId"
        FROM omnivore.highlight
        WHERE
            elastic_page_id IS NOT NULL
            AND deleted = false
            AND created_at > %s
    '''
    try:
        # The with-block closes the cursor even if execute/fetch raises.
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            # Bound parameter instead of f-string interpolation of the timestamp.
            cursor.execute(query, (since,))
            result = cursor.fetchall()
        print('Found pages with highlights:', len(result))
        return result
    except Exception as err:
        print('Get pages with highlights ERROR:', err)
        sys.exit(1)
if __name__ == '__main__':
    # Script entry point. The names assigned here (notably `client`) stay
    # module-level globals, because ingest_highlights falls back to `client`.
    print('Starting migration')

    # Fail fast if Elasticsearch is unreachable or the credentials are rejected.
    client = Elasticsearch(ES_URL, http_auth=(
        ES_USERNAME, ES_PASSWORD), retry_on_timeout=True)
    try:
        print('Elasticsearch client connected', client.info())
    except Exception as err:
        print('Elasticsearch client ERROR:', err)
        client.close()
        sys.exit(1)

    # Keyword arguments let psycopg2 quote each value, so passwords with
    # spaces or quotes do not break the connection string.
    try:
        conn = psycopg2.connect(
            host=PG_HOST,
            port=PG_PORT,
            dbname=PG_DB,
            user=PG_USER,
            password=PG_PASSWORD)
    except psycopg2.Error as err:
        print('Postgres connection ERROR:', err)
        client.close()
        sys.exit(1)
    print('Postgres connection:', conn.info)

    try:
        update_mappings(client)
        pages = get_pages_with_highlights(conn)
        ingest_highlights(conn, pages)
        assertData(conn, client, pages)
    finally:
        # Runs on success and on the SystemExit raised by the steps above.
        client.close()
        conn.close()
    print('Migration complete')