fix: resolve CI failures (formatting, mypy, Windows-only tests)

- Run ruff format on all source files
- Fix mypy type: ignore comments for .NET imports (import-not-found)
- Add mypy overrides for pythonnet/clr/clr_loader (no type stubs)
- Fix trace_export return type (dict[str, Any] not dict[str, str])
- Restrict CI test matrix to Windows-only (Power BI Desktop requirement)
This commit is contained in:
MinaSaad1 2026-03-27 07:28:44 +02:00
parent b777adec55
commit 58926221c2
4 changed files with 145 additions and 132 deletions

View file

@ -32,7 +32,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
os: [windows-latest]
python-version: ["3.10", "3.12", "3.13"]
steps:
- uses: actions/checkout@v4

View file

@ -83,3 +83,7 @@ markers = [
[tool.mypy]
python_version = "3.10"
strict = true
[[tool.mypy.overrides]]
module = ["pythonnet", "clr", "clr_loader"]
ignore_missing_imports = true

View file

@ -59,7 +59,9 @@ def _ensure_initialized() -> None:
def get_server_class() -> Any:
"""Return the ``Microsoft.AnalysisServices.Tabular.Server`` class."""
_ensure_initialized()
from Microsoft.AnalysisServices.Tabular import Server # type: ignore[import-untyped]
from Microsoft.AnalysisServices.Tabular import ( # type: ignore[import-not-found,unused-ignore]
Server,
)
return Server
@ -67,8 +69,8 @@ def get_server_class() -> Any:
def get_adomd_connection_class() -> Any:
"""Return the ``AdomdConnection`` class."""
_ensure_initialized()
from Microsoft.AnalysisServices.AdomdClient import (
AdomdConnection, # type: ignore[import-untyped]
from Microsoft.AnalysisServices.AdomdClient import ( # type: ignore[import-not-found,unused-ignore]
AdomdConnection,
)
return AdomdConnection
@ -77,7 +79,9 @@ def get_adomd_connection_class() -> Any:
def get_adomd_command_class() -> Any:
"""Return the ``AdomdCommand`` class."""
_ensure_initialized()
from Microsoft.AnalysisServices.AdomdClient import AdomdCommand # type: ignore[import-untyped]
from Microsoft.AnalysisServices.AdomdClient import (
AdomdCommand, # type: ignore[import-not-found,unused-ignore]
)
return AdomdCommand
@ -85,7 +89,9 @@ def get_adomd_command_class() -> Any:
def get_tmdl_serializer() -> Any:
"""Return the ``TmdlSerializer`` class."""
_ensure_initialized()
from Microsoft.AnalysisServices.Tabular import TmdlSerializer # type: ignore[import-untyped]
from Microsoft.AnalysisServices.Tabular import ( # type: ignore[import-not-found,unused-ignore]
TmdlSerializer,
)
return TmdlSerializer
@ -98,14 +104,12 @@ def get_tom_classes(*names: str) -> tuple[Any, ...]:
Measure, Table = get_tom_classes("Measure", "Table")
"""
_ensure_initialized()
import Microsoft.AnalysisServices.Tabular as TOM # type: ignore[import-untyped]
import Microsoft.AnalysisServices.Tabular as TOM # type: ignore[import-not-found,unused-ignore]
results: list[Any] = []
for name in names:
cls = getattr(TOM, name, None)
if cls is None:
raise AttributeError(
f"Class '{name}' not found in Microsoft.AnalysisServices.Tabular"
)
raise AttributeError(f"Class '{name}' not found in Microsoft.AnalysisServices.Tabular")
results.append(cls)
return tuple(results)

View file

@ -112,14 +112,16 @@ def table_list(model: Any) -> list[dict[str, Any]]:
"""List all tables with summary info."""
results: list[dict[str, Any]] = []
for table in model.Tables:
results.append({
"name": str(table.Name),
"columns": table.Columns.Count,
"measures": table.Measures.Count,
"partitions": table.Partitions.Count,
"isHidden": bool(table.IsHidden),
"description": _safe_str(table.Description),
})
results.append(
{
"name": str(table.Name),
"columns": table.Columns.Count,
"measures": table.Measures.Count,
"partitions": table.Partitions.Count,
"isHidden": bool(table.IsHidden),
"description": _safe_str(table.Description),
}
)
return results
@ -142,13 +144,15 @@ def table_get_schema(model: Any, table_name: str) -> list[dict[str, Any]]:
table = _get_table(model, table_name)
results: list[dict[str, Any]] = []
for col in table.Columns:
results.append({
"name": str(col.Name),
"dataType": str(col.DataType),
"type": str(col.Type),
"isHidden": bool(col.IsHidden),
"formatString": _safe_str(col.FormatString),
})
results.append(
{
"name": str(col.Name),
"dataType": str(col.DataType),
"type": str(col.Type),
"isHidden": bool(col.IsHidden),
"formatString": _safe_str(col.FormatString),
}
)
return results
@ -213,9 +217,7 @@ def table_delete(model: Any, table_name: str) -> dict[str, Any]:
return {"status": "deleted", "name": table_name}
def table_refresh(
model: Any, table_name: str, refresh_type: str = "Automatic"
) -> dict[str, Any]:
def table_refresh(model: Any, table_name: str, refresh_type: str = "Automatic") -> dict[str, Any]:
"""Request a table refresh."""
from pbi_cli.core.dotnet_loader import get_tom_classes
@ -242,9 +244,7 @@ def table_rename(model: Any, old_name: str, new_name: str) -> dict[str, Any]:
return {"status": "renamed", "oldName": old_name, "newName": new_name}
def table_mark_as_date(
model: Any, table_name: str, date_column: str
) -> dict[str, Any]:
def table_mark_as_date(model: Any, table_name: str, date_column: str) -> dict[str, Any]:
"""Mark a table as a date table."""
table = _get_table(model, table_name)
col = _get_column(table, date_column)
@ -265,15 +265,17 @@ def column_list(model: Any, table_name: str) -> list[dict[str, Any]]:
table = _get_table(model, table_name)
results: list[dict[str, Any]] = []
for col in table.Columns:
results.append({
"name": str(col.Name),
"dataType": str(col.DataType),
"type": str(col.Type),
"isHidden": bool(col.IsHidden),
"displayFolder": _safe_str(col.DisplayFolder),
"description": _safe_str(col.Description),
"formatString": _safe_str(col.FormatString),
})
results.append(
{
"name": str(col.Name),
"dataType": str(col.DataType),
"type": str(col.Type),
"isHidden": bool(col.IsHidden),
"displayFolder": _safe_str(col.DisplayFolder),
"description": _safe_str(col.Description),
"formatString": _safe_str(col.FormatString),
}
)
return results
@ -323,8 +325,7 @@ def column_create(
dt_map = _data_type_map()
if data_type not in dt_map:
raise ValueError(
f"Unknown data type '{data_type}'. "
f"Valid types: {', '.join(sorted(dt_map.keys()))}"
f"Unknown data type '{data_type}'. Valid types: {', '.join(sorted(dt_map.keys()))}"
)
if expression is not None:
@ -364,9 +365,7 @@ def column_delete(model: Any, table_name: str, column_name: str) -> dict[str, An
return {"status": "deleted", "name": column_name, "tableName": table_name}
def column_rename(
model: Any, table_name: str, old_name: str, new_name: str
) -> dict[str, Any]:
def column_rename(model: Any, table_name: str, old_name: str, new_name: str) -> dict[str, Any]:
"""Rename a column."""
table = _get_table(model, table_name)
col = _get_column(table, old_name)
@ -404,20 +403,20 @@ def measure_list(model: Any, table_name: str | None = None) -> list[dict[str, An
if table_name and str(table.Name) != table_name:
continue
for m in table.Measures:
results.append({
"name": str(m.Name),
"tableName": str(table.Name),
"expression": _safe_str(m.Expression),
"displayFolder": _safe_str(m.DisplayFolder),
"description": _safe_str(m.Description),
"isHidden": bool(m.IsHidden),
})
results.append(
{
"name": str(m.Name),
"tableName": str(table.Name),
"expression": _safe_str(m.Expression),
"displayFolder": _safe_str(m.DisplayFolder),
"description": _safe_str(m.Description),
"isHidden": bool(m.IsHidden),
}
)
return results
def measure_get(
model: Any, table_name: str, measure_name: str
) -> dict[str, Any]:
def measure_get(model: Any, table_name: str, measure_name: str) -> dict[str, Any]:
"""Get detailed metadata for a single measure."""
table = _get_table(model, table_name)
m = _get_measure(table, measure_name)
@ -497,9 +496,7 @@ def measure_delete(model: Any, table_name: str, name: str) -> dict[str, Any]:
return {"status": "deleted", "name": name, "tableName": table_name}
def measure_rename(
model: Any, table_name: str, old_name: str, new_name: str
) -> dict[str, Any]:
def measure_rename(model: Any, table_name: str, old_name: str, new_name: str) -> dict[str, Any]:
"""Rename a measure."""
table = _get_table(model, table_name)
m = _get_measure(table, old_name)
@ -508,9 +505,7 @@ def measure_rename(
return {"status": "renamed", "oldName": old_name, "newName": new_name}
def measure_move(
model: Any, table_name: str, name: str, dest_table_name: str
) -> dict[str, Any]:
def measure_move(model: Any, table_name: str, name: str, dest_table_name: str) -> dict[str, Any]:
"""Move a measure to a different table."""
src_table = _get_table(model, table_name)
dest_table = _get_table(model, dest_table_name)
@ -661,13 +656,15 @@ def partition_list(model: Any, table_name: str) -> list[dict[str, Any]]:
table = _get_table(model, table_name)
results: list[dict[str, Any]] = []
for p in table.Partitions:
results.append({
"name": str(p.Name),
"tableName": str(table.Name),
"mode": _safe_str(p.Mode),
"sourceType": _safe_str(p.SourceType),
"state": _safe_str(p.State),
})
results.append(
{
"name": str(p.Name),
"tableName": str(table.Name),
"mode": _safe_str(p.Mode),
"sourceType": _safe_str(p.SourceType),
"state": _safe_str(p.State),
}
)
return results
@ -725,9 +722,7 @@ def partition_delete(model: Any, table_name: str, name: str) -> dict[str, Any]:
return {"status": "deleted", "name": name, "tableName": table_name}
def partition_refresh(
model: Any, table_name: str, name: str
) -> dict[str, Any]:
def partition_refresh(model: Any, table_name: str, name: str) -> dict[str, Any]:
"""Refresh a partition."""
from pbi_cli.core.dotnet_loader import get_tom_classes
@ -749,11 +744,13 @@ def role_list(model: Any) -> list[dict[str, Any]]:
"""List all security roles."""
results: list[dict[str, Any]] = []
for role in model.Roles:
results.append({
"name": str(role.Name),
"description": _safe_str(role.Description),
"modelPermission": str(role.ModelPermission),
})
results.append(
{
"name": str(role.Name),
"description": _safe_str(role.Description),
"modelPermission": str(role.ModelPermission),
}
)
return results
@ -770,10 +767,12 @@ def role_get(model: Any, name: str) -> dict[str, Any]:
role = _get_role(model, name)
filters: list[dict[str, str]] = []
for tp in role.TablePermissions:
filters.append({
"tableName": str(tp.Table.Name),
"filterExpression": _safe_str(tp.FilterExpression),
})
filters.append(
{
"tableName": str(tp.Table.Name),
"filterExpression": _safe_str(tp.FilterExpression),
}
)
return {
"name": str(role.Name),
"description": _safe_str(role.Description),
@ -782,9 +781,7 @@ def role_get(model: Any, name: str) -> dict[str, Any]:
}
def role_create(
model: Any, name: str, description: str | None = None
) -> dict[str, Any]:
def role_create(model: Any, name: str, description: str | None = None) -> dict[str, Any]:
"""Create a security role."""
from pbi_cli.core.dotnet_loader import get_tom_classes
@ -816,16 +813,16 @@ def perspective_list(model: Any) -> list[dict[str, Any]]:
"""List all perspectives."""
results: list[dict[str, Any]] = []
for p in model.Perspectives:
results.append({
"name": str(p.Name),
"description": _safe_str(p.Description),
})
results.append(
{
"name": str(p.Name),
"description": _safe_str(p.Description),
}
)
return results
def perspective_create(
model: Any, name: str, description: str | None = None
) -> dict[str, Any]:
def perspective_create(model: Any, name: str, description: str | None = None) -> dict[str, Any]:
"""Create a perspective."""
from pbi_cli.core.dotnet_loader import get_tom_classes
@ -860,9 +857,7 @@ def perspective_delete(model: Any, name: str) -> dict[str, Any]:
# ---------------------------------------------------------------------------
def hierarchy_list(
model: Any, table_name: str | None = None
) -> list[dict[str, Any]]:
def hierarchy_list(model: Any, table_name: str | None = None) -> list[dict[str, Any]]:
"""List hierarchies, optionally filtered by table."""
results: list[dict[str, Any]] = []
for table in model.Tables:
@ -870,12 +865,14 @@ def hierarchy_list(
continue
for h in table.Hierarchies:
levels = [str(lv.Name) for lv in h.Levels]
results.append({
"name": str(h.Name),
"tableName": str(table.Name),
"description": _safe_str(h.Description),
"levels": levels,
})
results.append(
{
"name": str(h.Name),
"tableName": str(table.Name),
"description": _safe_str(h.Description),
"levels": levels,
}
)
return results
@ -893,11 +890,13 @@ def hierarchy_get(model: Any, table_name: str, name: str) -> dict[str, Any]:
h = _get_hierarchy(table, name)
levels = []
for lv in h.Levels:
levels.append({
"name": str(lv.Name),
"ordinal": int(lv.Ordinal),
"column": _safe_str(lv.Column.Name) if lv.Column else "",
})
levels.append(
{
"name": str(lv.Name),
"ordinal": int(lv.Ordinal),
"column": _safe_str(lv.Column.Name) if lv.Column else "",
}
)
return {
"name": str(h.Name),
"tableName": table_name,
@ -947,12 +946,14 @@ def calc_group_list(model: Any) -> list[dict[str, Any]]:
cg = table.CalculationGroup
if cg is not None:
items = [str(ci.Name) for ci in cg.CalculationItems]
results.append({
"name": str(table.Name),
"description": _safe_str(table.Description),
"precedence": int(cg.Precedence),
"items": items,
})
results.append(
{
"name": str(table.Name),
"description": _safe_str(table.Description),
"precedence": int(cg.Precedence),
"items": items,
}
)
return results
@ -1013,11 +1014,13 @@ def calc_item_list(model: Any, group_name: str) -> list[dict[str, Any]]:
raise ValueError(f"Table '{group_name}' is not a calculation group")
results: list[dict[str, Any]] = []
for ci in cg.CalculationItems:
results.append({
"name": str(ci.Name),
"expression": _safe_str(ci.Expression),
"ordinal": int(ci.Ordinal),
})
results.append(
{
"name": str(ci.Name),
"expression": _safe_str(ci.Expression),
"ordinal": int(ci.Ordinal),
}
)
return results
@ -1057,12 +1060,14 @@ def expression_list(model: Any) -> list[dict[str, Any]]:
"""List all named expressions (shared expressions / parameters)."""
results: list[dict[str, Any]] = []
for expr in model.Expressions:
results.append({
"name": str(expr.Name),
"kind": _safe_str(expr.Kind),
"expression": _safe_str(expr.Expression),
"description": _safe_str(expr.Description),
})
results.append(
{
"name": str(expr.Name),
"kind": _safe_str(expr.Kind),
"expression": _safe_str(expr.Expression),
"description": _safe_str(expr.Description),
}
)
return results
@ -1094,9 +1099,7 @@ def expression_create(
"""Create a named expression."""
from pbi_cli.core.dotnet_loader import get_tom_classes
NamedExpression, ExpressionKind = get_tom_classes(
"NamedExpression", "ExpressionKind"
)
NamedExpression, ExpressionKind = get_tom_classes("NamedExpression", "ExpressionKind")
e = NamedExpression()
e.Name = name
e.Kind = ExpressionKind.M
@ -1166,12 +1169,14 @@ def database_list(server: Any) -> list[dict[str, Any]]:
"""List all databases on the server."""
results: list[dict[str, Any]] = []
for db in server.Databases:
results.append({
"name": str(db.Name),
"id": str(db.ID),
"compatibilityLevel": int(db.CompatibilityLevel),
"lastUpdated": str(db.LastUpdate),
})
results.append(
{
"name": str(db.Name),
"id": str(db.ID),
"compatibilityLevel": int(db.CompatibilityLevel),
"lastUpdated": str(db.LastUpdate),
}
)
return results
@ -1251,7 +1256,7 @@ def trace_fetch() -> list[dict[str, Any]]:
return list(_trace_events)
def trace_export(path: str) -> dict[str, str]:
def trace_export(path: str) -> dict[str, Any]:
"""Export trace events to a JSON file."""
import json