
Neo4jCsvPublisher Speed Optimization (Parallelism)  #1610

@chonyy

Description


Hi Team,
I’m wondering if there’s a plan to apply multiprocessing to the publishers. We have a large amount of metadata in production, which ends up running 3 million queries on Neo4j and takes about 90 minutes to finish.

To investigate the bottleneck, I looked into the code and logged the time elapsed for each step of a single iteration of the _publish_node function. Here is the result:

  • Neo4j query: 0.1 ms
  • Create statement: 1 ms
  • Everything else: negligible

Surprisingly, the bottleneck is not the DB query but the statement creation. The process is basically:

  1. loop over each row in the CSV
  2. parse the row into a dictionary
  3. loop through each key-value pair in the dictionary to get the props
  4. fill the statement Jinja template with the props
  5. execute the query with the statement
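
To make that hot path concrete, here is a minimal sketch of the per-row flow. The template, file name, label, key column, and connection details are all illustrative, not the publisher's actual internals (and props are passed as query parameters here for simplicity, whereas step 4 above renders them into the statement):

import csv

from jinja2 import Template
from neo4j import GraphDatabase

# Illustrative only: a much-simplified statement template, not the real publisher statement
NODE_TEMPLATE = Template("MERGE (n:{{ label }} {key: $key}) SET n += $props")


def publish_nodes(csv_path, session, label):
    with open(csv_path) as f:
        for row in csv.DictReader(f):                      # 1./2. read each row as a dict
            props = dict(row)                              # 3. collect the props
            statement = NODE_TEMPLATE.render(label=label)  # 4. fill the Jinja template
            session.run(statement, key=row['KEY'], props=props)  # 5. one query per row ('KEY' column assumed)


driver = GraphDatabase.driver('bolt://localhost:7687', auth=('neo4j', 'password'))
with driver.session() as session:
    publish_nodes('nodes_Table.csv', session, label='Table')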

I’m thinking that instead of reading a row and creating a node in the graph DB one at a time, maybe we could use multiprocessing to speed up the process. I believe there will be no dependency issues as long as we publish all the nodes before publishing the relations, which the current codebase already handles.
I’m planning to implement multiprocessing for this. Are there any potential problems, e.g. dependencies, graph DB load, etc.?
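
As a starting point for discussion, here is a rough sketch of what sharding the rows of one node CSV across worker processes could look like. Everything here (file name, label, key column, connection details) is hypothetical and not the existing publisher API:

import csv
import multiprocessing

from neo4j import GraphDatabase

NEO4J_URI = 'bolt://localhost:7687'       # placeholder connection details
NEO4J_AUTH = ('neo4j', 'password')


def publish_shard(args):
    csv_path, label, worker_index, total_workers = args
    driver = GraphDatabase.driver(NEO4J_URI, auth=NEO4J_AUTH)
    with driver.session() as session, open(csv_path) as f:
        for i, row in enumerate(csv.DictReader(f)):
            if i % total_workers != worker_index:   # simple round-robin sharding of rows
                continue
            session.run(f"MERGE (n:{label} {{key: $key}}) SET n += $props",
                        key=row['KEY'], props=dict(row))
    driver.close()


if __name__ == '__main__':
    workers = 8
    shards = [('nodes_Table.csv', 'Table', i, workers) for i in range(workers)]
    with multiprocessing.Pool(processes=workers) as pool:
        pool.map(publish_shard, shards)

With several concurrent writers, the small transaction size and deadlock-retry settings discussed under Possible Implementation below would still matter.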

Expected Behavior or Use Case

Speed up the publisher. A 90-minute sync is currently not acceptable for our use case 😢

Service or Ingestion ETL

Ingestion ETL, publisher

Possible Implementation

Thanks to @dkunitsk's idea, I think there are three possible implementations:

  1. Multiprocessing on the call side
  2. Multiprocessing in the Neo4j publisher
  3. Neo4j UNWIND (batch processing)


import multiprocessing

from pyhocon import ConfigFactory

from databuilder.extractor.hive_table_metadata_extractor import HiveTableMetadataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask


class HiveParallelIndexer:
    # Shim for adding all node labels to the NEO4J_DEADLOCK_NODE_LABELS config
    # which enables retries for those node labels. This is important for parallel writing
    # since we see intermittent Neo4j deadlock errors relatively often.
    class ContainsAllList(list):
        def __contains__(self, item):
            return True

    def __init__(self, publish_tag: str, parallelism: int):
        self.publish_tag = publish_tag
        self.parallelism = parallelism

    def __call__(self, worker_index: int):
        # Sharding:
        #   - take the md5 hash of the schema.table_name
        #   - convert the first 3 characters of the hash to decimal (3 chosen arbitrarily)
        #   - mod by total number of processes
        where_clause_suffix = """
            WHERE MOD(CONV(LEFT(MD5(CONCAT(d.NAME, '.', t.TBL_NAME)), 3), 16, 10), {total_parallelism}) = {worker_index}
            AND t.TBL_TYPE IN ('EXTERNAL_TABLE', 'MANAGED_TABLE', 'VIRTUAL_VIEW')
            AND (t.VIEW_EXPANDED_TEXT != '/* Presto View */' OR t.VIEW_EXPANDED_TEXT is NULL)
        """.format(total_parallelism=self.parallelism,
            worker_index=worker_index)

        # configs relevant for multiprocessing (the usual extractor/loader/publisher configs,
        # e.g. the Hive connection string, CSV paths, and Neo4j endpoint, are omitted here)
        job_config = ConfigFactory.from_dict({
            'extractor.hive_table_metadata.{}'.format(HiveTableMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY):
                where_clause_suffix,
            # keeping this relatively low, in our experience, reduces neo4j deadlocks
            'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_TRANSACTION_SIZE):
                100,
            'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_DEADLOCK_NODE_LABELS):
                HiveParallelIndexer.ContainsAllList(),
        })
        job = DefaultJob(conf=job_config,
                         task=DefaultTask(
                             extractor=HiveTableMetadataExtractor(),
                             loader=FsNeo4jCSVLoader()),
                         publisher=Neo4jCsvPublisher())
        job.launch()


parallelism = 16
indexer = HiveParallelIndexer(
    publish_tag='2021-12-03',
    parallelism=parallelism)

with multiprocessing.Pool(processes=parallelism) as pool:
    def callback(_):
        # fast fail in case of exception in any process
        print('terminating due to exception')
        pool.terminate()
    res = pool.map_async(indexer, [i for i in range(parallelism)], error_callback=callback)
    res.get()
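
For option 3, here is a rough sketch of what batched publishing with UNWIND could look like; the statement, batch size, file name, label, and key column are illustrative and not the current Neo4jCsvPublisher behavior:

import csv

from neo4j import GraphDatabase

# Illustrative only: one UNWIND statement creates/updates a whole batch of nodes,
# so the statement-creation cost is paid once per batch instead of once per row.
UNWIND_NODE_STATEMENT = """
UNWIND $batch AS row
MERGE (n:Table {key: row.KEY})
SET n += row
"""


def publish_in_batches(csv_path, session, batch_size=1000):
    batch = []
    with open(csv_path) as f:
        for row in csv.DictReader(f):
            batch.append(dict(row))
            if len(batch) >= batch_size:
                session.run(UNWIND_NODE_STATEMENT, batch=batch)
                batch = []
    if batch:                                   # flush the final partial batch
        session.run(UNWIND_NODE_STATEMENT, batch=batch)


driver = GraphDatabase.driver('bolt://localhost:7687', auth=('neo4j', 'password'))
with driver.session() as session:
    publish_in_batches('nodes_Table.csv', session)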

