Automation pipeline to onboard the Global Labor Database to the Corporate Data Lake and publish its datasets to the Microdata Library
The pipeline is organized into three jobs:
- Ingestion and Stacking (daily)
- Publication of individual tables to Microdata Library (monthly)
- Publication of stacked tables to Microdata Library (monthly)
The pipeline relies on the following supporting folders, files, and tables:
- The `_ingestion_metadata` table keeps track of which tables have been ingested and published to the Microdata Library, as well as some dataset-specific metadata. It also stores the paths to the .dta and .do files associated with each specific ingestion.
- All country-survey-specific metadata is stored in an Excel file on the GLD Sharepoint. When a new survey is onboarded to the GLD catalog, it should be added to this file.
- Country codes and names are matched using the MEGA table at prd_mega.indicator.country.
- The folder /Volumes/prd_csc_mega/sgld48/vgld48/Workspace/json_to_publish/ contains all .json files awaiting publication. These files are created by the *_json_creation scripts and deleted once publication completes successfully in the *_json_publication scripts.

Please note that Databricks table names cannot include the character "-". Therefore, "-" is replaced with "_".
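The renaming rule above can be sketched in a few lines. This is an illustrative Python helper (the pipeline's actual scripts are written in R, and the function name here is hypothetical):

```python
def to_table_name(name: str) -> str:
    """Derive a Databricks-safe table name: "-" is not allowed in
    Databricks table names, so it becomes "_"; names are lowercased,
    matching the tha_2021_lfs_q2 example later in this README."""
    return name.replace("-", "_").lower()

print(to_table_name("THA_2021_LFS-Q2"))  # tha_2021_lfs_q2
```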
delta_identification: identifies new tables that need to be ingested. To do so, it crawls the entire Sharepoint library to identify "Data/Harmonized" folders. The newly identified tables are added to the `_ingestion_metadata` table.
This script creates the following `_ingestion_metadata` variables: `filename`, `dta_path`, `country`, `year`, `quarter`, `survey`, `M_version`, `A_version`. These are metadata fields that can be inferred from the filename. `table_name` is also computed by removing `M_version` and `A_version` from the filename (since each new version of a table is rewritten to the same table).
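As a rough illustration of how these fields can be inferred, here is a Python sketch built around the example filename used later in this README (THA_2021_LFS-Q2_V01_M_V02_A_GLD). The regex is an assumption reconstructed from that one example, not the pipeline's actual (R) parsing logic:

```python
import re

# Hypothetical pattern, inferred from THA_2021_LFS-Q2_V01_M_V02_A_GLD.
FILENAME_RE = re.compile(
    r"^(?P<country>[A-Z]{3})_"
    r"(?P<year>\d{4})_"
    r"(?P<survey>[A-Z]+)"
    r"(?:-(?P<quarter>Q[1-4]))?_"
    r"(?P<M_version>V\d{2})_M_"
    r"(?P<A_version>V\d{2})_A_GLD$"
)

def parse_filename(filename: str) -> dict:
    m = FILENAME_RE.match(filename)
    if m is None:
        raise ValueError(f"Unrecognized GLD filename: {filename}")
    meta = m.groupdict()
    # table_name drops M_version/A_version (every version overwrites
    # the same table); joining with "_" also removes the "-" that
    # Databricks table names cannot contain.
    parts = [meta["country"], meta["year"], meta["survey"]]
    if meta["quarter"]:
        parts.append(meta["quarter"])
    meta["table_name"] = "_".join(parts).lower()
    return meta

meta = parse_filename("THA_2021_LFS-Q2_V01_M_V02_A_GLD")
print(meta["table_name"])  # tha_2021_lfs_q2
```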
sharepoint_ingestion: ingests all new tables to prd_csc_mega.sgld48, either fully or, if the file exceeds 900MB, in 5000-line chunks. The script also saves all variable labels as column comments in Databricks. At the end of the ingestion process, the following variables are updated in `_ingestion_metadata`:
- `harmonization`: GLD or GLD Light
- `household_level`: TRUE if the dataset contains the `hhid` variable
- `ingested`: TRUE if the ingestion process is successful
- `table_version`: after each table is fully written, the script retrieves the table's latest Delta version (the maximum version number). This version is stored in the `_ingestion_metadata` table and used for reproducibility, since the tables are overwritten when new versions are ingested.
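The chunking rule can be expressed as a small batching plan. This is a minimal Python sketch of the logic described above (the function name is hypothetical; the real ingestion runs in R against Databricks):

```python
CHUNK_THRESHOLD_BYTES = 900 * 1024 * 1024  # files over 900MB are chunked
CHUNK_ROWS = 5000                          # 5000-line chunks

def plan_ingestion(n_rows: int, file_size_bytes: int) -> list:
    """Return half-open (start_row, end_row) batches to write."""
    if file_size_bytes <= CHUNK_THRESHOLD_BYTES:
        return [(0, n_rows)]  # small file: single full write
    return [(start, min(start + CHUNK_ROWS, n_rows))
            for start in range(0, n_rows, CHUNK_ROWS)]

# A 1GB file with 12,000 rows is written in three chunks:
print(plan_ingestion(12_000, 1_000_000_000))
# [(0, 5000), (5000, 10000), (10000, 12000)]
```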
metadata_parsing: this script populates metadata fields that cannot be inferred from the filename:
- It identifies the .do file that corresponds to the ingested .dta file and parses it to extract the description of changes from the previous version to the current one. Both `do_path` and `version_label` are stored in the `_ingestion_metadata` table.
- It identifies the "Where is this data from? - Readme.txt" file that corresponds to the ingested .dta file and parses it to extract the data classification. This is stored as `classification` in the `_ingestion_metadata` table. It also computes the `stacking` flag (1 if the table is supposed to be stacked into the gld_harmonized*_ tables, 0 otherwise).
The stacking flag identifies which dataset versions should be included for each country–year combination. Only datasets whose data classification was successfully parsed are eligible to be stacked. For both annual data (by country–year) and quarterly data (by country–year–quarter), the most recent eligible version is stacked; if the latest version does not have a data classification in the `_ingestion_metadata` table, the logic falls back to the next most recent classified version. Panel datasets are always excluded from stacking, and when multiple harmonization types exist for the same country–year, GLD datasets are preferred over GLD Light. All other rows default to stacking = 0.
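The selection rules above can be sketched as follows. This is a pure-Python illustration on lists of dicts standing in for `_ingestion_metadata` rows; the field names and the version-ordering key are assumptions, and the real logic lives in the R scripts:

```python
from itertools import groupby

def assign_stacking(rows: list) -> None:
    """Set stacking = 1 on one row per country-year(-quarter) group:
    the most recent eligible version, preferring GLD over GLD Light.
    Eligible means classification parsed and not a panel dataset."""
    for row in rows:
        row["stacking"] = 0  # default for all rows

    def keyfn(r):
        return (r["country"], r["year"], r.get("quarter") or "")

    for _, grp in groupby(sorted(rows, key=keyfn), key=keyfn):
        eligible = [r for r in grp
                    if r.get("classification") and not r.get("panel")]
        if not eligible:
            continue  # nothing classified in this group: nothing stacked
        # Prefer GLD over GLD Light, then the most recent version
        # (assumed orderable via M_version/A_version strings).
        best = max(eligible, key=lambda r: (r["harmonization"] == "GLD",
                                            r["M_version"], r["A_version"]))
        best["stacking"] = 1

# The latest version (V02_A) lacks a classification, so the logic
# falls back to the classified V01_A version:
rows = [
    {"country": "THA", "year": 2021, "quarter": "Q2", "harmonization": "GLD",
     "M_version": "V01", "A_version": "V01", "classification": "OUO", "panel": False},
    {"country": "THA", "year": 2021, "quarter": "Q2", "harmonization": "GLD",
     "M_version": "V01", "A_version": "V02", "classification": None, "panel": False},
]
assign_stacking(rows)
print([r["stacking"] for r in rows])  # [1, 0]
```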
table_stacking: stacks individual country–year survey tables into the consolidated gld_harmonized*_ tables. The script:
- Identifies which tables need to be added or updated by comparing `stacking` flags and existing `stacked_*_table_version` values in the `_ingestion_metadata` table.
- For each table to be stacked, aligns its columns to the standard schema defined in helpers/stacking_schema.r: casting types, filling missing columns with NULL, and carrying over dynamic columns (subnational IDs, GAUL codes).
- Removes existing rows for the affected country–year–survey–quarter combinations from the harmonized tables (anti-join), then writes the new data in batches and overwrites the production table atomically.
- Records the new Databricks Delta table version in the `_ingestion_metadata` table (`stacked_all_table_version`, `stacked_ouo_table_version`), which is later used to construct version statements for publication.
Example: THA_2021_LFS-Q2_V01_M_V02_A_GLD is ingested into the tha_2021_lfs_q2 table, overwriting its content (i.e. THA_2021_LFS-Q2_V01_M_V01_A_GLD). Subsequently, the rows of the gld_harmonized_ table for Thailand 2021 are dropped and replaced with the new rows from tha_2021_lfs_q2. After the replacement has occurred, the script retrieves the latest Delta version of gld_harmonized_ (the maximum version number). This version is stored in the `stacked_ouo_table_version` column of the `_ingestion_metadata` table and used to compute the version statement in the harmonized_json_creation script.
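The anti-join replacement step can be illustrated with plain Python on lists of dicts. This is a minimal sketch under the assumption that rows are keyed by country, year, survey, and quarter; the pipeline itself performs the equivalent operation on Delta tables:

```python
def replace_combination(harmonized: list, new_rows: list,
                        keys=("country", "year", "survey", "quarter")) -> list:
    """Drop existing harmonized rows for the combinations present in
    new_rows (anti-join on the key columns), then append new_rows."""
    affected = {tuple(r.get(k) for k in keys) for r in new_rows}
    kept = [r for r in harmonized
            if tuple(r.get(k) for k in keys) not in affected]
    return kept + new_rows

harmonized = [
    {"country": "THA", "year": 2021, "survey": "LFS", "quarter": "Q2", "wage": 100},
    {"country": "IND", "year": 2019, "survey": "PLFS", "quarter": None, "wage": 80},
]
new_rows = [
    {"country": "THA", "year": 2021, "survey": "LFS", "quarter": "Q2", "wage": 110},
]
result = replace_combination(harmonized, new_rows)
# The old Thailand 2021 row is dropped; the India row is untouched.
print(len(result))  # 2
```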
When a new column is added to or removed from the GLD data dictionary, the stacking schema must be updated so the harmonized tables reflect the change. The steps depend on whether the column is static (fixed name) or dynamic (name contains a numeric index).
Static columns: edit get_gld_schema() in helpers/stacking_schema.r:
- To add a column, insert a new entry in the list with the column name and its Spark SQL type (e.g., new_column = "integer").
- To remove a column, delete its entry from the list.
Dynamic columns (columns whose names follow a pattern with a numeric index, e.g., subnatid1, gaul_adm2_code): edit helpers/stacking_schema.r:
- Create a new `is_<name>_column()` helper that uses a regex to match the column name pattern (see `is_subnational_column` and `is_gaul_column` for examples).
- Update `is_dynamic_column()` to include the new helper with an `||` clause.
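The helpers themselves are implemented in R in helpers/stacking_schema.r; the Python sketch below mirrors the idea with assumed regex patterns for the two dynamic-column families named above:

```python
import re

def is_subnational_column(name: str) -> bool:
    # Assumed pattern for subnational IDs: subnatid1, subnatid2, ...
    return re.fullmatch(r"subnatid\d+", name) is not None

def is_gaul_column(name: str) -> bool:
    # Assumed pattern for GAUL codes: gaul_adm1_code, gaul_adm2_code, ...
    return re.fullmatch(r"gaul_adm\d+_code", name) is not None

def is_dynamic_column(name: str) -> bool:
    # Each new dynamic-column family adds one more clause here,
    # just like the || clause in the R implementation.
    return is_subnational_column(name) or is_gaul_column(name)

print(is_dynamic_column("gaul_adm2_code"))  # True
```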
json_creation: creates all the .json files from the metadata in the `_ingestion_metadata` table, using the supporting files countries.csv and survey_metadata.csv, and saves them in the json_temp folder. It also computes and verifies GitHub links for those surveys whose documentation is published on GitHub.
json_publication: publishes all the .json files in json_temp, alongside the corresponding datasets (using the file in the `dta_path` column of the `_ingestion_metadata` table), .do files (using the file in the `do_path` column), and technical documentation/questionnaires (found in the corresponding Docs folder).
Please note that the environment variable "NADA_API_KEY" is used to publish. If the API key needs to be updated, follow these instructions. The scope is GLDKEYVAULT, and it can be managed by anyone in the ITSDA-LKHS-DAP-PROD-gld team.

Please note that the structure of the Docs folders varies: some contain files directly, while others contain subfolders called Technical and Questionnaires (which may be empty). If documents are correctly organized, they are published as two separate resources (Questionnaires and Technical Documentation); otherwise, all documents are uploaded as Technical Documentation.
harmonized_json_creation: computes necessary metadata fields and creates the .json files for the gld_harmonized*_ tables.
Publication to the Microdata Library requires explicit versioning. The harmonized_json_publication script assigns a version identifier (V*) to each gld_harmonized* table every time it is published. Each published version must include a version statement describing all changes made since the previous publication. This version statement is constructed in harmonized_json_creation. For example, consider publishing version V2 of the _gld_harmonized_all table. Suppose the current Databricks table version of _gld_harmonized_all is 15. To identify the Databricks table version corresponding to the previously published version (V1), the script queries the `_ingestion_metadata` table and retrieves the value of `stacked_all_published_table_version` for rows where `stacked_all_published_version` = 1. In this example, the returned value is 10, indicating that Databricks table version 10 corresponds to the published V1. The script then scans the `_ingestion_metadata` table for all country–year combinations with `stacked_all_published_table_version` > 10 and compiles the version statement accordingly, for example: "Updated data for [Country] [Year], [Country] [Year], …".
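The version-statement construction can be sketched as follows. This is an illustrative Python reduction of the logic described above, using the column name from this README; the actual script is part of harmonized_json_creation:

```python
def build_version_statement(rows: list, prev_published_version: int) -> str:
    """Collect country-year combinations whose stacked table version is
    newer than the previously published Databricks version, and list
    them in the statement (deduplicated and sorted for stability)."""
    updated = sorted({
        (r["country"], r["year"]) for r in rows
        if r["stacked_all_published_table_version"] > prev_published_version
    })
    return "Updated data for " + ", ".join(f"{c} {y}" for c, y in updated)

rows = [
    {"country": "THA", "year": 2021, "stacked_all_published_table_version": 12},
    {"country": "IND", "year": 2020, "stacked_all_published_table_version": 9},
    {"country": "BRA", "year": 2019, "stacked_all_published_table_version": 15},
]
print(build_version_statement(rows, 10))
# Updated data for BRA 2019, THA 2021
```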
harmonized_json_publication: forthcoming
To run API integration tests, set the RUN_API_INTEGRATION flag to TRUE in the API Integration Tests Config section of the helpers/config file.
Run the full R test suite from the repo root: `Rscript tests/testthat_run.r`. Run a single test file: `R -e 'testthat::test_file("tests/testthat/test_do_file_parsing.r")'`.

Run the full Python test suite from the repo root: `python -m pytest`. Run a single test file: `python -m pytest tests/pytest/test_ingestion_pipeline.py`.

Import the repository into a Databricks workspace, then run tests/testthat_run as a notebook. The notebook uses %run commands to execute each test file in sequence.
Please note that testthat cannot crawl the testthat folder in Databricks; therefore, if a new testing script is created, it needs to be added to the notebook manually to run in Databricks.
Import the repository into a Databricks workspace, then run tests/pytest_run as a notebook. Please note that this works only if the repository is added under a /Workspace/Repos folder. If the repository is uploaded under a /Workspace/Users/ folder, the subprocess won't be able to find the pytest scripts.
This project is licensed under the MIT License together with the World Bank IGO Rider. The Rider is purely procedural: it reserves all privileges and immunities enjoyed by the World Bank, without adding restrictions to the MIT permissions. Please review both files before using, distributing or contributing.
Should you have any questions or suggestions, please reach out to gld@worldbank.org