Scenario and Symptoms
After an upgrade to DSE 5.1 or later, an existing Spark job that worked in a DSE 5.0 environment can fail with the following error:
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.None$ is not a valid external type for schema of date
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, clientId), StringType), true) AS clientId#53
+- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, clientId), StringType), true)
+- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, clientId), StringType)
+- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, clientId)
+- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
+- input[0, org.apache.spark.sql.Row, true]
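The snippet below is a minimal, hypothetical sketch of the kind of job that can hit this: a DataFrame built from Row objects in which a missing value is represented as None. Only the clientId field appears in the trace above; the schema and the eventDate column are assumptions for illustration. Run it from the Spark shell, where spark and sc are predefined.

import java.sql.Date
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Hypothetical schema for illustration; clientId comes from the trace above,
// eventDate stands in for the "schema of date" column named in the error.
val schema = StructType(Seq(
  StructField("clientId", StringType, nullable = false),
  StructField("eventDate", DateType, nullable = true)
))

val rows = Seq(
  Row("client-1", Date.valueOf("2017-01-01")),
  Row("client-2", None) // Spark 2.0+ rejects scala.None$ as an external type for DateType
)

// Runs on Spark 1.6.x; fails on Spark 2.0.x with the error shown above.
spark.createDataFrame(sc.parallelize(rows), schema).show()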
Cause
As part of the upgrade from DSE 5.0 to 5.1, the bundled Apache Spark™ component is also upgraded from version 1.6.x to 2.0.x.
The error reported above occurs because the Row encoder in Apache Spark 2.0 does not accept optional (Option) types. See:
SPARK-19056 (Row encoder should accept optional types)
https://issues.apache.org/jira/browse/SPARK-19056
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

spark.createDataFrame(
  sc.parallelize(Seq(Row(None))),
  StructType(Seq(StructField("v", StringType, true)))
).first
// Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.None$ is not a valid external type for schema of string
This error is expected behavior. Although it did not occur in earlier DSE versions that included Spark 1.6.x, using Option types with the Row encoder was never documented or supported.
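If existing Row-based code cannot be migrated immediately, one workaround is to represent missing values as null rather than None; the Row encoder treats null as SQL NULL for a nullable column. A minimal sketch, reusing the hypothetical schema from the earlier example:

val fixedRows = Seq(
  Row("client-1", Date.valueOf("2017-01-01")),
  Row("client-2", null) // null, not None, for a missing nullable value
)

spark.createDataFrame(sc.parallelize(fixedRows), schema).show()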
Solution
For Spark 2.0 and later, use the Dataset API for typed operations and custom objects.
For example:
val ds = Seq(1 -> None, 2 -> Some("str")).toDS
ds.toDF // schema: <_1: int, _2: string>
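Applied to the hypothetical clientId/eventDate example above, the Dataset approach could look like the sketch below. Option fields on a case class are handled by the case class (product) encoder and map to nullable columns, so None is valid here. Run from the Spark shell; in a standalone application, import spark.implicits._ first.

import java.sql.Date

// Hypothetical case class mirroring the schema sketched earlier.
case class ClientEvent(clientId: String, eventDate: Option[Date])

val events = Seq(
  ClientEvent("client-1", Some(Date.valueOf("2017-01-01"))),
  ClientEvent("client-2", None) // supported: Option is handled by the case class encoder
).toDS()

events.toDF().printSchema()
// root
//  |-- clientId: string (nullable = true)
//  |-- eventDate: date (nullable = true)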