Конкуренция в Go

Теория: Паттерны конкуренции в Go

Когда есть горутины, каналы и контекст, следующим шагом становятся устойчивые схемы их использования — паттерны. Они помогают не придумывать заново, как запускать десятки задач, собирать результаты, строить конвейеры и ограничивать параллелизм. Ниже — четыре базовых паттерна, на которых держится большинство конкурентных программ в Go.

Worker Pool

Пул воркеров нужен, когда есть множество однотипных задач и ограниченное число исполнителей. Вместо того чтобы запускать горутину на каждую задачу, создается фиксированное количество рабочих горутин, которые читают задачи из общего канала, обрабатывают их и складывают результаты в другой канал.

Главная идея: один канал — очередь задач, N воркеров — параллельные исполнители, еще один канал — результаты.

package main

import (
	"fmt"
	"sync"
	"time"
)

type Job struct {
	ID  int
	Val int
}

type Result struct {
	ID     int
	Output int
	Err    error
}

// worker читает задачи из jobs и пишет результаты в results
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
	defer wg.Done()

	for job := range jobs { // цикл до закрытия канала jobs
		time.Sleep(50 * time.Millisecond) // имитация работы
		results <- Result{
			ID:     job.ID,
			Output: job.Val * 2,
			Err:    nil,
		}
	}
}

func main() {
	const workers = 4

	jobs := make(chan Job, 16)       // очередь задач
	results := make(chan Result, 16) // очередь результатов

	var wg sync.WaitGroup
	wg.Add(workers)

	// запуск фиксированного числа воркеров
	for id := 0; id < workers; id++ {
		go worker(id, jobs, results, &wg)
	}

	// постановка задач
	go func() {
		for i := 1; i <= 20; i++ {
			jobs <- Job{ID: i, Val: i}
		}
		close(jobs) // сигнал: задач больше не будет
	}()

	// отдельная горутина закрывает канал результатов после всех воркеров
	go func() {
		wg.Wait()      // ждем завершения всех worker
		close(results) // после этого писать в results некому
	}()

	// сбор результатов
	for res := range results {
		fmt.Printf("job=%02d out=%d err=%v\n", res.ID, res.Output, res.Err)
	}
}

Ключевые моменты протокола:

  • jobs заполняет и закрывает только постановщик задач; воркеры его не трогают.
  • results закрывается только после wg.Wait(), то есть когда все воркеры гарантированно закончили запись.
  • ни одна горутина не зависает в ожидании данных и не пишет в закрытый канал.

Если нужен сохраненный порядок результатов, добавляется индекс задачи и буферизация на стороне сборщика (как в примере выше в комментариях к пулу).

Fan-out и Fan-in

Эти два паттерна описывают, как распределять и снова объединять потоки данных между горутинами. На практике они часто используются вместе.

Fan-out: разделить поток на несколько исполнителей

Одна очередь задач — несколько горутин, которые читают из нее и выполняют работу независимо. Это похоже на worker pool, но фокус именно на распределении потока.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func worker(id int, jobs <-chan int, results chan<- string) {
	for job := range jobs {
		time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
		results <- fmt.Sprintf("worker %d обработал %d", id, job)
	}
}

func main() {
	rand.Seed(time.Now().UnixNano())

	jobs := make(chan int, 10)
	results := make(chan string, 10)

	// fan-out: несколько воркеров читают один канал
	for id := 1; id <= 3; id++ {
		go worker(id, jobs, results)
	}

	for j := 1; j <= 9; j++ {
		jobs <- j
	}
	close(jobs) // сигнал: задач больше не будет

	for i := 0; i < 9; i++ {
		fmt.Println(<-results)
	}
}

Задачи обрабатываются ровно по одной на воркера, порядок не гарантируется — исполнение зависит от времени работы каждой горутины.

Fan-in: собрать несколько потоков в один

Обратная схема: несколько источников данных пишут в один канал, а одна горутина читает общий поток.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func producer(id int, out chan<- string) {
	for i := 0; i < 3; i++ {
		time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
		out <- fmt.Sprintf("от %d: сообщение %d", id, i)
	}
}

func main() {
	rand.Seed(time.Now().UnixNano())

	out := make(chan string)

	// несколько производителей пишут в один канал (fan-in)
	for id := 1; id <= 3; id++ {
		go producer(id, out)
	}

	// отдельная горутина закрывает канал после задержки
	go func() {
		time.Sleep(700 * time.Millisecond)
		close(out)
	}()

	for msg := range out {
		fmt.Println(msg)
	}
}

Гарантия здесь одна: доступ к каналу сериализован, можно безопасно писать из нескольких горутин без мьютексов.

В более сложных конструкциях fan-out и fan-in объединяются: входной поток распределяется по воркерам и затем собирается обратно в общий канал.

Pipeline

Пайплайн — это цепочка стадий обработки данных. Каждая стадия — отдельная горутина с двумя каналами: входным и выходным. Данные поступают на вход первой стадии, постепенно проходят все шаги обработки и выходят в виде итогового потока.

Главная идея: одна стадия — одна ответственность, один входной канал — один выходной.

package main

import "fmt"

// gen — источник данных
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // закрываем канал после генерации всех значений
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// sq — стадия, возводит числа в квадрат
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // закрываем выход, когда вход исчерпался
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// dbl — стадия, удваивает число
func dbl(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * 2
		}
	}()
	return out
}

func main() {
	nums := gen(1, 2, 3, 4)
	squares := sq(nums)
	doubles := dbl(squares)

	for v := range doubles { // читаем из последней стадии до закрытия канала
		fmt.Println(v)
	}
}

Как только первая стадия отправляет первое значение, следующие начинают работу, не дожидаясь завершения всей генерации. Это и есть конвейер: разные этапы обработки идут параллельно.

В реальном коде почти всегда добавляется context.Context, чтобы уметь прерывать конвейер по сигналу — например, по таймауту запроса.

package main

import (
	"context"
	"fmt"
	"time"
)

// стадия с поддержкой контекста
func sqCtx(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case <-ctx.Done(): // отмена конвейера
				return
			case out <- n * n:
			}
		}
	}()
	return out
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := gen(1, 2, 3, 4, 5)
	sqStream := sqCtx(ctx, in)

	for v := range sqStream {
		fmt.Println(v)
		if v > 10 {
			cancel() // прерываем конвейер при условии
		}
	}
}

Шаблон повторяется на каждой стадии: чтение из входного канала, select с ctx.Done() и запись в выходной канал. Так конвейер всегда может аккуратно остановиться, не оставляя «висящих» горутин.

Правило для пайплайна то же, что и для других паттернов:

  • канал закрывает только тот, кто в него пишет;
  • каждая стадия закрывает свой out после того, как in исчерпан или контекст отменен.

Ограничение числа параллельных задач (semaphore pattern)

Иногда горутин становится слишком много — они начинают перегружать внешнее API, файловую систему, БД или просто CPU. Полностью параллельность отключать не хочется, но нужно ограничить количество задач, которые могут выполняться одновременно. Для этого используют семафорный паттерн: горутины запускаются свободно, но доступ «внутрь критической секции» получает только ограниченное количество исполнителей.

Семафорный паттерн решает это одной конструкцией: канал с буфером фиксированного размера. Каждая горутина «входит» в семафор, записывая значение в канал, и «выходит», читая из него. Когда буфер заполнен, новые горутины блокируются, пока кто-то не освободит слот.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())

	const limit = 3                   // максимум параллельных задач
	sem := make(chan struct{}, limit) // семафор на 3 слота

	var wg sync.WaitGroup

	for id := 1; id <= 10; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			sem <- struct{}{}        // попытка занять слот; блокируется, если лимит достигнут
			defer func() { <-sem }() // освобождение слота по завершении

			fmt.Printf("старт задачи %d\n", id)
			time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
			fmt.Printf("конец задачи %d\n", id)
		}(id)
	}

	wg.Wait()
	fmt.Println("все задачи выполнены")
}

В любой момент времени одновременно работают не более limit горутин — остальные просто ждут свободного слота.

Если задачи могут отменяться, семафор часто комбинируют с контекстом:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func worker(ctx context.Context, id int, sem chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()

	select {
	case sem <- struct{}{}: // занять слот
		defer func() { <-sem }() // освободить слот
	case <-ctx.Done(): // если слота так и не дождались
		fmt.Println("отмена до старта:", id)
		return
	}

	// основная работа, тоже с учетом ctx
	select {
	case <-time.After(300 * time.Millisecond):
		fmt.Println("завершена задача", id)
	case <-ctx.Done():
		fmt.Println("остановлена во время работы:", id)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	sem := make(chan struct{}, 2) // максимум две параллельные задачи
	var wg sync.WaitGroup

	for id := 1; id <= 5; id++ {
		wg.Add(1)
		go worker(ctx, id, sem, &wg)
	}

	wg.Wait()
	fmt.Println("main завершен")
}

Такое ограничение полезно при работе с внешними сервисами, лимитами на соединения, тяжелыми вычислениями и любыми местами, где «чем больше горутин, тем хуже».

Подведем итоги:

Worker Pool — фиксированное число воркеров обрабатывает очередь задач.

  • Fan-out / Fan-in — разделяют входной поток на несколько исполнителей и объединяют результаты обратно.
  • Pipeline — последовательность стадий обработки, каждая в своей горутине, связанная каналами.
  • Семафорный паттерн — ограничивает число одновременно работающих горутин через буферизованный канал.

Все эти схемы строятся на одних и тех же примитивах — горутинах, каналах и контексте. Разница только в том, как именно соединяются каналы, кто их закрывает и откуда приходят сигналы отмены. Когда протоколы взаимодействия отточены, конкурентный код в Go становится предсказуемым и хорошо управляемым даже под высокой нагрузкой.

Рекомендуемые программы

+7 800 100 22 47

бесплатно по РФ

+7 495 085 21 62

бесплатно по Москве

108813 г. Москва, вн.тер.г. поселение Московский,
г. Московский, ул. Солнечная, д. 3А, стр. 1, помещ. 20Б/3
ОГРН 1217300010476
ИНН 7325174845