From 089f1eedb3b8b847f1765c3669c03476fc030d85 Mon Sep 17 00:00:00 2001 From: Michael Jackson Date: Fri, 1 Jun 2018 22:15:41 -0700 Subject: [PATCH] Use Cloudflare /received API instead of /requests --- server/CloudflareAPI.js | 18 ++-- server/ingestLogs.js | 180 +++++++++++++++++++++------------------- 2 files changed, 108 insertions(+), 90 deletions(-) diff --git a/server/CloudflareAPI.js b/server/CloudflareAPI.js index 10f7548..d4119d5 100644 --- a/server/CloudflareAPI.js +++ b/server/CloudflareAPI.js @@ -3,7 +3,7 @@ const invariant = require("invariant"); const gunzip = require("gunzip-maybe"); const ndjson = require("ndjson"); -const cloudflareURL = "https://api.cloudflare.com"; +const cloudflareURL = "https://api.cloudflare.com/client/v4"; const cloudflareEmail = process.env.CLOUDFLARE_EMAIL; const cloudflareKey = process.env.CLOUDFLARE_KEY; @@ -15,7 +15,7 @@ invariant( invariant(cloudflareKey, "Missing the $CLOUDFLARE_KEY environment variable"); function get(path, headers) { - return fetch(`${cloudflareURL}/client/v4${path}`, { + return fetch(`${cloudflareURL}${path}`, { headers: Object.assign({}, headers, { "X-Auth-Email": cloudflareEmail, "X-Auth-Key": cloudflareKey @@ -74,18 +74,24 @@ function getZoneAnalyticsDashboard(zones, since, until) { } function getJSONStream(path, headers) { - const acceptGzipHeaders = Object.assign({}, headers, { + const gzipHeaders = Object.assign({}, headers, { "Accept-Encoding": "gzip" }); - return get(path, acceptGzipHeaders) + return get(path, gzipHeaders) .then(res => res.body.pipe(gunzip())) .then(stream => stream.pipe(ndjson.parse())); } -function getLogs(zoneId, startTime, endTime) { +function getLogs(zoneId, startTime, endTime, fieldsArray) { + const fields = fieldsArray.join(","); + + // console.log( + // `https://api.cloudflare.com/client/v4/zones/${zoneId}/logs/received?start=${startTime}&end=${endTime}&fields=${fields}` + // ); + return getJSONStream( - `/zones/${zoneId}/logs/requests?start=${startTime}&end=${endTime}` + `/zones/${zoneId}/logs/received?start=${startTime}&end=${endTime}&fields=${fields}` ); } diff --git a/server/ingestLogs.js b/server/ingestLogs.js index 114695e..1428283 100644 --- a/server/ingestLogs.js +++ b/server/ingestLogs.js @@ -1,5 +1,6 @@ const parseURL = require("url").parse; const startOfDay = require("date-fns/start_of_day"); +const startOfMinute = require("date-fns/start_of_minute"); const addDays = require("date-fns/add_days"); const db = require("./utils/redis"); @@ -20,28 +21,35 @@ const domainNames = [ /** * The window of time to download in a single fetch. */ -const logWindowSeconds = 30; +const logWindowSeconds = 60; + +/** + * The minimum time to wait between fetches. + */ +const minInterval = 15000; function getSeconds(date) { return Math.floor(date.getTime() / 1000); } function stringifySeconds(seconds) { - return new Date(seconds * 1000).toISOString(); + return new Date(seconds * 1000).toISOString().replace(/\.0+Z$/, "Z"); } -function toSeconds(millis) { - return Math.floor(millis / 1000); +function toSeconds(ms) { + return Math.floor(ms / 1000); } const oneSecond = 1000; const oneMinute = oneSecond * 60; const oneHour = oneMinute * 60; +const oneDay = oneHour * 24; function computeCounters(stream) { return new Promise((resolve, reject) => { const counters = {}; const expireat = {}; + let totalEntries = 0; function incr(key, member, by, expiry) { counters[key] = counters[key] || {}; @@ -51,21 +59,20 @@ function computeCounters(stream) { stream .on("error", reject) - .on("data", function(entry) { - const date = new Date(Math.round(entry.timestamp / 1000000)); + .on("data", entry => { + totalEntries += 1; + + const date = new Date(Math.round(entry.EdgeStartTimestamp / 1000000)); const nextDay = startOfDay(addDays(date, 1)); const sevenDaysLater = getSeconds(addDays(nextDay, 7)); const thirtyDaysLater = getSeconds(addDays(nextDay, 30)); const dayKey = StatsAPI.createDayKey(date); - const clientRequest = entry.clientRequest; - const edgeResponse = entry.edgeResponse; - - if (edgeResponse.status === 200) { + if (entry.EdgeResponseStatus === 200) { // Q: How many requests do we serve for a package per day? // Q: How many bytes do we serve for a package per day? - const url = parsePackageURL(parseURL(clientRequest.uri).pathname); + const url = parsePackageURL(entry.ClientRequestURI); const packageName = url && url.packageName; if (packageName && isValidPackageName(packageName)) { @@ -78,26 +85,27 @@ function computeCounters(stream) { incr( `stats-packageBytes-${dayKey}`, packageName, - edgeResponse.bytes, + entry.EdgeResponseBytes, thirtyDaysLater ); } } // Q: How many requests per day do we receive via a protocol? - const protocol = clientRequest.httpProtocol; + const protocol = entry.ClientRequestProtocol; - if (protocol) + if (protocol) { incr( `stats-protocolRequests-${dayKey}`, protocol, 1, thirtyDaysLater ); + } // Q: How many requests do we receive from a hostname per day? // Q: How many bytes do we serve to a hostname per day? - const referer = clientRequest.referer; + const referer = entry.ClientRequestReferer; const hostname = referer && parseURL(referer).hostname; if (hostname) { @@ -105,86 +113,93 @@ function computeCounters(stream) { incr( `stats-hostnameBytes-${dayKey}`, hostname, - edgeResponse.bytes, + entry.EdgeResponseBytes, sevenDaysLater ); } }) - .on("end", function() { - resolve({ counters, expireat }); + .on("end", () => { + resolve({ counters, expireat, totalEntries }); }); }); } function processLogs(stream) { - return computeCounters(stream).then(({ counters, expireat }) => { - Object.keys(counters).forEach(key => { - const values = counters[key]; + return computeCounters(stream).then( + ({ counters, expireat, totalEntries }) => { + Object.keys(counters).forEach(key => { + const values = counters[key]; - Object.keys(values).forEach(member => { - db.zincrby(key, values[member], member); + Object.keys(values).forEach(member => { + db.zincrby(key, values[member], member); + }); + + if (expireat[key]) { + db.expireat(key, expireat[key]); + } }); - if (expireat[key]) db.expireat(key, expireat[key]); + return totalEntries; + } + ); +} + +function ingestLogs(zone, startSeconds, endSeconds) { + const startFetchTime = Date.now(); + const fields = [ + "EdgeStartTimestamp", + "EdgeResponseStatus", + "EdgeResponseBytes", + "ClientRequestProtocol", + "ClientRequestURI", + "ClientRequestReferer" + ]; + + return CloudflareAPI.getLogs( + zone.id, + stringifySeconds(startSeconds), + stringifySeconds(endSeconds), + fields + ).then(stream => { + const endFetchTime = Date.now(); + + console.log( + "info: Fetched logs for %s from %s to %s (%dms)", + zone.name, + stringifySeconds(startSeconds), + stringifySeconds(endSeconds), + endFetchTime - startFetchTime + ); + + const startProcessTime = Date.now(); + + return processLogs(stream).then(totalEntries => { + const endProcessTime = Date.now(); + + console.log( + "info: Processed %d log entries for %s (%dms)", + totalEntries, + zone.name, + endProcessTime - startProcessTime + ); }); }); } -function ingestLogs(zone, startSeconds, endSeconds) { - return new Promise(resolve => { - console.log( - "info: Started ingesting logs for %s from %s to %s", - zone.name, - stringifySeconds(startSeconds), - stringifySeconds(endSeconds) - ); - - const startFetchTime = Date.now(); - - resolve( - CloudflareAPI.getLogs(zone.id, startSeconds, endSeconds).then(stream => { - const endFetchTime = Date.now(); - - console.log( - "info: Fetched %ds worth of logs for %s in %dms", - endSeconds - startSeconds, - zone.name, - endFetchTime - startFetchTime - ); - - const startProcessTime = Date.now(); - - return processLogs(stream).then(() => { - const endProcessTime = Date.now(); - - console.log( - "info: Processed %ds worth of logs for %s in %dms", - endSeconds - startSeconds, - zone.name, - endProcessTime - startProcessTime - ); - }); - }) - ); - }); -} - function startZone(zone) { - const startSecondsKey = `ingestLogsWorker-nextStartSeconds-${zone.name.replace( - ".", - "-" - )}`; + const suffix = zone.name.replace(".", "-"); + const startSecondsKey = `ingestLogs-start-${suffix}`; function takeATurn() { - db.get(startSecondsKey, function(error, value) { + const now = Date.now(); + + // Cloudflare keeps logs around for 7 days. + // https://support.cloudflare.com/hc/en-us/articles/216672448-Enterprise-Log-Share-Logpull-REST-API + const minSeconds = toSeconds(startOfMinute(now - oneDay * 5)); + + db.get(startSecondsKey, (error, value) => { let startSeconds = value && parseInt(value, 10); - const now = Date.now(); - - // Cloudflare keeps logs around for 72 hours. - // https://support.cloudflare.com/hc/en-us/articles/216672448-Enterprise-Log-Share-REST-API - const minSeconds = toSeconds(now - oneHour * 72); - if (startSeconds == null) { startSeconds = minSeconds; } else if (startSeconds < minSeconds) { @@ -198,6 +213,8 @@ function startZone(zone) { startSeconds = minSeconds; } + const endSeconds = startSeconds + logWindowSeconds; + // The log for a request is typically available within thirty (30) minutes // of the request taking place under normal conditions. We deliver logs // ordered by the time that the logs were created, i.e. the timestamp of @@ -208,15 +225,13 @@ function startZone(zone) { // https://support.cloudflare.com/hc/en-us/articles/216672448-Enterprise-Log-Share-REST-API const maxSeconds = toSeconds(now - oneMinute * 30); - if (startSeconds < maxSeconds) { - const endSeconds = startSeconds + logWindowSeconds; - + if (endSeconds < maxSeconds) { ingestLogs(zone, startSeconds, endSeconds).then( - function() { + () => { db.set(startSecondsKey, endSeconds); - setTimeout(takeATurn); + setTimeout(takeATurn, minInterval); }, - function(error) { + error => { console.error(error.stack); process.exit(1); } @@ -231,9 +246,6 @@ function startZone(zone) { } Promise.all(domainNames.map(CloudflareAPI.getZones)).then(results => { - const zones = results.reduce((memo, zones) => { - return memo.concat(zones); - }); - + const zones = results.reduce((memo, zones) => memo.concat(zones)); zones.forEach(startZone); });