[data dashboard backend] Add Data Dashboard Backend service #267
@@ -0,0 +1,46 @@
# Kafka-data transformer (ksql-server) for the data-dashboard-backend service

Reference: https://docs.ksql-server.io/

The data-dashboard-backend service uses data derived from Kafka topics, imported into the _observation_ table of the data-dashboard-backend service database. The data in the Kafka topics is transformed by the ksql-server data transformer before it is imported into the _observation_ table.

The ksql-server data transformer can register Kafka consumers/producers that transform the data in one topic and publish the results to another topic.

The provided ksql-server SQL files _questionnaire_response_observations.sql_ and _questionnaire_app_events_observation.sql_ transform, respectively, the _questionnaire_response_ and _questionnaire_app_event_ topics and publish the data to the _ksql_observations_ topic. The _ksql_observations_ topic is consumed by the radar-jdbc-connector service deployed for the data-dashboard-backend service (see: [20-dashboard.yaml](../helmfile.d/20-dashboard.yaml)).

When transformation of other topics is required, new SQL files can be added to this directory. These new files should be referenced in the _cp-ksql-server -> ksql -> queries_ section of the `etc/base.yaml.gotmpl` file. New ksql-server SQL files should transform towards the following format of the _ksql_observations_ topic:

```
TOPIC KEY:
  PROJECT: the project identifier
  SOURCE: the source identifier
  SUBJECT: the subject/study participant identifier
TOPIC VALUE:
  TOPIC: the topic identifier
  CATEGORY: the category identifier (optional)
  VARIABLE: the variable identifier
  DATE: the date of the observation
  END_DATE: the end date of the observation (optional)
  TYPE: the type of the observation (STRING, STRING_JSON, INTEGER, DOUBLE)
  VALUE_TEXTUAL: the textual value of the observation (optional, must be set when VALUE_NUMERIC is NULL)
  VALUE_NUMERIC: the numeric value of the observation (optional, must be set when VALUE_TEXTUAL is NULL)
```

New messages are added to the _ksql_observations_ topic by inserting into the _observations_ stream (see [_base_observations_stream.sql](_base_observations_stream.sql)):

```
INSERT INTO observations
SELECT
  ...
  PARTITION BY q.projectId, q.userId, q.sourceId
EMIT CHANGES;
```
@@ -290,10 +290,12 @@ radar_rest_sources_backend: | |
| garmin: | ||
| enable: "false" | ||
|
|
||
| # --------------------------------------------------------- 20-grafana.yaml --------------------------------------------------------- | ||
| # --------------------------------------------------------- 20-dashboard.yaml --------------------------------------------------------- | ||
|
|
||
| timescaledb_username: postgres | ||
| timescaledb_db_name: grafana-metrics | ||
| data_dashboard_backend_db_name: data-dashboard | ||
| data_dashboard_backend_db_username: postgres | ||
| grafana_metrics_username: postgres | ||
|
|
||
| timescaledb: | ||
|
|
@@ -315,23 +317,63 @@ timescaledb: | |
| # Uncomment when upgrading | ||
| #existingClaim: "data-timescaledb-postgresql-0" | ||
|
|
||
| # Make sure to set: | ||
| #- radar_jdbc_connector_grafana._install to true | ||
| #- ksql_server._install to true | ||
| radar_grafana: | ||
| _install: true | ||
| _install: false | ||
| _chart_version: 6.26.8 | ||
| _extra_timeout: 0 | ||
| replicaCount: 1 | ||
| env: | ||
| GF_DASHBOARDS_DEFAULT_HOME_DASHBOARD_PATH: /var/lib/grafana/dashboards/allprojects/home.json | ||
|
|
||
| radar_jdbc_connector: | ||
| _install: true | ||
| # Make sure to set: | ||
pvannierop marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| #- radar_jdbc_connector_data_dashboard_backend._install to true | ||
| #- ksql_server._install to true | ||
| data_dashboard_backend: | ||
| _install: false | ||
| _chart_version: 0.1.0 | ||
keyvaann marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| _extra_timeout: 0 | ||
| replicaCount: 1 | ||
|
|
||
| # Install when radar_grafana._install is 'true' | ||
| radar_jdbc_connector_grafana: | ||
| _install: false | ||
| _chart_version: 0.5.1 | ||
| _extra_timeout: 0 | ||
| replicaCount: 1 | ||
| sink: | ||
| # Change the list of topics if you have dashboards that read other data or if you don't have certain topics available on your cluster. | ||
| topics: android_phone_relative_location, android_phone_battery_level, connect_fitbit_intraday_heart_rate, connect_fitbit_intraday_steps | ||
|
|
||
| # Install when data_dashboard_backend._install is 'true' | ||
| radar_jdbc_connector_data_dashboard_backend: | ||
pvannierop marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| _install: false | ||
| _chart_version: 0.5.1 | ||
| _extra_timeout: 0 | ||
| replicaCount: 1 | ||
|
|
||
| # Install when using realtime analysis | ||
| radar_jdbc_connector_realtime_dashboard: | ||
| _install: false | ||
| _chart_version: 0.5.1 | ||
| _extra_timeout: 0 | ||
| replicaCount: 1 | ||
|
|
||
| # Install when: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. again it can be enabled automatically in the helmfiles using templating with if statements
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed. We decided to postpone this to future changes. |
||
| #- radar_grafana._install is 'true' | ||
| #- data_dashboard_backend._install is 'true' | ||
| #- using realtime analysis | ||
| ksql_server: | ||
| _install: false | ||
| _chart_version: 0.3.1 | ||
| _extra_timeout: 0 | ||
| # -- Uncomment when using real-time analysis | ||
| # ksql: | ||
| # headless: false | ||
| # -- | ||
|
|
||
| # --------------------------------------------------------- 20-ingestion.yaml --------------------------------------------------------- | ||
|
|
||
| radar_gateway: | ||
|
|
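Read together, deploying the data dashboard pipeline means enabling three releases in concert, as the comments above indicate. A sketch of the relevant overrides (flag names as defined above; all of them default to `false` in this file):

```
# Sketch: the three releases that make up the data dashboard pipeline.
data_dashboard_backend:
  _install: true          # the backend service itself

radar_jdbc_connector_data_dashboard_backend:
  _install: true          # sinks the ksql_observations topic into the backend database

ksql_server:
  _install: true          # runs the transformer SQL files
```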
@@ -488,30 +530,6 @@ radar_push_endpoint: | |
| garmin: | ||
| enabled: true | ||
|
|
||
| # --------------------------------------------------------- 40-realtime-analyses.yaml --------------------------------------------------------- | ||
|
|
||
| radar_jdbc_connector_agg: | ||
| _install: false | ||
| _chart_version: 0.5.1 | ||
| _extra_timeout: 0 | ||
| replicaCount: 1 | ||
|
|
||
| ksql_server: | ||
| _install: false | ||
| _chart_version: 0.3.1 | ||
| _extra_timeout: 0 | ||
| replicaCount: 1 | ||
| servicePort: 8088 | ||
| kafka: | ||
| bootstrapServers: PLAINTEXT://cp-kafka:9092 | ||
| cp-schema-registry: | ||
| url: http://cp-schema-registry:8081 | ||
| ksql: | ||
| headless: false | ||
| configurationOverrides: | ||
| "ksql.server.url": "http://0.0.0.0:8088" | ||
| "ksql.advertised.listener": "http://ksql-server:8088" | ||
|
|
||
| # --------------------------------------------------------- 99-velero.yaml --------------------------------------------------------- | ||
|
|
||
| velero: | ||
|
|
||
@@ -0,0 +1,20 @@
SET 'auto.offset.reset' = 'earliest';

-- Register the 'ksql_observations' topic (created when it does not exist).
CREATE STREAM observations (
    PROJECT VARCHAR KEY,   -- 'KEY' means that this field is part of the kafka message key
    SUBJECT VARCHAR KEY,
    SOURCE VARCHAR KEY,
    TOPIC_NAME VARCHAR,
    CATEGORY VARCHAR,
    VARIABLE VARCHAR,
    OBSERVATION_TIME TIMESTAMP,
    OBSERVATION_TIME_END TIMESTAMP,
    TYPE VARCHAR,
    VALUE_NUMERIC DOUBLE,
    VALUE_TEXTUAL VARCHAR
) WITH (
    kafka_topic = 'ksql_observations',
    partitions = 3,
    format = 'avro'
);
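For reference, a single conforming record can be inserted by hand from the ksqlDB CLI, which is handy for smoke-testing the downstream radar-jdbc-connector. The values below are purely illustrative, and the sketch assumes ksqlDB's coercion of ISO-8601 strings to TIMESTAMP values:

```
-- Illustrative values only; not part of the repository.
INSERT INTO observations (
    PROJECT, SUBJECT, SOURCE, TOPIC_NAME, CATEGORY, VARIABLE,
    OBSERVATION_TIME, OBSERVATION_TIME_END, TYPE, VALUE_NUMERIC, VALUE_TEXTUAL
) VALUES (
    'project-a', 'participant-1', 'source-1',
    'questionnaire_response', NULL, 'phq8_score',
    '2024-01-01T12:00:00', NULL,
    'DOUBLE', 7.0, NULL
);
```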
@@ -0,0 +1,31 @@
CREATE STREAM questionnaire_app_event (
    projectId VARCHAR KEY,   -- 'KEY' means that this field is part of the kafka message key
    userId VARCHAR KEY,
    sourceId VARCHAR KEY,
    questionnaireName VARCHAR,
    eventType VARCHAR,
    time DOUBLE,
    metadata MAP<VARCHAR, VARCHAR>
) WITH (
    kafka_topic = 'questionnaire_app_event',
    partitions = 3,
    format = 'avro'
);
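Once registered, the stream can be inspected from the ksqlDB CLI. A quick sanity check might look like:

```
-- Show the stream's schema and serialization format
DESCRIBE questionnaire_app_event;

-- Peek at raw messages on the underlying topic
PRINT 'questionnaire_app_event' FROM BEGINNING LIMIT 3;
```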
> **Review (Member):** Maybe add a Query ID to the insert so we can identify them.
> **Review (Author):** Do you mean an auto-incremented id field for each inserted row?
> **Review (Author):** I have implemented an auto-incrementing 'id' field in data-dashboard-backend. The config in this PR has been updated accordingly.
> **Review (Member):** Oh sorry for the confusion, I just meant the QUERY_ID -- https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/insert-into/#query_id (it is just a name for the INSERT query). For the database tables, I think the primary keys you had previously were good. This is not that important in the case of questionnaires, but when you have passive data such as Fitbit there is usually a high chance of duplicate data, so this would reduce that in the database. It would also help with querying if using primary keys (as they will be indexed by default).

INSERT INTO observations
WITH (QUERY_ID='questionnaire_app_event_observations')
SELECT
    q.projectId AS PROJECT,
    q.userId AS SUBJECT,
    q.sourceId AS SOURCE,
    'questionnaire_app_event' as TOPIC_NAME,
    CAST(NULL as VARCHAR) as CATEGORY,
    q.questionnaireName as VARIABLE,
    FROM_UNIXTIME(CAST(q.time * 1000 AS BIGINT)) as OBSERVATION_TIME,
    CAST(NULL as TIMESTAMP) as OBSERVATION_TIME_END,
    'STRING_JSON' as TYPE,
    CAST(NULL as DOUBLE) as VALUE_NUMERIC,
    TO_JSON_STRING(q.metadata) as VALUE_TEXTUAL
FROM questionnaire_app_event q
PARTITION BY q.projectId, q.userId, q.sourceId   -- this sets the fields in the kafka message key
EMIT CHANGES;
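Because the insert is registered under an explicit QUERY_ID, the resulting persistent query can be identified and stopped by name, for example when redeploying a changed SQL file (a CLI sketch; note that ksqlDB upper-cases query identifiers):

```
-- List persistent queries; the explicit QUERY_ID makes this one easy to spot
SHOW QUERIES;

-- Stop the transformer by name before redeploying
TERMINATE QUESTIONNAIRE_APP_EVENT_OBSERVATIONS;
```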
@@ -0,0 +1,83 @@
CREATE STREAM questionnaire_response (
    projectId VARCHAR KEY,   -- 'KEY' means that this field is part of the kafka message key
    userId VARCHAR KEY,
    sourceId VARCHAR KEY,
    time DOUBLE,
    timeCompleted DOUBLE,
    timeNotification DOUBLE,
    name VARCHAR,
    version VARCHAR,
    answers ARRAY<STRUCT<questionId VARCHAR, value STRUCT<int INT, string VARCHAR, double DOUBLE>, startTime DOUBLE, endTime DOUBLE>>
) WITH (
    kafka_topic = 'questionnaire_response',
    partitions = 3,
    format = 'avro'
);
CREATE STREAM questionnaire_response_exploded
AS SELECT
    EXPLODE(TRANSFORM(q.answers, a => a->questionId)) as VARIABLE,
    FROM_UNIXTIME(CAST(q.time * 1000 AS BIGINT)) as OBSERVATION_TIME,
    q.projectId,
    q.userId,
    q.sourceId,
    'questionnaire_response' as TOPIC_NAME,
    q.name as CATEGORY,
    CAST(NULL as TIMESTAMP) as OBSERVATION_TIME_END,
    -- WARNING: the cast from VARCHAR (string) to DOUBLE throws a Java exception if the string is not a number.
    -- This does not mean that the message is lost: the value will still be present in the VALUE_TEXTUAL_OPTIONAL field.
    EXPLODE(TRANSFORM(q.answers, a => COALESCE(a->value->double, CAST(a->value->int as DOUBLE), CAST(a->value->string as DOUBLE)))) as VALUE_NUMERIC,
    EXPLODE(TRANSFORM(q.answers, a => CASE
        WHEN a->value->int IS NOT NULL THEN 'INTEGER'
        WHEN a->value->double IS NOT NULL THEN 'DOUBLE'
        ELSE NULL
    END)) as TYPE,
    -- Note: when the cast to double works for the string value, VALUE_TEXTUAL_OPTIONAL will also be set.
    EXPLODE(TRANSFORM(q.answers, a => a->value->string)) as VALUE_TEXTUAL_OPTIONAL
FROM questionnaire_response q
EMIT CHANGES;
INSERT INTO observations
WITH (QUERY_ID='questionnaire_response_observations')
SELECT
    q.projectId as PROJECT,
    q.sourceId as SOURCE,
    q.userId as SUBJECT,
    TOPIC_NAME, CATEGORY, VARIABLE, OBSERVATION_TIME, OBSERVATION_TIME_END,
    CASE
        WHEN TYPE IS NULL AND VALUE_NUMERIC IS NOT NULL THEN 'DOUBLE'   -- must have been derived from a string cast
        WHEN TYPE IS NULL AND VALUE_NUMERIC IS NULL THEN 'STRING'
        ELSE TYPE   -- keep the original type when TYPE is not NULL
    END as TYPE,
    VALUE_NUMERIC,
    CASE
        WHEN VALUE_NUMERIC IS NOT NULL THEN NULL   -- when the cast to double worked for the string value, set VALUE_TEXTUAL to NULL
        ELSE VALUE_TEXTUAL_OPTIONAL
    END as VALUE_TEXTUAL
FROM questionnaire_response_exploded q
PARTITION BY q.projectId, q.userId, q.sourceId   -- this sets the fields in the kafka message key
EMIT CHANGES;
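To verify that transformed records actually arrive on the _ksql_observations_ topic, a transient push query can be issued against the observations stream (a sketch from the ksqlDB CLI):

```
SELECT PROJECT, SUBJECT, VARIABLE, TYPE, VALUE_NUMERIC, VALUE_TEXTUAL
FROM observations
WHERE TOPIC_NAME = 'questionnaire_response'
EMIT CHANGES
LIMIT 5;
```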
-- TODO: exploding the 'select:' questions is not yet fully designed.
-- The code is kept here for future reference.
-- Multi-select questionnaire questions are stored as a single 'value' string with the
-- names of the selected options separated by commas. Multi-select questions are prefixed
-- by 'select:' in the questionId.
-- When 'questionId' is like 'select:%', create a new stream with the select options.
-- The options in the value field are split on commas and added as separate VARIABLE records.
-- The VALUE_NUMERIC is set to 1 and VALUE_TEXTUAL is set to NULL.
-- INSERT INTO observations
--   SELECT
--     EXPLODE(SPLIT(VALUE_TEXTUAL, ',')) as VARIABLE,
--     PROJECT, SOURCE, SUBJECT, TOPIC_NAME, CATEGORY, OBSERVATION_TIME, OBSERVATION_TIME_END,
--     'INTEGER' as TYPE,
--     CAST(1 as DOUBLE) as VALUE_NUMERIC,
--     CAST(NULL as VARCHAR) as VALUE_TEXTUAL
--   FROM questionnaire_response_observations
--   WHERE
--     VARIABLE IS NOT NULL
--     AND VARIABLE LIKE 'select:%'
--     AND VALUE_TEXTUAL IS NOT NULL
--     AND VALUE_TEXTUAL != ''
--   PARTITION BY SUBJECT, PROJECT, SOURCE
--   EMIT CHANGES;