Post 85

Ash Framework 12 - PubSub

Published on: 2025-11-09 Tags: elixir, Blog, Side Project, LiveView, Ecto, Html/CSS, Phoenix, Ash, Framework
Chapter 10: Delivering Real-Time Updates with PubSub
    Notifying Users About New Albums
        Next we need a way to tell users when an artist they follow has released new music. Let's get started.

        Creating the Notification Resource
            This new resource will be the bridge between Tunez.Accounts and Tunez.Music. Let's generate the empty resource.
                mix ash.gen.resource Tunez.Accounts.Notification --extend postgres
            
            Now let's head to the new resource and set up the references, attributes, and relationships in lib/tunez/accounts/notification.ex
                postgres do
                    table("notifications")
                    repo(Tunez.Repo)

                    references do
                        reference(:user, index?: true, on_delete: :delete)
                        reference(:album, on_delete: :delete)
                    end
                end

                attributes do
                    uuid_primary_key(:id)
                    create_timestamp(:inserted_at)
                end

                relationships do
                    belongs_to :user, Tunez.Accounts.User do
                        allow_nil?(false)
                    end

                    belongs_to :album, Tunez.Music.Album do
                        allow_nil?(false)
                    end
                end

            Each notification needs an id so that we can delete individual notifications once they have been seen or clicked on, and an inserted_at timestamp so that we can sort them as needed. Now we need to generate the migration and run it.
                mix ash.codegen create_notifications
                mix ash.migrate


        Creating Notifications on Demand
            We need to add a change to the Album resource that sends notifications whenever an album is created. Let's head to lib/tunez/music/album.ex
                changes do
                    change(Tunez.Accounts.Changes.SendNewAlbumNotifications, on: [:create])
                    change(relate_actor(:created_by, allow_nil?: true), on: [:create])
                    change(relate_actor(:updated_by, allow_nil?: true))
                end
            
            The change module doesn't exist yet, so we need to create it. It will use Ash.Resource.Change. Let's head to lib/tunez/accounts/changes/send_new_album_notifications.ex
                defmodule Tunez.Accounts.Changes.SendNewAlbumNotifications do
                    use Ash.Resource.Change

                    @impl true
                    def change(changeset, _opts, _context) do
                        # Create notifications here!
                        changeset
                    end
                end                
            
    Running Actions in Bulk
        A popular artist could have thousands of followers, so a new album can mean creating thousands of notifications at once. Instead of building our own mechanism for that, we will use bulk actions. We can try them out first in an iex session.

        Testing Artist Bulk Create
                iex(1)> # user is a loaded record with role = :admin
                iex(2)> Tunez.Music.create_artist(%{name: "New Artist"}, actor: user)
                INSERT INTO "artists" («fields») VALUES ($1,$2,$3,$4,$5,$6,$7) RETURNING
                «fields» [«data»]
                {:ok, #Tunez.Music.Artist<...>}

            We can use the same code to run bulk actions by changing what we pass in. Instead of a single map, we can call the code interface with a list of maps.
                iex(3)> data = [%{name: "New Artist 1"}, %{name: "New Artist 2"}]
                [...]
                iex(4)> Tunez.Music.create_artist(data, actor: user)
                INSERT INTO "artists" («fields») VALUES ($1,$2,$3,$4,$5,$6,$7),
                ($8,$9,$10,$11,$12,$13,$14) RETURNING «fields» [«data for both records»]
                %Ash.BulkResult{
                    status: :success, errors: [], records: nil,
                    notifications: nil, error_count: 0
                }

            Ash also provides Ash.bulk_create for running a bulk create directly.
                iex(5)> Ash.bulk_create(data, Tunez.Music.Artist, :create, actor: user)
                %Ash.BulkResult{
                    status: :success, errors: [], records: nil,
                    notifications: nil, error_count: 0
                }

            Bulk actions also accept options. For example, return_records?: true returns the created records once the action is done, either directly or through the code interface's bulk_options.
                iex(11)> Ash.bulk_create([%{name: "Test"}], Tunez.Music.Artist, :create,
                actor: user, return_records?: true)
                %Ash.BulkResult{status: :success, records: [#Tunez.Music.Artist<...>], ...}
                iex(12)> Tunez.Music.create_artist([%{name: "Test"}], actor: user,
                bulk_options: [return_records?: true])
                %Ash.BulkResult{status: :success, records: [#Tunez.Music.Artist<...>], ...}

        Back to Album Notifications
            Now we can use this to create all the notifications for a new album in one go. We will leverage Ash.bulk_create! in lib/tunez/accounts/changes/send_new_album_notifications.ex
                def change(changeset, _opts, _context) do
                    Ash.Changeset.after_action(changeset, fn _changeset, album ->
                        album = Ash.load!(album, artist: [:follower_relationships])

                        album.artist.follower_relationships
                        |> Enum.map(fn %{follower_id: follower_id} ->
                            %{album_id: album.id, user_id: follower_id}
                        end)
                        |> Ash.bulk_create!(Tunez.Accounts.Notification, :create)

                        {:ok, album}
                    end)
                end

            The change relies on a create action on the notification, so we need to add it in lib/tunez/accounts/notification.ex
                actions do
                    create :create do
                        accept([:user_id, :album_id])
                    end
                end

            Now we need a policy so that this action can't be called by just anyone.
                policies do
                    policy action(:create) do
                        forbid_if(always())
                    end
                end
            
            This forbids every actor from running the create action, so only server-side code that skips authorization can create notifications. That means our change has to bypass authorization with authorize?: false.
                album.artist.follower_relationships
                |> Enum.map(fn %{follower_id: follower_id} ->
                    %{album_id: album.id, user_id: follower_id}
                end)
                |> Ash.bulk_create!(Tunez.Accounts.Notification, :create, authorize?: false)

            Now you can test this in the running app. Make sure that you are following the artist and then create a new album. Nothing is shown in the UI yet; the notification insert only appears in the logs.
                [debug] HANDLE EVENT "save" in TunezWeb.Albums.FormLive
                Parameters: %{"form" => %{"cover_image_url" => "", "name" => "Test Album
                Name", "year_released" => "2025"}}
                INSERT INTO "albums" («fields») VALUES («values») RETURNING «fields»
                [«album_uuid», "Test Album Name", «now», «now», «artist_uuid», nil,
                «creator_uuid», «creator_uuid», 2025]
                «queries to load the album's artist's followers»
                INSERT INTO "notifications" ("id","album_id","inserted_at","user_id") VALUES
                ($1,$2,$3,$4) [«uuid», «album_uuid», «now», «user_uuid»]

            "Optimizing Big Queries with Streams"
                Loading every follower into memory at once gets slow for artists with many followers. To make this quicker we can stream the followers of the artist, using a new read action written specifically for this purpose. lib/tunez/music/artist_follower.ex
                    read :for_artist do
                        argument :artist_id, :uuid do
                            allow_nil?(false)
                        end

                        filter(expr(artist_id == ^arg(:artist_id)))
                        pagination(keyset?: true, required?: false)
                    end

                Now we need to set up the code interface for that in Tunez.Music. lib/tunez/music.ex
                    resource Tunez.Music.ArtistFollower do
                        ...

                        define :followers_for_artist, action: :for_artist, args: [:artist_id]
                    end

                Now we can rewrite the change in send_new_album_notifications.ex to use the stream.
                    def change(changeset, _opts, _context) do
                        changeset
                        |> Ash.Changeset.after_action(fn _changeset, album ->
                            Tunez.Music.followers_for_artist!(album.artist_id, stream?: true)
                            |> Stream.map(fn %{follower_id: follower_id} ->
                                %{album_id: album.id, user_id: follower_id}
                            end)
                            |> Ash.bulk_create!(Tunez.Accounts.Notification, :create, authorize?: false)

                            {:ok, album}
                        end)
                    end
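
                To see why a stream helps, here is a minimal pure-Elixir sketch with the same shape as the change above. No Ash is involved: StreamSketch, followers/1, and insert_batch/1 are hypothetical stand-ins, and the batch size of 3 is arbitrary. Followers are produced lazily, mapped to notification attributes, and consumed one batch at a time, so the full follower list never has to sit in memory.

                ```elixir
                defmodule StreamSketch do
                  # Hypothetical stand-in for a streamed read: yields follower maps lazily.
                  def followers(count) do
                    Stream.map(1..count//1, fn n -> %{follower_id: "user-#{n}"} end)
                  end

                  # Hypothetical stand-in for a bulk insert: returns how many rows it "inserted".
                  def insert_batch(batch), do: length(batch)

                  # Maps followers to notification attributes and consumes them batch by batch.
                  def notify(album_id, follower_count, batch_size \\ 3) do
                    follower_count
                    |> followers()
                    |> Stream.map(fn %{follower_id: follower_id} ->
                      %{album_id: album_id, user_id: follower_id}
                    end)
                    |> Stream.chunk_every(batch_size)
                    |> Enum.map(&insert_batch/1)
                  end
                end

                # Seven followers in batches of three: prints [3, 3, 1]
                IO.inspect(StreamSketch.notify("album-1", 7))
                ```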

    Showing Notifications to Users
        Now we need to show the notifications to the user. We need to head to lib/tunez_web/live/notification_live.ex
            def mount(_params, _session, socket) do
                notifications = Tunez.Accounts.notifications_for_user!(actor: socket.assigns.current_user)
                {:ok, assign(socket, notifications: notifications)}
            end

        Now we need to create the code interface function in lib/tunez/accounts.ex
            resource Tunez.Accounts.Notification do
                define :notifications_for_user, action: :for_user
            end

        Lastly we need to add the :for_user read action to the notification, lib/tunez/accounts/notification.ex
            actions do
                create :create do
                    accept([:user_id, :album_id])
                end

                read :for_user do
                    prepare build(load: [album: [:artist]], sort: [inserted_at: :desc])
                    filter expr(user_id == ^actor(:id))
                end
            end

        Then we need to add a policy for the :for_user action. lib/tunez/accounts/notification.ex
            policies do
                ... 

                policy action(:for_user) do
                    authorize_if(actor_present())
                end
            end

        A Brief Detour into LiveView Process Shenanigans
            The issue is that the notifications LiveView doesn't have the current user, as it sits almost outside the current LiveView. A sticky child LiveView only has access to the session data it was started with, so we need to load the user ourselves with an on_mount hook. Head to lib/tunez_web/live/notification_live.ex
                defmodule TunezWeb.NotificationsLive do
                    use TunezWeb, :live_view
                    on_mount {TunezWeb.LiveUserAuth, :current_user}

                    def mount(_params, _session, socket) do
                        # ...


        Okay, Tell Me About That New Album... and Then Go away
            Clicking a notification takes you to the album and triggers the dismiss-notification event, but the notification doesn't go away yet. Let's deal with that. lib/tunez_web/live/notification_live.ex
                def handle_event("dismiss-notification", %{"id" => id}, socket) do
                    notification = Enum.find(socket.assigns.notifications, &(&1.id == id))

                    Tunez.Accounts.dismiss_notification(
                        notification,
                        actor: socket.assigns.current_user
                    )

                    notifications = Enum.reject(socket.assigns.notifications, &(&1.id == id))
                    {:noreply, assign(socket, notifications: notifications)}
                end
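
            One thing to keep in mind is that Enum.find returns nil when no notification in the list has the given id, for example if it was already dismissed elsewhere. As a rough sketch in plain Elixir, a hypothetical helper (DismissSketch is not part of Tunez) can make that case explicit by splitting the list into the match and the rest:

            ```elixir
            defmodule DismissSketch do
              # Splits the list into the notification to dismiss and the remaining ones.
              # Returns :error when no notification has the given id.
              def pop(notifications, id) do
                case Enum.split_with(notifications, &(&1.id == id)) do
                  {[found | _], rest} -> {:ok, found, rest}
                  {[], _all} -> :error
                end
              end
            end

            notifications = [%{id: "a"}, %{id: "b"}]
            IO.inspect(DismissSketch.pop(notifications, "a"))
            IO.inspect(DismissSketch.pop(notifications, "z"))
            ```

            An event handler could match on {:ok, notification, rest} before dismissing, and simply ignore the event on :error.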

            Now we need to define the code interface function and add the destroy action. First lib/tunez/accounts.ex
                resource Tunez.Accounts.Notification do
                    define :notifications_for_user, action: :for_user
                    define :dismiss_notification, action: :destroy
                end

            Then lib/tunez/accounts/notification.ex
                actions do
                    defaults([:destroy])

            Now the destroy action needs a policy. lib/tunez/accounts/notification.ex
                policy action(:destroy) do
                    authorize_if(relates_to_actor_via(:user))
                end

    Updating Notifications in Real Time
        Now we need to set up the PubSub part of the system.
        
        Setting Up the Publish Mechanism
            Now we can add in the Ash.Notifier.PubSub notifier to the Notification resource. lib/tunez/accounts/notification.ex
                defmodule Tunez.Accounts.Notification do
                    use Ash.Resource,
                        otp_app: :tunez,
                        domain: Tunez.Accounts,
                        data_layer: AshPostgres.DataLayer,
                        authorizers: [Ash.Policy.Authorizer],
                        notifiers: [Ash.Notifier.PubSub]

            Now that the notifier is enabled, we can send out messages whenever the create action runs. Let's do that with the pub_sub block. lib/tunez/accounts/notification.ex
                pub_sub do
                    prefix("notifications")
                    module(TunezWeb.Endpoint)
                    publish(:create, [:user_id])
                end
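
            The topic that results from this configuration is the prefix followed by the value of the listed attribute, e.g. "notifications:«user_uuid»". The following is a simplified pure-Elixir sketch of that idea, not Ash's actual implementation. TopicSketch is a hypothetical module, and it assumes every topic part is an attribute name with a value present on the record.

            ```elixir
            defmodule TopicSketch do
              # Joins the prefix and the value of each listed attribute with ":".
              # Simplified: assumes each part is an attribute that exists on the record.
              def topic(prefix, record, parts) do
                values = Enum.map(parts, fn key -> to_string(Map.fetch!(record, key)) end)
                Enum.join([prefix | values], ":")
              end
            end

            notification = %{id: "n-1", user_id: "u-42", album_id: "a-7"}

            # prints notifications:u-42
            IO.puts(TopicSketch.topic("notifications", notification, [:user_id]))
            ```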

            Debugging Pubsub Publishing
                Once that is done, we can turn on debug logging for pubsub broadcasts. Head to config/dev.exs
                    config :ash, :pub_sub, debug?: true

                Creating a notification in iex now logs the broadcast:
                    iex(1)> Ash.Changeset.for_action(Tunez.Accounts.Notification, :create,
                    %{user_id: «user_uuid», album_id: «album_uuid»})
                    |> Ash.create!(authorize?: false)
                    INSERT INTO "notifications" ...
                    [debug] Broadcasting to topics ["notifications:«user_uuid»"] via
                    TunezWeb.Endpoint.broadcast
                    Notification:
                    %Ash.Notifier.Notification{resource: Tunez.Accounts.Notification, domain:
                    Tunez.Accounts, action: %Ash.Resource.Actions.Create{name: :create,
                    primary?: true, description: nil, error_handler: nil, accept: ...

                Ash has built an Ash.Notifier.Notification struct (not to be confused with a Tunez.Accounts.Notification!), and that’s what will be sent out in the broadcast. If we try to generate pubsub messages in iex by creating a new album for an artist that has at least one follower, though, we won’t see the pubsub debug message printed:
                    iex(2)> Tunez.Music.create_album!(%{artist_id: «artist_uuid»,
                    name: "New Album", year_released: 2022}, actor: user)
                    INSERT INTO "albums" ("id","name","inserted_at","updated_at", ...
                    «SELECT query to load the artist followers»
                    INSERT INTO "notifications" ("id","album_id","inserted_at", ...
                    %Tunez.Music.Album{...}

                "Putting Our Detective Caps On"
                    The bulk create inserts the record, but no pubsub notification is generated. We can confirm this by calling Ash.bulk_create directly:
                        iex(3)> Ash.bulk_create([%{user_id: «user_uuid», album_id: «album_id»}],
                        Tunez.Accounts.Notification, :create, authorize?: false)
                        INSERT INTO "notifications" ("id","album_id","inserted_at","user_id") ...
                        %Ash.BulkResult{notifications: nil, ...}

                    The result contains no notifications. Notifications aren’t generated by default for bulk actions, just like records aren’t returned, also for performance reasons. To configure a bulk action to generate and auto-send any notifications, you can use the notify?: true option of Ash.bulk_create.
                        iex(4)> Ash.bulk_create([%{user_id: «user_uuid», album_id: «album_id»}],
                        Tunez.Accounts.Notification, :create, authorize?: false, notify?: true)
                        INSERT INTO "notifications" ("id","user_id","album_id","inserted_at") ...
                        [debug] Broadcasting to topics ["notifications:«user_uuid»"] via
                        TunezWeb.Endpoint.broadcast
                        Notification:
                        %Ash.Notifier.Notification{resource: Tunez.Accounts.Notification, ...}
                        %Ash.BulkResult{...}

                    Now we can head to lib/tunez/accounts/changes/send_new_album_notifications.ex and add the option there.
                        def change(changeset, _opts, _context) do
                            changeset
                            |> Ash.Changeset.after_action(fn _changeset, album ->
                                Tunez.Music.followers_for_artist!(album.artist_id, stream?: true)
                                |> Stream.map(fn %{follower_id: follower_id} ->
                                    %{album_id: album.id, user_id: follower_id}
                                end)
                                |> Ash.bulk_create!(Tunez.Accounts.Notification, :create, authorize?: false, notify?: true)

                                {:ok, album}
                            end)
                        end

                    That makes sure the pubsub notifications are sent out when this action runs.

                "Limiting Data Sent Within Notifications"
                    We should only send out the data that subscribers actually need, so that nobody sees information they shouldn't. A transform limits each broadcast to a few fields. lib/tunez/accounts/notification.ex
                        transform fn notification ->
                            Map.take(notification.data, [:id, :user_id, :album_id])
                        end
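
                    To illustrate what such a transform produces, here is a small sketch in plain Elixir. TransformSketch, Record, and Envelope are hypothetical stand-ins for the real structs, with only the fields needed for the example. Map.take on a struct returns a plain map holding just the requested keys, so the other fields and the struct type itself are left out of the payload.

                    ```elixir
                    defmodule TransformSketch do
                      # Hypothetical stand-in for the record being broadcast.
                      defmodule Record do
                        defstruct [:id, :user_id, :album_id, :inserted_at]
                      end

                      # Hypothetical stand-in for the notifier's wrapper struct.
                      defmodule Envelope do
                        defstruct [:data, :action]
                      end

                      # Same shape as the transform above: keep only the identifiers.
                      def transform(notification) do
                        Map.take(notification.data, [:id, :user_id, :album_id])
                      end
                    end

                    record =
                      struct(TransformSketch.Record,
                        id: "n-1",
                        user_id: "u-42",
                        album_id: "a-7",
                        inserted_at: "2025-11-09"
                      )

                    envelope = struct(TransformSketch.Envelope, data: record, action: :create)

                    # Prints a plain map with only :id, :user_id and :album_id
                    IO.inspect(TransformSketch.transform(envelope))
                    ```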

        Setting Up the Subscribe Mechanism
            This part is easy: the publishing side is taken care of, and LiveView is already set up to listen and react. lib/tunez_web/live/notification_live.ex
                def mount(_params, _session, socket) do
                    notifications = Tunez.Accounts.notifications_for_user!(actor: socket.assigns.current_user)

                    if connected?(socket) do
                        "notifications:#{socket.assigns.current_user.id}"
                        |> TunezWeb.Endpoint.subscribe()
                    end

                    {:ok, assign(socket, notifications: notifications)}
                end

            Now that the LiveView is subscribed, we need to handle the incoming messages with a handle_info in the same file.
                def handle_info(%{topic: "notifications:" <> _}, socket) do
                    notifications = Tunez.Accounts.notifications_for_user!(actor: socket.assigns.current_user)
                    {:noreply, assign(socket, notifications: notifications)}
                end
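
            The subscribe, broadcast, and handle cycle can be tried out without Phoenix. The sketch below uses Elixir's built-in Registry as a stand-in for the endpoint's pubsub; PubSubSketch is a hypothetical name and the message shape is an assumption chosen to resemble the one matched above. It only demonstrates the flow: a process registers under a topic, a broadcast sends a message to everyone registered, and the receiver pattern-matches on the topic prefix.

            ```elixir
            # Hypothetical registry name; duplicate keys let many processes share one topic.
            {:ok, _registry} = Registry.start_link(keys: :duplicate, name: PubSubSketch)

            topic = "notifications:u-42"

            # Subscribe: the current process registers itself under the topic.
            {:ok, _owner} = Registry.register(PubSubSketch, topic, [])

            # Broadcast: send the message to every process registered under the topic.
            Registry.dispatch(PubSubSketch, topic, fn entries ->
              for {pid, _value} <- entries, do: send(pid, %{topic: topic, event: "create"})
            end)

            # Handle: match on the topic prefix, as the handle_info clause does.
            result =
              receive do
                %{topic: "notifications:" <> user_id} -> {:refresh, user_id}
              after
                1_000 -> :timeout
              end

            IO.inspect(result)
            ```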

            Deleting Notifications
                We may also need to remove notifications that have already been sent out: 1) when you dismiss one while viewing the app on two computers, or 2) when an album is deleted and its notifications need to be pulled back. lib/tunez/accounts/notification.ex
                    pub_sub do
                        ...

                        publish(:create, [:user_id])
                        publish(:destroy, [:user_id])
                    end

                Now we can clean up the dismiss-notification event. The destroy broadcast triggers handle_info, which reloads the list, so the handler no longer needs to update the assigns itself. lib/tunez_web/live/notification_live.ex
                    def handle_event("dismiss-notification", %{"id" => id}, socket) do
                        notification = Enum.find(socket.assigns.notifications, &(&1.id == id))

                        Tunez.Accounts.dismiss_notification(
                            notification,
                            actor: socket.assigns.current_user
                        )

                        {:noreply, socket}
                    end

                "Cascading Deletes in Code"
                    Now we need to deal with the app not knowing what is happening. Up till now the cascading deletes have happened in the database, so no destroy action runs and nothing is broadcast. We want to move them into the app. lib/tunez/accounts/notification.ex
                        references do
                            reference(:user, index?: true, on_delete: :delete)
                            reference(:album)
                        end

                    This is a change to the database, so we need to generate a migration and run it.
                        mix ash.codegen remove_notification_album_cascade_delete
                        mix ash.migrate

                    Now we need to delete the notifications ourselves. First add the relationship in lib/tunez/music/album.ex
                        relationships do
                            ...

                            has_many :notifications, Tunez.Accounts.Notification
                        end

                    Then we need to set up the destroy action.
                        actions do
                            defaults([:read])

                            destroy :destroy do
                                primary?(true)

                                change(
                                    cascade_destroy(:notifications,
                                        return_notifications?: true,
                                        after_action?: false
                                    )
                                )
                            end
                        end

                    This uses cascade_destroy to delete the related notifications. It needs to read the related notifications, so let's add a default read action. lib/tunez/accounts/notification.ex
                        actions do
                            defaults([:read, :destroy])
                            ...
                        end

                    Now the Policy lib/tunez/music/album.ex
                        policies do
                            bypass actor_attribute_equals(:role, :admin) do
                                authorize_if(always())
                            end

                            ...

                            policy action_type([:update, :destroy]) do
                                authorize_if(expr(^actor(:role) == :editor and created_by_id == ^actor(:id)))
                            end
                        end
                "Calculations: Not Just for User-Facing Data"
                    We can also use a calculation for data that is needed by a policy or by a function behind the scenes. lib/tunez/music/album.ex
                        calculations do
                            ...

                            calculate(
                                :can_manage_album?,
                                :boolean,
                                expr(
                                    ^actor(:role) == :admin or
                                    (^actor(:role) == :editor and created_by_id == ^actor(:id))
                                )
                            )
                        end

                    Now the policy for these actions can be expressed with a single reference to the calculation.
                        policy action_type([:update, :destroy]) do
                            authorize_if(expr(can_manage_album?))
                        end

                    Now we can move on to the notification resource and utilize the new calculation. lib/tunez/accounts/notification.ex
                        policies do
                            policy action(:read) do
                                authorize_if(expr(album.can_manage_album?))
                            end
                            ...
                            policy action(:destroy) do
                                authorize_if(expr(album.can_manage_album?))
                                authorize_if(relates_to_actor_via(:user))
                            end
                        end

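                    This is why the cascade_destroy has to run with after_action?: false. The notification policies need to read album.can_manage_album?, so the notifications must be destroyed before the album is. If the album were deleted first, the authorization would depend on an album that no longer exists.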

                "I Have Some Bad News for You, Though..."
                    Now that albums are done, what about artists that have albums that have notifications...

                    Deleting an artist still deletes its albums at the database level, which skips the album destroy action. We will opt for cascade_destroy here as well. First remove the database-level cascade from the album's artist reference. lib/tunez/music/album.ex
                        postgres do
                            ...

                            references do
                                reference(:artist, index?: true)
                            end
                        end

                    Now we need to replace the default destroy with one that uses cascade_destroy. lib/tunez/music/artist.ex
                        actions do
                            defaults([:create, :read])
                            
                            ...
                            
                            destroy :destroy do
                                primary?(true)

                                change(
                                    cascade_destroy(:albums,
                                        return_notifications?: true,
                                        after_action?: false
                                    )
                                )
                            end
                        end

                    Now we need to generate and run the migration to update the database.
                        mix ash.codegen remove_album_artist_cascade_delete
                        mix ash.migrate

    We Need to Talk About Atomics
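        Atomics are about not writing back stale data. You don't want to take the follower count you currently have in memory, add 1, and save that. You want the database to take the follower count it holds at the time of the update and add 1 to that.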
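
        The difference can be shown with a tiny pure-Elixir sketch, where an Agent plays the role of the database row. AtomicSketch and its functions are hypothetical and have nothing to do with Ash's internals; they only contrast "write back what I read earlier plus one" with "ask the store to add one to whatever it currently holds".

        ```elixir
        defmodule AtomicSketch do
          # Non-atomic: every client writes back "the value read earlier + 1".
          def stale_writes(counter, clients) do
            stale = Agent.get(counter, & &1)
            for _ <- 1..clients, do: Agent.update(counter, fn _current -> stale + 1 end)
            Agent.get(counter, & &1)
          end

          # Atomic: every client asks the store to add 1 to whatever it holds now.
          def atomic_writes(counter, clients) do
            for _ <- 1..clients, do: Agent.update(counter, fn current -> current + 1 end)
            Agent.get(counter, & &1)
          end
        end

        {:ok, stale_counter} = Agent.start_link(fn -> 0 end)
        {:ok, atomic_counter} = Agent.start_link(fn -> 0 end)

        # Two followers arrive: the stale version loses one of them.
        IO.inspect(AtomicSketch.stale_writes(stale_counter, 2))   # prints 1
        IO.inspect(AtomicSketch.atomic_writes(atomic_counter, 2)) # prints 2
        ```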

        What Does This Mean for Tunez?
            Let's start with MinutesToSeconds. It can be made atomic very simply by adding an atomic/3 callback that reuses change/3.
                defmodule Tunez.Music.Changes.MinutesToSeconds do
                    ...

                    @impl true
                    def atomic(changeset, opts, context) do
                        {:ok, change(changeset, opts, context)}
                    end
                end

        What if We Really Wanted to Store the Follower Count, Though?
            If we store the count in the database, we can use the built-in atomic_update change to update the field. It uses atomic logic, so the increment is applied to the value the database holds.
                update :follow do
                    change atomic_update(:follower_count, expr(follower_count + 1))
                end

            For more complex logic there is the atomic/3 callback of a change module.
                @impl true
                def atomic(_changeset, _opts, _context) do
                    {:atomic, %{follower_count: expr(follower_count + 1)}}
                end
            
            Both approaches work similarly.

        Rewriting UpdatePreviousNames to be Atomic
            lib/tunez/music/changes/update_previous_names.ex
                defmodule Tunez.Music.Changes.UpdatePreviousNames do
                    use Ash.Resource.Change

                    @impl true
                    def change(changeset, _opts, _context) do
                        Ash.Changeset.before_action(changeset, fn changeset ->
                            new_name = Ash.Changeset.get_attribute(changeset, :name)
                            previous_name = Map.get(changeset.data, :name)
                            previous_names = Map.get(changeset.data, :previous_names) || []

                            names =
                                [previous_name | previous_names]
                                |> Enum.uniq()
                                |> Enum.reject(&(&1 == new_name))

                            Ash.Changeset.change_attribute(changeset, :previous_names, names)
                        end)
                    end

                    @impl true
                    def atomic(_changeset, _opts, _context) do
                        {:atomic,
                            %{
                                previous_names:
                                    {:atomic,
                                        expr(
                                            fragment(
                                                "array_remove(array_prepend(?, ?), ?)",
                                                name,
                                                previous_names,
                                                ^atomic_ref(:name)
                                            )
                                        )}
                            }}
                    end
                end
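
            To make the SQL fragment easier to read, here is a rough plain-Elixir equivalent of array_remove(array_prepend(old_name, previous_names), new_name). PreviousNamesSketch is a hypothetical module used only for illustration: the old name is prepended to the list, then every entry equal to the new name is removed.

            ```elixir
            defmodule PreviousNamesSketch do
              # Prepend the old name, then drop every entry equal to the new name.
              def update(old_name, previous_names, new_name) do
                [old_name | previous_names]
                |> Enum.reject(&(&1 == new_name))
              end
            end

            # Renaming "The Foo" to "Foo": prints ["The Foo", "Foo Band"]
            IO.inspect(PreviousNamesSketch.update("The Foo", ["Foo Band"], "Foo"))

            # Renaming back to an earlier name removes it from the list: prints ["Foo", "Foo Band"]
            IO.inspect(PreviousNamesSketch.update("Foo", ["Foo Band", "The Foo"], "The Foo"))
            ```

            Note that neither the fragment nor this sketch calls Enum.uniq, so unlike change/3 they keep any duplicates already present in previous_names.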

            lib/tunez/music/artist.ex
                update :update do
                    accept [:name, :biography]
                    change Tunez.Music.Changes.UpdatePreviousNames
                end

    Wrapping Everything Up
        You did it!!! It's all done. Think about another project and Build Build Build!!!