python 读取各种 文件

使用mapPartitions()来重用解析器

读取 非结构化的python

import json
data = input.map(lambda x : json.loads(x))


读取CSV 文件 用python
import csv
import StringIO
...
def loadRecord(line):
	"""解析一行csv记录"""
	input = StringIO.StringIO(line)
	reader = csv.DictReader(input,fieldnames = ["name","favoriteAnimal"])
	return reader.next()
input = sc.textFile(inputFile).map(loadRecord)

存储csv :

def writeRecord(records):
	"""写出一些csv记录"""
	output =  StringIO.StringIO()
	writer = csv.DictWriter(output,fieldnames=["name","favoriteAnimal"])
	for record in records:
		writer.writerow(record)
	return [output.getvalue()]

pandaLoves.mapPartitions(writeRecords).saveAsTextFile(outputfile)

 Spark Sql 读取 Hive 数据

1、将Hive-site.xml文件复制到Spark 的./conf/目录下

2、创建HiveContext 对象,Spark SQL 的 入口,spark2.0以后可以用sparkSession API 来实现创建对象。

代码如下:

from pyspark.sql import HiveContext

hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT name,age From Users")
firstRow = rows.first()
print(firstRow.name)

获取结构一致的json文件

#使用jsonFile方法从整个文件中获取由ROW对象组成的RDD
tweets = hiveCtx.jsonFile("tweets.json)
#也可以将RDD注册成为一张表,从中选出特定的字段
tweets.registerTempTable('tweets')
results = hiveCtx.sql("SELECT user.name,text FROM tweets")

转载自:https://blog.csdn.net/Cincinnati_De/article/details/79878068

You may also like...