Skip to content

Commit 09f2d92

Browse files
authored
perf: New neo4j csv publisher to improve performance using batched params (#1957)
* New publisher using managed transactions, unwind statements, configurable two or one way relations Signed-off-by: Kristen Armes <[email protected]> * Add logic for preserving adhoc ui data, and move write transactions logic to reusable function Signed-off-by: Kristen Armes <[email protected]> * Fixing flake8 error (not sure how it got through, not from my change) Signed-off-by: Kristen Armes <[email protected]> * Pulling in a few of the latest changes from the original publisher and lint Signed-off-by: Kristen Armes <[email protected]> * Fix tests Signed-off-by: Kristen Armes <[email protected]> * Addressing PR feedback and change index creation to use a managed transaction Signed-off-by: Kristen Armes <[email protected]> * Refactor props body function and separate out constants Signed-off-by: Kristen Armes <[email protected]> * Addressing PR feedback plus refactoring Signed-off-by: Kristen Armes <[email protected]> * Addressing PR feedback, more refactoring Signed-off-by: Kristen Armes <[email protected]> * Minor fixes and bump version Signed-off-by: Kristen Armes <[email protected]> Signed-off-by: Kristen Armes <[email protected]>
1 parent 01d6e8c commit 09f2d92

File tree

7 files changed

+623
-19
lines changed

7 files changed

+623
-19
lines changed

databuilder/databuilder/extractor/hive_table_last_updated_extractor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@ class HiveTableLastUpdatedExtractor(Extractor):
113113
AND NOT EXISTS (SELECT * FROM PARTITION_KEYS WHERE PARTITION_KEYS.TBL_ID = TBLS.TBL_ID)
114114
"""
115115

116-
DEFAULT_POSTGRES_ADDTIONAL_WHERE_CLAUSE = """ NOT EXISTS (SELECT * FROM "PARTITIONS" p WHERE p."TBL_ID" = t."TBL_ID")
117-
AND NOT EXISTS (SELECT * FROM "PARTITION_KEYS" pk WHERE pk."TBL_ID" = t."TBL_ID")
116+
DEFAULT_POSTGRES_ADDTIONAL_WHERE_CLAUSE = """ NOT EXISTS (SELECT * FROM "PARTITIONS" p
117+
WHERE p."TBL_ID" = t."TBL_ID") AND NOT EXISTS (SELECT * FROM "PARTITION_KEYS" pk WHERE pk."TBL_ID" = t."TBL_ID")
118118
"""
119119

120120
DATABASE = 'hive'

databuilder/databuilder/publisher/neo4j_csv_publisher.py

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424

2525
from databuilder.publisher.base_publisher import Publisher
2626
from databuilder.publisher.neo4j_preprocessor import NoopRelationPreprocessor
27+
from databuilder.publisher.publisher_config_constants import (
28+
Neo4jCsvPublisherConfigs, PublishBehaviorConfigs, PublisherConfigs,
29+
)
2730

2831
# Setting field_size_limit to solve the error below
2932
# _csv.Error: field larger than field limit (131072)
@@ -32,53 +35,53 @@
3235

3336
# Config keys
3437
# A directory that contains CSV files for nodes
35-
NODE_FILES_DIR = 'node_files_directory'
38+
NODE_FILES_DIR = PublisherConfigs.NODE_FILES_DIR
3639
# A directory that contains CSV files for relationships
37-
RELATION_FILES_DIR = 'relation_files_directory'
40+
RELATION_FILES_DIR = PublisherConfigs.RELATION_FILES_DIR
3841
# A end point for Neo4j e.g: bolt://localhost:9999
39-
NEO4J_END_POINT_KEY = 'neo4j_endpoint'
42+
NEO4J_END_POINT_KEY = Neo4jCsvPublisherConfigs.NEO4J_END_POINT_KEY
4043
# A transaction size that determines how often it commits.
41-
NEO4J_TRANSACTION_SIZE = 'neo4j_transaction_size'
44+
NEO4J_TRANSACTION_SIZE = Neo4jCsvPublisherConfigs.NEO4J_TRANSACTION_SIZE
4245
# A progress report frequency that determines how often it report the progress.
4346
NEO4J_PROGRESS_REPORT_FREQUENCY = 'neo4j_progress_report_frequency'
4447
# A boolean flag to make it fail if relationship is not created
4548
NEO4J_RELATIONSHIP_CREATION_CONFIRM = 'neo4j_relationship_creation_confirm'
4649

47-
NEO4J_MAX_CONN_LIFE_TIME_SEC = 'neo4j_max_conn_life_time_sec'
50+
NEO4J_MAX_CONN_LIFE_TIME_SEC = Neo4jCsvPublisherConfigs.NEO4J_MAX_CONN_LIFE_TIME_SEC
4851

4952
# list of nodes that are create only, and not updated if match exists
50-
NEO4J_CREATE_ONLY_NODES = 'neo4j_create_only_nodes'
53+
NEO4J_CREATE_ONLY_NODES = Neo4jCsvPublisherConfigs.NEO4J_CREATE_ONLY_NODES
5154

5255
# list of node labels that could attempt to be accessed simultaneously
5356
NEO4J_DEADLOCK_NODE_LABELS = 'neo4j_deadlock_node_labels'
5457

55-
NEO4J_USER = 'neo4j_user'
56-
NEO4J_PASSWORD = 'neo4j_password'
58+
NEO4J_USER = Neo4jCsvPublisherConfigs.NEO4J_USER
59+
NEO4J_PASSWORD = Neo4jCsvPublisherConfigs.NEO4J_PASSWORD
5760
# in Neo4j (v4.0+), we can create and use more than one active database at the same time
58-
NEO4J_DATABASE_NAME = 'neo4j_database'
61+
NEO4J_DATABASE_NAME = Neo4jCsvPublisherConfigs.NEO4J_DATABASE_NAME
5962

6063
# NEO4J_ENCRYPTED is a boolean indicating whether to use SSL/TLS when connecting
61-
NEO4J_ENCRYPTED = 'neo4j_encrypted'
64+
NEO4J_ENCRYPTED = Neo4jCsvPublisherConfigs.NEO4J_ENCRYPTED
6265
# NEO4J_VALIDATE_SSL is a boolean indicating whether to validate the server's SSL/TLS
6366
# cert against system CAs
64-
NEO4J_VALIDATE_SSL = 'neo4j_validate_ssl'
67+
NEO4J_VALIDATE_SSL = Neo4jCsvPublisherConfigs.NEO4J_VALIDATE_SSL
6568

6669
# This will be used to provide unique tag to the node and relationship
67-
JOB_PUBLISH_TAG = 'job_publish_tag'
70+
JOB_PUBLISH_TAG = PublisherConfigs.JOB_PUBLISH_TAG
6871

6972
# any additional fields that should be added to nodes and rels through config
70-
ADDITIONAL_FIELDS = 'additional_fields'
73+
ADDITIONAL_FIELDS = PublisherConfigs.ADDITIONAL_PUBLISHER_METADATA_FIELDS
7174

7275
# Neo4j property name for published tag
73-
PUBLISHED_TAG_PROPERTY_NAME = 'published_tag'
76+
PUBLISHED_TAG_PROPERTY_NAME = PublisherConfigs.PUBLISHED_TAG_PROPERTY_NAME
7477

7578
# Neo4j property name for last updated timestamp
76-
LAST_UPDATED_EPOCH_MS = 'publisher_last_updated_epoch_ms'
79+
LAST_UPDATED_EPOCH_MS = PublisherConfigs.LAST_UPDATED_EPOCH_MS
7780

7881
# A boolean flag to indicate if publisher_metadata (e.g. published_tag,
7982
# publisher_last_updated_epoch_ms)
8083
# will be included as properties of the Neo4j nodes
81-
ADD_PUBLISHER_METADATA = 'add_publisher_metadata'
84+
ADD_PUBLISHER_METADATA = PublishBehaviorConfigs.ADD_PUBLISHER_METADATA
8285

8386
RELATION_PREPROCESSOR = 'relation_preprocessor'
8487

0 commit comments

Comments
 (0)