Expire all stats keys eventually
This commit is contained in:
		| @ -4,6 +4,12 @@ const invariant = require('invariant') | |||||||
| const gunzip = require('gunzip-maybe') | const gunzip = require('gunzip-maybe') | ||||||
| const ndjson = require('ndjson') | const ndjson = require('ndjson') | ||||||
| const redis = require('redis') | const redis = require('redis') | ||||||
|  | const startOfDay = require('date-fns/start_of_day') | ||||||
|  | const addDays = require('date-fns/add_days') | ||||||
|  | const { | ||||||
|  |   createDayKey, | ||||||
|  |   createHourKey | ||||||
|  | } = require('./StatsServer') | ||||||
|  |  | ||||||
| const CloudflareEmail = process.env.CLOUDFLARE_EMAIL | const CloudflareEmail = process.env.CLOUDFLARE_EMAIL | ||||||
| const CloudflareKey = process.env.CLOUDFLARE_KEY | const CloudflareKey = process.env.CLOUDFLARE_KEY | ||||||
| @ -77,9 +83,13 @@ const oneSecond = 1000 | |||||||
| const oneMinute = oneSecond * 60 | const oneMinute = oneSecond * 60 | ||||||
| const oneHour = oneMinute * 60 | const oneHour = oneMinute * 60 | ||||||
|  |  | ||||||
|  | const getSeconds = (date) => | ||||||
|  |   Math.floor(date.getTime() / 1000) | ||||||
|  |  | ||||||
| const computeCounters = (stream) => | const computeCounters = (stream) => | ||||||
|   new Promise((resolve, reject) => { |   new Promise((resolve, reject) => { | ||||||
|     const counters = {} |     const counters = {} | ||||||
|  |     const expireat = {} | ||||||
|  |  | ||||||
|     const incrCounter = (counterName, by = 1) => |     const incrCounter = (counterName, by = 1) => | ||||||
|       counters[counterName] = (counters[counterName] || 0) + by |       counters[counterName] = (counters[counterName] || 0) + by | ||||||
| @ -94,25 +104,31 @@ const computeCounters = (stream) => | |||||||
|       .on('error', reject) |       .on('error', reject) | ||||||
|       .on('data', entry => { |       .on('data', entry => { | ||||||
|         const date = new Date(Math.round(entry.timestamp / 1000000)) |         const date = new Date(Math.round(entry.timestamp / 1000000)) | ||||||
|         const dayKey = `${date.getUTCFullYear()}-${date.getUTCMonth()}-${date.getUTCDate()}` |         const nextDay = startOfDay(addDays(date, 1)) | ||||||
|         const hourKey = `${dayKey}-${date.getUTCHours()}` |         const thirtyDaysLater = getSeconds(addDays(nextDay, 30)) | ||||||
|         // const minuteKey = `${hourKey}-${date.getUTCMinutes()}` |  | ||||||
|  |  | ||||||
|         // Q: How many requests do we receive per day/hour/minute? |         const dayKey = createDayKey(date) | ||||||
|         // incrCounter(`stats-requests-${dayKey}`) // Done by ingest_stats worker |         const hourKey = createHourKey(date) | ||||||
|         // incrCounter(`stats-requests-${hourKey}`) // Done by ingest_stats worker |  | ||||||
|         // incrCounter(`stats-requests-${minuteKey}`) // Done by ingest_stats worker |  | ||||||
|  |  | ||||||
|         // Q: How many requests are served by origin/cache/edge per day/hour? |         // Q: How many requests are served by origin/cache/edge per day/hour? | ||||||
|         if (entry.origin) { |         if (entry.origin) { | ||||||
|           incrCounter(`stats-originRequests-${dayKey}`) |           incrCounter(`stats-originRequests-${dayKey}`) | ||||||
|  |           expireat[`stats-originRequests-${dayKey}`] = thirtyDaysLater | ||||||
|  |  | ||||||
|           incrCounter(`stats-originRequests-${hourKey}`) |           incrCounter(`stats-originRequests-${hourKey}`) | ||||||
|  |           expireat[`stats-originRequests-${hourKey}`] = thirtyDaysLater | ||||||
|         } else if (entry.cache) { |         } else if (entry.cache) { | ||||||
|           incrCounter(`stats-cacheRequests-${dayKey}`) |           incrCounter(`stats-cacheRequests-${dayKey}`) | ||||||
|  |           expireat[`stats-cacheRequests-${dayKey}`] = thirtyDaysLater | ||||||
|  |  | ||||||
|           incrCounter(`stats-cacheRequests-${hourKey}`) |           incrCounter(`stats-cacheRequests-${hourKey}`) | ||||||
|  |           expireat[`stats-cacheRequests-${hourKey}`] = thirtyDaysLater | ||||||
|         } else { |         } else { | ||||||
|           incrCounter(`stats-edgeRequests-${dayKey}`) |           incrCounter(`stats-edgeRequests-${dayKey}`) | ||||||
|  |           expireat[`stats-edgeRequests-${dayKey}`] = thirtyDaysLater | ||||||
|  |  | ||||||
|           incrCounter(`stats-edgeRequests-${hourKey}`) |           incrCounter(`stats-edgeRequests-${hourKey}`) | ||||||
|  |           expireat[`stats-edgeRequests-${hourKey}`] = thirtyDaysLater | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         const clientRequest = entry.clientRequest |         const clientRequest = entry.clientRequest | ||||||
| @ -125,14 +141,19 @@ const computeCounters = (stream) => | |||||||
|  |  | ||||||
|         if (package) { |         if (package) { | ||||||
|           incrCounterMember(`stats-packageRequests-${dayKey}`, package) |           incrCounterMember(`stats-packageRequests-${dayKey}`, package) | ||||||
|  |           expireat[`stats-packageRequests-${dayKey}`] = thirtyDaysLater | ||||||
|  |  | ||||||
|           incrCounterMember(`stats-packageBytes-${dayKey}`, package, edgeResponse.bytes) |           incrCounterMember(`stats-packageBytes-${dayKey}`, package, edgeResponse.bytes) | ||||||
|  |           expireat[`stats-packageBytes-${dayKey}`] = thirtyDaysLater | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         // Q: How many requests per day do we receive via each protocol? |         // Q: How many requests per day do we receive via each protocol? | ||||||
|         const protocol = clientRequest.httpProtocol |         const protocol = clientRequest.httpProtocol | ||||||
|  |  | ||||||
|         if (protocol) |         if (protocol) { | ||||||
|           incrCounterMember(`stats-protocolRequests-${dayKey}`, protocol) |           incrCounterMember(`stats-protocolRequests-${dayKey}`, protocol) | ||||||
|  |           expireat[`stats-protocolRequests-${dayKey}`] = thirtyDaysLater | ||||||
|  |         } | ||||||
|  |  | ||||||
|         // Q: How many requests do we receive from a hostname per day? |         // Q: How many requests do we receive from a hostname per day? | ||||||
|         // Q: How many bytes do we serve to a hostname per day? |         // Q: How many bytes do we serve to a hostname per day? | ||||||
| @ -141,16 +162,19 @@ const computeCounters = (stream) => | |||||||
|  |  | ||||||
|         if (hostname) { |         if (hostname) { | ||||||
|           incrCounterMember(`stats-hostnameRequests-${dayKey}`, hostname) |           incrCounterMember(`stats-hostnameRequests-${dayKey}`, hostname) | ||||||
|  |           expireat[`stats-hostnameRequests-${dayKey}`] = thirtyDaysLater | ||||||
|  |  | ||||||
|           incrCounterMember(`stats-hostnameBytes-${dayKey}`, hostname, edgeResponse.bytes) |           incrCounterMember(`stats-hostnameBytes-${dayKey}`, hostname, edgeResponse.bytes) | ||||||
|  |           expireat[`stats-hostnameBytes-${dayKey}`] = thirtyDaysLater | ||||||
|         } |         } | ||||||
|       }) |       }) | ||||||
|       .on('end', () => { |       .on('end', () => { | ||||||
|         resolve(counters) |         resolve({ counters, expireat }) | ||||||
|       }) |       }) | ||||||
|   }) |   }) | ||||||
|  |  | ||||||
| const processLogs = (stream) => | const processLogs = (stream) => | ||||||
|   computeCounters(stream).then(counters => { |   computeCounters(stream).then(({ counters, expireat }) => { | ||||||
|     Object.keys(counters).forEach(key => { |     Object.keys(counters).forEach(key => { | ||||||
|       const value = counters[key] |       const value = counters[key] | ||||||
|  |  | ||||||
| @ -163,6 +187,9 @@ const processLogs = (stream) => | |||||||
|           db.zincrby(key, value[member], member) |           db.zincrby(key, value[member], member) | ||||||
|         }) |         }) | ||||||
|       } |       } | ||||||
|  |  | ||||||
|  |       if (expireat[key]) | ||||||
|  |         db.expireat(key, expireat[key]) | ||||||
|     }) |     }) | ||||||
|   }) |   }) | ||||||
|  |  | ||||||
|  | |||||||
| @ -1,11 +1,12 @@ | |||||||
| require('isomorphic-fetch') | require('isomorphic-fetch') | ||||||
| const redis = require('redis') | const redis = require('redis') | ||||||
|  | const addDays = require('date-fns/add_days') | ||||||
| const invariant = require('invariant') | const invariant = require('invariant') | ||||||
| const { | const { | ||||||
|   createDayKey, |   createDayKey, | ||||||
|   createHourKey, |   createHourKey, | ||||||
|   createMinuteKey |   createMinuteKey | ||||||
| } = require('../server/StatsServer') | } = require('./StatsServer') | ||||||
|  |  | ||||||
| const CloudflareEmail = process.env.CLOUDFLARE_EMAIL | const CloudflareEmail = process.env.CLOUDFLARE_EMAIL | ||||||
| const CloudflareKey = process.env.CLOUDFLARE_KEY | const CloudflareKey = process.env.CLOUDFLARE_KEY | ||||||
| @ -65,6 +66,9 @@ const oneMinuteSeconds = 60 | |||||||
| const oneHourSeconds = oneMinuteSeconds * 60 | const oneHourSeconds = oneMinuteSeconds * 60 | ||||||
| const oneDaySeconds = oneHourSeconds * 24 | const oneDaySeconds = oneHourSeconds * 24 | ||||||
|  |  | ||||||
|  | const getSeconds = (date) => | ||||||
|  |   Math.floor(date.getTime() / 1000) | ||||||
|  |  | ||||||
| const reduceResults = (memo, results) => { | const reduceResults = (memo, results) => { | ||||||
|   Object.keys(results).forEach(key => { |   Object.keys(results).forEach(key => { | ||||||
|     const value = results[key] |     const value = results[key] | ||||||
| @ -163,27 +167,34 @@ const processPerDayTimeseries = (ts) => | |||||||
|     ) |     ) | ||||||
|  |  | ||||||
|     const dayKey = createDayKey(since) |     const dayKey = createDayKey(since) | ||||||
|  |     const oneYearLater = getSeconds(addDays(until, 365)) | ||||||
|  |     const thirtyDaysLater = getSeconds(addDays(until, 30)) | ||||||
|  |  | ||||||
|     // Q: How many requests do we serve per day? |     // Q: How many requests do we serve per day? | ||||||
|     db.set(`stats-requests-${dayKey}`, ts.requests.all) |     db.set(`stats-requests-${dayKey}`, ts.requests.all) | ||||||
|  |     db.expireat(`stats-requests-${dayKey}`, oneYearLater) | ||||||
|  |  | ||||||
|     // Q: How many requests do we serve per day from the cache? |     // Q: How many requests do we serve per day from the cache? | ||||||
|     db.set(`stats-requestsFromCache-${dayKey}`, ts.requests.cached) |     db.set(`stats-requestsFromCache-${dayKey}`, ts.requests.cached) | ||||||
|  |     db.expireat(`stats-requestsFromCache-${dayKey}`, oneYearLater) | ||||||
|  |  | ||||||
|     // Q: How much bandwidth do we serve per day? |     // Q: How much bandwidth do we serve per day? | ||||||
|     db.set(`stats-bandwidth-${dayKey}`, ts.bandwidth.all) |     db.set(`stats-bandwidth-${dayKey}`, ts.bandwidth.all) | ||||||
|  |     db.expireat(`stats-bandwidth-${dayKey}`, oneYearLater) | ||||||
|  |  | ||||||
|     // Q: How much bandwidth do we serve per day from the cache? |     // Q: How much bandwidth do we serve per day from the cache? | ||||||
|     db.set(`stats-bandwidthFromCache-${dayKey}`, ts.bandwidth.cached) |     db.set(`stats-bandwidthFromCache-${dayKey}`, ts.bandwidth.cached) | ||||||
|  |     db.expireat(`stats-bandwidthFromCache-${dayKey}`, oneYearLater) | ||||||
|  |  | ||||||
|     // Q: How many errors do we serve per day? |  | ||||||
|     const httpStatus = ts.requests.http_status |     const httpStatus = ts.requests.http_status | ||||||
|     const errors = Object.keys(httpStatus).reduce((memo, status) => { |     const errors = Object.keys(httpStatus).reduce((memo, status) => { | ||||||
|       return parseInt(status, 10) >= 500 ? memo + httpStatus[status] : memo |       return parseInt(status, 10) >= 500 ? memo + httpStatus[status] : memo | ||||||
|     }, 0) |     }, 0) | ||||||
|  |  | ||||||
|  |     // Q: How many errors do we serve per day? | ||||||
|     db.set(`stats-errors-${dayKey}`, errors) |     db.set(`stats-errors-${dayKey}`, errors) | ||||||
|  |     db.expireat(`stats-errors-${dayKey}`, oneYearLater) | ||||||
|  |  | ||||||
|     // Q: How many requests do we serve to a country per day? |  | ||||||
|     // Q: How much bandwidth do we serve to a country per day? |  | ||||||
|     const requestsByCountry = [] |     const requestsByCountry = [] | ||||||
|     const bandwidthByCountry = [] |     const bandwidthByCountry = [] | ||||||
|  |  | ||||||
| @ -191,18 +202,24 @@ const processPerDayTimeseries = (ts) => | |||||||
|       const requests = ts.requests.country[country] |       const requests = ts.requests.country[country] | ||||||
|       const bandwidth = ts.bandwidth.country[country] |       const bandwidth = ts.bandwidth.country[country] | ||||||
|  |  | ||||||
|       // Include only countries who made at least 1M requests. |       // Include only countries who made at least 100K requests. | ||||||
|       if (requests > 1000000) { |       if (requests > 100000) { | ||||||
|         requestsByCountry.push(requests, country) |         requestsByCountry.push(requests, country) | ||||||
|         bandwidthByCountry.push(bandwidth, country) |         bandwidthByCountry.push(bandwidth, country) | ||||||
|       } |       } | ||||||
|     }) |     }) | ||||||
|  |  | ||||||
|     if (requestsByCountry.length) |     // Q: How many requests do we serve to a country per day? | ||||||
|  |     if (requestsByCountry.length) { | ||||||
|       db.zadd([ `stats-requestsByCountry-${dayKey}`, ...requestsByCountry ]) |       db.zadd([ `stats-requestsByCountry-${dayKey}`, ...requestsByCountry ]) | ||||||
|  |       db.expireat(`stats-requestsByCountry-${dayKey}`, thirtyDaysLater) | ||||||
|  |     } | ||||||
|  |  | ||||||
|     if (bandwidthByCountry.length) |     // Q: How much bandwidth do we serve to a country per day? | ||||||
|  |     if (bandwidthByCountry.length) { | ||||||
|       db.zadd([ `stats-bandwidthByCountry-${dayKey}`, ...bandwidthByCountry ]) |       db.zadd([ `stats-bandwidthByCountry-${dayKey}`, ...bandwidthByCountry ]) | ||||||
|  |       db.expireat(`stats-bandwidthByCountry-${dayKey}`, thirtyDaysLater) | ||||||
|  |     } | ||||||
|  |  | ||||||
|     resolve() |     resolve() | ||||||
|   }) |   }) | ||||||
| @ -230,14 +247,16 @@ const processPerHourTimeseries = (ts) => | |||||||
|  |  | ||||||
|     const hourKey = createHourKey(since) |     const hourKey = createHourKey(since) | ||||||
|  |  | ||||||
|     // Q: How many requests do we serve per hour? (expire after 7 days) |     // Q: How many requests do we serve per hour? | ||||||
|     db.setex(`stats-requests-${hourKey}`, (oneDaySeconds * 7), ts.requests.all) |     db.setex(`stats-requests-${hourKey}`, (oneDaySeconds * 7), ts.requests.all) | ||||||
|     // Q: How many requests do we serve per hour from the cache? (expire after 7 days) |  | ||||||
|  |     // Q: How many requests do we serve per hour from the cache? | ||||||
|     db.setex(`stats-requestsFromCache-${hourKey}`, (oneDaySeconds * 7), ts.requests.cached) |     db.setex(`stats-requestsFromCache-${hourKey}`, (oneDaySeconds * 7), ts.requests.cached) | ||||||
|  |  | ||||||
|     // Q: How much bandwidth do we serve per hour? (expire after 7 days) |     // Q: How much bandwidth do we serve per hour? | ||||||
|     db.setex(`stats-bandwidth-${hourKey}`, (oneDaySeconds * 7), ts.bandwidth.all) |     db.setex(`stats-bandwidth-${hourKey}`, (oneDaySeconds * 7), ts.bandwidth.all) | ||||||
|     // Q: How much bandwidth do we serve per hour from the cache? (expire after 7 days) |  | ||||||
|  |     // Q: How much bandwidth do we serve per hour from the cache? | ||||||
|     db.setex(`stats-bandwidthFromCache-${hourKey}`, (oneDaySeconds * 7), ts.bandwidth.cached) |     db.setex(`stats-bandwidthFromCache-${hourKey}`, (oneDaySeconds * 7), ts.bandwidth.cached) | ||||||
|  |  | ||||||
|     resolve() |     resolve() | ||||||
| @ -266,14 +285,16 @@ const processPerMinuteTimeseries = (ts) => | |||||||
|  |  | ||||||
|     const minuteKey = createMinuteKey(since) |     const minuteKey = createMinuteKey(since) | ||||||
|  |  | ||||||
|     // Q: How many requests do we serve per minute? (expire after 1 day) |     // Q: How many requests do we serve per minute? | ||||||
|     db.setex(`stats-requests-${minuteKey}`, oneDaySeconds, ts.requests.all) |     db.setex(`stats-requests-${minuteKey}`, oneDaySeconds, ts.requests.all) | ||||||
|     // Q: How many requests do we serve per minute from the cache? (expire after 1 day) |  | ||||||
|  |     // Q: How many requests do we serve per minute from the cache? | ||||||
|     db.setex(`stats-requestsFromCache-${minuteKey}`, oneDaySeconds, ts.requests.cached) |     db.setex(`stats-requestsFromCache-${minuteKey}`, oneDaySeconds, ts.requests.cached) | ||||||
|  |  | ||||||
|     // Q: How much bandwidth do we serve per minute? (expire after 1 day) |     // Q: How much bandwidth do we serve per minute? | ||||||
|     db.setex(`stats-bandwidth-${minuteKey}`, oneDaySeconds, ts.bandwidth.all) |     db.setex(`stats-bandwidth-${minuteKey}`, oneDaySeconds, ts.bandwidth.all) | ||||||
|     // Q: How much bandwidth do we serve per minute from the cache? (expire after 1 day) |  | ||||||
|  |     // Q: How much bandwidth do we serve per minute from the cache? | ||||||
|     db.setex(`stats-bandwidthFromCache-${minuteKey}`, oneDaySeconds, ts.bandwidth.cached) |     db.setex(`stats-bandwidthFromCache-${minuteKey}`, oneDaySeconds, ts.bandwidth.cached) | ||||||
|  |  | ||||||
|     resolve() |     resolve() | ||||||
|  | |||||||
| @ -58,14 +58,16 @@ const sumTopScores = (keys, n) => | |||||||
|     }, {}) |     }, {}) | ||||||
|   }) |   }) | ||||||
|  |  | ||||||
|  | const createKey = (...args) => args.join('-') | ||||||
|  |  | ||||||
| const createDayKey = (date) => | const createDayKey = (date) => | ||||||
|   `${date.getUTCFullYear()}-${date.getUTCMonth()}-${date.getUTCDate()}` |   createKey(date.getUTCFullYear(), date.getUTCMonth(), date.getUTCDate()) | ||||||
|  |  | ||||||
| const createHourKey = (date) => | const createHourKey = (date) => | ||||||
|   `${createDayKey(date)}-${date.getUTCHours()}` |   createKey(createDayKey(date), date.getUTCHours()) | ||||||
|  |  | ||||||
| const createMinuteKey = (date) => | const createMinuteKey = (date) => | ||||||
|   `${createDayKey(date)}-${date.getUTCMinutes()}` |   createKey(createHourKey(date), date.getUTCMinutes()) | ||||||
|  |  | ||||||
| module.exports = { | module.exports = { | ||||||
|   getKeyValues, |   getKeyValues, | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user