出于测试目的,我配置了一个 4 节点集群,每个集群都有一个 Spark Worker 和一个 MongoDB Shard。这些是详细信息:
- 四个 Debian 9 服务器(命名为 visa0、visa1、visa2、visa3)
- Spark(v2.4.0) 集群在 4 个节点上(visa1:master,visa0..3:slave)
- MongoDB(v3.2.11)分片集群con 4节点(配置服务器副本集在visa1..3上,mongos在visa1上,分片服务器:visa0..3)
- 我正在使用安装了“spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0”的 MongoDB Spark 连接器
使用 配置 SparkSession 时MongoShardedPartitioner
,从数据库加载的每个数据帧都是空的,尽管数据帧模式已正确获取。
这可以在spark-defaults.conf
文件中或.config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner")
在 SparkSession 构建器中完成配置。
, df.count MongoShardedPartitioner
() == 0:
./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
... .builder \
... .appName("myApp") \
... .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
... .getOrCreate()
>>>
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
>>>
>>> df2.count()
0
但无需指定分区程序即可正常工作:
./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
... .builder \
... .appName("myApp") \
... .getOrCreate()
>>>
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
2019-01-07 22:7:33 WARN Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>>
>>> df2.count()
1162
问题:
- 我如何知道默认配置了哪个分区器?
MongoShardedPartitioner
在这种情况下如何使用?
提前致谢
2019 年 1 月 13 日:推荐的解决方法
正如下面所回答的,似乎MongoShardedPartitioner
不支持将散列索引作为分片索引。但是,我需要一个哈希索引来将块均匀地分布在我的节点上,与时间无关(我猜使用 _id 会按时间顺序分布)。
我的解决方法是在数据库中使用计算出的日期桶的 md5 散列创建一个新字段,将其编入索引(作为普通索引),并将其用作分片索引。
现在,代码工作正常:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 3.5.3 (default, Sep 27 2018 17:25:39)
SparkSession available as 'spark'.
>>>
>>>
>>> spark2 = SparkSession \
... .builder \
... .appName("myApp") \
... .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
... .config("spark.mongodb.input.partitionerOptions.shardkey", "datebuckethash") \
... .getOrCreate()
>>>
>>>
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
2019-01-13 11:19:31 WARN Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>>
>>> df2.count()
1162