我想使用谷歌云存储从我的流媒体作业中写入(接收)元素。DataStreamStreamingFileSink
为此,我使用Hadoop 的Google Cloud Storage 连接器作为 的实现,org.apache.hadoop.fs.FileSystem并用作包装 Flink 的hadoop FileSystem类的实现。HadoopFileSystemorg.apache.flink.core.fs.FileSystem
我在我的 gradle 文件中包含了以下依赖项:
compile("com.google.cloud.bigdataoss:gcs-connector:1.9.4-hadoop2")compile("org.apache.flink:flink-connector-filesystem_2.11:1.6.0")provided("org.apache.flink:flink-shaded-hadoop2:1.6.0")
现在,根据我对源[1] [2] [3]FileSystemFactory的理解,Flink在运行时(通过java.util.ServiceLoader)动态加载实现,并且HadoopFsFactory在运行时(通过反射,如果它在类路径中找到 Hadoop)加载它然后用于创建FileSystem.
我面临的问题是RecoverableWriterHadoop 兼容包的默认值仅支持hdfs文件方案(我使用gs),因此在运行时抛出错误。
所以,我(我调用extended了)和返回一个自定义实现,然后处理恢复的细节等,并创建了一个相应的类(该类用 装饰,因此应该可以被 发现)。HadoopFileSystemGCSFileSystem@overridedFileSystem#createRecoverableWriter()RecoverableWriterFileSystemFactory@AutoServiceServiceLoader
该设置在本地和本地 docker 集群上运行良好(实际上 GCS 连接器由于缺乏授权而引发错误,但这很好,因为这意味着FileSystem已加载并正在运行)但是当我将它部署到正在运行的 docker 集群时它会失败在谷歌计算引擎上。
在 GCE 上,默认值HadoopFileSystem被加载并按照方案 isgs而不是抛出异常hdfs,但我的假设是它应该已经加载了我的工厂实现,因此不应该出现这个错误。
我在 Flink v1.6.0上并使用docker-flink在 Docker 上运行长时间运行的会话集群