We can't find the internet
Attempting to reconnect
Something went wrong!
Hang in there while we get back on track
Post 85
Ash Framework 12 - PubSub
Published on: 2025-11-09
Tags:
elixir, Blog, Side Project, LiveView, Ecto, Html/CSS, Phoenix, Ash, Framework
Chapter 10: Delivering Real-Time Updates with PubSub (25O)
Notifying Users About New Albums
Now we need to make sure that we have a way to show a user if there is new music that an artist has made. Let's get started.
Creating the Notification Resource
So this will be a new resource that will be the bridge between Tunez.Accounts Tunez.Music let's generate the empty resource.
mix ash.gen.resource Tunez.Accounts.Notification --extend postgres
Now let's head to the new resource and set up the references and attributes lib/tunez/accounts/Notification.ex
postgres do
table("notifications")
repo(Tunez.Repo)
references do
reference(:user, index?: true, on_delete: :delete)
reference(:album, on_delete: :delete)
end
end
attributes do
uuid_primary_key(:id)
create_timestamp(:inserted_at)
end
relationships do
belongs_to :user, Tunez.Accounts.User do
allow_nil?(false)
end
belongs_to :album, Tunez.Music.Album do
allow_nil?(false)
end
end
We need to have an id as we will need to be able to delete individual messages as they are seen and after they are clicked on. As well as the inserted at so that we can sort them as needed. Now we are going to need to codegen and migrate
mix ash.codegen create_notifications
mix ash.migrate
Creating Notifications on Demand
We need to add in a change to the album that will send Notifications when ever an album is created let's head to lib/tunez/music/album.ex
changes do
change(Tunez.Accounts.Changes.SendNewAlbumNotifications, on: [:create])
change(relate_actor(:created_by, allow_nil?: true), on: [:create])
change(relate_actor(:updated_by, allow_nil?: true))
end
We will need to create a new module that doesn't exist yet, it will need to utilize Ash.Resource.Change. Let's head to lib/tunez/accounts/changes/send_new_album_notifications.ex
defmodule Tunez.Accounts.Changes.SendNewAlbumNotifications do
use Ash.Resource.Change
@impl true
def change(changeset, _opts, _context) do
# Create notifications here!
changeset
end
end
Running Actions in Bulk
Okay so normally you would need to set up a way to send thousands of Emails if an artist creates a new album, we will utilize Bulk Action. We can test this first with a iex session.
Testing Artist Bulk Create
iex(1)> # user is a loaded record with role = :admin
iex(2)> Tunez.Music.create_artist(%{name: "New Artist"}, actor: user)
INSERT INTO "artists" («fields») VALUES ($1,$2,$3,$4,$5,$6,$7) RETURNING
«fields» [«data»]
{:ok, #Tunez.Music.Artist<...>}
We can use the same code to run bulk actions by changing what we pass in.
Instead of a single map, we can call the code interface with a list of maps.
iex(3)> data = [%{name: "New Artist 1"}, %{name: "New Artist 2"}]
[...]
iex(4)> Tunez.Music.create_artist(data, actor: user)
INSERT INTO "artists" («fields») VALUES ($1,$2,$3,$4,$5,$6,$7),
($8,$9,$10,$11,$12,$13,$14) RETURNING «fields» [«data for both records»]
%Ash.BulkResult{
status: :success, errors: [], records: nil,
notifications: nil, error_count: 0
}
There is even a specific way to do bulk creates with Ash.bulk_create.
iex(5)> Ash.bulk_create(data, Tunez.Music.Artist, :create, actor: user)
%Ash.BulkResult{
status: :success, errors: [], records: nil,
notifications: nil, error_count: 0
}
There is even ways to define certain parts of the create or even a way to get more information back after the action is done.
iex(11)> Ash.bulk_create([%{name: "Test"}], Tunez.Music.Artist, :create,
actor: user, return_records?: true)
%Ash.BulkResult{status: :success, records: [#Tunez.Music.Artist<...>], ...}
iex(12)> Tunez.Music.create_artist([%{name: "Test"}], actor: user,
bulk_options: [return_records?: true])
%Ash.BulkResult{status: :success, records: [#Tunez.Music.Artist<...>], ...}
Back to Album Notifications
Now we can use this to create a list of things that need to be created at the same time. We will leverage the bulk_create. lib/tunez/accounts/changes/send_new_album_notifications.ex
def change(changeset, _opts, _context) do
Ash.Changeset.after_action(changeset, fn _changeset, album ->
album = Ash.load!(album, artist: [:follower_relationships])
album.artist.follower_relationships
|> Enum.map(fn %{follower_id: follower_id} ->
%{album_id: album.id, user_id: follower_id}
end)
|> Ash.bulk_create!(Tunez.Accounts.Notification, :create)
{:ok, album}
end)
end
Now that we have this we need to create the create action in the notification.ex, lib/tunez/accounts/notification.ex
actions do
create :create do
accept([:user_id, :album_id])
end
end
Now we need the policy to be sure that you have the ability to do this.
policies do
policy action(:create) do
forbid_if(always())
end
end
This will make sure that only the server itself will be able to create this action. We will need to be sure that we bypass the authorization.
album.artist.follower_relationships
|> Enum.map(fn %{follower_id: follower_id} ->
%{album_id: album.id, user_id: follower_id}
end)
|> Ash.bulk_create!(Tunez.Accounts.Notification, :create, authorize?: false)
Now you can even test this in the server. Just be sure that you are following the artist and then create a new album, this will only show in the logs.
[debug] HANDLE EVENT "save" in TunezWeb.Albums.FormLive
Parameters: %{"form" => %{"cover_image_url" => "", "name" => "Test Album
Name", "year_released" => "2025"}}
INSERT INTO "albums" («fields») VALUES («values») RETURNING «fields»
[«album_uuid», "Test Album Name", «now», «now», «artist_uuid», nil,
«creator_uuid», «creator_uuid», 2025]
«queries to load the album's artist's followers»
INSERT INTO "notifications" ("id","album_id","inserted_at","user_id") VALUES
($1,$2,$3,$4) [«uuid», «album_uuid», «now», «user_uuid»]
"Optimizing Big Queries with Streams"
In order to make this a bit quicker we can use stream to get the followers of the artist with a new read that we will use directly for this action. lib/tunez/music/artist_follower.ex
read :for_artist do
argument :artist_id, :uuid do
allow_nil?(false)
end
filter(expr(artist_id == ^arg(:artist_id)))
pagination(keyset?: true, required?: false)
end
Now we need to set up the codebase for that in Tunez.Music. lib/tunez/music.ex
resource Tunez.Music.ArtistFollower do
...
define :followers_for_artist, action: :for_artist, args: [:artist_id]
end
Now we can rewrite the change in the send_new_album_notifications.ex
def change(changeset, _opts, _context) do
changeset
|> Ash.Changeset.after_action(fn _changeset, album ->
Tunez.Music.followers_for_artist!(album.artist_id, stream?: true)
|> Stream.map(fn %{follower_id: follower_id} ->
%{album_id: album.id, user_id: follower_id}
end)
|> Ash.bulk_create!(Tunez.Accounts.Notification, :create, authorize?: false)
{:ok, album}
end)
end
Showing Notifications to Users
Now we need to show the messages to the users. We need to head to lib/tunez_web/live/notification_live.ex
def mount(_params, _session, socket) do
notifications = Tunez.Accounts.notifications_for_user!(actor: socket.assigns.current_user)
{:ok, assign(socket, notifications: notifications)}
end
Now we need to create the interface function in lib/tunez/accounts.ex
resource Tunez.Accounts.Notification do
define :notifications_for_user, action: :for_user
end
Lastly we need to add in the action to the notification, lib/tunez/accounts/notification.ex
actions do
create :create do
accept([:user_id, :album_id])
end
read :for_user do
prepare build(load: [album: [:artist]], sort: [inserted_at: :desc])
filter expr(user_id == ^actor(:id))
end
end
Then we need to give a policy for the :for_user action. lib/tunez/accounts/notification.ex
policies do
...
policy action(:for_user) do
authorize_if(actor_present())
end
end
A Brief Detour into LiveView Process Shenanigans
The issue is that we don't have the current user within the Notifications as its a almost outside the current LiveView. We need to do something as the sticky childview only have access to the session info at the start. head to lib/tunez_web/live/notification_live.ex
defmodule TunezWeb.NotificationsLive do
use TunezWeb, :live_view
on_mount {TunezWeb.LiveUserAuth, :current_user}➤
def mount(_params, _session, socket) do
# ...
Okay, Tell Me About That New Album... and Then Go away
The window will direct you to the album and even trigger the event to dismiss-notification but it doesn't go away yet, let's deal with that. lib/tunez_web/live/notification_live.ex
def handle_event("dismiss-notification", %{"id" => id}, socket) do
notification = Enum.find(socket.assigns.notifications, &(&1.id == id))
Tunez.Accounts.dismiss_notification(
notification,
actor: socket.assigns.current_user
)
notifications = Enum.reject(socket.assigns.notifications, &(&1.id == id))
{:noreply, assign(socket, notifications: notifications)}
end
Now that we have that we need to define the resource and deal with the destroy. First lib/tunez/accounts.ex
resource Tunez.Accounts.Notification do
define :notifications_for_user, action: :for_user
define :dismiss_notification, action: :destroy
end
Then lib/tunez/accounts/notification.ex
actions do
defaults([:destroy])
Now the destroy action needs a policy. lib/tunez/accounts/notification.ex
policy action(:destroy) do
authorize_if(relates_to_actor_via(:user))
end
Updating Notifications in Real Time
Okay now we need to setup the PubSub part of this network.
Setting Up the Publish Mechanism
Now we can add in the Ash.Notifier.PubSub notifier to the Notification resource. lib/tunez/accounts/notification.ex
defmodule Tunez.Accounts.Notification do
use Ash.Resource,
otp_app: :tunez,
domain: Tunez.Accounts,
data_layer: AshPostgres.DataLayer,
authorizers: [Ash.Policy.Authorizer],
notifiers: [Ash.Notifier.PubSub]
Now that we have the enabled we can send out messages whenever the create action is triggered. So let's do that with the pubsub block. lib/tunez/accounts/notification.ex
pub_sub do
prefix("notifications")
module(TunezWeb.Endpoint)
publish(:create, [:user_id])
end
Debugging Pubsub Publishing
Once that is done there is a way to get better logs for these actions. Head to config/dev.exs
config :ash, :pub_sub, debug?: true
iex(1)> Ash.Changeset.for_action(Tunez.Accounts.Notification, :create,
%{user_id: «user_uuid», album_id: «album_uuid»})
|> Ash.create!(authorize?: false)
INSERT INTO "notifications" ...
[debug] Broadcasting to topics ["notifications:«user_uuid»"] via
TunezWeb.Endpoint.broadcast
Notification:
%Ash.Notifier.Notification{resource: Tunez.Accounts.Notification, domain:
Tunez.Accounts, action: %Ash.Resource.Actions.Create{name: :create,
primary?: true, description: nil, error_handler: nil, accept: ...
Ash has built an Ash.Notifier.Notification struct (not to be confused with a
Tunez.Accounts.Notification!), and that’s what will be sent out in the broadcast.
If we try to generate pubsub messages in iex by creating a new album for an
artist that has at least one follower, though, we won’t see the pubsub debug
message printed:
iex(2)> Tunez.Music.create_album!(%{artist_id: «artist_uuid»,
name: "New Album", year_released: 2022}, actor: user)
INSERT INTO "albums" ("id","name","inserted_at","updated_at", ...
«SELECT query to load the artist followers»
INSERT INTO "notifications" ("id","album_id","inserted_at", ...
%Tunez.Music.Album{...}
"Putting Our Detective Caps On"
There is an issue with this as the bulk_action isn't creating the notification.
ex(3)> Ash.bulk_create([%{user_id: «user_uuid», album_id: «album_id»}],
Tunez.Accounts.Notification, :create, authorize?: false)
INSERT INTO "notifications" ("id","album_id","inserted_at","user_id") ...
%Ash.BulkResult{notifications: nil, ...}
We don’t! Notifications aren’t generated by default for bulk actions, just like
records aren’t returned, also for performance reasons. To configure a bulk
action to generate and auto-send any notifications, you can use the notify? true
option of Ash.bulk_create.15
iex(4)> Ash.bulk_create([%{user_id: «user_uuid», album_id: «album_id»}],
Tunez.Accounts.Notification, :create, authorize?: false, notify?: true)➤
INSERT INTO "notifications" ("id","user_id","album_id","inserted_at") ...
[debug] Broadcasting to topics ["notifications:«user_uuid»"] via
TunezWeb.Endpoint.broadcast
Notification:
%Ash.Notifier.Notification{resource: Tunez.Accounts.Notification, ...}
%Ash.BulkResult{...}
Now we can head to lib/tunez/accounts/changes/send_new_album_notifications.ex
def change(changeset, _opts, _context) do
changeset
|> Ash.Changeset.after_action(fn _changeset, album ->
Tunez.Music.followers_for_artist!(album.artist_id, stream?: true)
|> Stream.map(fn %{follower_id: follower_id} ->
%{album_id: album.id, user_id: follower_id}
end)
|> Ash.bulk_create!(Tunez.Accounts.Notification, :create, authorize?: false, notify?: true)
{:ok, album}
end)
end
That will make sure that we have the notification send out with this action.
"Limiting Data Sent Within Notifications"
We need to make sure that we only send out the right information and be sure that we stop people from seeing information that they shouldn't see. lib/tunez/accounts/notification.ex
transform fn notification ->
Map.take(notification.data, [:id, :user_id, :album_id])
end
Setting Up the Subscribe Mechanism
This is super easy as LiveView is all setup to listen and react we have all the sends taken care of. lib/tunez_web/live/notification_live.ex
def mount(_params, _session, socket) do
notifications = Tunez.Accounts.notifications_for_user!(actor: socket.assigns.current_user)
if connected?(socket) do
"notifications:#{socket.assigns.current_user.id}"
|> TunezWeb.Endpoint.subscribe()
end
{:ok, assign(socket, notifications: notifications)}
end
Now that we have the message sent we need to deal with the listen part with a handle_info in the same file.
def handle_info(%{topic: "notifications:" <> _}, socket) do
notifications = Tunez.Accounts.notifications_for_user!(actor: socket.assigns.current_user)
{:noreply, assign(socket, notifications: notifications)}
end
Deleting Notifications
Now we might need to delete messages that we have sent out. 1) Like when you are looking at the same thing on 2 computers, 2) or when you delete an album and need to pull back the messages. lib/tunez/accounts/notification.ex
pub_sub do
...
publish(:create, [:user_id])
publish(:destroy, [:user_id])
end
Now we need to clean up the dismiss-notification event. lib/tunez_web/live/notification_live.ex
def handle_event("dismiss-notification", %{"id" => id}, socket) do
notification = Enum.find(socket.assigns.notifications, &(&1.id == id))
Tunez.Accounts.dismiss_notification(
notification,
actor: socket.assigns.current_user
)
notifications = Enum.reject(socket.assigns.notifications, &(&1.id == id))
{:noreply, socket}
end
"Cascading Deletes in Code"
Now we need to deal with the issue of the app knowing what is happening. Up till now we have done everything in the database we want to move it to the app. lib/tunez/accounts/notification.ex
references do
reference(:user, index?: true, on_delete: :delete)
reference(:album)
end
This is a change to the database so we will need to setup a migration and codegen.
mix ash.codegen remove_notification_album_cascade_delete
mix ash.migrate
Now we need to manually delete the notifications lib/tunez/music/album.ex
relationships do
...
has_many :notifications, Tunez.Accounts.Notification
end
Then we need to set up the destroy action.
actions do
defaults([:read])
destroy :destroy do
primary?(true)
change(
cascade_destroy(:notifications,
return_notifications?: true,
after_action?: false
)
)
end
end
This utilize the cascading delete. Now we need to read the related notifications so let's set that up. lib/tunez/accounts/notification.ex
actions do
defaults([:read, :destroy])
...
end
Now the Policy lib/tunez/music/album.ex
policies do
bypass actor_attribute_equals(:role, :admin) do
authorize_if(always())
end
...
policy action_type([:update, :destroy]) do
authorize_if(expr(^actor(:role) == :editor and created_by_id == ^actor(:id)))
end
end
"Calculations: Not Just for User-Facing Data"
We can even leverage the calculation to deal with data that we need for a policy or even a function behind the scenes. lib/tunez/music/album.ex
calculations do
...
calculate(
:can_manage_album?,
:boolean,
expr(
^actor(:role) == :admin or
(^actor(:role) == :editor and created_by_id == ^actor(:id))
)
)
end
Now the policy for the actions it can now be done in one function call.
policy action_type([:update, :destroy]) do
authorize_if(expr(can_manage_album?))
end
Now we can move on to the notification resource and utilize the new calculation. lib/tunez/accounts/notification.ex
policies do
policy action(:read) do
authorize_if(expr(album.can_manage_album?))
end
...
policy action(:destroy) do
authorize_if(expr(album.can_manage_album?))
authorize_if(relates_to_actor_via(:user))
end
end
This is why we couldn't do the cascade delete as we deleted the album then the notification and it needs to have the authorization from the album we just deleted.
"I Have Some Bad News for You, Though..."
Okay so now that we have the album done what about Artists that have Albums that have Notifications...
SO we have the cascade option of setting up the entire thing to deal with Artists. We will opt for the cascade_destroy option here. lib/tunez/music/album.ex
postgres do
...
references do
reference(:artist, index?: true)
end
end
Now we need to replace the default destroy with a cascade_destroy. lib/tunez/music/artist.ex
actions do
defaults([:create, :read])
...
destroy :destroy do
primary?(true)
change(
cascade_destroy(::albums,
return_notifications?: true,
after_action?: false
) )
end
end
Now we need to generate and run the migration to update the database.
mix ash.codegen remove_album_artist_cascade_delete
mix ash.migrate
We Need to Talk About Atomics
This is the idea that you don't want to show the follower count (current) + 1. You want to pull the follower count (in the DB) then add 1.
What Does This Mean for Tunez?
Let's deal with the MinutesToSeconds we can make this use atomic very simply.
defmodule Tunez.Music.Changes.MinutesToSeconds do
...
@impl true
def atomic(changeset, opts, context) do
{:ok, change(changeset, opts, context)}
end
end
What if We Really Wanted to Store the Follower Count, Though?
There is a way to leverage the atomic_update to update a field if we are storing it in the db. This is built in and uses atomic logic to deal with the update.
update :follow do
change atomic_update(:follower_count, expr(follower_count + 1))
end
For more complex logic there is the atomic/3 callback.
@impl true
def atomic(_changeset, _opts, _context) do
{:atomic, %{follower_count: expr(follower_count + 1)}}
end
Both these work similar.
Rewriting UpdatePreviousNames to be Atomic
lib/tunez/music/changes/update_previous_names.ex
defmodule Tunez.Music.Changes.UpdatePreviousNames do
use Ash.Resource.Change
@impl true
def change(changeset, _opts, _context) do
Ash.Changeset.before_action(changeset, fn changeset ->
new_name = Ash.Changeset.get_attribute(changeset, :name)
previous_name = Map.get(changeset.data, :name)
previous_names = Map.get(changeset.data, :previous_names) || []
names =
[previous_name | previous_names]
|> Enum.uniq()
|> Enum.reject(&(&1 == new_name))
Ash.Changeset.change_attribute(changeset, :previous_names, names)
end)
end
@impl true
def atomic(_changeset, _opts, _context) do
{:atomic,
%{
previous_names:
{:atomic,
expr(
fragment(
"array_remove(array_prepend(?, ?), ?)",
name,
previous_names,
^atomic_ref(:name)
)
)}
}}
end
end
lib/tunez/music/artist.ex
update :update do
accept [:name, :biography]
change Tunez.Music.Changes.UpdatePreviousNames
end
Wrapping Everything Up
You did it!!! It's all done think about an other project and Build Build Build!!!