
(#5296) Support Parquet predicates/projections in tests #5309

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

@clairemcginty (Contributor) commented Mar 19, 2024

WIP of Parquet projection/predicate support in JobTest.

Alternatively, we could provide custom assertions similar to CoderAssertions, i.e.:

val record: AvroType = ???
record withPredicate(FilterApi.and(...)) withProjection(...schema...) should eq(...)

but I think that's overall harder for users to work with.

The downside of this approach is that I'll have to implement separately for ParquetAvroIO, ParquetTypeIO, ParquetExampleIO, and SmbIO (TypeIO/ExampleIO won't need projections support but they could support filtering).

any feedback on the different possible approaches here is welcome!

@@ -74,4 +74,28 @@ package object parquet {
private[parquet] def ofNullable(conf: Configuration): Configuration =
Option(conf).getOrElse(empty())
}

private[parquet] def inMemoryOutputFile(baos: ByteArrayOutputStream): OutputFile =
clairemcginty (Contributor, Author) commented on the diff:
feels a bit hack-ish to have these in src/main, but I don't see a way around it :(

clairemcginty (Contributor, Author) added:

the alternative is to actually write a temp file for every record we roundtrip, which would allow us to use all built-in Parquet IOs
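The in-memory roundtrip idea can be sketched without any Parquet classes at all: write the record to a ByteArrayOutputStream and read it back from a ByteArrayInputStream, so no temp file is ever created. This is a simplified stand-in (plain Java serialization instead of Parquet's OutputFile/InputFile interfaces), just to illustrate the approach:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Roundtrip a serializable value entirely in memory, standing in for the
// Parquet write-then-read-back that the test helpers perform.
def roundtrip[T <: Serializable](record: T): T = {
  val baos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(baos)
  oos.writeObject(record)
  oos.close()

  val ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
  val result = ois.readObject().asInstanceOf[T]
  ois.close()
  result
}

val original = "test-record"
val copied = roundtrip(original)
```

The actual helpers wrap the same byte array in Parquet's OutputFile/InputFile interfaces, so the built-in Parquet writers and readers (including predicate and projection handling) can operate on it directly.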

codecov bot commented Mar 19, 2024

Codecov Report

Attention: Patch coverage is 86.36364%, with 9 lines in your changes missing coverage. Please review.

Project coverage is 61.23%. Comparing base (79a0ecb) to head (78dd162).

Files Patch % Lines
...potify/scio/testing/parquet/ParquetTestUtils.scala 85.71% 9 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5309      +/-   ##
==========================================
+ Coverage   61.08%   61.23%   +0.15%     
==========================================
  Files         306      308       +2     
  Lines       10993    11059      +66     
  Branches      774      785      +11     
==========================================
+ Hits         6715     6772      +57     
- Misses       4278     4287       +9     


@clairemcginty clairemcginty marked this pull request as draft March 20, 2024 13:07
@kellen (Contributor) commented Mar 20, 2024

This kind of breaks the boundary between the read and the test mocks. We don't, for example, implement the filtering for the BQ storage API. What we could potentially do instead is recommend that users implement their filters separately, and if they need to test them, do:

val allMocks: Seq[T] = ???
val filteredMocks = allMocks.parquetFilter(MyJob.MY_FILTER_API)
// ...
.input(SmbIO[K, T]("foo", _.getKey), filteredMocks)
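kellen's suggestion could be sketched roughly like this, with a plain Scala predicate standing in for Parquet's FilterApi (the ParquetMockOps name and the parquetFilter signature here are hypothetical, not the actual scio-test API):

```scala
// Hypothetical helper: apply a test-time filter to mock records, mimicking
// what a Parquet FilterPredicate would do at read time.
implicit class ParquetMockOps[T](private val records: Seq[T]) {
  def parquetFilter(predicate: T => Boolean): Seq[T] =
    records.filter(predicate)
}

case class TestRecord(intField: Int)

val allMocks = Seq(TestRecord(1), TestRecord(7), TestRecord(10))
// Stand-in for something like FilterApi.gt(FilterApi.intColumn("int_field"), 5)
val filteredMocks = allMocks.parquetFilter(_.intField > 5)
```

The pre-filtered `filteredMocks` could then be passed as a JobTest input, keeping the filter logic testable without changing the test IO itself.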

@clairemcginty (Contributor, Author) replied:

> This kind of breaks the boundary between the read and the test mocks. [...]

yeah, I do see what you mean! It's tough because making it part of the TestIO itself is the simplest thing from a usability perspective, but it does deviate from the typical expectation of how JobTest works.

We could implement a parquetFilter-type method, as you suggested, in the scio-test artifact, to keep it from being used in production (my initial concern) 🤔

@clairemcginty (Contributor, Author) replied:

> This kind of breaks the boundary between the read and the test mocks. [...]

Re-implemented as a set of helpers in scio-test

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.reflect.ClassTag

object ParquetHelpers {
clairemcginty (Contributor, Author) commented:

I don't love the names for any of these objects/classes; better ideas welcome.

build.sbt Outdated
@@ -704,6 +704,14 @@ lazy val `scio-test` = project
"org.scalactic" %% "scalactic" % scalatestVersion,
"org.scalatest" %% "scalatest" % scalatestVersion,
"org.typelevel" %% "cats-kernel" % catsVersion,
// provided
"com.spotify" %% "magnolify-parquet" % magnolifyVersion % Provided,
clairemcginty (Contributor, Author) commented:

I'm assuming that anyone using these helpers already has compile-time Parquet dependencies... 🤷‍♀️

roundtripped
}

private def inMemoryOutputFile(baos: ByteArrayOutputStream): OutputFile = new OutputFile {
A reviewer (Contributor) commented:

Looks like a class would be more appropriate.

Suggested change:
- private def inMemoryOutputFile(baos: ByteArrayOutputStream): OutputFile = new OutputFile {
+ private class InMemoryOutputFile(baos: ByteArrayOutputStream) extends OutputFile {

Comment on lines 47 to 52
.parquetFilter(
FilterApi.gt(FilterApi.intColumn("int_field"), 5.asInstanceOf[java.lang.Integer])
)
.parquetProject(
SchemaBuilder.record("TestRecord").fields().optionalInt("int_field").endRecord()
)
A reviewer (Contributor) commented:

I'm not 100% convinced by this syntax. IMHO it would be nicer to develop custom scalatest matchers instead. WDYT?

clairemcginty (Contributor, Author) replied:

agree it's a bit clunky! Plus, I'm working on the TFExample integration now and discovering that the parquetFilter/parquetProject APIs don't work for this case, as Examples require an explicit Schema to be passed, so it's getting messy

Custom matchers would be nice syntactically. Something like:

records withProjection(...) should ...
records withFilter(...) should ...

?

clairemcginty (Contributor, Author) added:

The only issue is that, IMO, many users will want to plug this directly into their JobTests, to verify that the projection won't generate an NPE in the Scio job logic, for example. This would be a bit harder to do with the scalatest matcher approach.

A reviewer (Contributor) replied:

Yeah this is the primary use-case IMO; users want to generate unfiltered data and simulate the filter/projection being applied in the pipeline

clairemcginty (Contributor, Author) commented:

Reviving this thread! I guess a custom matcher could work if it supported Iterables, i.e.:

withPredicate(records, predicate) should { filteredRecords =>
  JobTest[T]
    .input(ParquetAvroIO(path), filteredRecords)
}

but it is a bit awkward to use. I think the primary use case of these helpers is to simply make sure that the projection/predicate is compatible with the Scio job logic. Which actually points back to us supporting it natively in Parquet*IO in JobTest 😅😅

clairemcginty (Contributor, Author) commented:

I was thinking about this more... maybe it makes the most sense to implement this as a non-default Coder in scio-test-parquet, that can be constructed by the user with a projection/predicate:

def parquetTestCoder[T <: SpecificRecord](projection: Schema, predicate: Option[FilterPredicate]): Coder[T] = ...

That way, it could be declared in whatever scope the user needs it, either in JobTest:

implicit val parquetCoder: Coder[MyRecord] = parquetTestCoder[MyRecord](projection, Some(FilterApi.lt(...)))

JobTest[MyJob.type]
  .input(ParquetAvroIO[MyRecord]("path"), records)
...

Or just ad-hoc:

implicit val parquetCoder: Coder[MyRecord] = parquetTestCoder[MyRecord](projection, Some(FilterApi.lt(...)))

val record: MyRecord = ???

record coderShould ...

wdyt?

clairemcginty (Contributor, Author) followed up:

I briefly tried out this idea and realized it won't work well as a Coder that's applied per-element: if applying the FilterPredicate filters out the record, Coder#decode will return null. Thus, the user code would have to be written to handle null records:

implicit val parquetCoder: Coder[MyRecord] = parquetTestCoder[MyRecord](projection, Some(FilterApi.lt(...)))

sc
  .parquetAvroFile[T](path)
  .filter(_ != null)

which is a bad pattern to enforce, so I think this idea is out.

A reviewer (Contributor) commented:

I think we need to update the test ParquetIOs to accept projection and predicate if we want to enable this testing in JobTest

clairemcginty (Contributor, Author) replied:

I think this is getting too complicated. IMO, let's not wire it into JobTest by default, but keep these as test helpers that play nicely with scalatest (I think the custom scalatest matcher route doesn't 100% work here because withProjection/withPredicate aren't strictly Matchers or Assertions). I refactored the API to work like:

records withFilter filter withProjection projection should have size(...)
records withFilter filter withProjection projection should containInAnyOrder(...)

It can also be used explicitly with JobTest, by just applying it to a test input.
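The refactored helper shape can be approximated in plain Scala, using ordinary functions in place of a real FilterPredicate and projection Schema (ParquetTestRecords and its methods are illustrative names, not the actual scio-test-parquet API):

```scala
// Illustrative builder: apply a test-time filter, then a projection, to mock
// records, approximating `records withFilter filter withProjection projection`.
final case class ParquetTestRecords[T](records: Seq[T]) {
  // Stand-in for applying a Parquet FilterPredicate at read time
  def withFilter(predicate: T => Boolean): ParquetTestRecords[T] =
    ParquetTestRecords(records.filter(predicate))

  // Stand-in for reading back only the projected columns
  def withProjection[U](projection: T => U): Seq[U] =
    records.map(projection)
}

case class TestRecord(intField: Int, name: String)

val input = Seq(TestRecord(1, "a"), TestRecord(7, "b"), TestRecord(10, "c"))
val projected = ParquetTestRecords(input)
  .withFilter(_.intField > 5)   // like FilterApi.gt(FilterApi.intColumn("int_field"), 5)
  .withProjection(_.intField)   // like projecting to just the int_field column
```

The filtered and projected output can then be asserted on directly with scalatest matchers, or fed to a JobTest input as the comment above describes.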

@clairemcginty clairemcginty marked this pull request as ready for review May 30, 2024 18:44
@clairemcginty clairemcginty changed the title (WIP) (#5296) Support Parquet predicates/projections in tests (#5296) Support Parquet predicates/projections in tests May 30, 2024