
Unable to retrieve object from TupleFilesystemStoreBackend on Azure Databricks since yesterday #9713

Open
damczyk opened this issue Apr 5, 2024 · 1 comment


damczyk commented Apr 5, 2024

Describe the bug
Since yesterday, running the GX code
context.get_checkpoint(onboarding_checkpoint_name).run(run_name=onboarding_checkpoint_name, batch_request=batch_request)
leads to the error

InvalidKeyError: **Unable to retrieve object from TupleFilesystemStoreBackend** with the following Key: /dbfs/mnt/sdl/control-tower-ccu/DataQuality/GX/expectations/del/control_tower_ccu_datasources_godistributiondates.json
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/great_expectations/data_context/store/tuple_store_backend.py:320, in TupleFilesystemStoreBackend._get(self, key)
    319 try:
--> 320     with open(filepath) as infile:
    321         contents: str = infile.read().rstrip("\n")
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/great_expectations/data_context/store/tuple_store_backend.py:323, in TupleFilesystemStoreBackend._get(self, key)
    321         contents: str = infile.read().rstrip("\n")
    322 except FileNotFoundError:
--> 323     raise InvalidKeyError(
    324         f"Unable to retrieve object from TupleFilesystemStoreBackend with the following Key: {filepath!s}"
    325     )
    327 return contents

It worked yesterday until about 4 p.m. (CET); since then I get the error.
GX now tries to read the JSON for the expectation suite from a sub-directory /del/, which does not exist:
/dbfs/mnt/sdl/control-tower-ccu/DataQuality/GX/expectations**/del/**control_tower_ccu_datasources_godistributiondates.json

The JSON control_tower_ccu_datasources_godistributiondates.json is stored in the directory
/dbfs/mnt/sdl/control-tower-ccu/DataQuality/GX/expectations
(without /del/), and it has always been there on the days before.
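As far as I understand the TupleFilesystemStoreBackend, the components of a store key are simply joined into a path below the expectations/ base directory, so a suite name carrying a del component would resolve to exactly the failing path. A minimal illustration of that mapping (my own sketch, not GX's actual code):

import os

# Sketch of the key-to-path mapping, NOT GX's implementation:
# each component of the store key becomes one path segment.
def key_to_filepath(base_dir: str, key: tuple, extension: str = ".json") -> str:
    return os.path.join(base_dir, *key) + extension

base = "/dbfs/mnt/sdl/control-tower-ccu/DataQuality/GX/expectations"

# The expected location (plain suite name):
print(key_to_filepath(base, ("control_tower_ccu_datasources_godistributiondates",)))

# The location from the error message (name split into a "del" component):
print(key_to_filepath(base, ("del", "control_tower_ccu_datasources_godistributiondates")))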

To Reproduce
I can only give you my context configuration (see below) because I'm on Databricks using an Ephemeral Data Context.
I started my Databricks cluster with the library
great_expectations[azure_secrets]==0.18.12 (and also tried great_expectations[azure_secrets]==0.18.10).

I started a Databricks notebook with
import great_expectations as gx
among other imports, and then built my DataFrame object with my data, called df_Staging.
In one cell of my notebook all the GX code is executed like this:

# Create or load the Great Expectations context
context = GX_Context(context_root_dir, context_connection_string).get_gx_context()

# Create batch request
batch_request = (context
    .sources
    .add_or_update_spark(name=data_source_name)
    .add_dataframe_asset(name=data_asset_name, dataframe=df_Staging)
    .build_batch_request()
)

# Run onboarding checkpoint
control_tower_ccu_datasources_block_checkpoint_result = context.get_checkpoint(
    onboarding_checkpoint_name
).run(run_name=onboarding_checkpoint_name, batch_request=batch_request)

# Check the validation result
if control_tower_ccu_datasources_block_checkpoint_result.success:
    print("The validation succeeded")

The class GX_Context with its method get_gx_context() looks like this:

import great_expectations as gx


class GX_Context:
    def __init__(self, root_dir, connection_string, filepath_prefix=""):
        self.root_dir = root_dir
        self.connection_string = connection_string
        self.filepath_prefix = filepath_prefix

        if not isinstance(self.root_dir, str) or not self.root_dir:
            raise ValueError("root_dir must be a non-empty string")
        if not isinstance(self.connection_string, str) or not self.connection_string:
            raise ValueError("connection_string must be a non-empty string")

    def get_gx_context(self) -> gx.data_context.EphemeralDataContext:
        project_config = gx.data_context.types.base.DataContextConfig(
            store_backend_defaults=gx.data_context.types.base.FilesystemStoreBackendDefaults(
                root_directory=self.root_dir
            ),
            data_docs_sites={
                "az_site": {
                    "class_name": "SiteBuilder",
                    "store_backend": {
                        "class_name": "TupleAzureBlobStoreBackend",
                        "container": r"\$web",
                        # "filepath_prefix": self.filepath_prefix,
                        "connection_string": self.connection_string,
                    },
                    "site_index_builder": {
                        "class_name": "DefaultSiteIndexBuilder",
                        "show_cta_footer": True,
                    },
                }
            },
        )

        context = gx.get_context(project_config=project_config)

        return context
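For reference, the class is used like this (the root directory and connection string below are placeholders, not my real values):

# Placeholder values; the real ones come from the notebook configuration.
context_root_dir = "/dbfs/mnt/sdl/control-tower-ccu/DataQuality/GX"
context_connection_string = "DefaultEndpointsProtocol=https;AccountName=...;AccountKey=..."

context = GX_Context(context_root_dir, context_connection_string).get_gx_context()

# Sanity check: which suites does the filesystem store actually see?
print(context.list_expectation_suite_names())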

Expected behavior

Executing the checkpoint
context.get_checkpoint(onboarding_checkpoint_name).run(run_name=onboarding_checkpoint_name, batch_request=batch_request)
should read the expectation suite from
/dbfs/mnt/sdl/control-tower-ccu/DataQuality/GX/expectations/control_tower_ccu_datasources_godistributiondates.json
as on the days before, not from
/dbfs/mnt/sdl/control-tower-ccu/DataQuality/GX/expectations/**del/**control_tower_ccu_datasources_godistributiondates.json
and deliver a result instead of an error.

Environment (please complete the following information):

  • Operating System: Databricks Cluster with runtime 14.3 LTS
  • Great Expectations Version: great_expectations[azure_secrets]==0.18.12 (also reproduced with great_expectations[azure_secrets]==0.18.10)
  • Data Source: PySpark DataFrame
  • Cloud environment: Azure Databricks



damczyk commented Apr 5, 2024

Before this error occurred, I used the Onboarding Data Assistant to create the expectation suite
control_tower_ccu_datasources_godistributiondates.json

During the profiling process the code exited with an error. Unfortunately I do not remember which error it was, but it seemed to be a resource problem.

Now it seems that somewhere GX still holds a reference to the file /dbfs/mnt/sdl/control-tower-ccu/DataQuality/GX/expectations/del/control_tower_ccu_datasources_godistributiondates.json

When I put a file named control_tower_ccu_datasources_godistributiondates.json into /dbfs/mnt/sdl/control-tower-ccu/DataQuality/GX/expectations/del/, at least the expectations in my other notebooks can be executed again.
This works as a workaround.
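A minimal sketch of that workaround (a plain file copy over the DBFS fuse mount, using the paths from above):

import os
import shutil

expectations_dir = "/dbfs/mnt/sdl/control-tower-ccu/DataQuality/GX/expectations"
suite_file = "control_tower_ccu_datasources_godistributiondates.json"

# Create the del/ sub-directory GX is looking for and copy the suite JSON into it.
os.makedirs(os.path.join(expectations_dir, "del"), exist_ok=True)
shutil.copy(
    os.path.join(expectations_dir, suite_file),
    os.path.join(expectations_dir, "del", suite_file),
)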

So the problem is now reduced to the question of how to get rid of the reference to /dbfs/mnt/sdl/control-tower-ccu/DataQuality/GX/expectations/del/control_tower_ccu_datasources_godistributiondates.json.
Terminating and restarting the cluster, even with different GX libraries (from great_expectations[azure_secrets]==0.18.10 to great_expectations[azure_secrets]==0.18.12), has no effect.
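One idea to locate the stale reference (a sketch, assuming the filesystem store defaults keep checkpoint configs as .yml files under <root_directory>/checkpoints/):

import glob

# Assumption: FilesystemStoreBackendDefaults stores checkpoint configs as
# YAML files under <root_directory>/checkpoints/.
checkpoints_glob = "/dbfs/mnt/sdl/control-tower-ccu/DataQuality/GX/checkpoints/**/*.yml"

for path in glob.glob(checkpoints_glob, recursive=True):
    with open(path) as infile:
        content = infile.read()
    # Look for either spelling of the suspicious suite name.
    if "del/control_tower" in content or "del.control_tower" in content:
        print(path)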

The code for executing the Data Assistant to create the expectation suite was:

try:
    # Create or load great expectations context
    context = GX_Context(context_root_dir, context_connection_string).get_gx_context()

    # Create batch request
    batch_request = (context
        .sources
        .add_or_update_spark(name=data_source_name)
        .add_dataframe_asset(name=data_asset_name, dataframe=df_cleansing)
        .build_batch_request()
    )

    # Profiler
    # Run the default onboarding profiler on the batch request
    onboarding_data_assistant_result = (context
        .assistants
        .onboarding
        .run(
            batch_request=batch_request,
            exclude_column_names=[],
            estimation="exact"
        )
    )

    # Get the expectation suite from the onboarding result
    onboarding_suite = (onboarding_data_assistant_result
        .get_expectation_suite(
            expectation_suite_name=onboarding_suite_name
        )
    )

    # Persist the expectation suite with the specified suite name from above
    context.add_or_update_expectation_suite(expectation_suite=onboarding_suite)

    # Create and persist checkpoint to reuse for multiple batches
    context.add_or_update_checkpoint(
        name=onboarding_checkpoint_name,
        batch_request=batch_request,
        expectation_suite_name=onboarding_suite_name,
    )

    # Run onboarding checkpoint
    control_tower_ccu_datasources_block_checkpoint_result = context.get_checkpoint(
        onboarding_checkpoint_name
    ).run(run_name=onboarding_checkpoint_name)

    # Check the validation result
    if control_tower_ccu_datasources_block_checkpoint_result.success:
        print("The validation succeeded")
    else:
        # Exit with the Data Docs URL of the (single) failed run
        first_run_key = list(control_tower_ccu_datasources_block_checkpoint_result["run_results"].keys())[0]
        data_docs_site = control_tower_ccu_datasources_block_checkpoint_result["run_results"][first_run_key]["actions_results"]["update_data_docs"]["az_site"]
        dbutils.notebook.exit("The validation failed : " + data_docs_site)

except Exception as exception:
    handle_exception(exception, dbutils.notebook.entry_point.getDbutils().notebook().getContext())
    raise exception
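After such a failed run it might also help to check which names actually got persisted (both calls exist on the 0.18 context as far as I know):

# Quick check of what the stores contain after the failed Data Assistant run.
print(context.list_expectation_suite_names())  # is there a name with a "del" prefix?
print(context.list_checkpoints())              # which checkpoint configs exist?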
