Spark – JSON

Loading JSON data with Spark SQL into a DataFrame

Spark SQL has built-in support for reading JSON files in which each line contains a separate, self-contained JSON object. Multi-line JSON files are currently not supported by Spark SQL.

// Create temp test.json file on DBFS
// Example JSON with one JSON object per line
dbutils.fs.put(
"/tmp/test.json",
"""{"string":"string1","int":1,"array":[1,2,3],"dict": {"key": "value1"}}
{"string":"string2","int":2,"array":[2,4,6],"dict": {"key": "value2"}}
{"string":"string3","int":3,"array":[3,6,9],"dict": {"key": "value3", "extra_key": "extra_value3"}}
""",
true)

// Read the JSON file into a DataFrame, print its inferred schema, and count the rows
val testJsonData = sqlContext.read.json("/tmp/test.json")
testJsonData.printSchema
testJsonData.count()

// Query nested fields with dot notation, and filter on column values
display(testJsonData.select("dict.key"))
display(testJsonData.filter(testJsonData("int") > 1))

// Convert DataFrame directly into an RDD
val testJsonDataRDD = testJsonData.rdd

// View first 3 rows of the RDD
testJsonDataRDD.take(3).foreach(println)

// View distinct values in the 'array' column (look the column up by name rather than by position)
testJsonDataRDD.map(r => r.getAs[Seq[Long]]("array")).flatMap(x => x)
  .distinct().collect()

// Create a Spark SQL temp table
// Note that temp tables will not persist across cluster restarts
testJsonData.registerTempTable("test_json")
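
Once the temp table is registered, you can query it with SQL. A minimal sketch (the variable name sqlResult is just for illustration; the backticks guard the column whose name matches a type keyword):

// Query the registered temp table with Spark SQL
val sqlResult = sqlContext.sql("SELECT string, `int` FROM test_json WHERE `int` >= 2")
display(sqlResult)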

Saving to JSON files

You can save the results of your DataFrame queries as JSON files, which is useful when you want to preserve a nested schema.

(testJsonData
 // Each partition will be saved into an individual file. By setting repartition(1), we will create 1 output file
 // Note: ensure that your output file fits on a single worker
 .repartition(1)
 .write
 .format("json")
 .save("/FileStore/testJsonOutput"))

display(dbutils.fs.ls("/FileStore/testJsonOutput"))

Best Practice Tip: Specify a schema for your JSON input data.

  • By specifying a schema, you can speed up your Spark job by cutting down the time Spark uses to infer the schema.
  • In addition, if your data has many keys you don’t care about, a schema lets you read in only the keys you need.
  • Finally, inferring the schema over JSON data with a very large number of keys can trigger an out-of-memory (OOM) error on the Spark driver.

     

The first step is to create a single sample record from your data set that has your ideal schema.

val jsonSchema = sqlContext.read.json(sc.parallelize(Array("""{"string":"string1","int":1,"dict": {"key": "value1"}}""")))
jsonSchema: org.apache.spark.sql.DataFrame = [dict: struct<key:string>, int: bigint, string: string]

jsonSchema.printSchema()
root
 |-- dict: struct (nullable = true)
 |    |-- key: string (nullable = true)
 |-- int: long (nullable = true)
 |-- string: string (nullable = true)

Now we will load the JSON data, specifying the schema inferred above.
You can also specify the schema directly, following the upstream Spark SQL Guide; a hand-built StructType sketch appears after the example below. If your data has many missing values, Spark may not be able to infer the schema accurately, so providing the schema directly is recommended.



val testJsonDataWithoutExtraKey = sqlContext.read.format("json").schema(jsonSchema.schema).load("/tmp/test.json")
testJsonDataWithoutExtraKey: org.apache.spark.sql.DataFrame = [dict: struct<key:string>, int: bigint, string: string]

testJsonDataWithoutExtraKey.printSchema()
root
 |-- dict: struct (nullable = true)
 |    |-- key: string (nullable = true)
 |-- int: long (nullable = true)
 |-- string: string (nullable = true)

By specifying the schema, we excluded the ‘array’ column and the ‘extra_key’ field inside the ‘dict’ column.

display(testJsonDataWithoutExtraKey)
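
If you prefer not to infer the schema from a sample record at all, you can build the same schema by hand. A minimal sketch using StructType and StructField (the variable names are illustrative):

import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}

// Hand-built schema containing only the fields we care about
val explicitSchema = StructType(Seq(
  StructField("dict", StructType(Seq(
    StructField("key", StringType, nullable = true)
  )), nullable = true),
  StructField("int", LongType, nullable = true),
  StructField("string", StringType, nullable = true)
))

val testJsonDataExplicitSchema = sqlContext.read.schema(explicitSchema).json("/tmp/test.json")
testJsonDataExplicitSchema.printSchema()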

Best Practice Tip: ETL your JSON data to Parquet for faster queries.

With a very large data set, ETL-ing your data to Parquet can significantly speed up your queries. Adjust the number of partitions so that each Parquet file you write is at least 128 MB in size.
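
A minimal sketch of that ETL step, reusing the testJsonData DataFrame from above and writing to an illustrative output path:

(testJsonData
 // Illustrative partition count; in practice, choose it so each Parquet file is roughly 128 MB or larger
 .repartition(1)
 .write
 .format("parquet")
 .save("/FileStore/testParquetOutput"))

// Parquet stores the schema with the data, so reading it back requires no schema inference
val testParquetData = sqlContext.read.parquet("/FileStore/testParquetOutput")
display(testParquetData)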

Best Practice Tip: Handle corrupt records.


Let’s try creating a JSON file with a corrupt record.


dbutils.fs.put(
"/tmp/testBad.json",
"""{"string":"string1","int":1,"array":[1,2,3],"dict": {"key": "value1"}}
{"string":"string2","int":2,"array":[2,4,6],"dict": {"key": "value2"}}
{"string":"string3","int":3,"array":[3,6,9],"dict": {"key": "value3", "extra_key": "extra_value3"}}
[invalid JSON string]
""",
true)
val testJsonDataWithBadRecord = sqlContext.read.json("/tmp/testBad.json")
testJsonDataWithBadRecord: org.apache.spark.sql.DataFrame = [_corrupt_record: string, array: array<bigint>, dict: struct<extra_key:string,key:string>, int: bigint, string: string]

Bad JSON lines are flagged in a column called “_corrupt_record”, which contains the raw text of the bad line.


display(testJsonDataWithBadRecord.where("_corrupt_record is not null"))

To retrieve only the valid lines, filter on _corrupt_record is null.

Additionally, you can try to recover the corrupt lines where possible; a sketch of that follows the example below.


display(testJsonDataWithBadRecord.where("_corrupt_record is null"))
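
Following up on the note above about recovering corrupt lines, here is a minimal sketch that extracts the raw text of the corrupt records so you can inspect them; whether a repair is actually possible depends entirely on your data:

// Pull out the raw text of the corrupt lines so they can be inspected,
// repaired, and re-parsed separately
val corruptLines = testJsonDataWithBadRecord
  .where("_corrupt_record is not null")
  .select("_corrupt_record")
  .rdd
  .map(_.getString(0))

corruptLines.collect().foreach(println)

// If the raw lines can be repaired (for example, by fixing quoting or stripping stray
// characters), the cleaned RDD[String] can be re-parsed with sqlContext.read.json(...)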