Preface
It has been a long time.
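I recently needed to read and test files in the parquet format. Frameworks such as Hive, Impala, and Spark all currently support parquet.
This post is a simple hello world that uses Spark through its Scala interface.
Entering the spark-shell environment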
```
[root@hadoop01 ~]# su - spark
[spark@hadoop01 ~]$ spark-
spark-class   spark-shell   spark-sql     spark-submit
[spark@hadoop01 ~]$ spark-shell
15/10/10 10:24:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/10/10 10:24:12 INFO SecurityManager: Changing view acls to: spark
15/10/10 10:24:12 INFO SecurityManager: Changing modify acls to: spark
15/10/10 10:24:12 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); users with modify permissions: Set(spark)
15/10/10 10:24:12 INFO HttpServer: Starting HTTP Server
15/10/10 10:24:12 INFO Server: jetty-8.y.z-SNAPSHOT
15/10/10 10:24:12 INFO AbstractConnector: Started SocketConnector@0.0.0.0:48566
15/10/10 10:24:12 INFO Utils: Successfully started service 'HTTP class server' on port 48566.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Type :help for more information.
15/10/10 10:24:17 INFO SparkContext: Running Spark version 1.3.1
[…A lot of spark log output…]
15/10/10 10:24:28 INFO SparkILoop: Created spark context..
Spark context available as sc.
15/10/10 10:24:29 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

scala>
```
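Notes:
- The output shows that this session runs Spark 1.3.1.
- spark-shell automatically creates a Spark context for you, available as the object sc. The following steps use sc directly.
- Once the scala> prompt appears, you can start experimenting with code.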
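The points above can be checked directly at the prompt. This is a minimal sketch, assuming it is typed into the spark-shell session shown above, where sc already exists; the variable names are illustrative and not part of the original session.

```scala
// Run inside spark-shell, which pre-defines `sc`; this is not a standalone program.
val sparkVersion = sc.version   // version string of the running Spark
val master = sc.master          // master URL the shell connected to
val appName = sc.appName        // application name registered by the shell
println(s"Spark $sparkVersion, master=$master, app=$appName")
```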
Creating and configuring the SQLContext
```
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@1530f74e

scala> sqlContext.setConf("spark.sql.parquet.binaryAsString","true")

scala>
```
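Steps:
- Create a SQLContext from the sc (Spark context) object.
- Configure the SQLContext with setConf.
- For the available parameters, see: http://spark.apache.org/docs/1.3.1/sql-programming-guide.html#parquet-files
- Version 1.5.1, the latest release at the time of writing, adds many more configurable parameters.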
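To confirm that a setting took effect, the value can be read back from the same SQLContext. The sketch below assumes it is pasted into the spark-shell session above, after sqlContext has been created; binaryAsString and parquetConfs are illustrative names, not part of the original session.

```scala
// Run inside spark-shell; `sqlContext` is the SQLContext created in the step above.
// Set the option, then read it back with getConf.
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
val binaryAsString = sqlContext.getConf("spark.sql.parquet.binaryAsString")
println(s"spark.sql.parquet.binaryAsString = $binaryAsString")

// getAllConfs returns the options explicitly set on this SQLContext;
// keep only the ones whose key mentions parquet.
val parquetConfs = sqlContext.getAllConfs.filter { case (key, _) => key.contains("parquet") }
parquetConfs.foreach { case (key, value) => println(s"$key = $value") }
```

Only explicitly set options appear in getAllConfs, so defaults that were never touched will not be listed.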
Loading the parquet file
Load the file:
```
scala> val parquetFile = sqlContext.parquetFile("/tmp/me.parquet")
15/10/10 10:47:12 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
parquetFile: org.apache.spark.sql.DataFrame = [CONTRACTID: string, TDATETIME: string, CONTRACTNAME: string, LASTPX: double, HIGHPX: double, LOWPX: double, CQ: double, TQ: double, LASTQTY: double, INITOPENINTS: double, OPENINTS: double, INTSCHG: double, TURNOVER: double, RISELIMIT: double, FALLLIMIT: double, PRESETTLE: double, PRECLOSE: double, OPENPX: double, CLOSEPX: double, SETTLEMENTPX: double, LIFELOW: double, LIFEHIGH: double, AVGPX: double, BIDIMPLYQTY: double, ASKIMPLYQTY: double, SIDE: string, S1: double, B1: double, SV1: double, BV1: double, S5: double, S4: double, S3: double, S2: double, B2: double, B3: double, B4: double, B5: double, SV5: double, SV4: double, SV3: double, SV2: double, BV2: double, BV3: double, BV4: double, BV5: double, PREDELTA: double, CURRDELTA: double, CHG...
```
Print the schema of the parquet file:
```
scala> parquetFile.printSchema()
root
 |-- CONTRACTID: string (nullable = false)
 |-- TDATETIME: string (nullable = false)
```
Note: with sqlContext.setConf("spark.sql.parquet.binaryAsString","false"), these columns would keep their raw binary type. Here they were converted to string automatically.
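The effect of the setting can also be checked programmatically rather than by reading the printed schema. This is a minimal sketch, assuming it runs in the same spark-shell session where parquetFile was loaded; stringColumns and binaryColumns are illustrative names.

```scala
// Run inside spark-shell; `parquetFile` is the DataFrame loaded above.
import org.apache.spark.sql.types.{BinaryType, StringType}

// Split the schema's fields by data type and keep only the column names.
val fields = parquetFile.schema.fields
val stringColumns = fields.filter(_.dataType == StringType).map(_.name)
val binaryColumns = fields.filter(_.dataType == BinaryType).map(_.name)

println("string columns: " + stringColumns.mkString(", "))
println("binary columns: " + binaryColumns.mkString(", "))
```

With binaryAsString set to "true" as in this post, TDATETIME should land in the string list.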
Registering the parquet file as a temporary table
```
scala> parquetFile.registerTempTable("parquetFile")
```
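A temporary table lives only in the SQLContext it was registered with and disappears when the session ends. To check that the registration worked, the context can be asked for its table names. The sketch assumes the same spark-shell session; registered and sameTable are illustrative names, and the comparison ignores case because a Hive-backed context may report the name in lower case.

```scala
// Run inside spark-shell, after parquetFile.registerTempTable("parquetFile").
// tableNames() lists the tables known to this SQLContext.
val registered = sqlContext.tableNames().exists(_.equalsIgnoreCase("parquetFile"))
println(s"parquetFile registered: $registered")

// table(...) returns the registered table as a DataFrame again.
val sameTable = sqlContext.table("parquetFile")
println("first columns: " + sameTable.columns.take(3).mkString(", "))
```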
DML operations on the parquet table
Run the SQL:
```
scala> val tdatetime = sqlContext.sql("SELECT TDATETIME FROM parquetFile")
tdatetime: org.apache.spark.sql.DataFrame = [TDATETIME: string]
```
Iterate over the results:
```
scala> tdatetime.map(t => "TDATETIME: " + t(0)).collect().foreach(println)
15/10/10 10:17:56 INFO MemoryStore: ensureFreeSpace(223942) called with curMem=276682, maxMem=15558896517
[…A lot of spark log output…]
15/10/10 10:17:56 INFO DAGScheduler: Stage 1 (collect at <console>:26) finished in 0.265 s
15/10/10 10:17:56 INFO DAGScheduler: Job 1 finished: collect at <console>:26, took 0.284977 s
TDATETIME: 2015-05-04 08:45:35.223
TDATETIME: 2015-05-04 08:46:36.067
TDATETIME: 2015-05-04 08:47:36.940
[……]
```
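collect() brings every row back to the driver, which is fine for a hello world but can be heavy on a large file. The sketch below shows one way to look at just a few rows, assuming the same spark-shell session, with tdatetime being the DataFrame returned by the SQL query and TDATETIME read as a string column; firstRows is an illustrative name.

```scala
// Run inside spark-shell; `tdatetime` and `parquetFile` come from the steps above.
// take(n) fetches only the first n rows instead of collecting the whole result.
val firstRows = tdatetime.take(3)
firstRows.foreach(row => println("TDATETIME: " + row.getString(0)))

// The same column through the DataFrame API, without SQL text;
// show(n) prints the first n rows as a small table.
parquetFile.select("TDATETIME").show(3)
```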