stop writing empty csv file if no new data retrieved from the integration
This commit is contained in:
@ -211,50 +211,62 @@ export function integrationsServiceRouter() {
|
||||
integrationId: integration.id,
|
||||
})
|
||||
|
||||
// write the list of urls to a csv file and upload it to gcs
|
||||
// path style: imports/<uid>/<date>/<type>-<uuid>.csv
|
||||
const dateStr = DateTime.now().toISODate()
|
||||
const fileUuid = uuidv4()
|
||||
const fullPath = `imports/${userId}/${dateStr}/URL_LIST-${fileUuid}.csv`
|
||||
// open a write_stream to the file
|
||||
const file = createGCSFile(fullPath)
|
||||
writeStream = file.createWriteStream({
|
||||
contentType: 'text/csv',
|
||||
})
|
||||
// stringify the data and pipe it to the write_stream
|
||||
const stringifier = stringify({
|
||||
header: false,
|
||||
columns: ['url', 'state', 'labels'],
|
||||
})
|
||||
stringifier.pipe(writeStream)
|
||||
|
||||
let offset = 0
|
||||
const since = integration.syncedAt?.getTime() || 0
|
||||
let syncedAt = since
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) {
|
||||
// get pages from integration
|
||||
const retrieved = await integrationService.retrieve({
|
||||
token: integration.token,
|
||||
since,
|
||||
offset,
|
||||
})
|
||||
syncedAt = retrieved.since || Date.now()
|
||||
|
||||
const retrievedData = retrieved.data
|
||||
if (retrievedData.length === 0) {
|
||||
break
|
||||
}
|
||||
// write the list of urls, state and labels to the stream
|
||||
retrievedData.forEach((row) => stringifier.write(row))
|
||||
// get pages from integration
|
||||
const retrieved = await integrationService.retrieve({
|
||||
token: integration.token,
|
||||
since,
|
||||
offset,
|
||||
})
|
||||
syncedAt = retrieved.since || Date.now()
|
||||
|
||||
offset += retrievedData.length
|
||||
console.debug('retrieved data', {
|
||||
total: offset,
|
||||
size: retrievedData.length,
|
||||
let retrievedData = retrieved.data
|
||||
// if there are pages to import
|
||||
if (retrievedData.length > 0) {
|
||||
// write the list of urls to a csv file and upload it to gcs
|
||||
// path style: imports/<uid>/<date>/<type>-<uuid>.csv
|
||||
const dateStr = DateTime.now().toISODate()
|
||||
const fileUuid = uuidv4()
|
||||
const fullPath = `imports/${userId}/${dateStr}/URL_LIST-${fileUuid}.csv`
|
||||
// open a write_stream to the file
|
||||
const file = createGCSFile(fullPath)
|
||||
writeStream = file.createWriteStream({
|
||||
contentType: 'text/csv',
|
||||
})
|
||||
// stringify the data and pipe it to the write_stream
|
||||
const stringifier = stringify({
|
||||
header: false,
|
||||
columns: ['url', 'state', 'labels'],
|
||||
})
|
||||
stringifier.pipe(writeStream)
|
||||
|
||||
// paginate api calls to the integration
|
||||
do {
|
||||
// write the list of urls, state and labels to the stream
|
||||
retrievedData.forEach((row) => stringifier.write(row))
|
||||
|
||||
// get next pages from the integration
|
||||
offset += retrievedData.length
|
||||
|
||||
const retrieved = await integrationService.retrieve({
|
||||
token: integration.token,
|
||||
since,
|
||||
offset,
|
||||
})
|
||||
syncedAt = retrieved.since || Date.now()
|
||||
retrievedData = retrieved.data
|
||||
|
||||
console.debug('retrieved data', {
|
||||
total: offset,
|
||||
size: retrievedData.length,
|
||||
})
|
||||
} while (retrievedData.length > 0)
|
||||
}
|
||||
// update the integration's syncedAt
|
||||
|
||||
// update the integration's syncedAt and remove taskName
|
||||
await getRepository(Integration).update(integration.id, {
|
||||
syncedAt: new Date(syncedAt),
|
||||
taskName: null,
|
||||
|
||||
Reference in New Issue
Block a user