Skip to content

Commit

Permalink
add redis
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumewuip committed Aug 21, 2023
1 parent 700b734 commit 3412d2a
Show file tree
Hide file tree
Showing 7 changed files with 387 additions and 214 deletions.
139 changes: 94 additions & 45 deletions rss-to-tana/index.js
@@ -1,11 +1,16 @@
const RSSParser = require('rss-parser');
const cron = require('node-cron');

const { saveItem } = require('./item');
const { run } = require('./runner')
const Store = require('./store');
const Item = require('./item');
const Tana = require('./tana');

// Single shared feed parser. Strict XML parsing makes malformed feeds fail
// loudly (caught per-feed in extractItems) instead of yielding partial items.
// NOTE(review): defaultRSS/xml2js are forwarded to rss-parser — confirm exact
// semantics against the rss-parser documentation.
const parser = new RSSParser({
  defaultRSS: 2.0,
  xml2js: {
    strict: true,
  }
});

const schedules = {
twiceAtNight: '0 0 23,4 * * *', // 23:00 and 04:00 every day
Expand All @@ -17,116 +22,160 @@ const rssFeeds = [
{
url: 'https://lesoreillescurieuses.com/feed/',
cron: schedules.twiceAtNight,
toTana: Tana.album,
toTana: Item.tana.album,
},
{
url: 'https://cmd.wuips.com/rss/feed.xml',
cron: schedules.everyHour,
toTana: Tana.music,
toTana: Item.tana.music,
},
{
url: 'http://pitchfork.com/rss/reviews/best/albums/',
cron: schedules.twiceAtNight,
toTana: Tana.album,
toTana: Item.tana.album,
},
{
url: 'https://www.prun.net/emission/8MNV-iss/rss',
cron: schedules.twiceAtNight,
toTana: Tana.music,
toTana: Item.tana.music,
},
{
url: 'https://stnt.org/rss.xml',
cron: schedules.twiceAtNight,
toTana: Tana.album,
toTana: Item.tana.album,
},
{
url: 'https://www.tsugi.fr/feed/',
cron: schedules.twiceAtNight,
toTana: Tana.album,
toTana: Item.tana.album,
},

// Tech
{
// Codrops
url: 'http://feeds2.feedburner.com/tympanus',
cron: schedules.everyHour,
toTana: Tana.website,
toTana: Item.tana.website,
},
{
url: 'https://leaddev.com/content-piece-and-series/rss.xml',
cron: schedules.everyHour,
toTana: Tana.website,
toTana: Item.tana.website,
},
{
// Thoughtworks Technology Podcast
url: 'http://feeds.soundcloud.com/users/soundcloud:users:94605026/sounds.rss',
cron: schedules.everyHour,
toTana: Tana.website,
toTana: Item.tana.website,
},

// Design
{
url: 'http://minimalissimo.com/feed/',
cron: schedules.twiceAtNight,
toTana: Tana.website,
toTana: Item.tana.website,
},

// Personal Development
{
url: 'http://feeds.feedburner.com/zenhabits',
cron: schedules.twiceAtNight,
toTana: Tana.website,
toTana: Item.tana.website,
},

// Others
{
url: 'http://www.lesothers.com/feed/',
cron: schedules.twiceAtNight,
toTana: Tana.website,
toTana: Item.tana.website,
},
{
url: 'https://worksinprogress.substack.com/feed/',
cron: schedules.twiceAtNight,
toTana: Tana.website,
toTana: Item.tana.website,
}
];

function parseFeed(feed) {
return async function (lastRunDate) {
try {
console.log(feed.url, `parsing for items published after ${lastRunDate.toISOString()}`)
const parsedFeed = await parser.parseURL(feed.url);
/**
 * Calendar-day difference between two dates.
 *
 * Both dates are collapsed to UTC midnight first, so time-of-day and
 * timezone are ignored: Jan 1 23:59 -> Jan 2 00:01 is 1 day.
 *
 * @param {Date} a - earlier date
 * @param {Date} b - later date
 * @returns {number} whole days from a to b (negative when a is after b)
 */
function dateDiffInDays(a, b) {
  const _MS_PER_DAY = 1000 * 60 * 60 * 24;
  // Discard the time and time-zone information.
  const utc1 = Date.UTC(a.getFullYear(), a.getMonth(), a.getDate());
  const utc2 = Date.UTC(b.getFullYear(), b.getMonth(), b.getDate());

  return Math.floor((utc2 - utc1) / _MS_PER_DAY);
}

const tanaNode = feed.toTana(feed.url, item)
saveItem(tanaNode);
}
}
} catch (error) {
console.error(feed.url, `parsing error`, error);
}
// Fetches and parses one feed, mapping every RSS entry to the app's item
// shape. Any network/parse failure is logged and yields an empty list, so a
// single broken feed never stops the others.
async function extractItems(feed) {
  console.log(feed.url, '- parsing')

  try {
    const parsedFeed = await parser.parseURL(feed.url)
    const toItem = (rssItem) => Item.create(rssItem, feed)

    return parsedFeed.items.map(toItem)
  } catch (error) {
    console.error(feed.url, `parsing error`, error)

    return []
  }
}

// Keeps only items published within the last 3 calendar days. The day
// difference discards time-of-day and timezone by collapsing each date to
// UTC midnight before comparing.
async function filterOlderItems(items) {
  const MS_PER_DAY = 1000 * 60 * 60 * 24
  const now = new Date()
  const todayUtc = Date.UTC(now.getFullYear(), now.getMonth(), now.getDate())

  return items.filter((item) => {
    const published = item.publishedAt
    const publishedUtc = Date.UTC(published.getFullYear(), published.getMonth(), published.getDate())
    const ageInDays = Math.floor((todayUtc - publishedUtc) / MS_PER_DAY)

    return ageInDays < 3
  })
}

for (const feed of rssFeeds) {
/**
* We can use FORCE=true env var to run the feeds parsing directly, without
* cron schedule
*/
if (process.env.FORCE === 'true') {
run(parseFeed(feed))()
} else {
console.log('Scheduling', feed.url, 'on', feed.cron)
// Keeps only items not yet recorded in the store (i.e. never saved before).
// Store lookups run sequentially, one await per item, which keeps store
// access simple; item counts per feed are small.
async function filterSavedItems(items) {
  const newItems = []

  for (const item of items) {
    const itemSavedAlready = await Store.savedAlready(item.id)

    if (!itemSavedAlready) {
      newItems.push(item)
    }
  }

  return newItems
}

// Full pipeline for a single feed: fetch its items, drop the ones that are
// too old, drop the ones already persisted, then hand the rest to Tana.
async function parseFeed(feed) {
  const allItems = await extractItems(feed)
  console.log(feed.url, `- ${allItems.length} items in feed`)

  const recentItems = await filterOlderItems(allItems)
  console.log(feed.url, `- ${recentItems.length} items young enough`)

  const unseenItems = await filterSavedItems(recentItems)
  console.log(feed.url, `- ${unseenItems.length} new items`)

  Tana.saveItems(unseenItems)
}

// 1. Parses feeds to retrieve their items
// 2. Filters out those that
//    - either are older than 3 days
//    - or are already in the redis store
//    Items left are considered new items
// 3. Pushes the new items to the save queue
// 4. Once saved, saves the items in the redis store
(async () => {
  await Store.initialize()

  for (const feed of rssFeeds) {
    /**
     * We can use FORCE=true env var to run the feeds parsing directly, without
     * cron schedule
     */
    if (process.env.FORCE === 'true') {
      await parseFeed(feed)
    } else {
      console.log('Scheduling', feed.url, 'on', feed.cron)

      if (!cron.validate(feed.cron)) {
        throw new Error(`${feed.cron} not a valid cron expression`)
      }

      cron.schedule(feed.cron, () => parseFeed(feed))
    }
  }
})().catch((error) => {
  // Without this handler a failed Store.initialize(), a FORCE-mode parse
  // crash, or an invalid cron expression would only surface as an
  // unhandled promise rejection.
  console.error('Fatal startup error', error)
  process.exitCode = 1
})
156 changes: 98 additions & 58 deletions rss-to-tana/item.js
@@ -1,67 +1,107 @@
const API_KEY = process.env.TANA_API_KEY
// Builds the "Source" field node attributing an item to the feed it came from.
function source(feedUrl) {
  const attribution = { name: `RSS to Tana - ${feedUrl}` }

  return {
    /* Source */
    type: "field",
    attributeId: "SalqarOgiv",
    children: [attribution],
  }
}

function postNodes(nodes) {
// Sending all given nodes at once as we think we won't have more than 100
// nodes here
// @see https://github.com/tanainc/tana-input-api-samples
//
// We're also adding the #inbox super tag on all node
const payload = {
targetNodeId: 'INBOX',
nodes: nodes.map(node => ({
...node,
supertags: [
...node.supertags,
{
/* inbox */
id: 'hNwXd-0aYDVj'
}
]
}))
};
// Builds the "Title" field node for a feed item.
function title(item) {
  const label = { name: item.title }

  return {
    /* Title */
    type: 'field',
    attributeId: 'ksBOEhsvfu',
    children: [label],
  }
}

return fetch('https://europe-west1-tagr-prod.cloudfunctions.net/addToNodeV2', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${API_KEY}`
},
body: JSON.stringify(payload)
})
.then(response => {
if (!response.ok) {
throw new Error(`Error saving nodes: ${response.status} ${response.statusText}`)
}
})
// Builds the "URL" field node pointing at the item's link.
function url(item) {
  const link = { dataType: 'url', name: item.link }

  return {
    /* URL */
    type: 'field',
    attributeId: 'S4UUISQkxn2X',
    children: [link],
  }
}

const queue = []
// Tana node for an album item: Album supertag plus title, URL and source fields.
function album(feedUrl, item) {
  const supertags = [
    {
      /* Album */
      id: 'eWlghv3V42SH'
    },
  ]

  return {
    name: item.title,
    supertags,
    children: [title(item), url(item), source(feedUrl)],
  }
}

// every 20s, we post the queue
setInterval(
() => {
if (queue.length) {
console.log(`Posting ${queue.length} items to Tana`)
// Tana node for a music item: Music supertag plus title, URL and source fields.
function music(feedUrl, item) {
  const supertags = [
    {
      /* Music */
      id: 'VI7FwJEpFAqY'
    },
  ]

  return {
    name: item.title,
    supertags,
    children: [title(item), url(item), source(feedUrl)],
  }
}

// extracting all items from the queue
const nodes = queue.splice(0, Infinity)
// Tana node for a website/article item: Website supertag plus URL and source
// fields (no separate Title field, unlike album/music).
function website(feedUrl, item) {
  const supertags = [
    {
      /* Website */
      id: 'G3E1S3l-dk0v'
    }
  ]

  return {
    name: item.title,
    supertags,
    children: [url(item), source(feedUrl)],
  }
}

postNodes(nodes)
.then(() => {
console.log(`${nodes.length} nodes saved`);
})
// in case of failure, we put back items in the queue
.catch(error => {
console.error(error);
queue.push(...nodes)
});
}
},
20 * 1000
)
// Normalizes a raw rss-parser entry into the app's item shape. The entry's
// link doubles as the item id (used by the store for dedup), and the Tana
// node is pre-built via the feed's own converter.
const create = (rssItem, feed) => {
  return {
    id: rssItem.link,
    title: rssItem.title,
    publishedAt: new Date(rssItem.isoDate),
    tanaNode: feed.toTana(feed.url, rssItem),
    feed,
  }
}

function saveItem(node) {
queue.push(node)
}

module.exports = { saveItem };
// Public surface: item construction plus the per-kind Tana node builders.
const tana = { album, music, website }

module.exports = { tana, create }

0 comments on commit 3412d2a

Please sign in to comment.