diff --git a/server/IngestLogsWorker.js b/server/IngestLogsWorker.js index 0630e85..02eb514 100644 --- a/server/IngestLogsWorker.js +++ b/server/IngestLogsWorker.js @@ -4,6 +4,12 @@ const invariant = require('invariant') const gunzip = require('gunzip-maybe') const ndjson = require('ndjson') const redis = require('redis') +const startOfDay = require('date-fns/start_of_day') +const addDays = require('date-fns/add_days') +const { + createDayKey, + createHourKey +} = require('./StatsServer') const CloudflareEmail = process.env.CLOUDFLARE_EMAIL const CloudflareKey = process.env.CLOUDFLARE_KEY @@ -77,9 +83,13 @@ const oneSecond = 1000 const oneMinute = oneSecond * 60 const oneHour = oneMinute * 60 +const getSeconds = (date) => + Math.floor(date.getTime() / 1000) + const computeCounters = (stream) => new Promise((resolve, reject) => { const counters = {} + const expireat = {} const incrCounter = (counterName, by = 1) => counters[counterName] = (counters[counterName] || 0) + by @@ -94,25 +104,31 @@ const computeCounters = (stream) => .on('error', reject) .on('data', entry => { const date = new Date(Math.round(entry.timestamp / 1000000)) - const dayKey = `${date.getUTCFullYear()}-${date.getUTCMonth()}-${date.getUTCDate()}` - const hourKey = `${dayKey}-${date.getUTCHours()}` - // const minuteKey = `${hourKey}-${date.getUTCMinutes()}` + const nextDay = startOfDay(addDays(date, 1)) + const thirtyDaysLater = getSeconds(addDays(nextDay, 30)) - // Q: How many requests do we receive per day/hour/minute? - // incrCounter(`stats-requests-${dayKey}`) // Done by ingest_stats worker - // incrCounter(`stats-requests-${hourKey}`) // Done by ingest_stats worker - // incrCounter(`stats-requests-${minuteKey}`) // Done by ingest_stats worker + const dayKey = createDayKey(date) + const hourKey = createHourKey(date) // Q: How many requests are served by origin/cache/edge per day/hour? if (entry.origin) { incrCounter(`stats-originRequests-${dayKey}`) + expireat[`stats-originRequests-${dayKey}`] = thirtyDaysLater + incrCounter(`stats-originRequests-${hourKey}`) + expireat[`stats-originRequests-${hourKey}`] = thirtyDaysLater } else if (entry.cache) { incrCounter(`stats-cacheRequests-${dayKey}`) + expireat[`stats-cacheRequests-${dayKey}`] = thirtyDaysLater + incrCounter(`stats-cacheRequests-${hourKey}`) + expireat[`stats-cacheRequests-${hourKey}`] = thirtyDaysLater } else { incrCounter(`stats-edgeRequests-${dayKey}`) + expireat[`stats-edgeRequests-${dayKey}`] = thirtyDaysLater + incrCounter(`stats-edgeRequests-${hourKey}`) + expireat[`stats-edgeRequests-${hourKey}`] = thirtyDaysLater } const clientRequest = entry.clientRequest @@ -125,14 +141,19 @@ const computeCounters = (stream) => if (package) { incrCounterMember(`stats-packageRequests-${dayKey}`, package) + expireat[`stats-packageRequests-${dayKey}`] = thirtyDaysLater + incrCounterMember(`stats-packageBytes-${dayKey}`, package, edgeResponse.bytes) + expireat[`stats-packageBytes-${dayKey}`] = thirtyDaysLater } // Q: How many requests per day do we receive via each protocol? const protocol = clientRequest.httpProtocol - if (protocol) + if (protocol) { incrCounterMember(`stats-protocolRequests-${dayKey}`, protocol) + expireat[`stats-protocolRequests-${dayKey}`] = thirtyDaysLater + } // Q: How many requests do we receive from a hostname per day? // Q: How many bytes do we serve to a hostname per day? @@ -141,16 +162,19 @@ const computeCounters = (stream) => if (hostname) { incrCounterMember(`stats-hostnameRequests-${dayKey}`, hostname) + expireat[`stats-hostnameRequests-${dayKey}`] = thirtyDaysLater + incrCounterMember(`stats-hostnameBytes-${dayKey}`, hostname, edgeResponse.bytes) + expireat[`stats-hostnameBytes-${dayKey}`] = thirtyDaysLater } }) .on('end', () => { - resolve(counters) + resolve({ counters, expireat }) }) }) const processLogs = (stream) => - computeCounters(stream).then(counters => { + computeCounters(stream).then(({ counters, expireat }) => { Object.keys(counters).forEach(key => { const value = counters[key] @@ -163,6 +187,9 @@ const processLogs = (stream) => db.zincrby(key, value[member], member) }) } + + if (expireat[key]) + db.expireat(key, expireat[key]) }) }) diff --git a/server/IngestStatsWorker.js b/server/IngestStatsWorker.js index 988e368..a353d63 100644 --- a/server/IngestStatsWorker.js +++ b/server/IngestStatsWorker.js @@ -1,11 +1,12 @@ require('isomorphic-fetch') const redis = require('redis') +const addDays = require('date-fns/add_days') const invariant = require('invariant') const { createDayKey, createHourKey, createMinuteKey -} = require('../server/StatsServer') +} = require('./StatsServer') const CloudflareEmail = process.env.CLOUDFLARE_EMAIL const CloudflareKey = process.env.CLOUDFLARE_KEY @@ -65,6 +66,9 @@ const oneMinuteSeconds = 60 const oneHourSeconds = oneMinuteSeconds * 60 const oneDaySeconds = oneHourSeconds * 24 +const getSeconds = (date) => + Math.floor(date.getTime() / 1000) + const reduceResults = (memo, results) => { Object.keys(results).forEach(key => { const value = results[key] @@ -163,27 +167,34 @@ const processPerDayTimeseries = (ts) => ) const dayKey = createDayKey(since) + const oneYearLater = getSeconds(addDays(until, 365)) + const thirtyDaysLater = getSeconds(addDays(until, 30)) // Q: How many requests do we serve per day? db.set(`stats-requests-${dayKey}`, ts.requests.all) + db.expireat(`stats-requests-${dayKey}`, oneYearLater) + // Q: How many requests do we serve per day from the cache? db.set(`stats-requestsFromCache-${dayKey}`, ts.requests.cached) + db.expireat(`stats-requestsFromCache-${dayKey}`, oneYearLater) // Q: How much bandwidth do we serve per day? db.set(`stats-bandwidth-${dayKey}`, ts.bandwidth.all) + db.expireat(`stats-bandwidth-${dayKey}`, oneYearLater) + // Q: How much bandwidth do we serve per day from the cache? db.set(`stats-bandwidthFromCache-${dayKey}`, ts.bandwidth.cached) + db.expireat(`stats-bandwidthFromCache-${dayKey}`, oneYearLater) - // Q: How many errors do we serve per day? const httpStatus = ts.requests.http_status const errors = Object.keys(httpStatus).reduce((memo, status) => { return parseInt(status, 10) >= 500 ? memo + httpStatus[status] : memo }, 0) + // Q: How many errors do we serve per day? db.set(`stats-errors-${dayKey}`, errors) + db.expireat(`stats-errors-${dayKey}`, oneYearLater) - // Q: How many requests do we serve to a country per day? - // Q: How much bandwidth do we serve to a country per day? const requestsByCountry = [] const bandwidthByCountry = [] @@ -191,18 +202,24 @@ const processPerDayTimeseries = (ts) => const requests = ts.requests.country[country] const bandwidth = ts.bandwidth.country[country] - // Include only countries who made at least 1M requests. - if (requests > 1000000) { + // Include only countries who made at least 100K requests. + if (requests > 100000) { requestsByCountry.push(requests, country) bandwidthByCountry.push(bandwidth, country) } }) - if (requestsByCountry.length) + // Q: How many requests do we serve to a country per day? + if (requestsByCountry.length) { db.zadd([ `stats-requestsByCountry-${dayKey}`, ...requestsByCountry ]) + db.expireat(`stats-requestsByCountry-${dayKey}`, thirtyDaysLater) + } - if (bandwidthByCountry.length) + // Q: How much bandwidth do we serve to a country per day? + if (bandwidthByCountry.length) { db.zadd([ `stats-bandwidthByCountry-${dayKey}`, ...bandwidthByCountry ]) + db.expireat(`stats-bandwidthByCountry-${dayKey}`, thirtyDaysLater) + } resolve() }) @@ -230,14 +247,16 @@ const processPerHourTimeseries = (ts) => const hourKey = createHourKey(since) - // Q: How many requests do we serve per hour? (expire after 7 days) + // Q: How many requests do we serve per hour? db.setex(`stats-requests-${hourKey}`, (oneDaySeconds * 7), ts.requests.all) - // Q: How many requests do we serve per hour from the cache? (expire after 7 days) + + // Q: How many requests do we serve per hour from the cache? db.setex(`stats-requestsFromCache-${hourKey}`, (oneDaySeconds * 7), ts.requests.cached) - // Q: How much bandwidth do we serve per hour? (expire after 7 days) + // Q: How much bandwidth do we serve per hour? db.setex(`stats-bandwidth-${hourKey}`, (oneDaySeconds * 7), ts.bandwidth.all) - // Q: How much bandwidth do we serve per hour from the cache? (expire after 7 days) + + // Q: How much bandwidth do we serve per hour from the cache? db.setex(`stats-bandwidthFromCache-${hourKey}`, (oneDaySeconds * 7), ts.bandwidth.cached) resolve() @@ -266,14 +285,16 @@ const processPerMinuteTimeseries = (ts) => const minuteKey = createMinuteKey(since) - // Q: How many requests do we serve per minute? (expire after 1 day) + // Q: How many requests do we serve per minute? db.setex(`stats-requests-${minuteKey}`, oneDaySeconds, ts.requests.all) - // Q: How many requests do we serve per minute from the cache? (expire after 1 day) + + // Q: How many requests do we serve per minute from the cache? db.setex(`stats-requestsFromCache-${minuteKey}`, oneDaySeconds, ts.requests.cached) - // Q: How much bandwidth do we serve per minute? (expire after 1 day) + // Q: How much bandwidth do we serve per minute? db.setex(`stats-bandwidth-${minuteKey}`, oneDaySeconds, ts.bandwidth.all) - // Q: How much bandwidth do we serve per minute from the cache? (expire after 1 day) + + // Q: How much bandwidth do we serve per minute from the cache? db.setex(`stats-bandwidthFromCache-${minuteKey}`, oneDaySeconds, ts.bandwidth.cached) resolve() diff --git a/server/StatsServer.js b/server/StatsServer.js index 3b5ee02..e00945c 100644 --- a/server/StatsServer.js +++ b/server/StatsServer.js @@ -58,14 +58,16 @@ const sumTopScores = (keys, n) => }, {}) }) +const createKey = (...args) => args.join('-') + const createDayKey = (date) => - `${date.getUTCFullYear()}-${date.getUTCMonth()}-${date.getUTCDate()}` + createKey(date.getUTCFullYear(), date.getUTCMonth(), date.getUTCDate()) const createHourKey = (date) => - `${createDayKey(date)}-${date.getUTCHours()}` + createKey(createDayKey(date), date.getUTCHours()) const createMinuteKey = (date) => - `${createDayKey(date)}-${date.getUTCMinutes()}` + createKey(createHourKey(date), date.getUTCMinutes()) module.exports = { getKeyValues,