1. Writing a DataFrame to Elasticsearch
1.1 Dependencies
Choose a package that matches the Spark and Elasticsearch versions you are actually running, and pass it to the job with the --packages parameter at submission time.
Example:
--packages=org.elasticsearch:elasticsearch-spark-30_2.12:7.13.1
1.2 References
- es packages: https://search.maven.org/search?q=g:org.elasticsearch
- spark-es configuration: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
1.3 PySpark code example
to_es.py
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from config import ES_CONFIG
spark = SparkSession \
.builder \
.appName("to_es") \
.getOrCreate()
data_schema = StructType([
StructField("name", StringType(), True),
StructField("source", StringType(), True),
StructField("end_format", StringType(), True),
StructField("operator_time", StringType(), True),
StructField("operator_user", StringType(), True),
StructField("sha1", StringType(), True),
])
df = spark.read.csv("/home/testuser/data/csv/", schema=data_schema, sep="\t", header=False)
df.write.format('es') \
.mode('append') \
.options(**{
"es.write.operation": "upsert", # 更新模式
"es.spark.dataframe.write.null": "true", # 支持写入null值字段,默认会忽略写入value为null的字段
"es.mapping.id": "name", # 作为写入更新唯一字段不重复插入,同时作为写入的document_id
'es.resource': ES_CONFIG['index'],
'es.nodes.wan.only': 'true',
'es.nodes': ES_CONFIG['nodes'],
'es.port': ES_CONFIG['port'],
'es.net.http.auth.user': ES_CONFIG['user'],
'es.net.http.auth.pass': ES_CONFIG['password']
}).save()
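The script imports ES_CONFIG from a local config module that is not shown above. A minimal config.py sketch follows; the keys are the ones the script reads, while the values are placeholders that must be replaced with your own cluster settings:
# config.py -- hypothetical example values
ES_CONFIG = {
    "index": "test_index",   # target index, passed to es.resource
    "nodes": "127.0.0.1",    # comma-separated list of ES nodes
    "port": "9200",
    "user": "elastic",
    "password": "password",
}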
1.4 Job submission example
spark-submit --packages=org.elasticsearch:elasticsearch-spark-30_2.12:7.13.1 to_es.py
2. Writing a DataFrame to Redis
2.1 Writing to Redis in a custom format
import functools

import redis
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.appName("to-redis").getOrCreate()
data_schema = StructType([
StructField("name", StringType(), True),
StructField("source", StringType(), True),
StructField("end_format", StringType(), True),
StructField("operator_time", StringType(), True),
StructField("operator_user", StringType(), True),
StructField("sha1", StringType(), True),
])
def to_redis(part, batch=500):
    # One connection per partition; this function runs on the executors
    redis_pool = redis.ConnectionPool(host='127.0.0.1', port=26379, db=10, password='password')
    redis_cli = redis.StrictRedis(connection_pool=redis_pool)
    cnt = 0
    pipeline = redis_cli.pipeline()
    for row in part:
        # key = name, value = tab-separated selection of columns
        pipeline.set(row.name, "\t".join([row.name, row.source, row.end_format]))
        cnt += 1
        # flush the pipeline every `batch` records
        if cnt > 0 and cnt % batch == 0:
            pipeline.execute()
    # flush whatever is left over
    if cnt % batch != 0:
        pipeline.execute()
    pipeline.close()
    redis_cli.close()
sdf = spark.read.csv("/home/testuser/data/csv/", schema=data_schema, header=False, sep="\t")
sdf.show()
# Write each partition to Redis using the custom write logic and format defined in to_redis
sdf.foreachPartition(functools.partial(to_redis, batch=500))
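To spot-check what was written, a single key can be read back with redis-py. This is only a sketch: it reuses the connection settings hardcoded in to_redis above and assumes the CSV contains a row whose name column is "example".
import redis

# Same connection settings as in to_redis above
cli = redis.StrictRedis(host='127.0.0.1', port=26379, db=10, password='password')
value = cli.get("example")  # keys are the values of the "name" column
if value is not None:
    name, source, end_format = value.decode("utf-8").split("\t")
    print(name, source, end_format)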
2.2 Writing to Redis with the spark-redis connector JAR
- Connector download and documentation: https://github.com/RedisLabs/spark-redis
- Reference 1: https://redis.com/blog/getting-started-redis-apache-spark-python/
- Reference 2: https://github.com/RedisLabs/spark-redis/blob/branch-2.4/doc/python.md
Note: with this approach every row is written as a Redis hash via HSET; the storage format cannot be customized (see the read-back sketch after the code below).
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder. \
config("spark.redis.host", "127.0.0.1"). \
config("spark.redis.port", "6379"). \
config("spark.redis.auth", "password"). \
config("spark.redis.db", 10). \
getOrCreate()
data_schema = StructType([
StructField("name", StringType(), True),
StructField("source", StringType(), True),
StructField("end_format", StringType(), True),
StructField("operator_time", StringType(), True),
StructField("operator_user", StringType(), True),
StructField("sha1", StringType(), True),
])
sdf = spark.read.csv("/home/testuser/data/csv/", schema=data_schema, header=False, sep="\t")
sdf.write.format("org.apache.spark.sql.redis").option("table", "name_group").option("key.column", "name").save()
2.2.1 Job submission example
spark-submit --jars <path-to>/spark-redis-<version>-jar-with-dependencies.jar to_redis.py
3. Writing a DataFrame to MongoDB
- Use the --packages parameter to have the connector downloaded automatically:
--packages=org.mongodb.spark:mongo-spark-connector_2.11:2.4.2
- Reference: https://www.mongodb.com/docs/spark-connector/v3.0/
3.1 PySpark code example
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *

# The URI may include the database and collection to use; by default Spark uses the collection named in the URI
mongo_uri = "mongodb://username:password@127.0.0.1:27017/db_name.collection_name?authSource=admin"
spark = SparkSession.builder.\
appName("to_mongo").\
config("spark.mongodb.input.uri", mongo_uri). \
getOrCreate()
data_schema = StructType([
StructField("name", StringType(), True),
StructField("source", StringType(), True),
StructField("end_format", StringType(), True),
StructField("operator_time", StringType(), True),
StructField("operator_user", StringType(), True),
StructField("sha1", StringType(), True),
])
sdf = spark.read.format("com.mongodb.spark.sql.DefaultSource").load(schema=data_schema)
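The session above only configures spark.mongodb.input.uri, which covers reading. To write a DataFrame out, as the section title describes, an output URI is needed as well. A minimal sketch, assuming the builder above additionally sets spark.mongodb.output.uri to the same mongo_uri:
# Assumes the SparkSession builder also contains:
#     .config("spark.mongodb.output.uri", mongo_uri)
# mode("append") inserts the rows into the collection named in the output URI
sdf.write.format("com.mongodb.spark.sql.DefaultSource") \
    .mode("append") \
    .save()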
3.2 Job submission example
spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.2 to_mongo.py