Конкуренция в Go

Теория: Ошибки и конкуренция

Когда все выполняется в одном потоке, ошибок немного: функция вернула error, вызывающий ее код посмотрел и решил, что делать дальше. С горутинами такой роскоши нет. Ошибка может случиться «где-то в стороне», в отдельной горутине, и если заранее не продумать протокол, она просто потеряется в логах.

В конкурентном коде всегда нужен явный путь, по которому ошибки возвращаются к «владельцу» операции. Для этого используются каналы (chan error) и контекст (context.Context).

Передача ошибок из горутин

Базовый принцип простой: каждая горутина либо тихо завершилась успешно, либо отправила свою ошибку туда, где ее ждут.

Самый прямой способ — завести отдельный канал для ошибок и дождаться завершения всех горутин через sync.WaitGroup.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// worker делает работу и возвращает ошибку через обычный return
func worker(id int) error {
	if id%3 == 0 {
		// имитация ошибки для некоторых задач
		return errors.New("что-то пошло не так")
	}
	return nil
}

func main() {
	const tasks = 5

	// буфер равен количеству задач, чтобы отправители не блокировались на errs <- err
	errs := make(chan error, tasks)

	var wg sync.WaitGroup

	for i := 1; i <= tasks; i++ {
		wg.Add(1)

		// для читаемости копируем i в id внутри замыкания
		go func(id int) {
			defer wg.Done()

			if err := worker(id); err != nil {
				// оборачиваем, чтобы не потерять контекст
				errs <- fmt.Errorf("worker %d: %w", id, err)
			}
		}(i)
	}

	// отдельная горутина ждет всех worker и закрывает канал ошибок
	go func() {
		wg.Wait()
		close(errs) // после этого никто не пишет в errs
	}()

	var hadErrors bool

	// читаем ошибки, пока канал не закроется
	for err := range errs {
		hadErrors = true
		fmt.Println("получена ошибка:", err)
	}

	if !hadErrors {
		fmt.Println("все задачи выполнены без ошибок")
	}
}

Что здесь важно:

  • worker возвращает обычный error — это локальное решение внутри горутины.
  • Владелец параллельной работы создает errs и отвечает за закрытие канала.
  • Все горутины пишут только в errs, но не закрывают его — иначе будет panic: close of closed channel.
  • После wg.Wait() гарантировано, что новых записей не будет, и канал можно закрывать безопасно.

Этот паттерн подходит, когда нужно собрать все ошибки и потом решить, как с ними жить (логировать, агрегировать, считать статистику).

Иногда вместе с ошибкой нужно вернуть и результат. Тогда вместо отдельного chan error удобнее использовать один канал структур с полем Err.

type Result struct {
	ID  int
	Val int
	Err error
}

func worker(id int, out chan<- Result) {
	// имитация: часть задач падает с ошибкой
	if id%4 == 0 {
		out <- Result{ID: id, Err: fmt.Errorf("bad id %d", id)}
		return
	}

	out <- Result{ID: id, Val: id * id, Err: nil}
}

func main() {
	out := make(chan Result, 8)
	var wg sync.WaitGroup

	for id := 1; id <= 8; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(id, out)
		}(id)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	for r := range out {
		if r.Err != nil {
			fmt.Println("err:", r.Err)
			continue
		}
		fmt.Println("ok:", r.ID, "=", r.Val)
	}
}

Здесь данные и ошибки всегда идут вместе. Приемнику не нужно синхронизировать два канала — достаточно одного for range.

Каналы ошибок (chan error)

chan error — не отдельная магия, а все тот же канал, просто с устоявшимся назначением: собирать ошибки из конкурентных участков и передавать их в одну точку принятия решения.

Несколько типичных схем:

1. Собрать все ошибки

Уже рассмотренный пример: буферный канал, WaitGroup(), закрытие после wg.Wait(). Подходит, когда важно узнать полную картину — какие задачи упали, какие прошли.

2. «Первая ошибка — стоп»

Иногда продолжать работу после первого сбоя бессмысленно. В этом случае канал ошибок обычно буферизуют размером 1 и читают только первую ошибку. Остальные пытаются записаться, но не блокируют систему:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

func task(ctx context.Context, id int) error {
	// имитация длительной работы
	select {
	case <-ctx.Done(): // если выше уже решили остановиться
		return ctx.Err()
	case <-time.After(100 * time.Millisecond):
		if id == 3 {
			return errors.New("падение на задаче 3")
		}
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errCh := make(chan error, 1) // храним только первую ошибку
	var wg sync.WaitGroup

	for id := 1; id <= 5; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			if err := task(ctx, id); err != nil {
				// пробуем отправить первую ошибку
				select {
				case errCh <- fmt.Errorf("task %d: %w", id, err):
					// только тот, кто успел записать ошибку, отменяет контекст
					cancel()
				default:
					// в канале уже есть ошибка, ничего не делаем
				}
			}
		}(id)
	}

	go func() {
		wg.Wait()
		close(errCh)
	}()

	if err, ok := <-errCh; ok {
		fmt.Println("остановлено по ошибке:", err)
	} else {
		fmt.Println("успешное завершение")
	}
}

Здесь chan error играет роль однокомпонентного «почтового ящика»: кто первым донес плохую новость, тот и запустил отмену.

3. Правила работы с каналом ошибок

  • Канал ошибок закрывает только читатель, когда уверен, что все отправители завершились (после wg.Wait()).
  • Если канал не буферизовать и не читать из него параллельно, горутины с errs <- err зависнут. Поэтому либо буфер, либо отдельная горутина-сборщик.
  • Запись в закрытый канал мгновенно падает в panic, поэтому закрытие всегда должно быть централизованным.

Использование контекста для ошибок

Канал ошибок отвечает на вопрос «что случилось», но сам по себе не дает сигнал остальным горутинам, что пора остановиться. Для этого нужен context.Context.

Связка выглядит так:

  • каждая горутина принимает ctx и периодически проверяет <-ctx.Done(),
  • при первой значимой ошибке вызывается cancel(),
  • все остальные горутины видят отмену контекста и корректно завершаются.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

func fetch(ctx context.Context, id int) error {
	// имитация запроса наружу
	select {
	case <-ctx.Done(): // кто-то выше решил, что работа больше не нужна
		return ctx.Err()
	case <-time.After(time.Duration(id) * 150 * time.Millisecond):
		if id == 2 {
			return errors.New("ошибка при fetch 2")
		}
		return nil
	}
}

func main() {
	// корневой контекст с возможностью отмены
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errCh := make(chan error, 1) // достаточно первой ошибки
	var wg sync.WaitGroup

	for id := 1; id <= 4; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			if err := fetch(ctx, id); err != nil {
				// передаем первую ошибку и инициируем отмену
				select {
				case errCh <- fmt.Errorf("fetch %d: %w", id, err):
					cancel()
				default:
					// ошибка уже зафиксирована, просто выходим
				}
			}
		}(id)
	}

	go func() {
		wg.Wait()
		close(errCh)
	}()

	if err, ok := <-errCh; ok {
		fmt.Println("завершение по ошибке:", err)
	} else {
		fmt.Println("все запросы завершились успешно")
	}
}

Что здесь делает контекст:

  • связывает все горутины в одну «операцию»;
  • дает единый канал Done(), который срабатывает и при явном cancel(), и при таймауте/дедлайне;
  • не прерывает горутину грубо, а дает сигнал, который она сама должна обработать.

В результате получается четкий протокол:

  1. Любая горутина может обнаружить ошибку.
  2. Ошибка передается через chan error к владельцу операции.
  3. Владелец (или та же горутина) вызывает cancel().
  4. Все остальные горутины получают <-ctx.Done() и аккуратно останавливаются.
  5. После wg.Wait() канал ошибок закрывается, и система завершает работу в контролируемом состоянии.

Ошибки в конкурентном коде — не побочный эффект, а часть протокола. Если с самого начала заложить в архитектуру chan error и context.Context, программа будет не только параллельной, но и управляемой: ошибки видно, реакции на них предсказуемы, а горутины не живут дольше, чем нужно.

Рекомендуемые программы

+7 800 100 22 47

бесплатно по РФ

+7 495 085 21 62

бесплатно по Москве

108813 г. Москва, вн.тер.г. поселение Московский,
г. Московский, ул. Солнечная, д. 3А, стр. 1, помещ. 20Б/3
ОГРН 1217300010476
ИНН 7325174845