python 读取各种 文件
使用mapPartitions()来重用解析器
读取 非结构化的python
import json
data = input.map(lambda x : json.loads(x))
读取CSV 文件 用python
import csv
import StringIO
...
def loadRecord(line):
"""解析一行csv记录"""
input = StringIO.StringIO(line)
reader = csv.DictReader(input,fieldnames = ["name","favoriteAnimal"])
return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
存储csv :
def writeRecord(records):
"""写出一些csv记录"""
output = StringIO.StringIO()
writer = csv.DictWriter(output,fieldnames=["name","favoriteAnimal"])
for record in records:
writer.writerow(record)
return [output.getvalue()]
pandaLoves.mapPartitions(writeRecords).saveAsTextFile(outputfile)
Spark Sql 读取 Hive 数据
1、将Hive-site.xml文件复制到Spark 的./conf/目录下
2、创建HiveContext 对象,Spark SQL 的 入口,spark2.0以后可以用sparkSession API 来实现创建对象。
代码如下:
from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT name,age From Users")
firstRow = rows.first()
print(firstRow.name)
获取结构一致的json文件
#使用jsonFile方法从整个文件中获取由ROW对象组成的RDD
tweets = hiveCtx.jsonFile("tweets.json)
#也可以将RDD注册成为一张表,从中选出特定的字段
tweets.registerTempTable('tweets')
results = hiveCtx.sql("SELECT user.name,text FROM tweets")
转载自:https://blog.csdn.net/Cincinnati_De/article/details/79878068