# Databricks notebook source
# MAGIC %md
# MAGIC # TitanRDM SDK System Tests
# MAGIC
# MAGIC This notebook tests the TitanRDM SDK against a real TitanRDM instance.
# MAGIC
# MAGIC ## Prerequisites
# MAGIC 1. Create a secret scope named `titan-rdm` with the following keys:
# MAGIC    - `url`: TitanRDM instance URL
# MAGIC    - `client_id`: OAuth client ID
# MAGIC    - `client_secret`: OAuth client secret
# MAGIC 2. Upload test CSV files to DBFS (e.g., `/dbfs/test_data/customers.csv`)
# MAGIC
# MAGIC ## Usage
# MAGIC Run cells sequentially or use "Run All" to execute all tests.

# COMMAND ----------

# MAGIC %md
# MAGIC ## Setup

# COMMAND ----------

# Install the SDK (adjust path or use PyPI when published)

# For development, install from source or use:
# %pip uninstall -y titan-rdm-sdk
# %pip install titan-rdm-sdk

# COMMAND ----------

# DBTITLE 1,Configuration Widgets
# Create widgets for configurable parameters
dbutils.widgets.text("branch_id", "174", "Branch ID")
dbutils.widgets.text("table_definition_key", "100", "Table Definition Key")
dbutils.widgets.text("import_mapping_key", "10", "Import Mapping Key")
dbutils.widgets.text("test_csv_path", "/dbfs/test_data/customers.csv", "Test CSV Path")

# COMMAND ----------

# DBTITLE 1,Load Configuration
# Get credentials from Databricks secret scope
# To create a secret scope:
#   databricks secrets create-scope --scope titan-rdm
#   databricks secrets put --scope titan-rdm --key url
#   databricks secrets put --scope titan-rdm --key client_id
#   databricks secrets put --scope titan-rdm --key client_secret

try:
    TITAN_URL = dbutils.secrets.get(scope="titan-rdm", key="url")
    TITAN_CLIENT_ID = dbutils.secrets.get(scope="titan-rdm", key="client_id")
    TITAN_CLIENT_SECRET = dbutils.secrets.get(scope="titan-rdm", key="client_secret")
    print("✓ Credentials loaded from secret scope 'titan-rdm'")
except Exception as e:
    print(f"⚠ Could not load secrets: {e}")
    print("Please set up the 'titan-rdm' secret scope with url, client_id, and client_secret")
    # Fallback for testing - DO NOT hardcode credentials in production!
    TITAN_URL = None
    TITAN_CLIENT_ID = None
    TITAN_CLIENT_SECRET = None

# Get widget values
BRANCH_ID = int(dbutils.widgets.get("branch_id"))
TABLE_DEFINITION_KEY = int(dbutils.widgets.get("table_definition_key"))
IMPORT_MAPPING_KEY = int(dbutils.widgets.get("import_mapping_key"))
TEST_CSV_PATH = dbutils.widgets.get("test_csv_path")

print(f"Branch ID: {BRANCH_ID}")
print(f"Table Definition Key: {TABLE_DEFINITION_KEY}")
print(f"Import Mapping Key: {IMPORT_MAPPING_KEY}")
print(f"Test CSV Path: {TEST_CSV_PATH}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Test 1: Authentication

# COMMAND ----------

# DBTITLE 1,Test Client Authentication
from titan_rdm_sdk import TitanRDMClient
from titan_rdm_sdk.exceptions import AuthenticationError, APIError

def test_authentication():
    """Test that client authenticates successfully."""
    try:
        client = TitanRDMClient(
            url=TITAN_URL,
            client_id=TITAN_CLIENT_ID,
            client_secret=TITAN_CLIENT_SECRET,
        )
        print("✓ Authentication successful!")
        print(f"  URL: {client.url}")
        print(f"  Token expires at: {client.token_expires_at}")
        return client
    except AuthenticationError as e:
        print(f"✗ Authentication failed: {e}")
        raise

# Run test
client = test_authentication()

# COMMAND ----------

# MAGIC %md
# MAGIC ## Test 2: Upload Workflow

# COMMAND ----------

# DBTITLE 1,Load Test Data
import pandas as pd

def load_test_data(csv_path: str) -> pd.DataFrame:
    """Load test CSV file."""
    try:
        df = pd.read_csv(csv_path)
        print(f"✓ Loaded {len(df)} rows from {csv_path}")
        print(f"  Columns: {list(df.columns)}")
        display(df.head())
        return df
    except Exception as e:
        print(f"✗ Failed to load CSV: {e}")
        raise

# Run test
test_df = load_test_data(TEST_CSV_PATH)

# COMMAND ----------

# DBTITLE 1,Test Full Upload Workflow
def test_full_upload(client, df, branch_id, table_def_key, import_map_key):
    """Test complete upload workflow."""
    print("Starting full upload test...")
    
    # Step 1: Create upload batch
    upload = client.get_upload(
        branch_id=branch_id,
        description="Databricks system test - full upload",
        correlation_code="databricks-test-001",
    )
    print(f"✓ Created upload batch: ID={upload.id}, Status={upload.status}")
    
    # Step 2: Create table upload
    table_upload = upload.get_table_upload(
        table_definition_key=table_def_key,
        import_mapping_key=import_map_key,
        pattern="full",
    )
    print(f"✓ Created table upload: ID={table_upload.id}")
    
    # Step 3: Send data
    table_upload.send(df)
    print(f"✓ Data uploaded. Status={table_upload.status}")
    
    # Step 4: Complete import
    upload.complete(message="Databricks system test completed")
    print(f"✓ Upload completed. Final status={upload.status}")
    
    return upload

# Run test
upload_result = test_full_upload(
    client=client,
    df=test_df,
    branch_id=BRANCH_ID,
    table_def_key=TABLE_DEFINITION_KEY,
    import_map_key=IMPORT_MAPPING_KEY,
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Test 3: Download Workflow

# COMMAND ----------

# DBTITLE 1,Test Full Download Workflow
def test_full_download(client, branch_id, table_def_key):
    """Test complete download (export) workflow."""
    print("Starting full download test...")
    
    # Step 1: Create export
    download = client.get_download(
        branch_id=branch_id,
        table_definition_key=table_def_key,
        pattern="full",
        correlation_code="databricks-download-001",
    )
    print(f"✓ Created export: ID={download.id}, Status={download.status}")
    
    # Step 2: Wait for export to complete
    print("Waiting for export to complete...")
    final_status = download.wait_until_ready(poll_interval=2.0, max_wait=120.0)
    print(f"✓ Export completed with status: {final_status}")
    
    if final_status == "ready":
        # Step 3: Download data
        df = download.receive()
        print(f"✓ Downloaded {len(df)} rows, {len(df.columns)} columns")
        print(f"  Rows exported: {download.rows_exported}")
        print(f"  Duration: {download.duration_secs} seconds")
        display(df.head())
        return df
    else:
        print(f"✗ Export failed: {download.message}")
        return None

# Run test
downloaded_df = test_full_download(
    client=client,
    branch_id=BRANCH_ID,
    table_def_key=TABLE_DEFINITION_KEY,
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Test 4: Incremental Upload

# COMMAND ----------

# DBTITLE 1,Test Incremental Upload
def test_incremental_upload(client, df, branch_id, table_def_key, import_map_key):
    """Test incremental upload pattern."""
    print("Starting incremental upload test...")
    
    # Use subset of data
    df_subset = df.head(5)
    print(f"Using {len(df_subset)} rows for incremental test")
    
    upload = client.get_upload(
        branch_id=branch_id,
        description="Databricks system test - incremental",
        correlation_code="databricks-incremental-002",
    )
    
    table_upload = upload.get_table_upload(
        table_definition_key=table_def_key,
        import_mapping_key=import_map_key,
        pattern="incremental",
        correlation_code='abc123'
    )
    
    table_upload.send(df_subset)
    upload.complete()
    
    print(f"✓ Incremental upload completed. Status={upload.status}")
    return upload

# Run test
incremental_result = test_incremental_upload(
    client=client,
    df=test_df,
    branch_id=BRANCH_ID,
    table_def_key=TABLE_DEFINITION_KEY,
    import_map_key=IMPORT_MAPPING_KEY,
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Test 5: Error Handling

# COMMAND ----------

# DBTITLE 1,Test Error Handling
def test_error_handling(client, branch_id):
    """Test that errors are handled correctly."""
    from titan_rdm_sdk.exceptions import APIError, ValidationError
    
    print("Testing error handling...")
    
    # Test 1: Invalid branch ID
    try:
        client.get_upload(branch_id=999999999)
        print("✗ Should have raised an error for invalid branch ID")
    except APIError as e:
        print(f"✓ Correctly raised APIError for invalid branch ID: {e}")
    
    # Test 2: Invalid table definition key
    try:
        upload = client.get_upload(branch_id=branch_id)
        upload.get_table_upload(
            table_definition_key=999999999,
            import_mapping_key=1,
        )
        print("✗ Should have raised an error for invalid table definition key")
    except APIError as e:
        print(f"✓ Correctly raised APIError for invalid table key: {e}")
    
    print("✓ Error handling tests passed!")

# Run test
test_error_handling(client=client, branch_id=BRANCH_ID)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Test 6: List Branches

# COMMAND ----------

# DBTITLE 1,Test List Branches
def test_list_branches(client):
    """Test listing all branches."""
    from titan_rdm_sdk import Branch
    
    print("Listing branches...")
    branches = client.get_branches()
    
    print(f"✓ Found {len(branches)} branch(es)")
    for b in branches:
        print(f"  [{b.id}] {b.name} (type={b.branch_type}, status={b.status})")
    
    assert len(branches) > 0
    assert all(isinstance(b, Branch) for b in branches)
    print("✓ List branches test passed!")
    return branches

# Run test
branches_list = test_list_branches(client)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Test 7: Get Branch by Name

# COMMAND ----------

# DBTITLE 1,Test Get Branch by Name
def test_get_branch_by_name(client, branches_list):
    """Test getting a branch by name."""
    # Use the test branch name
    target_name = 'test'
    print(f"Looking up branch by name: '{target_name}'...")
    
    branch = client.get_branch_by_name(target_name)
    
    print(f"✓ Found branch: id={branch.id}, name={branch.name}")
    print(f"  type={branch.branch_type}, status={branch.status}")
    print(f"  database_suffix={branch.database_suffix}")
    if branch.parent_branch_name:
        print(f"  parent={branch.parent_branch_name}")
    
    assert branch.name == target_name
    print("✓ Get branch by name test passed!")
    return branch

# Run test
found_branch = test_get_branch_by_name(client, branches_list)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Test 8: Get Branch by ID

# COMMAND ----------

# DBTITLE 1,Test Get Branch by ID
def test_get_branch_by_id(client, branch_id):
    """Test getting a branch by ID."""
    print(f"Looking up branch by ID: {branch_id}...")
    
    branch = client.get_branch(branch_id)
    
    print(f"✓ Found branch: id={branch.id}, name={branch.name}")
    assert branch.id == branch_id
    print("✓ Get branch by ID test passed!")
    return branch

# Run test
branch_by_id = test_get_branch_by_id(client, found_branch.id)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Test 9: List Domains

# COMMAND ----------

# DBTITLE 1,Test List Domains
def test_list_domains(client):
    """Test listing all domains."""
    from titan_rdm_sdk import Domain
    
    print("Listing domains...")
    domains = client.get_domains()
    
    print(f"✓ Found {len(domains)} domain(s)")
    for d in domains:
        print(f"  [{d.id}] {d.name} (abbrev={d.abbreviation})")
    
    assert len(domains) > 0
    assert all(isinstance(d, Domain) for d in domains)
    print("✓ List domains test passed!")
    return domains

# Run test
domains_list = test_list_domains(client)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Test 10: Get Domain by Name and ID

# COMMAND ----------

# DBTITLE 1,Test Get Domain by Name and ID
def test_get_domain(client, domains_list):
    """Test getting a domain by name and by ID."""
    target = domains_list[0]
    
    # By name
    print(f"Looking up domain by name: '{target.name}'...")
    domain_by_name = client.get_domain_by_name(target.name)
    assert domain_by_name.id == target.id
    print(f"✓ Found domain by name: id={domain_by_name.id}, name={domain_by_name.name}")
    
    # By ID
    print(f"Looking up domain by ID: {target.id}...")
    domain_by_id = client.get_domain(target.id)
    assert domain_by_id.name == target.name
    print(f"✓ Found domain by ID: id={domain_by_id.id}, name={domain_by_id.name}")
    
    print("✓ Get domain tests passed!")
    return domain_by_name

# Run test
found_domain = test_get_domain(client, domains_list)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Test 11: List Deployed Table Definitions

# COMMAND ----------

# DBTITLE 1,Test List Deployed Table Definitions
def test_list_deployed_tables(client, branch_id, domain_id):
    """Test listing deployed table definitions for a branch and domain."""
    from titan_rdm_sdk import DeployedTableDefinition
    
    print(f"Listing deployed tables for branch_id={branch_id}, domain_id={domain_id}...")
    tables = client.get_deployed_table_definitions(
        branch_id=branch_id,
        domain_id=domain_id,
    )
    
    print(f"✓ Found {len(tables)} deployed table(s)")
    for t in tables:
        col_count = len(t.column_definitions)
        print(f"  [{t.id}] {t.name} → {t.database_table_name} ({col_count} columns)")
    
    assert all(isinstance(t, DeployedTableDefinition) for t in tables)
    print("✓ List deployed table definitions test passed!")
    return tables

# Run test
deployed_tables = test_list_deployed_tables(
    client,
    branch_id=found_branch.id,
    domain_id=found_domain.id,
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Test 12: Get Deployed Table Definition by Key

# COMMAND ----------

# DBTITLE 1,Test Get Deployed Table Definition by Key
def test_get_deployed_table_by_key(client, deployed_tables, branch_id):
    """Test looking up a deployed table definition by key and branch."""
    if not deployed_tables:
        print("⚠ No deployed tables to test with — skipping")
        return None
    
    target = deployed_tables[0]
    print(f"Looking up deployed table by key='{target.key}', branch_id={branch_id}...")
    
    table = client.get_deployed_table_definition(
        key=target.key,
        branch_id=branch_id,
    )
    
    print(f"✓ Found: {table.name} → {table.database_table_name}")
    print(f"  Domain: {table.domain_name}")
    print(f"  Columns ({len(table.column_definitions)}):")
    for col in table.column_definitions:
        pk = " [PK]" if col.is_primary_key else ""
        req = " NOT NULL" if col.is_required else ""
        print(f"    {col.name}: {col.data_type}{pk}{req}")
    
    assert table.key == target.key
    print("✓ Get deployed table definition by key test passed!")
    return table

# Run test
found_table = test_get_deployed_table_by_key(
    client,
    deployed_tables=deployed_tables,
    branch_id=found_branch.id,
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Test Summary

# COMMAND ----------

# DBTITLE 1,Summary
print("=" * 60)
print("TitanRDM SDK System Tests - Summary")
print("=" * 60)
print(f"✓ Authentication: Passed")
print(f"✓ Full Upload: Passed (ID: {upload_result.id})")
print(f"✓ Full Download: {'Passed' if downloaded_df is not None else 'Failed'}")
print(f"✓ Incremental Upload: Passed (ID: {incremental_result.id})")
print(f"✓ Error Handling: Passed")
print(f"✓ List Branches: Passed ({len(branches_list)} found)")
print(f"✓ Get Branch by Name: Passed ({found_branch.name})")
print(f"✓ Get Branch by ID: Passed ({branch_by_id.id})")
print(f"✓ List Domains: Passed ({len(domains_list)} found)")
print(f"✓ Get Domain: Passed ({found_domain.name})")
print(f"✓ List Deployed Tables: Passed ({len(deployed_tables)} found)")
print(f"✓ Get Deployed Table by Key: {'Passed' if found_table else 'Skipped'}")
print("=" * 60)
print("All tests completed!")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Cleanup (Optional)
# MAGIC
# MAGIC Add cleanup logic here if needed to remove test data from TitanRDM.
