Developing on Metadata Ingestion
If you just want to use metadata ingestion, check the user-centric guide. This document is for developers who want to develop and possibly contribute to the metadata ingestion framework.
Also take a look at the guide to adding a source.
Getting Started
Requirements
- Python 3.8+ must be installed in your host environment.
- Java 17 (gradle won't work with newer or older versions)
- On Debian/Ubuntu:
sudo apt install python3-dev python3-venv
- On Fedora (if using LDAP source integration):
sudo yum install openldap-devel
Set up your Python environment
From the repository root:
cd metadata-ingestion
../gradlew :metadata-ingestion:installDev
source venv/bin/activate
datahub version # should print "DataHub CLI version: unavailable (installed in develop mode)"
(Optional) Set up your Python environment for developing on Airflow Plugin
From the repository root:
cd metadata-ingestion-modules/airflow-plugin
../../gradlew :metadata-ingestion-modules:airflow-plugin:installDev
source venv/bin/activate
datahub version # should print "DataHub CLI version: unavailable (installed in develop mode)"
# start the airflow web server
export AIRFLOW_HOME=~/airflow
airflow webserver --port 8090 -d
# start the airflow scheduler
airflow scheduler
# access the airflow service and run any of the DAG
# open http://localhost:8090/
# select any DAG and click on the `play arrow` button to start the DAG
# add the debug lines in the codebase, i.e. in ./src/datahub_airflow_plugin/datahub_listener.py
logger.debug("this is the sample debug line")
# run the DAG again and you can see the debug lines in the task_run log at,
#1. click on the `timestamp` in the `Last Run` column
#2. select the task
#3. click on the `log` option
P.S. if you are not able to see the log lines, then restart the
airflow scheduler
and rerun the DAG
(Optional) Set up your Python environment for developing on Dagster Plugin
From the repository root:
cd metadata-ingestion-modules/dagster-plugin
../../gradlew :metadata-ingestion-modules:dagster-plugin:installDev
source venv/bin/activate
datahub version # should print "DataHub CLI version: unavailable (installed in develop mode)"
(Optional) Set up your Python environment for developing on Prefect Plugin
From the repository root:
cd metadata-ingestion-modules/prefect-plugin
../../gradlew :metadata-ingestion-modules:prefect-plugin:installDev
source venv/bin/activate
datahub version # should print "DataHub CLI version: unavailable (installed in develop mode)"
(Optional) Set up your Python environment for developing on GX Plugin
From the repository root:
cd metadata-ingestion-modules/gx-plugin
../../gradlew :metadata-ingestion-modules:gx-plugin:installDev
source venv/bin/activate
datahub version # should print "DataHub CLI version: unavailable (installed in develop mode)"
(Optional) Set up your Python environment for developing on Dagster Plugin
From the repository root:
cd metadata-ingestion-modules/dagster-plugin
../../gradlew :metadata-ingestion-modules:dagster-plugin:installDev
source venv/bin/activate
datahub version # should print "DataHub CLI version: unavailable (installed in develop mode)"
Common setup issues
Common issues (click to expand):
datahub command not found with PyPI install
If you've already run the pip install, but running datahub
in your command line doesn't work, then there is likely an issue with your PATH setup and Python.
The easiest way to circumvent this is to install and run via Python, and use python3 -m datahub
in place of datahub
.
python3 -m pip install --upgrade acryl-datahub
python3 -m datahub --help
Wheel issues e.g. "Failed building wheel for avro-python3" or "error: invalid command 'bdist_wheel'"
This means Python's wheel
is not installed. Try running the following commands and then retry.
pip install --upgrade pip wheel setuptools
pip cache purge
Failure to install confluent_kafka: "error: command 'x86_64-linux-gnu-gcc' failed with exit status 1"
This sometimes happens if there's a version mismatch between the Kafka's C library and the Python wrapper library. Try running pip install confluent_kafka==1.5.0
and then retrying.
Conflict: acryl-datahub requires pydantic 1.10
The base acryl-datahub
package supports both Pydantic 1.x and 2.x. However, some of our specific sources require Pydantic 1.x because of transitive dependencies.
If you're primarily using acryl-datahub
for the SDKs, you can install acryl-datahub
and some extras, like acryl-datahub[sql-parser]
, without getting conflicts related to Pydantic versioning.
We recommend not installing full ingestion sources into your main environment (e.g. avoid having a dependency on acryl-datahub[snowflake]
or other ingestion sources).
Instead, we recommend using UI-based ingestion or isolating the ingestion pipelines using virtual environments. If you're using an orchestrator, they often have first-class support for virtual environments - here's an example for Airflow.
Using Plugins in Development
The syntax for installing plugins is slightly different in development. For example:
- pip install 'acryl-datahub[bigquery,datahub-rest]'
+ pip install -e '.[bigquery,datahub-rest]'
Architecture
The architecture of this metadata ingestion framework is heavily inspired by Apache Gobblin (also originally a LinkedIn project!). We have a standardized format - the MetadataChangeEvent - and sources and sinks which respectively produce and consume these objects. The sources pull metadata from a variety of data systems, while the sinks are primarily for moving this metadata into DataHub.
Code layout
- The CLI interface is defined in entrypoints.py and in the cli directory.
- The high level interfaces are defined in the API directory.
- The actual sources and sinks have their own directories. The registry files in those directories import the implementations.
- The metadata models are created using code generation, and eventually live in the
./src/datahub/metadata
directory. However, these files are not checked in and instead are generated at build time. See the codegen script for details. - Tests live in the
tests
directory. They're split between smaller unit tests and larger integration tests.
Code style
We use ruff, and mypy to ensure consistent code style and quality.
# Assumes: pip install -e '.[dev]' and venv is activated
ruff check src/ tests/
mypy src/ tests/
or you can run from root of the repository
./gradlew :metadata-ingestion:lint
# This will auto-fix some linting issues.
./gradlew :metadata-ingestion:lintFix
Some other notes:
- Prefer mixin classes over tall inheritance hierarchies.
- Write type annotations wherever possible.
- Use
typing.Protocol
to make implicit interfaces explicit. - If you ever find yourself copying and pasting large chunks of code, there's probably a better way to do it.
- Prefer a standalone helper method over a
@staticmethod
. - You probably should not be defining a
__hash__
method yourself. Using@dataclass(frozen=True)
is a good way to get a hashable class. - Avoid global state. In sources, this includes instance variables that effectively function as "global" state for the source.
- Avoid defining functions within other functions. This makes it harder to read and test the code.
- When interacting with external APIs, parse the responses into a dataclass rather than operating directly on the response object.
Dependency Management
The vast majority of our dependencies are not required by the "core" package but instead can be optionally installed using Python "extras". This allows us to keep the core package lightweight. We should be deliberate about adding new dependencies to the core framework.
Where possible, we should avoid pinning version dependencies. The acryl-datahub
package is frequently used as a library and hence installed alongside other tools. If you need to restrict the version of a dependency, use a range like >=1.2.3,<2.0.0
or a negative constraint like >=1.2.3, !=1.2.7
instead. Every upper bound and negative constraint should be accompanied by a comment explaining why it's necessary.
Caveat: Some packages like Great Expectations and Airflow frequently make breaking changes. For such packages, it's ok to add a "defensive" upper bound with the current latest version, accompanied by a comment. It's critical that we revisit these upper bounds at least once a month and broaden them if possible.
Guidelines for Ingestion Configs
We use pydantic to define the ingestion configs. In order to ensure that the configs are consistent and easy to use, we have a few guidelines:
Naming
- Most important point: we should match the terminology of the source system. For example, snowflake shouldn’t have a
host_port
, it should have anaccount_id
. - We should prefer slightly more verbose names when the alternative isn’t descriptive enough. For example
client_id
ortenant_id
over a bareid
andaccess_secret
over a baresecret
. - AllowDenyPatterns should be used whenever we need to filter a list. The pattern should always apply to the fully qualified name of the entity. These configs should be named
*_pattern
, for exampletable_pattern
. - Avoid
*_only
configs likeprofile_table_level_only
in favor ofprofile_table_level
andprofile_column_level
.include_tables
andinclude_views
are a good example.
Content
- All configs should have a description.
- When using inheritance or mixin classes, make sure that the fields and documentation is applicable in the base class. The
bigquery_temp_table_schema
field definitely shouldn’t be showing up in every single source’s profiling config! - Set reasonable defaults!
- The configs should not contain a default that you’d reasonably expect to be built in. As a bad example, the Postgres source’s
schema_pattern
has a default deny pattern containinginformation_schema
. This means that if the user overrides the schema_pattern, they’ll need to manually add the information_schema to their deny patterns. This is a bad, and the filtering should’ve been handled automatically by the source’s implementation, not added at runtime by its config.
- The configs should not contain a default that you’d reasonably expect to be built in. As a bad example, the Postgres source’s
Coding
- Use a single pydantic validator per thing to validate - we shouldn’t have validation methods that are 50 lines long.
- Use
SecretStr
for passwords, auth tokens, etc. - When doing simple field renames, use the
pydantic_renamed_field
helper. - When doing field deprecations, use the
pydantic_removed_field
helper. - Validator methods must only throw ValueError, TypeError, or AssertionError. Do not throw ConfigurationError from validators.
- Set
hidden_from_docs
for internal-only config flags. However, needing this often indicates a larger problem with the code structure. The hidden field should probably be a class attribute or an instance variable on the corresponding source.
Testing
# Follow standard install from source procedure - see above.
# Install, including all dev requirements.
pip install -e '.[dev]'
# For running integration tests, you can use
pip install -e '.[integration-tests]'
# Run the full testing suite
pytest -vv
# Run unit tests.
pytest -m 'not integration'
# Run Docker-based integration tests.
pytest -m 'integration'
# You can also run these steps via the gradle build:
../gradlew :metadata-ingestion:lint
../gradlew :metadata-ingestion:lintFix
../gradlew :metadata-ingestion:testQuick
../gradlew :metadata-ingestion:testFull
../gradlew :metadata-ingestion:check
# Run all tests in a single file
../gradlew :metadata-ingestion:testSingle -PtestFile=tests/unit/test_bigquery_source.py
# Run all tests under tests/unit
../gradlew :metadata-ingestion:testSingle -PtestFile=tests/unit
Updating golden test files
If you made some changes that require generating new "golden" data files for use in testing a specific ingestion source, you can run the following to re-generate them:
pytest tests/integration/<source>/<source>.py --update-golden-files
For example,
pytest tests/integration/dbt/test_dbt.py --update-golden-files
Testing the Airflow plugin
For the Airflow plugin, we use tox
to test across multiple sets of dependencies.
cd metadata-ingestion-modules/airflow-plugin
# Run all tests.
tox
# Run a specific environment.
# These are defined in the `tox.ini` file
tox -e py310-airflow26
# Run a specific test.
tox -e py310-airflow26 -- tests/integration/test_plugin.py
# Update all golden files.
tox -- --update-golden-files
# Update golden files for a specific environment.
tox -e py310-airflow26 -- --update-golden-files