Конкуренция в Go

Теория: Каналы: расширенные возможности

После знакомства с базовой работой каналов становится заметно, что простой небуферизованный канал подходит не для всех сценариев. Иногда источник данных работает быстрее потребителя, иногда несколько горутин должны по одному сигналу понять, что поток завершился. Расширенные возможности каналов решают эти задачи: буфер сглаживает разницу скоростей, закрытие служит явным сигналом завершения, а двойное чтение v, ok фиксирует состояние потока.

Буферизованные каналы

Небуферизованный канал связывает отправителя и получателя напрямую. Отправка блокируется до чтения, чтение блокируется до отправки. В некоторых задачах такая жесткая сцепка мешает. Если производитель выдает значения быстрее, чем потребитель успевает обрабатывать, ему приходится постоянно ждать. Буфер добавляет между ними короткую очередь и позволяет производителю немного опережать.

package main

import "fmt"

func main() {
	ch := make(chan string, 2) // канал со встроенным буфером на два элемента

	ch <- "первое" // запись не блокируется
	ch <- "второе" // запись не блокируется, буфер заполнен

	fmt.Println(<-ch) // чтение первого значения
	fmt.Println(<-ch) // чтение второго значения
}

Здесь обе записи проходят сразу, потому что буфер вмещает два значения. Третья запись заблокировала бы горутину до тех пор, пока хотя бы одно значение не будет прочитано и в буфере не освободится место.

Буферизованный канал особенно полезен, когда один этап конвейера работает быстрее следующего. Производитель не простаивает после каждого элемента, а складывает их в очередь, пока потребитель обрабатывает предыдущие.

package main

import "fmt"

func producer(ch chan<- int) {
	for i := 1; i <= 5; i++ {
		fmt.Println("отправка", i)
		ch <- i // блокировка только при полностью заполненном буфере
	}
	close(ch) // сигнал о завершении потока
}

func consumer(ch <-chan int) {
	for v := range ch {
		fmt.Println("получено", v)
	}
}

func main() {
	ch := make(chan int, 2) // буфер на два элемента
	go producer(ch)
	consumer(ch)
}

Буфер на два элемента позволяет производителю немного опережать потребителя, но не дает очереди разрастись бесконтрольно. Состояние буфера можно посмотреть через len(ch) и cap(ch): первое показывает количество элементов внутри, второе — максимальную емкость. Эти вызовы не заменяют синхронизацию, но помогают диагностировать поведение под нагрузкой.

Размер буфера всегда остается компромиссом. Слишком маленький не решит проблему блокировок, слишком большой превратит канал в накопитель, который держит в памяти лишние данные. Канал остается точкой синхронизации: когда буфер пуст или переполнен, горутина снова ждет противоположную сторону.

Закрытие каналов как сигнал завершения

У любого потока данных есть начало и конец. Для каналов конец выражается явным закрытием. Закрытие канала сообщает всем получателям, что новых значений не будет. Сам канал при этом не исчезает: буфер дочитывается до конца, а затем чтение начинает возвращать специальное состояние «канал исчерпан».

package main

import "fmt"

func main() {
	ch := make(chan int)

	go func() {
		for i := 1; i <= 3; i++ {
			ch <- i // отправка очередного значения
		}
		close(ch) // больше значений не будет
	}()

	for v := range ch { // чтение до закрытия канала
		fmt.Println("получено:", v)
	}
	fmt.Println("канал исчерпан")
}
// получено: 1
// получено: 2
// получено: 3
// канал исчерпан

Закрытие инициирует именно отправляющая сторона, потому что только она знает, что поток значений завершен. Попытка закрыть канал с принимающей стороны нарушает протокол и приводит к панике. Запись в уже закрытый канал также вызывает панику.

Чтение из закрытого канала

После закрытия канала чтение остается безопасной операцией. Сначала выдаются все значения, которые успели попасть в буфер до закрытия. Затем канал начинает возвращать нулевые значения и признак того, что поток закончился.

package main

import "fmt"

func main() {
	ch := make(chan int, 3)

	ch <- 10
	ch <- 20
	ch <- 30
	close(ch) // канал закрыт, но значения еще внутри

	for range 4 {
		v, ok := <-ch // чтение с проверкой состояния канала
		fmt.Println(v, ok)
	}
}
// 10 true
// 20 true
// 30 true
// 0 false

Три первых чтения вернут реальные значения и ok == true. Четвертое чтение вернет нулевое значение типа int и ok == false. Этот флаг и используется как универсальный признак окончания потока.

Цикл for range скрывает двойное чтение. Он автоматически вызывает <-ch, пока канал не закроется и не исчерпает очередь. После этого цикл завершается без блокировки.

Проверка через v, ok := <-ch

Двойное присваивание при чтении из канала фиксирует одновременно и значение, и состояние потока. Переменная v получает данные, а ok показывает, открыт канал или уже завершен. Такой прием удобен, когда нужно явно управлять завершением цикла.

package main

import "fmt"

func main() {
	ch := make(chan int, 2)

	ch <- 1
	ch <- 2
	close(ch) // после этого новых значений не будет

	for {
		v, ok := <-ch // попытка чтения значения
		if !ok {      // канал закрыт и буфер пуст
			fmt.Println("канал закрыт, выходим")
			break
		}
		fmt.Println("получено:", v)
	}
}
// получено: 1
// получено: 2
// канал закрыт, выходим

Пока в канале есть данные, ok остается true. Как только буфер опустеет, ok станет false, и цикл корректно завершится. Нулевое значение в v при этом не используется как полезные данные, служит лишь формальной частью контракта.

В воркер-паттернах такой прием делает завершение рабочих горутин предсказуемым.

func worker(id int, ch <-chan string) {
	for {
		msg, ok := <-ch // чтение с проверкой завершения
		if !ok {
			fmt.Println("worker", id, "завершает работу")
			return
		}
		fmt.Println("worker", id, "получил:", msg)
	}
}

Здесь worker() читает сообщения до тех пор, пока канал остается открытым. Закрытие канала превращается в команду завершения для всех рабочих, которые слушают этот канал.

Когда закрывать канал, а когда нет

Закрытие канала — это часть протокола. Канал не нужно закрывать «по привычке», его закрывают только тогда, когда получателям важно явно узнать: «поток данных точно закончился».

Хороший пример — один источник, несколько потребителей. Продюсер генерирует конечную последовательность значений и после этого закрывает канал. Все потребители читают один и тот же канал через range и по закрытию аккуратно завершают работу:

func producer(ch chan<- int) {
	for range 5 {
		ch <- i
	}
	close(ch) // сигнал: больше значений не будет
}

func consumer(id int, ch <-chan int) {
	for v := range ch {
		fmt.Println("consumer", id, "получил", v)
	}
}

Здесь close() — осмысленный шаг: он завершает протокол. Никто не зависнет в ожидании новых данных.

В других ситуациях закрытие не нужно. Если канал используется как внутренний механизм между двумя горутинами в рамках короткой операции — запрос–ответ, обмен одной–двумя структурами — достаточно просто перестать отправлять значения. Канал будет собран GC, когда на него перестанут ссылаться. Лишний close() только добавляет точку, где можно по ошибке получить send on closed channel.

Особенно аккуратным нужно быть с несколькими отправителями. Когда в канал пишут много горутин, ни одна из них не знает, что поток уже «окончательно» завершился. Попытка закрыть такой канал легко заканчивается паникой. В такой схеме обычно используют контекст или отдельный сигнальный канал как односторонний «стоп», а сами каналы данных не трогают.

Пример с done тоже важно подавать правильно. Плохой паттерн — плодить отдельные горутины, которые только делают <-done и что-то печатают. Ради такой мелочи достаточно прочитать из канала в текущей горутине или использовать context.Context(). Канал done имеет смысл там, где уже есть долго живущие горутины, которые делают select и должны уметь корректно завершиться по внешнему сигналу, а не ради одного короткого диалога.

Закрытие канала — это элемент протокола, а не средство «освободить ресурс». Если получателям нужно четко узнать, что поток закончился, канал закрывают отправители. Если такого требования нет, достаточно просто перестать отправлять данные и не усложнять себе жизнь лишним close().

Рекомендуемые программы

+7 800 100 22 47

бесплатно по РФ

+7 495 085 21 62

бесплатно по Москве

108813 г. Москва, вн.тер.г. поселение Московский,
г. Московский, ул. Солнечная, д. 3А, стр. 1, помещ. 20Б/3
ОГРН 1217300010476
ИНН 7325174845