0

我使用spark rdd将数据写入mysql,我使用的运算符是foreachPartition,在运算符中我设置连接池并写入数据(使用scalike jdbc的),然后破坏池,但是似乎偶尔找不到连接池,日志说Connection pool is not yet initialized. (name:'xxx),我不知道为什么会这样

数据终于完全插入了。但是这个异常让我很困惑

4

1 回答 1

0

我相信你已经以同样的方式实现了(如果使用了java)

dstream.foreachRDD(rdd -> {

  rdd.foreachPartition(partitionOfRecords -> {

    Connection connection = createNewConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    connection.close();
  });
});

在 createNewConnection() 方法中,您只需实现单例连接对象模式并离开而不关闭。

dstream.foreachRDD(rdd -> {

  rdd.foreachPartition(partitionOfRecords -> {

    Connection connection = ConnectionObject.singleTonConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }

  });
});

//单吨方法应该是这样的

public class ConnectionObject (){


private static Connection=null;

public static Connection singleTonConnection(){

if(Connection !=null){

/** get new connection from spring data source or jdbc client**/

}
return Connection;

}
}
于 2019-08-20T10:19:26.520 回答