[MAINTENANCE] Remove legacy DataConnectors #9923
Conversation
✅ Deploy Preview for niobium-lead-7998 canceled.
Codecov Report
Attention: Patch coverage is …

Additional details and impacted files:

@@ Coverage Diff @@
##           develop    #9923      +/-   ##
===========================================
+ Coverage    77.33%    77.79%    +0.46%
===========================================
  Files          490       456       -34
  Lines        41659     39630     -2029
===========================================
- Hits         32218     30832     -1386
+ Misses        9441      8798      -643

Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
Force-pushed from f36ac0f to 4c65cc2 (Compare)
warnings.warn(
    "In order to access blobs with an InferredAssetGCSDataConnector, \
or to enable recursive file discovery with a Fluent datasource, \
the delimiter that has been passed to gcs_options in your config must be empty; \
please note that the value is being set to None in order to work with the Google SDK."
)
query_options["delimiter"] = None

keys: List[str] = []
for blob in gcs_client.list_blobs(**query_options):
    name: str = blob.name
    if name.endswith("/"):  # GCS includes directories in blob output
        continue

    keys.append(name)

return keys
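For context, a minimal, hypothetical sketch of how this listing logic can be driven with the google-cloud-storage client; the bucket and prefix values are illustrative and do not come from the diff:

from google.cloud import storage

gcs_client = storage.Client()
query_options = {
    "bucket_or_name": "my-bucket",  # hypothetical bucket name
    "prefix": "data/",              # hypothetical prefix
    "delimiter": None,              # None enables recursive blob discovery
}
# Collect file-like blob names, skipping directory placeholders,
# mirroring the loop shown in the diff above.
keys = [
    blob.name
    for blob in gcs_client.list_blobs(**query_options)
    if not blob.name.endswith("/")
]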
def list_s3_keys(  # noqa: C901 - too complex
    s3, query_options: dict, iterator_dict: dict, recursive: bool = False
) -> Generator[str, None, None]:
    """
    For InferredAssetS3DataConnector, we take bucket and prefix and search for files using RegEx at and below the level
    specified by that bucket and prefix. However, for ConfiguredAssetS3DataConnector, we take bucket and prefix and
    search for files using RegEx only at the level specified by that bucket and prefix. This restriction for the
    ConfiguredAssetS3DataConnector is needed because paths on S3 comprise not only the leaf file name but the
    full path, including both the prefix and the file name. Otherwise, in situations where multiple data assets
    share levels of a directory tree, matching files to data assets would not be possible, due to path ambiguity.
    :param s3: s3 client connection
    :param query_options: s3 query attributes ("Bucket", "Prefix", "Delimiter", "MaxKeys")
    :param iterator_dict: dictionary to manage "NextContinuationToken" (if "IsTruncated" is returned from S3)
    :param recursive: True for InferredAssetS3DataConnector and False for ConfiguredAssetS3DataConnector (see above)
    :return: string valued key representing file path on S3 (full prefix and leaf file name)
    """  # noqa: E501
    if iterator_dict is None:
        iterator_dict = {}

    if "continuation_token" in iterator_dict:
        query_options.update({"ContinuationToken": iterator_dict["continuation_token"]})

    logger.debug(f"Fetching objects from S3 with query options: {query_options}")

    s3_objects_info: dict = s3.list_objects_v2(**query_options)

    if not any(key in s3_objects_info for key in ["Contents", "CommonPrefixes"]):
        raise ValueError("S3 query may not have been configured correctly.")  # noqa: TRY003

    if "Contents" in s3_objects_info:
        keys: List[str] = [item["Key"] for item in s3_objects_info["Contents"] if item["Size"] > 0]
        yield from keys

    if recursive and "CommonPrefixes" in s3_objects_info:
        common_prefixes: List[Dict[str, Any]] = s3_objects_info["CommonPrefixes"]
        for prefix_info in common_prefixes:
            query_options_tmp: dict = copy.deepcopy(query_options)
            query_options_tmp.update({"Prefix": prefix_info["Prefix"]})
            # Recursively fetch from updated prefix
            yield from list_s3_keys(
                s3=s3,
                query_options=query_options_tmp,
                iterator_dict={},
                recursive=recursive,
            )

    if s3_objects_info["IsTruncated"]:
        iterator_dict["continuation_token"] = s3_objects_info["NextContinuationToken"]
        # Recursively fetch more
        yield from list_s3_keys(
            s3=s3,
            query_options=query_options,
            iterator_dict=iterator_dict,
            recursive=recursive,
        )

    if "continuation_token" in iterator_dict:
        # Make sure we clear the token once we've gotten fully through
        del iterator_dict["continuation_token"]
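A short, hypothetical usage sketch for the generator above, assuming a boto3 client and made-up bucket/prefix values:

import boto3

s3 = boto3.client("s3")
query_options = {"Bucket": "my-bucket", "Prefix": "data/", "Delimiter": "/"}
# recursive=True descends into every CommonPrefix below the starting prefix,
# matching the InferredAssetS3DataConnector behavior described in the docstring.
for key in list_s3_keys(s3=s3, query_options=query_options, iterator_dict={}, recursive=True):
    print(key)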
# TODO: <Alex>We need to move sorters and _validate_sorters_configuration() to DataConnector</Alex>
# As a rule, this method should not be in "util", but in the specific high-level "DataConnector" class, where it is  # noqa: E501
# called (and declared as private in that class). Currently, this is "FilePathDataConnector". However, since this  # noqa: E501
# method is also used in tests, it can remain in the present "util" module (as an exception to the above stated rule).  # noqa: E501
def build_sorters_from_config(config_list: List[Dict[str, Any]]) -> Optional[dict]:
    sorter_dict: Dict[str, Sorter] = {}
    if config_list is not None:
        for sorter_config in config_list:
            # if sorters were not configured
            if sorter_config is None:
                return None

            if "name" not in sorter_config:
                raise ValueError("Sorter config should have a name")  # noqa: TRY003

            sorter_name: str = sorter_config["name"]
            new_sorter: Sorter = _build_sorter_from_config(sorter_config=sorter_config)
            sorter_dict[sorter_name] = new_sorter

    return sorter_dict
Where these utils were in use, they were migrated to the fluent data connector module where they are used.
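For reference, an illustrative call to the removed helper, assuming the legacy sorter config shape (name, class_name, orderby); the values here are made up:

# Hypothetical sorter configuration; keys follow the legacy config shape.
sorter_configs = [
    {"name": "timestamp", "class_name": "DateTimeSorter", "orderby": "desc"},
    {"name": "index", "class_name": "NumericSorter", "orderby": "asc"},
]
sorters = build_sorters_from_config(config_list=sorter_configs)
# Returns a dict mapping each sorter's name to its instantiated Sorter.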
Before:

    if batch_definition is None:
        batch_definition = (
            my_data_connector.get_batch_definition_list_from_batch_request(
                batch_request=BatchRequest(
                    datasource_name="my_test_datasource",
                    data_connector_name="my_sql_data_connector",
                    data_asset_name="my_asset",
                )
            )
        )[0]

After:

    if batch_definition is None:
        # maintain legacy behavior - stand up a dummy LegacyBatchDefinition
        batch_definition = LegacyBatchDefinition(
            datasource_name="my_test_datasource",
            data_connector_name="my_sql_data_connector",
            data_asset_name="my_asset",
            batch_identifiers=IDDict(),
        )
Previously, this function used the ConfiguredAssetSqlDataConnector to build a LegacyBatchDefinition. After running through a bunch of code, it appears that the batch definition never has batch_identifiers, and all the other fields are dummy fields not corresponding to real data. The solution is to just stand up the LegacyBatchDefinition, and tests appear to be satisfied.
Before (tail of the removed data connector setup):

        config_defaults={"module_name": "great_expectations.datasource.data_connector"},
    )
    introspection_output = data_connector._introspect_db()

After:

    introspection_output = introspect_db(execution_engine=execution_engine)
This code previously used an InferredAssetSqlDataConnector to introspect the database and obtain table names for the test teardown process. data_connector._introspect_db() had no dependencies on the data connector class outside of the SqlAlchemyExecutionEngine, so I refactored that method into a standalone function that takes the execution engine as an argument. While doing this work, I discovered that …
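The PR contains the actual refactored helper; as a rough, hypothetical sketch of the shape described above (names other than introspect_db and the execution-engine parameter are assumptions), it might look like:

from typing import List

import sqlalchemy as sa

def introspect_db(execution_engine) -> List[dict]:
    """Hypothetical sketch: list tables visible to the execution engine.

    Assumes the execution engine exposes a SQLAlchemy engine via `.engine`;
    the real implementation in the PR may differ.
    """
    inspector = sa.inspect(execution_engine.engine)
    tables: List[dict] = []
    for schema_name in inspector.get_schema_names():
        for table_name in inspector.get_table_names(schema=schema_name):
            tables.append(
                {"schema_name": schema_name, "table_name": table_name, "type": "table"}
            )
    return tables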
invoke lint (uses ruff format + ruff check). For more information about contributing, see Contribute.
After you submit your PR, keep the page open and monitor the statuses of the various checks made by our continuous integration process at the bottom of the page. Please fix any issues that come up and reach out on Slack if you need help. Thanks for contributing!