Conversation

@alinakbase
Collaborator

Description of PR purpose/changes

This PR implements a robust, automated pipeline for batch downloading, parsing, and transforming UniRef100 cluster data into a standardized Common Data Model (CDM) format. Key features include:

  • Automated ID fetching: Uses UniProt REST API to fetch the latest UniRef100 cluster IDs, ensuring up-to-date data with each run.
  • Incremental batch XML download: Efficiently downloads only missing or new UniRef100 cluster XML files, skipping those already present locally (see the sketch after this list).
  • Structured XML parsing: Extracts cluster-level, representative member, and individual member data, mapping them to a clean, CDM-compliant schema.
  • Multi-format output: Exports parsed data as .jsonl, flat .csv, .parquet, and single example .json files for downstream analysis.
  • Robust error handling & logging: Provides clear status messages for network failures, parsing errors, and unexpected XML fields.
  • Self-contained, re-runnable pipeline: Enables researchers to always work with current UniRef100 data snapshots.
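
For orientation, here is a minimal sketch of the ID-fetch and incremental download steps. The constant names, the query string, and the helper structure are illustrative assumptions; the actual logic lives in Uniref_mapping.py.

from pathlib import Path

import requests

# Hypothetical names for illustration only.
UNIREF_STREAM_URL = "https://rest.uniprot.org/uniref/stream"
OUTPUT_DIR = Path("uniref100_xml")


def fetch_uniref100_ids() -> list[str]:
    """Fetch current UniRef100 cluster IDs from the UniProt REST API."""
    # The query syntax here is an assumption; adjust to the real UniRef100 filter.
    params = {"query": "identity:1.0", "fields": "id", "format": "list"}
    resp = requests.get(UNIREF_STREAM_URL, params=params, timeout=60)
    resp.raise_for_status()
    return [line.strip() for line in resp.text.splitlines() if line.strip()]


def batch_download_uniref_xml(ids: list[str]) -> None:
    """Download each cluster's XML, skipping files already present locally."""
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    for cluster_id in ids:
        target = OUTPUT_DIR / f"{cluster_id}.xml"
        if target.exists():
            continue  # incremental: only fetch missing or new clusters
        resp = requests.get(f"https://rest.uniprot.org/uniref/{cluster_id}.xml", timeout=60)
        if resp.ok:
            target.write_bytes(resp.content)
        else:
            print(f"Failed to download {cluster_id}: HTTP {resp.status_code}")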

Testing Instructions

  • Check outputs:
      • all_uniref100_cdm.jsonl
      • all_uniref100_flat.csv
      • all_uniref100_flat.parquet
      • uniref100_cdm_example.json
      • all_uniref100_cdm_array.json
  • Lint and format checks:
      • Run ruff format Uniref_mapping.py
      • Run ruff check Uniref_mapping.py

@codecov

codecov bot commented Jun 16, 2025

Codecov Report

Attention: Patch coverage is 0% with 404 lines in your changes missing coverage. Please review.

Project coverage is 52.04%. Comparing base (dd1b528) to head (0dfc75b).

Files with missing lines          Patch %   Lines
src/parsers/archaea_parsers.py    0.00%     203 Missing ⚠️
src/parsers/Uniref_mapping.py     0.00%     201 Missing ⚠️
Additional details and impacted files


@@             Coverage Diff             @@
##             main       #9       +/-   ##
===========================================
- Coverage   87.26%   52.04%   -35.23%     
===========================================
  Files           9       11        +2     
  Lines         597     1001      +404     
===========================================
  Hits          521      521               
- Misses         76      480      +404     
Files with missing lines          Coverage Δ
src/parsers/Uniref_mapping.py     0.00% <0.00%> (ø)
src/parsers/archaea_parsers.py    0.00% <0.00%> (ø)

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update dd1b528...0dfc75b.


Comment on lines 420 to 427
## Read the list of IDs and download all XMLs
with open(tsv_path) as fin:
    ids = [
        line.strip()
        for line in fin
        if line.strip() and not line.startswith("Cluster ID")
    ]
batch_download_uniref_xml(ids)
Collaborator

Did genAI suggest this method of fetching the UniRef data? I'm not convinced that it's more efficient than downloading the big XML file and parsing that using Spark's XML parser.

Collaborator Author

Yes. I tried two methods: downloading the big XML files and parsing them with Spark, and the GenAI-suggested approach.
The reason I used the latter is that I can fetch the latest synchronized data from the UniRef website (no need to download the whole dataset again next time).
I also checked and found that the individual UniRef XML files are really small; Spark is better suited to parsing large data in bulk, so I chose the GenAI method.
But I will write a Spark XML parser to use for comparison later, so there will be two UniRef parsers.
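
For comparison, a rough sketch of what the Spark-based alternative mentioned here might look like. The spark-xml package, the pinned version, and the bulk file path are assumptions, not part of this PR.

from pyspark.sql import SparkSession

# Assumes the spark-xml package is available; the version shown is illustrative.
spark = (
    SparkSession.builder.appName("UniRefXMLComparison")
    .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.17.0")
    .getOrCreate()
)

# Each <entry> element of a bulk uniref100.xml dump becomes one DataFrame row.
uniref_df = (
    spark.read.format("xml")
    .option("rowTag", "entry")
    .load("uniref100.xml")  # path to the bulk download (assumption)
)
uniref_df.printSchema()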

Comment on lines 378 to 389
"entity_id": cdm["entity_id"],
"entity_type": cdm["entity_type"],
"source": cdm["source"],
"uniref_level": cdm["uniref_level"],
"cluster_id": cdm["cluster_id"],
"updated": cdm["updated"],
"member_count": cdm["member_count"],
"functional_annotation": cdm["functional_annotation"],
"common_taxon": cdm["common_taxon"],
"common_taxon_id": cdm["common_taxon_id"],
"created": cdm["created"],
"cdm_version": cdm["cdm_version"],
Collaborator

We will need to check with the CDM science team to see what information they actually need as we might be able to skip some of this stuff

Comment on lines 398 to 403
for i in cdm.get("members", []):
    ## Add mem_ prefix to each member field
    mem_flat = {f"mem_{k}": v for k, v in i.items()}
    ## merge all fields into one row
    row = {**basic, **rep_flat, **mem_flat}
    records.append(row)
Collaborator

I don't think that we want to flatten out the data like this. We can consult with the CDM sci people to see what information they need from this dataset and then work out which tables in the schema we'll want to populate.

Comment on lines 518 to 523
builder = (
    SparkSession.builder.appName("DeltaIngestion")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
Collaborator

🎉

Comment on lines 50 to 55
{
    "entity_id": cdm_id,
    "identifier": f"UniProt:{acc.text}",
    "source": "UniProt",
    "description": "UniProt accession",
}
Collaborator

can these be added directly to the DataFrame instead of converting to python dicts and then to DataFrames?

Collaborator Author

I tried skipping the dict step and building a Spark DataFrame directly from the XML, but the entries are XML nodes (ElementTree elements), which are not structures the DataFrame API supports.
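
For context, a minimal sketch of the intermediate step being discussed: each ElementTree node is first reduced to plain Python values (here via Row objects) before Spark sees it. The names spark, cdm_id, and parsed_accessions are assumptions for illustration.

from pyspark.sql import Row

# `parsed_accessions` is assumed to be pairs of (cdm_id, <accession> Element).
identifier_rows = [
    Row(
        entity_id=cdm_id,
        identifier=f"UniProt:{acc.text}",
        source="UniProt",
        description="UniProt accession",
    )
    for cdm_id, acc in parsed_accessions
]
identifiers_df = spark.createDataFrame(identifier_rows)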

Second round of the pull request
@ialarmedalien changed the title from Add files via upload to UniProt parser on Jul 2, 2025
alinakbase and others added 17 commits July 2, 2025 18:33
Add requests, pyspark, and delta-spark to the dependencies.
List of modified functions:
generate_unique_id:
Originally, uuid4() was used, which produces a random identifier on each call. To ensure consistency across datasets (not just the Archaea files but all sources), the implementation has been switched to an MD5 hash of the accession. This guarantees that the same accession will always generate the same entity_id (an illustrative sketch follows the workflow steps below).

parse_names:
The code has been optimized by using findall() to eliminate unnecessary if...else statements, streamlining the extraction of protein names.

build_datasource_record:
The previous implementation of build_datasource_record did not accept parameters and relied on a fixed URL. In the revised version, the accessed field uses a POSIX timestamp to capture the data retrieval time.

parse_evidence_map:
Publication references are now explicitly captured as PubMed or DOI identifiers, while other references (e.g., EMBL, GenBank, DDBJ) are classified as supporting objects.

parse_publications:
Only PubMed (PMID) and DOI reference numbers are retained in the output.

parse_uniprot_entries:
Previously, only entries with a modification date exactly equal to a target day were processed. The logic has been updated to include all records with a modification date on or after the specified target_date. Entries with a modification date earlier than the target date are skipped, ensuring only new or updated data is ingested.

download_file and stream_uniprot_xml:
These functions enable downloading any file from a specified URL to a local path, and efficiently parsing UniProt XML entries in a streaming fashion.

Batch writing to Delta Lake:
Instead of writing intermediate JSONL files, Python lists of parsed records are written directly to Delta Lake tables. This approach transforms in-memory parsed data into a structured, persistent format in the data lake.

main function refactoring:
Due to the original length and complexity of the main function, the workflow has been refactored into modular functions. The execution order is as follows:
1. Call ingest_uniprot.
2. Check if the .xml.gz file exists locally; if not, download it from UniProt.
3. Generate datasource.json to record provenance information for traceability.
4. Initialize a SparkSession with Delta Lake support and create the target database.
5. If prior data exists, load all entity_id values from the Delta table.
6. Initialize in-memory containers for the CDM tables.
7. Parse the XML file in batches, populating the corresponding tables.
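
A minimal sketch of the deterministic ID and streaming parse described above. The function bodies are illustrative rather than the exact code in uniprot.py, and later commits revise the ID logic again (e.g. the hashlib removal below).

import gzip
import hashlib
import xml.etree.ElementTree as ET

UNIPROT_NS = "{http://uniprot.org/uniprot}"  # UniProt XML namespace (assumption)


def generate_unique_id(accession: str) -> str:
    """Deterministic entity_id: the same accession always yields the same ID."""
    return "CDM:" + hashlib.md5(accession.encode("utf-8")).hexdigest()


def stream_uniprot_xml(xml_gz_path: str):
    """Yield one <entry> element at a time without loading the whole file."""
    with gzip.open(xml_gz_path, "rb") as handle:
        for _event, elem in ET.iterparse(handle, events=("end",)):
            if elem.tag == f"{UNIPROT_NS}entry":
                yield elem
                elem.clear()  # release memory as entries are consumed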
Use ruff to re-format uniprot.py.
1. Imports Consolidation and Module Name Update
2. Function Name and Parameter Updates
3. build_datasource_record Usage
It now expects a URL argument. Tests provide a sample URL and update assertions to reflect the returned structure.
4. Test Data Consistency
5. Expanded Output Checking
parse_publications: the new version expects and checks for more reference types (e.g., DDBJ, EMBL) beyond just PubMed and DOI.
6. Removal or Renaming of Redundant Tests
7. Synchronization with Main Code Logic
Wherever the business logic changed (timestamps, field values, error handling), tests were updated to reflect that logic, minimizing assertion errors.
generate_cdm_id:
Load the existing 'identifiers' Delta table and build a mapping from UniProt accession to CDM entity ID (see the sketch below).

build_datasource_record:
Build a provenance record for the UniProt datasource without version extraction.

generate_cdm_id:
Generate a CDM entity_id directly from UniProt accession, using 'CDM:' prefix
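
A rough sketch of the identifier-mapping step described above. The table name, column names, and the SparkSession variable spark are assumptions.

# Assumes an existing `uniprot_db.identifiers` Delta table with
# `identifier` and `entity_id` columns.
identifiers_df = spark.read.table("uniprot_db.identifiers")

# Map UniProt accession -> existing CDM entity_id so re-ingested entries keep their IDs.
accession_to_cdm = {
    row["identifier"].removeprefix("UniProt:"): row["entity_id"]
    for row in identifiers_df.select("identifier", "entity_id").collect()
}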
Change the typical usage:
python3 src/parsers/uniprot.py \
    --xml-url "https://ftp.uniprot.org/pub/databases/uniprot/current_release/knowledgebase/taxonomic_divisions/uniprot_sprot_archaea.xml.gz" \
    --output-dir "./output" \
    --namespace "uniprot_db" \
    --batch-size 5000
remove hashlib 

Fix two parts:
1. generate_cdm_id: make sure it follows the standard format
2. test_parse_uniprot_entry
Revise parse_protein_info to avoid NULL values in EC_NUMBER.
Remove empty file.
A new file replaces uniref_mapping.py.
Comment on lines 314 to 329
for reaction in comment.findall("u:reaction", NS):
    for dbref in reaction.findall("u:dbReference", NS):
        db_type = dbref.get("type")
        db_id = dbref.get("id")
        catalyze_assoc = {
            "subject": cdm_id,
            "predicate": "catalyzes",
            "object": f"{db_type}:{db_id}",
            "evidence_type": None,
            "supporting_objects": None,
            "publications": None,
        }
        evidence_key = reaction.get("evidence")
        if evidence_key and evidence_key in evidence_map:
            catalyze_assoc.update(evidence_map[evidence_key])
        associations.append(clean(catalyze_assoc))
Collaborator

It's not critical, but you could consider putting this into its own function to simplify the code a bit. Same with the cofactor code below.
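
For illustration, the extraction could look roughly like this. The name matches the parse_reaction_association mentioned later in the thread; NS and clean are assumed to be the namespace map and record-cleaning helper defined elsewhere in the module.

def parse_reaction_association(comment, cdm_id, evidence_map, associations):
    """Collect 'catalyzes' associations for each reaction dbReference (sketch)."""
    for reaction in comment.findall("u:reaction", NS):
        evidence_key = reaction.get("evidence")
        for dbref in reaction.findall("u:dbReference", NS):
            catalyze_assoc = {
                "subject": cdm_id,
                "predicate": "catalyzes",
                "object": f"{dbref.get('type')}:{dbref.get('id')}",
                "evidence_type": None,
                "supporting_objects": None,
                "publications": None,
            }
            if evidence_key and evidence_key in evidence_map:
                catalyze_assoc.update(evidence_map[evidence_key])
            associations.append(clean(catalyze_assoc))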



def parse_uniprot_entry(entry, cdm_id, datasource_name="UniProt import", prev_created=None):
    desire_date = datetime.datetime.now(datetime.timezone.utc).isoformat()
Collaborator
@ialarmedalien ialarmedalien Jul 17, 2025

desire_date sounds a bit funny! You might want to change it to something like current_timestamp. I would also move this so that it gets calculated once, early on in the running of the script, so that everything that gets parsed in this ingest session has the same timestamp.

These issues are now fixed:

First issue:
I extracted both the reaction association and the cofactor association into separate functions, parse_reaction_association and parse_cofactor_association, and call them from parse_associations in the catalytic/cofactor branches.

Second issue:
current_timestamp is created in the main ingest function (ingest_uniprot) and passed to parse_entries; parse_entries adds a current_timestamp parameter and passes it on to parse_uniprot_entry. The parse_uniprot_entry signature was modified to replace desire_date with current_timestamp.

Three functions now take current_timestamp: parse_uniprot_entry, parse_entries, and ingest_uniprot (a sketch of this call chain follows).
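
A simplified sketch of that call chain; signatures are illustrative, not the exact ones in uniprot.py, and the helper functions referenced are the ones defined elsewhere in that module.

import datetime


def ingest_uniprot(xml_path: str) -> None:
    # Computed once so every record from this ingest session shares one timestamp.
    current_timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
    parse_entries(xml_path, current_timestamp)


def parse_entries(xml_path: str, current_timestamp: str) -> None:
    for entry in stream_uniprot_xml(xml_path):   # assumed streaming parser
        cdm_id = generate_cdm_id()               # assumed ID helper
        parse_uniprot_entry(entry, cdm_id, current_timestamp=current_timestamp)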

uuid:
Changed generate_cdm_id; in parse_entries, the ID is now obtained via cdm_id = generate_cdm_id().
To avoid confusion, I've written a detailed explanation of the steps inside this function.
Modified generate_cdm_id (first) and parse_uniprot_entries (last).
Updated the CLI interface and the main function, and removed the CSV output.

Add GO_DELTA_SCHEMA for Spark and set up the Spark environment.

Revise process_go_annotations to write to a Delta table.
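
A rough sketch of what an explicit schema plus a Delta write could look like. The field names, the table name, and the variables spark and go_records (a list of parsed GO annotation dicts) are assumptions.

from pyspark.sql.types import StringType, StructField, StructType

# Illustrative schema; the real GO_DELTA_SCHEMA may carry different fields.
GO_DELTA_SCHEMA = StructType(
    [
        StructField("entity_id", StringType(), False),
        StructField("go_id", StringType(), False),
        StructField("evidence_code", StringType(), True),
    ]
)

go_df = spark.createDataFrame(go_records, schema=GO_DELTA_SCHEMA)
go_df.write.format("delta").mode("append").saveAsTable("uniprot_db.go_annotations")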
pass the test
ialarmedalien previously approved these changes Aug 15, 2025