Merge pull request #2330 from omnivore-app/fix/bulk-api
perf: add max doc and throttling to bulk api
This commit is contained in:
@ -695,10 +695,12 @@ export const searchAsYouType = async (
|
||||
}
|
||||
}
|
||||
|
||||
export const updatePagesAsync = async (
|
||||
export const updatePages = async (
|
||||
userId: string,
|
||||
action: BulkActionType,
|
||||
args: PageSearchArgs,
|
||||
maxDocs: number,
|
||||
async: boolean,
|
||||
labels?: Label[]
|
||||
): Promise<string | null> => {
|
||||
// build the script
|
||||
@ -774,8 +776,11 @@ export const updatePagesAsync = async (
|
||||
const { body } = await client.updateByQuery({
|
||||
index: INDEX_ALIAS,
|
||||
conflicts: 'proceed',
|
||||
wait_for_completion: false,
|
||||
wait_for_completion: !async,
|
||||
body: searchBody,
|
||||
max_docs: maxDocs,
|
||||
requests_per_second: 500, // throttle the requests
|
||||
slices: 'auto', // parallelize the requests
|
||||
})
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
|
||||
@ -784,8 +789,13 @@ export const updatePagesAsync = async (
|
||||
return null
|
||||
}
|
||||
|
||||
console.log('update pages task started', body.task)
|
||||
return body.task as string
|
||||
if (async) {
|
||||
console.log('update pages task started', body.task)
|
||||
return body.task as string
|
||||
}
|
||||
|
||||
console.log('updated pages in elastic', body.updated)
|
||||
return body.updated as string
|
||||
} catch (e) {
|
||||
console.log('failed to update pages in elastic', e)
|
||||
return null
|
||||
|
||||
@ -1302,6 +1302,8 @@ export type MutationAddPopularReadArgs = {
|
||||
|
||||
export type MutationBulkActionArgs = {
|
||||
action: BulkActionType;
|
||||
async?: InputMaybe<Scalars['Boolean']>;
|
||||
expectedCount?: InputMaybe<Scalars['Int']>;
|
||||
labelIds?: InputMaybe<Array<Scalars['ID']>>;
|
||||
query: Scalars['String'];
|
||||
};
|
||||
|
||||
@ -1093,7 +1093,7 @@ type MoveLabelSuccess {
|
||||
|
||||
type Mutation {
|
||||
addPopularRead(name: String!): AddPopularReadResult!
|
||||
bulkAction(action: BulkActionType!, labelIds: [ID!], query: String!): BulkActionResult!
|
||||
bulkAction(action: BulkActionType!, async: Boolean, expectedCount: Int, labelIds: [ID!], query: String!): BulkActionResult!
|
||||
createArticle(input: CreateArticleInput!): CreateArticleResult!
|
||||
createArticleSavingRequest(input: CreateArticleSavingRequestInput!): CreateArticleSavingRequestResult!
|
||||
createGroup(input: CreateGroupInput!): CreateGroupResult!
|
||||
|
||||
@ -13,7 +13,7 @@ import {
|
||||
searchAsYouType,
|
||||
searchPages,
|
||||
updatePage,
|
||||
updatePagesAsync,
|
||||
updatePages,
|
||||
} from '../../elastic/pages'
|
||||
import {
|
||||
ArticleSavingRequestStatus,
|
||||
@ -1098,46 +1098,59 @@ export const bulkActionResolver = authorized<
|
||||
BulkActionSuccess,
|
||||
BulkActionError,
|
||||
MutationBulkActionArgs
|
||||
>(async (_parent, { query, action, labelIds }, { claims: { uid }, log }) => {
|
||||
log.info('bulkActionResolver')
|
||||
>(
|
||||
async (
|
||||
_parent,
|
||||
{ query, action, labelIds, expectedCount, async },
|
||||
{ claims: { uid }, log }
|
||||
) => {
|
||||
log.info('bulkActionResolver')
|
||||
|
||||
analytics.track({
|
||||
userId: uid,
|
||||
event: 'BulkAction',
|
||||
properties: {
|
||||
env: env.server.apiEnv,
|
||||
action,
|
||||
},
|
||||
})
|
||||
analytics.track({
|
||||
userId: uid,
|
||||
event: 'BulkAction',
|
||||
properties: {
|
||||
env: env.server.apiEnv,
|
||||
action,
|
||||
},
|
||||
})
|
||||
|
||||
if (!uid) {
|
||||
log.log('bulkActionResolver', { error: 'Unauthorized' })
|
||||
return { errorCodes: [BulkActionErrorCode.Unauthorized] }
|
||||
}
|
||||
if (!uid) {
|
||||
log.log('bulkActionResolver', { error: 'Unauthorized' })
|
||||
return { errorCodes: [BulkActionErrorCode.Unauthorized] }
|
||||
}
|
||||
|
||||
if (!query) {
|
||||
log.log('bulkActionResolver', { error: 'no query' })
|
||||
return { errorCodes: [BulkActionErrorCode.BadRequest] }
|
||||
}
|
||||
|
||||
// get labels if needed
|
||||
let labels = undefined
|
||||
if (action === BulkActionType.AddLabels) {
|
||||
if (!labelIds || labelIds.length === 0) {
|
||||
if (!query) {
|
||||
log.log('bulkActionResolver', { error: 'no query' })
|
||||
return { errorCodes: [BulkActionErrorCode.BadRequest] }
|
||||
}
|
||||
|
||||
labels = await getLabelsByIds(uid, labelIds)
|
||||
// get labels if needed
|
||||
let labels = undefined
|
||||
if (action === BulkActionType.AddLabels) {
|
||||
if (!labelIds || labelIds.length === 0) {
|
||||
return { errorCodes: [BulkActionErrorCode.BadRequest] }
|
||||
}
|
||||
|
||||
labels = await getLabelsByIds(uid, labelIds)
|
||||
}
|
||||
|
||||
// parse query
|
||||
const searchQuery = parseSearchQuery(query)
|
||||
|
||||
// start a task to update pages
|
||||
const taskId = await updatePages(
|
||||
uid,
|
||||
action,
|
||||
searchQuery,
|
||||
Math.min(expectedCount ?? 500, 500), // default and max to 500
|
||||
!!async, // default to false
|
||||
labels
|
||||
)
|
||||
|
||||
return { success: !!taskId }
|
||||
}
|
||||
|
||||
// parse query
|
||||
const searchQuery = parseSearchQuery(query)
|
||||
|
||||
// start a task to update pages
|
||||
const taskId = await updatePagesAsync(uid, action, searchQuery, labels)
|
||||
|
||||
return { success: !!taskId }
|
||||
})
|
||||
)
|
||||
|
||||
const getUpdateReason = (page: Page, since: Date) => {
|
||||
if (page.state === ArticleSavingRequestStatus.Deleted) {
|
||||
|
||||
@ -2545,6 +2545,8 @@ const schema = gql`
|
||||
query: String!
|
||||
action: BulkActionType!
|
||||
labelIds: [ID!]
|
||||
expectedCount: Int # max number of items to process
|
||||
async: Boolean # if true, return immediately and process in the background
|
||||
): BulkActionResult!
|
||||
importFromIntegration(integrationId: ID!): ImportFromIntegrationResult!
|
||||
}
|
||||
|
||||
@ -1267,13 +1267,11 @@ describe('Article API', () => {
|
||||
authToken
|
||||
).expect(200)
|
||||
expect(res.body.data.bulkAction.success).to.be.true
|
||||
// Wait for the archive to finish
|
||||
await setTimeout(async () => {
|
||||
const pages = await graphqlRequest(searchQuery(), authToken).expect(
|
||||
200
|
||||
)
|
||||
expect(pages.body.data.search.pageInfo.totalCount).to.eql(0)
|
||||
}, 1000)
|
||||
|
||||
await refreshIndex()
|
||||
|
||||
const pages = await graphqlRequest(searchQuery(), authToken).expect(200)
|
||||
expect(pages.body.data.search.pageInfo.totalCount).to.eql(0)
|
||||
})
|
||||
})
|
||||
|
||||
@ -1284,14 +1282,14 @@ describe('Article API', () => {
|
||||
authToken
|
||||
).expect(200)
|
||||
expect(res.body.data.bulkAction.success).to.be.true
|
||||
// Wait for the delete to finish
|
||||
await setTimeout(async () => {
|
||||
const pages = await graphqlRequest(
|
||||
searchQuery('in:all'),
|
||||
authToken
|
||||
).expect(200)
|
||||
expect(pages.body.data.search.pageInfo.totalCount).to.eql(0)
|
||||
}, 1000)
|
||||
|
||||
await refreshIndex()
|
||||
|
||||
const pages = await graphqlRequest(
|
||||
searchQuery('in:all'),
|
||||
authToken
|
||||
).expect(200)
|
||||
expect(pages.body.data.search.pageInfo.totalCount).to.eql(0)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user