1

我在计划的作业中使用 Simba JDBC 驱动程序进行 bigquery。很多时候这项工作有效,有时它失败,因为:

sept. 08 12:40:38.307 TRACE 677 com.simba.googlebigquery.googlebigquery.client.BQClient.insertJob: Read timed out
java.net.SocketTimeoutException: Read timed out
    at java.base/java.net.SocketInputStream.socketRead0(Native Method)
    at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
    at java.base/java.net.SocketInputStream.read(SocketInputStream.java:168)
    at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
    at java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:478)
    at java.base/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:472)
    at java.base/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
    at java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1354)
    at java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:963)
    at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:252)
    at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:292)
    at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
    at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:754)
    at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:689)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1615)
    at java.base/sun.net.www.protocol.http.HttpURLConnection$9.run(HttpURLConnection.java:1512)
    at java.base/sun.net.www.protocol.http.HttpURLConnection$9.run(HttpURLConnection.java:1510)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/java.security.AccessController.doPrivilegedWithCombiner(AccessController.java:795)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1509)
    at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:527)
    at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:334)
    at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:149)
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:514)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:455)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:565)
    at com.simba.googlebigquery.googlebigquery.client.BQClient.insertJob(Unknown Source)
    at com.simba.googlebigquery.googlebigquery.client.BQClient.prepare(Unknown Source)
    at com.simba.googlebigquery.googlebigquery.dataengine.BQSQLExecutor.internalPrepare(Unknown Source)
    at com.simba.googlebigquery.googlebigquery.dataengine.BQSQLExecutor.<init>(Unknown Source)
    at com.simba.googlebigquery.googlebigquery.dataengine.BQDataEngine.prepare(Unknown Source)
    at com.simba.googlebigquery.jdbc.common.SPreparedStatement.<init>(Unknown Source)
    at com.simba.googlebigquery.jdbc.jdbc41.S41PreparedStatement.<init>(Unknown Source)
    at com.simba.googlebigquery.jdbc.jdbc42.S42PreparedStatement.<init>(Unknown Source)
    at com.simba.googlebigquery.googlebigquery.jdbc42.BQJDBC42ObjectFactory.createPreparedStatement(Unknown Source)
    at com.simba.googlebigquery.jdbc.common.JDBCObjectFactory.newPreparedStatement(Unknown Source)
    at com.simba.googlebigquery.jdbc.common.SConnection$5.create(Unknown Source)
    at com.simba.googlebigquery.jdbc.common.SConnection$5.create(Unknown Source)
    at com.simba.googlebigquery.jdbc.common.SConnection$StatementCreator.create(Unknown Source)
    at com.simba.googlebigquery.jdbc.common.SConnection.prepareStatement(Unknown Source)
    at com.simba.googlebigquery.jdbc.common.SConnection.prepareStatement(Unknown Source)
    at com.simba.googlebigquery.jdbc.common.SConnection.prepareStatement(Unknown Source)
    at com.zaxxer.hikari.pool.ProxyConnection.prepareStatement(ProxyConnection.java:337)
    at com.zaxxer.hikari.pool.HikariProxyConnection.prepareStatement(HikariProxyConnection.java)
    at doobie.free.KleisliInterpreter$ConnectionInterpreter.$anonfun$prepareStatement$1(kleisliinterpreter.scala:791)
    at doobie.free.KleisliInterpreter.$anonfun$primitive$2(kleisliinterpreter.scala:109)
    at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:104)
    at cats.effect.internals.IORunLoop$.restartCancelable(IORunLoop.scala:51)
    at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:100)
    at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:67)
    at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:35)
    at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:90)
    at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:90)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
    at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:90)
    at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
    at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:42)
    at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:80)
    at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:58)
    at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:183)
    at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:463)
    at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:484)
    at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:422)
    at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

   at com.simba.googlebigquery.dsi.core.impl.DSILogger.logTrace(Unknown Source)
   at com.simba.googlebigquery.support.LogUtilities.logTrace(Unknown Source)
   at com.simba.googlebigquery.googlebigquery.client.BQClient.insertJob(Unknown Source)
   at com.simba.googlebigquery.googlebigquery.client.BQClient.prepare(Unknown Source)
   at ...

无法设置 NetworkTimeout,因为驱动程序没有实现它

sept. 08 12:40:18.085 INFO  612 com.simba.googlebigquery.jdbc.common.SConnection.SConnection: Driver version is: 01.02.19.1023
sept. 08 12:40:18.085 TRACE 612 com.simba.googlebigquery.dsi.core.impl.DSIConnection.getProperty(170): +++++ enter +++++
sept. 08 12:40:18.085 INFO  612 com.simba.googlebigquery.jdbc.common.SConnection.SConnection: Datasource version is: 01.02.19.1023
sept. 08 12:40:18.085 TRACE 612 com.simba.googlebigquery.jdbc.jdbc41.S41Connection.getNetworkTimeout(): +++++ enter +++++
sept. 08 12:40:18.092 ERROR 612 com.simba.googlebigquery.jdbc.jdbc41.S41Connection.getNetworkTimeout: [Simba][JDBC](10220) Cette fonctionnalité optionnelle n’est pas prise en charge par le pilote.
java.sql.SQLFeatureNotSupportedException: [Simba][JDBC](10220) Cette fonctionnalité optionnelle n’est pas prise en charge par le pilote.
    at com.simba.googlebigquery.exceptions.ExceptionConverter.toSQLException(Unknown Source)
    at com.simba.googlebigquery.jdbc.jdbc41.S41Connection.getNetworkTimeout(Unknown Source)
    at com.zaxxer.hikari.pool.PoolBase.getAndSetNetworkTimeout(PoolBase.java:526)
    at com.zaxxer.hikari.pool.PoolBase.setupConnection(PoolBase.java:396)
    at com.zaxxer.hikari.pool.PoolBase.newConnection(PoolBase.java:363)
    at com.zaxxer.hikari.pool.PoolBase.newPoolEntry(PoolBase.java:206)
    at com.zaxxer.hikari.pool.HikariPool.createPoolEntry(HikariPool.java:477)
    at com.zaxxer.hikari.pool.HikariPool.access$100(HikariPool.java:71)
    at com.zaxxer.hikari.pool.HikariPool$PoolEntryCreator.call(HikariPool.java:725)
    at com.zaxxer.hikari.pool.HikariPool$PoolEntryCreator.call(HikariPool.java:711)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

如何设置连接超时?我们可以用 simba 驱动设置的全局超时没有效果(设置为 360s -> 6min)。

bq 作业插入后约 20 秒连接失败

sept. 08 12:40:18.185 TRACE 677 com.simba.googlebigquery.googlebigquery.client.BQClient.insertJob(GenericData{classInfo=[configuration, etag, id, jobReference, kind, selfLink, statistics, status, user_email], {configuration=GenericData{classInfo=[copy, dryRun, extract, jobTimeoutMs, jobType, labels, load, query], {dryRun=true, query=GenericData{classInfo=[allowLargeResults, clustering, connectionProperties, createDisposition, createSession, defaultDataset, destinationEncryptionConfiguration, destinationTable, flattenResults, maximumBillingTier, maximumBytesBilled, parameterMode, preserveNulls, priority, query, queryParameters, rangePartitioning, schemaUpdateOptions, tableDefinitions, timePartitioning, useLegacySql, useQueryCache, userDefinedFunctionResources, writeDisposition], {parameterMode=POSITIONAL, query=
DECLARE foo INT64;
DECLARE bar INT64;

SET foo = (
    SELECT MIN(table_name) FROM (
        SELECT CAST(table_name AS INT64) AS table_name
        FROM `project.dataset.INFORMATION_SCHEMA.TABLES`
        WHERE creation_time < DATE_SUB(CURRENT_TIMESTAMP(), INTERVAL '15' MINUTE)
        ORDER BY 1 DESC
        LIMIT 72
    )
);

SET bar = (
    SELECT CAST(table_name AS INT64)
    FROM `project.dataset.INFORMATION_SCHEMA.TABLES`
    WHERE creation_time < DATE_SUB(CURRENT_TIMESTAMP(), INTERVAL '15' MINUTE)
    ORDER BY 1
    DESC LIMIT 1
);

WITH q1 AS (
     SELECT
         cell,
         toto,
         SUM(cell1) AS cells1,
         SUM(cell2) AS cells2
     FROM `project.dataset.*`
     WHERE CAST(_TABLE_SUFFIX AS INT64) BETWEEN foo AND bar
     GROUP BY row
 ),
 q2 AS (
     SELECT
         cell,
         SUM(cells1) AS total_cells1,
         SUM(cells2) AS total_cells2
     FROM q1
     GROUP BY cell
     HAVING total_cells1 > 5000 AND total_cells2 > 0
 )
 SELECT *
 FROM q2 ...
 ;
  , useLegacySql=false}}}}, jobReference=GenericData{classInfo=[jobId, location, projectId], {jobId=SimbaJDBC_Job_f20c577e-5270-44b2-968a-1d449197650e}}}}, "project"): 

我正在使用 simba 驱动程序 1.2.19.1023 包装在 doobie DriverManager 0.10.0 Scala 版本中:2.13

4

2 回答 2

0

SocketTimeoutException 表示 TCP 连接中断。驱动程序有一个超时重试机制,在抛出错误之前会以指数退避重试5次(导致错误很少发生)。

为了解决这个问题,产品团队最近发布了 BigQueryJDBC 1.2.18.1022,他们在其中添加了额外的处理,因此驱动程序将在可重试错误时正确重试查询,直到达到超时。因此,我建议您更新到 1.12.18 驱动程序版本。

于 2021-09-10T10:32:01.363 回答
0

检查您的连接字符串参数:

jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=...;Timeout=3600;LargeResultDataset=_simba_jdbc;

我修复了在 linux 服务器和 NiFi 1.14 中运行的问题。但是我在带有 NiFi 1.15 的 Windows 10 本地实例下面临同样的问题,无论是在 SimbaJDBCDriverforGoogleBigQuery42_1.2.21.1025 上。

于 2021-12-20T20:34:11.627 回答