问题标签 [apache-kafka-connect]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
elasticsearch - Kafka-Connect vs Filebeat & Logstash
我希望从 Kafka 消费并将数据保存到 Hadoop 和 Elasticsearch 中。我目前已经看到了两种方法:使用 Filebeat 从 Kafka 消费并将其发送到 ES 和使用 Kafka-Connect 框架。有一个 Kafka-Connect-HDFS 和 Kafka-Connect-Elasticsearch 模块。
我不确定使用哪一个来发送流数据。虽然我认为如果我想在某个时候从 Kafka 获取数据并将其放入 Cassandra,我可以为此使用 Kafka-Connect 模块,但 Filebeat 不存在这样的功能。
hdfs - Kafka-Connect-Hdfs - 无法启动 HdfsSinkConnector
我已经从http://docs.confluent.io/2.0.0/quickstart.html#quickstart下载了 kafka connect
我正在尝试运行 hdfs 连接器。以下是设置:
连接-standalone.properties:
和
快速入门-hdfs.properties:
我像这样运行 hdfs 连接器:
cd /home/fclvappi005561/confluent-3.0.0/bin
./connect-standalone ../etc/kafka-connect-hdfs/connect-standalone.properties ../etc/kafka-connect-hdfs/quickstart-hdfs.properties
但我收到一个错误:
[2016-09-12 17:19:28,039] 信息无法启动 HdfsSinkConnector:(io.confluent.connect.hdfs.HdfsSinkTask:72)org.apache.kafka.connect.errors.ConnectException:org.apache.hadoop。 security.AccessControlException: Permission denied: user=lvpi005561, access=WRITE, inode="/topics":root:supergroup:drwxr-xr-x at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker. java:319) 在 org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:292) 在 org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:213)在 org.apache.hadoop.hdfs.server.namenode.FSDirectory 的 org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)。checkPermission(FSDirectory.java:1698) at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1682) at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory. java:1665) 在 org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:71) 在 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3900)在 org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:978) 在 org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:622) 在 org.apache.hadoop .hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) 在 org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969) 在 org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969) 的 org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616) .hadoop.ipc.Server$Handler$1.run(Server.java:2049) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045) at java.security.AccessController.doPrivileged(Native方法)在 javax.security.auth.Subject.doAs(Subject.java:415) 在 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657) 在 org.apache.hadoop.ipc.Server$Handler .run(Server.java:2043) at io.confluent.connect.hdfs.DataWriter.(DataWriter.java:202) at io.confluent.connect.hdfs.HdfsSinkTask.start(HdfsSinkTask.java:64) at org.apache .kafka.connect.runtime.WorkerSinkTask。initializeAndStart(WorkerSinkTask.java:207) 在 org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:139) 在 org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)在 org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 在 java.util.concurrent.FutureTask.run (FutureTask.java:266) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread。 run(Thread.java:745) 引起:org.apache.hadoop.security.AccessControlException:权限被拒绝:user=fclvappi005561, access=WRITE, inode="/topics":root:supergroup:drwxr-xr-x at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:292 ) 在 org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190) 在 org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:213) 在 org. org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1682) 在 org.apache.hadoop 的 apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1698)。 hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1665) 在 org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp。mkdirs(FSDirMkdirOp.java:71) 在 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3900) 在 org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer. java:978) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:622) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969) 中的 org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969) 中的 org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969) 中的 org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)。 ipc.Server$Handler$1.run(Server.java:2049) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045) 在 java.security.AccessController.doPrivileged(Native Method) 在 javax.security.auth.Subject.doAs(Subject.java:415) 在 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java: 1657) 在 sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 在 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)在 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 在 java.lang.reflect.Constructor.newInstance(Constructor.java:423) 在 org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106 ) 在 org.apache.hadoop.ipc.RemoteException。unwrapRemoteException(RemoteException.java:73) at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2755) at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2724) at org.apache .hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:870) at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:866) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver .java:81) 位于 org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:859) 的 org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:866)。 fs.FileSystem.mkdirs(FileSystem.java:1817) 在 io.confluent.connect.hdfs.storage.HdfsStorage.mkdirs(HdfsStorage.java:61) at io.confluent.connect.hdfs.DataWriter.createDir(DataWriter.java:369) at io.confluent.connect.hdfs.DataWriter.(DataWriter.java:170) ... 10 更多原因:org.apache .hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException):权限被拒绝:user=fclvappi005561, access=WRITE, inode="/topics":root:supergroup:drwxr-xr-x at org.apache。 hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319) 在 org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:292) 在 org.apache.hadoop.hdfs。 server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:213) 在 org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190) 在 org.org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1682) 在 org.apache.hadoop 的 apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1698)。 hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1665) 在 org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:71) 在 org.apache.hadoop.hdfs.server。 namenode.FSNamesystem.mkdirs(FSNamesystem.java:3900) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:978) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs( ClientNamenodeProtocolServerSideTranslatorPB.java:622) 在 org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2。callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)在 org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049) 在 org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045) 在 java.security.AccessController .doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657) at org.apache.hadoop.ipc .Server$Handler.run(Server.java:2043) at org.apache.hadoop.ipc.Client.call(Client.java:1468) at org.apache.hadoop.ipc.Client.call(Client.java:1399 ) 在 org.apache.hadoop.ipc。ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) at com.sun.proxy.$Proxy47.mkdirs(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539) at sun .reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke (Method.java:498) 在 org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) 在 org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) 在com.sun.proxy.$Proxy48。mkdirs(Unknown Source) at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2753) ... 还有 20 个
我应该提一下,我在 127.0.0.1 本地运行了一个 hadoop 的 docker 映像:docker run -d -p 9000:9000 sequenceiq/hadoop-docker:2.7.1
我看到的这个权限被拒绝错误是什么?我在不同的主机上,下面提到的主机bootstrap.servers
apache-kafka-connect - kafka-connect 框架实现恰好一次语义所需的设置
我的印象是,在 Kafka-Connect 中,您可以在 .properties 文件中指定一些参数以启用完全一次语义。
我一直找不到这些设置;但我找到了其他方法来实现这一点,例如
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
甚至更老的
https://cwiki.apache.org/confluence/display /卡夫卡/常见问题
是否可以通过更改 kafka connect 中的设置来实现一次语义?
apache-kafka - 如何在kafka中只发送一次avro模式
我正在使用以下代码(不是真的,但让我们假设它)来创建一个模式并由生产者将其发送给 kafka。
问题是代码只允许我使用此模式发送 1 条消息。然后我需要更改模式名称以发送下一条消息......所以名称字符串现在是随机生成的,所以我可以发送更多消息。这是一个黑客,所以我想知道正确的方法来做到这一点。
我还研究了如何在没有架构的情况下发送消息(即,已经向 kafka 发送了 1 条带有架构的消息,现在所有其他消息都不再需要架构了)——但new GenericData.Record(..)
需要一个架构参数。如果它为空,它会抛出一个错误。
那么将avro模式消息发送到kafka的正确方法是什么?
这是另一个代码示例 - 与我的非常相似:
https ://github.com/confluentinc/examples/blob/kafka-0.10.0.1-cp-3.0.1/kafka-clients/producer/src/main/java/io /confluent/examples/producer/ProducerExample.java
它也没有显示如何在不设置模式的情况下发送。
apache-kafka - 使用连接器通过 ID/主题获取注册模式
是否可以从源连接器中获取已注册的架构?我在模式注册表中注册了一个静态模式,需要从连接器获取该模式并将其传递给 SourceRecord。
我使用 CachedSchemaRegistryClient 使用模式 id 获取模式,但它返回了 Avro 模式。所以我必须将其转换为 org.apache.kafka.connect.data.Schema。请就此提供一些指示。
或者连接器中的任何其他方式,我可以在不使用 CachedSchemaRegistryClient 的情况下直接执行相同操作?
谢谢你,斯里吉斯
apache-kafka - 从 Connect 属性获取架构注册表 URL
如何从连接器中的连接属性中获取模式注册表 url?
我喜欢在我的连接器中访问此架构 url 以获取基于 ID 的架构。
是的,我可以明确地将 schema.url 作为我的连接器中的属性传递。但是还有其他方法可以从 connect proeprties 在连接器中自动获取它吗?
谢谢你,
jms - Kafka 连接器 - 用于 Kafka 主题的 JMSSourceConnector
Confluent 默认情况下是否为 Kafka 主题提供此 JMSSourceConnector。
或者我们需要为此编写自定义连接器?
我在 Confluent 页面上没有看到任何文档。
apache-kafka - 使 Kafka 主题日志保留永久化
我正在将日志消息写入 Kafka 主题,并且我希望永久保留该主题。我在 Kafka 和 Kafka Connect(_schemas、connect-configs、connect-status、connect-offsets 等)中看到了日志保留时间不会删除的特殊主题。我如何强制一个主题与这些其他特殊主题一样?是命名约定还是其他一些属性?
谢谢
java - Kafka Connect:如何从结构中获取嵌套字段
我正在使用 Kafka-Connect 来实现 Kafka-Elasticsearch 连接器。
生产者向 Kafka 主题发送了一个复杂的 JSON,我的连接器代码将使用它来持久化到 Elastic 搜索。连接器以 Struct 的形式获取数据(https://kafka.apache.org/0100/javadoc/org/apache/kafka/connect/data/Struct.html)。
我能够在顶级 Json 中获取 struct 的字段值,但无法从嵌套的 json 中获取。
我能够解析“op”,但不能解析“test.test.employee.Value”。
hadoop - kafka HDFS 连接器在多 DC 设置中连接到私有 IP 而不是主机名
我有 2 个集群:
- 一个在房子里有汇合的 (3.0.0-1)
- AWS中的一个,带有hadoop(hdp 2.4)
我正在尝试使用 hdfs 连接器从 confluent 写入 hadoop。
长话短说:连接器尝试连接到 hadoop 集群的私有 IP,而不是使用主机名。在内部集群上,/etc/hosts 已更新以将内部 hadoop 主机名解析为相关的公共 IP。
我正在使用分布式连接器,我有一堆连接器 JSON 文件,如下所示:
并且工人被定义为:
几点注意事项:
- /kafka-connect 存在于 hdfs 上,世界可写
- 3 个主题 (*.storage.topic) 确实存在
- 我有一个工作人员在每 (3) 个带有 kafka 代理的服务器上运行(在所有代理上都有一个模式注册表、rest API 和 zookeeper 服务器)
- 我已将 dfs.client.use.datanode.hostname 设置为 true,并且此属性在 $HADOOP_HOME/hdfs-site.xml 中的客户端上设置
我看到创建了 /kafka-connect 的子目录以及配置单元元数据。当我启动连接器时,消息是:
createBlockOutputStream (org.apache.hadoop.hdfs.DFSClient:1471) org.apache.hadoop.net.ConnectTimeoutException 中的信息异常:等待通道准备好连接时出现 60000 毫秒超时。ch : java.nio.channels.SocketChannel [在 org.apache.hadoop.net.NetUtils.connect(NetUtils.java:533) 在 org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java: 1610) 在 org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.DFSOutputStream(DFSOutputStream.java:1361) 在 org.apache.hadoop 的 org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1408) .hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:588) 信息放弃 BP-429601535-10.0.0.167-1471011443948:blk_1073742319_1495 (org.apache.hadoop.hdfs.DFSClient:
关于如何解决这个问题的任何想法?看起来融合直接接收 IP,而不是主机名。