1

我正在编写一段 Kotlin 代码,该代码使用反应器框架来实现对 Gitlab 提交 API 的调用。提交 API 是分页的。我正在努力检索的函数“在”两个指定的提交哈希之间检索提交。

只要它实际上可以检索任何提交,该函数就可以正常工作,但如果找不到结果则失败。然后它失败了 java.lang.RuntimeException: Reached end of commit log

我尝试用 替换该行.switchIfEmpty(Flux.error(RuntimeException("Reached end of commit log."))).switchIfEmpty(Flux.empty())但这会产生无限循环。

我不太了解多个通量的嵌套,这让我很难调试。我非常感谢有关如何解决此问题的任何提示。

fun getCommits(fromCommit: String, toCommit: String): Iterable<Commit> {
    val commits = Flux.concat(Flux.generate<Flux<GitLabCommit>, Int>({ 1 }) { state, sink ->
        val page = client.get()
                .uri("/projects/{name}/repository/commits?page=$state&per_page=100")
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToFlux<GitLabCommit>()
                .doOnError({
                    LOGGER.warn("Could not retrieve commits for project '$name': ${it.message}")
                    sink.next(Flux.just(GitLabCommit("xxxxx", "Could not retrieve all commits for project '$name'")))
                    sink.complete()
                })
                .onErrorReturn(GitLabCommit("xxxxx", "Could not retrieve all commits for project '$name'"))
                .switchIfEmpty(Flux.error(RuntimeException("Reached end of commit log.")))

        sink.next(page)
        return@generate state + 1
    })


    return commits
            // The Gitlab API returns commits from newest to oldest
            .skipWhile { !it.id.startsWith(toCommit) } //inclusive
            .takeWhile { !it.id.startsWith(fromCommit) } //exclusive
            .map { Commit(it.title, listOf(it.id), name) }
            .toIterable()
}

关于上述代码的其他提示:

这是 GitlabCommit 类:

@JsonIgnoreProperties(ignoreUnknown = true)
private data class GitLabCommit(val id: String, val title: String)

是的client一个正确初始化的实例org.springframework.web.reactive.function.client.WebClient.Builder,它有助于令牌处理和 URL 编码。

4

1 回答 1

1

异常的来源:您的外部使用commits没有onErrorResume子句。无限循环的来源:在内部没有错误的情况下generate,它将继续循环,增加state和连接一个空结果。

我会采用与生成不同的方法:range + concatMap + takeWhile。

像这样的东西:

fun getCommits(fromCommit: String, toCommit: String): Iterable<String> =
    Flux.range(1, 1000) //tune the second parameter
            .concatMap { page -> client
                .get()
                .uri("/projects/{name}/repository/commits?page=$page&per_page=100")
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToFlux<GitLabCommit>()
                .doOnError({ LOGGER.warn("Could not retrieve commits for project '$name': ${it.message}")})
                .onErrorReturn(GitLabCommit("xxxxx", "Could not retrieve all commits for project '$name'"))
                .defaultIfEmpty(GitLabCommit("xxxxx", "Reached end of commit log."))
                // ^ we now have marker commits in case of error or end of log
            }
            //use the marker commits to short-circuit the range
            .takeWhile { !it.id.equals("xxxxx") } //end condition to cancel the range loop, doesn't include the xxxxx commits
            // The Gitlab API returns commits from newest to oldest
            .skipWhile { !it.id.startsWith(toCommit) } //inclusive
            .takeWhile { !it.id.startsWith(fromCommit) } //exclusive
            .map { Commit(it.title, listOf(it.id), name) }
            .toIterable()

我们从最多 1000 个页面的范围开始,为每个页面发出请求。

这可能会导致无关的请求(例如,如果到第 100 页我们已经收到空响应,则不会有更多页面)。你几乎解决了这个问题:

我们可以同时使用onErrorReturndefaultIfEmpty创建一个标记提交,然后可以concatMaptakeWhile.

一旦附加takeWhile看到第一次xxxxx提交,它将触发将传播到 concatMap/range 的取消,导致范围停止发出页码,从而停止发出无关请求。

然后你有你的业务逻辑skipWhiletakeWhile并且map

于 2018-02-27T14:49:25.437 回答