create a new sanitized tuple because tuple is immutable
This commit is contained in:
@ -231,18 +231,14 @@ async def insert_recommendations(db_conn, recommendations, original_ids):
|
||||
|
||||
|
||||
async def insert_into_postgres(insert_query, db_conn, records, original_ids):
|
||||
try:
|
||||
# sanitize input if input is a string
|
||||
for record in records:
|
||||
for i, val in enumerate(record):
|
||||
if isinstance(val, str):
|
||||
record[i] = sanitize_string(val)
|
||||
sanitized_records = sanitize_tuples(records)
|
||||
|
||||
await db_conn.executemany(insert_query, records, timeout=int(PG_TIMEOUT))
|
||||
try:
|
||||
await db_conn.executemany(insert_query, sanitized_records, timeout=int(PG_TIMEOUT))
|
||||
except Exception as err:
|
||||
print('Batch insert into postgres ERROR:', err)
|
||||
# excute insert query one by one if batch insert failed
|
||||
for i, record in enumerate(records):
|
||||
for i, record in enumerate(sanitized_records):
|
||||
# print original id for debugging
|
||||
print('Inserting record', original_ids[i])
|
||||
try:
|
||||
@ -268,8 +264,21 @@ async def insert_into_postgres(insert_query, db_conn, records, original_ids):
|
||||
await asyncio.sleep(float(PG_COOLDOWN_TIME))
|
||||
|
||||
|
||||
def sanitize_tuples(tuples):
|
||||
sanitize_tuples = []
|
||||
for tuple in tuples:
|
||||
sanitize_tuple = []
|
||||
for val in tuple:
|
||||
sanitize_tuple.append(sanitize_string(val))
|
||||
sanitize_tuples.append(sanitize_tuple)
|
||||
return sanitize_tuples
|
||||
|
||||
|
||||
def sanitize_string(val):
|
||||
return replace_surrogates(remove_null_bytes(val))
|
||||
# sanitize valu if val is a string
|
||||
if isinstance(val, str):
|
||||
return replace_surrogates(remove_null_bytes(val))
|
||||
return val
|
||||
|
||||
|
||||
def remove_null_bytes(val):
|
||||
|
||||
Reference in New Issue
Block a user