Конкуренция в Go

Теория: Синхронизация с каналами

Когда каналы уже освоены как средство передачи данных, открывается их второе назначение. Канал в Go — это одновременно очередь значений, средство коммуникации горутин и механизм их синхронизации.

Каждая операция отправки или приема — это точка встречи: две независимые горутины выравниваются во времени именно в этот момент. На этом свойстве строится синхронизация — вместо флагов и мьютексов программа просто ждет нужное сообщение или событие.

Каналы как средство синхронизации

Иногда сами данные не важны. Значение может выступать лишь маркером события: «задача началась», «задача завершилась», «нужно остановиться». В таких случаях канал превращается в сигнальный. Одна горутина отправляет в него факт, другая ждет этот факт и продолжает работу только после его получения.

package main

import (
	"fmt"
	"time"
)

func worker(done chan struct{}) {
	fmt.Println("работа началась")
	time.Sleep(200 * time.Millisecond)
	fmt.Println("работа завершена")

	close(done) // одноразовый сигнал завершения
}

func main() {
	done := make(chan struct{}) // сигнальный канал

	go worker(done)

	<-done // ждем закрытия канала
	fmt.Println("основная горутина продолжает выполнение")
}

Здесь используется chan struct{}, потому что значимость имеет только сам факт отправки. Пустая структура не занимает памяти, а тип канала четко показывает, что через него не передаются данные, а проходят сигналы. Основная горутина блокируется на чтении из done, пока worker не отправит сообщение о завершении.

Ожидание завершения группы горутин

Когда параллельных задач несколько, синхронизация превращается в ожидание не одного, а целого набора сигналов. Самый простой путь — имитировать поведение sync.WaitGroup через общий канал: каждая горутина отправляет уведомление, а основная горутина столько же раз читает из канала.

package main

import (
	"fmt"
	"time"
)

func main() {
	done := make(chan struct{}) // общий сигнальный канал

	for id := 0; id < 3; id++ {
		go func(id int) {
			time.Sleep(time.Duration(id) * 100 * time.Millisecond)
			fmt.Println("worker", id, "готов")

			done <- struct{}{} // сигнал о завершении
		}(id)
	}

	for i := 0; i < 3; i++ {
		<-done // ожидание очередного сигнала
	}

	fmt.Println("все горутины завершены")
}

Канал действительно работает как точка синхронизации: каждая горутина в своем темпе доходит до отправки сигнала, а основная по очереди их получает. Формально поведение корректное.

Но в реальных программах такой паттерн почти всегда считается неудачным. Он хрупкий: количество ожидаемых сигналов нужно знать заранее, и любое изменение числа горутин ломает протокол. Если задач становится больше или меньше, цикл чтения придется править вручную. Кроме того, канал тут не несет полезных данных — он используется как костыль для ожидания завершения.

Для таких сценариев в Go уже есть специально предназначенный инструмент — sync.WaitGroup. Он явно задает количество работ и гарантирует корректное ожидание, не требуя ручного подсчета сигналов:

var wg sync.WaitGroup
wg.Add(3)

for id := 0; id < 3; id++ {
	go func(id int) {
		defer wg.Done()
		// работа...
	}(id)
}

wg.Wait()
fmt.Println("все горутины завершили работу")

Такой код проще, надежнее и масштабируется лучше. Каналы уместны там, где есть поток данных или протокол обмена сообщениям, а ожидание завершения группы горутин — задача именно для WaitGroup(), а не для сигналов через канал.

Пул воркеров на каналах

Каналы особенно ясно проявляют себя в шаблоне worker pool. Одна часть программы формирует задачи и складывает их в канал jobs. Несколько горутин воркеров читают из этого канала, обрабатывают данные и отправляют результаты в другой канал. Закрытие jobs служит сигналом, что новых задач не будет, и воркеры могут завершать цикл обработки.

package main

import (
	"fmt"
	"time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
	for job := range jobs { // чтение задач до закрытия канала
		fmt.Printf("worker %d выполняет задачу %d\n", id, job)
		time.Sleep(100 * time.Millisecond) // имитация обработки
		results <- job * 2                 // отправка результата
	}
	// выход из цикла означает корректное завершение воркера
}

func main() {
	jobs := make(chan int, 5)    // канал задач с буфером
	results := make(chan int, 5) // канал результатов

	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results) // запуск трех воркеров
	}

	for j := 1; j <= 5; j++ {
		jobs <- j // отправка задач
	}
	close(jobs) // сигнал воркерам: задач больше нет

	for i := 0; i < 5; i++ {
		fmt.Println("результат:", <-results) // получение всех результатов
	}
}

В этой схеме каналы выполняют сразу несколько функций. jobs распределяет работу между воркерами, results собирает результаты, а закрытие jobs завершает внутренний цикл for range во всех рабочих горутинах. Порядок получения результатов не гарантируется, но гарантируется отсутствие взаимных блокировок: каждый воркер завершит работу, как только задач в jobs не останется.

Fan-out и fan-in на каналах

Fan-out / fan-in — это про форму потока данных: одна линия распадается на несколько, потом несколько сходятся обратно в одну.

Worker pool — это частный случай такого fan-out: у нас есть очередь задач и несколько одинаковых воркеров, которые ее разгребают. Разница больше в акценте: fan-out / fan-in описывает движение данных, worker pool — организацию «бригад» обработчиков и ограничение конкуренции.

По коду они выглядят очень похоже, так что в примерах важно проговорить, что мы показываем именно форму потока, а не “идеальный пул”.

Fan-out: один поток задач, несколько воркеров

Оставим твой пример, но чуть подчистим комментарии, чтобы было видно, что это и fan-out, и фактически worker pool:

package main

import (
	"fmt"
	"time"
)

func worker(id int, in <-chan int, out chan<- int) {
	for n := range in { // читаем задачи до закрытия in
		time.Sleep(50 * time.Millisecond) // имитация работы
		out <- n * n                      // отправляем результат
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)

	// Fan-out: один поток задач, несколько воркеров.
	for id := 0; id < 3; id++ {
		go worker(id, in, out)
	}

	// Отдельная горутина — источник задач.
	go func() {
		for n := 1; n <= 5; n++ {
			in <- n
		}
		close(in) // сигнал: задач больше не будет
	}()

	for i := 0; i < 5; i++ {
		result := <-out
		fmt.Println("результат:", result)
	}
}

Здесь входной поток чисел расширяется на три воркера — это и есть fan-out. По сути, тот же worker pool, просто без ограничителя «максимум N задач одновременно» и без отдельной логики остановки воркеров, кроме закрытия канала.

Fan-in: несколько источников, один сборщик (без panic)

Каналы в Go удобно использовать не только как очередь значений. Они помогают строить параллельные схемы, где поток данных расширяется на несколько обработчиков или, наоборот, собирается обратно в одну линию. Эти две формы называются fan-out и fan-in.

Канал остается средством передачи данных и одновременно точкой синхронизации: операции отправки и чтения выстраивают горутины во времени и задают порядок работы.

Fan-out: один поток разделяется на несколько воркеров

В fan-out одна горутина отправляет значения в канал, а несколько рабочих горутин обрабатывают их параллельно. Это похоже на worker pool, но цель здесь — показать именно расширение потока данных.

package main

import (
	"fmt"
	"time"
)

func worker(id int, in <-chan int, out chan<- int) {
	for n := range in { // читаем задачи до закрытия канала
		time.Sleep(50 * time.Millisecond) // имитация вычислений
		out <- n * n                      // отправляем результат
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)

	// Несколько воркеров читают один входной поток
	for id := 0; id < 3; id++ {
		go worker(id, in, out)
	}

	// Источник данных
	go func() {
		for n := 1; n <= 5; n++ {
			in <- n
		}
		close(in) // сигнал всем воркерам: задач больше нет
	}()

	// Получение пяти результатов (порядок не гарантируется)
	for i := 0; i < 5; i++ {
		fmt.Println("результат:", <-out)
	}
}

Закрытие in завершает работу всех воркеров. Каждый из них дочитывает канал до конца и завершает свою горутину без блокировок.

Fan-in: несколько источников объединяются в один поток

Fan-in — обратная схема: несколько горутин отправляют данные в один общий канал, а одна горутина последовательно читает все сообщения.

Главное отличие от предыдущего примера — нельзя закрывать канал вслепую. Закрывать его должен тот, кто точно знает, что все отправители закончили работу. Поэтому используется WaitGroup().

package main

import (
	"fmt"
	"sync"
)

func producer(id int, out chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()

	for i := 0; i < 2; i++ {
		out <- fmt.Sprintf("от %d: сообщение %d", id, i)
	}
}

func main() {
	out := make(chan string)
	var wg sync.WaitGroup

	// Три производителя пишут в один канал
	for id := 0; id < 3; id++ {
		wg.Add(1)
		go producer(id, out, &wg)
	}

	// Закрытие после завершения всех producers
	go func() {
		wg.Wait()
		close(out)
	}()

	// Единый поток сообщений
	for msg := range out {
		fmt.Println(msg)
	}
}

Такой вариант безопасен: канал закрывается только после того, как все отправители отработали.

Чем fan-out отличается от worker pool

Разница в акценте:

  • fan-out — про форму потока: одна линия превращается в несколько параллельных;
  • worker pool — про управление нагрузкой и числом активных обработчиков.

В простых примерах они выглядят одинаково, но в fan-out важен именно поток данных, а не ограничение ресурсов.

Fan-out и fan-in — это базовые схемы построения параллельных конвейеров. Закрытие входного канала корректно завершает работу всех воркеров, а закрытие выходного канала через WaitGroup() позволяет безопасно получить объединенный поток сообщений от нескольких источников.

Подведем итоги

Синхронизация через каналы строится не вокруг флагов, счетчиков и ручных блокировок, а вокруг движения данных и сигналов. Канал становится местом встречи горутин, а операция <- — естественной точкой ожидания. Через сигнальные каналы оформляется завершение задач, через каналы задач и результатов — работа пулов воркеров, через общие каналы — схемы fan-out и fan-in. В результате координация превращается в управление потоками значений, а порядок действий задается протоколом взаимодействия, а не набором низкоуровневых примитивов.

Рекомендуемые программы

+7 800 100 22 47

бесплатно по РФ

+7 495 085 21 62

бесплатно по Москве

108813 г. Москва, вн.тер.г. поселение Московский,
г. Московский, ул. Солнечная, д. 3А, стр. 1, помещ. 20Б/3
ОГРН 1217300010476
ИНН 7325174845