Skip to content

Commit

Permalink
Row has changed check, contains filter, column names sorting (#1333)
Browse files Browse the repository at this point in the history
* #907 add hasRowChanged.* methods on In-Mem/Virtual tables

* #884 add `contains` operator to filters

- also adds unit-tests in IgniteSqlFilterClauseTest.
- also adds `escapeSpecialChars` method to escape special characters with `LIKE`
  operator.
- adds small follow-up improvements to IgniteSqlQuery, introduces a
  factory method apply(sqlTemplate: String, args: Any*) for cleaner
  and flexible usage.

* #1308 sort column names by their indexes in sent table meta

* #884 add suggested improvements from review

---------

Co-authored-by: Malik, Junaid <junaid.malik@ubs.com>
  • Loading branch information
naleeha and junaidzm13 committed May 13, 2024
1 parent 1d571a1 commit 3c2bc32
Show file tree
Hide file tree
Showing 20 changed files with 401 additions and 104 deletions.
Expand Up @@ -57,20 +57,13 @@ class IgniteOrderDataProvider(final val igniteStore: IgniteOrderStore)

private def tableUpdater(table: VirtualizedSessionTable): (Int, Map[String, Any]) => Unit = {
val keyField = table.tableDef.keyField
def hasRowChangedAtIndex = getHasRowChanged(table)

(index, rowMap) => {
(idx, rowMap) => {
val newRow = RowWithData(rowMap(keyField).toString, rowMap)
if (hasRowChangedAtIndex(index, newRow)) table.processUpdateForIndex(index, newRow.key, newRow, clock.now())
if (table.hasRowChangedAtIndex(idx, newRow)) table.processUpdateForIndex(idx, newRow.key, newRow, clock.now())
}
}

private def getHasRowChanged(table: VirtualizedSessionTable) = (index: Int, newRow: RowWithData) => {
val existingKeyAtIndex = table.primaryKeys.get(index)
val existingRow = table.pullRow(existingKeyAtIndex)
!existingRow.equals(newRow)
}

override def subscribe(key: String): Unit = {}

override def doStart(): Unit = {}
Expand Down
Expand Up @@ -90,7 +90,7 @@ class IgniteOrderStoreTest extends AnyFunSuiteLike with BeforeAndAfterAll with B
parentOrder2 = GivenParentHasChildOrder(parentOrder2, 5)
parentOrder2 = GivenParentHasChildOrder(parentOrder2, 6)

val filterQuery = IgniteSqlQuery("parentId = ?", List(2))
val filterQuery = IgniteSqlQuery("parentId = ?", 2)
val childOrder = orderStore.findChildOrder(filterQuery, IgniteSqlQuery.empty, 2, 1).toList

assert(childOrder != null)
Expand All @@ -111,7 +111,7 @@ class IgniteOrderStoreTest extends AnyFunSuiteLike with BeforeAndAfterAll with B
parentOrder2 = GivenParentHasChildOrder(parentOrder2, 5, ric = "VOD.L")
parentOrder2 = GivenParentHasChildOrder(parentOrder2, 6, ric = "VOD.L")

val filterQuery = IgniteSqlQuery("ric = ?", List("VOD.L"))
val filterQuery = IgniteSqlQuery("ric = ?", "VOD.L")
val childOrder = orderStore.findChildOrder(filterQuery, IgniteSqlQuery.empty, 100, 0).toList

assert(childOrder != null)
Expand Down Expand Up @@ -176,7 +176,7 @@ class IgniteOrderStoreTest extends AnyFunSuiteLike with BeforeAndAfterAll with B
parentOrder2 = GivenParentHasChildOrder(parentOrder2, 5, ric = "VOD.L")
parentOrder2 = GivenParentHasChildOrder(parentOrder2, 6, ric = "BP.L")

val filterQuery = IgniteSqlQuery("ric = ?", List("VOD.L"))
val filterQuery = IgniteSqlQuery("ric = ?", "VOD.L")

val count = orderStore.getCount(filterQuery)

Expand Down
Expand Up @@ -5,8 +5,8 @@ import org.finos.vuu.feature.ignite.IgniteSqlQuery.QuerySeparator

object IgniteSqlQuery {
def apply(sqlTemplate: String): IgniteSqlQuery = new IgniteSqlQuery(sqlTemplate, List.empty)
def apply(): IgniteSqlQuery = IgniteSqlQuery("")
def empty: IgniteSqlQuery = IgniteSqlQuery()
def apply(sqlTemplate: String, args: Any*) = new IgniteSqlQuery(sqlTemplate, args.toList)
def empty: IgniteSqlQuery = IgniteSqlQuery("")

sealed abstract class QuerySeparator(val value: String)
object QuerySeparator {
Expand Down
Expand Up @@ -35,8 +35,7 @@ case class EqIgniteSqlFilterClause(columnName: String, value: String) extends Ig
}

private object EqIgniteSqlFilterClause {
def eqSqlQuery(field: String, value: Any): IgniteSqlQuery =
IgniteSqlQuery(s"$field = ?", List(value))
def eqSqlQuery(field: String, value: Any): IgniteSqlQuery = IgniteSqlQuery(s"$field = ?", value)
}

case class NeqIgniteSqlFilterClause(columnName: String, value: String) extends IgniteSqlFilterClause {
Expand All @@ -47,7 +46,7 @@ case class NeqIgniteSqlFilterClause(columnName: String, value: String) extends I
}
}

private def neqSql(field: String, value: Any) = IgniteSqlQuery(s"$field != ?", List(value))
private def neqSql(field: String, value: Any) = IgniteSqlQuery(s"$field != ?", value)
}

case class RangeIgniteSqlFilterClause(op: RangeOp)(columnName: String, value: String) extends IgniteSqlFilterClause {
Expand All @@ -58,37 +57,52 @@ case class RangeIgniteSqlFilterClause(op: RangeOp)(columnName: String, value: St
}
}

private def rangeSql(field: String, value: Any) = IgniteSqlQuery(s"$field ${op.value} ?", List(value))
private def rangeSql(field: String, value: Any) = IgniteSqlQuery(s"$field ${op.value} ?", value)

override def toString = s"RangeIgniteSqlFilterClause[$op]($columnName, $value)"
}

case class StartsIgniteSqlFilterClause(columnName: String, value: String) extends IgniteSqlFilterClause with StrictLogging {
override def toSql(schemaMapper: SchemaMapper): IgniteSqlQuery = {
FilterColumnValueParser(schemaMapper).parse(columnName, value) match {
case Right(ParsedResult(f, externalValue)) => startsSql(f, externalValue)
case Left(errMsg) => logErrorAndReturnEmptySql(errMsg)
}
}

private def startsSql(f: SchemaField, value: Any): IgniteSqlQuery = f.dataType match {
case STRING_DATA_TYPE => IgniteSqlQuery(s"${f.name} LIKE ?", List(s"$value%"))
case _ => logErrorAndReturnEmptySql(s"`Starts` clause unsupported for non-string column: `${f.name}` (${f.dataType})")
}
sealed abstract class RangeOp(val value: String)
object RangeOp {
final case object GT extends RangeOp(value = ">")
final case object GTE extends RangeOp(value = ">=")
final case object LT extends RangeOp(value = "<")
final case object LTE extends RangeOp(value = "<=")
}

case class EndsIgniteSqlFilterClause(columnName: String, value: String) extends IgniteSqlFilterClause with StrictLogging {
override def toSql(schemaMapper: SchemaMapper): IgniteSqlQuery = {
case class RegexIgniteSqlFilterClause(op: RegexOp)(columnName: String, value: String) extends IgniteSqlFilterClause with StrictLogging {

override def toSql(schemaMapper: SchemaMapper): IgniteSqlQuery =
FilterColumnValueParser(schemaMapper).parse(columnName, value) match {
case Right(ParsedResult(f, externalValue)) => endsSql(f, externalValue)
case Right(ParsedResult(f, externalValue)) => regexSql(f, externalValue)
case Left(errMsg) => logErrorAndReturnEmptySql(errMsg)
}

private def regexSql(f: SchemaField, value: Any): IgniteSqlQuery = f.dataType match {
case STRING_DATA_TYPE =>
val escapedValue = escapeSpecialChars(s"$value")
IgniteSqlQuery(s"${f.name} LIKE ? ESCAPE '\\'", op.apply(escapedValue))
case _ =>
logErrorAndReturnEmptySql(s"`$op` clause unsupported for non-string field: `${f.name}` (type: ${f.dataType})")
}

private def endsSql(f: SchemaField, value: Any): IgniteSqlQuery = f.dataType match {
case STRING_DATA_TYPE => IgniteSqlQuery(s"${f.name} LIKE ?", List(s"%$value"))
case _ => logErrorAndReturnEmptySql(s"`Ends` clause unsupported for non-string column: `${f.name}` (${f.dataType})")
private def escapeSpecialChars(value: String, escapeChar: String = "\\\\"): String = {
val specialCharsRegex = s"(?<specialChars>[%_])|(?<escapeChar>$escapeChar)".r
specialCharsRegex.replaceAllIn(value, m =>
if (m.group("specialChars") != null) s"$escapeChar$m"
else if (m.group("escapeChar") != null) escapeChar * 2
else throw new Exception(s"An unexpected error occurred: escaping $m is not supported.")
)
}

override def toString = s"RegexIgniteSqlFilterClause[$op]($columnName, $value)"
}

sealed abstract class RegexOp(val apply: String => String)
object RegexOp {
final case object Starts extends RegexOp(s => s"$s%")
final case object Ends extends RegexOp(s => s"%$s")
final case object Contains extends RegexOp(s => s"%$s%")
}

case class InIgniteSqlFilterClause(columnName: String, values: List[String]) extends IgniteSqlFilterClause with StrictLogging {
Expand All @@ -105,14 +119,6 @@ case class InIgniteSqlFilterClause(columnName: String, values: List[String]) ext
}
}

sealed abstract class RangeOp(val value: String)
object RangeOp {
final case object GT extends RangeOp(value = ">")
final case object GTE extends RangeOp(value = ">=")
final case object LT extends RangeOp(value = "<")
final case object LTE extends RangeOp(value = "<=")
}

private object joinNonEmptyQueries {
def apply(queries: List[IgniteSqlQuery], sep: QuerySeparator): IgniteSqlQuery = {
val joinedQuery = queries
Expand Down
Expand Up @@ -39,10 +39,13 @@ class IgniteSqlFilterTreeVisitor extends FilterBaseVisitor[IgniteSqlFilterClause
RangeIgniteSqlFilterClause(RangeOp.LTE)(ctx.ID().getText, ctx.NUMBER().getText)

override def visitOperationStarts(ctx: OperationStartsContext): IgniteSqlFilterClause =
StartsIgniteSqlFilterClause(ctx.ID().getText, ctx.STRING().getText)
RegexIgniteSqlFilterClause(RegexOp.Starts)(ctx.ID().getText, ctx.STRING().getText)

override def visitOperationEnds(ctx: OperationEndsContext): IgniteSqlFilterClause =
EndsIgniteSqlFilterClause(ctx.ID().getText, ctx.STRING().getText)
RegexIgniteSqlFilterClause(RegexOp.Ends)(ctx.ID().getText, ctx.STRING().getText)

override def visitOperationContains(ctx: OperationContainsContext): IgniteSqlFilterClause =
RegexIgniteSqlFilterClause(RegexOp.Contains)(ctx.ID().getText, ctx.STRING().getText)

override def visitOperationIn(ctx: OperationInContext): IgniteSqlFilterClause = {
InIgniteSqlFilterClause(ctx.ID().getText, FilterTreeVisitor.operationInValues(ctx))
Expand Down
Expand Up @@ -39,20 +39,29 @@ class IgniteSqlQueryTest extends AnyFeatureSpec with Matchers {
}

Feature("appendQuery") {
val query = IgniteSqlQuery("SELECT * FROM ?", List("TABLE_2"))
val query = IgniteSqlQuery("SELECT * FROM ?", "TABLE_2")

Scenario("should return new joined query with the specified separator") {
val query2 = IgniteSqlQuery("WHERE id = ?", List(23))
val query2 = IgniteSqlQuery("WHERE id = ?", 23)

val joinedQuery = query.appendQuery(query2, QuerySeparator.SPACE)

joinedQuery.sqlTemplate shouldEqual "SELECT * FROM ? WHERE id = ?"
joinedQuery.args shouldEqual List("TABLE_2", 23)
}

Scenario("should be able to append query with multiple args") {
val query2 = IgniteSqlQuery("WHERE id = ? OR id = ?", List(23, 25))

val joinedQuery = query.appendQuery(query2, QuerySeparator.SPACE)

joinedQuery.sqlTemplate shouldEqual "SELECT * FROM ? WHERE id = ? OR id = ?"
joinedQuery.args shouldEqual List("TABLE_2", 23, 25)
}
}

Feature("appendArgs") {
val query = IgniteSqlQuery("SELECT * FROM ?", List("TABLE2"))
val query = IgniteSqlQuery("SELECT * FROM ?", "TABLE2")

Scenario("should return new query with appended args") {
val args = List(1, "string", 'A')
Expand All @@ -77,15 +86,15 @@ class IgniteSqlQueryTest extends AnyFeatureSpec with Matchers {
}

Scenario("should return false when sqlTemplate is empty but args is not") {
val query = IgniteSqlQuery("", List("A"))
val query = IgniteSqlQuery("", "A")

query.isEmpty shouldBe false
}
}

Feature("toString") {
Scenario("should return expected string representation of the object") {
val query = IgniteSqlQuery("SELECT * FROM ?", List("TABLE2"))
val query = IgniteSqlQuery("SELECT * FROM ?", "TABLE2")

query.toString shouldEqual "IgniteSqlQuery(SELECT * FROM ?,List(TABLE2))"
}
Expand Down
@@ -1,7 +1,7 @@
package org.finos.vuu.feature.ignite

import com.typesafe.scalalogging.StrictLogging
import org.apache.ignite.cache.query.{IndexQuery, IndexQueryCriterion, SqlFieldsQuery}
import org.apache.ignite.cache.query.{IndexQuery, IndexQueryCriterion}
import org.apache.ignite.cache.{QueryEntity, QueryIndex, QueryIndexType}
import org.apache.ignite.{Ignite, IgniteCache, Ignition}
import org.apache.ignite.cluster.ClusterState
Expand Down
@@ -0,0 +1,145 @@
package org.finos.vuu.feature.ignite.filter

import org.finos.vuu.core.table.SimpleColumn
import org.finos.vuu.feature.ignite.IgniteSqlQuery
import org.finos.vuu.feature.ignite.filter.IgniteSqlFilterClauseTest.schemaMapperWithGivenFields
import org.finos.vuu.util.schema.{ExternalEntitySchema, SchemaField, SchemaMapperBuilder}
import org.scalatest.featurespec.AnyFeatureSpec
import org.scalatest.matchers.should.Matchers

class IgniteSqlFilterClauseTest extends AnyFeatureSpec with Matchers {

Feature(s"RegexIgniteSqlFilterClause[RegexOp.Starts]}") {
val schemaMapper = schemaMapperWithGivenFields(("tag", classOf[String]), ("age", classOf[Int]))

Scenario("should return correct query when simple value passed") {
val clause = RegexIgniteSqlFilterClause(RegexOp.Starts)("tag", "To")

val result = clause.toSql(schemaMapper)

result shouldEqual IgniteSqlQuery("tag LIKE ? ESCAPE '\\'", List("To%"))
}

Scenario("should return correct query with escaped special chars when value with special chars passed") {
val clause = RegexIgniteSqlFilterClause(RegexOp.Starts)("tag", "100%_off\\")

val result = clause.toSql(schemaMapper)

result shouldEqual IgniteSqlQuery("tag LIKE ? ESCAPE '\\'", List("100\\%\\_off\\\\%"))
}

Scenario("should return empty query when non-string column") {
val clause = RegexIgniteSqlFilterClause(RegexOp.Starts)("age", "15")

val result = clause.toSql(schemaMapper)

result shouldEqual IgniteSqlQuery.empty
}

Scenario("should return empty query when no mapped field found") {
val clause = RegexIgniteSqlFilterClause(RegexOp.Starts)("tagged", "To")

val result = clause.toSql(schemaMapper)

result shouldEqual IgniteSqlQuery.empty
}

Scenario("should return expected string representation") {
RegexIgniteSqlFilterClause(RegexOp.Starts)("tag", "abc").toString shouldEqual "RegexIgniteSqlFilterClause[Starts](tag, abc)"
}
}

Feature(s"RegexIgniteSqlFilterClause[RegexOp.Ends]}") {
val schemaMapper = schemaMapperWithGivenFields(("tag", classOf[String]), ("age", classOf[Int]))

Scenario("should return correct query when simple value passed") {
val clause = RegexIgniteSqlFilterClause(RegexOp.Ends)("tag", "To")

val result = clause.toSql(schemaMapper)

result shouldEqual IgniteSqlQuery("tag LIKE ? ESCAPE '\\'", List("%To"))
}

Scenario("should return correct query with escaped special chars when value with special chars passed") {
val clause = RegexIgniteSqlFilterClause(RegexOp.Ends)("tag", "100%_off\\")

val result = clause.toSql(schemaMapper)

result shouldEqual IgniteSqlQuery("tag LIKE ? ESCAPE '\\'", List("%100\\%\\_off\\\\"))
}

Scenario("should return empty query when non-string column") {
val clause = RegexIgniteSqlFilterClause(RegexOp.Ends)("age", "15")

val result = clause.toSql(schemaMapper)

result shouldEqual IgniteSqlQuery.empty
}

Scenario("should return empty query when no mapped field found") {
val clause = RegexIgniteSqlFilterClause(RegexOp.Ends)("tagged", "To")

val result = clause.toSql(schemaMapper)

result shouldEqual IgniteSqlQuery.empty
}

Scenario("should return expected string representation") {
RegexIgniteSqlFilterClause(RegexOp.Ends)("tag", "abc").toString shouldEqual "RegexIgniteSqlFilterClause[Ends](tag, abc)"
}
}

Feature(s"RegexIgniteSqlFilterClause[RegexOp.Contains]}") {
val schemaMapper = schemaMapperWithGivenFields(("tag", classOf[String]), ("age", classOf[Int]))

Scenario("should return correct query when simple value passed") {
val clause = RegexIgniteSqlFilterClause(RegexOp.Contains)("tag", "gold")

val result = clause.toSql(schemaMapper)

result shouldEqual IgniteSqlQuery("tag LIKE ? ESCAPE '\\'", List("%gold%"))
}

Scenario("should return correct query with escaped special chars when value with special chars passed") {
val clause = RegexIgniteSqlFilterClause(RegexOp.Contains)("tag", "\\100%_off\\")

val result = clause.toSql(schemaMapper)

result shouldEqual IgniteSqlQuery("tag LIKE ? ESCAPE '\\'", List("%\\\\100\\%\\_off\\\\%"))
}

Scenario("should return empty query when non-string column") {
val clause = RegexIgniteSqlFilterClause(RegexOp.Contains)("age", "15")

val result = clause.toSql(schemaMapper)

result shouldEqual IgniteSqlQuery.empty
}

Scenario("should return empty query when no mapped field found") {
val clause = RegexIgniteSqlFilterClause(RegexOp.Contains)("tagged", "To")

val result = clause.toSql(schemaMapper)

result shouldEqual IgniteSqlQuery.empty
}

Scenario("should return expected string representation") {
RegexIgniteSqlFilterClause(RegexOp.Contains)("tag", "abc").toString shouldEqual "RegexIgniteSqlFilterClause[Contains](tag, abc)"
}
}
}

private object IgniteSqlFilterClauseTest {
private def schemaMapperWithGivenFields(fields: (String, Class[_])*) = {
val fieldsAndColumns = fields.toList.zipWithIndex.map({case ((name, dataType), idx) =>
(SchemaField(name, dataType, idx), SimpleColumn(name, idx, dataType))
})

SchemaMapperBuilder(externalSchema(fieldsAndColumns.map(_._1)), fieldsAndColumns.map(_._2).toArray).build()
}

private def externalSchema(fields: List[SchemaField]) = TestSchema(fields)

private case class TestSchema(override val fields: List[SchemaField]) extends ExternalEntitySchema
}

0 comments on commit 3c2bc32

Please sign in to comment.