Урок «Эрланг на практике. gen_server, продолжение.» Урок «Эрланг на практике. gen_server, продолжение.» Эрланг на...

gen_server, продолжение

Мы реализовали свой gen_server, теперь пора посмотреть на настоящий :)

Наш gen_server стостоит из 3х частей:

  • внешнее АПИ (start/0, add_item/2, remove_item/2, show_items/2, stop/1)
  • общая (generic) часть (call/2)
  • хранение состояния и обработка сообщений (loop/1)

Первые 2 части выполняются в потоке клиента, 3-я часть выполняется в потоке сервера.

Настоящий gen_server устроен сложнее. Код также делится на общую (generic) часть, и кастомную (custom) часть. Общая часть реализована в нескольких модулях OTP фреймворка (gen_server, gen, proc_lib). Кастомную часть мы должны реализовать в своем модуле.

gen_server

На картинке два левых квадрата (верхний и нижний), соответствуют нашему модулю. Два правых квадрата соответствуют коду OTP. Два верхних квадрата выполняются в потоке клиента, два нижних квадрата выполняются в потоке сервера.

Левый верхний квадрат -- это публичное АПИ нашего модуля. Отсюда мы обращаемся к OTP фреймворку. В кастомной реализации, которую мы делали на прошлом уроке, этот квадрат соответствует функциям start, add_item, remove_item, show_items.

Правый верхний квадрат -- это часть OTP, generic код, выполняющийся в потоке клиента. Соответствует функции call в нашей кастомной реализации.

Правый нижний квадрат -- это часть OTP, выполняющаяся в потоке сервера. Соответствует функции loop в нашей реализации. Только там нет кастомной обработки сообщений. А вместо этого OTP вызывает функции обратного вызова (callback) нашего модуля.

Левый нижний квадрат -- функции обратного вызова, принадлежащие нашему модулю, и работающие в потоке сервера.

behaviour в эрланг -- это аналог интерфейсов в джава. Он описывает, какие callback-функции должны быть определены, их имена и аргументы.

behaviour(gen_server) требует, чтобы наш модуль определил функции init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2 и code_change/3.

Дальше мы разберем, как взаимодействуют наш модуль и gen_server. на примере модуля wg_push_sender, из библиотеки wg_push. Это библиотека для отправки сообщений на iOS устройства через Apple Push Notification Service.

инициализация

Все начинается с функции start_link/0:

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

Здесь мы просим gen_server запустить новый поток.

Макрос ?MODULE разворачивается в имя текущего модуля. Можно было написать:

gen_server:start_link({local, wg_push_sender}, wg_push_sender, [], []).

получится тоже самое.

Первый аргумент {local, ?MODULE} -- это имя, под которым нужно зарегистрировать поток. Это если мы хотим обращаться к нашему серверу по имени. Иначе вызываем gen_server:start_link/3, и созданный поток не будет регистрироваться.

Второй аргумент ?MODULE -- это имя модуля, callback-функции которого будет вызывать gen_server.

Третий аргумент -- это набор параметров, которые нужны при инициализации. В нашем случае никакие не нужны.

Четвертый аргумент -- настройки поведения gen_server. Они довольно специфичны, и необходимость что-то в них менять не возникает. Но загляните в документацию, это полезно :)

Дальше происходит некая магия в правом верхнем квадрате, в результате которой создается серверный поток. Этому потоку нужно получить свое начальное состояние. Для этого вызывается первый callback init/1.

init([]) ->
    {ok, #state{
            apns_host = application:get_env(wg_push, apns_host, "gateway.sandbox.push.apple.com"),
            apns_port = application:get_env(wg_push, apns_port, 2196)
           }}.

Аргумент init, это данные, которые мы передавали третьим аргументом в gen_server:start_link. Здесь нужно создать структуру данных, которая будет хранить состояние сервера.

Часто для этого описывают record с именем state.

-record(state, {
        apns_host :: string(),
        apns_port :: integer(),
        connections = orddict:new() :: orddict:orddict(file:name_all(), port())
         }).

После того, как функция init возвращает #state{}, сервер готов к работе.

gen_server:call

Теперь посмотрим, как делается запрос от клиента к серверу, на примере API-функции send_messages.

send_messages(Messages, SSL_Options) ->
    gen_server:call(?MODULE, {send_messages, Messages, SSL_Options}).

Здесь мы вызываем gen_server:call с двумя аргументами. Первый аргумент -- pid сервера или имя, под которым он зарегистрирован. Второй аргумент -- сообщение, которое посылается серверу.

В недрах OPT вызов проходит через call и loop, и затем вызывается callback-функция handle_call. Ей передаются три аргумента: сообщение от клиента, кортеж {pid клиента, reference} и состояние сервера. Второй аргумент обычно не используется.

handle_call({send_messages, Messages, SSL_Options}, _From, State) ->
    {Reply, State3} = send_messages(Messages, SSL_Options, State),
    {reply, Reply, State3};

handle_call должен обработать сообщение, сформировать ответ для клиента и новое состояние для сервера.

Есть несколько вариантов возвращаемого значения. Но мы не будем рассматривать все возможные случаи. Чаще всего мы отвечаем {reply, Reply, NewState}.

Обычно каждой АПИ функции модуля соответствует отдельное сообщение, а каждому сообщению отдельная ветка handle_call. Если АПИ большое, то и веток handle_call много.

my_api_1(A) ->
    gen_server:call(?MODULE, {msg1, A}).
my_api_2(A, B) ->
    gen_server:call(?MODULE, {msg2, A, B}).
my_api_3(A, B, C) ->
    gen_server:call(?MODULE, {msg3, A, B, C}).
...
handle_call({msg1, A}, _From, State) ->
...
handle_call({msg2, A, B}, _From, State) ->
...
handle_call({msg3, A, B, C}, _From, State) ->

Поэтому внутри handle_call много кода лучше не писать, а выносить его в отдельные функции.

другие callback-функции

gen_server:cast/handle_cast

Вызов gen_server:call блокирует клиента, пока сервер не обработает его запрос и не вернет ответ. Бывают случаи, когда клиенту ответ сервера не нужен. Тогда лучше использовать gen_server:cast. Клиент не блокируется и не ждет ответ сервера. Но сервер получает и обрабатывает сообщение.

Для этого вызывается callback-функция handle_cast:

do_something(A, B) ->
    gen_server:cast(?MODULE, {do_something, A, B}),
    ok.
...
handle_cast({do_something, A, B}, State) ->
    NewState = ...
    {noreply, NewState};

handle_cast должен вернуть измененное состояние.

message/handle_info

Любой поток из любого места в код может отправить серверу сообщение оператором !. Так делать не рекомендуется, потому что это вызовы в обход API сервера. Но иногда так делают.

Если сообщения в функции loop сервера приходят не из gen_server:call/cast, то они обрабатываются в callback-функции handle_info.

handle_info({some_message, A, B}, State) ->
    NewState = ...
    {noreply, NewState};

Сервер и сам может отправлять себе сообщения таким образом. Например, для отложенной инициализации (это мы рассмотрим ниже), или для выполнения повторяющихся операций через интервалы времени.

terminate

Этот callback вызывается, когда gen_server останавливается. Если поток в процессе своей работы занимал какие-то ресурсы (соединение с базой данных, сокеты, файлы и т.д.), то по правилам OTP предлагается освобождать их здесь.

Или если поток накопил какие-то данные, которые нужно куда-то сохранить, то можно делать это здесь. Хотя надежнее сохранять данные периодически, через регулярные интервалы времени. Это минимизирует потери в случае аварийного завершения потока. terminate тогда не вызывается.

code_change

Этот callback вызывается при горячем обновлении кода. Такое обновление тесно связано с релизами, и мы не рассматриваем его в рамках курса. Но для полноты изложения callback упомянем.

В новой версии кода возможно изменилось состояние процесса. В #state{} могло появиться что-то новое, или что-то было убрано, или вообще состояние стало храниться в совсем другой структуре данных.

code_change принимает на входе старый #state{}, и должен его преобразовать и вернуть новый #state{}.

Отложенная инициализация

Вызов init блокирует родительский поток. А с ним и старт приложения. А с ним и старт всей ноды. То есть, нода не начнет работу, пока все init всех gen_server модулей не отработают. Поэтому желательно оставлять init легковесным и возвращаться из него как можно быстрее.

Если инициализация сервера требует долгих действий, то такие вещи лучше делать отложено. Например, устанавливать соединение с базой данных, запрашивать какие-то данные из внешнего источника, создавать большие объекты в памяти -- все это стоит делать отложено.

Есть разные способы реализовать отложенную инициализацию. Мы рассмотрим самый простой.

Здесь в init частично инициализируется State, и поток отправляет сообщение самому себе.

init(Args) ->
    State = some_light_state,
    self() ! heavy_init,
    {ok, State}.

Это сообщение первым ляжет в почтовый ящик, и первым будет обработано в handle_info.

handle_info(heavy_init, State) ->
    NewState = heavy_state,
    {noreply, NewState};

После этого сервер готов обслуживать запросы клиентов.

Происходящее в handle_info не блокирует ничего, кроме потока сервера. И поэтому вся нода в целом может стартовать быстрее, и другие потоки быстрее начинают выполнять свою работу.

Сравнение с ООП

Вы могли заметить некоторое сходство с ООП. Есть объект с внутренним состоянием, публичным АПИ и скрытой реализацией. Таких объектов (потоков) на базе одного класса (модуля) можно создать много. У всех у них будет одинаковое по структуре, но разное по содержанию состояние. Объекты могут взаимодействовать друг с другом, обмениваясь сообщениями.

Если серверный поток регистрируется под определенным именем, то это "одиночка" (singleton). Он такой один, и к нему можно обращаться по имени:

gen_server:call(some_name, some_message)

Если поток не регистрируется, то таких объектов может быть много, и нужно обращаться к ним по Pid:

gen_server:call(Pid1, some_message).
gen_server:call(Pid2, some_message).

Похожесть есть, но есть и нюансы. Для ООП объекта вполне нормально вызывать свои собственные методы. А с gen_server можно попасть в коварную ловушку :)

Deadlock на gen_server:call

С этим сталкивается почти каждый новичок в Erlang, и я тоже в свое время столкнулся.

Внутри handle_call нельзя делать вызов gen_server:call на самого себя. Обычно такое получается не прямо, а опосредованно. Либо вызывается функция из публичного АПИ, которая делает gen_server:call, либо вызывается какой-то другой модуль, а тот вызывает публичное АПИ текущего модуля.

Вызов gen_server:call, это добавление нового сообщения в почтовый ящик. gen_server обрабатывает сообщения по очереди. Пока он не завершит обработку текущего сообщения, он не начнет обработку следующего. Поэтому если обработка текущего сообщения будет ждать результат gen_server:call, то никогда не дождется. 1-й вызов ждет завершения 2-го, а 2-й вызов ждет завершения 1-го. Это deadlock.

gen_server:call по дефолту имеет таймаут в 5 секунд. Если за это время не приходит ответ, то в потоке клиента бросается исключение. Обычно этим и проявляется такой deadlock. Но если вы зачем-то заменили дефолтный таймаут на infinity, то поток в таком состоянии будет висеть бесконечно. В какой-то момент его очередь сообщений исчерпает всю доступную оперативную память, и нода упадет.

Выводы:

  • помните про такой deadlock;
  • хорошо продумывайте таймауты.