
Commit a4aa1b9

Merge pull request #51 from DEFRA/release/2.1.0
Release/2.1.0
2 parents a1b3688 + a064eea commit a4aa1b9

26 files changed (+6217 −11721 lines)

.github/dependabot.yml

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+version: 2
+updates:
+  - package-ecosystem: npm
+    directory: "/"
+    schedule:
+      interval: weekly
+    open-pull-requests-limit: 20
+    versioning-strategy: increase
+    target-branch: development

config/dtsProcessEventRule.json

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+{
+  "Description": "Event rule to schedule the dtsProcess lambda execution",
+  "Name": "{PLACEHOLDER}",
+  "RoleArn": "{PLACEHOLDER}",
+  "ScheduleExpression": "{PLACEHOLDER}",
+  "State": "ENABLED"
+}
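
This JSON mirrors the request shape of a CloudWatch Events / EventBridge PutRule call, with the placeholders filled in at deploy time. Purely as a sketch of how such a file might be applied — the deployment step itself is not part of this commit, and the filled-in values below are hypothetical:

// Sketch only: the deployment mechanism is not part of this commit, and the
// filled-in values below are hypothetical. Region and credentials are assumed
// to come from the environment.
const AWS = require('aws-sdk')
const rule = require('./config/dtsProcessEventRule.json')

const events = new AWS.CloudWatchEvents()

async function createDtsProcessRule () {
  await events.putRule({
    ...rule,
    Name: 'dts-process-schedule', // hypothetical
    RoleArn: 'arn:aws:iam::123456789012:role/dts-process-events-role', // hypothetical
    ScheduleExpression: 'rate(15 minutes)' // hypothetical
  }).promise()
}

createDtsProcessRule().catch(console.error)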

lib/constants.js

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+module.exports = {
+  HTTP_BAD_REQUEST: 400,
+  HTTP_NOT_FOUND: 404,
+  HTTP_TOO_MANY_REQUESTS: 429,
+  INTERNAL_SERVER_ERROR: 500
+}

lib/functions/dts-process.js

Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@
+const logger = require('../helpers/logging')
+const pg = require('../helpers/db')
+const invokeLambda = require('../helpers/invoke-lambda')
+const { deleteStation, getRloiIds, getStationData, validateStationData } = require('../helpers/imtd-api')
+
+async function insertStation (stationDataArray) {
+  try {
+    await pg.transaction(async trx => {
+      await Promise.all(stationDataArray.map(async (stationData) => {
+        const stationID = stationData.station_id
+        await trx('station_display_time_series').where({ station_id: stationID }).delete()
+        await trx('station_display_time_series').insert(stationData)
+        logger.info(`Processed displayTimeSeries for RLOI id ${stationID}`)
+      }))
+    })
+  } catch (error) {
+    logger.error('Database error processing stationData', error)
+    throw error
+  }
+}
+
+async function getData (stationId) {
+  try {
+    const stationData = await getStationData(stationId)
+    if (stationData.length === 0) {
+      (console.log('Deleting station: ', stationId))
+      const tableName = 'station_display_time_series'
+      await deleteStation(stationId, tableName)
+    }
+    await validateStationData(stationData)
+    await insertStation(stationData)
+  } catch (error) {
+    logger.error(`Could not process data for station ${stationId} (${error.message})`)
+  }
+}
+
+async function handler ({ offset = 0 } = {}) {
+  const BATCH_SIZE = parseInt(process.env.IMTD_BATCH_SIZE || '500')
+
+  logger.info(`Retrieving up to ${BATCH_SIZE} rloi_ids with an offset of ${offset}`)
+  const rloiids = await getRloiIds({
+    offset,
+    limit: BATCH_SIZE
+  })
+  logger.info(`Retrieved ${rloiids.length} rloi_ids`)
+
+  for (const rloiid of rloiids) {
+    await getData(rloiid.rloi_id)
+  }
+
+  if (rloiids.length >= BATCH_SIZE) {
+    const functionName = process.env.AWS_LAMBDA_FUNCTION_NAME
+    const newOffset = offset + BATCH_SIZE
+    logger.info(`Invoking ${functionName} with an offset of ${newOffset}`)
+
+    await invokeLambda(functionName, {
+      offset: newOffset
+    })
+  }
+}
+
+module.exports = {
+  handler,
+  validateStationData
+}
+
+process.on('SIGTERM', async () => {
+  logger.info('SIGTERM received, destroying DB connection')
+  await pg.destroy()
+  process.exit(0)
+})
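
The handler pages through rloi_ids in batches of IMTD_BATCH_SIZE and, whenever a full batch comes back, re-invokes itself asynchronously with an increased offset via the invoke-lambda helper. That helper is not shown in this diff; as an assumed sketch only (fire-and-forget invocation through aws-sdk v2), it might look something like:

// lib/helpers/invoke-lambda.js -- assumed shape only; the real helper is not part of this diff.
const AWS = require('aws-sdk')

const lambda = new AWS.Lambda()

// 'Event' invocation returns as soon as the request is accepted, so the
// current execution can exit while the next batch is processed separately.
module.exports = async function invokeLambda (functionName, payload) {
  return lambda.invoke({
    FunctionName: functionName,
    InvocationType: 'Event',
    Payload: JSON.stringify(payload)
  }).promise()
}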

lib/functions/imtd-process.js

Lines changed: 7 additions & 43 deletions
@@ -1,18 +1,10 @@
 const parseThresholds = require('../models/parse-thresholds')
-const axios = require('axios')
 const logger = require('../helpers/logging')
 const pg = require('../helpers/db')
 const invokeLambda = require('../helpers/invoke-lambda')
-
-async function deleteThresholds (stationId) {
-  try {
-    await pg('station_imtd_threshold').where({ station_id: stationId }).delete()
-    logger.info(`Deleted thresholds for RLOI id ${stationId}`)
-  } catch (error) {
-    logger.error(`Error deleting thresholds for station ${stationId}`, error)
-    throw error
-  }
-}
+const deleteThresholds = require('../helpers/imtd-api').deleteStation
+const { getRloiIds, getImtdApiResponse } = require('../helpers/imtd-api')
+const tableName = 'station_imtd_threshold'
 
 async function insertThresholds (stationId, thresholds) {
   try {
@@ -37,21 +29,6 @@ async function insertThresholds (stationId, thresholds) {
   }
 }
 
-async function getImtdApiResponse (stationId) {
-  const hostname = 'imfs-prd1-thresholds-api.azurewebsites.net'
-  try {
-    return await axios.get(`https://${hostname}/Location/${stationId}?version=2`)
-  } catch (error) {
-    if (error.response?.status === 404) {
-      logger.info(`Station ${stationId} not found (HTTP Status: 404)`)
-    } else {
-      const message = error.response?.status ? `HTTP Status: ${error.response.status}` : `Error: ${error.message}`
-      throw Error(`IMTD API request for station ${stationId} failed (${message})`)
-    }
-    return {}
-  }
-}
-
 async function getIMTDThresholds (stationId) {
   const response = await getImtdApiResponse(stationId)
   if (response.data) {
@@ -66,36 +43,23 @@ async function getData (stationId) {
     if (thresholds.length > 0) {
       await insertThresholds(stationId, thresholds)
     } else {
-      await deleteThresholds(stationId)
+      await deleteThresholds(stationId, tableName)
+      logger.info(`Deleted data for RLOI id ${stationId}`)
     }
   } catch (error) {
     logger.error(`Could not process data for station ${stationId} (${error.message})`)
   }
 }
 
-async function getRloiIds ({ limit, offset } = {}) {
-  try {
-    logger.info(`Retrieving up to ${limit} rloi_ids with an offset of ${offset}`)
-    const result = await pg('rivers_mview')
-      .distinct('rloi_id')
-      .whereNotNull('rloi_id')
-      .orderBy('rloi_id', 'asc')
-      .limit(limit)
-      .offset(offset)
-    logger.info(`Retrieved ${result.length} rloi_ids`)
-    return result
-  } catch (error) {
-    throw Error(`Could not get list of id's from database (Error: ${error.message})`)
-  }
-}
-
 async function handler ({ offset = 0 } = {}) {
   const BATCH_SIZE = parseInt(process.env.IMTD_BATCH_SIZE || '500')
 
+  logger.info(`Retrieving up to ${BATCH_SIZE} rloi_ids with an offset of ${offset}`)
   const stations = await getRloiIds({
     offset,
     limit: BATCH_SIZE
   })
+  logger.info(`Retrieved ${stations.length} rloi_ids`)
 
   for (const station of stations) {
     await getData(station.rloi_id)

lib/helpers/imtd-api.js

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
+const pg = require('./db')
+const axios = require('axios')
+const { HTTP_NOT_FOUND } = require('../constants')
+const logger = require('./logging')
+const parseStation = require('../models/parse-time-series')
+const Joi = require('joi')
+
+async function deleteStation (stationId, tableName) {
+  await pg(tableName).where({ station_id: stationId }).delete()
+}
+
+async function getRloiIds ({ limit, offset } = {}) {
+  try {
+    const result = await pg('rivers_mview')
+      .distinct('rloi_id')
+      .whereNotNull('rloi_id')
+      .orderBy('rloi_id', 'asc')
+      .limit(limit)
+      .offset(offset)
+    return result
+  } catch (error) {
+    throw Error(`Could not get list of id's from database (Error: ${error.message})`)
+  }
+}
+
+async function getImtdApiResponse (stationId) {
+  const hostname = 'imfs-prd1-thresholds-api.azurewebsites.net'
+  try {
+    return await axios.get(`https://${hostname}/Location/${stationId}?version=2`)
+  } catch (error) {
+    if (error.response?.status === HTTP_NOT_FOUND) {
+      logger.info(`Station ${stationId} not found (HTTP Status: 404)`)
+    } else {
+      const message = error.response?.status ? `HTTP Status: ${error.response.status}` : `Error: ${error.message}`
+      throw Error(`IMTD API request for station ${stationId} failed (${message})`)
+    }
+    return {}
+  }
+}
+
+async function getStationData (stationId) {
+  const response = await getImtdApiResponse(stationId)
+  if (response.data) {
+    return parseStation(response.data[0].TimeSeriesMetaData, stationId)
+  }
+  return []
+}
+
+async function validateStationData (stationDataArray) {
+  const schema = Joi.object({
+    station_id: Joi.number().required(),
+    direction: Joi.string().required(),
+    display_time_series: Joi.boolean().required()
+  })
+
+  try {
+    const validatedData = await Promise.all(
+      stationDataArray.map((stationData) => schema.validateAsync(stationData))
+    )
+    return validatedData
+  } catch (error) {
+    throw new Error(`Validation error: ${error.message}`)
+  }
+}
+
+module.exports = { deleteStation, getRloiIds, getImtdApiResponse, getStationData, validateStationData }
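
As an illustration of the Joi schema above (the row values are invented), validateStationData resolves with the validated rows when every row matches the schema and rejects otherwise:

// Illustrative only -- the row values are made up to show the schema in use.
const { validateStationData } = require('./lib/helpers/imtd-api')

async function example () {
  // Resolves: every field required by the schema is present and well typed.
  await validateStationData([
    { station_id: 1001, direction: 'u', display_time_series: true }
  ])

  // Rejects with 'Validation error: ...' because display_time_series is missing.
  await validateStationData([
    { station_id: 1001, direction: 'd' }
  ]).catch((error) => console.error(error.message))
}

example()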

lib/helpers/query.js

Lines changed: 0 additions & 1 deletion
@@ -1,4 +1,3 @@
-
 module.exports = class Query {
   constructor (sql, bindings) {
     this.text = sql

lib/models/fwis.js

Lines changed: 0 additions & 1 deletion
@@ -1,4 +1,3 @@
-
 module.exports = {
   async save (warnings, timestamp, pool) {
     const dbWarnings = warnings

lib/models/parse-time-series.js

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+/**
+ * @param {Object} data - The data to be parsed.
+ * @returns {Object} - The processed data.
+ */
+function parseTimeSeries (data, stationId) {
+  if (!data) {
+    return {}
+  }
+
+  const processedData = data.map((item) => ({
+    station_id: stationId,
+    direction: item.qualifier === 'Downstream Stage' ? 'd' : 'u',
+    display_time_series: item.DisplayTimeSeries
+  }))
+
+  const uniqueProcessedData = processedData.filter((item, index, self) =>
+    index === self.findIndex((t) => (
+      t.station_id === item.station_id && t.direction === item.direction
+    ))
+  )
+
+  return uniqueProcessedData
+}
+
+module.exports = parseTimeSeries
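
For illustration (the metadata values below are invented), parseTimeSeries maps each TimeSeriesMetaData entry to a row keyed on station_id and direction, then keeps only the first row per station/direction pair:

// Illustrative input only -- field values are invented to show the mapping and de-duplication.
const parseTimeSeries = require('./lib/models/parse-time-series')

const metaData = [
  { qualifier: 'Stage', DisplayTimeSeries: true },
  { qualifier: 'Downstream Stage', DisplayTimeSeries: false },
  { qualifier: 'Stage', DisplayTimeSeries: true } // duplicate upstream entry, filtered out
]

console.log(parseTimeSeries(metaData, 1001))
// [
//   { station_id: 1001, direction: 'u', display_time_series: true },
//   { station_id: 1001, direction: 'd', display_time_series: false }
// ]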

0 commit comments
