Add log ingestion worker
This commit is contained in:
		@ -14,8 +14,13 @@
 | 
			
		||||
    "date-fns": "^1.28.1",
 | 
			
		||||
    "express": "^4.15.2",
 | 
			
		||||
    "express-unpkg": "^2.0.1",
 | 
			
		||||
    "firebase-admin": "^4.2.1",
 | 
			
		||||
    "gunzip-maybe": "^1.4.0",
 | 
			
		||||
    "http-client": "^4.3.1",
 | 
			
		||||
    "invariant": "^2.2.2",
 | 
			
		||||
    "isomorphic-fetch": "^2.2.1",
 | 
			
		||||
    "morgan": "^1.8.1",
 | 
			
		||||
    "ndjson": "^1.5.0",
 | 
			
		||||
    "pretty-bytes": "^3",
 | 
			
		||||
    "prop-types": "^15.5.8",
 | 
			
		||||
    "raven": "^1.2.1",
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										294
									
								
								workers/ingest-logs.js
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										294
									
								
								workers/ingest-logs.js
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,294 @@
 | 
			
		||||
require('isomorphic-fetch')
 | 
			
		||||
const parseURL = require('url').parse
 | 
			
		||||
const formatURL = require('url').format
 | 
			
		||||
const crypto = require('crypto')
 | 
			
		||||
const invariant = require('invariant')
 | 
			
		||||
const admin = require('firebase-admin')
 | 
			
		||||
const gunzip = require('gunzip-maybe')
 | 
			
		||||
const ndjson = require('ndjson')
 | 
			
		||||
 | 
			
		||||
const CloudflareEmail = process.env.CLOUDFLARE_EMAIL
 | 
			
		||||
const CloudflareKey = process.env.CLOUDFLARE_KEY
 | 
			
		||||
const FirebaseURL = process.env.FIREBASE_URL
 | 
			
		||||
const FirebaseAccount = process.env.FIREBASE_ACCOUNT
 | 
			
		||||
 | 
			
		||||
invariant(
 | 
			
		||||
  CloudflareEmail,
 | 
			
		||||
  'Missing the $CLOUDFLARE_EMAIL environment variable'
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
invariant(
 | 
			
		||||
  CloudflareKey,
 | 
			
		||||
  'Missing the $CLOUDFLARE_KEY environment variable'
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
invariant(
 | 
			
		||||
  FirebaseURL,
 | 
			
		||||
  'Missing the $FIREBASE_URL environment variable'
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
invariant(
 | 
			
		||||
  FirebaseAccount,
 | 
			
		||||
  'Missing the $FIREBASE_ACCOUNT environment variable'
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
Stuff we wanna show on the website:
 | 
			
		||||
 | 
			
		||||
- Most popular packages
 | 
			
		||||
- Protocol usage (HTTP/1.1 vs HTTP/2)
 | 
			
		||||
- Requests per minute
 | 
			
		||||
- Requests per day/week/month (aggregate)
 | 
			
		||||
- Edge/cache/origin hit rates
 | 
			
		||||
- Browser usage
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
const serviceAccount = JSON.parse(FirebaseAccount)
 | 
			
		||||
 | 
			
		||||
admin.initializeApp({
 | 
			
		||||
  credential: admin.credential.cert(serviceAccount),
 | 
			
		||||
  databaseURL: FirebaseURL,
 | 
			
		||||
  databaseAuthVariableOverride: {
 | 
			
		||||
    uid: 'ingest-logs-worker'
 | 
			
		||||
  }
 | 
			
		||||
})
 | 
			
		||||
 | 
			
		||||
const db = admin.database()
 | 
			
		||||
const logsRef = db.ref('logs')
 | 
			
		||||
 | 
			
		||||
const getZones = (domain) =>
 | 
			
		||||
  fetch(`https://api.cloudflare.com/client/v4/zones?name=${domain}`, {
 | 
			
		||||
    method: 'GET',
 | 
			
		||||
    headers: {
 | 
			
		||||
      'X-Auth-Email': CloudflareEmail,
 | 
			
		||||
      'X-Auth-Key': CloudflareKey
 | 
			
		||||
    }
 | 
			
		||||
  }).then(res => res.json())
 | 
			
		||||
    .then(data => data.result)
 | 
			
		||||
 | 
			
		||||
const getLogs = (zoneId, startTime, endTime) =>
 | 
			
		||||
  fetch(`https://api.cloudflare.com/client/v4/zones/${zoneId}/logs/requests?start=${startTime}&end=${endTime}`, {
 | 
			
		||||
    method: 'GET',
 | 
			
		||||
    headers: {
 | 
			
		||||
      'X-Auth-Email': CloudflareEmail,
 | 
			
		||||
      'X-Auth-Key': CloudflareKey,
 | 
			
		||||
      'Accept-Encoding': 'gzip'
 | 
			
		||||
    }
 | 
			
		||||
  }).then(res => res.body.pipe(gunzip()))
 | 
			
		||||
 | 
			
		||||
const incrementKey = (object, key, n = 1) =>
 | 
			
		||||
  object[key] = (object[key] || 0) + n
 | 
			
		||||
 | 
			
		||||
const toSeconds = (millis) =>
 | 
			
		||||
  Math.floor(millis / 1000)
 | 
			
		||||
 | 
			
		||||
const stringifySeconds = (seconds) =>
 | 
			
		||||
  new Date(seconds * 1000).toGMTString()
 | 
			
		||||
 | 
			
		||||
const hashKey = (key) =>
 | 
			
		||||
  crypto.createHash('sha1').update(key).digest('hex')
 | 
			
		||||
 | 
			
		||||
// TODO: Copied from express-unpkg, use the same function
 | 
			
		||||
const URLFormat = /^\/((?:@[^\/@]+\/)?[^\/@]+)(?:@([^\/]+))?(\/.*)?$/
 | 
			
		||||
 | 
			
		||||
const getPackageName = (pathname) => {
 | 
			
		||||
  const match = URLFormat.exec(pathname)
 | 
			
		||||
  return match && match[1]
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const oneSecond = 1000
 | 
			
		||||
const oneMinute = oneSecond * 60
 | 
			
		||||
const thirtyMinutes = oneMinute * 30
 | 
			
		||||
const oneHour = oneMinute * 60
 | 
			
		||||
 | 
			
		||||
const ingestLogs = (zone, startSeconds, endSeconds) =>
 | 
			
		||||
  new Promise((resolve, reject) => {
 | 
			
		||||
    console.log(
 | 
			
		||||
      'START ingesting logs for %s from %s to %s',
 | 
			
		||||
      zone.name,
 | 
			
		||||
      stringifySeconds(startSeconds),
 | 
			
		||||
      stringifySeconds(endSeconds)
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    const startFetchTime = Date.now()
 | 
			
		||||
 | 
			
		||||
    getLogs(zone.id, startSeconds, endSeconds).then(stream => {
 | 
			
		||||
      const endFetchTime = Date.now()
 | 
			
		||||
 | 
			
		||||
      console.log(
 | 
			
		||||
        'Fetched %ds worth of logs for %s at %s in %dms',
 | 
			
		||||
        endSeconds - startSeconds,
 | 
			
		||||
        zone.name,
 | 
			
		||||
        stringifySeconds(startSeconds),
 | 
			
		||||
        endFetchTime - startFetchTime
 | 
			
		||||
      )
 | 
			
		||||
 | 
			
		||||
      const changes = {}
 | 
			
		||||
 | 
			
		||||
      stream
 | 
			
		||||
        .pipe(ndjson.parse())
 | 
			
		||||
        .on('error', reject)
 | 
			
		||||
        .on('data', entry => {
 | 
			
		||||
          const date = new Date(Math.round(entry.timestamp / 1000000))
 | 
			
		||||
          const dayKey = `${date.getUTCFullYear()}/${date.getUTCMonth()}/${date.getUTCDate()}`
 | 
			
		||||
          const minuteKey = `${date.getUTCHours()}/${date.getUTCMinutes()}`
 | 
			
		||||
 | 
			
		||||
          // Q: How many requests do we receive per day?
 | 
			
		||||
          incrementKey(changes, `${dayKey}/totalRequests/${minuteKey}`, 1)
 | 
			
		||||
 | 
			
		||||
          // Q: How many requests do we receive to edge/cache/origin per minute?
 | 
			
		||||
          if (entry.origin) {
 | 
			
		||||
            incrementKey(changes, `${dayKey}/originRequests/${minuteKey}`)
 | 
			
		||||
          } else if (entry.cache) {
 | 
			
		||||
            incrementKey(changes, `${dayKey}/cacheRequests/${minuteKey}`)
 | 
			
		||||
          } else {
 | 
			
		||||
            incrementKey(changes, `${dayKey}/edgeRequests/${minuteKey}`)
 | 
			
		||||
          }
 | 
			
		||||
 | 
			
		||||
          const clientRequest = entry.clientRequest
 | 
			
		||||
 | 
			
		||||
          // Q: How many requests per day do we receive for a package?
 | 
			
		||||
          const uri = clientRequest.uri
 | 
			
		||||
          const package = getPackageName(parseURL(uri).pathname)
 | 
			
		||||
          if (package) {
 | 
			
		||||
            const key = `${dayKey}/packages/${hashKey(package)}`
 | 
			
		||||
 | 
			
		||||
            if (changes[key]) {
 | 
			
		||||
              changes[key].requests += 1
 | 
			
		||||
            } else {
 | 
			
		||||
              changes[key] = { package, requests: 1 }
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
 | 
			
		||||
          // Q: How many requests per day do we receive via each protocol?
 | 
			
		||||
          const protocol = clientRequest.httpProtocol
 | 
			
		||||
          if (protocol) {
 | 
			
		||||
            const key = `${dayKey}/protocols/${hashKey(protocol)}`
 | 
			
		||||
 | 
			
		||||
            if (changes[key]) {
 | 
			
		||||
              changes[key].requests += 1
 | 
			
		||||
            } else {
 | 
			
		||||
              changes[key] = { protocol, requests: 1 }
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
 | 
			
		||||
          // Q: How many requests per day do we receive from an origin?
 | 
			
		||||
          // const referer = clientRequest.referer
 | 
			
		||||
          // if (referer) {
 | 
			
		||||
          //   const url = parseURL(referer)
 | 
			
		||||
          //   const origin = formatURL({
 | 
			
		||||
          //     protocol: url.protocol,
 | 
			
		||||
          //     hostname: url.hostname
 | 
			
		||||
          //   })
 | 
			
		||||
          //
 | 
			
		||||
          //   const key = `${dayKey}/origins/${hashKey(origin)}`
 | 
			
		||||
          //
 | 
			
		||||
          //   if (changes[key]) {
 | 
			
		||||
          //     changes[key].requests += 1
 | 
			
		||||
          //   } else {
 | 
			
		||||
          //     changes[key] = { origin, requests: 1 }
 | 
			
		||||
          //   }
 | 
			
		||||
          // }
 | 
			
		||||
        })
 | 
			
		||||
        .on('end', () => {
 | 
			
		||||
          console.log(
 | 
			
		||||
            'FINISH ingesting logs for %s from %s to %s',
 | 
			
		||||
            zone.name,
 | 
			
		||||
            stringifySeconds(startSeconds),
 | 
			
		||||
            stringifySeconds(endSeconds)
 | 
			
		||||
          )
 | 
			
		||||
 | 
			
		||||
          // Record the changes.
 | 
			
		||||
          Object.keys(changes).forEach(key => {
 | 
			
		||||
            const ref = logsRef.child(key)
 | 
			
		||||
 | 
			
		||||
            ref.transaction(value => {
 | 
			
		||||
              if (typeof changes[key].requests === 'number') {
 | 
			
		||||
                // Nested value with a "requests" property.
 | 
			
		||||
                if (value && value.requests) {
 | 
			
		||||
                  value.requests += changes[key].requests
 | 
			
		||||
                  return value
 | 
			
		||||
                } else {
 | 
			
		||||
                  return changes[key]
 | 
			
		||||
                }
 | 
			
		||||
              } else {
 | 
			
		||||
                // Simple counter.
 | 
			
		||||
                return (value || 0) + changes[key]
 | 
			
		||||
              }
 | 
			
		||||
            })
 | 
			
		||||
          })
 | 
			
		||||
 | 
			
		||||
          resolve(changes)
 | 
			
		||||
        })
 | 
			
		||||
    })
 | 
			
		||||
  })
 | 
			
		||||
 | 
			
		||||
const startIngestingLogs = (zone) => {
 | 
			
		||||
  const startSecondsRef = logsRef.child(`nextStartSeconds/${zone.name.replace('.', '-')}`)
 | 
			
		||||
 | 
			
		||||
  let inProgress = false
 | 
			
		||||
 | 
			
		||||
  const takeATurn = () => {
 | 
			
		||||
    if (inProgress) {
 | 
			
		||||
      console.log(
 | 
			
		||||
        'Still ingesting logs for %s, waiting for another turn...',
 | 
			
		||||
        zone.name
 | 
			
		||||
      )
 | 
			
		||||
 | 
			
		||||
      return
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    inProgress = true
 | 
			
		||||
 | 
			
		||||
    startSecondsRef.once('value', snapshot => {
 | 
			
		||||
      let startSeconds = snapshot.val()
 | 
			
		||||
 | 
			
		||||
      const now = Date.now()
 | 
			
		||||
 | 
			
		||||
      // Cloudflare keeps logs around for 72 hours.
 | 
			
		||||
      const minSeconds = toSeconds(now - oneHour * 72)
 | 
			
		||||
 | 
			
		||||
      if (startSeconds == null) {
 | 
			
		||||
        startSeconds = minSeconds
 | 
			
		||||
      } else if (startSeconds < minSeconds) {
 | 
			
		||||
        console.warn(
 | 
			
		||||
          'WARNING: dropping logs for %s from %s to %s!',
 | 
			
		||||
          zone.name,
 | 
			
		||||
          stringifySeconds(startSeconds),
 | 
			
		||||
          stringifySeconds(minSeconds)
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        startSeconds = minSeconds
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (startSeconds < toSeconds(now - thirtyMinutes)) {
 | 
			
		||||
        const endSeconds = startSeconds + 10
 | 
			
		||||
 | 
			
		||||
        ingestLogs(zone, startSeconds, endSeconds).then(() => {
 | 
			
		||||
          startSecondsRef.set(endSeconds)
 | 
			
		||||
          inProgress = false
 | 
			
		||||
        })
 | 
			
		||||
      } else {
 | 
			
		||||
        console.log(
 | 
			
		||||
          'Waiting for 30 minutes to pass before fetching logs for %s...',
 | 
			
		||||
          zone.name
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        inProgress = false
 | 
			
		||||
      }
 | 
			
		||||
    })
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  takeATurn()
 | 
			
		||||
  setInterval(takeATurn, oneSecond * 3)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const domains = [
 | 
			
		||||
  //'npmcdn.com', // We don't have log data on npmcdn.com yet :/
 | 
			
		||||
  'unpkg.com'
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
Promise.all(domains.map(getZones)).then(results => {
 | 
			
		||||
  const zones = results.reduce((memo, zones) => memo.concat(zones))
 | 
			
		||||
  zones.forEach(startIngestingLogs)
 | 
			
		||||
})
 | 
			
		||||
		Reference in New Issue
	
	Block a user