RDD2DataFrame
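Two ways to turn an RDD into a DataFrame in Spark 1.6: by reflection, inferring the schema from a JavaBean or a case class, and by building the schema programmatically with StructType when the columns are not known in advance. Both approaches are shown below in Java and in Scala.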
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.avcdata</groupId>
    <artifactId>spark-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.avcdata.Main</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>1.12</version>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                    <execution>
                        <id>add-test-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-test-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/test/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Required to actually compile the Scala sources registered above -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>central</id>
            <name>Central Repository</name>
            <url>https://repo.maven.apache.org/maven2</url>
            <layout>default</layout>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
</project>
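With the scala-maven-plugin in place, mvn clean package produces target/spark-test-1.0-SNAPSHOT-jar-with-dependencies.jar. Assuming a local Spark 1.6 installation and students.txt in the working directory, each example can then be run by naming its class explicitly, along these lines:

spark-submit --class com.avcdata.RDD2DataFrameReflection --master local target/spark-test-1.0-SNAPSHOT-jar-with-dependencies.jar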
RDD2DataFrameReflection.java
package com.avcdata;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.util.List;

public class RDD2DataFrameReflection {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("RDD2DataFrameReflection").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("students.txt");
        JavaRDD<Student> studentRDD = lines.map(line -> {
            String[] lineSplited = line.split(",");
            Student stu = new Student();
            stu.setId(Integer.valueOf(lineSplited[0]));
            stu.setName(lineSplited[1]);
            stu.setAge(Integer.valueOf(lineSplited[2]));
            return stu;
        });

        // Use reflection to convert the RDD to a DataFrame: the schema is
        // inferred from the Student JavaBean.
        DataFrame studentDF = sqlContext.createDataFrame(studentRDD, Student.class);
        studentDF.printSchema();

        // With a DataFrame we can register a temporary table; the SQL query
        // selects students aged 18 or younger.
        studentDF.registerTempTable("student");
        DataFrame teenagerDF = sqlContext.sql("SELECT * FROM student WHERE age <= 18");

        JavaRDD<Row> teenagerRDD = teenagerDF.toJavaRDD();
        JavaRDD<Student> teenagerStudentRDD = teenagerRDD.map(row -> {
            // Fetch columns from the Row by name, so we don't have to worry
            // about column order.
            int id = row.getAs("id");
            int age = row.getAs("age");
            String name = row.getAs("name");
            Student stu = new Student();
            stu.setId(id);
            stu.setAge(age);
            stu.setName(name);
            return stu;
        });

        List<Student> studentList = teenagerStudentRDD.collect();
        studentList.forEach(System.out::println);
    }
}
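The Student class is not part of the original listing. Since createDataFrame(studentRDD, Student.class) infers the schema from the bean's getters, it must be a public, Serializable JavaBean; a minimal sketch, with field names assumed from the usage above:

Student.java
package com.avcdata;

import java.io.Serializable;

// Assumed JavaBean for the reflection example above. The schema is inferred
// from the getters, and the resulting columns come out in alphabetical
// order (age, id, name) - hence the by-name lookups in the example.
public class Student implements Serializable {
    private int id;
    private String name;
    private int age;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public String toString() {
        return "Student{id=" + id + ", name='" + name + "', age=" + age + "}";
    }
}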
Scala version: RDD2DataFrame.scala
package com.avcdata

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

// For schema inference by reflection, the case class must be defined
// outside the method that uses it.
case class Person(id: Int, name: String, age: Int)

object RDD2DataFrame {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD2DataFrame").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val lines = sc.textFile("students.txt")
    val studentRDD = lines.map(line => {
      val lineSplited = line.split(",")
      Person(lineSplited(0).toInt, lineSplited(1), lineSplited(2).toInt)
    })

    // toDF() infers the schema from the Person case class by reflection.
    import sqlContext.implicits._
    val studentDF = studentRDD.toDF()

    studentDF.registerTempTable("person")
    val teenagerDF = sqlContext.sql("SELECT * FROM person WHERE age < 18")
    teenagerDF.printSchema()

    // Fetch columns from each Row by name rather than by position.
    val teenagerPersonRDD = teenagerDF.rdd.map(row =>
      Person(row.getAs[Int]("id"), row.getAs[String]("name"), row.getAs[Int]("age")))
    teenagerPersonRDD.collect().foreach(println)
  }
}
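With the students.txt listed at the end, the age < 18 query keeps only the first two rows, so this should print something like Person(1,yasaka,17) and Person(2,marry,17).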
RDD2DataFrameDynamic.java
package com.avcdata;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

public class RDD2DataFrameDynamic {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("RDD2DataFrameDynamic").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("students.txt");
        JavaRDD<Row> rows = lines.map(line -> {
            String[] lineSplited = line.split(",");
            return RowFactory.create(Integer.valueOf(lineSplited[0]), lineSplited[1],
                    Integer.valueOf(lineSplited[2]));
        });

        // Build the metadata dynamically (the alternative is the reflection
        // approach shown above). This is useful when the columns are not known
        // up front and have to be loaded from, say, MySQL or a configuration file.
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(fields);

        DataFrame studentDF = sqlContext.createDataFrame(rows, schema);
        studentDF.registerTempTable("stu");
        DataFrame teenagerDF = sqlContext.sql("SELECT * FROM stu WHERE age <= 18");

        List<Row> teenagerList = teenagerDF.javaRDD().collect();
        teenagerList.forEach(System.out::println);
    }
}
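As the comment notes, the point of the dynamic approach is that the column list need not be hard-coded. A minimal sketch of loading it from elsewhere, assuming a hypothetical "name:type" schema string such as "id:int,name:string,age:int" read from a config file or database (the helper class and the two supported types are illustrative, not part of the original example):

package com.avcdata;

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

public class SchemaFromConfig {
    // Hypothetical helper: build a StructType from a comma-separated
    // "name:type" list, e.g. "id:int,name:string,age:int".
    public static StructType parse(String schemaString) {
        List<StructField> fields = new ArrayList<>();
        for (String col : schemaString.split(",")) {
            String[] parts = col.split(":");
            // Only int and string are handled in this sketch.
            DataType type = "int".equals(parts[1]) ? DataTypes.IntegerType : DataTypes.StringType;
            fields.add(DataTypes.createStructField(parts[0], type, true));
        }
        return DataTypes.createStructType(fields);
    }
}

The resulting StructType can be passed to sqlContext.createDataFrame(rows, schema) exactly as above.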
RDD2DataFrameDynamic.scala
package com.avcdata

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object RDD2DataFrameDynamic {
  def main(args: Array[String]): Unit = {
    // Initialise the configuration and contexts
    val conf = new SparkConf().setAppName("RDD2DataFrameDynamic").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Read the file into an RDD of lines
    val lines = sc.textFile("students.txt")

    // Convert the line RDD into an RDD[Row]
    val rows = lines.map(line => {
      val lineSplited = line.split(",")
      Row(lineSplited(0).toInt, lineSplited(1), lineSplited(2).toInt)
    })

    // Define the DataFrame schema by hand
    val schema = new StructType()
      .add(StructField("id", DataTypes.IntegerType, nullable = true))
      .add(StructField("name", DataTypes.StringType, nullable = true))
      .add(StructField("age", DataTypes.IntegerType, nullable = true))

    val personDF = sqlContext.createDataFrame(rows, schema)
    personDF.registerTempTable("stu")
    val teenagerPersonDF = sqlContext.sql("SELECT * FROM stu WHERE age < 18")
    teenagerPersonDF.rdd.collect().foreach(println)
  }
}
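Here the collected Rows print in bracketed form, so with the sample data the age < 18 query should output something like [1,yasaka,17] and [2,marry,17].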
students.txt
1,yasaka,17
2,marry,17
3,jack,18
4,tom,19