我使用spark rdd将数据写入mysql,我使用的运算符是foreachPartition
,在运算符中我设置连接池并写入数据(使用scalike jdbc的),然后破坏池,但是似乎偶尔找不到连接池,日志说Connection pool is not yet initialized. (name:'xxx)
,我不知道为什么会这样
数据终于完全插入了。但是这个异常让我很困惑
我使用spark rdd将数据写入mysql,我使用的运算符是foreachPartition
,在运算符中我设置连接池并写入数据(使用scalike jdbc的),然后破坏池,但是似乎偶尔找不到连接池,日志说Connection pool is not yet initialized. (name:'xxx)
,我不知道为什么会这样
数据终于完全插入了。但是这个异常让我很困惑
我相信你已经以同样的方式实现了(如果使用了java)
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
Connection connection = createNewConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
connection.close();
});
});
在 createNewConnection() 方法中,您只需实现单例连接对象模式并离开而不关闭。
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
Connection connection = ConnectionObject.singleTonConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
});
});
//单吨方法应该是这样的
public class ConnectionObject (){
private static Connection=null;
public static Connection singleTonConnection(){
if(Connection !=null){
/** get new connection from spring data source or jdbc client**/
}
return Connection;
}
}