From f3fa340ae910780e77b7bac160752c42bf5e0bc7 Mon Sep 17 00:00:00 2001 From: MICHAEL JACKSON Date: Tue, 23 May 2017 15:00:09 -0700 Subject: [PATCH] More work on log ingestion --- workers/ingest-logs.js | 310 ++++++++++++++++++++++------------------- 1 file changed, 170 insertions(+), 140 deletions(-) diff --git a/workers/ingest-logs.js b/workers/ingest-logs.js index 6f4a5e8..f439803 100644 --- a/workers/ingest-logs.js +++ b/workers/ingest-logs.js @@ -32,6 +32,19 @@ invariant( 'Missing the $FIREBASE_ACCOUNT environment variable' ) +/** + * Domains we want to analyze. + */ +const DomainNames = [ + //'npmcdn.com', // We don't have log data on npmcdn.com yet :/ + 'unpkg.com' +] + +/** + * The window of time to download in a single fetch. + */ +const LogWindowSeconds = 30 + /* Stuff we wanna show on the website: @@ -54,7 +67,6 @@ admin.initializeApp({ }) const db = admin.database() -const logsRef = db.ref('logs') const getZones = (domain) => fetch(`https://api.cloudflare.com/client/v4/zones?name=${domain}`, { @@ -76,9 +88,6 @@ const getLogs = (zoneId, startTime, endTime) => } }).then(res => res.body.pipe(gunzip())) -const incrementKey = (object, key, n = 1) => - object[key] = (object[key] || 0) + n - const toSeconds = (millis) => Math.floor(millis / 1000) @@ -101,10 +110,117 @@ const oneMinute = oneSecond * 60 const thirtyMinutes = oneMinute * 30 const oneHour = oneMinute * 60 -const ingestLogs = (zone, startSeconds, endSeconds) => +const computeLogChanges = (stream) => new Promise((resolve, reject) => { + const changes = {} + + const incKey = (key, n = 1) => + changes[key] = (changes[key] || 0) + n + + stream + .pipe(ndjson.parse()) + .on('error', reject) + .on('data', entry => { + const date = new Date(Math.round(entry.timestamp / 1000000)) + const dayKey = `${date.getUTCFullYear()}/${date.getUTCMonth()}/${date.getUTCDate()}` + const hourKey = `${dayKey}/${date.getUTCHours()}` + const minuteKey = `${hourKey}/${date.getUTCMinutes()}` + + // Q: How many requests do we receive per day? + incKey(`requestsPerDay/${dayKey}`) + + // Q: How many requests do we receive per minute? + incKey(`requestsPerMinute/${minuteKey}`) + + // Q: How many requests do we receive to edge/cache/origin per hour? + if (entry.origin) { + incKey(`originRequestsPerHour/${hourKey}`) + } else if (entry.cache) { + incKey(`cacheRequestsPerHour/${hourKey}`) + } else { + incKey(`edgeRequestsPerHour/${hourKey}`) + } + + const clientRequest = entry.clientRequest + + // Q: How many requests per day do we receive for a package? + const uri = clientRequest.uri + const package = getPackageName(parseURL(uri).pathname) + + if (package) { + const key = `packageRequestsPerDay/${dayKey}/${hashKey(package)}` + + if (changes[key]) { + changes[key].requests += 1 + } else { + changes[key] = { package, requests: 1 } + } + } + + // Q: How many requests per day do we receive via each protocol? + const protocol = clientRequest.httpProtocol + + if (protocol) { + const key = `protocolRequestsPerDay/${dayKey}/${hashKey(protocol)}` + + if (changes[key]) { + changes[key].requests += 1 + } else { + changes[key] = { protocol, requests: 1 } + } + } + + // Q: How many requests per day do we receive from an origin? + const referer = clientRequest.referer + + if (referer) { + const url = parseURL(referer) + const origin = formatURL({ + protocol: url.protocol, + hostname: url.hostname + }) + + const key = `originRequestsPerDay/${dayKey}/${hashKey(origin)}` + + if (changes[key]) { + changes[key].requests += 1 + } else { + changes[key] = { origin, requests: 1 } + } + } + }) + .on('end', () => { + resolve(changes) + }) + }) + +const processLogs = (stream) => + computeLogChanges(stream).then(changes => { + // Record the changes. + Object.keys(changes).forEach(key => { + const ref = db.ref(`logs/${key}`) + + ref.transaction(value => { + if (typeof changes[key].requests === 'number') { + // Nested value with a "requests" property. + if (value && value.requests) { + value.requests += changes[key].requests + return value + } else { + return changes[key] + } + } else { + // Simple counter. + return (value || 0) + changes[key] + } + }) + }) + }) + +const ingestLogs = (zone, startSeconds, endSeconds) => + new Promise(resolve => { console.log( - 'START ingesting logs for %s from %s to %s', + 'LOG: start ingesting logs for %s from %s to %s', zone.name, stringifySeconds(startSeconds), stringifySeconds(endSeconds) @@ -112,147 +228,51 @@ const ingestLogs = (zone, startSeconds, endSeconds) => const startFetchTime = Date.now() - getLogs(zone.id, startSeconds, endSeconds).then(stream => { - const endFetchTime = Date.now() + resolve( + getLogs(zone.id, startSeconds, endSeconds).then(stream => { + const endFetchTime = Date.now() - console.log( - 'Fetched %ds worth of logs for %s at %s in %dms', - endSeconds - startSeconds, - zone.name, - stringifySeconds(startSeconds), - endFetchTime - startFetchTime - ) + console.log( + 'LOG: fetched %ds worth of logs for %s in %dms', + endSeconds - startSeconds, + zone.name, + endFetchTime - startFetchTime + ) - const changes = {} + const startProcessTime = Date.now() - stream - .pipe(ndjson.parse()) - .on('error', reject) - .on('data', entry => { - const date = new Date(Math.round(entry.timestamp / 1000000)) - const dayKey = `${date.getUTCFullYear()}/${date.getUTCMonth()}/${date.getUTCDate()}` - const minuteKey = `${date.getUTCHours()}/${date.getUTCMinutes()}` + return processLogs(stream).then(() => { + const endProcessTime = Date.now() - // Q: How many requests do we receive per minute? - incrementKey(changes, `${dayKey}/totalRequests/${minuteKey}`, 1) - - // Q: How many requests do we receive to edge/cache/origin per minute? - if (entry.origin) { - incrementKey(changes, `${dayKey}/originRequests/${minuteKey}`) - } else if (entry.cache) { - incrementKey(changes, `${dayKey}/cacheRequests/${minuteKey}`) - } else { - incrementKey(changes, `${dayKey}/edgeRequests/${minuteKey}`) - } - - const clientRequest = entry.clientRequest - - // Q: How many requests per day do we receive for a package? - const uri = clientRequest.uri - const package = getPackageName(parseURL(uri).pathname) - if (package) { - const key = `${dayKey}/packages/${hashKey(package)}` - - if (changes[key]) { - changes[key].requests += 1 - } else { - changes[key] = { package, requests: 1 } - } - } - - // Q: How many requests per day do we receive via each protocol? - const protocol = clientRequest.httpProtocol - if (protocol) { - const key = `${dayKey}/protocols/${hashKey(protocol)}` - - if (changes[key]) { - changes[key].requests += 1 - } else { - changes[key] = { protocol, requests: 1 } - } - } - - // Q: How many requests per day do we receive from an origin? - // const referer = clientRequest.referer - // if (referer) { - // const url = parseURL(referer) - // const origin = formatURL({ - // protocol: url.protocol, - // hostname: url.hostname - // }) - // - // const key = `${dayKey}/origins/${hashKey(origin)}` - // - // if (changes[key]) { - // changes[key].requests += 1 - // } else { - // changes[key] = { origin, requests: 1 } - // } - // } - }) - .on('end', () => { console.log( - 'FINISH ingesting logs for %s from %s to %s', + 'LOG: processed %ds worth of logs for %s in %dms', + endSeconds - startSeconds, zone.name, - stringifySeconds(startSeconds), - stringifySeconds(endSeconds) + endProcessTime - startProcessTime ) - - // Record the changes. - Object.keys(changes).forEach(key => { - const ref = logsRef.child(key) - - ref.transaction(value => { - if (typeof changes[key].requests === 'number') { - // Nested value with a "requests" property. - if (value && value.requests) { - value.requests += changes[key].requests - return value - } else { - return changes[key] - } - } else { - // Simple counter. - return (value || 0) + changes[key] - } - }) - }) - - resolve(changes) }) - }) + }) + ) }) -const startIngestingLogs = (zone) => { - const startSecondsRef = logsRef.child(`nextStartSeconds/${zone.name.replace('.', '-')}`) - - let inProgress = false +const startZone = (zone) => { + const startSecondsRef = db.ref(`logs/nextStartSeconds/${zone.name.replace('.', '-')}`) const takeATurn = () => { - if (inProgress) { - console.log( - 'Still ingesting logs for %s, waiting for another turn...', - zone.name - ) - - return - } - - inProgress = true - startSecondsRef.once('value', snapshot => { let startSeconds = snapshot.val() const now = Date.now() // Cloudflare keeps logs around for 72 hours. + // https://support.cloudflare.com/hc/en-us/articles/216672448-Enterprise-Log-Share-REST-API const minSeconds = toSeconds(now - oneHour * 72) if (startSeconds == null) { startSeconds = minSeconds } else if (startSeconds < minSeconds) { console.warn( - 'WARNING: dropping logs for %s from %s to %s!', + 'WARNING: dropped logs for %s from %s to %s!', zone.name, stringifySeconds(startSeconds), stringifySeconds(minSeconds) @@ -261,34 +281,44 @@ const startIngestingLogs = (zone) => { startSeconds = minSeconds } - if (startSeconds < toSeconds(now - thirtyMinutes)) { - const endSeconds = startSeconds + 10 + // The log for a request is typically available within thirty (30) minutes + // of the request taking place under normal conditions. We deliver logs + // ordered by the time that the logs were created, i.e. the timestamp of + // the request when it was received by the edge. Given the order of + // delivery, we recommend waiting a full thirty minutes to ingest a full + // set of logs. This will help ensure that any congestion in the log + // pipeline has passed and a full set of logs can be ingested. + // https://support.cloudflare.com/hc/en-us/articles/216672448-Enterprise-Log-Share-REST-API + const maxSeconds = toSeconds(now - thirtyMinutes) + + if (startSeconds < maxSeconds) { + const endSeconds = startSeconds + LogWindowSeconds ingestLogs(zone, startSeconds, endSeconds).then(() => { startSecondsRef.set(endSeconds) - inProgress = false + setTimeout(takeATurn) + }, error => { + console.error(error.stack) + process.exit(1) }) } else { - console.log( - 'Waiting for 30 minutes to pass before fetching logs for %s...', - zone.name - ) - - inProgress = false + setTimeout(takeATurn, (startSeconds - maxSeconds) * 1000) } }) } takeATurn() - setInterval(takeATurn, oneSecond * 3) } -const domains = [ - //'npmcdn.com', // We don't have log data on npmcdn.com yet :/ - 'unpkg.com' -] - -Promise.all(domains.map(getZones)).then(results => { +Promise.all(DomainNames.map(getZones)).then(results => { const zones = results.reduce((memo, zones) => memo.concat(zones)) - zones.forEach(startIngestingLogs) + zones.forEach(startZone) }) + +//const getValues = (object) => +// Object.keys(object).map(key => object[key]) +// +//db.ref('logs/2017/4/17/packages').orderByChild('requests').limitToLast(10).once('value', (snapshot) => { +// const values = getValues(snapshot.val()).sort((a, b) => b.requests - a.requests) +// console.log(values) +//})