Use regular functions instead of arrows

This commit is contained in:
MICHAEL JACKSON 2017-08-12 09:18:54 -07:00
parent ac26872341
commit c5340f4c53
2 changed files with 69 additions and 49 deletions

View File

@ -47,13 +47,15 @@ function getLogs(zoneId, startTime, endTime) {
}) })
} }
const toSeconds = (millis) => function toSeconds(millis) {
Math.floor(millis / 1000) return Math.floor(millis / 1000)
}
const stringifySeconds = (seconds) => function stringifySeconds(seconds) {
new Date(seconds * 1000).toISOString() return new Date(seconds * 1000).toISOString()
}
const getPackageName = (pathname) => { function getPackageName(pathname) {
const parsed = PackageURL.parse(pathname) const parsed = PackageURL.parse(pathname)
return parsed && parsed.packageName return parsed && parsed.packageName
} }
@ -62,18 +64,20 @@ const oneSecond = 1000
const oneMinute = oneSecond * 60 const oneMinute = oneSecond * 60
const oneHour = oneMinute * 60 const oneHour = oneMinute * 60
const getSeconds = (date) => function getSeconds(date) {
Math.floor(date.getTime() / 1000) return Math.floor(date.getTime() / 1000)
}
const computeCounters = (stream) => function computeCounters(stream) {
new Promise((resolve, reject) => { return new Promise(function (resolve, reject) {
const counters = {} const counters = {}
const expireat = {} const expireat = {}
const incrCounter = (counterName, by = 1) => function incrCounter(counterName, by = 1) {
counters[counterName] = (counters[counterName] || 0) + by counters[counterName] = (counters[counterName] || 0) + by
}
const incrCounterMember = (counterName, member, by = 1) => { function incrCounterMember(counterName, member, by = 1) {
counters[counterName] = counters[counterName] || {} counters[counterName] = counters[counterName] || {}
counters[counterName][member] = (counters[counterName][member] || 0) + by counters[counterName][member] = (counters[counterName][member] || 0) + by
} }
@ -81,7 +85,7 @@ const computeCounters = (stream) =>
stream stream
.pipe(ndjson.parse()) .pipe(ndjson.parse())
.on('error', reject) .on('error', reject)
.on('data', entry => { .on('data', function (entry) {
const date = new Date(Math.round(entry.timestamp / 1000000)) const date = new Date(Math.round(entry.timestamp / 1000000))
const nextDay = startOfDay(addDays(date, 1)) const nextDay = startOfDay(addDays(date, 1))
const thirtyDaysLater = getSeconds(addDays(nextDay, 30)) const thirtyDaysLater = getSeconds(addDays(nextDay, 30))
@ -147,14 +151,15 @@ const computeCounters = (stream) =>
expireat[`stats-hostnameBytes-${dayKey}`] = thirtyDaysLater expireat[`stats-hostnameBytes-${dayKey}`] = thirtyDaysLater
} }
}) })
.on('end', () => { .on('end', function () {
resolve({ counters, expireat }) resolve({ counters, expireat })
}) })
}) })
}
const processLogs = (stream) => function processLogs(stream) {
computeCounters(stream).then(({ counters, expireat }) => { return computeCounters(stream).then(function ({ counters, expireat }) {
Object.keys(counters).forEach(key => { Object.keys(counters).forEach(function (key) {
const value = counters[key] const value = counters[key]
if (typeof value === 'number') { if (typeof value === 'number') {
@ -162,7 +167,7 @@ const processLogs = (stream) =>
db.incrby(key, value) db.incrby(key, value)
} else { } else {
// Sorted set. // Sorted set.
Object.keys(value).forEach(member => { Object.keys(value).forEach(function (member) {
db.zincrby(key, value[member], member) db.zincrby(key, value[member], member)
}) })
} }
@ -171,9 +176,10 @@ const processLogs = (stream) =>
db.expireat(key, expireat[key]) db.expireat(key, expireat[key])
}) })
}) })
}
const ingestLogs = (zone, startSeconds, endSeconds) => function ingestLogs(zone, startSeconds, endSeconds) {
new Promise(resolve => { return new Promise(function (resolve) {
console.log( console.log(
'info: Started ingesting logs for %s from %s to %s', 'info: Started ingesting logs for %s from %s to %s',
zone.name, zone.name,
@ -184,7 +190,7 @@ const ingestLogs = (zone, startSeconds, endSeconds) =>
const startFetchTime = Date.now() const startFetchTime = Date.now()
resolve( resolve(
getLogs(zone.id, startSeconds, endSeconds).then(stream => { getLogs(zone.id, startSeconds, endSeconds).then(function (stream) {
const endFetchTime = Date.now() const endFetchTime = Date.now()
console.log( console.log(
@ -196,7 +202,7 @@ const ingestLogs = (zone, startSeconds, endSeconds) =>
const startProcessTime = Date.now() const startProcessTime = Date.now()
return processLogs(stream).then(() => { return processLogs(stream).then(function () {
const endProcessTime = Date.now() const endProcessTime = Date.now()
console.log( console.log(
@ -209,12 +215,13 @@ const ingestLogs = (zone, startSeconds, endSeconds) =>
}) })
) )
}) })
}
const startZone = (zone) => { function startZone(zone) {
const startSecondsKey = `ingestLogsWorker-nextStartSeconds-${zone.name.replace('.', '-')}` const startSecondsKey = `ingestLogsWorker-nextStartSeconds-${zone.name.replace('.', '-')}`
const takeATurn = () => { function takeATurn() {
db.get(startSecondsKey, (error, value) => { db.get(startSecondsKey, function (error, value) {
let startSeconds = value && parseInt(value, 10) let startSeconds = value && parseInt(value, 10)
const now = Date.now() const now = Date.now()
@ -249,10 +256,10 @@ const startZone = (zone) => {
if (startSeconds < maxSeconds) { if (startSeconds < maxSeconds) {
const endSeconds = startSeconds + LogWindowSeconds const endSeconds = startSeconds + LogWindowSeconds
ingestLogs(zone, startSeconds, endSeconds).then(() => { ingestLogs(zone, startSeconds, endSeconds).then(function () {
db.set(startSecondsKey, endSeconds) db.set(startSecondsKey, endSeconds)
setTimeout(takeATurn) setTimeout(takeATurn)
}, error => { }, function (error) {
console.error(error.stack) console.error(error.stack)
process.exit(1) process.exit(1)
}) })
@ -265,7 +272,10 @@ const startZone = (zone) => {
takeATurn() takeATurn()
} }
Promise.all(DomainNames.map(getZones)).then(results => { Promise.all(DomainNames.map(getZones)).then(function (results) {
const zones = results.reduce((memo, zones) => memo.concat(zones)) const zones = results.reduce(function (memo, zones) {
return memo.concat(zones)
})
zones.forEach(startZone) zones.forEach(startZone)
}) })

View File

@ -59,8 +59,8 @@ const reduceResults = (memo, results) => {
return memo return memo
} }
const ingestStatsForZones = (zones, since, processDashboard) => function ingestStatsForZones(zones, since, processDashboard) {
new Promise(resolve => { return new Promise(resolve => {
const zoneNames = zones.map(zone => zone.name).join(', ') const zoneNames = zones.map(zone => zone.name).join(', ')
console.log( console.log(
@ -120,15 +120,18 @@ const ingestStatsForZones = (zones, since, processDashboard) =>
) )
) )
}) })
}
const ingestPerDayStats = (zones) => function ingestPerDayStats(zones) {
ingestStatsForZones(zones, -10080, processPerDayDashboard) return ingestStatsForZones(zones, -10080, processPerDayDashboard)
}
const processPerDayDashboard = (dashboard) => function processPerDayDashboard(dashboard) {
Promise.all(dashboard.timeseries.map(processPerDayTimeseries)) return Promise.all(dashboard.timeseries.map(processPerDayTimeseries))
}
const processPerDayTimeseries = (ts) => function processPerDayTimeseries(ts) {
new Promise(resolve => { return new Promise(resolve => {
const since = new Date(ts.since) const since = new Date(ts.since)
const until = new Date(ts.until) const until = new Date(ts.until)
@ -199,15 +202,18 @@ const processPerDayTimeseries = (ts) =>
resolve() resolve()
}) })
}
const ingestPerHourStats = (zones) => function ingestPerHourStats(zones) {
ingestStatsForZones(zones, -1440, processPerHourDashboard) return ingestStatsForZones(zones, -1440, processPerHourDashboard)
}
const processPerHourDashboard = (dashboard) => function processPerHourDashboard(dashboard) {
Promise.all(dashboard.timeseries.map(processPerHourTimeseries)) return Promise.all(dashboard.timeseries.map(processPerHourTimeseries))
}
const processPerHourTimeseries = (ts) => function processPerHourTimeseries(ts) {
new Promise(resolve => { return new Promise(resolve => {
const since = new Date(ts.since) const since = new Date(ts.since)
const until = new Date(ts.until) const until = new Date(ts.until)
@ -237,15 +243,18 @@ const processPerHourTimeseries = (ts) =>
resolve() resolve()
}) })
}
const ingestPerMinuteStats = (zones) => function ingestPerMinuteStats(zones) {
ingestStatsForZones(zones, -30, processPerMinuteDashboard) return ingestStatsForZones(zones, -30, processPerMinuteDashboard)
}
const processPerMinuteDashboard = (dashboard) => function processPerMinuteDashboard(dashboard) {
Promise.all(dashboard.timeseries.map(processPerMinuteTimeseries)) return Promise.all(dashboard.timeseries.map(processPerMinuteTimeseries))
}
const processPerMinuteTimeseries = (ts) => function processPerMinuteTimeseries(ts) {
new Promise(resolve => { return new Promise(resolve => {
const since = new Date(ts.since) const since = new Date(ts.since)
const until = new Date(ts.until) const until = new Date(ts.until)
@ -275,8 +284,9 @@ const processPerMinuteTimeseries = (ts) =>
resolve() resolve()
}) })
}
const startZones = (zones) => { function startZones(zones) {
const takePerMinuteTurn = () => const takePerMinuteTurn = () =>
ingestPerMinuteStats(zones) ingestPerMinuteStats(zones)