Skip to content

Commit

Permalink
feat(looker): allow customization of AssetKey through `DagsterLooke…
Browse files Browse the repository at this point in the history
…rTranslator` (#21835)

## Summary & Motivation
Introduce a similar pattern to `DagsterDbtTranslator` to allow the user
to customize asset keys.

Next up, we'll allow the user to customize group name, tags, etc.

## How I Tested These Changes
pytest
  • Loading branch information
rexledesma authored and alangenfeld committed May 14, 2024
1 parent 19b2d3d commit 4b07259
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 27 deletions.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
Binary file modified docs/next/public/objects.inv
Binary file not shown.
2 changes: 2 additions & 0 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-looker.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ Assets
======

.. autodecorator:: looker_assets

.. autoclass:: DagsterLookerTranslator
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dagster._core.libraries import DagsterLibraryRegistry

from .asset_decorator import looker_assets as looker_assets
from .dagster_looker_translator import DagsterLookerTranslator as DagsterLookerTranslator
from .version import __version__ as __version__

DagsterLibraryRegistry.register("dagster-looker", __version__)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pathlib import Path
from typing import Any, Callable
from typing import Any, Callable, Optional

from dagster import AssetsDefinition, multi_asset
from dagster._annotations import experimental
Expand All @@ -9,10 +9,15 @@
build_looker_explore_specs,
build_looker_view_specs,
)
from .dagster_looker_translator import DagsterLookerTranslator


@experimental
def looker_assets(*, project_dir: Path) -> Callable[[Callable[..., Any]], AssetsDefinition]:
def looker_assets(
*,
project_dir: Path,
dagster_looker_translator: Optional[DagsterLookerTranslator] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
"""A decorator for defining Looker assets in a project.
Args:
Expand All @@ -28,11 +33,13 @@ def looker_assets(*, project_dir: Path) -> Callable[[Callable[..., Any]], Assets
@looker_assets(project_dir=Path("my_looker_project"))
def my_looker_project_assets(): ...
"""
dagster_looker_translator = dagster_looker_translator or DagsterLookerTranslator()

return multi_asset(
compute_kind="looker",
specs=[
*build_looker_dashboard_specs(project_dir),
*build_looker_explore_specs(project_dir),
*build_looker_view_specs(project_dir),
*build_looker_dashboard_specs(project_dir, dagster_looker_translator),
*build_looker_explore_specs(project_dir, dagster_looker_translator),
*build_looker_view_specs(project_dir, dagster_looker_translator),
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -10,49 +10,61 @@
from sqlglot import exp, parse_one, to_table
from sqlglot.optimizer import Scope, build_scope, optimize

from .dagster_looker_translator import DagsterLookerTranslator

logger = logging.getLogger("dagster_looker")


def build_looker_dashboard_specs(project_dir: Path) -> Sequence[AssetSpec]:
def build_looker_dashboard_specs(
project_dir: Path,
dagster_looker_translator: DagsterLookerTranslator,
) -> Sequence[AssetSpec]:
looker_dashboard_specs: List[AssetSpec] = []

# https://cloud.google.com/looker/docs/reference/param-lookml-dashboard
for dashboard_path in project_dir.rglob("*.dashboard.lookml"):
for lookml_dashboard in yaml.safe_load(dashboard_path.read_bytes()):
for lookml_dashboard_path in project_dir.rglob("*.dashboard.lookml"):
for lookml_dashboard_props in yaml.safe_load(lookml_dashboard_path.read_bytes()):
looker_dashboard_specs.extend(
AssetSpec(
key=AssetKey(["dashboard", lookml_dashboard["dashboard"]]),
deps={AssetKey(["explore", dashboard_element["explore"]])},
key=dagster_looker_translator.get_asset_key(
lookml_element=(lookml_dashboard_path, lookml_dashboard_props)
),
deps={AssetKey(["explore", lookml_dashboard_element_props["explore"]])},
)
for dashboard_element in itertools.chain(
for lookml_dashboard_element_props in itertools.chain(
# https://cloud.google.com/looker/docs/reference/param-lookml-dashboard#elements_2
lookml_dashboard.get("elements", []),
lookml_dashboard_props.get("elements", []),
# https://cloud.google.com/looker/docs/reference/param-lookml-dashboard#filters
lookml_dashboard.get("filters", []),
lookml_dashboard_props.get("filters", []),
)
if dashboard_element.get("explore")
if lookml_dashboard_element_props.get("explore")
)

return looker_dashboard_specs


def build_looker_explore_specs(project_dir: Path) -> Sequence[AssetSpec]:
def build_looker_explore_specs(
project_dir: Path,
dagster_looker_translator: DagsterLookerTranslator,
) -> Sequence[AssetSpec]:
looker_explore_specs: List[AssetSpec] = []

# https://cloud.google.com/looker/docs/reference/param-explore
for model_path in project_dir.rglob("*.model.lkml"):
for explore in lkml.load(model_path.read_text()).get("explores", []):
explore_asset_key = AssetKey(["explore", explore["name"]])

for lookml_model_path in project_dir.rglob("*.model.lkml"):
for lookml_explore_props in lkml.load(lookml_model_path.read_text()).get("explores", []):
# https://cloud.google.com/looker/docs/reference/param-explore-from
explore_base_view = [{"name": explore.get("from") or explore["name"]}]
explore_base_view = [
{"name": lookml_explore_props.get("from") or lookml_explore_props["name"]}
]

# https://cloud.google.com/looker/docs/reference/param-explore-join
explore_join_views: Sequence[Mapping[str, Any]] = explore.get("joins", [])
explore_join_views: Sequence[Mapping[str, Any]] = lookml_explore_props.get("joins", [])

looker_explore_specs.append(
AssetSpec(
key=explore_asset_key,
key=dagster_looker_translator.get_asset_key(
lookml_element=(lookml_model_path, lookml_explore_props)
),
deps={
AssetKey(["view", view["name"]])
for view in itertools.chain(explore_base_view, explore_join_views)
Expand Down Expand Up @@ -118,16 +130,23 @@ def parse_upstream_asset_keys_from_looker_view(
return {build_asset_key_from_sqlglot_table(table) for table in upstream_sqlglot_tables}


def build_looker_view_specs(project_dir: Path) -> Sequence[AssetSpec]:
def build_looker_view_specs(
project_dir: Path,
dagster_looker_translator: DagsterLookerTranslator,
) -> Sequence[AssetSpec]:
looker_view_specs: List[AssetSpec] = []

# https://cloud.google.com/looker/docs/reference/param-view
for looker_view_path in project_dir.rglob("*.view.lkml"):
for looker_view in lkml.load(looker_view_path.read_text()).get("views", []):
for lookml_view_path in project_dir.rglob("*.view.lkml"):
for lookml_view_props in lkml.load(lookml_view_path.read_text()).get("views", []):
looker_view_specs.append(
AssetSpec(
key=AssetKey(["view", looker_view["name"]]),
deps=parse_upstream_asset_keys_from_looker_view(looker_view, looker_view_path),
key=dagster_looker_translator.get_asset_key(
lookml_element=(lookml_view_path, lookml_view_props)
),
deps=parse_upstream_asset_keys_from_looker_view(
lookml_view_props, lookml_view_path
),
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from pathlib import Path
from typing import Any, Mapping, Tuple

from dagster import AssetKey
from dagster._annotations import experimental, public


@experimental
class DagsterLookerTranslator:
"""Holds a set of methods that derive Dagster asset definition metadata given a representation
of a LookML element (dashboards, explores, views).
This class is exposed so that methods can be overriden to customize how Dagster asset metadata
is derived.
"""

@public
def get_asset_key(self, lookml_element: Tuple[Path, Mapping[str, Any]]) -> AssetKey:
"""A method that takes in a dictionary representing a LookML element
(dashboards, explores, views) and returns the Dagster asset key that represents the element.
The LookML element is parsed using ``lkml``. You can learn more about this here:
https://lkml.readthedocs.io/en/latest/simple.html.
You can learn more about LookML dashboards and the properties available in this
dictionary here: https://cloud.google.com/looker/docs/reference/param-lookml-dashboard.
You can learn more about LookML explores and views and the properties available in this
dictionary here: https://cloud.google.com/looker/docs/reference/lookml-quick-reference.
This method can be overriden to provide a custom asset key for a LookML element.
Args:
lookml_element (Tuple[Path, Mapping[str, Any]]): A tuple with the path to file
defining a LookML element, and a dictionary representing a LookML element.
Returns:
AssetKey: The Dagster asset key that represents the LookML element.
"""
lookml_element_path, lookml_element_props = lookml_element

if lookml_element_path.suffixes == [".dashboard", ".lookml"]:
return AssetKey(["dashboard", lookml_element_props["dashboard"]])

if lookml_element_path.suffixes == [".view", ".lkml"]:
return AssetKey(["view", lookml_element_props["name"]])

if lookml_element_path.suffixes == [".model", ".lkml"]:
return AssetKey(["explore", lookml_element_props["name"]])

raise ValueError(f"Unsupported LookML element: {lookml_element_path}")
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from pathlib import Path
from typing import Any, Mapping, Tuple

import pytest
from dagster import AssetKey
from dagster_looker.asset_decorator import looker_assets
from dagster_looker.dagster_looker_translator import DagsterLookerTranslator

from .looker_projects import test_exception_derived_table_path, test_retail_demo_path

Expand Down Expand Up @@ -274,3 +278,22 @@ def my_looker_assets(): ...
" in file `exception_derived_table.view.lkml`."
" The upstream dependencies for the view will be omitted."
) in caplog.text


def test_with_asset_key_replacements() -> None:
class CustomDagsterLookerTranslator(DagsterLookerTranslator):
def get_asset_key(self, lookml_element: Tuple[Path, Mapping[str, Any]]) -> AssetKey:
return super().get_asset_key(lookml_element).with_prefix("prefix")

@looker_assets(
project_dir=test_retail_demo_path, dagster_looker_translator=CustomDagsterLookerTranslator()
)
def my_looker_assets(): ...

assert all(key.has_prefix(["prefix"]) for key in my_looker_assets.asset_deps.keys())
assert all(
dep.has_prefix(["prefix"])
for deps in my_looker_assets.asset_deps.values()
for dep in deps
if len(dep.path) > 1 and dep.path[1] in ["dashboard", "explore", "view"]
)

0 comments on commit 4b07259

Please sign in to comment.