Home Posts Tags Post Search Tag Search

Post 62

Weather App 09 - SGP40 (VOC)

Published on: 2025-08-20 Tags: elixir, Blog, Side Project, Libraries, Phoenix, Weather App
So for this sensor we have a slightly different way of doing things. Once the sensor is initialized and sends but proper tests. It will continue to get measurements within its normal retest time. You will start the sensor with the code below and with the correct address. One thing to keep in mind is that at the moment I have it set with default values for Temperature and Humidity values so that I can get RELATIVE values. Once you have the other sensors setup you will need to ask for the temp and humidity values so that you can send them with the measure so that it will give the most accurate values.

            @impl true
            def init(%{address: address, i2c_bus_name: bus_name} = args) do
                i2c = Comm.open(bus_name)

                config =
                args
                |> Map.take([:temperature, :humidity])
                |> Config.new()

                Comm.initialize_sensor(i2c, address)

                # Standard time is ~30ms want to give it some more time.
                Process.send_after(self(), :measure, 100)

                state = %{
                i2c: i2c,
                address: address,
                config: config,
                last_reading: :no_reading,
                last_raw_reading: :no_reading
                }

                {:ok, state}
            end

            def init(args) do
                # Hardcoded bus and address
                bus_name = "i2c-1"
                address = 0x59
                transport = "bus: #{bus_name}, address: #{address}"
                Logger.info("Starting SGP40. Please specify an address and a bus.")
                Logger.info("Starting on " <> transport)

                defaults =
                args
                |> Map.put(:address, address)
                |> Map.put(:i2c_bus_name, bus_name)

                init(defaults)
            end

            @impl true
            def handle_info(:measure, %{i2c: i2c, address: address} = state) do
                case Comm.measure(i2c, address, state.config) do
                {:ok, raw} ->
                    Logger.info("SGP40 measurement taken: #{inspect(raw)}")
                    last_reading = CrcHelper.decode_voc(raw)
                    Process.send_after(self(), :measure, 50)
                    {:noreply, %{state | last_reading: last_reading, last_raw_reading: raw}}

                {:error, reason} ->
                    Logger.error("SGP40 measurement failed: #{inspect(reason)}")
                    Process.send_after(self(), :measure, 50)
                    {:noreply, state}
                end
            end

            @impl true
            def terminate(reason, state) do
                Logger.info("SGP40 GenServer terminating. Reason: #{inspect(reason)}")

                # Clean up I2C bus if it's open
                case state do
                %{i2c: i2c} when not is_nil(i2c) ->
                    Comm.close(i2c)
                    Logger.info("Closed I2C connection.")

                _ ->
                    Logger.warning("No I2C connection found to close.")
                end

                :ok
            end

            @impl true
            def handle_call(:get_measurement, _from, %{last_reading: last_reading} = state) do
                {:reply, last_reading, state}
            end
        
        crc_helper.ex
            @polynomial 0x31
            @init 0xFF

            @doc """
            Calculate the CRC-8 of a binary or list of bytes
            """
            def crc8(data) when is_binary(data), do: crc8(:binary.bin_to_list(data))

            def crc8(data) when is_list(data) do
                Enum.reduce(data, @init, fn byte, crc ->
                crc_byte(Bitwise.^^^(crc, byte))
                end)
            end

            # Process one byte through CRC
            defp crc_byte(byte) do
                Enum.reduce(0..7, byte, fn _, crc ->
                if (crc &&& 0x80) != 0 do
                    Bitwise.^^^(crc <<< 1, @polynomial) &&& 0xFF
                else
                    crc <<< 1 &&& 0xFF
                end
                end)
            end

            @doc """
            Encode a 2-byte word + CRC as required by the sensor
            Expects {msb, lsb} tuple
            """
            def encode_with_crc({msb, lsb}) do
                data = <<msb, lsb>>
                <<msb, lsb, crc8(data)>>
            end

            @doc """
            Convert raw SGP40 measurement (6 bytes) to VOC value in ppb.
            Expects <<msb, lsb, crc, _, _, _>> from get_measurement/0
            """
            def decode_voc(<<msb, lsb, crc, _, _, _>>) do
                if crc != crc8(<<msb, lsb>>) do
                {:error, :crc_mismatch}
                else
                raw = msb <<< 8 ||| lsb
                # Datasheet conversion: VOC index (0..500) ≈ raw / 65535 * 500
                voc_ppb = raw * 500 / 65_535
                {:ok, voc_ppb}
                end
            end

    If you want to just test the comm.ex and config.ex
        alias Sgp40.Comm
        alias Sgp40.Config

        bus_name = "i2c-1"          # or whatever your I²C bus is
        {_bus, sensor} = Comm.discover()     # returns 0x59 (the sensor address)

        i2c = Comm.open(bus_name)    # open the bus
        config = Config.new()        # default config

        Comm.write_config(config, i2c, sensor)  # write config
        Comm.read(i2c, sensor)                  # read raw values

    if you want to test the GenServer on Sgp40.ex
        Sgp40.start_link()
        Sgp40.get_measurement()