Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination BigQuery: Adapt to newer interface for Sync operations #38132

Merged
merged 2 commits into from May 22, 2024

Conversation

gisripa
Copy link
Contributor

@gisripa gisripa commented May 11, 2024

What

Adapting BigQuery to use #38107

Review guide

Deleted old code which is no longer activated or used under src/main/java.
Minor changes based on CDK signature changes in existing code which isn't deleted.
New code in src/main/kotlin

User Impact

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

Copy link

vercel bot commented May 11, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

Name Status Preview Comments Updated (UTC)
airbyte-docs ✅ Ready (Inspect) Visit Preview 💬 Add feedback May 22, 2024 9:52pm

Copy link
Contributor Author

gisripa commented May 11, 2024

This stack of pull requests is managed by Graphite. Learn more about stacking.

Join @gisripa and the rest of your teammates on Graphite Graphite

@gisripa gisripa force-pushed the gireesh/05-10-Bigquery_cdk_signature_changes branch from dc2606b to f6903b7 Compare May 12, 2024 19:17
@gisripa gisripa force-pushed the gireesh/bq-refactor-new-intfs branch from cf2bbfc to 1a9f866 Compare May 12, 2024 19:17
@gisripa gisripa force-pushed the gireesh/05-10-Bigquery_cdk_signature_changes branch from f6903b7 to cf090a7 Compare May 12, 2024 22:50
@gisripa gisripa force-pushed the gireesh/bq-refactor-new-intfs branch from 1a9f866 to 3a389ad Compare May 12, 2024 22:50
@gisripa gisripa changed the base branch from gireesh/05-10-Bigquery_cdk_signature_changes to cdk-ops-refactor May 12, 2024 22:53
@gisripa gisripa force-pushed the gireesh/bq-refactor-new-intfs branch 3 times, most recently from 0460083 to 27931dc Compare May 12, 2024 23:36
@gisripa gisripa force-pushed the gireesh/bq-refactor-new-intfs branch from 27931dc to 96ad3be Compare May 13, 2024 03:13
@gisripa gisripa force-pushed the gireesh/bq-refactor-new-intfs branch from 96ad3be to b64a0d5 Compare May 13, 2024 03:27
@gisripa gisripa force-pushed the gireesh/bq-refactor-new-intfs branch from b64a0d5 to 58a377a Compare May 13, 2024 16:36
@gisripa gisripa force-pushed the gireesh/bq-refactor-new-intfs branch from 58a377a to 0ee722d Compare May 13, 2024 17:43
@gisripa gisripa force-pushed the gireesh/bq-refactor-new-intfs branch from 0ee722d to 0d6dcd2 Compare May 13, 2024 19:20
@gisripa gisripa force-pushed the gireesh/bq-refactor-new-intfs branch 2 times, most recently from 16b6c02 to 3488a4c Compare May 14, 2024 17:10
@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label May 14, 2024
@gisripa gisripa force-pushed the gireesh/bq-refactor-new-intfs branch from 3488a4c to 025c536 Compare May 14, 2024 17:14
job.waitFor();
LOGGER.info("Job finish {} with status {}", job, job.getStatus());
LOGGER.info("Waiting for Job {} to finish. Status: {}", job.getJobId(), job.getStatus());
// Default totalTimeout is 12 Hours, 30 minutes seems reasonable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

12h??? WTF?

final Job completedJob = job.waitFor(RetryOption.totalTimeout(Duration.ofMinutes(30)));
if (completedJob == null) {
// job no longer exists
LOGGER.warn("Job {} No longer exists", job.getJobId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does that mean when a job doesn't exist? Should we throw an exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure when this case occurs, (followed javadoc implementation suggestion). Earlier we aren't even using the returned completedJob value and their status.

@@ -98,7 +100,7 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
FROM ${raw_table}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Re: line 97]

might be worth explaining why we do a substraction. I'm not sure myself, TBH

See this comment inline on Graphite.

@@ -112,7 +114,7 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
SELECT MAX(_airbyte_extracted_at)
FROM ${raw_table}
"""))
.build()).iterateAll().iterator().next().get(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is ugly as sin. Are you sure we didn't forget to iterate on the iterateAll iterator?

@@ -192,8 +194,8 @@ public void execute(final Sql sql) throws InterruptedException {
}

@Override
public List<DestinationInitialStatus<Impl>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for removing this Impl class...


override fun check(config: JsonNode): AirbyteConnectionStatus? {
try {
val datasetId = getDatasetId(config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we should explicitely state the types of those objects.

* check.
*/
private fun checkGcsPermission(config: JsonNode): AirbyteConnectionStatus? {
val loadingMethod = config[bqConstants.LOADING_METHOD]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh this is ugly. BigQueryUtils.getLoadingMethod returns an enum, but here we get a jsonNode from the same field that's used by getLoadingMethod... Something to clean up and figure out I guess

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup. Didn't take a pass at cleaning the configs in this PR because BigQuery is one hairy config extraction in BigQueryUtils.

val missingPermissions: MutableList<String> = ArrayList()

try {
val credentials = getServiceAccountCredentials(config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this all really should be in the CDK...

package io.airbyte.integrations.destination.bigquery

import com.codepoetics.protonpack.Indexed
import com.codepoetics.protonpack.StreamUtils
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want a dependency on some obscure package that hasn't been maintained in 3 years?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tried removing but it has some zip the stream method which we have to implement or use alternative (not difficult just ignored in this PR). Will take a pass later in the refreshes PR.

storage.testIamPermissions(bucketName, REQUIRED_PERMISSIONS)

missingPermissions.addAll(
StreamUtils.zipWithIndex(permissionsCheckStatusList.stream())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does permissionsCheckStatuslist.withIndex work here? This function is an example of a place where types are far from obvious and should be set explicitely.
I'd rather convert the returned list into a Map<Permission, IsEnabled> than going index-based: REQUIRED_PERMISSIONS.zip(permissionsCheckStatusList).toMap(). Then, missingPermissions becomes map.filterValues

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, see above comment about StreamUtils

)

log.error(e) { message.toString() }
throw ConfigErrorException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why aren't we reusing the message here?

* @param catalog
* - schema of the incoming messages.
*/
override fun getConsumer(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stale comment?

// deserialize it. Otherwise, we will let the Google library find it in
// the environment during the client initialization.
if (serviceAccountKey.isTextual) {
// There are cases where we fail to deserialize the service account key. In these
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we ever try to deserialize the string ourselves?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the most ugly one where the serviceAccountKey was at one point text vs json. so trying to deduce which one it is.

Copy link
Contributor

it looks like we both converted from java to kotlin and added some logic in the same PR. Am I correct? Any chance we could split those 2?

@gisripa
Copy link
Contributor Author

gisripa commented May 22, 2024

it looks like we both converted from java to kotlin and added some logic in the same PR. Am I correct? Any chance we could split those 2?

No conversion from Kotlin, complete rewrite and delete of the old classes. Originally was in 2 stacks then folder because Ed was obsolete classes which are deleted down the stack anyway

@gisripa gisripa force-pushed the gireesh/bq-refactor-new-intfs branch from a5ec528 to 30e1e6a Compare May 22, 2024 21:40
@gisripa gisripa merged commit 5b7873a into master May 22, 2024
30 checks passed
@gisripa gisripa deleted the gireesh/bq-refactor-new-intfs branch May 22, 2024 22:34
@gisripa gisripa restored the gireesh/bq-refactor-new-intfs branch May 22, 2024 23:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues area/documentation Improvements or additions to documentation connectors/destination/bigquery
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants