0

我在我的代码中运行 goroutines。比如说,如果我将线程设置为 50,它将不会运行前 49 个请求,但它将运行第 50 个请求并继续其余的请求。我不确定如何描述我遇到的问题,并且没有错误。这仅在使用 fasthttp 时发生,并且适用于 net/http。可能是fasthttp的问题吗?(这不是我的全部代码,只是我认为问题发生的区域)

    threads := 50
    var Lock sync.Mutex
    semaphore := make(chan bool, threads)

    for len(userArray) != 0 {
        semaphore <- true
        go func() {
            Lock.Lock()
            var values []byte
            defer func() { <-semaphore }()
            fmt.Println(len(userArray))
            if len(userArray) == 0 {
                return
            }
            values, _ = json.Marshal(userArray[0])
            currentArray := userArray[0]
            userArray = userArray[1:]
            client := &fasthttp.Client{
                Dial: fasthttpproxy.FasthttpHTTPDialerTimeout(proxy, time.Second * 5),
            }
            time.Sleep(1 * time.Nanosecond)
            Lock.Unlock()

这是我得到的输出(数字是剩下的请求数量)

200
199
198
197
196
195
194
193
192
191
190
189
188
187
186
185
184
183
182
181
180
179
178
177
176
175
174
173
172
171
170
169
168
167
166
165
164
163
162
161
160
159
158
157
156
155
154
153
152
151
(10 lines of output from req 151)
150
(10 lines of output from req 150)
cont.

抱歉,如果我的解释令人困惑,我真的不知道如何解释这个错误

4

1 回答 1

0

我认为问题在于变量的范围。为了表示排队,我有一个并行工作线程池,它们都从同一个通道中提取,然后使用等待组等待它们。确切的代码可能需要调整,因为我手头没有 go 编译器,但想法是这样的:

    threads := 50
    queueSize := 100 // trying to add more into the queue will blocke

    semaphore := make(chan bool, threads)
    jobQueue := make(chan MyItemType, queueSize)

    var wg sync.WaitGroup

    func processQueue(jobQueue <- chan MyItemType) {
      defer wg.Done()
      for item := range jobQueue {
         values, _ = json.Marshal(item) // doesn't seem to be used?
         client := &fasthttp.Client{
            Dial: fasthttpproxy.FasthttpHTTPDialerTimeout(proxy, time.Second * 5),
         }
      }
    }

    for i := 0; i < threads; i++ {
      wg.Add(1)
      go processQueue(jobQueue)
    }
  
    close(jobQueue)
    wg.Wait()

现在您可以将项目放入jobQueue其中,它们将由这些线程之一处理。

于 2021-05-22T21:41:30.003 回答