scala> // Define the case class to be used with DataFrames and Datasets
scala> case class Trans(accNo: String, tranAmount: Double)
defined class Trans
scala> // Create the list from which the Dataset will be created, using the case class
scala> val acTransList = Seq(Trans("SB10001", 1000), Trans("SB10002",1200), Trans("SB10003", 8000), Trans("SB10004",400), Trans("SB10005",300),Trans("SB10006",10000), Trans("SB10007",500), Trans("SB10008",56),Trans("SB10009",30),Trans("SB10010",7000), Trans("CR10001",7000),Trans("SB10002",-10))
acTransList: Seq[Trans] = List(Trans(SB10001,1000.0), Trans(SB10002,1200.0), Trans(SB10003,8000.0), Trans(SB10004,400.0), Trans(SB10005,300.0), Trans(SB10006,10000.0), Trans(SB10007,500.0), Trans(SB10008,56.0), Trans(SB10009,30.0), Trans(SB10010,7000.0), Trans(CR10001,7000.0), Trans(SB10002,-10.0))
scala> // Create the Dataset
scala> val acTransDS = acTransList.toDS()
acTransDS: org.apache.spark.sql.Dataset[Trans] = [accNo: string, tranAmount: double]
scala> acTransDS.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
|SB10010| 7000.0|
|CR10001| 7000.0|
|SB10002| -10.0|
+-------+----------+
scala> // Apply filter and create another Dataset of good transaction records
scala> val goodTransRecords = acTransDS.filter(_.tranAmount > 0).filter(_.accNo.startsWith("SB"))
goodTransRecords: org.apache.spark.sql.Dataset[Trans] = [accNo: string, tranAmount: double]
scala> goodTransRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
|SB10010| 7000.0|
+-------+----------+
scala> // Apply filter and create another Dataset of high value transaction records
scala> val highValueTransRecords = goodTransRecords.filter(_.tranAmount > 1000)
highValueTransRecords: org.apache.spark.sql.Dataset[Trans] = [accNo: string, tranAmount: double]
scala> highValueTransRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10006| 10000.0|
|SB10010| 7000.0|
+-------+----------+
scala> // The function that identifies the bad amounts
scala> val badAmountLambda = (trans: Trans) => trans.tranAmount <= 0
badAmountLambda: Trans => Boolean = <function1>
scala> // The function that identifies bad accounts
scala> val badAcNoLambda = (trans: Trans) => trans.accNo.startsWith("SB") == false
badAcNoLambda: Trans => Boolean = <function1>
scala> // Apply filter and create another Dataset of bad amount records
scala> val badAmountRecords = acTransDS.filter(badAmountLambda)
badAmountRecords: org.apache.spark.sql.Dataset[Trans] = [accNo: string, tranAmount: double]
scala> badAmountRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10002| -10.0|
+-------+----------+
scala> // Apply filter and create another Dataset of bad account records
scala> val badAccountRecords = acTransDS.filter(badAcNoLambda)
badAccountRecords: org.apache.spark.sql.Dataset[Trans] = [accNo: string, tranAmount: double]
scala> badAccountRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|CR10001| 7000.0|
+-------+----------+
scala> // Take the union of the two Datasets and create another Dataset
scala> val badTransRecords = badAmountRecords.union(badAccountRecords)
badTransRecords: org.apache.spark.sql.Dataset[Trans] = [accNo: string, tranAmount: double]
scala> badTransRecords.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10002| -10.0|
|CR10001| 7000.0|
+-------+----------+
scala> // Calculate the sum
scala> val sumAmount = goodTransRecords.map(trans => trans.tranAmount).reduce(_ + _)
sumAmount: Double = 28486.0
scala> // Calculate the maximum
scala> val maxAmount = goodTransRecords.map(trans => trans.tranAmount).reduce((a, b) => if (a > b) a else b)
maxAmount: Double = 10000.0
scala> // Calculate the minimum
scala> val minAmount = goodTransRecords.map(trans => trans.tranAmount).reduce((a, b) => if (a < b) a else b)
minAmount: Double = 30.0
scala> // Convert the Dataset to DataFrame
scala> val acTransDF = acTransDS.toDF()
acTransDF: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
scala> acTransDF.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
|SB10010| 7000.0|
|CR10001| 7000.0|
|SB10002| -10.0|
+-------+----------+
scala> // Use Spark SQL to find out invalid transaction records
scala> acTransDF.createOrReplaceTempView("trans")
scala> val invalidTransactions = spark.sql("SELECT accNo, tranAmount FROM trans WHERE (accNo NOT LIKE 'SB%') OR tranAmount <= 0")
19/12/02 22:29:36 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
invalidTransactions: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
scala> invalidTransactions.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|CR10001| 7000.0|
|SB10002| -10.0|
+-------+----------+
scala> // Interoperability of RDD, DataFrame and Dataset
scala> // Create RDD
scala> val acTransRDD = sc.parallelize(acTransList)
acTransRDD: org.apache.spark.rdd.RDD[Trans] = ParallelCollectionRDD[41] at parallelize at <console>:26
scala> // Convert RDD to DataFrame
scala> val acTransRDDtoDF = acTransRDD.toDF()
acTransRDDtoDF: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
scala> // Convert the DataFrame to a Dataset with type checking
scala> val acTransDFtoDS = acTransRDDtoDF.as[Trans]
acTransDFtoDS: org.apache.spark.sql.Dataset[Trans] = [accNo: string, tranAmount: double]
scala> acTransDFtoDS.show()
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
|SB10010| 7000.0|
|CR10001| 7000.0|
|SB10002| -10.0|
+-------+----------+
007 Dataset
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 007~19班的战友们、前来围观的战友们和朋友们:大家晚上好!我是本月第一周志愿者,本次会议主持人:西北~甘肃~兰...
- 摘要: 每天看着身边川流不息的人群,一个个看着都忙得不可开交,我们每个人都像跟着身边的人看热闹似的,根本不知道前面...
- 一转眼求学模块就已经打卡结束了!经过学习《有效学习》这本书和猫叔的两堂分享课,我对定位过程有了进一步的认识。 定位...