Fix-16888: sas viya 4 table type error fixes (#27222)

This commit is contained in:
harshsoni2024 2026-04-17 17:08:32 +05:30 committed by GitHub
parent f8979cfb2b
commit 2591526b7e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 557 additions and 46 deletions

View file

@ -18,6 +18,7 @@ import copy
import json
import re
import traceback
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Iterable, Optional, Tuple
@ -83,6 +84,86 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@dataclass(frozen=True)
class SASResourceContext:
"""Components extracted from a SAS Information Catalog resourceId.
The SAS Data Tables REST API exposes table resources at paths of the form:
/dataTables/dataSources/{provider}~fs~{host}~fs~{library}/tables/{table}
where ``~fs~`` is the field separator (literal, not URL-encoded).
Known provider values
---------------------
- ``cas`` CAS (Cloud Analytic Services) table. *host* is the CAS
server name (e.g. ``cas-shared-default``).
- ``Compute`` SAS Compute session table. *host* is a session UUID
(e.g. ``49736234-36b3-48d2-b2e2-e12aa365ce05``).
Real-world examples
-------------------
CAS table:
``/dataTables/dataSources/cas~fs~cas-shared-default~fs~Samples/tables/WATER_CLUSTER``
Compute table:
``/dataTables/dataSources/Compute~fs~49736234-~fs~PUBLIC/tables/LAS_TRAIN``
Reference
---------
SAS REST API Data Tables service:
https://developer.sas.com/rest-apis/dataTables
"""
provider: str
host: str
library: str
raw_resource_id: str
@property
def database_name(self) -> str:
return f"{self.provider}.{self.host}"
# The field separator used inside the ``dataSources`` path segment.
_SAS_FIELD_SEPARATOR = "~fs~"
def parse_resource_id(resource_id: str) -> Optional[SASResourceContext]:
"""Parse a SAS Information Catalog resourceId into its components.
Returns ``None`` (instead of raising) when the resourceId does not
conform to the expected shape so that callers can cleanly fall back
to the relationships-based lookup.
"""
segments = resource_id.split("/")
# Expected: ['', 'dataTables', 'dataSources', '<context>', 'tables', ...]
if len(segments) < 4:
logger.warning(
"resourceId %r has fewer than 4 slash-delimited segments; "
"cannot extract provider/host/library.",
resource_id,
)
return None
context = segments[3]
parts = context.split(_SAS_FIELD_SEPARATOR)
if len(parts) < 3:
logger.warning(
"resourceId context segment %r has %d field(s) (expected 3: "
"provider, host, library); cannot derive database/schema.",
context,
len(parts),
)
return None
return SASResourceContext(
provider=parts[0],
host=parts[1],
library=parts[2],
raw_resource_id=resource_id,
)
class SasSource(
DatabaseServiceSource
): # pylint: disable=too-many-instance-attributes,too-many-public-methods
@ -232,53 +313,77 @@ class SasSource(
def create_database_schema(self, table):
"""
create database schema
Create database and schema entities for the given table.
First attempts to derive provider/host/library from the table's
``resourceId`` via ``parse_resource_id``. If the resourceId does
not match the expected SAS Data Tables shape, or the resulting
create/update call fails, falls back to a relationships-based
lookup through the Information Catalog.
"""
try:
context = table["resourceId"].split("/")[3]
resource_id = table.get("resourceId", "")
ctx = parse_resource_id(resource_id)
provider = context.split("~")[0]
self.db_name = provider + "." + context.split("~")[2]
self.db_schema_name = context.split("~")[4]
if ctx is not None:
try:
self.db_name = ctx.database_name
self.db_schema_name = ctx.library
database = CreateDatabaseRequest(
name=self.db_name,
displayName=self.db_name,
service=self.config.serviceName,
database = CreateDatabaseRequest(
name=self.db_name,
displayName=self.db_name,
service=self.config.serviceName,
)
database = self.metadata.create_or_update(data=database)
db_schema = CreateDatabaseSchemaRequest(
name=self.db_schema_name, database=database.fullyQualifiedName
)
return self.metadata.create_or_update(db_schema)
except HTTPError as exc:
logger.debug(
"Falling back to relationships-based schema lookup for "
"%s after HTTP error: %s",
resource_id,
exc,
)
return self._create_database_schema_from_relationships(table)
def _create_database_schema_from_relationships(self, table):
"""Derive database/schema from the table's catalog relationships.
This is the fallback path when ``parse_resource_id`` returns
``None`` or the primary create fails. It looks for a
``dataStoreDataSets`` relationship to locate the parent data
store, then uses ``create_database_alt`` for the database entity.
"""
data_store_data_sets = "4b114f6e-1c2a-4060-9184-6809a612f27b"
data_store_id = None
for relation in table.get("relationships", []):
if relation["definitionId"] != data_store_data_sets:
continue
data_store_id = relation["endpointId"]
break
if data_store_id is None:
logger.error(
"Failed to derive database schema for SAS table '%s' (resourceId=%s): "
"missing data store identifier because the expected "
"'dataStoreDataSets' relationship was not found.",
table.get("name", "<unknown>"),
table.get("resourceId", "<missing>"),
)
database = self.metadata.create_or_update(data=database)
return None
db_schema = CreateDatabaseSchemaRequest(
name=self.db_schema_name, database=database.fullyQualifiedName
)
db_schema_entity = self.metadata.create_or_update(db_schema)
return db_schema_entity
except HTTPError as _:
# Find the "database" entity in Information Catalog
# First see if the table is a member of the library through the relationships attribute
# Or we could use views to query the dataStores
data_store_data_sets = "4b114f6e-1c2a-4060-9184-6809a612f27b"
data_store_id = None
for relation in table["relationships"]:
if relation["definitionId"] != data_store_data_sets:
continue
data_store_id = relation["endpointId"]
break
if data_store_id is None:
# log error due to exclude amount of work with tables in dataTables
logger.error("Data store id should not be none")
return None
data_store = self.sas_client.get_instance(data_store_id)
database = self.create_database_alt(data_store)
self.db_schema_name = data_store["name"]
db_schema = CreateDatabaseSchemaRequest(
name=data_store["name"], database=database.fullyQualifiedName
)
db_schema_entity = self.metadata.create_or_update(db_schema)
return db_schema_entity
data_store = self.sas_client.get_instance(data_store_id)
database = self.create_database_alt(data_store)
self.db_schema_name = data_store["name"]
db_schema = CreateDatabaseSchemaRequest(
name=data_store["name"], database=database.fullyQualifiedName
)
return self.metadata.create_or_update(db_schema)
def create_columns_alt(self, table):
"""
@ -439,6 +544,7 @@ class SasSource(
global table_fqn
table_entity, table_fqn = None, None
table_name = table.get("name") if isinstance(table, dict) else None
try:
table_url = self.sas_client.get_information_catalog_link(table["id"])
@ -506,10 +612,13 @@ class SasSource(
custom_attributes = [
custom_attribute["name"] for custom_attribute in TABLE_CUSTOM_ATTR
]
# Drop null values — OpenMetadata's custom-field types
# (e.g. STRING_TYPE) reject null and fail the create with
# "Custom field <name> has invalid JSON [$: null found, string expected]"
extension_attributes = {
attr: value
for attr, value in table_extension.items()
if attr in custom_attributes
if attr in custom_attributes and value is not None
}
table_request = CreateTableRequest(
@ -529,6 +638,18 @@ class SasSource(
table_entity = self.metadata.get_by_name(
entity=Table, fqn=self.get_table_fqn(table_name)
)
# If the table wasn't actually persisted (e.g. the sink
# rejected the CreateTableRequest), skip the follow-up
# patch/profile calls so we don't raise an AttributeError
# that masks the real sink-side failure.
if table_entity is None:
logger.warning(
f"Table [{table_name}] was not created in OpenMetadata; "
"skipping description/extension/profile updates. "
"Check the sink logs for the underlying error."
)
return
# update the description
logger.debug(
f"Updating description for {table_entity.id.root} with {table_description}"
@ -595,10 +716,13 @@ class SasSource(
except Exception as exc:
logger.error(f"table failed to create: {table}")
error_name = table_name or (
table.get("id") if isinstance(table, dict) else "unknown"
)
yield Either(
left=StackTraceError(
name=table_name,
error=f"Unexpected exception to create table [{table_name}]: {exc}",
name=str(error_name),
error=f"Unexpected exception to create table [{error_name}]: {exc}",
stackTrace=traceback.format_exc(),
)
)
@ -637,7 +761,7 @@ class SasSource(
entity=Table, fqn=source_table_fqn
)
if source_table_entity:
if source_table_entity and target_table_entity:
yield from self.create_table_lineage(
source_table_entity, target_table_entity
)

View file

@ -0,0 +1,387 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Unit tests for the SAS connector.
These tests pin the bug fixes from issue #16888 where metadata ingestion
failed with a default SAS Viya 4 configuration because:
1. `casHost` (and other nullable SAS attributes) came back as None and
the backend rejected the CreateTableRequest with 400
"Custom field casHost has invalid JSON [$: null found, string expected]".
2. After the sink rejection, the source re-fetched the table and crashed
on `None.id`, masking the real sink error.
3. The bare `except` in `create_table_entity` referenced `table_name`
before it was assigned in some code paths.
4. `create_database_schema` only caught `HTTPError`, so a malformed
`resourceId` raised an uncaught `IndexError`.
"""
# pylint: disable=protected-access
from unittest.mock import MagicMock, patch
import pytest
from requests.exceptions import HTTPError
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.ingestion.source.database.sas.client import SASClient
from metadata.ingestion.source.database.sas.metadata import (
SASResourceContext,
SasSource,
parse_resource_id,
)
MOCK_SAS_CONFIG = {
"source": {
"type": "sas",
"serviceName": "local_sas",
"serviceConnection": {
"config": {
"type": "SAS",
"serverHost": "http://your-server-host.org",
"username": "username",
"password": "password",
"datatables": True,
"dataTablesCustomFilter": None,
"reports": False,
"reportsCustomFilter": None,
"dataflows": False,
"dataflowsCustomFilter": None,
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {"jwtToken": "sas-unit-test"},
}
},
}
# A realistic dataset search hit taken from the user's ingestion log
# (#16888). resourceId uses the "~fs~" separator style that the parsing
# code in create_database_schema relies on.
LAS_TRAIN_SEARCH_HIT = {
"id": "0396a44a-889f-4ee0-8211-252fc088a3cc",
"name": "LAS_TRAIN",
"type": "sasTable",
"attributes": {"library": "PUBLIC", "reviewStatus": "none"},
}
# Mock `get_views` response. The SAS source expects a `dataSet` entity
# plus zero or more `dataField`/`Column` entities in "entities".
LAS_TRAIN_VIEW = {
"entities": [
{
"id": "0396a44a-889f-4ee0-8211-252fc088a3cc",
"type": ["Table", "dataSet"],
"name": "LAS_TRAIN",
"resourceId": (
"/dataTables/dataSources/Compute~fs~"
"49736234-36b3-48d2-b2e2-e12aa365ce05~fs~PUBLIC/tables/LAS_TRAIN"
),
"creationTimeStamp": None,
"attributes": {
"analysisTimeStamp": "2024-07-01T10:25:00.000Z",
"rowCount": 10,
"columnCount": 1,
"dataSize": 1024,
# The specific field that triggered the original bug —
# SAS returns it as null for compute-backed tables.
"casHost": None,
"CASLIB": "PUBLIC",
"engineName": "V9",
},
},
{
"id": "col-1",
"type": ["Column"],
"name": "col1",
"attributes": {
"dataType": "char",
"ordinalPosition": 1,
"charsMaxCount": 10,
},
},
]
}
@pytest.fixture
def sas_source():
"""Build a SasSource with every network call mocked out."""
with patch.object(SASClient, "get_token", return_value="token"), patch(
"metadata.ingestion.source.database.sas.metadata.SasSource.test_connection"
), patch(
"metadata.ingestion.source.database.sas.metadata."
"SasSource.add_table_custom_attributes"
):
config = OpenMetadataWorkflowConfig.model_validate(MOCK_SAS_CONFIG)
source = SasSource.create(
MOCK_SAS_CONFIG["source"],
MagicMock(),
)
source.config = config.source
source.db_service_name = "local_sas"
return source
class TestParseResourceId:
"""Cover the standalone parse_resource_id function that extracts
provider/host/library from a SAS Information Catalog resourceId.
Known shapes (SAS Data Tables REST API):
/dataTables/dataSources/{provider}~fs~{host}~fs~{library}/tables/{table}
"""
def test_cas_table(self):
ctx = parse_resource_id(
"/dataTables/dataSources/cas~fs~cas-shared-default~fs~Samples"
"/tables/WATER_CLUSTER?ext=sashdat"
)
assert ctx == SASResourceContext(
provider="cas",
host="cas-shared-default",
library="Samples",
raw_resource_id=(
"/dataTables/dataSources/cas~fs~cas-shared-default~fs~Samples"
"/tables/WATER_CLUSTER?ext=sashdat"
),
)
assert ctx.database_name == "cas.cas-shared-default"
def test_compute_table(self):
ctx = parse_resource_id(
"/dataTables/dataSources/Compute~fs~"
"49736234-36b3-48d2-b2e2-e12aa365ce05~fs~PUBLIC/tables/LAS_TRAIN"
)
assert ctx.provider == "Compute"
assert ctx.host == "49736234-36b3-48d2-b2e2-e12aa365ce05"
assert ctx.library == "PUBLIC"
assert ctx.database_name == "Compute.49736234-36b3-48d2-b2e2-e12aa365ce05"
def test_too_few_slash_segments_returns_none(self):
assert parse_resource_id("/too/short") is None
def test_missing_field_separator_returns_none(self):
assert (
parse_resource_id("/dataTables/dataSources/no-separators-here/tables/T")
is None
)
def test_only_two_fields_returns_none(self):
assert parse_resource_id("/dataTables/dataSources/cas~fs~host/tables/T") is None
def test_empty_string_returns_none(self):
assert parse_resource_id("") is None
def test_frozen_dataclass(self):
ctx = parse_resource_id("/dataTables/dataSources/cas~fs~host~fs~lib/tables/T")
with pytest.raises(AttributeError):
ctx.provider = "modified"
class TestCreateDatabaseSchema:
"""Cover create_database_schema using parse_resource_id + fallback."""
def test_well_formed_resource_id_sets_db_and_schema(self, sas_source):
sas_source.metadata = MagicMock()
sas_source.metadata.create_or_update.return_value = MagicMock(
fullyQualifiedName="cas.cas-shared-default"
)
table = {
"resourceId": (
"/dataTables/dataSources/cas~fs~cas-shared-default~fs~Samples"
"/tables/WATER_CLUSTER?ext=sashdat"
),
}
sas_source.create_database_schema(table)
assert sas_source.db_name == "cas.cas-shared-default"
assert sas_source.db_schema_name == "Samples"
def test_malformed_resource_id_falls_back_to_relationships(self, sas_source):
sas_source.metadata = MagicMock()
sas_source.sas_client = MagicMock()
sas_source.sas_client.get_instance.return_value = {
"name": "fallback_schema",
"resourceId": "/dataSources/some/parent",
"links": [{"rel": "parent", "uri": "/parent"}],
}
sas_source.create_database_alt = MagicMock(
return_value=MagicMock(fullyQualifiedName="fallback_db")
)
table = {
"resourceId": "/too/short",
"relationships": [
{
"definitionId": "4b114f6e-1c2a-4060-9184-6809a612f27b",
"endpointId": "data-store-1",
}
],
}
result = sas_source.create_database_schema(table)
assert result is not None
sas_source.create_database_alt.assert_called_once()
def test_fallback_returns_none_when_no_data_store_relationship(self, sas_source):
sas_source.metadata = MagicMock()
sas_source.sas_client = MagicMock()
table = {
"resourceId": "/x/y",
"relationships": [],
}
assert sas_source.create_database_schema(table) is None
class TestExtensionAttributeFiltering:
"""The primary bug: null extension values must be stripped before
the CreateTableRequest is yielded, otherwise the sink returns 400."""
def _run_create_table_entity(self, sas_source):
sas_source.sas_client = MagicMock()
sas_source.sas_client.get_information_catalog_link.return_value = (
"http://sas/catalog/LAS_TRAIN"
)
sas_source.metadata = MagicMock()
# Table does not exist yet, so the source should yield a Create.
sas_source.metadata.get_by_name.return_value = None
with patch.object(
SasSource,
"get_entities_using_view",
return_value=(LAS_TRAIN_VIEW["entities"], LAS_TRAIN_VIEW["entities"][0]),
), patch.object(
SasSource,
"create_database_schema",
return_value=MagicMock(fullyQualifiedName="cas.49736234.PUBLIC"),
):
return list(sas_source.create_table_entity(LAS_TRAIN_SEARCH_HIT))
def test_null_cas_host_is_dropped_from_extension(self, sas_source):
"""The regression guard from #16888: casHost=None must not end
up in the CreateTableRequest extension."""
results = self._run_create_table_entity(sas_source)
create_requests = [
r.right
for r in results
if r.right is not None and isinstance(r.right, CreateTableRequest)
]
assert create_requests, f"No CreateTableRequest yielded: {results}"
request = create_requests[0]
assert request.extension is not None
extension = request.extension.root
assert "casHost" not in extension, (
"Null casHost must be stripped so the backend does not reject "
"the create with 'null found, string expected'"
)
# Non-null custom attributes should still be kept.
assert extension.get("CASLIB") == "PUBLIC"
assert extension.get("engineName") == "V9"
class TestSinkFailureGuard:
"""After yielding the CreateTableRequest, the source must tolerate
the table not existing (e.g. because the sink rejected the create)."""
def test_missing_table_after_yield_does_not_raise_attribute_error(self, sas_source):
"""Simulates the log in #16888: get_by_name returns None after the
yield because the sink 400'd. We must NOT crash on `None.id`."""
sas_source.sas_client = MagicMock()
sas_source.sas_client.get_information_catalog_link.return_value = (
"http://sas/catalog/LAS_TRAIN"
)
sas_source.metadata = MagicMock()
# Two get_by_name calls:
# 1. Check-before-create → None (table does not exist yet)
# 2. Re-fetch after yield → None (sink rejected create)
sas_source.metadata.get_by_name.return_value = None
with patch.object(
SasSource,
"get_entities_using_view",
return_value=(LAS_TRAIN_VIEW["entities"], LAS_TRAIN_VIEW["entities"][0]),
), patch.object(
SasSource,
"create_database_schema",
return_value=MagicMock(fullyQualifiedName="cas.49736234.PUBLIC"),
), patch.object(
SasSource, "create_lineage_table_source", return_value=iter([])
):
results = list(sas_source.create_table_entity(LAS_TRAIN_SEARCH_HIT))
# The CreateTableRequest must still be yielded (the sink will
# record its own failure); the source itself must NOT yield a
# StackTraceError (AttributeError) on the follow-up patch calls.
stack_trace_errors = [r for r in results if r.left is not None]
assert not stack_trace_errors, (
f"Source should not raise after sink-side failure, got: "
f"{[e.left.error for e in stack_trace_errors]}"
)
# The PATCH/profile calls must not have been invoked because we
# returned early.
sas_source.metadata.client.patch.assert_not_called()
sas_source.metadata.client.put.assert_not_called()
class TestExceptionHandlerSafety:
"""The bare except used to reference `table_name` which could be
undefined if the exception fired before it was assigned."""
def test_exception_before_table_name_assigned_yields_stack_trace(self, sas_source):
"""If get_entities_using_view throws, `table_name` is not set.
The except block must still produce a valid StackTraceError
(previously raised UnboundLocalError)."""
sas_source.sas_client = MagicMock()
sas_source.sas_client.get_information_catalog_link.return_value = "url"
sas_source.metadata = MagicMock()
with patch.object(
SasSource,
"get_entities_using_view",
side_effect=HTTPError("boom"),
):
results = list(sas_source.create_table_entity(LAS_TRAIN_SEARCH_HIT))
errors = [r.left for r in results if r.left is not None]
assert len(errors) == 1
error = errors[0]
# Falls back to the search-hit's name (not an UnboundLocalError).
assert error.name == "LAS_TRAIN"
assert "boom" in error.error
def test_exception_with_non_dict_table_yields_unknown_name(self, sas_source):
"""Defensive: even if `table` is not a dict, the except block
should still produce a valid StackTraceError."""
sas_source.sas_client = MagicMock()
sas_source.sas_client.get_information_catalog_link.side_effect = RuntimeError(
"kaboom"
)
sas_source.metadata = MagicMock()
results = list(sas_source.create_table_entity({"id": "abc"}))
errors = [r.left for r in results if r.left is not None]
assert len(errors) == 1
# Without a "name" in the search hit, we fall back to the id.
assert errors[0].name == "abc"