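Basic operations: selection, projection, aggregation
Almost every kind of DataFrame/Dataset operation can be applied to a streaming Dataset/DataFrame, including untyped operations, SQL-like operations, and typed operations; the exceptions are the unsupported operations listed later.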
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.expressions.scalalang.typed
import spark.implicits._ // encoders needed by as[DeviceData] and map

// `type` is a reserved word in Scala, so the field is named deviceType
case class DeviceData(device: String, deviceType: String, signal: Double, time: String)
val df: DataFrame = ... // streaming DataFrame with IoT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IoT device data
// Select the devices which have signal more than 10
df.where("signal > 10").select("device") // using untyped APIs
ds.filter(_.signal > 10).map(_.device) // using typed APIs
// Running count of the number of updates for each device type
df.groupBy("deviceType").count() // using untyped API
// Running average signal for each device type
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // using typed API
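The SQL-like style mentioned above has no example in these notes. A minimal sketch, assuming the streaming DataFrame is registered as a temporary view: the view name "updates", the object name StreamingSqlSketch, and the use of the built-in "rate" source as a stand-in for real device data are all illustrative assumptions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object StreamingSqlSketch {
  /** Registers the streaming DataFrame as a view and queries it with SQL. */
  def strongSignals(spark: SparkSession, df: DataFrame): DataFrame = {
    df.createOrReplaceTempView("updates") // "updates" is a hypothetical view name
    spark.sql("SELECT device FROM updates WHERE signal > 10")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sql-sketch").getOrCreate()
    // Stand-in stream: the "rate" source emits (timestamp, value) rows.
    val df = spark.readStream.format("rate").load()
      .selectExpr("CAST(value AS STRING) AS device", "CAST(value AS DOUBLE) AS signal")
    // The result of spark.sql over a streaming view is itself a streaming DataFrame.
    println(strongSignals(spark, df).isStreaming)
    spark.stop()
  }
}
```

The returned DataFrame still has to be started with writeStream before any rows are processed.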
Sliding windows: based on event time
import org.apache.spark.sql.functions.window
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
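Late data is handled naturally: the window is derived from the event time carried in each row, so a record that arrives late simply updates the counts of the windows it belongs to.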
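To make the window assignment concrete, here is a small plain-Scala sketch of which 10-minute windows, sliding every 5 minutes, a single event time falls into. It only illustrates the idea and is not Spark's implementation; the names SlidingWindows, Window, and windowsFor are hypothetical, and windows are assumed to be aligned to the epoch with an exclusive end.

```scala
object SlidingWindows {
  // A window covers the half-open interval [startMs, endMs).
  final case class Window(startMs: Long, endMs: Long)

  /** All windows of length sizeMs, starting every slideMs, that contain eventMs. */
  def windowsFor(eventMs: Long, sizeMs: Long, slideMs: Long): Seq[Window] = {
    require(sizeMs > 0 && slideMs > 0 && slideMs <= sizeMs)
    // Start of the latest window that begins at or before the event.
    val lastStart = Math.floorDiv(eventMs, slideMs) * slideMs
    // Walk back one slide at a time while the window still covers the event.
    Iterator.iterate(lastStart)(_ - slideMs)
      .takeWhile(start => start + sizeMs > eventMs)
      .map(start => Window(start, start + sizeMs))
      .toList
      .reverse
  }
}
```

With these settings every event lands in two overlapping windows, which is why a word received at minute 7 is counted in both the 0 to 10 and the 5 to 15 minute windows.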
Join operations
Structured Streaming supports joining a streaming Dataset with a static Dataset.
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type") // inner equi-join with a static DF
streamingDf.join(staticDf, Seq("type"), "left_outer") // left outer join with a static DF (the stream must be on the outer side)
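The snippet above elides how both DataFrames are created. The following is a minimal end-to-end sketch under stated assumptions: the built-in "rate" source stands in for a real stream, the lookup table contents and the "owner" column are invented for illustration, and the object name StreamStaticJoinSketch is hypothetical. It uses a left outer join with the stream on the left.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object StreamStaticJoinSketch {
  /** Left outer join with the stream on the left, enriched from a static table. */
  def enrich(streamingDf: DataFrame, staticDf: DataFrame): DataFrame =
    streamingDf.join(staticDf, Seq("type"), "left_outer")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("join-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical static lookup table: device type -> owner.
    val staticDf = Seq(("sensor", "team-a"), ("camera", "team-b")).toDF("type", "owner")

    // Stand-in stream: the "rate" source emits (timestamp, value) rows.
    val streamingDf = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
      .selectExpr("timestamp", "CASE WHEN value % 2 = 0 THEN 'sensor' ELSE 'camera' END AS type")

    val query = enrich(streamingDf, staticDf).writeStream
      .format("console")
      .outputMode("append")
      .start()
    query.awaitTermination(10000) // let it run for about ten seconds
    query.stop()
    spark.stop()
  }
}
```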
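Unsupported operations
- Chained aggregations on a streaming DataFrame
- limit and take
- distinct
- sort
  Supported only after an aggregation, and only in complete output mode.
- Outer joins between a streaming DataFrame and a static DataFrame
  A full outer join is not supported.
  A left outer join is not supported when the streaming DataFrame is on the right.
  A right outer join is not supported when the streaming DataFrame is on the left.
- Joins between two streaming DataFrames
- count()
  Use groupBy().count() instead.
- foreach()
  Use df.writeStream.foreach() instead.
- show()
  Use the console output sink instead.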
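The substitutes named in the list for count() and show() can be combined roughly as follows. This is a sketch, not a reference implementation: the "rate" source is a stand-in for a real stream, and the names StreamingCountSketch and runningCount are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object StreamingCountSketch {
  /** Running total of rows; stands in for the unsupported df.count(). */
  def runningCount(streamingDf: DataFrame): DataFrame =
    streamingDf.groupBy().count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("count-sketch").getOrCreate()

    // Stand-in stream: the "rate" source emits (timestamp, value) rows.
    val events = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

    // The console sink replaces show(); complete mode reprints the full result each trigger.
    val query = runningCount(events).writeStream
      .outputMode("complete")
      .format("console")
      .start()
    query.awaitTermination(10000) // let it run for about ten seconds
    query.stop()
    spark.stop()
  }
}
```

Complete output mode is used here because the query is an aggregation whose single result row changes on every trigger.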