spark – JSON

Loading JSON data with Spark SQL into a DataFrame

Spark SQL has built-in support for reading JSON files that contain one self-contained JSON object per line. Multi-line JSON files are currently not compatible with Spark SQL.

// Create temp test.json file on DBFS
// Example JSON with one JSON object per line
dbutils.fs.put(
"/tmp/test.json",
"""{"string":"string1","int":1,"array":[1,2,3],"dict": {"key": "value1"}}
{"string":"string2","int":2,"array":[2,4,6],"dict": {"key": "value2"}}
{"string":"string3","int":3,"array":[3,6,9],"dict": {"key": "value3", "extra_key": "extra_value3"}}
""",
true)

val testJsonData = sqlContext.read.json("/tmp/test.json")
testJsonData.printSchema
testJsonData.count()
display(testJsonData.select("dict.key"))
display(testJsonData.filter(testJsonData("int") > 1))

// Convert DataFrame directly into an RDD
val testJsonDataRDD = testJsonData.rdd

// View first 3 rows of the RDD
testJsonDataRDD.take(3).foreach(println)

// View distinct values in the 'array' column
testJsonDataRDD.flatMap(r => r.getAs[Seq[Long]]("array"))
  .distinct().collect()

// Create a Spark SQL temp table
// Note that temp tables will not persist across cluster restarts
testJsonData.registerTempTable("test_json")

Saving to JSON files

You can save the results of your DataFrame queries into JSON files if you want to preserve data with a nested schema.

(testJsonData
 // Each partition will be saved into an individual file. By setting repartition(1), we will create 1 output file
 // Note: ensure that your output file fits on a single worker
 .repartition(1)
 .write
 .format("json")
 .option("header", "true")
 .save("/FileStore/testJsonOutput"))

display(dbutils.fs.ls("/FileStore/testJsonOutput"))

Best Practice Tip: Specify a schema for your JSON input data.

  • By specifying a schema, you can speed up your Spark job by cutting down the time Spark uses to infer the schema.
  • In addition, if you have a lot of keys that you don’t care about, the schema lets you read only the keys you need.
  • Finally, too many keys in your JSON data can trigger an OOM error on your Spark driver when you infer the schema.


The first step is to create a single data point that has your ideal schema, taken from your data set.

val jsonSchema = sqlContext.read.json(sc.parallelize(Array("""{"string":"string1","int":1,"dict": {"key": "value1"}}""")))
jsonSchema: org.apache.spark.sql.DataFrame = [dict: struct<key:string>, int: bigint, string: string]

jsonSchema.printSchema()
root
 |-- dict: struct (nullable = true)
 |    |-- key: string (nullable = true)
 |-- int: long (nullable = true)
 |-- string: string (nullable = true)

Now we will load the JSON data and specify a schema, using the schema inferred above.
You can also specify the schema directly by following the upstream Spark SQL Guide. If there are too many missing values, Spark may not be able to infer the schema accurately, so providing the schema directly is recommended.



val testJsonDataWithoutExtraKey = sqlContext.read.format("json").schema(jsonSchema.schema).load("/tmp/test.json")
testJsonDataWithoutExtraKey: org.apache.spark.sql.DataFrame = [dict: struct<key:string>, int: bigint, string: string]

testJsonDataWithoutExtraKey.printSchema()
root
 |-- dict: struct (nullable = true)
 |    |-- key: string (nullable = true)
 |-- int: long (nullable = true)
 |-- string: string (nullable = true)

By specifying the schema, we excluded the ‘array’ column and the ‘extra_key’ in the ‘dict’ column.

display(testJsonDataWithoutExtraKey)

Best Practice Tip: ETL your JSON data to parquet for faster query speeds.

With a very large data set, ETL-ing your data to Parquet can speed up your queries significantly. Adjust the number of partitions so that the Parquet files you write are at least 128MB in size.
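
For example, here is a minimal sketch of that ETL step, reusing the testJsonData DataFrame from above (the output path and partition count are illustrative, not from the original notebook):

// Rewrite the JSON data as Parquet. The partition count below is only a
// placeholder; in practice, pick a value so each Parquet file is >= 128MB.
(testJsonData
 .repartition(8)
 .write
 .format("parquet")
 .save("/tmp/testJsonAsParquet"))

// Subsequent queries can read the columnar Parquet copy instead of raw JSON.
val testJsonAsParquet = sqlContext.read.parquet("/tmp/testJsonAsParquet")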

Best Practice Tip: Handle Corrupt records.


Let’s try creating a JSON file with a corrupt record.


dbutils.fs.put(
"/tmp/testBad.json",
"""{"string":"string1","int":1,"array":[1,2,3],"dict": {"key": "value1"}}
{"string":"string2","int":2,"array":[2,4,6],"dict": {"key": "value2"}}
{"string":"string3","int":3,"array":[3,6,9],"dict": {"key": "value3", "extra_key": "extra_value3"}}
[invalid JSON string]
""",
true)
val testJsonDataWithBadRecord = sqlContext.read.json("/tmp/testBad.json")
testJsonDataWithBadRecord: org.apache.spark.sql.DataFrame = [_corrupt_record: string, array: array<bigint>, dict: struct<extra_key:string,key:string>, int: bigint, string: string]

Bad JSON lines are flagged in a column called “_corrupt_record”, which contains the raw bad line.


display(testJsonDataWithBadRecord.where("_corrupt_record is not null"))

To retrieve only the valid lines, filter on _corrupt_record is null.

Additionally, you can try to recover corrupt lines if that’s possible.


display(testJsonDataWithBadRecord.where("_corrupt_record is null"))



spark – Parquet Files in Scala

Parquet files are a great format for storing large tables in Spark SQL.

  • Consider converting text files with a schema into parquet files for more efficient storage.
  • Parquet provides a lot of optimizations under the hood to speed up your queries.
  • Just call .write.parquet on a DataFrame to encode it into Parquet.
// Define the schema via a case class, then build a DataFrame from it
case class MyCaseClass(key: String, group: String, value: Int, someints: Seq[Int], somemap: Map[String, Int])

import sqlContext.implicits._  // needed for .toDF() (already in scope in Databricks notebooks)
val dataframe = sc.parallelize(Array(MyCaseClass("a", "vowels", 1, Array(1), Map("a" -> 1)),
  MyCaseClass("b", "consonants", 2, Array(2, 2), Map("b" -> 2)),
  MyCaseClass("c", "consonants", 3, Array(3, 3, 3), Map("c" -> 3)),
  MyCaseClass("d", "consonants", 4, Array(4, 4, 4, 4), Map("d" -> 4)),
  MyCaseClass("e", "vowels", 5, Array(5, 5, 5, 5, 5), Map("e" -> 5)))
).toDF()
// now write it to disk
dataframe.write.parquet("/tmp/testScalaParquetFiles")


// Read the Parquet files back in as a DataFrame and register it as a temp table
val parquetData = sqlContext.read.parquet("/tmp/testScalaParquetFiles")
parquetData.registerTempTable("parquetTable1")
%sql CREATE TABLE parquetTable2
USING org.apache.spark.sql.parquet
OPTIONS (
  path "/tmp/testScalaParquetFiles"
)

“Some other Parquet-producing systems, in particular Impala, Hive, and older versions of Spark SQL, do not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.”

Within Scala, you can call the following to set the configuration:

sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")

If using SQL, you can use the set key=value notation:

set spark.sql.parquet.binaryAsString=true

scala – append to list

How do I append to a list?

In Scala, to append a single value to a list, use the “:+” operator.
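
For example (a small illustration, not from the original post):

val xs = List(1, 2, 3)

// :+ appends a single element and returns a new list (List is immutable).
val appended = xs :+ 4           // List(1, 2, 3, 4)

// For comparison: +: prepends a single element, ++ concatenates collections.
val prepended = 0 +: xs          // List(0, 1, 2, 3)
val combined  = xs ++ List(4, 5) // List(1, 2, 3, 4, 5)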

scala questions

13)   What is function currying in Scala?

Currying is the technique of transforming a function that takes multiple arguments into a chain of functions that each take a single argument. Scala supports many of the same techniques as languages like Haskell and Lisp; function currying is one of the least used and most misunderstood of them.
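
A minimal sketch of a curried function (illustrative, not from the original answer):

// A curried version of addition: each parameter list yields a new function.
def add(x: Int)(y: Int): Int = x + y

val addTwo = add(2) _    // partially apply the first argument
println(addTwo(3))       // 5
println(add(2)(3))       // 5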

14)   What are implicit parameters in Scala?

An implicit parameter is a way to allow parameters of a method to be “found” automatically. It is similar to a default parameter, but it uses a different mechanism for finding the “default” value. An implicit parameter is a parameter to a method or constructor that is marked implicit: if no value is passed for it, the compiler searches for an “implicit” value of the right type defined within scope.
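
A small sketch of an implicit parameter being resolved by the compiler (the names here are illustrative):

// 'greeting' is an implicit parameter: if no value is passed explicitly,
// the compiler looks for an implicit String in scope.
def greet(name: String)(implicit greeting: String): String = s"$greeting, $name"

implicit val defaultGreeting: String = "Hello"

println(greet("Scala"))             // compiler supplies the implicit: Hello, Scala
println(greet("Scala")("Welcome"))  // an explicit argument still wins: Welcome, Scala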

15)   What is a closure in Scala?

A closure is a function whose return value depends on the value of one or more variables declared outside the function.
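
For example (a quick illustration, with names of my own choosing):

// 'rate' is declared outside the function; 'applyRate' closes over it.
var rate = 0.10
val applyRate = (amount: Double) => amount * rate

println(applyRate(100))   // 10.0
rate = 0.20
println(applyRate(100))   // 20.0 -- the closure sees the updated variable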

16)   What is Monad in Scala?

A monad is an object that wraps another object. Instead of manipulating the wrapped object directly, you pass the monad mini-programs, i.e. functions, to perform the data manipulation, and the monad chooses how to apply them to the underlying object.
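
Option is a convenient example of this pattern (a sketch, not from the original answer): flatMap and map pass small functions to the wrapper, and the wrapper decides how to apply them.

val some: Option[Int] = Some(10)
val none: Option[Int] = None

// The function is applied to the wrapped value...
println(some.flatMap(x => Some(x * 2)))   // Some(20)
// ...but skipped entirely when there is nothing to apply it to.
println(none.flatMap(x => Some(x * 2)))   // None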

17)   What is Scala anonymous function?

In source code, anonymous functions are called ‘function literals’; at run time, function literals are instantiated into objects called function values. Scala provides a relatively easy syntax for defining anonymous functions.
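
For example (illustrative):

// A function literal bound to a value...
val double = (x: Int) => x * 2
println(double(5))                       // 10

// ...and function literals passed inline to a higher-order method.
println(List(1, 2, 3).map(x => x + 1))   // List(2, 3, 4)
println(List(1, 2, 3).map(_ * 10))       // placeholder syntax: List(10, 20, 30)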

18)   Explain ‘Scala higher order’ functions?

Scala allows the definition of higher-order functions. These are functions that take other functions as parameters, or whose result is a function. In the following example, the apply() function takes another function f and a value v and applies f to v.

Example:
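
(The original listing was not included; the following is a sketch of the apply/layout example the text describes.)

// apply() is a higher-order function: it takes a function f and a value v,
// and applies f to v.
def apply(f: Int => String, v: Int): String = f(v)

// A simple function to pass in: wraps its argument in brackets.
def layout[A](x: A): String = "[" + x.toString + "]"

println(apply(layout, 10))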

When the above code is compiled and executed, it prints [10].

19)    What is the difference between var and val?

In Scala, you can define a variable using either the val or var keyword. The difference is that var is much like a Java variable declaration, while val is a little different: once a variable is declared using val, we cannot change its reference to point to another value. A variable defined using the var keyword is mutable and can be reassigned any number of times.
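
For example:

val a = 10
// a = 20           // does not compile: reassignment to val

var b = 10
b = 20               // fine: a var can be reassigned
println(b)           // 20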

20)   What are Option, Some and None in Scala?

‘Option’ is a Scala generic type that is either ‘Some’ wrapping a generic value, or ‘None’. Collections such as Map (get) and Queue (dequeueOption) use it to represent values that may be missing, instead of returning null.
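
For example, Map.get returns an Option (a small illustration):

val scores = Map("alice" -> 90, "bob" -> 80)

val found: Option[Int] = scores.get("alice")    // Some(90)
val missing: Option[Int] = scores.get("carol")  // None

// getOrElse supplies a fallback instead of risking a null or an exception.
println(found.getOrElse(0))     // 90
println(missing.getOrElse(0))   // 0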