diff --git a/src/pbi_cli/commands/database.py b/src/pbi_cli/commands/database.py
index 7e81f9d..1235719 100644
--- a/src/pbi_cli/commands/database.py
+++ b/src/pbi_cli/commands/database.py
@@ -48,6 +48,24 @@ def export_tmdl(ctx: PbiContext, folder_path: str) -> None:
     run_command(ctx, _export_tmdl, database=session.database, folder_path=folder_path)
 
 
+@database.command(name="diff-tmdl")
+@click.argument("base_folder", type=click.Path(exists=True, file_okay=False))
+@click.argument("head_folder", type=click.Path(exists=True, file_okay=False))
+@pass_context
+def diff_tmdl_cmd(ctx: PbiContext, base_folder: str, head_folder: str) -> None:
+    """Compare two TMDL export folders and show what changed.
+
+    Useful for CI/CD to summarise model changes between branches:
+
+        pbi database diff-tmdl ./base-export/ ./head-export/
+
+    No Power BI Desktop connection is required.
+    """
+    from pbi_cli.core.tmdl_diff import diff_tmdl_folders
+
+    run_command(ctx, diff_tmdl_folders, base_folder=base_folder, head_folder=head_folder)
+
+
 @database.command(name="export-tmsl")
 @pass_context
 def export_tmsl(ctx: PbiContext) -> None:
diff --git a/src/pbi_cli/core/tmdl_diff.py b/src/pbi_cli/core/tmdl_diff.py
new file mode 100644
index 0000000..c91aa29
--- /dev/null
+++ b/src/pbi_cli/core/tmdl_diff.py
@@ -0,0 +1,354 @@
+"""TMDL folder diff -- pure Python, no .NET required."""
+
+from __future__ import annotations
+
+import re
+from pathlib import Path
+from typing import Any
+
+from pbi_cli.core.errors import PbiCliError
+
+# Entity keywords inside table files (at 1-tab indent)
+_TABLE_ENTITY_KEYWORDS = frozenset({"measure", "column", "hierarchy", "partition", "variation"})
+
+
+def diff_tmdl_folders(base_folder: str, head_folder: str) -> dict[str, Any]:
+    """Compare two TMDL export folders and return a structured diff.
+
+    Works on any two folders produced by ``pbi database export-tmdl`` or
+    exported from Power BI Desktop / Fabric Git. No live connection needed.
+
+    Returns a dict with keys: base, head, changed, summary, tables,
+    relationships, model.
+    """
+    base = Path(base_folder)
+    head = Path(head_folder)
+    if not base.is_dir():
+        raise PbiCliError(f"Base folder not found: {base}")
+    if not head.is_dir():
+        raise PbiCliError(f"Head folder not found: {head}")
+
+    base_def = _find_definition_dir(base)
+    head_def = _find_definition_dir(head)
+
+    tables_diff = _diff_tables(base_def, head_def)
+    rels_diff = _diff_relationships(base_def, head_def)
+    model_diff = _diff_model(base_def, head_def)
+
+    any_changed = bool(
+        tables_diff["added"]
+        or tables_diff["removed"]
+        or tables_diff["changed"]
+        or rels_diff["added"]
+        or rels_diff["removed"]
+        or rels_diff["changed"]
+        or model_diff["changed_properties"]
+    )
+
+    summary: dict[str, Any] = {
+        "tables_added": len(tables_diff["added"]),
+        "tables_removed": len(tables_diff["removed"]),
+        "tables_changed": len(tables_diff["changed"]),
+        "relationships_added": len(rels_diff["added"]),
+        "relationships_removed": len(rels_diff["removed"]),
+        "relationships_changed": len(rels_diff["changed"]),
+        "model_changed": bool(model_diff["changed_properties"]),
+    }
+
+    return {
+        "base": str(base),
+        "head": str(head),
+        "changed": any_changed,
+        "summary": summary,
+        "tables": tables_diff,
+        "relationships": rels_diff,
+        "model": model_diff,
+    }
+
+
+def _find_definition_dir(folder: Path) -> Path:
+    """Return the directory that directly contains model.tmdl / tables/.
+
+    Handles both:
+    - Direct layout: folder/model.tmdl
+    - SemanticModel: folder/definition/model.tmdl
+    """
+    candidate = folder / "definition"
+    if candidate.is_dir():
+        return candidate
+    return folder
+
+
+def _read_tmdl(path: Path) -> str:
+    """Read a TMDL file, returning empty string if absent."""
+    if not path.exists():
+        return ""
+    return path.read_text(encoding="utf-8")
+
+
+def _strip_lineage_tags(text: str) -> str:
+    """Remove lineageTag lines so spurious GUID regeneration is ignored.
+
+    Only lines whose first token is ``lineageTag:`` are dropped; the same
+    text inside an expression or description is left untouched.
+    """
+    return re.sub(r"^[ \t]*lineageTag:.*\n?", "", text, flags=re.MULTILINE)
+
+
+# ---------------------------------------------------------------------------
+# Table diffing
+# ---------------------------------------------------------------------------
+
+
+def _diff_tables(base_def: Path, head_def: Path) -> dict[str, Any]:
+    """Diff the tables/ folders: added, removed and changed table files."""
+    base_tables_dir = base_def / "tables"
+    head_tables_dir = head_def / "tables"
+
+    base_names = _list_tmdl_names(base_tables_dir)
+    head_names = _list_tmdl_names(head_tables_dir)
+
+    added = sorted(head_names - base_names)
+    removed = sorted(base_names - head_names)
+    changed: dict[str, Any] = {}
+
+    for name in sorted(base_names & head_names):
+        base_text = _read_tmdl(base_tables_dir / f"{name}.tmdl")
+        head_text = _read_tmdl(head_tables_dir / f"{name}.tmdl")
+        if _strip_lineage_tags(base_text) == _strip_lineage_tags(head_text):
+            continue
+        table_diff = _diff_table_entities(base_text, head_text)
+        if any(table_diff[k] for k in table_diff):
+            changed[name] = table_diff
+
+    return {"added": added, "removed": removed, "changed": changed}
+
+
+def _list_tmdl_names(tables_dir: Path) -> set[str]:
+    """Return stem names of all .tmdl files in a directory."""
+    if not tables_dir.is_dir():
+        return set()
+    return {p.stem for p in tables_dir.glob("*.tmdl")}
+
+
+def _diff_table_entities(
+    base_text: str, head_text: str
+) -> dict[str, list[str]]:
+    """Compare entity blocks within two table TMDL files.
+
+    Table-level lines that precede the first entity (``isHidden``,
+    ``description: ...``) are compared too; differing lines are listed under
+    ``properties_changed``, a key that is only present when non-empty.
+    """
+    base_entities = _parse_table_entities(base_text)
+    head_entities = _parse_table_entities(head_text)
+
+    result: dict[str, list[str]] = {
+        "measures_added": [],
+        "measures_removed": [],
+        "measures_changed": [],
+        "columns_added": [],
+        "columns_removed": [],
+        "columns_changed": [],
+        "partitions_added": [],
+        "partitions_removed": [],
+        "partitions_changed": [],
+        "other_added": [],
+        "other_removed": [],
+        "other_changed": [],
+    }
+
+    all_keys = set(base_entities) | set(head_entities)
+    for key in sorted(all_keys):
+        keyword, _, name = key.partition("/")
+        added_key = f"{keyword}s_added" if f"{keyword}s_added" in result else "other_added"
+        removed_key = f"{keyword}s_removed" if f"{keyword}s_removed" in result else "other_removed"
+        changed_key = f"{keyword}s_changed" if f"{keyword}s_changed" in result else "other_changed"
+
+        if key not in base_entities:
+            result[added_key].append(name)
+        elif key not in head_entities:
+            result[removed_key].append(name)
+        else:
+            b = _strip_lineage_tags(base_entities[key])
+            h = _strip_lineage_tags(head_entities[key])
+            if b != h:
+                result[changed_key].append(name)
+
+    # Remove empty other_* lists to keep output clean
+    for k in ("other_added", "other_removed", "other_changed"):
+        if not result[k]:
+            del result[k]
+
+    # Table-level properties sit before the first entity block, so the
+    # entity comparison above never sees them.
+    header_delta = _table_header_lines(base_text) ^ _table_header_lines(head_text)
+    if header_delta:
+        result["properties_changed"] = sorted(header_delta)
+
+    return result
+
+
+def _table_header_lines(text: str) -> set[str]:
+    """Return the stripped, non-blank lines that precede the first entity."""
+    header: set[str] = set()
+    for line in _strip_lineage_tags(text).splitlines():
+        if line.startswith("\t") and not line.startswith("\t\t"):
+            tokens = line.split()
+            if tokens and tokens[0] in _TABLE_ENTITY_KEYWORDS:
+                break
+        if line.strip():
+            header.add(line.strip())
+    return header
+
+
+def _parse_table_entities(text: str) -> dict[str, str]:
+    """Parse a table TMDL file into {keyword/name: text_block} entries.
+
+    Entities (measure, column, hierarchy, partition, variation) start at
+    exactly one tab of indentation inside the table declaration.
+    """
+    entities: dict[str, str] = {}
+    lines = text.splitlines(keepends=True)
+    current_key: str | None = None
+    current_lines: list[str] = []
+
+    for line in lines:
+        # Entity declaration: starts with exactly one tab, not two
+        if line.startswith("\t") and not line.startswith("\t\t"):
+            stripped = line[1:]  # remove leading tab
+            keyword = stripped.split()[0] if stripped.split() else ""
+            if keyword in _TABLE_ENTITY_KEYWORDS:
+                # Save previous block
+                if current_key is not None:
+                    entities[current_key] = "".join(current_lines)
+                name = _extract_entity_name(keyword, stripped)
+                current_key = f"{keyword}/{name}"
+                current_lines = [line]
+                continue
+
+        if current_key is not None:
+            current_lines.append(line)
+
+    if current_key is not None:
+        entities[current_key] = "".join(current_lines)
+
+    return entities
+
+
+def _extract_entity_name(keyword: str, declaration: str) -> str:
+    """Extract the entity name from a TMDL declaration line (no leading tab).
+
+    Quoted names may contain a single quote escaped by doubling it
+    (``'Year''s Total'``); the escape is resolved in the returned name.
+    """
+    # e.g. "measure 'Total Revenue' = ..." -> "Total Revenue"
+    # e.g. "measure 'Year''s Total' = ..." -> "Year's Total"
+    # e.g. "column ProductID" -> "ProductID"
+    # e.g. "partition Sales = m" -> "Sales"
+    rest = declaration[len(keyword):].strip()
+    if rest.startswith("'"):
+        quoted = re.match(r"'((?:[^']|'')*)'", rest)
+        if quoted is None:
+            # Unterminated quote: fall back to everything after it
+            return rest[1:]
+        return quoted.group(1).replace("''", "'")
+    # Take first token, stop at '=' or whitespace
+    token = re.split(r"[\s=]", rest)[0]
+    return token.strip("'\"") if token else rest
+
+
+# ---------------------------------------------------------------------------
+# Relationship diffing
+# ---------------------------------------------------------------------------
+
+
+def _diff_relationships(base_def: Path, head_def: Path) -> dict[str, list[str]]:
+    """Diff relationships.tmdl, keyed by ``fromColumn -> toColumn``."""
+    base_rels = _parse_relationships(_read_tmdl(base_def / "relationships.tmdl"))
+    head_rels = _parse_relationships(_read_tmdl(head_def / "relationships.tmdl"))
+
+    all_keys = set(base_rels) | set(head_rels)
+    added: list[str] = []
+    removed: list[str] = []
+    changed: list[str] = []
+
+    for key in sorted(all_keys):
+        if key not in base_rels:
+            added.append(key)
+        elif key not in head_rels:
+            removed.append(key)
+        elif _strip_lineage_tags(base_rels[key]) != _strip_lineage_tags(head_rels[key]):
+            changed.append(key)
+
+    return {"added": added, "removed": removed, "changed": changed}
+
+
+def _parse_relationships(text: str) -> dict[str, str]:
+    """Parse relationships.tmdl into {from -> to: text_block} entries."""
+    if not text.strip():
+        return {}
+
+    blocks: dict[str, str] = {}
+    current_lines: list[str] = []
+    in_rel = False
+
+    for line in text.splitlines(keepends=True):
+        if line.startswith("relationship "):
+            if in_rel and current_lines:
+                _save_relationship(current_lines, blocks)
+            current_lines = [line]
+            in_rel = True
+        elif in_rel:
+            current_lines.append(line)
+
+    if in_rel and current_lines:
+        _save_relationship(current_lines, blocks)
+
+    return blocks
+
+
+def _save_relationship(lines: list[str], blocks: dict[str, str]) -> None:
+    """Extract semantic key from a relationship block and store it."""
+    from_col = ""
+    to_col = ""
+    for line in lines:
+        stripped = line.strip()
+        if stripped.startswith("fromColumn:"):
+            from_col = stripped.split(":", 1)[1].strip()
+        elif stripped.startswith("toColumn:"):
+            to_col = stripped.split(":", 1)[1].strip()
+    if from_col or to_col:
+        key = f"{from_col} -> {to_col}"
+        blocks[key] = "".join(lines)
+
+
+# ---------------------------------------------------------------------------
+# Model property diffing
+# ---------------------------------------------------------------------------
+
+
+def _diff_model(base_def: Path, head_def: Path) -> dict[str, list[str]]:
+    """Diff the ``key: value`` properties of the two model.tmdl files."""
+    base_props = _parse_model_props(_read_tmdl(base_def / "model.tmdl"))
+    head_props = _parse_model_props(_read_tmdl(head_def / "model.tmdl"))
+
+    changed: list[str] = []
+    all_keys = set(base_props) | set(head_props)
+    for key in sorted(all_keys):
+        b_val = base_props.get(key)
+        h_val = head_props.get(key)
+        if b_val != h_val:
+            changed.append(f"{key}: {b_val!r} -> {h_val!r}")
+
+    return {"changed_properties": changed}
+
+
+def _parse_model_props(text: str) -> dict[str, str]:
+    """Extract key: value properties at 1-tab indent from model.tmdl."""
+    props: dict[str, str] = {}
+    for line in text.splitlines():
+        if line.startswith("\t") and not line.startswith("\t\t") and ":" in line:
+            key, _, val = line[1:].partition(":")
+            props[key.strip()] = val.strip()
+    return props
diff --git a/tests/test_tmdl_diff.py b/tests/test_tmdl_diff.py
new file mode 100644
index 0000000..f42c7ed
--- /dev/null
+++ b/tests/test_tmdl_diff.py
@@ -0,0 +1,359 @@
+"""Tests for pbi_cli.core.tmdl_diff."""
+
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Any
+
+import pytest
+
+from pbi_cli.core.errors import PbiCliError
+from pbi_cli.core.tmdl_diff import diff_tmdl_folders
+
+# ---------------------------------------------------------------------------
+# Fixture helpers
+# ---------------------------------------------------------------------------
+
+_MODEL_TMDL = """\
+model Model
+\tculture: en-US
+\tdefaultPowerBIDataSourceVersion: powerBI_V3
+\tsourceQueryCulture: en-US
+
+ref table Sales
+ref cultureInfo en-US
+"""
+
+_RELATIONSHIPS_TMDL = """\
+relationship abc-def-111
+\tlineageTag: xyz
+\tfromColumn: Sales.ProductID
+\ttoColumn: Product.ProductID
+
+relationship abc-def-222
+\tfromColumn: Sales.CustomerID
+\ttoColumn: Customer.CustomerID
+"""
+
+_SALES_TMDL = """\
+table Sales
+\tlineageTag: tbl-001
+
+\tmeasure 'Total Revenue' = SUM(Sales[Amount])
+\t\tformatString: "$#,0"
+\t\tlineageTag: msr-001
+
+\tcolumn Amount
+\t\tdataType: decimal
+\t\tlineageTag: col-001
+\t\tsummarizeBy: sum
+\t\tsourceColumn: Amount
+
+\tpartition Sales = m
+\t\tmode: import
+\t\tsource
+\t\t\tlet
+\t\t\t Source = Csv.Document(...)
+\t\t\tin
+\t\t\t Source
+"""
+
+_DATE_TMDL = """\
+table Date
+\tlineageTag: tbl-002
+
+\tcolumn Date
+\t\tdataType: dateTime
+\t\tlineageTag: col-002
+\t\tsummarizeBy: none
+\t\tsourceColumn: Date
+"""
+
+# Inline TMDL snippets reused across multiple tests
+_NEW_MEASURE_SNIPPET = (
+    "\n\tmeasure 'YTD Revenue'"
+    " = CALCULATE([Total Revenue], DATESYTD('Date'[Date]))"
+    "\n\t\tlineageTag: msr-new\n"
+)
+_TOTAL_REVENUE_BLOCK = (
+    "\n\tmeasure 'Total Revenue' = SUM(Sales[Amount])"
+    '\n\t\tformatString: "$#,0"'
+    "\n\t\tlineageTag: msr-001\n"
+)
+_NEW_COL_SNIPPET = (
+    "\n\tcolumn Region"
+    "\n\t\tdataType: string"
+    "\n\t\tsummarizeBy: none"
+    "\n\t\tsourceColumn: Region\n"
+)
+_AMOUNT_COL_BLOCK = (
+    "\n\tcolumn Amount"
+    "\n\t\tdataType: decimal"
+    "\n\t\tlineageTag: col-001"
+    "\n\t\tsummarizeBy: sum"
+    "\n\t\tsourceColumn: Amount\n"
+)
+_NEW_REL_SNIPPET = (
+    "\nrelationship abc-def-999"
+    "\n\tfromColumn: Sales.RegionID"
+    "\n\ttoColumn: Region.ID\n"
+)
+_TRIMMED_RELS = (
+    "relationship abc-def-111"
+    "\n\tfromColumn: Sales.ProductID"
+    "\n\ttoColumn: Product.ProductID\n"
+)
+_REL_222_BASE = (
+    "relationship abc-def-222"
+    "\n\tfromColumn: Sales.CustomerID"
+    "\n\ttoColumn: Customer.CustomerID"
+)
+_REL_222_CHANGED = (
+    "relationship abc-def-222"
+    "\n\tfromColumn: Sales.CustomerID"
+    "\n\ttoColumn: Customer.CustomerID"
+    "\n\tcrossFilteringBehavior: bothDirections"
+)
+
+
+def _make_tmdl_folder(
+    root: Path,
+    *,
+    model_text: str = _MODEL_TMDL,
+    relationships_text: str = _RELATIONSHIPS_TMDL,
+    tables: dict[str, str] | None = None,
+) -> Path:
+    """Create a minimal TMDL folder under root and return its path."""
+    if tables is None:
+        tables = {"Sales": _SALES_TMDL, "Date": _DATE_TMDL}
+    root.mkdir(parents=True, exist_ok=True)
+    (root / "model.tmdl").write_text(model_text, encoding="utf-8")
+    (root / "database.tmdl").write_text("database\n\tcompatibilityLevel: 1600\n", encoding="utf-8")
+    (root / "relationships.tmdl").write_text(relationships_text, encoding="utf-8")
+    tables_dir = root / "tables"
+    tables_dir.mkdir()
+    for name, text in tables.items():
+        (tables_dir / f"{name}.tmdl").write_text(text, encoding="utf-8")
+    return root
+
+
+def _make_semantic_model_folder(
+    root: Path,
+    **kwargs: Any,
+) -> Path:
+    """Create a SemanticModel-layout folder (definition/ subdirectory)."""
+    root.mkdir(parents=True, exist_ok=True)
+    defn_dir = root / "definition"
+    defn_dir.mkdir()
+    _make_tmdl_folder(defn_dir, **kwargs)
+    (root / ".platform").write_text("{}", encoding="utf-8")
+    return root
+
+
+# ---------------------------------------------------------------------------
+# Tests
+# ---------------------------------------------------------------------------
+
+
+class TestDiffTmdlFolders:
+    def test_identical_folders_returns_no_changes(self, tmp_path: Path) -> None:
+        base = _make_tmdl_folder(tmp_path / "base")
+        head = _make_tmdl_folder(tmp_path / "head")
+        result = diff_tmdl_folders(str(base), str(head))
+        assert result["changed"] is False
+        assert result["summary"]["tables_added"] == 0
+        assert result["summary"]["tables_removed"] == 0
+        assert result["summary"]["tables_changed"] == 0
+
+    def test_lineage_tag_only_change_is_not_reported(self, tmp_path: Path) -> None:
+        base = _make_tmdl_folder(tmp_path / "base")
+        changed_sales = _SALES_TMDL.replace("tbl-001", "NEW-TAG").replace("msr-001", "NEW-MSR")
+        head = _make_tmdl_folder(
+            tmp_path / "head",
+            tables={"Sales": changed_sales, "Date": _DATE_TMDL},
+        )
+        result = diff_tmdl_folders(str(base), str(head))
+        assert result["changed"] is False
+
+    def test_table_added(self, tmp_path: Path) -> None:
+        product_tmdl = "table Product\n\tlineageTag: tbl-003\n\n\tcolumn ID\n\t\tdataType: int64\n"
+        base = _make_tmdl_folder(tmp_path / "base")
+        head = _make_tmdl_folder(
+            tmp_path / "head",
+            tables={"Sales": _SALES_TMDL, "Date": _DATE_TMDL, "Product": product_tmdl},
+        )
+        result = diff_tmdl_folders(str(base), str(head))
+        assert result["changed"] is True
+        assert "Product" in result["tables"]["added"]
+        assert result["tables"]["removed"] == []
+
+    def test_table_removed(self, tmp_path: Path) -> None:
+        base = _make_tmdl_folder(tmp_path / "base")
+        head = _make_tmdl_folder(tmp_path / "head", tables={"Sales": _SALES_TMDL})
+        result = diff_tmdl_folders(str(base), str(head))
+        assert "Date" in result["tables"]["removed"]
+
+    def test_measure_added(self, tmp_path: Path) -> None:
+        modified_sales = _SALES_TMDL + _NEW_MEASURE_SNIPPET
+        base = _make_tmdl_folder(tmp_path / "base")
+        head = _make_tmdl_folder(
+            tmp_path / "head",
+            tables={"Sales": modified_sales, "Date": _DATE_TMDL},
+        )
+        result = diff_tmdl_folders(str(base), str(head))
+        assert result["changed"] is True
+        sales_diff = result["tables"]["changed"]["Sales"]
+        assert "YTD Revenue" in sales_diff["measures_added"]
+
+    def test_measure_removed(self, tmp_path: Path) -> None:
+        stripped_sales = _SALES_TMDL.replace(_TOTAL_REVENUE_BLOCK, "")
+        base = _make_tmdl_folder(tmp_path / "base")
+        head = _make_tmdl_folder(
+            tmp_path / "head",
+            tables={"Sales": stripped_sales, "Date": _DATE_TMDL},
+        )
+        result = diff_tmdl_folders(str(base), str(head))
+        sales_diff = result["tables"]["changed"]["Sales"]
+        assert "Total Revenue" in sales_diff["measures_removed"]
+
+    def test_measure_expression_changed(self, tmp_path: Path) -> None:
+        modified_sales = _SALES_TMDL.replace(
+            "measure 'Total Revenue' = SUM(Sales[Amount])",
+            "measure 'Total Revenue' = SUMX(Sales, Sales[Amount] * Sales[Qty])",
+        )
+        base = _make_tmdl_folder(tmp_path / "base")
+        head = _make_tmdl_folder(
+            tmp_path / "head",
+            tables={"Sales": modified_sales, "Date": _DATE_TMDL},
+        )
+        result = diff_tmdl_folders(str(base), str(head))
+        sales_diff = result["tables"]["changed"]["Sales"]
+        assert "Total Revenue" in sales_diff["measures_changed"]
+
+    def test_column_added(self, tmp_path: Path) -> None:
+        modified_sales = _SALES_TMDL + _NEW_COL_SNIPPET
+        base = _make_tmdl_folder(tmp_path / "base")
+        head = _make_tmdl_folder(
+            tmp_path / "head",
+            tables={"Sales": modified_sales, "Date": _DATE_TMDL},
+        )
+        result = diff_tmdl_folders(str(base), str(head))
+        sales_diff = result["tables"]["changed"]["Sales"]
+        assert "Region" in sales_diff["columns_added"]
+
+    def test_column_removed(self, tmp_path: Path) -> None:
+        stripped = _SALES_TMDL.replace(_AMOUNT_COL_BLOCK, "")
+        base = _make_tmdl_folder(tmp_path / "base")
+        head = _make_tmdl_folder(
+            tmp_path / "head",
+            tables={"Sales": stripped, "Date": _DATE_TMDL},
+        )
+        result = diff_tmdl_folders(str(base), str(head))
+        sales_diff = result["tables"]["changed"]["Sales"]
+        assert "Amount" in sales_diff["columns_removed"]
+
+    def test_relationship_added(self, tmp_path: Path) -> None:
+        base = _make_tmdl_folder(tmp_path / "base")
+        head = _make_tmdl_folder(
+            tmp_path / "head",
+            relationships_text=_RELATIONSHIPS_TMDL + _NEW_REL_SNIPPET,
+        )
+        result = diff_tmdl_folders(str(base), str(head))
+        assert "Sales.RegionID -> Region.ID" in result["relationships"]["added"]
+
+    def test_relationship_removed(self, tmp_path: Path) -> None:
+        base = _make_tmdl_folder(tmp_path / "base")
+        head = _make_tmdl_folder(tmp_path / "head", relationships_text=_TRIMMED_RELS)
+        result = diff_tmdl_folders(str(base), str(head))
+        assert "Sales.CustomerID -> Customer.CustomerID" in result["relationships"]["removed"]
+
+    def test_relationship_changed(self, tmp_path: Path) -> None:
+        changed_rels = _RELATIONSHIPS_TMDL.replace(_REL_222_BASE, _REL_222_CHANGED)
+        base = _make_tmdl_folder(tmp_path / "base")
+        head = _make_tmdl_folder(tmp_path / "head", relationships_text=changed_rels)
+        result = diff_tmdl_folders(str(base), str(head))
+        assert "Sales.CustomerID -> Customer.CustomerID" in result["relationships"]["changed"]
+
+    def test_model_property_changed(self, tmp_path: Path) -> None:
+        changed_model = _MODEL_TMDL.replace("culture: en-US", "culture: fr-FR")
+        base = _make_tmdl_folder(tmp_path / "base")
+        head = _make_tmdl_folder(tmp_path / "head", model_text=changed_model)
+        result = diff_tmdl_folders(str(base), str(head))
+        assert result["summary"]["model_changed"] is True
+        assert any("culture" in p for p in result["model"]["changed_properties"])
+
+    def test_semantic_model_layout(self, tmp_path: Path) -> None:
+        """Handles the SemanticModel folder layout (definition/ subdirectory)."""
+        base = _make_semantic_model_folder(tmp_path / "MyModel.SemanticModel.base")
+        head = _make_semantic_model_folder(tmp_path / "MyModel.SemanticModel.head")
+        result = diff_tmdl_folders(str(base), str(head))
+        assert result["changed"] is False
+
+    def test_missing_base_folder_raises(self, tmp_path: Path) -> None:
+        head = _make_tmdl_folder(tmp_path / "head")
+        with pytest.raises(PbiCliError, match="Base folder not found"):
+            diff_tmdl_folders(str(tmp_path / "nonexistent"), str(head))
+
+    def test_missing_head_folder_raises(self, tmp_path: Path) -> None:
+        base = _make_tmdl_folder(tmp_path / "base")
+        with pytest.raises(PbiCliError, match="Head folder not found"):
+            diff_tmdl_folders(str(base), str(tmp_path / "nonexistent"))
+
+    def test_result_keys_present(self, tmp_path: Path) -> None:
+        base = _make_tmdl_folder(tmp_path / "base")
+        head = _make_tmdl_folder(tmp_path / "head")
+        result = diff_tmdl_folders(str(base), str(head))
+        assert "base" in result
+        assert "head" in result
+        assert "changed" in result
+        assert "summary" in result
+        assert "tables" in result
+        assert "relationships" in result
+        assert "model" in result
+
+    def test_no_relationships_file(self, tmp_path: Path) -> None:
+        """Handles missing relationships.tmdl gracefully."""
+        base = _make_tmdl_folder(tmp_path / "base", relationships_text="")
+        head = _make_tmdl_folder(tmp_path / "head", relationships_text="")
+        result = diff_tmdl_folders(str(base), str(head))
+        assert result["relationships"] == {"added": [], "removed": [], "changed": []}
+
+    def test_table_level_property_change_is_reported(self, tmp_path: Path) -> None:
+        """A change before the first entity block must not be dropped."""
+        hidden_sales = _SALES_TMDL.replace("table Sales\n", "table Sales\n\tisHidden\n")
+        base = _make_tmdl_folder(tmp_path / "base")
+        head = _make_tmdl_folder(
+            tmp_path / "head",
+            tables={"Sales": hidden_sales, "Date": _DATE_TMDL},
+        )
+        result = diff_tmdl_folders(str(base), str(head))
+        sales_diff = result["tables"]["changed"]["Sales"]
+        assert sales_diff["properties_changed"] == ["isHidden"]
+
+    def test_quoted_name_with_escaped_quote(self, tmp_path: Path) -> None:
+        """Doubled single quotes inside a quoted name are part of the name."""
+        snippet = "\n\tmeasure 'Year''s Total' = [Total Revenue]\n"
+        base = _make_tmdl_folder(tmp_path / "base")
+        head = _make_tmdl_folder(
+            tmp_path / "head",
+            tables={"Sales": _SALES_TMDL + snippet, "Date": _DATE_TMDL},
+        )
+        result = diff_tmdl_folders(str(base), str(head))
+        sales_diff = result["tables"]["changed"]["Sales"]
+        assert sales_diff["measures_added"] == ["Year's Total"]
+
+    def test_lineage_tag_text_inside_expression_is_compared(self, tmp_path: Path) -> None:
+        """Only real lineageTag property lines are ignored."""
+        base_sales = _SALES_TMDL + '\n\tmeasure Note = "lineageTag: one"\n'
+        head_sales = _SALES_TMDL + '\n\tmeasure Note = "lineageTag: two"\n'
+        base = _make_tmdl_folder(
+            tmp_path / "base",
+            tables={"Sales": base_sales, "Date": _DATE_TMDL},
+        )
+        head = _make_tmdl_folder(
+            tmp_path / "head",
+            tables={"Sales": head_sales, "Date": _DATE_TMDL},
+        )
+        result = diff_tmdl_folders(str(base), str(head))
+        sales_diff = result["tables"]["changed"]["Sales"]
+        assert sales_diff["measures_changed"] == ["Note"]
+