Loading JSON data with Spark SQL into a DataFrame
Spark SQL has built-in support for reading JSON files in which each line contains a separate, self-contained JSON object. Multi-line JSON files are currently not compatible with Spark SQL.
// Create temp test.json file on DBFS
// Example JSON with one JSON object per line
dbutils.fs.put(
  "/tmp/test.json",
  """{"string":"string1","int":1,"array":[1,2,3],"dict": {"key": "value1"}}
{"string":"string2","int":2,"array":[2,4,6],"dict": {"key": "value2"}}
{"string":"string3","int":3,"array":[3,6,9],"dict": {"key": "value3", "extra_key": "extra_value3"}}
""", true)
val testJsonData = sqlContext.read.json("/tmp/test.json")
testJsonData.printSchema
testJsonData.count()
// Select the nested 'dict.key' field; display() takes the DataFrame directly, so collect() is not needed
display(testJsonData.select("dict.key"))
display(testJsonData.filter(testJsonData("int") > 1))
// Convert the DataFrame directly into an RDD
val testJsonDataRDD = testJsonData.rdd
// View the first 3 rows of the RDD
testJsonDataRDD.take(3).foreach(println)
// View distinct values in the 'array' column
// (with an inferred JSON schema, columns are ordered alphabetically, so 'array' is column 0)
testJsonDataRDD
  .map(r => r(0).asInstanceOf[Seq[Long]])
  .flatMap(x => x)
  .distinct()
  .collect()
// Create a Spark SQL temp table
// Note that temp tables will not persist across cluster restarts
testJsonData.registerTempTable("test_json")
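Once registered, the temp table can be queried with standard SQL. As a minimal illustration (this query is not from the original notebook), nested fields are addressed with dot notation:

// Query the temp table with SQL; backticks quote column names that collide with SQL type keywords
display(sqlContext.sql("SELECT `string`, dict.key FROM test_json WHERE `int` > 1"))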
Saving to JSON files
You can save the results of your DataFrame queries as JSON files if you want to preserve your data's nested schema.
(testJsonData
  // Each partition is saved as an individual file; repartition(1) therefore produces a single output file
  // Note: make sure your output fits on a single worker
  .repartition(1)
  .write
  .format("json")
  .save("/FileStore/testJsonOutput"))
display(dbutils.fs.ls("/FileStore/testJsonOutput"))
Best Practice Tip: Specify a schema for your JSON input data.
- By specifying a schema, you can speed up your Spark job by cutting out the time Spark spends inferring the schema.
- In addition, if your data has many keys you don't care about, you can keep only the keys you need (see the sketch after this list).
- Finally, too many keys in your JSON data can trigger an out-of-memory (OOM) error on the Spark driver when the schema is inferred.
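One way to do this (a minimal sketch; the cell that originally produced jsonSchema is not shown, so the exact code may differ) is to create a single data point that contains only the fields you need and let Spark infer its schema. The resulting jsonSchema DataFrame is used below:

// Sketch (assumed): one ideal JSON record without the unwanted 'array' and 'extra_key' fields
val idealRecord = """{"string":"string1","int":1,"dict": {"key": "value1"}}"""
// Let Spark infer the schema from this single record; jsonSchema carries the schema we want to reuse
val jsonSchema = sqlContext.read.json(sc.parallelize(Seq(idealRecord)))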
jsonSchema: org.apache.spark.sql.DataFrame = [dict: struct<key:string>, int: bigint, string: string]

jsonSchema.printSchema()

root
 |-- dict: struct (nullable = true)
 |    |-- key: string (nullable = true)
 |-- int: long (nullable = true)
 |-- string: string (nullable = true)
val testJsonDataWithoutExtraKey = sqlContext.read.format("json").schema(jsonSchema.schema).load("/tmp/test.json")
display(testJsonDataWithoutExtraKey)
Best Practice Tip: ETL your JSON data to parquet for faster query speeds.
With a very large data set, ETL-ing your data to parquet can speed up your queries. Adjust the number of partitions so that the parquet files you write are at least 128 MB in size.
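A minimal sketch of that ETL step, assuming the same testJsonData DataFrame and a hypothetical output path:

// Write the data out as parquet; tune repartition() so each output file is roughly 128 MB or larger
// (the output path below is illustrative)
(testJsonData
  .repartition(1)
  .write
  .format("parquet")
  .save("/FileStore/testParquetOutput"))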
"/tmp/testBad.json", """{"string":"string1","int":1,"array":[1,2,3],"dict": {"key": "value1"}} {"string":"string2","int":2,"array":[2,4,6],"dict": {"key": "value2"}} {"string":"string3","int":3,"array":[3,6,9],"dict": {"key": "value3", "extra_key": "extra_value3"}} [invalid JSON string] """, true)
val testJsonDataWithBadRecord = sqlContext.read.json("/tmp/testBad.json")
display(testJsonDataWithBadRecord.where("_corrupt_record is not null"))
display(testJsonDataWithBadRecord.where("_corrupt_record is null"))
The first step is to create a data point with your ideal schema from your data set.
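One way that data point could be used here (a sketch, assuming the intent is to re-read the bad file with the explicit schema so the corrupt line can be dropped; testBadWithSchema is a hypothetical name, not from the original):

// Assumed continuation: reuse the ideal data point's schema when reading the file with the bad record.
// Without a _corrupt_record column in the schema, the malformed line typically surfaces as a row of
// all-null fields under the default permissive parsing, so it can be filtered out afterwards.
val testBadWithSchema = sqlContext.read.schema(jsonSchema.schema).json("/tmp/testBad.json")
display(testBadWithSchema.filter(testBadWithSchema("string").isNotNull))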