Add /_stats endpoint

Also, remove ingest_stats worker and use the cache instead.
This commit is contained in:
MICHAEL JACKSON
2017-08-22 08:31:33 -07:00
parent c4f3d5bbbc
commit 2a0d32f214
32 changed files with 555 additions and 18278 deletions

View File

@ -1,5 +1,5 @@
require('isomorphic-fetch')
const invariant = require('invariant')
const warning = require('warning')
const gunzip = require('gunzip-maybe')
const ndjson = require('ndjson')
@ -7,12 +7,12 @@ const CloudflareAPIURL = 'https://api.cloudflare.com'
const CloudflareEmail = process.env.CLOUDFLARE_EMAIL
const CloudflareKey = process.env.CLOUDFLARE_KEY
invariant(
warning(
CloudflareEmail,
'Missing the $CLOUDFLARE_EMAIL environment variable'
)
invariant(
warning(
CloudflareKey,
'Missing the $CLOUDFLARE_KEY environment variable'
)
@ -30,16 +30,50 @@ function getJSON(path, headers) {
return get(path, headers).then(function (res) {
return res.json()
}).then(function (data) {
if (!data.success) {
console.error(`CloudflareAPI.getJSON failed at ${path}`)
console.error(data)
throw new Error('Failed to getJSON from Cloudflare')
}
return data.result
})
}
function getZones(domain) {
return getJSON(`/zones?name=${domain}`)
function getZones(domains) {
return Promise.all(
(Array.isArray(domains) ? domains : [ domains ]).map(function (domain) {
return getJSON(`/zones?name=${domain}`)
})
).then(function (results) {
return results.reduce(function (memo, zones) {
return memo.concat(zones)
})
})
}
function getZoneAnalyticsDashboard(zoneId, since) {
return getJSON(`/zones/${zoneId}/analytics/dashboard?since=${since}&continuous=true`)
function reduceResults(target, values) {
Object.keys(values).forEach(key => {
const value = values[key]
if (typeof value === 'object' && value) {
target[key] = reduceResults(target[key] || {}, value)
} else if (typeof value === 'number') {
target[key] = (target[key] || 0) + values[key]
}
})
return target
}
function getZoneAnalyticsDashboard(zones, since, until) {
return Promise.all(
(Array.isArray(zones) ? zones : [ zones ]).map(function (zone) {
return getJSON(`/zones/${zone.id}/analytics/dashboard?since=${since.toISOString()}&until=${until.toISOString()}`)
})
).then(function (results) {
return results.reduce(reduceResults)
})
}
function getJSONStream(path, headers) {

View File

@ -1,345 +0,0 @@
const invariant = require('invariant')
const startOfDay = require('date-fns/start_of_day')
const addDays = require('date-fns/add_days')
const cf = require('./CloudflareAPI')
const db = require('./RedisClient')
const {
createDayKey,
createHourKey,
createMinuteKey
} = require('./StatsServer')
/**
* Domains we want to analyze.
*/
const DomainNames = [
'unpkg.com',
'npmcdn.com'
]
const oneSecond = 1000
const oneMinute = oneSecond * 60
const oneHour = oneMinute * 60
const oneDay = oneHour * 24
function getSeconds(date) {
return Math.floor(date.getTime() / 1000)
}
function reduceResults(memo, results) {
Object.keys(results).forEach(function (key) {
const value = results[key]
if (typeof value === 'object' && value) {
memo[key] = reduceResults(memo[key] || {}, value)
} else if (typeof value === 'number') {
memo[key] = (memo[key] || 0) + results[key]
}
})
return memo
}
function ingestStatsForZones(zones, since, processDashboard) {
return new Promise(function (resolve) {
const zoneNames = zones.map(function (zone) {
return zone.name
}).join(', ')
console.log(
'info: Started ingesting stats for zones %s since %d',
zoneNames,
since
)
const startFetchTime = Date.now()
resolve(
Promise.all(
zones.map(function (zone) {
return cf.getZoneAnalyticsDashboard(zone.id, since)
})
).then(function (results) {
const endFetchTime = Date.now()
console.log(
'info: Fetched zone analytics dashboards for %s since %d in %dms',
zoneNames,
since,
endFetchTime - startFetchTime
)
// We don't have per-minute dashboards available for npmcdn.com yet,
// so the dashboard for that domain will be null when querying for
// per-minute data. Just filter it out here for now.
results = results.filter(Boolean)
return results.length ? results.reduce(reduceResults) : null
}).then(function (dashboard) {
if (dashboard == null) {
console.warn(
'warning: Missing dashboards for %s since %d',
zoneNames,
since
)
return
}
const startProcessTime = Date.now()
return processDashboard(dashboard).then(function () {
const endProcessTime = Date.now()
console.log(
'info: Processed zone analytics dashboards for %s since %d in %dms',
zoneNames,
since,
endProcessTime - startProcessTime
)
})
})
)
})
}
function ingestPerDayStats(zones) {
return ingestStatsForZones(zones, -10080, processPerDayDashboard)
}
function processPerDayDashboard(dashboard) {
return Promise.all(dashboard.timeseries.map(processPerDayTimeseries))
}
function errorCount(httpStatus) {
return Object.keys(httpStatus).reduce(function (memo, status) {
return parseInt(status, 10) >= 500 ? memo + httpStatus[status] : memo
}, 0)
}
function processPerDayTimeseries(ts) {
return new Promise(function (resolve) {
const since = new Date(ts.since)
const until = new Date(ts.until)
invariant(
since.getUTCHours() === 0 && since.getUTCMinutes() === 0 && since.getUTCSeconds() === 0,
'Per-day timeseries.since must begin exactly on the day'
)
invariant(
(until - since) === oneDay,
'Per-day timeseries must span exactly one day'
)
const nextDay = startOfDay(addDays(until, 1))
const oneYearLater = getSeconds(addDays(nextDay, 365))
const dayKey = createDayKey(since)
// Q: How many requests do we serve per day?
db.set(`stats-requests-${dayKey}`, ts.requests.all)
db.expireat(`stats-requests-${dayKey}`, oneYearLater)
// Q: How many requests do we serve per day from the cache?
db.set(`stats-cachedRequests-${dayKey}`, ts.requests.cached)
db.expireat(`stats-cachedRequests-${dayKey}`, oneYearLater)
// Q: How much bandwidth do we serve per day?
db.set(`stats-bandwidth-${dayKey}`, ts.bandwidth.all)
db.expireat(`stats-bandwidth-${dayKey}`, oneYearLater)
// Q: How much bandwidth do we serve per day from the cache?
db.set(`stats-cachedBandwidth-${dayKey}`, ts.bandwidth.cached)
db.expireat(`stats-cachedBandwidth-${dayKey}`, oneYearLater)
// Q: How many errors do we serve per day?
db.set(`stats-errors-${dayKey}`, errorCount(ts.requests.http_status))
db.expireat(`stats-errors-${dayKey}`, oneYearLater)
// Q: How many threats do we see each day?
db.set(`stats-threats-${dayKey}`, ts.threats.all)
db.expireat(`stats-threats-${dayKey}`, oneYearLater)
// Q: How many unique visitors do we see each day?
db.set(`stats-uniques-${dayKey}`, ts.uniques.all)
db.expireat(`stats-uniques-${dayKey}`, oneYearLater)
const requestsByCountry = []
const bandwidthByCountry = []
Object.keys(ts.requests.country).forEach(function (country) {
const requests = ts.requests.country[country]
const bandwidth = ts.bandwidth.country[country]
// Include only countries who made at least 100K requests.
if (requests > 100000) {
requestsByCountry.push(requests, country)
bandwidthByCountry.push(bandwidth, country)
}
})
const thirtyDaysLater = getSeconds(addDays(nextDay, 30))
// Q: How many requests do we serve to a country per day?
if (requestsByCountry.length) {
db.zadd([ `stats-countryRequests-${dayKey}`, ...requestsByCountry ])
db.expireat(`stats-countryRequests-${dayKey}`, thirtyDaysLater)
}
// Q: How much bandwidth do we serve to a country per day?
if (bandwidthByCountry.length) {
db.zadd([ `stats-countryBandwidth-${dayKey}`, ...bandwidthByCountry ])
db.expireat(`stats-countryBandwidth-${dayKey}`, thirtyDaysLater)
}
resolve()
})
}
function ingestPerHourStats(zones) {
return ingestStatsForZones(zones, -1440, processPerHourDashboard)
}
function processPerHourDashboard(dashboard) {
return Promise.all(dashboard.timeseries.map(processPerHourTimeseries))
}
function processPerHourTimeseries(ts) {
return new Promise(function (resolve) {
const since = new Date(ts.since)
const until = new Date(ts.until)
invariant(
since.getUTCMinutes() === 0 && since.getUTCSeconds() === 0,
'Per-hour timeseries.since must begin exactly on the hour'
)
invariant(
(until - since) === oneHour,
'Per-hour timeseries must span exactly one hour'
)
const nextDay = startOfDay(addDays(until, 1))
const sevenDaysLater = getSeconds(addDays(nextDay, 7))
const hourKey = createHourKey(since)
// Q: How many requests do we serve per hour?
db.set(`stats-requests-${hourKey}`, ts.requests.all)
db.expireat(`stats-requests-${hourKey}`, sevenDaysLater)
// Q: How many requests do we serve per hour from the cache?
db.set(`stats-cachedRequests-${hourKey}`, ts.requests.cached)
db.expireat(`stats-cachedRequests-${hourKey}`, sevenDaysLater)
// Q: How much bandwidth do we serve per hour?
db.set(`stats-bandwidth-${hourKey}`, ts.bandwidth.all)
db.expireat(`stats-bandwidth-${hourKey}`, sevenDaysLater)
// Q: How much bandwidth do we serve per hour from the cache?
db.set(`stats-cachedBandwidth-${hourKey}`, ts.bandwidth.cached)
db.expireat(`stats-cachedBandwidth-${hourKey}`, sevenDaysLater)
// Q: How many errors do we serve per hour?
db.set(`stats-errors-${hourKey}`, errorCount(ts.requests.http_status))
db.expireat(`stats-errors-${hourKey}`, sevenDaysLater)
// Q: How many threats do we see each hour?
db.set(`stats-threats-${hourKey}`, ts.threats.all)
db.expireat(`stats-threats-${hourKey}`, sevenDaysLater)
// Q: How many unique visitors do we see each hour?
db.set(`stats-uniques-${hourKey}`, ts.uniques.all)
db.expireat(`stats-uniques-${hourKey}`, sevenDaysLater)
resolve()
})
}
function ingestPerMinuteStats(zones) {
return ingestStatsForZones(zones, -30, processPerMinuteDashboard)
}
function processPerMinuteDashboard(dashboard) {
return Promise.all(dashboard.timeseries.map(processPerMinuteTimeseries))
}
function processPerMinuteTimeseries(ts) {
return new Promise(function (resolve) {
const since = new Date(ts.since)
const until = new Date(ts.until)
invariant(
since.getUTCSeconds() === 0,
'Per-minute timeseries.since must begin exactly on the minute'
)
invariant(
(until - since) === oneMinute,
'Per-minute timeseries must span exactly one minute'
)
const nextDay = startOfDay(addDays(until, 1))
const oneDayLater = getSeconds(addDays(nextDay, 1))
const minuteKey = createMinuteKey(since)
// Q: How many requests do we serve per minute?
db.set(`stats-requests-${minuteKey}`, ts.requests.all)
db.expireat(`stats-requests-${minuteKey}`, oneDayLater)
// Q: How many requests do we serve per minute from the cache?
db.set(`stats-cachedRequests-${minuteKey}`, ts.requests.cached)
db.expireat(`stats-cachedRequests-${minuteKey}`, oneDayLater)
// Q: How much bandwidth do we serve per minute?
db.set(`stats-bandwidth-${minuteKey}`, ts.bandwidth.all)
db.expireat(`stats-bandwidth-${minuteKey}`, oneDayLater)
// Q: How much bandwidth do we serve per minute from the cache?
db.set(`stats-cachedBandwidth-${minuteKey}`, ts.bandwidth.cached)
db.expireat(`stats-cachedBandwidth-${minuteKey}`, oneDayLater)
// Q: How many errors do we serve per hour?
db.set(`stats-errors-${minuteKey}`, errorCount(ts.requests.http_status))
db.expireat(`stats-errors-${minuteKey}`, oneDayLater)
// Q: How many threats do we see each minute?
db.set(`stats-threats-${minuteKey}`, ts.threats.all)
db.expireat(`stats-threats-${minuteKey}`, oneDayLater)
// Q: How many unique visitors do we see each minute?
db.set(`stats-uniques-${minuteKey}`, ts.uniques.all)
db.expireat(`stats-uniques-${minuteKey}`, oneDayLater)
resolve()
})
}
function startZones(zones) {
function takePerMinuteTurn() {
ingestPerMinuteStats(zones)
}
function takePerHourTurn() {
ingestPerHourStats(zones)
}
function takePerDayTurn() {
ingestPerDayStats(zones)
}
takePerMinuteTurn()
takePerHourTurn()
takePerDayTurn()
setInterval(takePerMinuteTurn, oneMinute)
setInterval(takePerHourTurn, oneHour / 2)
setInterval(takePerDayTurn, oneHour / 2)
}
Promise.all(DomainNames.map(cf.getZones)).then(function (results) {
const zones = results.reduce(function (memo, zones) {
return memo.concat(zones)
})
startZones(zones)
})

View File

@ -1,25 +1,16 @@
const cf = require('./CloudflareAPI')
const db = require('./RedisClient')
function sumValues(array) {
return array.reduce(function (memo, n) {
return memo + (parseInt(n, 10) || 0)
}, 0)
function createDayKey(date) {
return `${date.getUTCFullYear()}-${date.getUTCMonth()}-${date.getUTCDate()}`
}
function getKeyValues(keys) {
return new Promise(function (resolve, reject) {
db.mget(keys, function (error, values) {
if (error) {
reject(error)
} else {
resolve(values)
}
})
})
function createHourKey(date) {
return `${createDayKey(date)}-${date.getUTCHours()}`
}
function sumKeys(keys) {
return getKeyValues(keys).then(sumValues)
function createMinuteKey(date) {
return `${createHourKey(date)}-${date.getUTCMinutes()}`
}
function createScoresMap(array) {
@ -31,7 +22,7 @@ function createScoresMap(array) {
return map
}
function getScoresMap(key, n = 10) {
function getScoresMap(key, n = 100) {
return new Promise(function (resolve, reject) {
db.zrevrange(key, 0, n, 'withscores', function (error, value) {
if (error) {
@ -43,16 +34,31 @@ function getScoresMap(key, n = 10) {
})
}
function createTopScores(map) {
return Object.keys(map).reduce(function (memo, key) {
return memo.concat([ [ key, map[key] ] ])
}, []).sort(function (a, b) {
return b[1] - a[1]
})
function getPackageRequests(date, n = 100) {
return getScoresMap(`stats-packageRequests-${createDayKey(date)}`, n)
}
function getTopScores(key, n) {
return getScoresMap(key, n).then(createTopScores)
function getPackageBandwidth(date, n = 100) {
return getScoresMap(`stats-packageBytes-${createDayKey(date)}`, n)
}
function getProtocolRequests(date) {
return getScoresMap(`stats-protocolRequests-${createDayKey(date)}`)
}
function addDailyMetricsToTimeseries(timeseries) {
const since = new Date(timeseries.since)
return Promise.all([
getPackageRequests(since),
getPackageBandwidth(since),
getProtocolRequests(since)
]).then(function (results) {
timeseries.requests.package = results[0]
timeseries.bandwidth.package = results[1]
timeseries.requests.protocol = results[2]
return timeseries
})
}
function sumMaps(maps) {
@ -65,36 +71,95 @@ function sumMaps(maps) {
}, {})
}
function sumTopScores(keys, n) {
function addDailyMetrics(result) {
return Promise.all(
keys.map(function (key) {
return getScoresMap(key, n)
result.timeseries.map(addDailyMetricsToTimeseries)
).then(function () {
result.totals.requests.package = sumMaps(
result.timeseries.map(function (timeseries) {
return timeseries.requests.package
})
)
result.totals.bandwidth.package = sumMaps(
result.timeseries.map(function (timeseries) {
return timeseries.bandwidth.package
})
)
result.totals.requests.protocol = sumMaps(
result.timeseries.map(function (timeseries) {
return timeseries.requests.protocol
})
)
return result
})
}
function extractPublicInfo(data) {
return {
since: data.since,
until: data.until,
requests: {
all: data.requests.all,
cached: data.requests.cached,
country: data.requests.country,
status: data.requests.http_status
},
bandwidth: {
all: data.bandwidth.all,
cached: data.bandwidth.cached,
country: data.bandwidth.country
},
threats: {
all: data.threats.all,
country: data.threats.country
},
uniques: {
all: data.uniques.all
}
}
}
const DomainNames = [
'unpkg.com',
'npmcdn.com'
]
function fetchStats(since, until) {
return cf.getZones(DomainNames).then(function (zones) {
return cf.getZoneAnalyticsDashboard(zones, since, until).then(function (dashboard) {
return {
timeseries: dashboard.timeseries.map(extractPublicInfo),
totals: extractPublicInfo(dashboard.totals)
}
})
).then(sumMaps).then(createTopScores)
})
}
function createKey(...args) {
return args.join('-')
}
const oneMinute = 1000 * 60
const oneHour = oneMinute * 60
const oneDay = oneHour * 24
function createDayKey(date) {
return createKey(date.getUTCFullYear(), date.getUTCMonth(), date.getUTCDate())
}
function getStats(since, until, callback) {
let promise = fetchStats(since, until)
function createHourKey(date) {
return createKey(createDayKey(date), date.getUTCHours())
}
if ((until - since) > oneDay)
promise = promise.then(addDailyMetrics)
function createMinuteKey(date) {
return createKey(createHourKey(date), date.getUTCMinutes())
promise.then(function (value) {
callback(null, value)
}, callback)
}
module.exports = {
getKeyValues,
sumKeys,
getTopScores,
sumTopScores,
createDayKey,
createHourKey,
createMinuteKey
createMinuteKey,
getStats
}

View File

@ -1,93 +0,0 @@
require('isomorphic-fetch')
const { createStack, createFetch, header, base, query, parseJSON, onResponse } = require('http-client')
const invariant = require('invariant')
const CloudflareKey = process.env.CLOUDFLARE_KEY
const CloudflareEmail = process.env.CLOUDFLARE_EMAIL
invariant(
CloudflareKey,
'Missing $CLOUDFLARE_KEY environment variable'
)
invariant(
CloudflareEmail,
'Missing $CLOUDFLARE_EMAIL environment variable'
)
const createRangeQuery = (since, until) =>
query({
since: since.toISOString(),
until: until.toISOString()
})
const createNameQuery = (name) =>
query({ name })
const getResult = () =>
createStack(
parseJSON(),
onResponse(response => response.jsonData.result)
)
const commonStack = createStack(
header('X-Auth-Key', CloudflareKey),
header('X-Auth-Email', CloudflareEmail),
base('https://api.cloudflare.com/client/v4'),
getResult()
)
const getZones = (domainName) =>
createFetch(
commonStack,
createNameQuery(domainName)
)('/zones')
const getZoneAnalyticsDashboard = (zone, since, until) =>
createFetch(
commonStack,
createRangeQuery(since, until)
)(`/zones/${zone.id}/analytics/dashboard`)
const getAnalyticsDashboards = (domainNames, since, until) =>
Promise.all(
domainNames.map(domainName => getZones(domainName))
).then(
domainZones => domainZones.reduce((memo, zones) => memo.concat(zones))
).then(
zones => Promise.all(zones.map(zone => getZoneAnalyticsDashboard(zone, since, until)))
).then(
results => results.reduce(reduceResults)
)
const reduceResults = (target, results) => {
Object.keys(results).forEach(key => {
const value = results[key]
if (typeof value === 'object' && value) {
target[key] = reduceResults(target[key] || {}, value)
} else if (typeof value === 'number') {
target[key] = (target[key] || 0) + results[key]
}
})
return target
}
const OneMinute = 1000 * 60
const ThirtyDays = OneMinute * 60 * 24 * 30
const fetchStats = (callback) => {
const since = new Date(Date.now() - ThirtyDays)
const until = new Date(Date.now() - OneMinute)
getAnalyticsDashboards([ 'npmcdn.com', 'unpkg.com' ], since, until)
.then(result => callback(null, result), callback)
}
module.exports = {
getZones,
getZoneAnalyticsDashboard,
getAnalyticsDashboards,
fetchStats
}

View File

@ -4,46 +4,20 @@ const express = require('express')
const cors = require('cors')
const morgan = require('morgan')
const { fetchStats } = require('./cloudflare')
const checkBlacklist = require('./middleware/checkBlacklist')
const packageURL = require('./middleware/packageURL')
const fetchFile = require('./middleware/fetchFile')
const serveFile = require('./middleware/serveFile')
/**
* A list of packages we refuse to serve.
*/
const PackageBlacklist = require('./PackageBlacklist').blacklist
const serveStats = require('./middleware/serveStats')
morgan.token('fwd', function (req) {
return req.get('x-forwarded-for').replace(/\s/g, '')
})
function sendHomePage(publicDir) {
const html = fs.readFileSync(path.join(publicDir, 'index.html'), 'utf8')
return function (req, res, next) {
fetchStats(function (error, stats) {
if (error) {
next(error)
} else {
res.set({
'Cache-Control': 'public, max-age=60',
'Cache-Tag': 'home'
})
res.send(
// Replace the __SERVER_DATA__ token that was added to the
// HTML file in the build process (see scripts/build.js).
html.replace('__SERVER_DATA__', JSON.stringify({
cloudflareStats: stats
}))
)
}
})
}
}
/**
* A list of packages we refuse to serve.
*/
const PackageBlacklist = require('./PackageBlacklist').blacklist
function errorHandler(err, req, res, next) {
console.error(err.stack)
@ -68,12 +42,12 @@ function createApp() {
app.use(errorHandler)
app.use(cors())
app.get('/', sendHomePage('build'))
app.use(express.static('build', {
maxAge: '365d'
}))
app.use('/_stats', serveStats())
app.use('/',
packageURL,
checkBlacklist(PackageBlacklist),

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,77 @@
const express = require('express')
const subDays = require('date-fns/sub_days')
const startOfDay = require('date-fns/start_of_day')
const startOfSecond = require('date-fns/start_of_second')
const StatsServer = require('../StatsServer')
function serveArbitraryStats(req, res) {
const now = startOfSecond(new Date)
const since = req.query.since ? new Date(req.query.since) : subDays(now, 30)
const until = req.query.until ? new Date(req.query.until) : now
if (isNaN(since.getTime()))
return res.status(403).send({ error: '?since is not a valid date' })
if (isNaN(until.getTime()))
return res.status(403).send({ error: '?until is not a valid date' })
if (until <= since)
return res.status(403).send({ error: '?until date must come after ?since date' })
if (until > now)
return res.status(403).send({ error: '?until must be a date in the past' })
StatsServer.getStats(since, until, function (error, stats) {
if (error) {
console.error(error)
res.status(500).send({ error: 'Unable to fetch stats' })
} else {
res.set({
'Cache-Control': 'public, max-age=60',
'Cache-Tag': 'stats'
}).send(stats)
}
})
}
function servePastDaysStats(days, req, res) {
const until = startOfDay(new Date)
const since = subDays(until, days)
StatsServer.getStats(since, until, function (error, stats) {
if (error) {
console.error(error)
res.status(500).send({ error: 'Unable to fetch stats' })
} else {
res.set({
'Cache-Control': 'public, max-age=86400',
'Cache-Tag': 'stats'
}).send(stats)
}
})
}
function serveLastMonthStats(req, res) {
servePastDaysStats(30, req, res)
}
function serveLastWeekStats(req, res) {
servePastDaysStats(7, req, res)
}
function serveLastDayStats(req, res) {
servePastDaysStats(1, req, res)
}
function serveStats() {
const app = express.Router()
app.get('/', serveArbitraryStats)
app.get('/last-month', serveLastMonthStats)
app.get('/last-week', serveLastWeekStats)
app.get('/last-day', serveLastDayStats)
return app
}
module.exports = serveStats