Simplify log ingestion timer

This commit is contained in:
Michael Jackson 2018-08-25 20:14:49 -07:00
parent 995780bcbb
commit fe4ccec4e2
4 changed files with 69 additions and 78 deletions

View File

@ -1,2 +1,2 @@
web: node server.js
ingest_logs: node modules/ingestLogs.js
ingest_logs: node modules/ingestLogsEveryMinute.js

View File

@ -30,7 +30,7 @@ services:
worker:
build: .
command: node_modules/.bin/nodemon --ignore modules/client modules/ingestLogs.js
command: node_modules/.bin/nodemon --ignore modules/client modules/ingestLogsEveryMinute.js
env_file: .env
environment:
- DATA_URL=redis://data:6379

View File

@ -1,6 +1,5 @@
const parseURL = require("url").parse;
const startOfDay = require("date-fns/start_of_day");
const startOfMinute = require("date-fns/start_of_minute");
const addDays = require("date-fns/add_days");
const db = require("./utils/data");
@ -18,15 +17,12 @@ const domainNames = [
//"npmcdn.com" // We don't have log data on npmcdn.com yet :/
];
/**
* The window of time to download in a single fetch.
*/
const logWindowSeconds = 30;
let cachedZones;
/**
* The minimum time to wait between fetches.
*/
const minInterval = 15000;
const oneSecond = 1000;
const oneMinute = oneSecond * 60;
const oneHour = oneMinute * 60;
const oneDay = oneHour * 24;
function getSeconds(date) {
return Math.floor(date.getTime() / 1000);
@ -40,11 +36,6 @@ function toSeconds(ms) {
return Math.floor(ms / 1000);
}
const oneSecond = 1000;
const oneMinute = oneSecond * 60;
const oneHour = oneMinute * 60;
const oneDay = oneHour * 24;
function computeCounters(stream) {
return new Promise((resolve, reject) => {
const counters = {};
@ -144,7 +135,10 @@ function processLogs(stream) {
);
}
function ingestLogs(zone, startSeconds, endSeconds) {
function ingestLogsForZone(zone, startDate, endDate) {
const startSeconds = toSeconds(startDate);
const endSeconds = toSeconds(endDate);
const startFetchTime = Date.now();
const fields = [
"EdgeStartTimestamp",
@ -186,66 +180,20 @@ function ingestLogs(zone, startSeconds, endSeconds) {
});
}
function startZone(zone) {
const suffix = zone.name.replace(".", "-");
const startSecondsKey = `ingestLogs-start-${suffix}`;
function takeATurn() {
const now = Date.now();
// Cloudflare keeps logs around for 7 days.
// https://support.cloudflare.com/hc/en-us/articles/216672448-Enterprise-Log-Share-Logpull-REST-API
const minSeconds = toSeconds(startOfMinute(now - oneDay * 5));
db.get(startSecondsKey, (error, value) => {
let startSeconds = value && parseInt(value, 10);
if (startSeconds == null) {
startSeconds = minSeconds;
} else if (startSeconds < minSeconds) {
console.warn(
"warning: Dropped logs for %s from %s to %s!",
zone.name,
stringifySeconds(startSeconds),
stringifySeconds(minSeconds)
function getZones(domainNames) {
return Promise.all(domainNames.map(CloudflareAPI.getZones)).then(results =>
results.reduce((memo, zones) => memo.concat(zones))
);
startSeconds = minSeconds;
}
const endSeconds = startSeconds + logWindowSeconds;
function ingestLogs(startDate, endDate) {
return Promise.resolve(cachedZones || getZones(domainNames)).then(zones => {
if (!cachedZones) cachedZones = zones;
// The log for a request is typically available within thirty (30) minutes
// of the request taking place under normal conditions. We deliver logs
// ordered by the time that the logs were created, i.e. the timestamp of
// the request when it was received by the edge. Given the order of
// delivery, we recommend waiting a full thirty minutes to ingest a full
// set of logs. This will help ensure that any congestion in the log
// pipeline has passed and a full set of logs can be ingested.
// https://support.cloudflare.com/hc/en-us/articles/216672448-Enterprise-Log-Share-REST-API
const maxSeconds = toSeconds(now - oneMinute * 30);
if (endSeconds < maxSeconds) {
ingestLogs(zone, startSeconds, endSeconds).then(
() => {
db.set(startSecondsKey, endSeconds);
setTimeout(takeATurn, minInterval);
},
error => {
console.error(error.stack);
process.exit(1);
}
return Promise.all(
zones.map(zone => ingestLogsForZone(zone, startDate, endDate))
);
} else {
setTimeout(takeATurn, (startSeconds - maxSeconds) * 1000);
}
});
}
takeATurn();
}
Promise.all(domainNames.map(CloudflareAPI.getZones)).then(results => {
const zones = results.reduce((memo, zones) => memo.concat(zones));
zones.forEach(startZone);
});
module.exports = ingestLogs;

View File

@ -0,0 +1,43 @@
const addMinutes = require("date-fns/add_minutes");
const startOfMinute = require("date-fns/start_of_minute");
const ingestLogs = require("./ingestLogs");
const oneSecond = 1000;
const oneMinute = oneSecond * 60;
let currentWorkload, timer;
function work() {
const now = Date.now();
// The log for a request is typically available within thirty (30) minutes
// of the request taking place under normal conditions. We deliver logs
// ordered by the time that the logs were created, i.e. the timestamp of
// the request when it was received by the edge. Given the order of
// delivery, we recommend waiting a full thirty minutes to ingest a full
// set of logs. This will help ensure that any congestion in the log
// pipeline has passed and a full set of logs can be ingested.
// https://support.cloudflare.com/hc/en-us/articles/216672448-Enterprise-Log-Share-REST-API
const start = startOfMinute(now - oneMinute * 31);
const end = addMinutes(start, 1);
currentWorkload = ingestLogs(start, end);
}
function shutdown() {
console.log("Shutting down...");
clearInterval(timer);
currentWorkload.then(() => {
console.log("Goodbye!");
process.exit();
});
}
work();
process.on("SIGINT", shutdown).on("SIGTERM", shutdown);
timer = setInterval(work, oneMinute);