More work on log ingestion
This commit is contained in:
		@ -32,6 +32,19 @@ invariant(
 | 
			
		||||
  'Missing the $FIREBASE_ACCOUNT environment variable'
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
/**
 * The list of domain names whose logs get analyzed.
 */
const DomainNames = [
  // 'npmcdn.com' is left out for now: we don't have log data for it yet.
  'unpkg.com'
]
 | 
			
		||||
 | 
			
		||||
/**
 * How many seconds of log data are downloaded in a single fetch.
 */
const LogWindowSeconds = 30
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
Stuff we wanna show on the website:
 | 
			
		||||
 | 
			
		||||
@ -54,7 +67,6 @@ admin.initializeApp({
 | 
			
		||||
})
 | 
			
		||||
 | 
			
		||||
const db = admin.database()
 | 
			
		||||
const logsRef = db.ref('logs')
 | 
			
		||||
 | 
			
		||||
const getZones = (domain) =>
 | 
			
		||||
  fetch(`https://api.cloudflare.com/client/v4/zones?name=${domain}`, {
 | 
			
		||||
@ -76,9 +88,6 @@ const getLogs = (zoneId, startTime, endTime) =>
 | 
			
		||||
    }
 | 
			
		||||
  }).then(res => res.body.pipe(gunzip()))
 | 
			
		||||
 | 
			
		||||
/**
 * Adds `n` (1 by default) to the counter stored at `object[key]`, treating a
 * missing or otherwise falsy value as 0. Mutates `object` and returns the
 * updated count.
 */
const incrementKey = (object, key, n = 1) => {
  const updated = (object[key] || 0) + n
  object[key] = updated
  return updated
}
 | 
			
		||||
 | 
			
		||||
/**
 * Converts a duration or timestamp in milliseconds to whole seconds,
 * rounding down.
 */
const toSeconds = (millis) => {
  const seconds = millis / 1000
  return Math.floor(seconds)
}
 | 
			
		||||
 | 
			
		||||
@ -101,10 +110,117 @@ const oneMinute = oneSecond * 60
 | 
			
		||||
const thirtyMinutes = oneMinute * 30
 | 
			
		||||
const oneHour = oneMinute * 60
 | 
			
		||||
 | 
			
		||||
const ingestLogs = (zone, startSeconds, endSeconds) =>
 | 
			
		||||
/**
 * Reads a stream of newline-delimited JSON log entries and tallies the
 * counter changes they imply. Nothing is written anywhere; the tallies are
 * only accumulated in memory.
 *
 * The resolved object maps a slash-separated key to either
 *   - a number, for the simple counters (requestsPerDay, requestsPerMinute
 *     and origin/cache/edgeRequestsPerHour), or
 *   - an object of the shape { package, requests }, { protocol, requests }
 *     or { origin, requests } for the labelled per-day counters.
 *
 * @param {Object} stream Readable stream that is piped into ndjson.parse()
 * @returns {Promise<Object>} Resolves with the accumulated changes once the
 *   parsed stream ends; rejects if the source stream or the parser errors.
 */
const computeLogChanges = (stream) =>
  new Promise((resolve, reject) => {
    const changes = {}

    // Bumps a plain numeric counter.
    const incKey = (key, n = 1) => {
      changes[key] = (changes[key] || 0) + n
    }

    // Bumps a labelled counter of the shape { [field]: label, requests }.
    const incLabeledKey = (key, field, label) => {
      if (changes[key]) {
        changes[key].requests += 1
      } else {
        changes[key] = { [field]: label, requests: 1 }
      }
    }

    stream
      // pipe() does not forward errors from the source to the destination,
      // so the source needs its own handler or a failure would never reject.
      .on('error', reject)
      .pipe(ndjson.parse())
      .on('error', reject)
      .on('data', entry => {
        // NOTE(review): the division suggests entry.timestamp is in
        // nanoseconds since the epoch -- confirm against the log format.
        const date = new Date(Math.round(entry.timestamp / 1000000))

        // NOTE: getUTCMonth() is zero-based, so January is 0 in these keys.
        // Left unchanged here because switching to 1-based months would
        // alter every key path this function produces.
        const dayKey = `${date.getUTCFullYear()}/${date.getUTCMonth()}/${date.getUTCDate()}`
        const hourKey = `${dayKey}/${date.getUTCHours()}`
        const minuteKey = `${hourKey}/${date.getUTCMinutes()}`

        // Q: How many requests do we receive per day?
        incKey(`requestsPerDay/${dayKey}`)

        // Q: How many requests do we receive per minute?
        incKey(`requestsPerMinute/${minuteKey}`)

        // Q: How many requests do we receive to edge/cache/origin per hour?
        if (entry.origin) {
          incKey(`originRequestsPerHour/${hourKey}`)
        } else if (entry.cache) {
          incKey(`cacheRequestsPerHour/${hourKey}`)
        } else {
          incKey(`edgeRequestsPerHour/${hourKey}`)
        }

        const clientRequest = entry.clientRequest

        // Q: How many requests per day do we receive for a package?
        // (The local is not called "package" because that identifier is
        // reserved in strict-mode code and ES modules.)
        const packageName = getPackageName(parseURL(clientRequest.uri).pathname)

        if (packageName) {
          incLabeledKey(
            `packageRequestsPerDay/${dayKey}/${hashKey(packageName)}`,
            'package',
            packageName
          )
        }

        // Q: How many requests per day do we receive via each protocol?
        const protocol = clientRequest.httpProtocol

        if (protocol) {
          incLabeledKey(
            `protocolRequestsPerDay/${dayKey}/${hashKey(protocol)}`,
            'protocol',
            protocol
          )
        }

        // Q: How many requests per day do we receive from an origin?
        const referer = clientRequest.referer

        if (referer) {
          // Only the protocol and hostname of the referer are kept.
          const url = parseURL(referer)
          const origin = formatURL({
            protocol: url.protocol,
            hostname: url.hostname
          })

          incLabeledKey(
            `originRequestsPerDay/${dayKey}/${hashKey(origin)}`,
            'origin',
            origin
          )
        }
      })
      .on('end', () => {
        resolve(changes)
      })
  })
 | 
			
		||||
 | 
			
		||||
/**
 * Computes the counter changes for a stream of log entries and applies each
 * one to the database under `logs/<key>` using a transaction.
 *
 * The returned promise settles only after every transaction has settled, so
 * callers can safely treat its resolution as "these logs are recorded". If
 * any transaction fails, the promise rejects with that error.
 *
 * @param {Object} stream Readable stream handed to computeLogChanges
 * @returns {Promise<Object>} Resolves with the changes that were applied
 */
const processLogs = (stream) =>
  computeLogChanges(stream).then(changes => {
    // Record the changes, keeping hold of every transaction's promise.
    const writes = Object.keys(changes).map(key => {
      const change = changes[key]

      return db.ref(`logs/${key}`).transaction(value => {
        if (typeof change.requests === 'number') {
          // Nested value with a "requests" property.
          if (value && value.requests) {
            value.requests += change.requests
            return value
          }

          // Nothing usable stored yet: start from the computed record.
          return change
        }

        // Simple counter.
        return (value || 0) + change
      })
    })

    return Promise.all(writes).then(() => changes)
  })
 | 
			
		||||
 | 
			
		||||
const ingestLogs = (zone, startSeconds, endSeconds) =>
 | 
			
		||||
  new Promise(resolve => {
 | 
			
		||||
    console.log(
 | 
			
		||||
      'START ingesting logs for %s from %s to %s',
 | 
			
		||||
      'LOG: start ingesting logs for %s from %s to %s',
 | 
			
		||||
      zone.name,
 | 
			
		||||
      stringifySeconds(startSeconds),
 | 
			
		||||
      stringifySeconds(endSeconds)
 | 
			
		||||
@ -112,147 +228,51 @@ const ingestLogs = (zone, startSeconds, endSeconds) =>
 | 
			
		||||
 | 
			
		||||
    const startFetchTime = Date.now()
 | 
			
		||||
 | 
			
		||||
    getLogs(zone.id, startSeconds, endSeconds).then(stream => {
 | 
			
		||||
      const endFetchTime = Date.now()
 | 
			
		||||
    resolve(
 | 
			
		||||
      getLogs(zone.id, startSeconds, endSeconds).then(stream => {
 | 
			
		||||
        const endFetchTime = Date.now()
 | 
			
		||||
 | 
			
		||||
      console.log(
 | 
			
		||||
        'Fetched %ds worth of logs for %s at %s in %dms',
 | 
			
		||||
        endSeconds - startSeconds,
 | 
			
		||||
        zone.name,
 | 
			
		||||
        stringifySeconds(startSeconds),
 | 
			
		||||
        endFetchTime - startFetchTime
 | 
			
		||||
      )
 | 
			
		||||
        console.log(
 | 
			
		||||
          'LOG: fetched %ds worth of logs for %s in %dms',
 | 
			
		||||
          endSeconds - startSeconds,
 | 
			
		||||
          zone.name,
 | 
			
		||||
          endFetchTime - startFetchTime
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
      const changes = {}
 | 
			
		||||
        const startProcessTime = Date.now()
 | 
			
		||||
 | 
			
		||||
      stream
 | 
			
		||||
        .pipe(ndjson.parse())
 | 
			
		||||
        .on('error', reject)
 | 
			
		||||
        .on('data', entry => {
 | 
			
		||||
          const date = new Date(Math.round(entry.timestamp / 1000000))
 | 
			
		||||
          const dayKey = `${date.getUTCFullYear()}/${date.getUTCMonth()}/${date.getUTCDate()}`
 | 
			
		||||
          const minuteKey = `${date.getUTCHours()}/${date.getUTCMinutes()}`
 | 
			
		||||
        return processLogs(stream).then(() => {
 | 
			
		||||
          const endProcessTime = Date.now()
 | 
			
		||||
 | 
			
		||||
          // Q: How many requests do we receive per minute?
 | 
			
		||||
          incrementKey(changes, `${dayKey}/totalRequests/${minuteKey}`, 1)
 | 
			
		||||
 | 
			
		||||
          // Q: How many requests do we receive to edge/cache/origin per minute?
 | 
			
		||||
          if (entry.origin) {
 | 
			
		||||
            incrementKey(changes, `${dayKey}/originRequests/${minuteKey}`)
 | 
			
		||||
          } else if (entry.cache) {
 | 
			
		||||
            incrementKey(changes, `${dayKey}/cacheRequests/${minuteKey}`)
 | 
			
		||||
          } else {
 | 
			
		||||
            incrementKey(changes, `${dayKey}/edgeRequests/${minuteKey}`)
 | 
			
		||||
          }
 | 
			
		||||
 | 
			
		||||
          const clientRequest = entry.clientRequest
 | 
			
		||||
 | 
			
		||||
          // Q: How many requests per day do we receive for a package?
 | 
			
		||||
          const uri = clientRequest.uri
 | 
			
		||||
          const package = getPackageName(parseURL(uri).pathname)
 | 
			
		||||
          if (package) {
 | 
			
		||||
            const key = `${dayKey}/packages/${hashKey(package)}`
 | 
			
		||||
 | 
			
		||||
            if (changes[key]) {
 | 
			
		||||
              changes[key].requests += 1
 | 
			
		||||
            } else {
 | 
			
		||||
              changes[key] = { package, requests: 1 }
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
 | 
			
		||||
          // Q: How many requests per day do we receive via each protocol?
 | 
			
		||||
          const protocol = clientRequest.httpProtocol
 | 
			
		||||
          if (protocol) {
 | 
			
		||||
            const key = `${dayKey}/protocols/${hashKey(protocol)}`
 | 
			
		||||
 | 
			
		||||
            if (changes[key]) {
 | 
			
		||||
              changes[key].requests += 1
 | 
			
		||||
            } else {
 | 
			
		||||
              changes[key] = { protocol, requests: 1 }
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
 | 
			
		||||
          // Q: How many requests per day do we receive from an origin?
 | 
			
		||||
          // const referer = clientRequest.referer
 | 
			
		||||
          // if (referer) {
 | 
			
		||||
          //   const url = parseURL(referer)
 | 
			
		||||
          //   const origin = formatURL({
 | 
			
		||||
          //     protocol: url.protocol,
 | 
			
		||||
          //     hostname: url.hostname
 | 
			
		||||
          //   })
 | 
			
		||||
          //
 | 
			
		||||
          //   const key = `${dayKey}/origins/${hashKey(origin)}`
 | 
			
		||||
          //
 | 
			
		||||
          //   if (changes[key]) {
 | 
			
		||||
          //     changes[key].requests += 1
 | 
			
		||||
          //   } else {
 | 
			
		||||
          //     changes[key] = { origin, requests: 1 }
 | 
			
		||||
          //   }
 | 
			
		||||
          // }
 | 
			
		||||
        })
 | 
			
		||||
        .on('end', () => {
 | 
			
		||||
          console.log(
 | 
			
		||||
            'FINISH ingesting logs for %s from %s to %s',
 | 
			
		||||
            'LOG: processed %ds worth of logs for %s in %dms',
 | 
			
		||||
            endSeconds - startSeconds,
 | 
			
		||||
            zone.name,
 | 
			
		||||
            stringifySeconds(startSeconds),
 | 
			
		||||
            stringifySeconds(endSeconds)
 | 
			
		||||
            endProcessTime - startProcessTime
 | 
			
		||||
          )
 | 
			
		||||
 | 
			
		||||
          // Record the changes.
 | 
			
		||||
          Object.keys(changes).forEach(key => {
 | 
			
		||||
            const ref = logsRef.child(key)
 | 
			
		||||
 | 
			
		||||
            ref.transaction(value => {
 | 
			
		||||
              if (typeof changes[key].requests === 'number') {
 | 
			
		||||
                // Nested value with a "requests" property.
 | 
			
		||||
                if (value && value.requests) {
 | 
			
		||||
                  value.requests += changes[key].requests
 | 
			
		||||
                  return value
 | 
			
		||||
                } else {
 | 
			
		||||
                  return changes[key]
 | 
			
		||||
                }
 | 
			
		||||
              } else {
 | 
			
		||||
                // Simple counter.
 | 
			
		||||
                return (value || 0) + changes[key]
 | 
			
		||||
              }
 | 
			
		||||
            })
 | 
			
		||||
          })
 | 
			
		||||
 | 
			
		||||
          resolve(changes)
 | 
			
		||||
        })
 | 
			
		||||
    })
 | 
			
		||||
      })
 | 
			
		||||
    )
 | 
			
		||||
  })
 | 
			
		||||
 | 
			
		||||
const startIngestingLogs = (zone) => {
 | 
			
		||||
  const startSecondsRef = logsRef.child(`nextStartSeconds/${zone.name.replace('.', '-')}`)
 | 
			
		||||
 | 
			
		||||
  let inProgress = false
 | 
			
		||||
const startZone = (zone) => {
 | 
			
		||||
  const startSecondsRef = db.ref(`logs/nextStartSeconds/${zone.name.replace('.', '-')}`)
 | 
			
		||||
 | 
			
		||||
  const takeATurn = () => {
 | 
			
		||||
    if (inProgress) {
 | 
			
		||||
      console.log(
 | 
			
		||||
        'Still ingesting logs for %s, waiting for another turn...',
 | 
			
		||||
        zone.name
 | 
			
		||||
      )
 | 
			
		||||
 | 
			
		||||
      return
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    inProgress = true
 | 
			
		||||
 | 
			
		||||
    startSecondsRef.once('value', snapshot => {
 | 
			
		||||
      let startSeconds = snapshot.val()
 | 
			
		||||
 | 
			
		||||
      const now = Date.now()
 | 
			
		||||
 | 
			
		||||
      // Cloudflare keeps logs around for 72 hours.
 | 
			
		||||
      // https://support.cloudflare.com/hc/en-us/articles/216672448-Enterprise-Log-Share-REST-API
 | 
			
		||||
      const minSeconds = toSeconds(now - oneHour * 72)
 | 
			
		||||
 | 
			
		||||
      if (startSeconds == null) {
 | 
			
		||||
        startSeconds = minSeconds
 | 
			
		||||
      } else if (startSeconds < minSeconds) {
 | 
			
		||||
        console.warn(
 | 
			
		||||
          'WARNING: dropping logs for %s from %s to %s!',
 | 
			
		||||
          'WARNING: dropped logs for %s from %s to %s!',
 | 
			
		||||
          zone.name,
 | 
			
		||||
          stringifySeconds(startSeconds),
 | 
			
		||||
          stringifySeconds(minSeconds)
 | 
			
		||||
@ -261,34 +281,44 @@ const startIngestingLogs = (zone) => {
 | 
			
		||||
        startSeconds = minSeconds
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (startSeconds < toSeconds(now - thirtyMinutes)) {
 | 
			
		||||
        const endSeconds = startSeconds + 10
 | 
			
		||||
      // The log for a request is typically available within thirty (30) minutes
 | 
			
		||||
      // of the request taking place under normal conditions. We deliver logs
 | 
			
		||||
      // ordered by the time that the logs were created, i.e. the timestamp of
 | 
			
		||||
      // the request when it was received by the edge. Given the order of
 | 
			
		||||
      // delivery, we recommend waiting a full thirty minutes to ingest a full
 | 
			
		||||
      // set of logs. This will help ensure that any congestion in the log
 | 
			
		||||
      // pipeline has passed and a full set of logs can be ingested.
 | 
			
		||||
      // https://support.cloudflare.com/hc/en-us/articles/216672448-Enterprise-Log-Share-REST-API
 | 
			
		||||
      const maxSeconds = toSeconds(now - thirtyMinutes)
 | 
			
		||||
 | 
			
		||||
      if (startSeconds < maxSeconds) {
 | 
			
		||||
        const endSeconds = startSeconds + LogWindowSeconds
 | 
			
		||||
 | 
			
		||||
        ingestLogs(zone, startSeconds, endSeconds).then(() => {
 | 
			
		||||
          startSecondsRef.set(endSeconds)
 | 
			
		||||
          inProgress = false
 | 
			
		||||
          setTimeout(takeATurn)
 | 
			
		||||
        }, error => {
 | 
			
		||||
          console.error(error.stack)
 | 
			
		||||
          process.exit(1)
 | 
			
		||||
        })
 | 
			
		||||
      } else {
 | 
			
		||||
        console.log(
 | 
			
		||||
          'Waiting for 30 minutes to pass before fetching logs for %s...',
 | 
			
		||||
          zone.name
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        inProgress = false
 | 
			
		||||
        setTimeout(takeATurn, (startSeconds - maxSeconds) * 1000)
 | 
			
		||||
      }
 | 
			
		||||
    })
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  takeATurn()
 | 
			
		||||
  setInterval(takeATurn, oneSecond * 3)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const domains = [
 | 
			
		||||
  //'npmcdn.com', // We don't have log data on npmcdn.com yet :/
 | 
			
		||||
  'unpkg.com'
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
Promise.all(domains.map(getZones)).then(results => {
 | 
			
		||||
Promise.all(DomainNames.map(getZones)).then(results => {
 | 
			
		||||
  const zones = results.reduce((memo, zones) => memo.concat(zones))
 | 
			
		||||
  zones.forEach(startIngestingLogs)
 | 
			
		||||
  zones.forEach(startZone)
 | 
			
		||||
})
 | 
			
		||||
 | 
			
		||||
//const getValues = (object) =>
 | 
			
		||||
//  Object.keys(object).map(key => object[key])
 | 
			
		||||
//
 | 
			
		||||
//db.ref('logs/2017/4/17/packages').orderByChild('requests').limitToLast(10).once('value', (snapshot) => {
 | 
			
		||||
//  const values = getValues(snapshot.val()).sort((a, b) => b.requests - a.requests)
 | 
			
		||||
//  console.log(values)
 | 
			
		||||
//})
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user