Convention Sync (Pandas)
The ConventionSync class provides a platform-agnostic, convention-based approach to syncing data between TitanRDM and your data platform using pandas DataFrames. It eliminates the need for hard-coded table lists — the SDK discovers all deployed tables automatically.
Naming Convention
Convention Sync uses a predictable naming pattern to match source/target tables:
{domain_abbreviation}_{database_table_name}
For example, if a domain has abbreviation cust and a table has database_table_name of customers, the convention key is cust_customers.
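As a minimal illustration, the key can be built with a one-line helper. `convention_key` is a hypothetical name used only in this sketch; it is not an SDK function.

```python
def convention_key(domain_abbreviation: str, database_table_name: str) -> str:
    """Build the convention key {domain_abbreviation}_{database_table_name}."""
    return f"{domain_abbreviation}_{database_table_name}"


print(convention_key("cust", "customers"))  # cust_customers
```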
How It Works
- Discovers metadata: Lists all deployed tables for a given domain and branch
- Matches by convention: Maps each table to a DataFrame key using {abbreviation}_{database_table_name}
- Uploads/Downloads: Processes all matching tables in a single operation
Adding a new table in TitanRDM automatically includes it in the next sync run — no code changes required.
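The matching step can be pictured roughly as follows. This is a sketch of the idea only, not the SDK's implementation: `match_by_convention` is a hypothetical helper, the list of table names stands in for the metadata the SDK discovers, and plain strings stand in for DataFrames.

```python
def match_by_convention(abbreviation, deployed_tables, dataframes):
    """Pair each deployed table with the object stored under its convention key.

    deployed_tables: iterable of database_table_name strings (assumed shape).
    dataframes: dict of {convention_key: DataFrame-like object}.
    Returns (matched, unmatched): a dict of table name -> object, and a list
    of table names for which no key was found.
    """
    matched, unmatched = {}, []
    for table_name in deployed_tables:
        key = f"{abbreviation}_{table_name}"
        if key in dataframes:
            matched[table_name] = dataframes[key]
        else:
            unmatched.append(table_name)
    return matched, unmatched


# Placeholder strings stand in for real DataFrames.
frames = {"clin_sites": "sites_df", "clin_org_unit": "org_unit_df"}
matched, unmatched = match_by_convention(
    "clin", ["sites", "delivery_centre", "org_unit"], frames
)
print(sorted(matched))  # ['org_unit', 'sites']
print(unmatched)        # ['delivery_centre']
```

Because the loop is driven by the list of deployed tables rather than by a fixed list in code, a newly deployed table is picked up as soon as a DataFrame with the matching key is supplied.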
Upload by Convention
Upload a dict of DataFrames to TitanRDM. The dict keys must follow the naming convention.
from titan_rdm_sdk import TitanRDMClient
from titan_rdm_sdk.spark_sync import ConventionSync
client = TitanRDMClient(url=URL, client_id=ID, client_secret=SECRET)
sync = ConventionSync(client=client)
# Prepare your DataFrames
dataframes = {
    "clin_sites": sites_df,
    "clin_delivery_centre": delivery_centre_df,
    "clin_org_unit": org_unit_df,
}
# Upload all DataFrames matching the domain's deployed tables
results = sync.upload_by_convention(
    branch_id=174,
    domain_name="Clinics",
    dataframes=dataframes,
)
# Report the outcome for each table
for r in results:
    print(f"  {r['table']}: {r['rows']} rows, {r['status']}")
Upload Specific Tables Only
Pass table_names to limit which tables are synced:
results = sync.upload_by_convention(
    branch_id=174,
    domain_name="Clinics",
    dataframes=dataframes,
    table_names=["Site", "Delivery Centre"],
)
Parameters
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| branch_id | int | Yes | Target branch ID |
| domain_name | str | Yes | Exact domain name in TitanRDM |
| dataframes | dict[str, DataFrame] | Yes | Dict of {convention_key: DataFrame} |
| table_names | list[str] | No | Filter to specific table names |
| description | str | No | Import batch description |
| correlation_code | str | No | Tracking identifier |
Download by Convention
Download all deployed tables in a domain as pandas DataFrames:
results, dataframes = sync.download_by_convention(
    branch_id=174,
    domain_name="Clinics",
)
# dataframes is a dict: {"clin_sites": DataFrame, "clin_delivery_centre": DataFrame, ...}
for key, df in dataframes.items():
    print(f"  {key}: {len(df)} rows")
Download Specific Tables Only
results, dataframes = sync.download_by_convention(
    branch_id=174,
    domain_name="Clinics",
    table_names=["Site", "Delivery Centre", "Org Unit"],
)
Parameters
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| branch_id | int | Yes | Target branch ID |
| domain_name | str | Yes | Exact domain name in TitanRDM |
| table_names | list[str] | No | Filter to specific table names |
| correlation_code | str | No | Tracking identifier prefix |
| poll_interval | float | No | Seconds between export checks (default: 2.0) |
| max_wait | float | No | Max seconds to wait per export (default: 300.0) |
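To make the meaning of poll_interval and max_wait concrete, here is a generic polling loop with the same two knobs. It is an illustration of the pattern only and makes no claim about how the SDK polls internally; `wait_for` and `export_ready` are hypothetical names, and the TimeoutError on expiry is an assumption of this sketch.

```python
import time


def wait_for(check, poll_interval=2.0, max_wait=300.0):
    """Call check() every poll_interval seconds until it returns True.

    Raises TimeoutError once max_wait seconds have elapsed without success.
    """
    deadline = time.monotonic() + max_wait
    while True:
        if check():
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"not ready after {max_wait} seconds")
        time.sleep(poll_interval)


# Stand-in for an export that becomes ready on the third check.
attempts = {"count": 0}


def export_ready():
    attempts["count"] += 1
    return attempts["count"] >= 3


wait_for(export_ready, poll_interval=0.01, max_wait=1.0)
print(attempts["count"])  # 3
```

A shorter poll_interval notices a finished export sooner at the cost of more checks, while max_wait bounds how long a single slow export can hold up the run.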
Return Values
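Both methods return a list of result dicts, one per table. upload_by_convention returns the list directly; download_by_convention returns it as the first element of a (results, dataframes) tuple, as in the download examples above.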
[
    {"domain": "Clinics", "table": "sites", "rows": 150, "status": "success"},
    {"domain": "Clinics", "table": "delivery_centre", "rows": 45, "status": "success"},
    {"domain": "Clinics", "table": "org_unit", "rows": 0, "status": "skipped (no source data)"},
]
| Status | Meaning |
| --- | --- |
| success | Table synced successfully |
| skipped (no source data) | No matching DataFrame provided (upload) |
| skipped (empty) | DataFrame was empty |
| error: | An error occurred |
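After a sync run it is often useful to tally the statuses and pull out any failures. The sketch below works on the result dicts shown above; `summarize` is a hypothetical helper, and it assumes that error statuses start with "error" and skipped statuses start with "skipped", as the status table suggests.

```python
from collections import Counter


def summarize(results):
    """Count results per status category and collect the error entries."""
    counts = Counter()
    errors = []
    for r in results:
        status = r["status"]
        if status.startswith("error"):
            counts["error"] += 1
            errors.append(r)
        elif status.startswith("skipped"):
            counts["skipped"] += 1
        else:
            counts[status] += 1
    return counts, errors


results = [
    {"domain": "Clinics", "table": "sites", "rows": 150, "status": "success"},
    {"domain": "Clinics", "table": "delivery_centre", "rows": 45, "status": "success"},
    {"domain": "Clinics", "table": "org_unit", "rows": 0, "status": "skipped (no source data)"},
]
counts, errors = summarize(results)
print(dict(counts))  # {'success': 2, 'skipped': 1}
print(errors)        # []
```

In a scheduled job, a non-empty errors list could be used to fail the run instead of letting partial syncs pass silently.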
Full Example: Sync All Domains
from titan_rdm_sdk import TitanRDMClient
from titan_rdm_sdk.spark_sync import ConventionSync
client = TitanRDMClient(url=URL, client_id=ID, client_secret=SECRET)
sync = ConventionSync(client=client)
branch = client.get_branch_by_name("prod")
domains = client.get_domains()
# Download all tables across all domains
all_data = {}
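for domain in domains:
    results, frames = sync.download_by_convention(
        branch_id=branch.id,
        domain_name=domain.name,
    )
    # Keys are prefixed with the domain abbreviation, so frames from
    # different domains accumulate in one dict
    all_data.update(frames)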
print(f"Downloaded {len(all_data)} tables total")
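Note that dict.update overwrites an existing entry when a key repeats. The source does not say whether domain abbreviations are guaranteed to be unique, so a cautious variant can check for repeated keys before merging. `merge_frames` is a hypothetical helper for this sketch, and plain strings stand in for DataFrames.

```python
def merge_frames(all_data, frames):
    """Add frames to all_data, raising if a convention key is already present."""
    duplicates = sorted(set(all_data) & set(frames))
    if duplicates:
        raise KeyError(f"duplicate convention keys: {duplicates}")
    all_data.update(frames)
    return all_data


# Placeholder strings stand in for real DataFrames.
all_data = {}
merge_frames(all_data, {"clin_sites": "sites_df"})
merge_frames(all_data, {"cust_customers": "customers_df"})
print(sorted(all_data))  # ['clin_sites', 'cust_customers']
```

Swapping this in for all_data.update(frames) in the loop turns a silent overwrite into an explicit failure.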
Example Notebook
For a complete working example, see the Convention Sync Example Notebook.
Next Steps
- Spark Sync — Extend convention sync with automatic Spark catalog read/write
- Platform Integrations — BigQuery and Snowflake sync classes
- Example Notebooks — End-to-end Databricks examples