add refreshedAt to the gql schema
This commit is contained in:
@ -7,12 +7,23 @@ import {
|
||||
PrimaryGeneratedColumn,
|
||||
UpdateDateColumn,
|
||||
} from 'typeorm'
|
||||
import { SubscriptionStatus, SubscriptionType } from '../generated/graphql'
|
||||
import { NewsletterEmail } from './newsletter_email'
|
||||
import { User } from './user'
|
||||
|
||||
export const DEFAULT_SUBSCRIPTION_FOLDER = 'following'
|
||||
|
||||
export enum SubscriptionStatus {
|
||||
Active = 'ACTIVE',
|
||||
Deleted = 'DELETED',
|
||||
Unsubscribed = 'UNSUBSCRIBED',
|
||||
RefreshError = 'REFRESH_ERROR',
|
||||
}
|
||||
|
||||
export enum SubscriptionType {
|
||||
Newsletter = 'NEWSLETTER',
|
||||
Rss = 'RSS',
|
||||
}
|
||||
|
||||
@Entity({ name: 'subscriptions' })
|
||||
export class Subscription {
|
||||
@PrimaryGeneratedColumn('uuid')
|
||||
@ -59,7 +70,7 @@ export class Subscription {
|
||||
count!: number
|
||||
|
||||
@Column('timestamp', { nullable: true })
|
||||
lastFetchedAt?: Date | null
|
||||
mostRecentItemDate?: Date | null
|
||||
|
||||
@Column('text', { nullable: true })
|
||||
lastFetchedChecksum?: string | null
|
||||
@ -73,6 +84,9 @@ export class Subscription {
|
||||
@Column('timestamp', { nullable: true })
|
||||
scheduledAt?: Date | null
|
||||
|
||||
@Column('timestamp', { nullable: true })
|
||||
refreshedAt?: Date | null
|
||||
|
||||
@Column('boolean')
|
||||
isPrivate?: boolean | null
|
||||
|
||||
|
||||
@ -2852,6 +2852,7 @@ export type Subscription = {
|
||||
export enum SubscriptionStatus {
|
||||
Active = 'ACTIVE',
|
||||
Deleted = 'DELETED',
|
||||
RefreshError = 'REFRESH_ERROR',
|
||||
Unsubscribed = 'UNSUBSCRIBED'
|
||||
}
|
||||
|
||||
|
||||
@ -2248,6 +2248,7 @@ type Subscription {
|
||||
enum SubscriptionStatus {
|
||||
ACTIVE
|
||||
DELETED
|
||||
REFRESH_ERROR
|
||||
UNSUBSCRIBED
|
||||
}
|
||||
|
||||
|
||||
@ -1,11 +1,10 @@
|
||||
import { Job, Queue } from 'bullmq'
|
||||
import { Job } from 'bullmq'
|
||||
import { DataSource } from 'typeorm'
|
||||
import { QUEUE_NAME, getBackendQueue } from '../../queue-processor'
|
||||
import { redisDataSource } from '../../redis_data_source'
|
||||
import { v4 as uuid } from 'uuid'
|
||||
import { getBackendQueue } from '../../queue-processor'
|
||||
import { validateUrl } from '../../services/create_page_save_request'
|
||||
import { RssSubscriptionGroup } from '../../utils/createTask'
|
||||
import { stringToHash } from '../../utils/helpers'
|
||||
import { validateUrl } from '../../services/create_page_save_request'
|
||||
import { v4 as uuid } from 'uuid'
|
||||
|
||||
export type RSSRefreshContext = {
|
||||
type: 'all' | 'user-added'
|
||||
@ -90,7 +89,7 @@ const updateSubscriptionGroup = async (
|
||||
refreshContext,
|
||||
subscriptionIds: group.subscriptionIds,
|
||||
feedUrl: group.url,
|
||||
lastFetchedTimestamps: group.fetchedDates.map(
|
||||
lastFetchedTimestamps: group.mostRecentItemDates.map(
|
||||
(timestamp) => timestamp?.getTime() || 0
|
||||
), // unix timestamp in milliseconds
|
||||
lastFetchedChecksums: group.checksums,
|
||||
|
||||
@ -15,7 +15,7 @@ type FolderType = 'following' | 'inbox'
|
||||
interface RefreshFeedRequest {
|
||||
subscriptionIds: string[]
|
||||
feedUrl: string
|
||||
lastFetchedTimestamps: number[] // unix timestamp in milliseconds
|
||||
mostRecentItemDates: number[] // unix timestamp in milliseconds
|
||||
scheduledTimestamps: number[] // unix timestamp in milliseconds
|
||||
lastFetchedChecksums: string[]
|
||||
userIds: string[]
|
||||
@ -28,7 +28,7 @@ export const isRefreshFeedRequest = (data: any): data is RefreshFeedRequest => {
|
||||
return (
|
||||
'subscriptionIds' in data &&
|
||||
'feedUrl' in data &&
|
||||
'lastFetchedTimestamps' in data &&
|
||||
'mostRecentItemDates' in data &&
|
||||
'scheduledTimestamps' in data &&
|
||||
'userIds' in data &&
|
||||
'lastFetchedChecksums' in data &&
|
||||
@ -422,7 +422,7 @@ const processSubscription = async (
|
||||
userId: string,
|
||||
feedUrl: string,
|
||||
fetchResult: { content: string; checksum: string },
|
||||
lastFetchedAt: number,
|
||||
mostRecentItemDate: number,
|
||||
scheduledAt: number,
|
||||
lastFetchedChecksum: string,
|
||||
fetchContent: boolean,
|
||||
@ -445,7 +445,7 @@ const processSubscription = async (
|
||||
console.log('Feed last build date', feedLastBuildDate)
|
||||
if (
|
||||
feedLastBuildDate &&
|
||||
new Date(feedLastBuildDate) <= new Date(lastFetchedAt)
|
||||
new Date(feedLastBuildDate) <= new Date(mostRecentItemDate)
|
||||
) {
|
||||
console.log('Skipping old feed', feedLastBuildDate)
|
||||
return
|
||||
@ -492,7 +492,7 @@ const processSubscription = async (
|
||||
}
|
||||
|
||||
// skip old items
|
||||
if (isOldItem(feedItem, lastFetchedAt)) {
|
||||
if (isOldItem(feedItem, mostRecentItemDate)) {
|
||||
console.log('Skipping old feed item', feedItem.link)
|
||||
continue
|
||||
}
|
||||
@ -521,7 +521,7 @@ const processSubscription = async (
|
||||
// no items saved
|
||||
if (!lastItemFetchedAt) {
|
||||
// the feed has been fetched before, no new valid items found
|
||||
if (lastFetchedAt || !lastValidItem) {
|
||||
if (mostRecentItemDate || !lastValidItem) {
|
||||
console.log('No new valid items found')
|
||||
return
|
||||
}
|
||||
@ -571,7 +571,7 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => {
|
||||
const {
|
||||
feedUrl,
|
||||
subscriptionIds,
|
||||
lastFetchedTimestamps,
|
||||
mostRecentItemDates,
|
||||
scheduledTimestamps,
|
||||
userIds,
|
||||
lastFetchedChecksums,
|
||||
@ -618,7 +618,7 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => {
|
||||
userIds[i],
|
||||
feedUrl,
|
||||
fetchResult,
|
||||
lastFetchedTimestamps[i],
|
||||
mostRecentItemDates[i],
|
||||
scheduledTimestamps[i],
|
||||
lastFetchedChecksums[i],
|
||||
fetchContents[i] && allowFetchContent,
|
||||
|
||||
@ -4,6 +4,8 @@ import { Brackets, In } from 'typeorm'
|
||||
import {
|
||||
DEFAULT_SUBSCRIPTION_FOLDER,
|
||||
Subscription,
|
||||
SubscriptionStatus,
|
||||
SubscriptionType,
|
||||
} from '../../entity/subscription'
|
||||
import { env } from '../../env'
|
||||
import {
|
||||
@ -28,8 +30,6 @@ import {
|
||||
SubscriptionsError,
|
||||
SubscriptionsErrorCode,
|
||||
SubscriptionsSuccess,
|
||||
SubscriptionStatus,
|
||||
SubscriptionType,
|
||||
UnsubscribeError,
|
||||
UnsubscribeErrorCode,
|
||||
UnsubscribeSuccess,
|
||||
@ -41,13 +41,13 @@ import { getRepository } from '../../repository'
|
||||
import { feedRepository } from '../../repository/feed'
|
||||
import { validateUrl } from '../../services/create_page_save_request'
|
||||
import { unsubscribe } from '../../services/subscriptions'
|
||||
import { updateSubscription } from '../../services/update_subscription'
|
||||
import { Merge } from '../../util'
|
||||
import { analytics } from '../../utils/analytics'
|
||||
import { enqueueRssFeedFetch } from '../../utils/createTask'
|
||||
import { getAbsoluteUrl, keysToCamelCase } from '../../utils/helpers'
|
||||
import { authorized } from '../../utils/gql-utils'
|
||||
import { getAbsoluteUrl, keysToCamelCase } from '../../utils/helpers'
|
||||
import { parseFeed, parseOpml, RSS_PARSER_CONFIG } from '../../utils/parser'
|
||||
import { updateSubscription } from '../../services/update_subscription'
|
||||
|
||||
type PartialSubscription = Omit<Subscription, 'newsletterEmail'>
|
||||
|
||||
@ -61,8 +61,7 @@ export const subscriptionsResolver = authorized<
|
||||
QuerySubscriptionsArgs
|
||||
>(async (_obj, { sort, type }, { uid, log }) => {
|
||||
try {
|
||||
const sortBy =
|
||||
sort?.by === SortBy.UpdatedTime ? 'lastFetchedAt' : 'createdAt'
|
||||
const sortBy = sort?.by === SortBy.UpdatedTime ? 'refreshedAt' : 'createdAt'
|
||||
const sortOrder = sort?.order === SortOrder.Ascending ? 'ASC' : 'DESC'
|
||||
|
||||
const queryBuilder = getRepository(Subscription)
|
||||
@ -218,7 +217,10 @@ export const subscribeResolver = authorized<
|
||||
type: SubscriptionType.Rss,
|
||||
})
|
||||
if (existingSubscription) {
|
||||
if (existingSubscription.status === SubscriptionStatus.Active) {
|
||||
if (
|
||||
existingSubscription.status === SubscriptionStatus.Active ||
|
||||
existingSubscription.status === SubscriptionStatus.RefreshError
|
||||
) {
|
||||
return {
|
||||
errorCodes: [SubscribeErrorCode.AlreadySubscribed],
|
||||
}
|
||||
@ -239,7 +241,7 @@ export const subscribeResolver = authorized<
|
||||
url: feedUrl,
|
||||
subscriptionIds: [updatedSubscription.id],
|
||||
scheduledDates: [new Date()], // fetch immediately
|
||||
fetchedDates: [updatedSubscription.lastFetchedAt || null],
|
||||
mostRecentItemDates: [updatedSubscription.mostRecentItemDate || null],
|
||||
checksums: [updatedSubscription.lastFetchedChecksum || null],
|
||||
fetchContents: [updatedSubscription.fetchContent],
|
||||
folders: [updatedSubscription.folder || DEFAULT_SUBSCRIPTION_FOLDER],
|
||||
@ -289,7 +291,7 @@ export const subscribeResolver = authorized<
|
||||
url: feedUrl,
|
||||
subscriptionIds: [newSubscription.id],
|
||||
scheduledDates: [new Date()], // fetch immediately
|
||||
fetchedDates: [null],
|
||||
mostRecentItemDates: [null],
|
||||
checksums: [null],
|
||||
fetchContents: [newSubscription.fetchContent],
|
||||
folders: [newSubscription.folder || DEFAULT_SUBSCRIPTION_FOLDER],
|
||||
@ -319,7 +321,7 @@ export const updateSubscriptionResolver = authorized<
|
||||
UpdateSubscriptionSuccessPartial,
|
||||
UpdateSubscriptionError,
|
||||
MutationUpdateSubscriptionArgs
|
||||
>(async (_, { input }, { authTrx, uid, log }) => {
|
||||
>(async (_, { input }, { uid, log }) => {
|
||||
try {
|
||||
analytics.track({
|
||||
userId: uid,
|
||||
|
||||
@ -1686,12 +1686,15 @@ const schema = gql`
|
||||
autoAddToLibrary: Boolean
|
||||
fetchContent: Boolean!
|
||||
folder: String!
|
||||
mostRecentItemDate: Date
|
||||
refreshedAt: Date
|
||||
}
|
||||
|
||||
enum SubscriptionStatus {
|
||||
ACTIVE
|
||||
UNSUBSCRIBED
|
||||
DELETED
|
||||
REFRESH_ERROR
|
||||
}
|
||||
|
||||
type SubscriptionsError {
|
||||
|
||||
@ -6,23 +6,22 @@ import { google } from '@google-cloud/tasks/build/protos/protos'
|
||||
import axios from 'axios'
|
||||
import { nanoid } from 'nanoid'
|
||||
import { DeepPartial } from 'typeorm'
|
||||
import { v4 as uuid } from 'uuid'
|
||||
import { ImportItemState } from '../entity/integration'
|
||||
import { Recommendation } from '../entity/recommendation'
|
||||
import { DEFAULT_SUBSCRIPTION_FOLDER } from '../entity/subscription'
|
||||
import { env } from '../env'
|
||||
import {
|
||||
ArticleSavingRequestStatus,
|
||||
CreateLabelInput,
|
||||
} from '../generated/graphql'
|
||||
import { queueRSSRefreshFeedJob } from '../jobs/rss/refreshAllFeeds'
|
||||
import { redisDataSource } from '../redis_data_source'
|
||||
import { signFeatureToken } from '../services/features'
|
||||
import { generateVerificationToken, OmnivoreAuthorizationHeader } from './auth'
|
||||
import { CreateTaskError } from './errors'
|
||||
import { stringToHash } from './helpers'
|
||||
import { logger } from './logger'
|
||||
import View = google.cloud.tasks.v2.Task.View
|
||||
import { stringToHash } from './helpers'
|
||||
import { queueRSSRefreshFeedJob } from '../jobs/rss/refreshAllFeeds'
|
||||
import { redisDataSource } from '../redis_data_source'
|
||||
import { v4 as uuid } from 'uuid'
|
||||
|
||||
// Instantiates a client.
|
||||
const client = new CloudTasksClient()
|
||||
@ -630,7 +629,7 @@ export interface RssSubscriptionGroup {
|
||||
url: string
|
||||
subscriptionIds: string[]
|
||||
userIds: string[]
|
||||
fetchedDates: (Date | null)[]
|
||||
mostRecentItemDates: (Date | null)[]
|
||||
scheduledDates: Date[]
|
||||
checksums: (string | null)[]
|
||||
fetchContents: boolean[]
|
||||
@ -648,7 +647,7 @@ export const enqueueRssFeedFetch = async (
|
||||
},
|
||||
subscriptionIds: subscriptionGroup.subscriptionIds,
|
||||
feedUrl: subscriptionGroup.url,
|
||||
lastFetchedTimestamps: subscriptionGroup.fetchedDates.map(
|
||||
mostRecentItemDates: subscriptionGroup.mostRecentItemDates.map(
|
||||
(timestamp) => timestamp?.getTime() || 0
|
||||
), // unix timestamp in milliseconds
|
||||
lastFetchedChecksums: subscriptionGroup.checksums,
|
||||
|
||||
Reference in New Issue
Block a user