Spark SQL数据加载和保存实战

发表于:2017-4-24 10:20

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:记录之园    来源:博客园

  一:前置知识详解:
  Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作,
  Load:可以创建DataFrame,
  Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型。
  二:Spark SQL读写数据代码实战:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;
public class SparkSQLLoadSaveOps {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext = new SQLContext(sc);
/**
* read()是DataFrameReader类型,load可以将数据读取出来
*/
DataFrame peopleDF = sqlContext.read().format("json").load("E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\people.json");
/**
* 直接对DataFrame进行操作
* Json: 是一种自解释的格式,读取Json的时候怎么判断其是什么格式?
* 通过扫描整个Json。扫描之后才会知道元数据
*/
//通过mode来指定输出文件的是append。创建新文件来追加文件
peopleDF.select("name").write().mode(SaveMode.Append).save("E:\\personNames");
}
}
  读取过程源码分析如下:
  1. read方法返回DataFrameReader,用于读取数据。
[[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
* {{{
*   sqlContext.read.parquet("/path/to/file.parquet")
*   sqlContext.read.schema(schema).json("/path/to/file.json")
* }}}
*
* @group genericdata
* @since 1.4.0
*/
@Experimental
//创建DataFrameReader实例,获得了DataFrameReader引用
def read: DataFrameReader = new DataFrameReader(this)
  2.  然后再调用DataFrameReader类中的format,指出读取文件的格式。
  /**
  * Specifies the input data source format.
  *
  * @since 1.4.0
  */
  def format(source: String): DataFrameReader = {
  this.source = source
  this
  }
  3.  通过DtaFrameReader中load方法通过路径把传入过来的输入变成DataFrame。
  /**
  * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
  * a local or distributed file system).
  *
  * @since 1.4.0
  */
  // TODO: Remove this one in Spark 2.0.
  def load(path: String): DataFrame = {
  option("path", path).load()
  }
  至此,数据的读取工作就完成了,下面就对DataFrame进行操作。
  下面就是写操作!!!
  1. 调用DataFrame中select函数进行对列筛选
  /**
  * Selects a set of columns. This is a variant of `select` that can only select
  * existing columns using column names (i.e. cannot construct expressions).
  *
  * {{{
  *   // The following two are equivalent:
  *   df.select("colA", "colB")
  *   df.select($"colA", $"colB")
  * }}}
  * @group dfops
  * @since 1.3.0
  */
  @scala.annotation.varargs
  def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)
  2.  然后通过write将结果写入到外部存储系统中。
  /**
  * :: Experimental ::
  * Interface for saving the content of the [[DataFrame]] out into external storage.
  *
  * @group output
  * @since 1.4.0
  */
  @Experimental
  def write: DataFrameWriter = new DataFrameWriter(this)
21/212>
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号