Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance saveAsCustomOutput and provide access to POutput #4995

Open
mkuthan opened this issue Sep 9, 2023 · 4 comments
Open

Enhance saveAsCustomOutput and provide access to POutput #4995

mkuthan opened this issue Sep 9, 2023 · 4 comments
Labels
enhancement New feature or request io
Milestone

Comments

@mkuthan
Copy link
Contributor

mkuthan commented Sep 9, 2023

Function saveAsCustomOutput is an excellent extension point for implementing my own IO. It gives me also an abstraction for testing the whole pipeline using JobTest. I would like thanks all Scio authors for that functionality.

But for IOs like BigQuery, POutput is not a terminal step, it delivers information about errors. The API for accessing such errors is very different for each writing to BigQuery method.

  • getFailedInserts or getFailedInsertsWithErr for Streaming Inserts
  • getFailedStorageApiInserts for Storage Write API
  • for Batch Loads there is no such method currently

You can also access successful rows for Streaming Inserts, Storage Write API and Batch Loads with another 3 methods.

I know that I could use internal.apply(name, io) method and extract errors like this:

val io = BigQueryIO.writeTableRows().to(tableId)

val errors = scollection.transform(name) { in =>
  val results = in.internal.apply("Write to BQ", io)

  scollection.context.wrap(results.getFailedStorageApiInserts)
    .map(failedRow => (failedRow.getRow, failedRow.getErrorMessage))
}
errors

But I can't use output(CustomIO[Out](id)) { results => ...} from JobTest anymore. I could hack this limitation with TransformOverride but it is not so easy to write assertion then:

transformOverride(TransformOverride.ofIter[In, Out](
        id,
        (r: In) =>
          // how to make an assertion here?
          Option.empty[Out].toList
      ))

I would love to see the following enhancement for saveAsCustomOutput. This is only a short showcase of the overall idea, but I'm glad to hear from you better API or implementation:

def saveAsCustomOutput[O <: POutput](
    name: String,
    transform: PTransform[PCollection[T], O]
)(outputFn: O => Unit): ClosedTap[Nothing] = {
  if (self.context.isTest) {
    TestDataManager.getOutput(self.context.testId.get)(CustomIO[T](name))(self)
  } else {
    val pOutput = self.internal.apply(name, transform)
    outputFn(pOutput)
  }

  ClosedTap[Nothing](EmptyTap)
}

With such extension I'm able to do anything I need with the writeResult, for example:

val io = BigQueryIO.writeTableRows().to(tableId)
var errors = scol.context.empty[(TableRow, String)] // I don't like var but didn't find better way to access output from the closure below

scol.saveAsCustomOutput(id, io) { writeResult =>
    errors = self.context.wrap(writeResult.getFailedStorageApiInserts)
      .map(failedRow => (failedRow.getRow, failedRow.getErrorMessage))
  }

errors

What do you think about such extension? Looks very generic and should handle all kinds of use cases when POutput delivers something valuable.

@RustedBones RustedBones added enhancement New feature or request io labels Sep 11, 2023
@mkuthan
Copy link
Contributor Author

mkuthan commented Sep 13, 2023

One more experiment with an even more generic API that allows encapsulating all steps of the custom IO as a single Scala friendly transform.

With custom output like this, I could provide a composite transform that converts domain objects into underlying storage format (for example JSON bytes) and then save bytes in the database using the Beam IO connector.
In the job test I will be able to use domain object T because the whole transform will be replaced by the stub. As a bonus, transformFn has full access to the POutput to handle errors.

val self: SCollection[T] ...

def betterSaveAsCustomOutput[O <: POutput](name: String)(transformFn: SCollection[T] => O): ClosedTap[Nothing] = {
  if (self.context.isTest) {
    TestDataManager.getOutput(self.context.testId.get)(CustomIO[T](name))(self)
  } else {
    self.applyInternal(
      name,
      new PTransform[PCollection[T], O]() {
        override def expand(input: PCollection[T]): O =
          transformFn(self.context.wrap(input))
      }
    )
  }

  ClosedTap[Nothing](EmptyTap)
}

For the reading part I could do the same. Provide the composite transform that reads bytes from the database using Beam IO connector, deserializes JSON, and returns domain objects. Everything as a single transform, easy to test at the job level. In the test I only need to prepare domain objects as input. The bytes representation from the Beam IO connector is fully encapsulated in the composite transform.

val self: ScioContent = ...

def betterCustomInput[T, I >: PBegin <: PInput](name: String)(transformFn: I => SCollection[T]): SCollection[T] =
  self.requireNotClosed {
    if (self.isTest) {
      TestDataManager.getInput(self.testId.get)(CustomIO[T](name)).toSCollection(self)
    } else {
      self.applyTransform(
        name,
        new PTransform[I, PCollection[T]]() {
          override def expand(input: I): PCollection[T] =
            transformFn(input).internal
        }
      )
    }
  }
}

Alternatively I could put my composite transform into plain Beam PTransform and use existing customInput or saveAsCustomOutput. But I would prefer to use the Scio API in my code :)

@RustedBones
Copy link
Contributor

Thanks for the feature request @mkuthan. Will look at this during our preparation of 0.14.
We should also change the BQ to probably expose the handleErrors with the raw WriteResult

@RustedBones
Copy link
Contributor

@mkuthan just a status update on this issue.

We've opted for another strategy that avoids passing a pipeline transforms function to the IO. We prefered to expose possible SCollection write result as side output. This way, we can keep a 'flat' pipeline definition.

On 0.14, the testing framework will mock those as empty, but we plan to let users set custom values in the future.

I hope this setup fits with your needs. Let us know otherwise.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request io
Projects
None yet
Development

No branches or pull requests

2 participants