Phoenix WebSockets Under a Microscope 🔬
This is a code-reading and exploration post about the WebSockets side of Phoenix. It builds upon some of the tracing techniques showcased in the previous post to observe some of the internals of Phoenix. It also features some tricks I commonly employ to debug WebSocket-related issues. The title and nature of this post are inspired by the marvellous book Ruby Under a Microscope, written by Pat Shaughnessy.
WebSockets
The WebSocket Protocol enables two-way communication between a client running untrusted code in a controlled environment to a remote host that has opted-in to communications from that code
The goal of this technology is to provide a mechanism for browser-based applications that need two-way communication with servers that does not rely on opening multiple HTTP connections (e.g., using XMLHttpRequest or <iframe>s and long polling)
Conceptually, WebSocket is really just a layer on top of TCP that does the following:
- Adds a web origin-based security model for browsers
- Adds an addressing and protocol naming mechanism to support multiple services on one port and multiple host names on one IP address
- Layers a framing mechanism on top of TCP to get back to the IP packet mechanism that TCP is built on, but without length limits
- Includes an additional closing handshake in-band that is designed to work in the presence of proxies and other intermediaries
(source: RFC 6455)
One of the most radical elements of Phoenix is the ease and productivity it brings when developing WebSocket applications.
We’ll start off by creating a simple application to dive into some aspects of Phoenix and understand its inner workings.
The Number Generator Application
We’ll quickly build a sample application, leveraging the excellent scaffolding tasks of Phoenix, to start the exploration.
The sample application assigns an Integer to each connected client (generated with :erlang.unique_integer/1) and streams numbers down the socket at a pre-configured interval (one second in the code below).
mix archive.install https://github.com/phoenixframework/archives/raw/master/phoenix_new.ez
mix phoenix.new numbers --no-brunch --no-ecto
With the --no-brunch option we skip generating Brunch-related scaffolding, as none of the examples below require JavaScript, and with --no-ecto we skip database-related configuration and modules.
Phoenix@v1.2.5 is used for all the examples of this post.
By design, a Phoenix channel is an abstraction for sending and receiving messages about topics. Phoenix.Transports.WebSocket is the default transport mechanism; there are others available and you can even write your own. You may read the documentation about channels here.
Let’s code our first channel by creating a web/channels/integers_channels.ex.
defmodule Numbers.IntegersChannel do
  use Numbers.Web, :channel

  def join("numbers:" <> type, _params, socket) when type in ~w(positive negative) do
    with type <- type |> String.to_existing_atom,
         socket <- socket
                   |> assign(:number, number(type))
                   |> assign(:joined_at, NaiveDateTime.utc_now) do
      send self(), {:update, type}
      {:ok, socket}
    end
  end

  def handle_info({:update, type}, socket), do: socket |> push_update(type)

  defp push_update(socket, type) do
    Process.send_after(self(), {:update, type}, 1000)
    push socket, "update", %{number: number(type)}
    {:noreply, socket}
  end

  defp number(:positive), do: :erlang.unique_integer([:positive])
  defp number(:negative), do: - number(:positive)
end
Then update the web/channels/user_socket.ex file to route to the channel we just defined:
defmodule Numbers.UserSocket do
  use Phoenix.Socket

  ## Channels
  channel "numbers:*", Numbers.IntegersChannel

  ## Transports
  transport :websocket, Phoenix.Transports.WebSocket

  def connect(_params, socket) do
    {:ok, socket}
  end

  def id(_socket), do: nil
end
Two topics are defined: numbers:positive, streaming positive numbers, and numbers:negative, for negative ones.
You may notice that we’re assigning a :joined_at attribute to each client. This is just a convention which can come in handy to calculate how long a client has been connected. For real-time join/leave tracking, Phoenix.Presence should be preferred.
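As a rough illustration of how the :joined_at value could be used, the sketch below computes the number of seconds a client has been connected. The ConnectionAge module and its seconds/2 function are hypothetical names made up for this example; only NaiveDateTime.diff/3 comes from the standard library.

```elixir
defmodule ConnectionAge do
  # Hypothetical helper: seconds elapsed since the value stored
  # under :joined_at in the socket assigns.
  def seconds(joined_at, now \\ NaiveDateTime.utc_now()) do
    NaiveDateTime.diff(now, joined_at, :second)
  end
end

# Fixed timestamps are used here so the result is reproducible.
age = ConnectionAge.seconds(~N[2017-11-07 10:00:00], ~N[2017-11-07 10:01:30])
IO.inspect(age) # prints 90
```

Inside a channel callback, the first argument would presumably be socket.assigns.joined_at.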
We can now start the application using:
iex -S mix phoenix.server
Connecting to WebSockets from the terminal
Now that we have the server up and running, we can initiate some connections using wsta, a CLI tool written in Rust ⚙️, which follows the Unix philosophy, letting you pipe streams to and from other scripts or files.
Installation
Debian ❤️
echo 'deb http://download.opensuse.org/repositories/home:/esphen/Debian_8.0/ /' > /etc/apt/sources.list.d/wsta.list
apt-get update
apt-get install wsta
Mac OS X
brew tap esphen/wsta https://github.com/esphen/wsta.git
brew install wsta
Windows
Ensure you have a command prompt with GNU libraries, for example the git prompt, and run the provided binary file from there. You can find binary releases on the releases page.
With wsta installed, we can connect to the positive numbers topic using:
wsta 'ws://localhost:4000/socket/websocket' \
'{"topic":"numbers:positive","event":"phx_join","payload":{},"ref":"1"}'
The output will look like:
Connected to ws://localhost:4000/socket/websocket
{"topic":"numbers:positive","ref":"1","payload":{"status":"ok","response":{}},"event":"phx_reply"}
{"topic":"numbers:positive","ref":null,"payload":{"number":62402},"event":"update"}
{"topic":"numbers:positive","ref":null,"payload":{"number":62434},"event":"update"}
{"topic":"numbers:positive","ref":null,"payload":{"number":62466},"event":"update"}
and for the negative ones:
wsta 'ws://localhost:4000/socket/websocket' \
'{"topic":"numbers:negative","event":"phx_join","payload":{},"ref":"1"}'
The output will look like:
Connected to ws://localhost:4000/socket/websocket
{"topic":"numbers:negative","ref":"1","payload":{"status":"ok","response":{}},"event":"phx_reply"}
{"topic":"numbers:negative","ref":null,"payload":{"number":-61890},"event":"update"}
{"topic":"numbers:negative","ref":null,"payload":{"number":-61922},"event":"update"}
{"topic":"numbers:negative","ref":null,"payload":{"number":-61954},"event":"update"}
You’ll notice that wsta will print Disconnected! and stop streaming numbers after around 1 minute.
This is due to the default :timeout configuration of Phoenix.Transports.WebSocket, which specifies:
The timeout for keeping websocket connections open after it last received data, defaults to 60_000ms
To make it easier to interact with WebSockets in development you may set it to :infinity.
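A minimal sketch of that setting, assuming the option is passed on the transport line of web/channels/user_socket.ex as the Phoenix.Transports.WebSocket documentation describes for this Phoenix version:

```elixir
# web/channels/user_socket.ex (fragment, development only)
transport :websocket, Phoenix.Transports.WebSocket,
  timeout: :infinity
```

With this in place a silent client is no longer disconnected after a minute, which is convenient for poking at the socket by hand but is probably not what you want in production.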
Differences between push/3, broadcast_from/3 and broadcast/3
As you may read in the Phoenix.Channel documentation, there are 3 functions, push/3, broadcast_from/3 and broadcast/3, which can be used to send data to connected clients, but have some not so subtle differences.
- push/3 sends the event to the current subscriber only
- broadcast_from/3 broadcasts the event to all subscribers of a given topic except the current one
- broadcast/3 broadcasts the event to all the subscribers of a given topic
You don’t want to use broadcast/3 if you’re only interested in sending updates to a single client; use push/3 instead 😇.
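The delivery semantics of the three functions can be modelled with plain processes. The sketch below is a toy model, not Phoenix's implementation: DeliverySketch and all of its functions are hypothetical, and a topic is represented simply as a list of subscriber pids.

```elixir
defmodule DeliverySketch do
  # Toy model: only the current subscriber receives the message.
  def push(current, msg), do: send(current, msg)

  # Toy model: every subscriber of the topic receives the message.
  def broadcast(subscribers, msg) do
    Enum.each(subscribers, &send(&1, msg))
  end

  # Toy model: every subscriber except the current one receives the message.
  def broadcast_from(subscribers, current, msg) do
    subscribers
    |> Enum.reject(&(&1 == current))
    |> Enum.each(&send(&1, msg))
  end

  # Spawns a subscriber that reports every message it receives to `parent`,
  # tagged with `name`, so deliveries can be observed.
  def subscriber(parent, name) do
    spawn(fn -> loop(parent, name) end)
  end

  defp loop(parent, name) do
    receive do
      msg -> send(parent, {name, msg})
    end

    loop(parent, name)
  end
end

parent = self()
a = DeliverySketch.subscriber(parent, :a)
b = DeliverySketch.subscriber(parent, :b)

# Only :b reports the message, because :a is the "current" subscriber.
DeliverySketch.broadcast_from([a, b], a, :hello)
```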
Phoenix WebSocket Wiring
The supervision tree of the application is the following:
I’ve changed the number of ranch HTTP acceptor processes to just 5 to reduce clutter, using the following config/config.exs:
config :numbers, Numbers.Endpoint,
url: [host: "localhost"],
http: [acceptors: 5], # 👈
secret_key_base: "something-secret",
render_errors: [view: Numbers.ErrorView, accepts: ~w(html json)],
pubsub: [name: Numbers.PubSub,
adapter: Phoenix.PubSub.PG2]
With a connected WebSocket client, the above supervision tree will look like:
In the above image, 0.643.0 is a GenServer which holds the Socket assigns for a client. It is created by 0.641.0, which loops in cowboy_websocket:handler_loop/4, and it is monitored by Numbers.PubSub.Local0.
The cowboy_websocket handler in turn is spawned by 0.340.0, a ranch_conns_sup, which is supervised by 0.339.0, a ranch_listener_sup.
What about all those wild-west themed applications and modules?
Where do they come from?
How do our socket pushes manage to reach the client?
The following passages unravel some of the most important function calls which take place for a WebSocket connection to be established and a message to be pushed from the server to the client.
If you find it hard to catch on to the whys and the hows of all those function calls, don’t fret, skip to the Phoenix and Cowboy section.
Cowboy 🤠
This is the default web server for Phoenix. It depends on ranch to handle TCP connections and cowlib for HTTP protocol parsing and utility functions.
The HTTP parsing code of cowlib is especially interesting as a demonstration of what can be achieved using pattern-matching.
🎉 Cowboy 2.0 was recently released with support for HTTP/2 and lots of other exciting changes (see announcement and this talk). Phoenix support is on the way (see here).
HTTP/2 support is being finalized thanks to @TheGazler's work in Plug & Phoenix, and of course @lhoguin's continued fantastic work on Cowboy
— Phoenix Framework (@elixirphoenix) November 7, 2017
Further below there are code references, which show how Phoenix uses Cowboy to respond to HTTP/HTTPS requests.
Phoenix.PubSub
phoenix_pubsub is an application dependency of Phoenix handling distributed PubSub messaging and Presence.
In order to understand how essential it is and how things are tied together, we’ll trace the framework’s data flows.
In our example, a client connects to the channel "numbers:42", then a message {"number": 42} is broadcast from the server to all connected clients.
Take a deep breath and let’s probe 🔎 Phoenix to find out what goes on under the hood, for our message to reach its recipients.
Subscribing
When a client joins a channel, Phoenix.Socket.Transport.connect/6 is called. It builds a Phoenix.Socket with pubsub_server set to the name configured in the :pubsub setting of the Endpoint. For this sample application it is Numbers.PubSub.
# Signature
connect(endpoint, handler, transport_name, transport, serializer, params)
# Actual arguments
connect(Numbers.Endpoint, Numbers.UserSocket, :websocket, Phoenix.Transports.WebSocket, Phoenix.Transports.WebSocketSerializer, %{})
It will call Numbers.UserSocket.connect/2
with the following arguments:
connect(%{}, %Phoenix.Socket{
assigns: %{},
channel: nil,
channel_pid: nil,
endpoint: Numbers.Endpoint,
handler: Numbers.UserSocket,
id: nil,
joined: false,
pubsub_server: Numbers.PubSub,
ref: nil,
serializer: Phoenix.Transports.WebSocketSerializer,
topic: nil,
transport: Phoenix.Transports.WebSocket,
transport_name: :websocket,
transport_pid: <0.421.0> # This is a :cowboy_websocket process
})
Then a Phoenix.Channel.Server process is started by Phoenix.Channel.Server.join/2.
File: lib/phoenix/channel/server.ex
(link)
defmodule Phoenix.Channel.Server do
use GenServer
def join(socket, auth_payload) do
Phoenix.Endpoint.instrument socket, :phoenix_channel_join,
%{params: auth_payload, socket: socket}, fn ->
ref = make_ref()
# 👀 A Phoenix.Channel.Server is started here
case GenServer.start_link(__MODULE__, {socket, auth_payload, self(), ref}) do
{:ok, pid} ->
receive do: ({^ref, reply} -> {:ok, reply, pid})
:ignore ->
receive do: ({^ref, reply} -> {:error, reply})
{:error, reason} ->
Logger.error fn -> Exception.format_exit(reason) end
{:error, %{reason: "join crashed"}}
end
end
end
def init({socket, auth_payload, parent, ref}) do
socket = %{socket | channel_pid: self()}
# 👀 socket.channel will be Numbers.IntegersChannel here
case socket.channel.join(socket.topic, auth_payload, socket) do
{:ok, socket} ->
join(socket, %{}, parent, ref)
{:ok, reply, socket} ->
join(socket, reply, parent, ref)
{:error, reply} ->
send(parent, {ref, reply})
:ignore
other ->
# ✂️ error handling omitted
end
end
defp join(socket, reply, parent, ref) do
PubSub.subscribe(socket.pubsub_server, socket.topic,
link: true,
fastlane: {socket.transport_pid,
socket.serializer,
socket.channel.__intercepts__()})
send(parent, {ref, reply})
{:ok, %{socket | joined: true}}
end
end
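The join/2 and init/1 pair above relies on a small handshake: the caller creates a unique reference, passes it to the new process, and then blocks until a message tagged with that reference arrives. The following self-contained sketch reproduces only that pattern. JoinHandshake and the :accept / :reject arguments are hypothetical stand-ins for the channel's join callback and its outcome.

```elixir
defmodule JoinHandshake do
  use GenServer

  # Caller side: start the server, then wait for the reply tagged with `ref`.
  def join(decision) do
    ref = make_ref()

    case GenServer.start_link(__MODULE__, {decision, self(), ref}) do
      {:ok, pid} ->
        receive do
          {^ref, reply} -> {:ok, reply, pid}
        end

      :ignore ->
        receive do
          {^ref, reply} -> {:error, reply}
        end
    end
  end

  # Server side: reply to the parent from within init/1, then either keep
  # running (join accepted) or return :ignore (join rejected).
  def init({:accept, parent, ref}) do
    send(parent, {ref, %{}})
    {:ok, %{joined: true}}
  end

  def init({:reject, parent, ref}) do
    send(parent, {ref, %{reason: "unauthorized"}})
    :ignore
  end
end

IO.inspect(JoinHandshake.join(:reject))
# prints {:error, %{reason: "unauthorized"}}
```

Because init/1 runs before start_link returns, the tagged message is already in the caller's mailbox by the time it reaches receive, so the wait is short.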
Phoenix.Channel.Server.join/2 is called with arguments:
# Signature
join(socket, auth_payload)
# Actual Arguments
join(%Phoenix.Socket{
assigns: %{},
channel: Numbers.IntegersChannel,
channel_pid: nil,
endpoint: Numbers.Endpoint,
handler: Numbers.UserSocket,
id: nil,
joined: false,
pubsub_server: Numbers.PubSub,
ref: nil,
serializer: Phoenix.Transports.WebSocketSerializer,
topic: "numbers:42",
transport: Phoenix.Transports.WebSocket,
transport_name: :websocket,
transport_pid: <0.438.0>},
%{})
and the Phoenix.Channel.Server.init/1 callback is invoked with arguments:
# Signature
init({socket, auth_payload, parent, ref})
# Actual Arguments
init({
%Phoenix.Socket{
assigns: %{},
channel: Numbers.IntegersChannel,
channel_pid: nil,
endpoint: Numbers.Endpoint,
handler: Numbers.UserSocket,
id: nil,
joined: false,
pubsub_server: Numbers.PubSub,
ref: nil,
serializer: Phoenix.Transports.WebSocketSerializer,
topic: "numbers:42",
transport: Phoenix.Transports.WebSocket,
transport_name: :websocket,
transport_pid: <0.438.0>
},
%{},
<0.438.0>,
#Ref<0.3148875998.1158676484.67712>})
Then Phoenix.Channel.Server.init/1 will call Phoenix.Channel.Server.join/4, which calls PubSub.subscribe/3 with arguments:
# Signature
subscribe(server, topic, opts)
# Actual Arguments
subscribe(Numbers.PubSub,
"numbers:42",
link: true,
fastlane: {<0.291.0>, Phoenix.Transports.WebSocketSerializer, []})
File: lib/phoenix/pubsub.ex
(link)
defmodule Phoenix.PubSub do
def subscribe(server, topic, opts)
when is_atom(server) and is_binary(topic) and is_list(opts) do
call(server, :subscribe, [self(), topic, opts])
end
# 👇 For the subscription call/3 will be called with:
# server here is Numbers.PubSub
# kind is :subscribe
defp call(server, kind, args) do
[{^kind, module, head}] = :ets.lookup(server, kind)
# :ets.lookup(Numbers.PubSub, :subscribe)
apply(module, kind, head ++ args)
end
end
The Numbers.PubSub named ETS table will have the following entry for :subscribe:
{:subscribe, Phoenix.PubSub.Local, [Numbers.PubSub, 1]}
So apply(module, kind, head ++ args) is in this case a Phoenix.PubSub.Local.subscribe/5 call.
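The lookup-then-apply dispatch can be tried in isolation. The sketch below is an approximation under stated assumptions: the :sketch_pubsub table, the EtsDispatchSketch module and its nested Backend module are all hypothetical; only the shape of the table entry and of call/3 mirrors the code above.

```elixir
defmodule EtsDispatchSketch do
  # Hypothetical stand-in for the module stored in the table entry.
  defmodule Backend do
    def subscribe(server, pool_size, pid, topic, opts) do
      {:subscribed, server, pool_size, pid, topic, opts}
    end
  end

  # Creates a named table holding {kind, module, head} entries.
  def setup(server) do
    :ets.new(server, [:set, :named_table, read_concurrency: true])
    :ets.insert(server, {:subscribe, Backend, [server, 1]})
    :ok
  end

  # Same shape as Phoenix.PubSub.call/3: prepend the stored head arguments
  # to the caller's arguments and apply the stored module.
  def call(server, kind, args) do
    [{^kind, module, head}] = :ets.lookup(server, kind)
    apply(module, kind, head ++ args)
  end
end

:ok = EtsDispatchSketch.setup(:sketch_pubsub)

# Three caller arguments plus two stored ones make a subscribe/5 call.
result = EtsDispatchSketch.call(:sketch_pubsub, :subscribe, [self(), "numbers:42", [link: true]])
IO.inspect(result)
```

Keeping the adapter module and its fixed arguments in a table means callers only need the server name, and the lookup does not go through a single process.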
File: lib/phoenix/pubsub/local.ex
(link)
defmodule Phoenix.PubSub.Local do
# 👇 Will be called with:
# subscribe(Numbers.PubSub, 1, <0.332.0>, "numbers:42",
# link: true, fastlane: {<0.330.0>,Phoenix.Transports.WebSocketSerializer,[]})
# Also the default pool_size is 1
def subscribe(pubsub_server, pool_size, pid, topic, opts \\ [])
when is_atom(pubsub_server) do
{local, gc} =
pid
|> :erlang.phash2(pool_size)
|> pools_for_shard(pubsub_server)
:ok = GenServer.call(local, {:monitor, pid, opts})
true = :ets.insert(gc, {pid, topic})
# 🔎 :ets.insert(Numbers.PubSub.GC0, {<0.344.0>, "numbers:42"})
true = :ets.insert(local, {topic, {pid, opts[:fastlane]}})
# 🔎 :ets.insert(Numbers.PubSub.Local0,
# {"numbers:42", {<0.344.0>, {<0.342.0>, Phoenix.Transports.WebSocketSerializer, []}}})
:ok
end
end
We ended up with 2 new entries, one in each of the Numbers.PubSub.GC0 and Numbers.PubSub.Local0 tables. Hopefully it will become clear further down this post how they’re used.
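To make the two entries more concrete, here is a small model of the bookkeeping. It is only a sketch: the table names, the :duplicate_bag table type and the ShardSketch module are assumptions for illustration, and the comment about cleanup reflects my reading of the GC table's purpose rather than anything shown in the code above.

```elixir
defmodule ShardSketch do
  # Hypothetical tables standing in for Numbers.PubSub.Local0 and
  # Numbers.PubSub.GC0. The :duplicate_bag type is an assumption.
  def setup do
    :ets.new(:sketch_local0, [:duplicate_bag, :named_table, :public])
    :ets.new(:sketch_gc0, [:duplicate_bag, :named_table, :public])
    :ok
  end

  # With a pool size of 1, every pid hashes to shard 0.
  def shard(pid, pool_size), do: :erlang.phash2(pid, pool_size)

  def subscribe(pid, topic, fastlane) do
    true = :ets.insert(:sketch_gc0, {pid, topic})
    true = :ets.insert(:sketch_local0, {topic, {pid, fastlane}})
    :ok
  end

  # Broadcast side: look up subscribers by topic.
  def subscribers(topic) do
    for {_topic, {pid, fastlane}} <- :ets.lookup(:sketch_local0, topic), do: {pid, fastlane}
  end

  # Cleanup side (assumed purpose): look up topics by subscriber pid.
  def topics(pid) do
    for {_pid, topic} <- :ets.lookup(:sketch_gc0, pid), do: topic
  end
end

:ok = ShardSketch.setup()
:ok = ShardSketch.subscribe(self(), "numbers:42", nil)
IO.inspect(ShardSketch.shard(self(), 1)) # prints 0
IO.inspect(ShardSketch.topics(self()))   # prints ["numbers:42"]
```

The two tables index the same subscription in opposite directions, one keyed by topic and one keyed by pid.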
We’re done with the subscription part 😅, moving on to the broadcast.
Publishing
When we broadcast/3 from one of our channels, as in the following example:
defmodule Numbers.IntegersChannel do
  use Numbers.Web, :channel

  def join("numbers:" <> type, _params, socket) do
    send self(), {:update, type}
    {:ok, socket}
  end

  def handle_info({:update, "42"}, socket) do
    # 👉 We broadcast to all the connected clients
    broadcast socket, "update", %{number: 42}
    {:noreply, socket}
  end
end
Phoenix.Channel.Server.broadcast/4 will be called.
File: lib/phoenix/channel/server.ex
(link)
defmodule Phoenix.Channel.Server do
# 👇 Will be called with arguments:
# broadcast(Numbers.PubSub, "numbers:42", "update", %{number: 42})
def broadcast(pubsub_server, topic, event, payload)
when is_binary(topic) and is_binary(event) and is_map(payload) do
PubSub.broadcast pubsub_server, topic, %Broadcast{
topic: topic,
event: event,
payload: payload
}
end
end
It calls PubSub.broadcast/3.
File: lib/phoenix/pubsub.ex
(link)
defmodule Phoenix.PubSub do
def broadcast(server, topic, message) when is_atom(server) or is_tuple(server),
do: call(server, :broadcast, [:none, topic, message])
defp call(server, kind, args) do
[{^kind, module, head}] = :ets.lookup(server, kind)
apply(module, kind, head ++ args)
end
end
Our sample application will have the following entry for :ets.lookup(Numbers.PubSub, :broadcast):
{:broadcast, Phoenix.PubSub.PG2Server, [Phoenix.Channel.Server, Numbers.PubSub, 1]}
So apply(module, kind, head ++ args) in this case is a Phoenix.PubSub.PG2Server.broadcast/6 call.
File: lib/phoenix/pubsub/pg2_server.ex
(link)
defmodule Phoenix.PubSub.PG2Server do
# 👇 Will be called with:
# broadcast(Phoenix.Channel.Server, Numbers.PubSub, 1, :none, "numbers:42",
# %Phoenix.Socket.Broadcast{event: "update", payload: %{number: 42}, topic: "numbers:42"})
def broadcast(fastlane, server_name, pool_size, from_pid, topic, msg) do
server_name
|> get_members() # 👉 Returns the Numbers.PubSub registered process
|> do_broadcast(fastlane, server_name, pool_size, from_pid, topic, msg)
end
defp do_broadcast(pids, fastlane, server_name, pool_size, from_pid, topic, msg)
when is_list(pids) do
local_node = Phoenix.PubSub.node_name(server_name)
Enum.each(pids, fn
pid when is_pid(pid) and node(pid) == node() ->
Local.broadcast(fastlane, server_name, pool_size, from_pid, topic, msg)
{^server_name, node_name} when node_name == local_node ->
# 🏃 Next Phoenix.PubSub.Local.broadcast/6 is called
Local.broadcast(fastlane, server_name, pool_size, from_pid, topic, msg)
pid_or_tuple ->
send(pid_or_tuple, {:forward_to_local, fastlane, from_pid, topic, msg})
end)
:ok
end
defp get_members(server_name) do
# 👇 Will be called with:
# :pg2.get_members({:phx, Numbers.PubSub})
:pg2.get_members(pg2_namespace(server_name))
end
end
Next Phoenix.PubSub.Local.broadcast/6 is called.
File: lib/phoenix/pubsub/local.ex
(link)
defmodule Phoenix.PubSub.Local do
# 👇 Will be called with:
# broadcast(Phoenix.Channel.Server, Numbers.PubSub, 1, :none, "numbers:42",
# %Phoenix.Socket.Broadcast{event: "update", payload: %{number: 42}, topic: "numbers:42"})
def broadcast(fastlane, pubsub_server, 1 = _pool_size, from, topic, msg)
when is_atom(pubsub_server) do
do_broadcast(fastlane, pubsub_server, _shard = 0, from, topic, msg)
:ok
end
defp do_broadcast(fastlane, pubsub_server, shard, from, topic, msg) do
pubsub_server
|> subscribers_with_fastlanes(topic, shard)
# Returns a List of tuples like:
# => {#PID<0.359.0>, {#PID<0.357.0>, Phoenix.Transports.WebSocketSerializer, []}}
# Where the <0.359.0> => a Phoenix.Channel.Server process
# <0.357.0> a cowboy_websocket process
# and calls Phoenix.Channel.Server.fastlane/3
|> fastlane.fastlane(from, msg)
end
end
Next Phoenix.Channel.Server.fastlane/3 is called.
File: lib/phoenix/channel/server.ex
(link)
defmodule Phoenix.Channel.Server do
# Will be called with:
# fastlane([{<0.393.0>, {<0.391.0>, Phoenix.Transports.WebSocketSerializer, []}}],
# :none,
# %Phoenix.Socket.Broadcast{event: "update", payload: %{number: 42}, topic: "numbers:42"})
def fastlane(subscribers, from, %Broadcast{event: event} = msg) do
Enum.reduce(subscribers, %{}, fn
{pid, _fastlanes}, cache when pid == from ->
cache
{pid, nil}, cache ->
send(pid, msg)
cache
{pid, {fastlane_pid, serializer, event_intercepts}}, cache ->
# 🤓 Read about message interception
# https://github.com/phoenixframework/phoenix/blob/v1.2.5/lib/phoenix/channel.ex#L101
if event in event_intercepts do
send(pid, msg)
cache
else
case Map.fetch(cache, serializer) do
{:ok, encoded_msg} ->
send(fastlane_pid, encoded_msg)
cache
:error ->
# 🔎 serializer here is Phoenix.Transports.WebSocketSerializer
encoded_msg = serializer.fastlane!(msg)
# 🔎 fastlane_pid here is a cowboy_websocket process
send(fastlane_pid, encoded_msg)
Map.put(cache, serializer, encoded_msg)
end
end
end)
end
end
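The reduce in fastlane/3 encodes a broadcast at most once per serializer and reuses the result for every other subscriber. The sketch below isolates that idea. EncodeOnceSketch is a hypothetical module, and the serializer is modelled as a one-argument function rather than a module, purely to keep the example self-contained.

```elixir
defmodule EncodeOnceSketch do
  # subscribers: list of {pid, serializer} pairs, where serializer is a
  # hypothetical one-argument function standing in for a serializer module.
  # Returns the number of distinct serializers that had to encode the message.
  def deliver(subscribers, msg) do
    cache =
      Enum.reduce(subscribers, %{}, fn {pid, serializer}, cache ->
        case Map.fetch(cache, serializer) do
          {:ok, encoded} ->
            # Cache hit: reuse the already encoded message.
            send(pid, encoded)
            cache

          :error ->
            # Cache miss: encode once and remember the result.
            encoded = serializer.(msg)
            send(pid, encoded)
            Map.put(cache, serializer, encoded)
        end
      end)

    map_size(cache)
  end
end

serializer = fn msg -> "encoded:" <> msg end
subscribers = [{self(), serializer}, {self(), serializer}, {self(), serializer}]

# Three deliveries, but the serializer function runs only once.
IO.inspect(EncodeOnceSketch.deliver(subscribers, "update")) # prints 1
```

With many subscribers on one topic this avoids repeating the JSON encoding for each of them.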
Next the message is serialized and sent to a cowboy_websocket handler process.
File: lib/phoenix/transports/websocket_serializer.ex
(link)
defmodule Phoenix.Transports.WebSocketSerializer do
@moduledoc false
@behaviour Phoenix.Transports.Serializer
alias Phoenix.Socket.Message
alias Phoenix.Socket.Broadcast
@doc """
Translates a `Phoenix.Socket.Broadcast` into a `Phoenix.Socket.Message`.
"""
def fastlane!(%Broadcast{} = msg) do
{:socket_push, :text, Poison.encode_to_iodata!(%Message{
topic: msg.topic,
event: msg.event,
payload: msg.payload
})}
end
end
The message is handled by Phoenix.Endpoint.CowboyWebSocket.websocket_info/3.
File: lib/phoenix/endpoint/cowboy_websocket.ex
(link)
defmodule Phoenix.Endpoint.CowboyWebSocket do
# Implementation of the WebSocket transport for Cowboy.
@moduledoc false
# 🤓 Read about :cowboy_websocket_handler => https://github.com/ninenines/cowboy/blob/1.1.2/src/cowboy_websocket_handler.erl
@behaviour :cowboy_websocket_handler
@connection Plug.Adapters.Cowboy.Conn
def websocket_info(message, req, {handler, state}) do
handle_reply req, handler, handler.ws_info(message, state)
end
defp handle_reply(req, handler, {:reply, {opcode, payload}, new_state}) do
{:reply, {opcode, payload}, req, {handler, new_state}}
end
end
Phoenix.Endpoint.CowboyWebSocket.websocket_info/3 is called with arguments:
websocket_info(
# message =
{:socket_push, :text,
[123,
[[34,["topic"],34],
58,
[34,["numbers:42"],34],
44,
[34,["ref"],34],
58,"null",44,
[34,["payload"],34],
58,
[123,[[34,["number"],34],58,"42"],125],
44,
[34,["event"],34],
58,
[34,["update"],34]],
125]},
# message is the iodata form of the JSON document
# {"topic":"numbers:42","ref":null,"payload":{"number":42},"event":"update"}
# req =
{:http_req, #Port<0.12968>, :ranch_tcp, :keepalive, <0.381.0>, "GET", 'HTTP/1.1',
{{127,0,0,1},50079},
"localhost", :undefined, 4000, "/socket/websocket", :undefined,
<<>>, :undefined, :undefined, [], [], [],
[websocket_version: 13, websocket_compress: false],
:waiting, <<>>, :undefined, false, :done, [], <<>>, :undefined},
# {handler, state} =
{Phoenix.Transports.WebSocket,
%{channels: %{"numbers:42" => <0.383.0>},
channels_inverse: %{<0.383.0> => {"numbers:42", "1"}},
serializer: Phoenix.Transports.WebSocketSerializer,
socket:
%Phoenix.Socket{
assigns: %{},
channel: nil,
channel_pid: nil,
endpoint: Numbers.Endpoint,
handler: Numbers.UserSocket,
id: nil,
joined: false,
pubsub_server: Numbers.PubSub,
ref: nil,
serializer: Phoenix.Transports.WebSocketSerializer,
topic: nil,
transport: Phoenix.Transports.WebSocket,
transport_name: :websocket,
transport_pid: <0.381.0>}}})
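The nested list passed as the message in the call above is iodata, which is cheap to build and can be written to a socket without being flattened first. To see the JSON it represents, it can be flattened with IO.iodata_to_binary/1, as in this short check (the list is copied from the trace above):

```elixir
iodata =
  [123,
   [[34, ["topic"], 34], 58, [34, ["numbers:42"], 34], 44,
    [34, ["ref"], 34], 58, "null", 44,
    [34, ["payload"], 34], 58, [123, [[34, ["number"], 34], 58, "42"], 125], 44,
    [34, ["event"], 34], 58, [34, ["update"], 34]],
   125]

# 123 and 125 are the braces, 34 is a double quote, 58 a colon, 44 a comma.
json = IO.iodata_to_binary(iodata)
IO.puts(json)
# prints {"topic":"numbers:42","ref":null,"payload":{"number":42},"event":"update"}
```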
Finally Phoenix.Transports.WebSocket.ws_info/2 is called.
File: lib/phoenix/transports/websocket.ex
(link)
defmodule Phoenix.Transports.WebSocket do
@behaviour Phoenix.Socket.Transport
def ws_info({:socket_push, _, _encoded_payload} = msg, state) do
format_reply(msg, state)
end
defp format_reply({:socket_push, encoding, encoded_payload}, state) do
{:reply, {encoding, encoded_payload}, state}
end
end
At this point it may still not be evident how the return value of Phoenix.Endpoint.CowboyWebSocket.websocket_info/3 manages to send data down the socket. To demystify this, proceed to the next section.
Phoenix and Cowboy
When the numbers application is started, its Numbers.Endpoint supervisor is started, supervising the following children:
Supervisor.which_children Numbers.Endpoint
#=> [{Phoenix.CodeReloader.Server, #PID<0.354.0>, :worker,
#=> [Phoenix.CodeReloader.Server]},
#=>
#=> {{:node,
#=> ["node_modules/brunch/bin/brunch", "watch", "--stdin",
#=> {:cd, "/Users/zorbash/dev/opensource/blog_examples/numbers"}]},
#=> #PID<0.353.0>, :worker, [Phoenix.Endpoint.Watcher]},
#=>
#=> 👇 We'll focus on this supervisor.
#=> It's responsible for the HTTP part of the application
#=> {Phoenix.Endpoint.Server, #PID<0.344.0>, :supervisor,
#=> [Phoenix.Endpoint.Server]},
#=>
#=> {Phoenix.PubSub.PG2, #PID<0.337.0>, :supervisor, [Phoenix.PubSub.PG2]},
#=>
#=> {Phoenix.Config, #PID<0.336.0>, :worker, [Phoenix.Config]}]
File: lib/phoenix/endpoint/server.ex
(link)
defmodule Phoenix.Endpoint.Server do
# The supervisor for the underlying handlers.
@moduledoc false
use Supervisor
require Logger
def start_link(otp_app, endpoint, opts \\ []) do
Supervisor.start_link(__MODULE__, {otp_app, endpoint}, opts)
end
def init({otp_app, endpoint}) do
handler = endpoint.config(:handler) # 👉 returns Phoenix.Endpoint.CowboyHandler
children =
for {scheme, port} <- [http: 4000, https: 4040],
config = endpoint.config(scheme) do
handler.child_spec(scheme, endpoint, default(config, otp_app, port))
# 👇 Phoenix.Endpoint.CowboyHandler.child_spec/3 will return:
# {{:ranch_listener_sup, Numbers.Endpoint.HTTP},
# {Phoenix.Endpoint.CowboyHandler, :start_link,
# [:http, Numbers.Endpoint,
# {:ranch_listener_sup, :start_link,
# [Numbers.Endpoint.HTTP, 5, :ranch_tcp, [max_connections: 16384, port: 4000],
# :cowboy_protocol,
# [env: [dispatch: [{:_, [],
# [{["socket", "websocket"], [], Phoenix.Endpoint.CowboyWebSocket,
# {Phoenix.Transports.WebSocket,
# {Numbers.Endpoint, Numbers.UserSocket, :websocket}}},
# {:_, [], Plug.Adapters.Cowboy.Handler,
# {Numbers.Endpoint, []}}]}]]]]}]},
# :permanent, :infinity, :supervisor, [:ranch_listener_sup]}
end
supervise(children, strategy: :one_for_one)
end
end
Phoenix.Endpoint.CowboyHandler.start_link/3 will be called by the Phoenix.Endpoint.Server supervisor:
File: lib/phoenix/endpoint/cowboy_handler.ex
(link)
defmodule Phoenix.Endpoint.CowboyHandler do
@behaviour Phoenix.Endpoint.Handler
require Logger
@doc """
Generates a childspec to be used in the supervision tree.
"""
def child_spec(scheme, endpoint, config) do
if scheme == :https do
Application.ensure_all_started(:ssl)
end
dispatches =
for {path, socket} <- endpoint.__sockets__,
{transport, {module, config}} <- socket.__transports__,
# Allow handlers to be configured at the transport level
handler = config[:cowboy] || default_for(module),
do: {Path.join(path, Atom.to_string(transport)),
handler,
{module, {endpoint, socket, transport}}}
dispatches =
dispatches ++ [{:_, Plug.Adapters.Cowboy.Handler, {endpoint, []}}]
# Use put_new to allow custom dispatches
config = Keyword.put_new(config, :dispatch, [{:_, dispatches}])
{ref, mfa, type, timeout, kind, modules} =
Plug.Adapters.Cowboy.child_spec(scheme, endpoint, [], config)
# Rewrite MFA for proper error reporting
mfa = {__MODULE__, :start_link, [scheme, endpoint, mfa]}
{ref, mfa, type, timeout, kind, modules}
end
defp default_for(Phoenix.Transports.LongPoll), do: Plug.Adapters.Cowboy.Handler
defp default_for(_), do: nil
@doc """
Callback to start the Cowboy endpoint.
"""
# 👉 start_link/3 will be called with:
# [
# :http,
# Numbers.Endpoint,
# {
# :ranch_listener_sup, :start_link, [
# Numbers.Endpoint.HTTP, 5, :ranch_tcp, [max_connections: 16384, port: 4000],
# :cowboy_protocol, [env: [dispatch: [{:_, [],
# [{["socket", "websocket"], [], Phoenix.Endpoint.CowboyWebSocket,
# {Phoenix.Transports.WebSocket,
# {Numbers.Endpoint, Numbers.UserSocket, :websocket}}},
# {:_, [], Plug.Adapters.Cowboy.Handler,
# {Numbers.Endpoint, []}}]}]]]
# ]
# }
# ]
# ✂️ Parts of the env responsible for livereloading have been omitted.
def start_link(scheme, endpoint, {m, f, [ref | _] = a}) do
# ref is used by Ranch to identify its listeners, defaulting
# to plug.HTTP and plug.HTTPS and overridable by users.
# 👀 apply/3 becomes :ranch_listener_sup.start_link/6
case apply(m, f, a) do
{:ok, pid} ->
Logger.info info(scheme, endpoint, ref)
{:ok, pid}
{:error, {:shutdown, {_, _, {{_, {:error, :eaddrinuse}}, _}}}} = error ->
Logger.error [info(scheme, endpoint, ref), " failed, port already in use"]
error
{:error, _} = error ->
error
end
end
end
Next :ranch_listener_sup.start_link/6 is called.
File: src/ranch_listener_sup.erl
(link)
-module(ranch_listener_sup).
-behaviour(supervisor).
-export([start_link/6]).
-export([init/1]).
-spec start_link(ranch:ref(), non_neg_integer(), module(), any(), module(), any())
-> {ok, pid()}.
% 👉 start_link/6 is called as:
% start_link(
% Ref = Numbers.Endpoint.HTTP,
% NbAcceptors = 5,
% Transport = :ranch_tcp,
% TransOpts = [max_connections: 16384, port: 4000],
% Protocol = :cowboy_protocol,
% ProtoOpts = [env:
% [dispatch: [{:_, [],
% [{["socket", "websocket"], [], Phoenix.Endpoint.CowboyWebSocket,
% {Phoenix.Transports.WebSocket,
% {Numbers.Endpoint, Numbers.UserSocket, :websocket}}},
% {:_, [], Plug.Adapters.Cowboy.Handler,
% {Numbers.Endpoint, []}}]}]]
% ]
start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
MaxConns = proplists:get_value(max_connections, TransOpts, 1024),
% 👀 The options of the new listener are kept in an ETS table
ranch_server:set_new_listener_opts(Ref, MaxConns, ProtoOpts),
supervisor:start_link(?MODULE, {
Ref, NbAcceptors, Transport, TransOpts, Protocol
}).
init({Ref, NbAcceptors, Transport, TransOpts, Protocol}) ->
AckTimeout = proplists:get_value(ack_timeout, TransOpts, 5000),
ConnType = proplists:get_value(connection_type, TransOpts, worker),
Shutdown = proplists:get_value(shutdown, TransOpts, 5000),
ChildSpecs = [
{ranch_conns_sup, {ranch_conns_sup, start_link,
[Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol]},
permanent, infinity, supervisor, [ranch_conns_sup]},
{ranch_acceptors_sup, {ranch_acceptors_sup, start_link,
[Ref, NbAcceptors, Transport, TransOpts]},
permanent, infinity, supervisor, [ranch_acceptors_sup]}
],
{ok, {{rest_for_one, 10, 10}, ChildSpecs}}.
We can fetch (or even modify 🙈) the listener opts:
:ets.lookup :ranch_server, {:opts, Numbers.Endpoint.HTTP}
[{{:opts, Numbers.Endpoint.HTTP},
[env: [dispatch: [{:_, [],
[{["socket", "websocket"], [], Phoenix.Endpoint.CowboyWebSocket,
{Phoenix.Transports.WebSocket,
{Numbers.Endpoint, Numbers.UserSocket, :websocket}}},
{:_, [], Plug.Adapters.Cowboy.Handler, {Numbers.Endpoint, []}}]}]]]}]
The Phoenix.Endpoint.Server will supervise the following children:
[{{:ranch_listener_sup, Numbers.Endpoint.HTTP}, #PID<0.345.0>, :supervisor,
[:ranch_listener_sup]}]
and that child will supervise:
[{:ranch_acceptors_sup, #PID<0.350.0>, :supervisor, [:ranch_acceptors_sup]},
{:ranch_conns_sup, #PID<0.349.0>, :supervisor, [:ranch_conns_sup]}]
A ranch_acceptors_sup uses ranch_tcp.listen/1 to start listening. ranch_tcp is a wrapper around gen_tcp, and gen_tcp.listen/2 sets up a socket to listen on the given port (4000 in this application) on the local host.
ranch_acceptors_sup then starts a pool of supervised acceptor processes. Each one of them waits (with an :infinity timeout) to accept a connection on the listening socket.
If a connection is established, the ranch_acceptor will transfer control of the socket to the connections supervisor so that it receives messages from the socket.
When a TCP connection is established, ranch_conns_sup will start a cowboy_protocol process using :cowboy_protocol.start_link/4. When the cowboy_protocol process has started successfully, ranch_conns_sup will transfer control of the socket to that process so that it receives messages from the socket.
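The socket handoff described above can be reproduced with nothing but gen_tcp. The sketch below is a simplified model and not how ranch is implemented: HandoffSketch is a hypothetical module, there is a single acceptor, and the acceptor hands the socket directly to a handler process instead of going through a connections supervisor. It listens on port 0 so the operating system picks a free port.

```elixir
defmodule HandoffSketch do
  def run do
    # Port 0 asks the OS for any free port (the real application uses 4000).
    {:ok, listen} = :gen_tcp.listen(0, [:binary, active: false, reuseaddr: true])
    {:ok, port} = :inet.port(listen)
    parent = self()

    # Handler: waits until it has been given a socket, then reads from it.
    handler =
      spawn(fn ->
        receive do
          {:socket, socket} ->
            {:ok, data} = :gen_tcp.recv(socket, 0, 1000)
            send(parent, {:handled, data})
        end
      end)

    # Acceptor: accepts one connection and transfers ownership to the handler.
    spawn(fn ->
      {:ok, socket} = :gen_tcp.accept(listen)
      :ok = :gen_tcp.controlling_process(socket, handler)
      send(handler, {:socket, socket})
    end)

    {:ok, client} = :gen_tcp.connect({127, 0, 0, 1}, port, [:binary, active: false])
    :ok = :gen_tcp.send(client, "ping")

    result =
      receive do
        {:handled, data} -> data
      after
        2000 -> :timeout
      end

    :gen_tcp.close(client)
    :gen_tcp.close(listen)
    result
  end
end

IO.inspect(HandoffSketch.run())
```

The call to :gen_tcp.controlling_process/2 is what makes the handler the owner of the accepted socket, so the acceptor is free to exit or go back to accepting.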
The cowboy_protocol handler is responsible for receiving and parsing messages of the HTTP protocol. It handles requests by executing all the layers of its middleware stack. By default this stack contains cowboy_router.
cowboy_router.execute/2 will be called as:
:cowboy_router.execute({
:http_req, #Port<0.12695>, :ranch_tcp, :keepalive, <0.371.0>, "GET", 'HTTP/1.1',
{{127,0,0,1},61936},
"localhost", :undefined, 4000, "/socket/websocket", :undefined,
<<>>, :undefined, :undefined,
[{"host", "localhost:4000"},
{"connection", "Upgrade"},
{"upgrade", "websocket"},
{"sec-websocket-version", "13"},
{"sec-websocket-key", "IpbxMsfW2JriERk8clVssw=="},
{"origin", "http://localhost"}],
[{"connection", ["upgrade"]}],
:undefined, [], :waiting, <<>>, :undefined, false, :waiting, [], <<>>, :undefined},
[{:listener, Numbers.Endpoint.HTTP},
{:dispatch,
[{'_',[],
[{["socket", "websocket"],
[], Phoenix.Endpoint.CowboyWebSocket,
{Phoenix.Transports.WebSocket,
{Numbers.Endpoint, Numbers.UserSocket, :websocket}}},
{'_', [], Plug.Adapters.Cowboy.Handler,
{Numbers.Endpoint, []}}]}]}]
)
It will dispatch the GET /socket/websocket
request to Phoenix.Endpoint.CowboyWebSocket which
implements the behaviour :cowboy_websocket_handler
.
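The effect of the dispatch rules seen in the trace can be modelled in a few lines. DispatchSketch is a hypothetical helper and a much simplified view of what cowboy_router does (no host matching, bindings or constraints): the first rule whose path segments equal those of the request wins, and :_ matches anything.

```elixir
defmodule DispatchSketch do
  # Returns {handler, opts} of the first rule matching the request path.
  def match(rules, path) do
    segments = String.split(path, "/", trim: true)

    Enum.find_value(rules, :no_match, fn
      {:_, handler, opts} -> {handler, opts}
      {^segments, handler, opts} -> {handler, opts}
      _other -> nil
    end)
  end
end

# Same shape as the :dispatch option above, minus the (empty) constraints.
rules = [
  {["socket", "websocket"], Phoenix.Endpoint.CowboyWebSocket,
   {Phoenix.Transports.WebSocket, {Numbers.Endpoint, Numbers.UserSocket, :websocket}}},
  {:_, Plug.Adapters.Cowboy.Handler, {Numbers.Endpoint, []}}
]

websocket_route = DispatchSketch.match(rules, "/socket/websocket")
fallback_route = DispatchSketch.match(rules, "/")
```

With these rules, only /socket/websocket reaches the WebSocket handler and every other path falls through to the Plug handler.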
File: lib/phoenix/endpoint/cowboy_websocket.ex
(link)
defmodule Phoenix.Endpoint.CowboyWebSocket do
# Implementation of the WebSocket transport for Cowboy.
@moduledoc false
@behaviour :cowboy_websocket_handler
@connection Plug.Adapters.Cowboy.Conn
@already_sent {:plug_conn, :sent}
# 👇 Phoenix.Endpoint.CowboyWebSocket.init/3 will be called with:
# {transport, :http} = {:tcp, :http},
#
# req = {:http_req, #Port<0.12671>, :ranch_tcp, :keepalive, #PID<0.356.0>, "GET",
# :"HTTP/1.1", {{127, 0, 0, 1}, 60935}, "localhost", :undefined, 4000,
# "/socket/websocket", :undefined, "", :undefined, [],
# [{"host", "localhost:4000"}, {"connection", "Upgrade"},
# {"upgrade", "websocket"}, {"sec-websocket-version", "13"},
# {"sec-websocket-key", "Is5bfu9A9tnZ9AO/V0vrEg=="},
# {"origin", "http://localhost"}], [{"connection", ["upgrade"]}], :undefined,
# [], :waiting, "", :undefined, false, :waiting, [], "", :undefined},
#
# {module, opts} = {Phoenix.Transports.WebSocket, {Numbers.Endpoint, Numbers.UserSocket, :websocket}}
def init({transport, :http}, req, {module, opts}) when transport in [:tcp, :ssl] do
conn = @connection.conn(req, transport)
try do
# 👀 Phoenix.Transports.WebSocket.init/2 is called here
case module.init(conn, opts) do
{:ok, %{adapter: {@connection, req}}, args} ->
{:upgrade, :protocol, __MODULE__, req, args}
{:error, %{adapter: {@connection, req}}} ->
{:shutdown, req, :no_state}
end
catch
kind, reason ->
# Although we are not performing a call, we are using the call
# function for now so it is properly handled in error reports.
mfa = {module, :call, [conn, opts]}
{:upgrade, :protocol, __MODULE__, req, {:error, mfa, kind, reason, System.stacktrace}}
after
receive do
@already_sent -> :ok
after
0 -> :ok
end
end
end
def upgrade(req, env, __MODULE__, {handler, opts}) do
args = [req, env, __MODULE__, {handler, opts}]
resume(:cowboy_websocket, :upgrade, args)
end
def resume(module, fun, args) do
apply(module, fun, args)
# ✂️ error handling omitted
end
## Websocket callbacks
# 👇 websocket_init/3 will be called with:
# _transport = :tcp,
#
# req = {
# :http_req, #Port<0.12964>, :ranch_tcp, :keepalive, #PID<0.373.0>, "GET", :"HTTP/1.1",
# {{127,0,0,1},50034},
# "localhost", :undefined, 4000, "/socket/websocket", :undefined,
# <<>>, :undefined, [],
# [{"host", "localhost:4000"},
# {"connection", "Upgrade"},
# {"upgrade", "websocket"},
# {"sec-websocket-version", "13"},
# {"sec-websocket-key", "kxCvlKM3nApp8UdFrqh9Ug=="},
# {"origin", "http://localhost"}],
# [{"upgrade", ["websocket"]},
# {"connection", ["upgrade"]}],
# :undefined,
# [{:websocket_version, 13}, {:websocket_compress, false}],
# :waiting, <<>>, :undefined, false, :waiting, [], <<>>, :undefined},
#
# {handler, args} = {Phoenix.Transports.WebSocket,
# {%Phoenix.Socket{assigns: %{}, channel: nil,
# channel_pid: nil, endpoint: Numbers.Endpoint,
# handler: Numbers.UserSocket, id: nil, joined: false,
# pubsub_server: Numbers.PubSub, ref: nil,
# serializer: Phoenix.Transports.WebSocketSerializer,
# topic: nil, transport: Phoenix.Transports.WebSocket,
# transport_name: :websocket, transport_pid: #PID<0.373.0>},
# [{:serializer, Phoenix.Transports.WebSocketSerializer},
# {:transport_log, false},
# {:timeout, :infinity},
# {:acceptors, 5}]}}
def websocket_init(_transport, req, {handler, args}) do
{:ok, state, timeout} = handler.ws_init(args)
{:ok, :cowboy_req.compact(req), {handler, state}, timeout}
end
end
Phoenix will respond with:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
This response, generated here (link), concludes the WebSocket handshake, after which the data transfer starts. The connection is now a two-way communication channel where each side can, independently from the other, send data at will.
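The Sec-WebSocket-Accept header of the response is derived from the Sec-WebSocket-Key sent by the client, as specified in RFC 6455: the key is concatenated with a fixed GUID, hashed with SHA-1 and Base64 encoded. A minimal sketch of that computation follows; HandshakeSketch is a name made up for this post, and the key used is the sample one from the RFC, which yields exactly the accept value shown above.

```elixir
defmodule HandshakeSketch do
  # GUID fixed by RFC 6455 for computing Sec-WebSocket-Accept.
  @guid "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

  def accept_key(client_key) do
    :crypto.hash(:sha, client_key <> @guid) |> Base.encode64()
  end
end

HandshakeSketch.accept_key("dGhlIHNhbXBsZSBub25jZQ==")
#=> "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="
```

A client can run the same computation to verify that the server really understood the upgrade request.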
After a successful handshake, clients and servers transfer data back and forth in conceptual units referred to in this specification as “messages”. On the wire, a message is composed of one or more frames
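To get a feeling for what a frame looks like, here is a sketch that decodes a single masked client frame with binary pattern matching. FrameSketch is a hypothetical module that only covers payloads shorter than 126 bytes; Cowboy's real parser also handles extended lengths, fragmentation and control frames. The bytes are the masked "Hello" text frame given as an example in RFC 6455.

```elixir
defmodule FrameSketch do
  import Bitwise

  # FIN bit, 3 reserved bits, 4-bit opcode, mask bit set, 7-bit length,
  # 4-byte masking key, then the masked payload.
  def decode(
        <<fin::1, _rsv::3, opcode::4, 1::1, len::7, mask::binary-size(4),
          masked::binary-size(len), rest::binary>>
      )
      when len < 126 do
    {fin == 1, opcode, unmask(masked, mask), rest}
  end

  def decode(_other), do: :unsupported

  # Each payload byte is XORed with the masking key byte at position (index mod 4).
  defp unmask(payload, mask) do
    mask_bytes = :binary.bin_to_list(mask)

    payload
    |> :binary.bin_to_list()
    |> Enum.with_index()
    |> Enum.map(fn {byte, index} -> bxor(byte, Enum.at(mask_bytes, rem(index, 4))) end)
    |> :binary.list_to_bin()
  end
end

frame = <<0x81, 0x85, 0x37, 0xFA, 0x21, 0x3D, 0x7F, 0x9F, 0x4D, 0x51, 0x58>>
FrameSketch.decode(frame)
#=> {true, 1, "Hello", ""}
```

Opcode 1 denotes a text frame, which is the frame type that ends up as {:text, payload} in the handler.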
With an established WebSocket connection, a client (phoenix.js
, see link) will push a join message:
{"topic":"numbers:42","event":"phx_join","payload":{},"ref":"1"}
The cowboy_websocket
process, which has control of the socket,
receives a message in its mailbox, decodes it (WebSocket Protocol) and
calls Phoenix.Endpoint.CowboyWebSocket.websocket_handle/3
as follows:
Phoenix.Endpoint.CowboyWebSocket.websocket_handle(
{opcode, payload} = {:text, "{\"topic\":\"numbers:42\",\"event\":\"phx_join\",\"payload\":{},\"ref\":\"1\"}"},
req = {:http_req, #Port<0.12964>, :ranch_tcp, :keepalive, #PID<0.373.0>, "GET", :"HTTP/1.1",
{{127,0,0,1},50034},
"localhost", :undefined, 4000, "/socket/websocket", :undefined,
<<>>, :undefined, :undefined, [], [], [],
[{:websocket_version, 13}, {:websocket_compress, false}],
:waiting, <<>>, :undefined, false, :done, [], <<>>, :undefined},
{handler, state} = {Phoenix.Transports.WebSocket,
%{channels: %{}, channels_inverse: %{},
serializer: Phoenix.Transports.WebSocketSerializer,
socket: %Phoenix.Socket{assigns: %{},
channel: nil,
channel_pid: nil,
endpoint: Numbers.Endpoint,
handler: Numbers.UserSocket,
id: nil,
joined: false,
pubsub_server: Numbers.PubSub,
ref: nil,
serializer: Phoenix.Transports.WebSocketSerializer,
topic: nil,
transport: Phoenix.Transports.WebSocket,
transport_name: :websocket, transport_pid: #PID<0.373.0>}}})
Then Phoenix.Transports.WebSocket.ws_handle/3
is called which parses
the message as JSON and calls Phoenix.Socket.Transport.dispatch/3
which will pattern-match on the event
part of the message so that
phx_join
will finally call Phoenix.Channel.Server.join/2
.
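The idea of dispatching on the event part of a message can be illustrated with function clauses. This is not the code of Phoenix.Socket.Transport: EventDispatchSketch, its return values and the channels map (topic to channel pid) are assumptions made for the example, and the message is given as an already decoded map so that no JSON library is needed.

```elixir
defmodule EventDispatchSketch do
  # `channels` maps the topics joined so far to their channel pid.
  def dispatch(%{"event" => "phx_join", "topic" => topic, "ref" => ref}, channels) do
    if Map.has_key?(channels, topic) do
      {:ignore, :already_joined, ref}
    else
      {:join, topic, ref}
    end
  end

  def dispatch(%{"event" => "phx_leave", "topic" => topic, "ref" => ref}, _channels) do
    {:leave, topic, ref}
  end

  # Any other event is forwarded to the channel process of its topic.
  def dispatch(%{"event" => event, "topic" => topic, "payload" => payload}, channels) do
    case Map.fetch(channels, topic) do
      {:ok, pid} -> {:forward, pid, event, payload}
      :error -> {:error, :unmatched_topic}
    end
  end
end

message = %{"topic" => "numbers:42", "event" => "phx_join", "payload" => %{}, "ref" => "1"}

first_join = EventDispatchSketch.dispatch(message, %{})
second_join = EventDispatchSketch.dispatch(message, %{"numbers:42" => self()})
```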
What happens next has already been described in the subscribing section.
Debugging WebSockets Essentials
Armed with some knowledge about the inner workings of Phoenix, we can answer some common questions. Use the following code snippets with caution in production environments, as some of the Phoenix functions used are documented as:
This is an expensive and private operation. DO NOT USE IT IN PROD.
Getting a list of all the topics which have subscribers:
pool_size = Application.get_env(:numbers, Numbers.Endpoint)[:pubsub][:pool_size] || 1
0..(pool_size - 1) |> Enum.flat_map(&Phoenix.PubSub.Local.list(Numbers.PubSub, &1))
#=> ["numbers:42"]
💡 Keep in mind that the default pool_size
is 1.
Getting the subscribers for a topic:
pool_size = Application.get_env(:numbers, Numbers.Endpoint)[:pubsub][:pool_size] || 1
0..(pool_size - 1) |> Enum.flat_map(&Phoenix.PubSub.Local.subscribers(Numbers.PubSub, "numbers:42", &1))
#=> [#PID<0.273.0>]
Getting the subscribers for a topic across nodes:
# With the assumptions that all the connected nodes are members of the PubSub cluster:
pool_size = Application.get_env(:numbers, Numbers.Endpoint)[:pubsub][:pool_size] || 1
fun = &Phoenix.PubSub.Local.subscribers(Numbers.PubSub, "numbers:42", &1)
:rpc.multicall [node() | Node.list], Enum, :flat_map, [0..(pool_size - 1), fun]
#=> {[[#PID<0.19570.2363>, #PID<0.4457.2620>, #PID<0.18260.2750>, #PID<0.6396.2762>,
#=> #PID<0.7481.2801>, #PID<0.2731.2866>, #PID<0.13608.2904>,
#=> #PID<0.20333.2910>],
#=> [#PID<37723.31494.2599>, #PID<37723.20571.2660>, #PID<37723.26132.2671>,
#=> #PID<37723.30777.2690>, #PID<37723.23804.2712>, #PID<37723.27526.2731>,
#=> #PID<37723.7742.2776>]], []}
# If you don't like assumptions you can use this to get the actual nodes (with Phoenix.PubSub.PG2 adapter):
nodes = :pg2.get_members({:phx, Numbers.PubSub}) |> Enum.map(&node/1)
#=> [:"numbers1@autoverse", :"numbers2@autoverse"]
Getting the socket state for a subscriber:
# Using the pid from the previous example
:recon.get_state '<0.273.0>'
#=> %Phoenix.Socket{assigns: %{}, channel: Numbers.IntegersChannel,
#=> channel_pid: #PID<0.273.0>, endpoint: Numbers.Endpoint,
#=> handler: Numbers.UserSocket, id: "users_socket:42",
#=> joined: true, pubsub_server: Numbers.PubSub, ref: nil,
#=> serializer: Phoenix.Transports.WebSocketSerializer, topic: "numbers:42",
#=> transport: Phoenix.Transports.WebSocket, transport_name: :websocket,
#=> transport_pid: #PID<0.271.0>}
# You can also use :sys.get_state/2, but always remember to provide a timeout
:sys.get_state pid("0.273.0"), 1000
Getting the subscriptions of a client by IP address:
You can use :inet.i
Lists all TCP and UDP sockets, including those that the Erlang runtime system uses as well as those you have created
iex(numbers@autoverse)2> :inet.i
Port Module Recv Sent Owner Local Address Foreign Address State Type
57824 inet_tcp 1932 242 <0.9779.2801> 10.16.194.182:4000 10.16.74.135:54514 ???? STREAM
580432 inet_tcp 1095 130 <0.9683.2817> 10.16.194.182:4000 10.16.74.135:55660 ???? STREAM
581976 inet_tcp 1653 110 <0.9693.2818> 10.16.194.182:4000 10.16.74.135:59498 ???? STREAM
583824 inet_tcp 474 129 <0.239.0> 10.16.194.182:4000 10.16.74.150:59540 ???? STREAM
584016 inet_tcp 565 218 <0.22683.2819> 10.16.194.182:4000 10.16.74.135:36924 ???? STREAM
Let’s say that we’re interested in the subscriptions of 10.16.74.150
.
A heuristic would be to get the processes spawned by the owner process of the socket:
owner = pid "0.239.0"
links = Process.info(owner, [:links])[:links]
pids = for pid <- links, pid > owner, do: pid
#=> [#PID<0.241.0>, #PID<0.246.0>]
Alternatively you can use :recon.tcp
to enumerate the TCP ports and
keep the ones matching the IP as peername
:
search_ip = {10,16,74,150}
:recon.tcp |> Enum.map(&:recon.port_info/1) |> Enum.filter(fn port ->
case port[:type][:peername] do
{^search_ip, _} -> true
_ -> false
end
end)
|> Enum.map(&(&1[:signals][:connected]))
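If recon is not available on the node, a similar lookup can be sketched with the standard library only. PeerLookup is a hypothetical helper written for this post; it assumes the default port-based inet driver (where TCP sockets show up as ports named tcp_inet) and, like the snippets above, it walks every port of the node, so use it sparingly.

```elixir
defmodule PeerLookup do
  # Returns the owner pids of all TCP sockets whose peer has the given IP.
  def owners_for_ip(ip_string) do
    {:ok, ip} = ip_string |> String.to_charlist() |> :inet.parse_address()

    for port <- Port.list(),
        Port.info(port, :name) == {:name, ~c"tcp_inet"},
        match?({:ok, {^ip, _peer_port}}, :inet.peername(port)),
        {:connected, owner} <- [Port.info(port, :connected)],
        do: owner
  end
end

# Usage on the node from the example above:
# PeerLookup.owners_for_ip("10.16.74.150")
```

Listening sockets have no peer, so they are skipped automatically.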
Recon
Recon is a library to be dropped into any other Erlang project, to be used to assist DevOps people diagnose problems in production nodes.
I’ve found recon to be an indispensable tool for both regular debugging and critical production issues.
A handful of WebSocket related traces you can attempt with recon are:
# Trace the next 100 subscriptions for the Numbers Phoenix app
:recon_trace.calls {:ets, :insert, [{[:"Elixir.Numbers.PubSub.Local0", :_], [], [:"$_"]}]}, 100
#=> 0:28:39.377466 <0.621.0> ets:insert('Elixir.Numbers.PubSub.Local0', {<<"numbers:42">>,
#=> {<0.621.0>,{<0.619.0>,'Elixir.Phoenix.Transports.WebSocketSerializer',[]}}})
# Trace the next 50 subscriptions for the Numbers Phoenix app
:recon_trace.calls {:ets, :insert, [{[:"Elixir.Numbers.PubSub.Local0", :_], [], [:"$_"]}]}, 50
#=> 0:28:39.377466 <0.621.0> ets:insert('Elixir.Numbers.PubSub.Local0', {<<"numbers:42">>,
#=> {<0.621.0>,{<0.619.0>,'Elixir.Phoenix.Transports.WebSocketSerializer',[]}}})
:recon.get_state '<0.621.0>'
#=> %Phoenix.Socket{assigns: %{}, channel: Numbers.IntegersChannel,
#=> channel_pid: #PID<0.621.0>, endpoint: Numbers.Endpoint,
#=> handler: Numbers.UserSocket, id: nil, joined: true,
#=> pubsub_server: Numbers.PubSub, ref: nil,
#=> serializer: Phoenix.Transports.WebSocketSerializer, topic: "numbers:42",
#=> transport: Phoenix.Transports.WebSocket, transport_name: :websocket,
#=> transport_pid: #PID<0.619.0>}
# Trace the next 10 calls to pg2.get_members/1 and show the return value
:recon_trace.calls {:pg2, :get_members, [{:_, [], [{:return_trace}]}]}, 10
#=> 1:15:03.621778 <0.706.0> pg2:get_members({phx,'Elixir.Numbers.PubSub'})
#=> 1:15:03.621956 <0.706.0> pg2:get_members/1 --> [<0.279.0>]
# In a distributed Phoenix scenario, we'd get something like:
#=> 1:30:41.842307 <0.257.0> pg2:get_members({phx,'Elixir.Numbers.PubSub'})
#=> 1:30:41.845794 <0.257.0> pg2:get_members/1 --> [<0.279.0>,<16611.342.0>]
If you find it hard to build complex match specs by hand 🙀, you can try ex2ms which is a rough equivalent of :dbg.fun2ms/1 or :ets.fun2ms/1.
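Before attaching a hand-written match spec to a busy node, it can be checked offline with :erlang.match_spec_test/3, which runs a spec against a list of arguments without tracing anything. The sketch below reuses the specs from the traces above; the argument lists are made-up samples.

```elixir
# Matches calls whose first argument is the PubSub ETS table, whatever the second is.
subscription_spec = [{[:"Elixir.Numbers.PubSub.Local0", :_], [], [:"$_"]}]

# Arguments of a call that should match, and of one that should not.
matching_args = [:"Elixir.Numbers.PubSub.Local0", {"numbers:42", self()}]
other_args = [:some_other_table, {"numbers:42", self()}]

{:ok, matched, _flags, _warnings} =
  :erlang.match_spec_test(matching_args, subscription_spec, :trace)

{:ok, not_matched, _flags, _warnings} =
  :erlang.match_spec_test(other_args, subscription_spec, :trace)

# A spec with {:return_trace} in its body reports the flag it would enable.
return_spec = [{:_, [], [{:return_trace}]}]

{:ok, _result, flags, _warnings} =
  :erlang.match_spec_test([{:phx, Numbers.PubSub}], return_spec, :trace)
```

A malformed spec yields an {:error, errors} tuple instead, which is a cheaper way to find typos than a trace that silently matches nothing.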
Conclusion
The Phoenix codebase is a very interesting one and there’s a lot to learn from it. I really hope that sharing my quest to understand and explore Phoenix will be beneficial for others. If you find a mistake, please submit a pull request or mention it in the comments.