From 8fb2c7810ebed2b734a593096474555f365291f6 Mon Sep 17 00:00:00 2001 From: MICHAEL JACKSON Date: Thu, 17 Aug 2017 11:24:40 -0700 Subject: [PATCH] Improve stats workers - More accurate expiration times for stats - Reduce duplication between stats and logs worker - Expire hostname stats after one week --- server/CloudflareAPI.js | 30 ++++++++- server/IngestLogsWorker.js | 119 ++++++++++-------------------------- server/IngestStatsWorker.js | 87 +++++++++++++------------- 3 files changed, 105 insertions(+), 131 deletions(-) diff --git a/server/CloudflareAPI.js b/server/CloudflareAPI.js index 0b33d98..abbb4f7 100644 --- a/server/CloudflareAPI.js +++ b/server/CloudflareAPI.js @@ -1,5 +1,7 @@ require('isomorphic-fetch') const invariant = require('invariant') +const gunzip = require('gunzip-maybe') +const ndjson = require('ndjson') const CloudflareAPIURL = 'https://api.cloudflare.com' const CloudflareEmail = process.env.CLOUDFLARE_EMAIL @@ -32,7 +34,33 @@ function getJSON(path, headers) { }) } +function getZones(domain) { + return getJSON(`/zones?name=${domain}`) +} + +function getZoneAnalyticsDashboard(zoneId, since) { + return getJSON(`/zones/${zoneId}/analytics/dashboard?since=${since}&continuous=true`) +} + +function getJSONStream(path, headers) { + const acceptGzipHeaders = Object.assign({}, headers, { 'Accept-Encoding': 'gzip' }) + + return get(path, acceptGzipHeaders).then(function (res) { + return res.body.pipe(gunzip()) + }).then(function (stream) { + return stream.pipe(ndjson.parse()) + }) +} + +function getLogs(zoneId, startTime, endTime) { + return getJSONStream(`/zones/${zoneId}/logs/requests?start=${startTime}&end=${endTime}`) +} + module.exports = { get, - getJSON + getJSON, + getZones, + getZoneAnalyticsDashboard, + getJSONStream, + getLogs } diff --git a/server/IngestLogsWorker.js b/server/IngestLogsWorker.js index 98a40ba..c4aa37e 100644 --- a/server/IngestLogsWorker.js +++ b/server/IngestLogsWorker.js @@ -1,9 +1,7 @@ const parseURL = require('url').parse -const invariant = require('invariant') -const gunzip = require('gunzip-maybe') -const ndjson = require('ndjson') const startOfDay = require('date-fns/start_of_day') const addDays = require('date-fns/add_days') +const validateNPMPackageName = require('validate-npm-package-name') const PackageURL = require('./PackageURL') const cf = require('./CloudflareAPI') const db = require('./RedisClient') @@ -25,109 +23,63 @@ const DomainNames = [ */ const LogWindowSeconds = 30 -function getZones(domain) { - return cf.getJSON(`/zones?name=${domain}`) -} - -function getLogs(zoneId, startTime, endTime) { - return cf.get( - `/zones/${zoneId}/logs/requests?start=${startTime}&end=${endTime}`, - { 'Accept-Encoding': 'gzip' } - ).then(function (res) { - return res.body.pipe(gunzip()) - }) -} - -function toSeconds(millis) { - return Math.floor(millis / 1000) +function getSeconds(date) { + return Math.floor(date.getTime() / 1000) } function stringifySeconds(seconds) { return new Date(seconds * 1000).toISOString() } -function getPackageName(pathname) { - const parsed = PackageURL.parse(pathname) - return parsed && parsed.packageName +function toSeconds(millis) { + return Math.floor(millis / 1000) } const oneSecond = 1000 const oneMinute = oneSecond * 60 const oneHour = oneMinute * 60 -function getSeconds(date) { - return Math.floor(date.getTime() / 1000) -} - function computeCounters(stream) { return new Promise(function (resolve, reject) { const counters = {} const expireat = {} - function incrCounter(counterName, by = 1) { - counters[counterName] = (counters[counterName] || 0) + by - } - - function incrCounterMember(counterName, member, by = 1) { - counters[counterName] = counters[counterName] || {} - counters[counterName][member] = (counters[counterName][member] || 0) + by + function incr(key, member, by, expiry) { + counters[key] = counters[key] || {} + counters[key][member] = (counters[key][member] || 0) + by + expireat[key] = expiry } stream - .pipe(ndjson.parse()) .on('error', reject) .on('data', function (entry) { const date = new Date(Math.round(entry.timestamp / 1000000)) + const nextDay = startOfDay(addDays(date, 1)) + const sevenDaysLater = getSeconds(addDays(nextDay, 7)) const thirtyDaysLater = getSeconds(addDays(nextDay, 30)) - const dayKey = createDayKey(date) - const hourKey = createHourKey(date) - - // Q: How many requests are served by origin/cache/edge per day/hour? - if (entry.origin) { - incrCounter(`stats-originRequests-${dayKey}`) - expireat[`stats-originRequests-${dayKey}`] = thirtyDaysLater - - incrCounter(`stats-originRequests-${hourKey}`) - expireat[`stats-originRequests-${hourKey}`] = thirtyDaysLater - } else if (entry.cache) { - incrCounter(`stats-cacheRequests-${dayKey}`) - expireat[`stats-cacheRequests-${dayKey}`] = thirtyDaysLater - - incrCounter(`stats-cacheRequests-${hourKey}`) - expireat[`stats-cacheRequests-${hourKey}`] = thirtyDaysLater - } else { - incrCounter(`stats-edgeRequests-${dayKey}`) - expireat[`stats-edgeRequests-${dayKey}`] = thirtyDaysLater - - incrCounter(`stats-edgeRequests-${hourKey}`) - expireat[`stats-edgeRequests-${hourKey}`] = thirtyDaysLater - } const clientRequest = entry.clientRequest const edgeResponse = entry.edgeResponse - // Q: How many requests do we receive for a package per day? - // Q: How many bytes do we serve for a package per day? - const uri = clientRequest.uri - const package = getPackageName(parseURL(uri).pathname) + if (edgeResponse.status === 200) { + // Q: How many requests do we serve for a package per day? + // Q: How many bytes do we serve for a package per day? + const url = PackageURL.parse(parseURL(clientRequest.uri).pathname) + const packageName = url && url.packageName - if (package) { - incrCounterMember(`stats-packageRequests-${dayKey}`, package) - expireat[`stats-packageRequests-${dayKey}`] = thirtyDaysLater - - incrCounterMember(`stats-packageBytes-${dayKey}`, package, edgeResponse.bytes) - expireat[`stats-packageBytes-${dayKey}`] = thirtyDaysLater + if (packageName && validateNPMPackageName(packageName).errors == null) { + incr(`stats-packageRequests-${dayKey}`, packageName, 1, thirtyDaysLater) + incr(`stats-packageBytes-${dayKey}`, packageName, edgeResponse.bytes, thirtyDaysLater) + } } - // Q: How many requests per day do we receive via each protocol? + // Q: How many requests per day do we receive via a protocol? const protocol = clientRequest.httpProtocol - if (protocol) { - incrCounterMember(`stats-protocolRequests-${dayKey}`, protocol) - expireat[`stats-protocolRequests-${dayKey}`] = thirtyDaysLater - } + if (protocol) + incr(`stats-protocolRequests-${dayKey}`, protocol, 1, thirtyDaysLater) // Q: How many requests do we receive from a hostname per day? // Q: How many bytes do we serve to a hostname per day? @@ -135,11 +87,8 @@ function computeCounters(stream) { const hostname = referer && parseURL(referer).hostname if (hostname) { - incrCounterMember(`stats-hostnameRequests-${dayKey}`, hostname) - expireat[`stats-hostnameRequests-${dayKey}`] = thirtyDaysLater - - incrCounterMember(`stats-hostnameBytes-${dayKey}`, hostname, edgeResponse.bytes) - expireat[`stats-hostnameBytes-${dayKey}`] = thirtyDaysLater + incr(`stats-hostnameRequests-${dayKey}`, hostname, 1, sevenDaysLater) + incr(`stats-hostnameBytes-${dayKey}`, hostname, edgeResponse.bytes, sevenDaysLater) } }) .on('end', function () { @@ -151,17 +100,11 @@ function computeCounters(stream) { function processLogs(stream) { return computeCounters(stream).then(function ({ counters, expireat }) { Object.keys(counters).forEach(function (key) { - const value = counters[key] + const values = counters[key] - if (typeof value === 'number') { - // Simple counter. - db.incrby(key, value) - } else { - // Sorted set. - Object.keys(value).forEach(function (member) { - db.zincrby(key, value[member], member) - }) - } + Object.keys(values).forEach(function (member) { + db.zincrby(key, values[member], member) + }) if (expireat[key]) db.expireat(key, expireat[key]) @@ -181,7 +124,7 @@ function ingestLogs(zone, startSeconds, endSeconds) { const startFetchTime = Date.now() resolve( - getLogs(zone.id, startSeconds, endSeconds).then(function (stream) { + cf.getLogs(zone.id, startSeconds, endSeconds).then(function (stream) { const endFetchTime = Date.now() console.log( @@ -263,7 +206,7 @@ function startZone(zone) { takeATurn() } -Promise.all(DomainNames.map(getZones)).then(function (results) { +Promise.all(DomainNames.map(cf.getZones)).then(function (results) { const zones = results.reduce(function (memo, zones) { return memo.concat(zones) }) diff --git a/server/IngestStatsWorker.js b/server/IngestStatsWorker.js index 8552942..02a9f54 100644 --- a/server/IngestStatsWorker.js +++ b/server/IngestStatsWorker.js @@ -1,5 +1,6 @@ -const addDays = require('date-fns/add_days') const invariant = require('invariant') +const startOfDay = require('date-fns/start_of_day') +const addDays = require('date-fns/add_days') const cf = require('./CloudflareAPI') const db = require('./RedisClient') const { @@ -16,23 +17,11 @@ const DomainNames = [ 'npmcdn.com' ] -function getZones(domain) { - return cf.getJSON(`/zones?name=${domain}`) -} - -function getZoneAnalyticsDashboard(zoneId, since) { - return cf.getJSON(`/zones/${zoneId}/analytics/dashboard?since=${since}&continuous=true`) -} - const oneSecond = 1000 const oneMinute = oneSecond * 60 const oneHour = oneMinute * 60 const oneDay = oneHour * 24 -const oneMinuteSeconds = 60 -const oneHourSeconds = oneMinuteSeconds * 60 -const oneDaySeconds = oneHourSeconds * 24 - function getSeconds(date) { return Math.floor(date.getTime() / 1000) } @@ -68,7 +57,7 @@ function ingestStatsForZones(zones, since, processDashboard) { resolve( Promise.all( zones.map(function (zone) { - return getZoneAnalyticsDashboard(zone.id, since) + return cf.getZoneAnalyticsDashboard(zone.id, since) }) ).then(function (results) { const endFetchTime = Date.now() @@ -129,33 +118,33 @@ function processPerDayTimeseries(ts) { invariant( since.getUTCHours() === 0 && since.getUTCMinutes() === 0 && since.getUTCSeconds() === 0, - 'error: Per-day timeseries.since must begin exactly on the day' + 'Per-day timeseries.since must begin exactly on the day' ) invariant( (until - since) === oneDay, - 'error: Per-day timeseries must span exactly one day' + 'Per-day timeseries must span exactly one day' ) + const nextDay = startOfDay(addDays(until, 1)) + const oneYearLater = getSeconds(addDays(nextDay, 365)) const dayKey = createDayKey(since) - const oneYearLater = getSeconds(addDays(until, 365)) - const thirtyDaysLater = getSeconds(addDays(until, 30)) // Q: How many requests do we serve per day? db.set(`stats-requests-${dayKey}`, ts.requests.all) db.expireat(`stats-requests-${dayKey}`, oneYearLater) // Q: How many requests do we serve per day from the cache? - db.set(`stats-requestsFromCache-${dayKey}`, ts.requests.cached) - db.expireat(`stats-requestsFromCache-${dayKey}`, oneYearLater) + db.set(`stats-cachedRequests-${dayKey}`, ts.requests.cached) + db.expireat(`stats-cachedRequests-${dayKey}`, oneYearLater) // Q: How much bandwidth do we serve per day? db.set(`stats-bandwidth-${dayKey}`, ts.bandwidth.all) db.expireat(`stats-bandwidth-${dayKey}`, oneYearLater) // Q: How much bandwidth do we serve per day from the cache? - db.set(`stats-bandwidthFromCache-${dayKey}`, ts.bandwidth.cached) - db.expireat(`stats-bandwidthFromCache-${dayKey}`, oneYearLater) + db.set(`stats-cachedBandwidth-${dayKey}`, ts.bandwidth.cached) + db.expireat(`stats-cachedBandwidth-${dayKey}`, oneYearLater) const httpStatus = ts.requests.http_status const errors = Object.keys(httpStatus).reduce(function (memo, status) { @@ -180,16 +169,18 @@ function processPerDayTimeseries(ts) { } }) + const thirtyDaysLater = getSeconds(addDays(nextDay, 30)) + // Q: How many requests do we serve to a country per day? if (requestsByCountry.length) { - db.zadd([ `stats-requestsByCountry-${dayKey}`, ...requestsByCountry ]) - db.expireat(`stats-requestsByCountry-${dayKey}`, thirtyDaysLater) + db.zadd([ `stats-countryRequests-${dayKey}`, ...requestsByCountry ]) + db.expireat(`stats-countryRequests-${dayKey}`, thirtyDaysLater) } // Q: How much bandwidth do we serve to a country per day? if (bandwidthByCountry.length) { - db.zadd([ `stats-bandwidthByCountry-${dayKey}`, ...bandwidthByCountry ]) - db.expireat(`stats-bandwidthByCountry-${dayKey}`, thirtyDaysLater) + db.zadd([ `stats-countryBandwidth-${dayKey}`, ...bandwidthByCountry ]) + db.expireat(`stats-countryBandwidth-${dayKey}`, thirtyDaysLater) } resolve() @@ -211,27 +202,33 @@ function processPerHourTimeseries(ts) { invariant( since.getUTCMinutes() === 0 && since.getUTCSeconds() === 0, - 'error: Per-hour timeseries.since must begin exactly on the hour' + 'Per-hour timeseries.since must begin exactly on the hour' ) invariant( (until - since) === oneHour, - 'error: Per-hour timeseries must span exactly one hour' + 'Per-hour timeseries must span exactly one hour' ) + const nextDay = startOfDay(addDays(until, 1)) + const sevenDaysLater = getSeconds(addDays(nextDay, 7)) const hourKey = createHourKey(since) // Q: How many requests do we serve per hour? - db.setex(`stats-requests-${hourKey}`, (oneDaySeconds * 7), ts.requests.all) + db.set(`stats-requests-${hourKey}`, ts.requests.all) + db.expireat(`stats-requests-${hourKey}`, sevenDaysLater) // Q: How many requests do we serve per hour from the cache? - db.setex(`stats-requestsFromCache-${hourKey}`, (oneDaySeconds * 7), ts.requests.cached) + db.set(`stats-cachedRequests-${hourKey}`, ts.requests.cached) + db.expireat(`stats-cachedRequests-${hourKey}`, sevenDaysLater) // Q: How much bandwidth do we serve per hour? - db.setex(`stats-bandwidth-${hourKey}`, (oneDaySeconds * 7), ts.bandwidth.all) + db.set(`stats-bandwidth-${hourKey}`, ts.bandwidth.all) + db.expireat(`stats-bandwidth-${hourKey}`, sevenDaysLater) // Q: How much bandwidth do we serve per hour from the cache? - db.setex(`stats-bandwidthFromCache-${hourKey}`, (oneDaySeconds * 7), ts.bandwidth.cached) + db.set(`stats-cachedBandwidth-${hourKey}`, ts.bandwidth.cached) + db.expireat(`stats-cachedBandwidth-${hourKey}`, sevenDaysLater) resolve() }) @@ -252,27 +249,33 @@ function processPerMinuteTimeseries(ts) { invariant( since.getUTCSeconds() === 0, - 'error: Per-minute timeseries.since must begin exactly on the minute' + 'Per-minute timeseries.since must begin exactly on the minute' ) invariant( (until - since) === oneMinute, - 'error: Per-minute timeseries must span exactly one minute' + 'Per-minute timeseries must span exactly one minute' ) + const nextDay = startOfDay(addDays(until, 1)) + const oneDayLater = getSeconds(addDays(nextDay, 1)) const minuteKey = createMinuteKey(since) // Q: How many requests do we serve per minute? - db.setex(`stats-requests-${minuteKey}`, oneDaySeconds, ts.requests.all) + db.set(`stats-requests-${minuteKey}`, ts.requests.all) + db.expireat(`stats-requests-${minuteKey}`, oneDayLater) // Q: How many requests do we serve per minute from the cache? - db.setex(`stats-requestsFromCache-${minuteKey}`, oneDaySeconds, ts.requests.cached) + db.set(`stats-cachedRequests-${minuteKey}`, ts.requests.cached) + db.expireat(`stats-cachedRequests-${minuteKey}`, oneDayLater) // Q: How much bandwidth do we serve per minute? - db.setex(`stats-bandwidth-${minuteKey}`, oneDaySeconds, ts.bandwidth.all) + db.set(`stats-bandwidth-${minuteKey}`, ts.bandwidth.all) + db.expireat(`stats-bandwidth-${minuteKey}`, oneDayLater) // Q: How much bandwidth do we serve per minute from the cache? - db.setex(`stats-bandwidthFromCache-${minuteKey}`, oneDaySeconds, ts.bandwidth.cached) + db.set(`stats-cachedBandwidth-${minuteKey}`, ts.bandwidth.cached) + db.expireat(`stats-cachedBandwidth-${minuteKey}`, oneDayLater) resolve() }) @@ -280,15 +283,15 @@ function processPerMinuteTimeseries(ts) { function startZones(zones) { function takePerMinuteTurn() { - return ingestPerMinuteStats(zones) + ingestPerMinuteStats(zones) } function takePerHourTurn() { - return ingestPerHourStats(zones) + ingestPerHourStats(zones) } function takePerDayTurn() { - return ingestPerDayStats(zones) + ingestPerDayStats(zones) } takePerMinuteTurn() @@ -300,7 +303,7 @@ function startZones(zones) { setInterval(takePerDayTurn, oneHour / 2) } -Promise.all(DomainNames.map(getZones)).then(function (results) { +Promise.all(DomainNames.map(cf.getZones)).then(function (results) { const zones = results.reduce(function (memo, zones) { return memo.concat(zones) })