Uploading Data
Upload data from pandas DataFrames to TitanRDM. The SDK supports full and incremental upload patterns, and handles multi-table imports in a single batch.
Note: If you are using Databricks or another Spark-based environment, consider using the
SparkSyncclass instead of manually creating uploads. TheSparkSyncclass provides a more convenient way to upload data by automatically discovering tables and creating uploads for them. Note:SparkSyncclass uses theConventionSyncclass internally. You can useConventionSyncdirectly with pandas DataFrames if you prefer more control over the upload process.
Concepts
| Term | Description |
| Upload (Import Batch) | A container for one or more table uploads. Created once, completed after all tables are sent. |
| Table Upload | A single table's data within an import batch. Each table upload targets a specific table definition and import mapping. |
| Pattern | full replaces all data; incremental adds/updates rows; incremental_with_delete adds/updates/removes rows. |
| Import Mapping | Defines how your source columns map to the target table's columns. |
Full Upload Workflow
import pandas as pd
from titan_rdm_sdk import TitanRDMClient
client = TitanRDMClient(
url="https://your-tenant.titanrdm.com",
client_id="your_client_id",
client_secret="your_client_secret",
)
# Step 1: Create an import batch
upload = client.get_upload(
branch_id=174,
description="Customer data update",
correlation_code="batch-001",
)
print(f"Import batch created: ID={upload.id}, Status={upload.status}")
# Step 2: Create a table upload
table_upload = upload.get_table_upload(
table_definition_key=456,
import_mapping_key=123,
pattern="full",
)
print(f"Table upload created: ID={table_upload.id}")
# Step 3: Send data
df = pd.DataFrame({
"customer_id": [1, 2, 3],
"name": ["Alice", "Bob", "Charlie"],
"email": ["alice@example.com", "bob@example.com", "charlie@example.com"],
})
table_upload.send(df)
print(f"Data uploaded. Status={table_upload.status}")
# Step 4: Complete the import batch
upload.complete(message="Upload completed successfully")
print(f"Import completed. Final status={upload.status}")
Incremental Upload
Use pattern="incremental" to add or update rows without replacing the full dataset:
upload = client.get_upload(
branch_id=174,
description="Incremental customer update",
correlation_code="incremental-002",
)
table_upload = upload.get_table_upload(
table_definition_key=456,
import_mapping_key=123,
pattern="incremental",
correlation_code="incr-customers",
)
# Only send the changed rows
df_changes = df.head(5)
table_upload.send(df_changes)
upload.complete()
Incremental with Delete
Use pattern="incremental_with_delete" to also remove rows that are no longer present in the source:
table_upload = upload.get_table_upload(
table_definition_key=456,
import_mapping_key=123,
pattern="incremental_with_delete",
)
Multi-Table Upload
Upload multiple tables in a single import batch:
upload = client.get_upload(
branch_id=branch.id,
description="Multi-table sync",
correlation_code="multi-upload-001",
)
# Upload customers
customers_upload = upload.get_table_upload(
table_definition_key=100,
import_mapping_key=10,
pattern="full",
)
customers_upload.send(customers_df)
# Upload orders
orders_upload = upload.get_table_upload(
table_definition_key=101,
import_mapping_key=11,
pattern="full",
)
orders_upload.send(orders_df)
# Complete the batch (processes all tables)
upload.complete(message="Multi-table upload done")
Using Default Import Mappings
If you don't know the import mapping key, use the SDK to look it up:
# Get deployed table definition
tables = client.get_deployed_table_definitions(branch_id=5, domain_id=1)
table = tables[0]
# Get the default import mapping
import_mapping = client.get_default_import_mapping(table.id)
# Use it in your upload
table_upload = upload.get_table_upload(
table_definition_key=table.key,
import_mapping_key=import_mapping.key,
pattern="full",
)
Upload Object Properties
Upload (Import Batch)
| Property | Type | Description |
id | int | Import batch ID |
status | str | Current status |
message | str | Status message |
description | str | Import description |
correlation_code | str | User-defined tracking ID |
insert_row_count | int | Total rows inserted |
update_row_count | int | Total rows updated |
delete_row_count | int | Total rows deleted |
duration_secs | int | Processing duration |
TableUpload
| Property | Type | Description |
id | int | Table upload ID |
status | str | Current status |
message | str | Status message |
correlation_code | str | User-defined tracking ID |
file_size | int | Uploaded file size in bytes |
insert_row_count | int | Rows inserted |
update_row_count | int | Rows updated |
delete_row_count | int | Rows deleted |
duration_secs | int | Processing duration |
Parameters Reference
client.get_upload()
| Parameter | Type | Required | Description |
branch_id | int | Yes | Target branch ID |
description | str | No | Description of the import |
correlation_code | str | No | User-defined tracking identifier |
upload.get_table_upload()
| Parameter | Type | Required | Description |
table_definition_key | int | Yes | Target table definition key |
import_mapping_key | int | Yes | Import mapping key |
pattern | str | No | 'full', 'incremental', or 'incremental_with_delete' (default: 'full') |
correlation_code | str | No | Tracking identifier for this table upload |
Next Steps
- Downloading Data — Export data from TitanRDM
- Convention Sync — Automate uploads for entire domains
- Error Handling — Handle upload errors gracefully