diff --git a/server/IngestLogsWorker.js b/server/IngestLogsWorker.js index cfe00b7..f2f1b09 100644 --- a/server/IngestLogsWorker.js +++ b/server/IngestLogsWorker.js @@ -47,13 +47,15 @@ function getLogs(zoneId, startTime, endTime) { }) } -const toSeconds = (millis) => - Math.floor(millis / 1000) +function toSeconds(millis) { + return Math.floor(millis / 1000) +} -const stringifySeconds = (seconds) => - new Date(seconds * 1000).toISOString() +function stringifySeconds(seconds) { + return new Date(seconds * 1000).toISOString() +} -const getPackageName = (pathname) => { +function getPackageName(pathname) { const parsed = PackageURL.parse(pathname) return parsed && parsed.packageName } @@ -62,18 +64,20 @@ const oneSecond = 1000 const oneMinute = oneSecond * 60 const oneHour = oneMinute * 60 -const getSeconds = (date) => - Math.floor(date.getTime() / 1000) +function getSeconds(date) { + return Math.floor(date.getTime() / 1000) +} -const computeCounters = (stream) => - new Promise((resolve, reject) => { +function computeCounters(stream) { + return new Promise(function (resolve, reject) { const counters = {} const expireat = {} - const incrCounter = (counterName, by = 1) => + function incrCounter(counterName, by = 1) { counters[counterName] = (counters[counterName] || 0) + by + } - const incrCounterMember = (counterName, member, by = 1) => { + function incrCounterMember(counterName, member, by = 1) { counters[counterName] = counters[counterName] || {} counters[counterName][member] = (counters[counterName][member] || 0) + by } @@ -81,7 +85,7 @@ const computeCounters = (stream) => stream .pipe(ndjson.parse()) .on('error', reject) - .on('data', entry => { + .on('data', function (entry) { const date = new Date(Math.round(entry.timestamp / 1000000)) const nextDay = startOfDay(addDays(date, 1)) const thirtyDaysLater = getSeconds(addDays(nextDay, 30)) @@ -147,14 +151,15 @@ const computeCounters = (stream) => expireat[`stats-hostnameBytes-${dayKey}`] = thirtyDaysLater } }) - .on('end', () => { + .on('end', function () { resolve({ counters, expireat }) }) }) +} -const processLogs = (stream) => - computeCounters(stream).then(({ counters, expireat }) => { - Object.keys(counters).forEach(key => { +function processLogs(stream) { + return computeCounters(stream).then(function ({ counters, expireat }) { + Object.keys(counters).forEach(function (key) { const value = counters[key] if (typeof value === 'number') { @@ -162,7 +167,7 @@ const processLogs = (stream) => db.incrby(key, value) } else { // Sorted set. - Object.keys(value).forEach(member => { + Object.keys(value).forEach(function (member) { db.zincrby(key, value[member], member) }) } @@ -171,9 +176,10 @@ const processLogs = (stream) => db.expireat(key, expireat[key]) }) }) +} -const ingestLogs = (zone, startSeconds, endSeconds) => - new Promise(resolve => { +function ingestLogs(zone, startSeconds, endSeconds) { + return new Promise(function (resolve) { console.log( 'info: Started ingesting logs for %s from %s to %s', zone.name, @@ -184,7 +190,7 @@ const ingestLogs = (zone, startSeconds, endSeconds) => const startFetchTime = Date.now() resolve( - getLogs(zone.id, startSeconds, endSeconds).then(stream => { + getLogs(zone.id, startSeconds, endSeconds).then(function (stream) { const endFetchTime = Date.now() console.log( @@ -196,7 +202,7 @@ const ingestLogs = (zone, startSeconds, endSeconds) => const startProcessTime = Date.now() - return processLogs(stream).then(() => { + return processLogs(stream).then(function () { const endProcessTime = Date.now() console.log( @@ -209,12 +215,13 @@ const ingestLogs = (zone, startSeconds, endSeconds) => }) ) }) +} -const startZone = (zone) => { +function startZone(zone) { const startSecondsKey = `ingestLogsWorker-nextStartSeconds-${zone.name.replace('.', '-')}` - const takeATurn = () => { - db.get(startSecondsKey, (error, value) => { + function takeATurn() { + db.get(startSecondsKey, function (error, value) { let startSeconds = value && parseInt(value, 10) const now = Date.now() @@ -249,10 +256,10 @@ const startZone = (zone) => { if (startSeconds < maxSeconds) { const endSeconds = startSeconds + LogWindowSeconds - ingestLogs(zone, startSeconds, endSeconds).then(() => { + ingestLogs(zone, startSeconds, endSeconds).then(function () { db.set(startSecondsKey, endSeconds) setTimeout(takeATurn) - }, error => { + }, function (error) { console.error(error.stack) process.exit(1) }) @@ -265,7 +272,10 @@ const startZone = (zone) => { takeATurn() } -Promise.all(DomainNames.map(getZones)).then(results => { - const zones = results.reduce((memo, zones) => memo.concat(zones)) +Promise.all(DomainNames.map(getZones)).then(function (results) { + const zones = results.reduce(function (memo, zones) { + return memo.concat(zones) + }) + zones.forEach(startZone) }) diff --git a/server/IngestStatsWorker.js b/server/IngestStatsWorker.js index ba71a86..c63e3c8 100644 --- a/server/IngestStatsWorker.js +++ b/server/IngestStatsWorker.js @@ -59,8 +59,8 @@ const reduceResults = (memo, results) => { return memo } -const ingestStatsForZones = (zones, since, processDashboard) => - new Promise(resolve => { +function ingestStatsForZones(zones, since, processDashboard) { + return new Promise(resolve => { const zoneNames = zones.map(zone => zone.name).join(', ') console.log( @@ -120,15 +120,18 @@ const ingestStatsForZones = (zones, since, processDashboard) => ) ) }) +} -const ingestPerDayStats = (zones) => - ingestStatsForZones(zones, -10080, processPerDayDashboard) +function ingestPerDayStats(zones) { + return ingestStatsForZones(zones, -10080, processPerDayDashboard) +} -const processPerDayDashboard = (dashboard) => - Promise.all(dashboard.timeseries.map(processPerDayTimeseries)) +function processPerDayDashboard(dashboard) { + return Promise.all(dashboard.timeseries.map(processPerDayTimeseries)) +} -const processPerDayTimeseries = (ts) => - new Promise(resolve => { +function processPerDayTimeseries(ts) { + return new Promise(resolve => { const since = new Date(ts.since) const until = new Date(ts.until) @@ -199,15 +202,18 @@ const processPerDayTimeseries = (ts) => resolve() }) +} -const ingestPerHourStats = (zones) => - ingestStatsForZones(zones, -1440, processPerHourDashboard) +function ingestPerHourStats(zones) { + return ingestStatsForZones(zones, -1440, processPerHourDashboard) +} -const processPerHourDashboard = (dashboard) => - Promise.all(dashboard.timeseries.map(processPerHourTimeseries)) +function processPerHourDashboard(dashboard) { + return Promise.all(dashboard.timeseries.map(processPerHourTimeseries)) +} -const processPerHourTimeseries = (ts) => - new Promise(resolve => { +function processPerHourTimeseries(ts) { + return new Promise(resolve => { const since = new Date(ts.since) const until = new Date(ts.until) @@ -237,15 +243,18 @@ const processPerHourTimeseries = (ts) => resolve() }) +} -const ingestPerMinuteStats = (zones) => - ingestStatsForZones(zones, -30, processPerMinuteDashboard) +function ingestPerMinuteStats(zones) { + return ingestStatsForZones(zones, -30, processPerMinuteDashboard) +} -const processPerMinuteDashboard = (dashboard) => - Promise.all(dashboard.timeseries.map(processPerMinuteTimeseries)) +function processPerMinuteDashboard(dashboard) { + return Promise.all(dashboard.timeseries.map(processPerMinuteTimeseries)) +} -const processPerMinuteTimeseries = (ts) => - new Promise(resolve => { +function processPerMinuteTimeseries(ts) { + return new Promise(resolve => { const since = new Date(ts.since) const until = new Date(ts.until) @@ -275,8 +284,9 @@ const processPerMinuteTimeseries = (ts) => resolve() }) +} -const startZones = (zones) => { +function startZones(zones) { const takePerMinuteTurn = () => ingestPerMinuteStats(zones)