Эффективная поддержка многопоточности -- одна из главных фишек эрланг. И она же является базой для других фишек: масштабируемости, распределенности, устойчивости к ошибкам.
Легковесные потоки
Эрланг имеет собственную реализацию многопоточности на уровне виртуальной машины. Конечно, это работает поверх процессов операционной системы. Но поверх одного такого процесса могут работать сотни и тысячи потоков эрланг. И виртуальная машина управляет ими независимо от операционной системы.
В разных операционных системах есть разные сущности: процессы, потоки, нити и т.д. Они отличаются реализацией и возможностями. Но в эрланг такая сущность только одна. Я буду называть ее "поток". Но если где-то упомяну "процесс", то знайте, что это одно и то же :)
Особенность потоков эрланг в том, что они легковесные. Это значит, что они:
- быстро стартуют и завершаются;
- быстро переключаются;
- потребляют мало памяти.
Новый поток создается и стартует за 3-5 микросекунд. На старте он получает 2,5Кб памяти (стек, куча и служебная информация о потоке).
Виртуальная машина имеет лимит на число потоков, по умолчанию это 262,144 (218). Лимит можно увеличить до 134,217,727 (227). Но вряд ли кому-то захочется создать 134 млн потоков на одной ноде, тем более, что для этого понадобится 336Гб оперативной памяти :)
Легковесность потоков качественно меняет архитектуру. Потоки не являются ограниченным ресурсом, как в большинстве других языков. Можно легко выделить отдельный поток на обслуживание каждого клиента, и позволить клиенту занимать этот поток сколько угодно долго.
Поэтому типичная область применения эрланг -- это сервера, которые должны обслуживать большое количество клиентов. Особенно если соединения с клиентами являются долгоживущими.
spawn
Для создания нового потока используется функция spawn.
Она имеет несколько вариантов:
- spawn(Fun) -> pid()
- spawn(Node, Fun) -> pid()
- spawn(Module, Function, Args) -> pid()
- spawn(Node, Module, Function, Args) -> pid()
Так или иначе в аргументах spawn указывается точка входа для нового потока, с которой он начинает выполнятся. Далее поток либо выполнит весь код, и завершится; либо попадет в бесконечную рекурсию и будет выполнятся бесконечно; либо завершится аварийно из-за ошибки.
Функция spawn возвращает Pid -- идентификатор процесса (process identifier). Зная Pid, можно посылать процессу сообщения и получать информацию о нем.
Давайте попробуем запустить несколько потоков:
4> G = fun(X) -> timer:sleep(10), io:format("~p~n", [X]) end.
#Fun<erl_eval.6.13229925>
5> [spawn(fun() -> G(X) end) || X <- lists:seq(1,10)].
[<0.273.0>,<0.274.0>,<0.275.0>,<0.276.0>,<0.277.0>,
<0.278.0>,<0.279.0>,<0.280.0>,<0.281.0>,<0.282.0>]
2
1
4
3
5
8
7
6
10
9
Создаем анонимную функцию, в которой после паузы в 10 миллисекунд выводим на консоль ее аргумент. С помощью генератора списков создаем 10 потоков. Каждый из них выполняет эту функцию независимо от остальных. Видим, что очередность вывода чисел на консоль не совпадает с очередностью создания потоков. Вывод будет случайным, в зависимости от того, как планировщик раздавал управление потокам.
Отправка сообщений
Потоки эрланг имеют каждый свою изолированную область памяти (стек и кучу), не читают и не пишут в чужую память, а обмениваются сообщениями.
Для этого используется оператор ! (bang):
Pid ! Message
Pid должен быть идентификатором процесса, а Message -- любая структура данных или атомарное значение.
Отправка сообщения выполняется асинхронно. То есть, поток не блокируется и не ждет ответа, а продолжает выполнение. При попытке отправить сообщение несуществующему процессу, сообщение просто игнорируется. Ошибки при этом не происходит.
Поток может отправить сообщение самому себе.
10> self() ! hello.
hello
11> flush().
Shell got hello
ok
Почтовый ящик
У каждого потока есть специальная область памяти -- почтовый ящик (mailbox), куда копируются адресованные ему сообщения. Там сообщения накапливаются в очереди, в порядке их появления.
Чтобы прочитать сообщения в почтовом ящике, нужно использовать конструкцию receive.
receive
Pattern1 [when Guard1] ->
Expressions1;
Pattern2 [when Guard2] ->
Expressions2;
...
end
Синтаксис аналогичен конструкции case.
В этом примере поток отправляет сообщение самому себе, и получает его с помощью receive:
12> self() ! "hello again".
"hello again"
13> receive
13> Msg -> io:format("got message:~p~n", [Msg])
13> end.
got message:"hello again"
ok
При вызове receive поток берет сообщение из очереди и сопоставляет его с имеющимися шаблонами. Если находится подходящий шаблон, то выполняется соответствующий блок кода, и затем код после receive. А сообщение удаляется из почтового ящика. Другие сообщения в почтовом ящике остаются до следующего вызова receive. Если сообщение не совпало ни с одним шаблоном, то оно остается в очереди, и для проверки берется следующее.
Чтобы четко разобраться, как работает receive в разных ситуациях, сделаем тест. Будем отправлять разные сообщения, и с помощью receive будем выбирать только те, которые соответствуют шаблону {msg, Any}.
-module(mb).
-export([test/0]).
test() ->
test_messages("test1, ящик пустой", []),
test_messages("test2, одно сообщение, матчится",
[{msg, 1}]),
test_messages("test3, одно сообщение, не матчится",
[msg1]),
test_messages("test4, 3 сообщения, все матчатся",
[{msg, 1}, {msg, 2}, {msg, 3}]),
test_messages("test5, 3 сообщения, все не матчатся",
[msg1, msg2, msg3]),
test_messages("test6, 4 сообщения, часть матчится, часть не матчится",
[{msg, 1}, msg2, {msg, 3}, msg4]),
test_messages("test7, 4 сообщения, часть матчится, часть не матчится",
[msg1, {msg, 2}, msg3, {msg, 4}]),
ok.
test_messages(TestName, Messages) ->
io:format("~n### ~ts~ntest_messages: ~p~n", [TestName, Messages]),
flush(),
[self() ! Msg || Msg <- Messages],
io:format("call receive~n"),
Res = receive
{msg, M} -> {msg, M}
after 100 -> timeout
end,
io:format("after receive got: ~p~n", [Res]),
[{messages, Left}] = process_info(self(), [messages]),
io:format("left in mailbox: ~p~n", [Left]),
ok.
flush() ->
receive
_ -> flush()
after 100 -> ok
end.
Функция flush/0 очищает почтовый ящик перед каждым тестом, для чистоты эксперимента.
По результатам теста мы видим, что receive выбирает одно сообщение, совпадающее с шаблоном, если такое есть. Если нет, то поток блокируется.
В тесте этого не видно, но поток блокируется либо на указанное время, либо до получения подходящего сообщения.
Почтовый ящик -- самая частая причина утечки памяти в эрланг. Если receive не вызывается, или вызывается, но обрабатывает не все сообщения, то утечка памяти неизбежна. Кроме того, по мере роста очереди, каждый проход по ней становится все медленнее и медленнее.
Все эрланг программисты об этом знают. И если обнаруживается утечка памяти, то диагностика проблемы начинается с очередей в почтовых ящиках.
Хорошая практика -- делать в receive последний шаблон такой, чтобы он совпадал с любым сообщением. И в этом случае писать в лог о том, что поток получил сообщение, которое он не умеет обрабатывать.
receive
{do_something, Data} -> do_something(Data);
Any -> lager:warning("Got unknown message ~p", [Any])
end,
Это уменьшит вероятность проблем с почтовым ящиком. Но не гарантирует полностью их отсутствие. Возможно ситуация, когда сообщения поступают быстрее, чем поток успевает их обрабатывать. Увы, но универсального рецепта для таких ситуаций нет :)
timeout
В тесте выше уже использовался таймаут, но нужно подробнее его объяснить.
receive
{do_something, Data} -> do_something(Data);
end,
Здесь, если нет подходящего по шаблону сообщения, поток заблокируется и будет ждать, пока сообщение появится. А если оно никогда не появится, то поток так и останется заблокированным.
Поэтому разумно указать максимальное время, на которое можно заблокировать поток.
receive
{do_something, Data} -> do_something(Data);
after
5000 -> exp1
end,
Это либо целое число -- время в милисекундах, либо атом infinity. Впрочем, infinity аналогично отсутствию after.
register
Pid -- штука хорошая, но не всегда удобная. Если мы хотим посылать сообщения в поток из нескольких других потоков, нам придется как-то передать всем им Pid получателя.
Но есть альтернатива -- потоку можно дать некое имя, глобальное на уровне всей ноды, и потом обращаться к нему по этому имени.
register(Name, Pid)
Вызов register сгенерирует исключение, если имя уже связано с другим потоком.
Регистрацию потока можно отменить:
unregister(Name)
Можно узнать все имена зарегистрированных потоков, какие есть в ноде:
registered()
И можно узнать Pid потока по имени:
whereis(Name)
Пробуем:
1> registered().
[erl_prim_loader,error_logger,kernel_safe_sup,init,user,rex,
inet_db,kernel_sup,code_server,standard_error_sup,
global_name_server,application_controller,file_server_2,
user_drv,standard_error,global_group]
2> register(erl_console, self()).
true
3> registered().
[erl_prim_loader,error_logger,kernel_safe_sup,init,user,rex,
inet_db,kernel_sup,code_server,standard_error_sup,
global_name_server,erl_console,application_controller,
file_server_2,user_drv,standard_error,global_group]
4> whereis(erl_console).
<0.33.0>
5> self().
<0.33.0>
6> unregister(erl_console).
true
Выводы
Легкие потоки, обмен сообщениями, отсутствие разделяемой памяти дают хорошую базу для:
- масштабируемости;
- распределенности;
- устойчивости к ошибкам.
Поскольку потоки -- дешевый ресурс, их можно создавать в количестве, адекватном нагрузке на систему, и менять вместе с изменением нагрузки.
Если мы умеем передавать сообщения от одного потока другому в рамках одной ноды, то не сложно это делать и между двумя нодами. Нужен только транспорт поверх TCP, и в виртуальной машине эрланг этот транспорт есть.
Поскольку потоки изолированы друг от друга, то падение одного потока не влияет на работу других потоков. Впрочем, устойчивость к ошибкам не появляется сама по себе, программисту еще нужно постараться, чтобы этого добиться :)
Остались вопросы? Задайте их в разделе «Обсуждение»
Вам ответят команда поддержки Хекслета или другие студенты