[SPARK-33513][BUILD] Upgrade to Scala 2.13.4 to improve exhaustivity
### What changes were proposed in this pull request?

This PR aims to do the following:
1. Upgrade from Scala 2.13.3 to 2.13.4 for Apache Spark 3.1
2. Fix exhaustivity issues in both Scala 2.12 and 2.13 (Scala 2.13.4 requires this to compile).
3. Enforce the improved exhaustivity check by using the existing Scala 2.13 GitHub Action compilation job.

### Why are the changes needed?

Scala 2.13.4 is a maintenance release for the 2.13 line and improves JDK 15 support.
- https://github.com/scala/scala/releases/tag/v2.13.4

Also, it improves the exhaustivity check (see the sketch after this list).
- scala/scala#9140 (Check exhaustivity of pattern matches with "if" guards and custom extractors)
- scala/scala#9147 (Check all bindings exhaustively, e.g. tuples components)
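
As a rough illustration (this sketch is not part of the PR; the `Level` ADT and `ExhaustivityDemo` object are made-up names), these are the two kinds of patterns the stricter 2.13.4 analysis reports, and the shape of the fixes used throughout the diff below:

```scala
sealed trait Level
case object OnHeap extends Level
case object OffHeap extends Level

object ExhaustivityDemo {
  // Guarded cases: 2.13.4 now runs exhaustivity analysis on matches whose
  // arms carry `if` guards. Ending the match with an unguarded `case _ =>`,
  // as several hunks below do, keeps it provably exhaustive.
  def memoryKind(level: Level, useOffHeap: Boolean): String = level match {
    case _ if !useOffHeap => "on-heap"
    case _                => "off-heap"
  }

  // Bindings such as tuple components are also checked now: a total function
  // literal like `map { case (k, v: String) => ... }` over arbitrary pairs is
  // reported as potentially non-exhaustive, so the JsonProtocol hunks switch
  // to `collect`, which expects a partial function.
  def stringEntries(fields: List[(String, Any)]): Map[String, String] =
    fields.collect { case (k, v: String) => (k, v) }.toMap
}
```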

### Does this PR introduce _any_ user-facing change?

Yep. Although it's only a maintenance release, it's still a Scala version change.

### How was this patch tested?

Pass the CIs and do manual testing.
- Scala 2.12 CI jobs (GitHub Action / Jenkins UT / Jenkins K8s IT) to check the validity of the code change.
- Scala 2.13 compilation job to check the compilation.

Closes #30455 from dongjoon-hyun/SCALA_3.13.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
dongjoon-hyun committed Nov 24, 2020
1 parent 0592181 commit 3ce4ab5
Showing 35 changed files with 77 additions and 23 deletions.
@@ -169,7 +169,7 @@ private[spark] class StorageStatus(
.getOrElse((0L, 0L))
case _ if !level.useOffHeap =>
(_nonRddStorageInfo.onHeapUsage, _nonRddStorageInfo.diskUsage)
case _ if level.useOffHeap =>
case _ =>
(_nonRddStorageInfo.offHeapUsage, _nonRddStorageInfo.diskUsage)
}
val newMem = math.max(oldMem + changeInMem, 0L)
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -757,15 +757,15 @@

def taskResourceRequestMapFromJson(json: JValue): Map[String, TaskResourceRequest] = {
val jsonFields = json.asInstanceOf[JObject].obj
jsonFields.map { case JField(k, v) =>
jsonFields.collect { case JField(k, v) =>
val req = taskResourceRequestFromJson(v)
(k, req)
}.toMap
}

def executorResourceRequestMapFromJson(json: JValue): Map[String, ExecutorResourceRequest] = {
val jsonFields = json.asInstanceOf[JObject].obj
jsonFields.map { case JField(k, v) =>
jsonFields.collect { case JField(k, v) =>
val req = executorResourceRequestFromJson(v)
(k, req)
}.toMap
@@ -1229,7 +1229,7 @@ private[spark] object JsonProtocol {

def resourcesMapFromJson(json: JValue): Map[String, ResourceInformation] = {
val jsonFields = json.asInstanceOf[JObject].obj
jsonFields.map { case JField(k, v) =>
jsonFields.collect { case JField(k, v) =>
val resourceInfo = ResourceInformation.parseJson(v)
(k, resourceInfo)
}.toMap
@@ -1241,7 +1241,7 @@

def mapFromJson(json: JValue): Map[String, String] = {
val jsonFields = json.asInstanceOf[JObject].obj
jsonFields.map { case JField(k, JString(v)) => (k, v) }.toMap
jsonFields.collect { case JField(k, JString(v)) => (k, v) }.toMap
}

def propertiesFromJson(json: JValue): Properties = {
@@ -302,6 +302,8 @@ private[spark] object BLAS extends Serializable {
j += 1
prevCol = col
}
case _ =>
throw new IllegalArgumentException(s"spr doesn't support vector type ${v.getClass}.")
}
}

@@ -286,6 +286,7 @@ private[ml] object RFormulaParser extends RegexParsers {

private val pow: Parser[Term] = term ~ "^" ~ "^[1-9]\\d*".r ^^ {
case base ~ "^" ~ degree => power(base, degree.toInt)
case t => throw new IllegalArgumentException(s"Invalid term: $t")
} | term

private val interaction: Parser[Term] = pow * (":" ^^^ { interact _ })
@@ -298,7 +299,10 @@
private val expr = (sum | term)

private val formula: Parser[ParsedRFormula] =
(label ~ "~" ~ expr) ^^ { case r ~ "~" ~ t => ParsedRFormula(r, t.asTerms.terms) }
(label ~ "~" ~ expr) ^^ {
case r ~ "~" ~ t => ParsedRFormula(r, t.asTerms.terms)
case t => throw new IllegalArgumentException(s"Invalid term: $t")
}

def parse(value: String): ParsedRFormula = parseAll(formula, value) match {
case Success(result, _) => result
@@ -314,6 +314,8 @@ object StandardScalerModel extends MLReadable[StandardScalerModel] {
case SparseVector(size, indices, values) =>
val newValues = transformSparseWithScale(scale, indices, values.clone())
Vectors.sparse(size, indices, newValues)
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}

case (false, false) =>
@@ -74,6 +74,8 @@ private[ml] object JsonMatrixConverter {
("values" -> values.toSeq) ~
("isTransposed" -> isTransposed)
compact(render(jValue))
case _ =>
throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
}
}
}
@@ -57,6 +57,8 @@ private[ml] object JsonVectorConverter {
case DenseVector(values) =>
val jValue = ("type" -> 1) ~ ("values" -> values.toSeq)
compact(render(jValue))
case _ =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}
}
@@ -45,6 +45,8 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
row.setNullAt(2)
row.update(3, UnsafeArrayData.fromPrimitiveArray(values))
row
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}

@@ -200,6 +200,9 @@ private[ml] class BlockHingeAggregator(
case sm: SparseMatrix if !fitIntercept =>
val gradSumVec = new DenseVector(gradientSumArray)
BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)

case m =>
throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
}

if (fitIntercept) gradientSumArray(numFeatures) += vec.values.sum
@@ -504,6 +504,9 @@ private[ml] class BlockLogisticAggregator(
case sm: SparseMatrix if !fitIntercept =>
val gradSumVec = new DenseVector(gradientSumArray)
BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)

case m =>
throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
}

if (fitIntercept) gradientSumArray(numFeatures) += vec.values.sum
@@ -192,6 +192,8 @@ private[spark] object Instrumentation {
case Failure(NonFatal(e)) =>
instr.logFailure(e)
throw e
case Failure(e) =>
throw e
case Success(result) =>
instr.logSuccess()
result
@@ -167,6 +167,8 @@ class StandardScalerModel @Since("1.3.0") (
val newValues = NewStandardScalerModel
.transformSparseWithScale(localScale, indices, values.clone())
Vectors.sparse(size, indices, newValues)
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}

case _ => vector
2 changes: 2 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
@@ -285,6 +285,8 @@ private[spark] object BLAS extends Serializable with Logging {
j += 1
prevCol = col
}
case _ =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}

@@ -289,6 +289,8 @@ class VectorUDT extends UserDefinedType[Vector] {
row.setNullAt(2)
row.update(3, UnsafeArrayData.fromPrimitiveArray(values))
row
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}

@@ -145,6 +145,8 @@ class IndexedRowMatrix @Since("1.0.0") (
.map { case (values, blockColumn) =>
((blockRow.toInt, blockColumn), (rowInBlock.toInt, values.zipWithIndex))
}
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}.groupByKey(GridPartitioner(numRowBlocks, numColBlocks, rows.getNumPartitions)).map {
case ((blockRow, blockColumn), itr) =>
@@ -187,6 +189,8 @@
Iterator.tabulate(indices.length)(i => MatrixEntry(rowIndex, indices(i), values(i)))
case DenseVector(values) =>
Iterator.tabulate(values.length)(i => MatrixEntry(rowIndex, i, values(i)))
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}
new CoordinateMatrix(entries, numRows(), numCols())
@@ -748,6 +748,8 @@ class RowMatrix @Since("1.0.0") (
}
buf
}.flatten
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}
}.reduceByKey(_ + _).map { case ((i, j), sim) =>
2 changes: 1 addition & 1 deletion pom.xml
@@ -3264,7 +3264,7 @@
<profile>
<id>scala-2.13</id>
<properties>
<scala.version>2.13.3</scala.version>
<scala.version>2.13.4</scala.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
@@ -313,7 +313,6 @@ trait MesosSchedulerUtils extends Logging {
// offer has the required attribute and subsumes the required values for that attribute
case (name, requiredValues) =>
offerAttributes.get(name) match {
case None => false
case Some(_) if requiredValues.isEmpty => true // empty value matches presence
case Some(scalarValue: Value.Scalar) =>
// check if provided values is less than equal to the offered values
@@ -332,6 +331,7 @@
// check if the specified value is equal, if multiple values are specified
// we succeed if any of them match.
requiredValues.contains(textValue.getValue)
case _ => false
}
}
}
@@ -178,7 +178,7 @@ class MesosFineGrainedSchedulerBackendSuite
val (execInfo, _) = backend.createExecutorInfo(
Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true))
assert(execInfo.getContainer.getDocker.getForcePullImage)
val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
assert(portmaps.get(0).getHostPort.equals(80))
assert(portmaps.get(0).getContainerPort.equals(8080))
@@ -94,7 +94,7 @@ private[this] object JsonPathParser extends RegexParsers {
case Success(result, _) =>
Some(result)

case NoSuccess(msg, next) =>
case _ =>
None
}
}
@@ -322,7 +322,9 @@ case class Literal (value: Any, dataType: DataType) extends LeafExpression {
case (a: Array[Byte], b: Array[Byte]) => util.Arrays.equals(a, b)
case (a: ArrayBasedMapData, b: ArrayBasedMapData) =>
a.keyArray == b.keyArray && a.valueArray == b.valueArray
case (a, b) => a != null && a.equals(b)
case (a: Double, b: Double) if a.isNaN && b.isNaN => true
case (a: Float, b: Float) if a.isNaN && b.isNaN => true
case (a, b) => a != null && a == b
}
case _ => false
}
@@ -981,7 +981,7 @@ case class MapObjects private(
(genValue: String) => s"$builder.add($genValue);",
s"$builder;"
)
case None =>
case _ =>
// array
(
s"""
@@ -190,6 +190,9 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
}

case VALUE_TRUE | VALUE_FALSE => BooleanType

case _ =>
throw new SparkException("Malformed JSON")
}
}

@@ -197,9 +197,9 @@ object StarSchemaDetection extends PredicateHelper with SQLConfHelper {
} else {
false
}
case None => false
case _ => false
}
case None => false
case _ => false
}
case _ => false
}
@@ -239,7 +239,7 @@ object StarSchemaDetection extends PredicateHelper with SQLConfHelper {
case Some(col) if t.outputSet.contains(col) =>
val stats = t.stats
stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)
case None => false
case _ => false
}
case _ => false
}
@@ -685,6 +685,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
case LeftOuter => newJoin.right.output
case RightOuter => newJoin.left.output
case FullOuter => newJoin.left.output ++ newJoin.right.output
case _ => Nil
})
val newFoldableMap = AttributeMap(foldableMap.baseMap.values.filterNot {
case (attr, _) => missDerivedAttrsSet.contains(attr)
@@ -967,6 +967,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
(UsingJoin(baseJoinType, visitIdentifierList(c.identifierList)), None)
case Some(c) if c.booleanExpression != null =>
(baseJoinType, Option(expression(c.booleanExpression)))
case Some(c) =>
throw new ParseException(s"Unimplemented joinCriteria: $c", ctx)
case None if join.NATURAL != null =>
if (baseJoinType == Cross) {
throw new ParseException("NATURAL CROSS JOIN is not supported", ctx)
@@ -362,7 +362,7 @@ case class Join(
left.constraints
case RightOuter =>
right.constraints
case FullOuter =>
case _ =>
ExpressionSet()
}
}
@@ -120,7 +120,7 @@ class GenericArrayData(val array: Array[Any]) extends ArrayData {
if (!o2.isInstanceOf[Double] || ! java.lang.Double.isNaN(o2.asInstanceOf[Double])) {
return false
}
case _ => if (!o1.equals(o2)) {
case _ => if (o1.getClass != o2.getClass || o1 != o2) {
return false
}
}
@@ -39,6 +39,7 @@ class ScanOperationSuite extends SparkFunSuite {
assert(projects(0) === colB)
assert(projects(1) === aliasR)
assert(filters.size === 1)
case _ => assert(false)
}
}

@@ -50,6 +51,7 @@
assert(projects(0) === colA)
assert(projects(1) === colB)
assert(filters.size === 1)
case _ => assert(false)
}
}

@@ -65,6 +67,7 @@
assert(projects.size === 2)
assert(projects(0) === colA)
assert(projects(1) === aliasId)
case _ => assert(false)
}
}

@@ -81,6 +84,7 @@
assert(projects(0) === colA)
assert(projects(1) === aliasR)
assert(filters.size === 1)
case _ => assert(false)
}
}

@@ -93,6 +97,7 @@
assert(projects(0) === colA)
assert(projects(1) === aliasR)
assert(filters.size === 1)
case _ => assert(false)
}
}

@@ -45,7 +45,7 @@ class ArrayDataIndexedSeqSuite extends SparkFunSuite {
if (e != null) {
elementDt match {
// For Nan, etc.
case FloatType | DoubleType => assert(seq(i).equals(e))
case FloatType | DoubleType => assert(seq(i) == e)
case _ => assert(seq(i) === e)
}
} else {
@@ -868,12 +868,12 @@ class SparkSqlAstBuilder extends AstBuilder {
// assert if directory is local when LOCAL keyword is mentioned
val scheme = Option(storage.locationUri.get.getScheme)
scheme match {
case None =>
case Some(pathScheme) if (!pathScheme.equals("file")) =>
throw new ParseException("LOCAL is supported only with file: scheme", ctx)
case _ =>
// force scheme to be file rather than fs.default.name
val loc = Some(UriBuilder.fromUri(CatalogUtils.stringToURI(path)).scheme("file").build())
storage = storage.copy(locationUri = loc)
case Some(pathScheme) if (!pathScheme.equals("file")) =>
throw new ParseException("LOCAL is supported only with file: scheme", ctx)
}
}

@@ -91,7 +91,7 @@ trait BaseAggregateExec extends UnaryExecNode with AliasAwareOutputPartitioning
override def requiredChildDistribution: List[Distribution] = {
requiredChildDistributionExpressions match {
case Some(exprs) if exprs.isEmpty => AllTuples :: Nil
case Some(exprs) if exprs.nonEmpty => ClusteredDistribution(exprs) :: Nil
case Some(exprs) => ClusteredDistribution(exprs) :: Nil
case None => UnspecifiedDistribution :: Nil
}
}
