From 68a4fb298d61c7dbc45693df46018f1d780fabce Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 24 Jan 2024 12:03:26 +0800 Subject: [PATCH] add refreshedAt to the gql schema --- packages/api/src/entity/subscription.ts | 18 +++++++++++++-- packages/api/src/generated/graphql.ts | 1 + packages/api/src/generated/schema.graphql | 1 + packages/api/src/jobs/rss/refreshAllFeeds.ts | 11 +++++----- packages/api/src/jobs/rss/refreshFeed.ts | 16 +++++++------- .../api/src/resolvers/subscriptions/index.ts | 22 ++++++++++--------- packages/api/src/schema.ts | 3 +++ packages/api/src/utils/createTask.ts | 13 +++++------ 8 files changed, 52 insertions(+), 33 deletions(-) diff --git a/packages/api/src/entity/subscription.ts b/packages/api/src/entity/subscription.ts index e1b8da23b..2c1f271b4 100644 --- a/packages/api/src/entity/subscription.ts +++ b/packages/api/src/entity/subscription.ts @@ -7,12 +7,23 @@ import { PrimaryGeneratedColumn, UpdateDateColumn, } from 'typeorm' -import { SubscriptionStatus, SubscriptionType } from '../generated/graphql' import { NewsletterEmail } from './newsletter_email' import { User } from './user' export const DEFAULT_SUBSCRIPTION_FOLDER = 'following' +export enum SubscriptionStatus { + Active = 'ACTIVE', + Deleted = 'DELETED', + Unsubscribed = 'UNSUBSCRIBED', + RefreshError = 'REFRESH_ERROR', +} + +export enum SubscriptionType { + Newsletter = 'NEWSLETTER', + Rss = 'RSS', +} + @Entity({ name: 'subscriptions' }) export class Subscription { @PrimaryGeneratedColumn('uuid') @@ -59,7 +70,7 @@ export class Subscription { count!: number @Column('timestamp', { nullable: true }) - lastFetchedAt?: Date | null + mostRecentItemDate?: Date | null @Column('text', { nullable: true }) lastFetchedChecksum?: string | null @@ -73,6 +84,9 @@ export class Subscription { @Column('timestamp', { nullable: true }) scheduledAt?: Date | null + @Column('timestamp', { nullable: true }) + refreshedAt?: Date | null + @Column('boolean') isPrivate?: boolean | null diff --git a/packages/api/src/generated/graphql.ts b/packages/api/src/generated/graphql.ts index c68e340ee..2c6f4fa11 100644 --- a/packages/api/src/generated/graphql.ts +++ b/packages/api/src/generated/graphql.ts @@ -2852,6 +2852,7 @@ export type Subscription = { export enum SubscriptionStatus { Active = 'ACTIVE', Deleted = 'DELETED', + RefreshError = 'REFRESH_ERROR', Unsubscribed = 'UNSUBSCRIBED' } diff --git a/packages/api/src/generated/schema.graphql b/packages/api/src/generated/schema.graphql index 5c2431d8a..a4ea6802d 100644 --- a/packages/api/src/generated/schema.graphql +++ b/packages/api/src/generated/schema.graphql @@ -2248,6 +2248,7 @@ type Subscription { enum SubscriptionStatus { ACTIVE DELETED + REFRESH_ERROR UNSUBSCRIBED } diff --git a/packages/api/src/jobs/rss/refreshAllFeeds.ts b/packages/api/src/jobs/rss/refreshAllFeeds.ts index 6a78d8596..765d34339 100644 --- a/packages/api/src/jobs/rss/refreshAllFeeds.ts +++ b/packages/api/src/jobs/rss/refreshAllFeeds.ts @@ -1,11 +1,10 @@ -import { Job, Queue } from 'bullmq' +import { Job } from 'bullmq' import { DataSource } from 'typeorm' -import { QUEUE_NAME, getBackendQueue } from '../../queue-processor' -import { redisDataSource } from '../../redis_data_source' +import { v4 as uuid } from 'uuid' +import { getBackendQueue } from '../../queue-processor' +import { validateUrl } from '../../services/create_page_save_request' import { RssSubscriptionGroup } from '../../utils/createTask' import { stringToHash } from '../../utils/helpers' -import { validateUrl } from '../../services/create_page_save_request' -import { v4 as uuid } from 'uuid' export type RSSRefreshContext = { type: 'all' | 'user-added' @@ -90,7 +89,7 @@ const updateSubscriptionGroup = async ( refreshContext, subscriptionIds: group.subscriptionIds, feedUrl: group.url, - lastFetchedTimestamps: group.fetchedDates.map( + lastFetchedTimestamps: group.mostRecentItemDates.map( (timestamp) => timestamp?.getTime() || 0 ), // unix timestamp in milliseconds lastFetchedChecksums: group.checksums, diff --git a/packages/api/src/jobs/rss/refreshFeed.ts b/packages/api/src/jobs/rss/refreshFeed.ts index b9c0052f9..86579476c 100644 --- a/packages/api/src/jobs/rss/refreshFeed.ts +++ b/packages/api/src/jobs/rss/refreshFeed.ts @@ -15,7 +15,7 @@ type FolderType = 'following' | 'inbox' interface RefreshFeedRequest { subscriptionIds: string[] feedUrl: string - lastFetchedTimestamps: number[] // unix timestamp in milliseconds + mostRecentItemDates: number[] // unix timestamp in milliseconds scheduledTimestamps: number[] // unix timestamp in milliseconds lastFetchedChecksums: string[] userIds: string[] @@ -28,7 +28,7 @@ export const isRefreshFeedRequest = (data: any): data is RefreshFeedRequest => { return ( 'subscriptionIds' in data && 'feedUrl' in data && - 'lastFetchedTimestamps' in data && + 'mostRecentItemDates' in data && 'scheduledTimestamps' in data && 'userIds' in data && 'lastFetchedChecksums' in data && @@ -422,7 +422,7 @@ const processSubscription = async ( userId: string, feedUrl: string, fetchResult: { content: string; checksum: string }, - lastFetchedAt: number, + mostRecentItemDate: number, scheduledAt: number, lastFetchedChecksum: string, fetchContent: boolean, @@ -445,7 +445,7 @@ const processSubscription = async ( console.log('Feed last build date', feedLastBuildDate) if ( feedLastBuildDate && - new Date(feedLastBuildDate) <= new Date(lastFetchedAt) + new Date(feedLastBuildDate) <= new Date(mostRecentItemDate) ) { console.log('Skipping old feed', feedLastBuildDate) return @@ -492,7 +492,7 @@ const processSubscription = async ( } // skip old items - if (isOldItem(feedItem, lastFetchedAt)) { + if (isOldItem(feedItem, mostRecentItemDate)) { console.log('Skipping old feed item', feedItem.link) continue } @@ -521,7 +521,7 @@ const processSubscription = async ( // no items saved if (!lastItemFetchedAt) { // the feed has been fetched before, no new valid items found - if (lastFetchedAt || !lastValidItem) { + if (mostRecentItemDate || !lastValidItem) { console.log('No new valid items found') return } @@ -571,7 +571,7 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => { const { feedUrl, subscriptionIds, - lastFetchedTimestamps, + mostRecentItemDates, scheduledTimestamps, userIds, lastFetchedChecksums, @@ -618,7 +618,7 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => { userIds[i], feedUrl, fetchResult, - lastFetchedTimestamps[i], + mostRecentItemDates[i], scheduledTimestamps[i], lastFetchedChecksums[i], fetchContents[i] && allowFetchContent, diff --git a/packages/api/src/resolvers/subscriptions/index.ts b/packages/api/src/resolvers/subscriptions/index.ts index b7e1507e4..06256b8bc 100644 --- a/packages/api/src/resolvers/subscriptions/index.ts +++ b/packages/api/src/resolvers/subscriptions/index.ts @@ -4,6 +4,8 @@ import { Brackets, In } from 'typeorm' import { DEFAULT_SUBSCRIPTION_FOLDER, Subscription, + SubscriptionStatus, + SubscriptionType, } from '../../entity/subscription' import { env } from '../../env' import { @@ -28,8 +30,6 @@ import { SubscriptionsError, SubscriptionsErrorCode, SubscriptionsSuccess, - SubscriptionStatus, - SubscriptionType, UnsubscribeError, UnsubscribeErrorCode, UnsubscribeSuccess, @@ -41,13 +41,13 @@ import { getRepository } from '../../repository' import { feedRepository } from '../../repository/feed' import { validateUrl } from '../../services/create_page_save_request' import { unsubscribe } from '../../services/subscriptions' +import { updateSubscription } from '../../services/update_subscription' import { Merge } from '../../util' import { analytics } from '../../utils/analytics' import { enqueueRssFeedFetch } from '../../utils/createTask' -import { getAbsoluteUrl, keysToCamelCase } from '../../utils/helpers' import { authorized } from '../../utils/gql-utils' +import { getAbsoluteUrl, keysToCamelCase } from '../../utils/helpers' import { parseFeed, parseOpml, RSS_PARSER_CONFIG } from '../../utils/parser' -import { updateSubscription } from '../../services/update_subscription' type PartialSubscription = Omit @@ -61,8 +61,7 @@ export const subscriptionsResolver = authorized< QuerySubscriptionsArgs >(async (_obj, { sort, type }, { uid, log }) => { try { - const sortBy = - sort?.by === SortBy.UpdatedTime ? 'lastFetchedAt' : 'createdAt' + const sortBy = sort?.by === SortBy.UpdatedTime ? 'refreshedAt' : 'createdAt' const sortOrder = sort?.order === SortOrder.Ascending ? 'ASC' : 'DESC' const queryBuilder = getRepository(Subscription) @@ -218,7 +217,10 @@ export const subscribeResolver = authorized< type: SubscriptionType.Rss, }) if (existingSubscription) { - if (existingSubscription.status === SubscriptionStatus.Active) { + if ( + existingSubscription.status === SubscriptionStatus.Active || + existingSubscription.status === SubscriptionStatus.RefreshError + ) { return { errorCodes: [SubscribeErrorCode.AlreadySubscribed], } @@ -239,7 +241,7 @@ export const subscribeResolver = authorized< url: feedUrl, subscriptionIds: [updatedSubscription.id], scheduledDates: [new Date()], // fetch immediately - fetchedDates: [updatedSubscription.lastFetchedAt || null], + mostRecentItemDates: [updatedSubscription.mostRecentItemDate || null], checksums: [updatedSubscription.lastFetchedChecksum || null], fetchContents: [updatedSubscription.fetchContent], folders: [updatedSubscription.folder || DEFAULT_SUBSCRIPTION_FOLDER], @@ -289,7 +291,7 @@ export const subscribeResolver = authorized< url: feedUrl, subscriptionIds: [newSubscription.id], scheduledDates: [new Date()], // fetch immediately - fetchedDates: [null], + mostRecentItemDates: [null], checksums: [null], fetchContents: [newSubscription.fetchContent], folders: [newSubscription.folder || DEFAULT_SUBSCRIPTION_FOLDER], @@ -319,7 +321,7 @@ export const updateSubscriptionResolver = authorized< UpdateSubscriptionSuccessPartial, UpdateSubscriptionError, MutationUpdateSubscriptionArgs ->(async (_, { input }, { authTrx, uid, log }) => { +>(async (_, { input }, { uid, log }) => { try { analytics.track({ userId: uid, diff --git a/packages/api/src/schema.ts b/packages/api/src/schema.ts index 47720b676..60e88f7bc 100755 --- a/packages/api/src/schema.ts +++ b/packages/api/src/schema.ts @@ -1686,12 +1686,15 @@ const schema = gql` autoAddToLibrary: Boolean fetchContent: Boolean! folder: String! + mostRecentItemDate: Date + refreshedAt: Date } enum SubscriptionStatus { ACTIVE UNSUBSCRIBED DELETED + REFRESH_ERROR } type SubscriptionsError { diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 21fb0ea8b..09c84b62d 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -6,23 +6,22 @@ import { google } from '@google-cloud/tasks/build/protos/protos' import axios from 'axios' import { nanoid } from 'nanoid' import { DeepPartial } from 'typeorm' +import { v4 as uuid } from 'uuid' import { ImportItemState } from '../entity/integration' import { Recommendation } from '../entity/recommendation' -import { DEFAULT_SUBSCRIPTION_FOLDER } from '../entity/subscription' import { env } from '../env' import { ArticleSavingRequestStatus, CreateLabelInput, } from '../generated/graphql' +import { queueRSSRefreshFeedJob } from '../jobs/rss/refreshAllFeeds' +import { redisDataSource } from '../redis_data_source' import { signFeatureToken } from '../services/features' import { generateVerificationToken, OmnivoreAuthorizationHeader } from './auth' import { CreateTaskError } from './errors' +import { stringToHash } from './helpers' import { logger } from './logger' import View = google.cloud.tasks.v2.Task.View -import { stringToHash } from './helpers' -import { queueRSSRefreshFeedJob } from '../jobs/rss/refreshAllFeeds' -import { redisDataSource } from '../redis_data_source' -import { v4 as uuid } from 'uuid' // Instantiates a client. const client = new CloudTasksClient() @@ -630,7 +629,7 @@ export interface RssSubscriptionGroup { url: string subscriptionIds: string[] userIds: string[] - fetchedDates: (Date | null)[] + mostRecentItemDates: (Date | null)[] scheduledDates: Date[] checksums: (string | null)[] fetchContents: boolean[] @@ -648,7 +647,7 @@ export const enqueueRssFeedFetch = async ( }, subscriptionIds: subscriptionGroup.subscriptionIds, feedUrl: subscriptionGroup.url, - lastFetchedTimestamps: subscriptionGroup.fetchedDates.map( + mostRecentItemDates: subscriptionGroup.mostRecentItemDates.map( (timestamp) => timestamp?.getTime() || 0 ), // unix timestamp in milliseconds lastFetchedChecksums: subscriptionGroup.checksums,