Home Posts Tags Post Search Tag Search

Post 61

Weather App 08 - BME280 (Humidity, Temperature, Pressure)

Published on: 2025-08-20 Tags: elixir, Blog, Side Project, Libraries, Nerves, Weather App, Poncho
    BME280 (Humidity, Temperature, Air Pressure)
        Don't forget to add {:circuits_i2c, "~> 1.1"} to the deps of the BME280 project, and then add :bme280 as a dependency in the sensor_hub mix.exs

        config.ex
            # Sensor settings with their defaults. These fields are the inputs of
            # to_ctrl_hum_byte/1, to_ctrl_meas_byte/1, to_config_byte/1 and integration_ms/1.
            defstruct [
                mode: :normal,
                osrs_t: :osrs_2x,
                osrs_p: :osrs_16x,
                osrs_h: :osrs_1x,
                standby_time: :standby_0_5_ms,
                filter: :filter_16,
                spi3w_en: false
            ]

            @doc """
            Builds a configuration struct.

            Called without arguments it returns the defaults; `opts` (a keyword list or map)
            overrides individual fields. Keys that are not fields of the struct are ignored,
            as is usual for `struct/2`.
            """
            def new(opts \\ []), do: struct(__MODULE__, opts)

            @doc """
            Builds the byte that carries the humidity oversampling setting.

            The value comes straight from `osrs_h_to_bit/1`; no shifting is applied.
            """
            def to_ctrl_hum_byte(%__MODULE__{} = config) do
                humidity_bits = osrs_h_to_bit(config.osrs_h)

                0 ||| humidity_bits
            end

            @doc """
            Builds the byte that combines temperature oversampling, pressure oversampling
            and the operating mode.

            Layout: temperature bits shifted left by 5, pressure bits shifted left by 2,
            mode bits in the lowest positions.
            """
            def to_ctrl_meas_byte(%__MODULE__{} = config) do
                temperature = osrs_t_to_bit(config.osrs_t)
                pressure = osrs_p_to_bit(config.osrs_p)
                mode = mode_to_bit(config.mode)

                (temperature <<< 5) ||| (pressure <<< 2) ||| mode
            end

            @doc """
            Builds the byte that combines standby time, filter setting and the
            SPI 3-wire enable flag.

            Layout: standby bits shifted left by 5, filter bits shifted left by 2,
            SPI 3-wire bit in the lowest position.
            """
            def to_config_byte(%__MODULE__{} = config) do
                standby = standby_time_to_bit(config.standby_time)
                filter = filter_to_bit(config.filter)
                spi3w = spi3w_en_bit(config.spi3w_en)

                (standby <<< 5) ||| (filter <<< 2) ||| spi3w
            end

            @doc """
            Estimates the duration of one measurement cycle in whole milliseconds.

            The estimate is the sum of the per-channel oversampling times, a fixed 1 ms
            overhead and, in `:normal` mode only, the configured standby time. The sum is
            rounded *up* to the next whole millisecond and is never less than 1.
            """
            @spec integration_ms(%__MODULE__{}) :: non_neg_integer()
            def integration_ms(%__MODULE__{
                    mode: mode,
                    osrs_t: osrs_t,
                    osrs_p: osrs_p,
                    osrs_h: osrs_h,
                    standby_time: standby_time
                }) do
                # Convert oversampling atoms to approximate times in ms
                t = osrs_to_ms(osrs_t)
                p = osrs_to_ms(osrs_p)
                h = osrs_to_ms(osrs_h)

                # Base measurement overhead (ms)
                base = 1

                # Standby time only contributes while the sensor cycles on its own
                standby_ms = if mode == :normal, do: standby_time_to_ms(standby_time), else: 0

                # Total integration time
                total = t + p + h + base + standby_ms

                # ceil/1 rather than round/1: round/1 goes to the *nearest* integer and could
                # return a wait shorter than the estimate (e.g. 7.3 -> 7).
                max(1, ceil(total))
            end

        comm.ex
            # Register addresses used by this module.
            # Target of the humidity-oversampling byte (see write_config/3).
            @ctrl_hum 0xF2
            # Target of the temperature/pressure oversampling + mode byte.
            @ctrl_meas 0xF4
            # Target of the standby/filter/SPI 3-wire byte.
            @config_register 0xF5
            # First register of the 8-byte raw measurement burst read by read/2.
            @data_register 0xF7

            @doc """
            Writes the three configuration bytes derived from `config` to the sensor.

            The registers are written in the order CTRL_HUM, CONFIG, CTRL_MEAS. CTRL_MEAS goes
            last because it carries the operating mode: according to the BME280 datasheet,
            writes to CONFIG may be ignored once the sensor is in normal mode, and a
            CTRL_HUM change only takes effect after a write to CTRL_MEAS.

            Returns the result of the final write, or the first non-`:ok` result, in which
            case the remaining writes are skipped.
            """
            def write_config(config, i2c, sensor) do
                ctrl_hum_byte = Config.to_ctrl_hum_byte(config)
                ctrl_meas_byte = Config.to_ctrl_meas_byte(config)
                config_byte = Config.to_config_byte(config)

                with :ok <- I2C.write(i2c, sensor, <<@ctrl_hum, ctrl_hum_byte>>),
                     # Small delay to ensure the sensor is ready (:timer.sleep/1 returns :ok)
                     :ok <- :timer.sleep(100),
                     # NOTE(review): if the sensor is already in normal mode from an earlier
                     # run, this write may still be ignored - confirm whether a sleep-mode
                     # write is needed first.
                     :ok <- I2C.write(i2c, sensor, <<@config_register, config_byte>>) do
                    I2C.write(i2c, sensor, <<@ctrl_meas, ctrl_meas_byte>>)
                end
            end

            @doc """
            Reads one 8-byte burst starting at the data register and returns the raw
            values as `{pressure, temperature, humidity}`.

            Pressure and temperature are the upper 20 bits of their 3-byte groups
            (the lowest 4 bits of each group are dropped); humidity is the final 16 bits.
            Raises if the I2C transfer fails or does not yield exactly 8 bytes.
            """
            def read(i2c, sensor) do
                raw = I2C.write_read!(i2c, sensor, <<@data_register>>, 8)

                <<pressure::20, _::4, temperature::20, _::4, humidity::16>> = raw

                {pressure, temperature, humidity}
            end

        bme280.ex

        converter.ex (turns raw readings into physical values using the Bme280.Calibration data)
            alias Bme280.Calibration

            @doc """
            Converts a raw `{pressure, temperature, humidity}` reading into compensated values.

            Temperature is compensated first because its `t_fine` by-product is an input
            to both the pressure and the humidity compensation.
            """
            @spec convert({integer, integer, integer}, Calibration.t()) ::
                    %{temperature_c: float, pressure_pa: float, humidity_rh: float}
            def convert({raw_pressure, raw_temperature, raw_humidity}, calib) do
                {temperature_c, t_fine} = compensate_temp(raw_temperature, calib)

                %{
                    temperature_c: temperature_c,
                    pressure_pa: compensate_press(raw_pressure, t_fine, calib),
                    humidity_rh: compensate_hum(raw_humidity, t_fine, calib)
                }
            end

            @doc """
            Applies the temperature compensation to the raw reading `adc_T`.

            Returns `{temperature, t_fine}`, where `temperature` is `t_fine / 5120.0` and
            `t_fine` is the intermediate value reused by the pressure and humidity formulas.
            """
            def compensate_temp(adc_T, calib) do
                # Linear term
                linear = (adc_T / 16384.0 - calib.dig_T1 / 1024.0) * calib.dig_T2

                # Quadratic term: the same offset, squared
                offset = adc_T / 131_072.0 - calib.dig_T1 / 8192.0
                quadratic = offset * offset * calib.dig_T3

                t_fine = linear + quadratic

                {t_fine / 5120.0, t_fine}
            end

            @doc """
            Applies the pressure compensation to the raw reading `adc_P`.

            `t_fine` is the second element returned by `compensate_temp/2`. The result is
            always a float; `0.0` is returned when the intermediate divisor is zero
            (for example when `dig_P1` is 0), which avoids a division by zero.
            """
            def compensate_press(adc_P, t_fine, calib) do
                var1 = t_fine / 2.0 - 64000.0
                var2 = var1 * var1 * calib.dig_P6 / 32768.0
                var2 = var2 + var1 * calib.dig_P5 * 2.0
                var2 = var2 / 4.0 + calib.dig_P4 * 65536.0
                var3 = calib.dig_P3 * var1 * var1 / 524_288.0
                var1 = (var3 + calib.dig_P2 * var1) / 524_288.0
                var1 = (1.0 + var1 / 32768.0) * calib.dig_P1

                if var1 == 0 do
                    # Return a float so every path has the same type; an integer 0 would
                    # break callers that use float-only functions such as Float.round/2.
                    0.0
                else
                    p = 1_048_576.0 - adc_P
                    p = (p - var2 / 4096.0) * 6250.0 / var1
                    var1 = calib.dig_P9 * p * p / 2_147_483_648.0
                    var2 = p * calib.dig_P8 / 32768.0
                    p + (var1 + var2 + calib.dig_P7) / 16.0
                end
            end

            @doc """
            Applies the humidity compensation to the raw reading `adc_H`.

            `t_fine` is the second element returned by `compensate_temp/2`. The result is
            clamped to the range `0.0..100.0`.
            """
            def compensate_hum(adc_H, t_fine, calib) do
                h = t_fine - 76800.0

                h =
                    (adc_H - (calib.dig_H4 * 64.0 + calib.dig_H5 / 16384.0 * h)) *
                        (calib.dig_H2 / 65536.0) *
                        (1.0 +
                        calib.dig_H6 / 67_108_864.0 * h *
                            (1.0 + calib.dig_H3 / 67_108_864.0 * h))

                # Final correction step of the reference formula. It is the only place
                # dig_H1 is used; without it the result is not fully compensated.
                h = h * (1.0 - calib.dig_H1 * h / 524_288.0)

                h |> max(0.0) |> min(100.0)
            end
        calibration.ex (the Bme280.Calibration struct and the code that reads it from the sensor)
            # Calibration values read from the sensor by read_all/2:
            # three temperature (T), nine pressure (P) and six humidity (H) values.
            defstruct ~w(
                dig_T1 dig_T2 dig_T3
                dig_P1 dig_P2 dig_P3 dig_P4 dig_P5 dig_P6 dig_P7 dig_P8 dig_P9
                dig_H1 dig_H2 dig_H3 dig_H4 dig_H5 dig_H6
            )a

            # First register of the temperature/pressure calibration block.
            @temp_press_start 0x88
            # Bytes read from that block: 0x88..0xA1 inclusive. read_all/2 takes
            # dig_H1 from the last byte and skips the one before it.
            @temp_press_len 26
            # First register of the humidity calibration block.
            @hum_start 0xE1
            # Bytes read from that block: 0xE1..0xE7 inclusive.
            @hum_len 7

            @doc """
            Reads the calibration values from the sensor and returns them as a `%__MODULE__{}`.

            Two burst reads are performed: 26 bytes starting at 0x88 (temperature and
            pressure values, one reserved byte, `dig_H1`) and 7 bytes starting at 0xE1
            (the remaining humidity values). Raises if a transfer fails or returns an
            unexpected number of bytes.
            """
            def read_all(i2c, sensor) do
                # Temperature & pressure calibration, one reserved byte and dig_H1
                # (0x88..0xA1 = 26 bytes)
                t_p_data =
                    I2C.write_read!(i2c, sensor, <<@temp_press_start>>, @temp_press_len)
                    |> ensure_binary()

                <<
                    dig_T1_lsb,
                    dig_T1_msb,
                    dig_T2_lsb,
                    dig_T2_msb,
                    dig_T3_lsb,
                    dig_T3_msb,
                    dig_P1_lsb,
                    dig_P1_msb,
                    dig_P2_lsb,
                    dig_P2_msb,
                    dig_P3_lsb,
                    dig_P3_msb,
                    dig_P4_lsb,
                    dig_P4_msb,
                    dig_P5_lsb,
                    dig_P5_msb,
                    dig_P6_lsb,
                    dig_P6_msb,
                    dig_P7_lsb,
                    dig_P7_msb,
                    dig_P8_lsb,
                    dig_P8_msb,
                    dig_P9_lsb,
                    dig_P9_msb,
                    _reserved,
                    dig_H1
                >> = t_p_data

                # Humidity calibration (0xE1..0xE7 = 7 bytes)
                h_data =
                    I2C.write_read!(i2c, sensor, <<@hum_start>>, @hum_len)
                    |> ensure_binary()

                # Byte 0xE5 is shared: its high nibble holds dig_H5[3:0] and its low nibble
                # holds dig_H4[3:0]. 0xE4 and 0xE6 hold the upper 8 bits of dig_H4 and dig_H5.
                # Matching those bytes and dig_H6 as signed 8-bit values performs the sign
                # extension, which the 16-bit signed/1 helper cannot do for them.
                <<
                    dig_H2_lsb,
                    dig_H2_msb,
                    dig_H3,
                    h4_high::signed-8,
                    h5_low::4,
                    h4_low::4,
                    h5_high::signed-8,
                    dig_H6::signed-8
                >> = h_data

                %__MODULE__{
                    dig_T1: dig_T1_msb <<< 8 ||| dig_T1_lsb,
                    dig_T2: signed(dig_T2_msb <<< 8 ||| dig_T2_lsb),
                    dig_T3: signed(dig_T3_msb <<< 8 ||| dig_T3_lsb),
                    dig_P1: dig_P1_msb <<< 8 ||| dig_P1_lsb,
                    dig_P2: signed(dig_P2_msb <<< 8 ||| dig_P2_lsb),
                    dig_P3: signed(dig_P3_msb <<< 8 ||| dig_P3_lsb),
                    dig_P4: signed(dig_P4_msb <<< 8 ||| dig_P4_lsb),
                    dig_P5: signed(dig_P5_msb <<< 8 ||| dig_P5_lsb),
                    dig_P6: signed(dig_P6_msb <<< 8 ||| dig_P6_lsb),
                    dig_P7: signed(dig_P7_msb <<< 8 ||| dig_P7_lsb),
                    dig_P8: signed(dig_P8_msb <<< 8 ||| dig_P8_lsb),
                    dig_P9: signed(dig_P9_msb <<< 8 ||| dig_P9_lsb),
                    dig_H1: dig_H1,
                    dig_H2: signed(dig_H2_msb <<< 8 ||| dig_H2_lsb),
                    dig_H3: dig_H3,
                    dig_H4: h4_high * 16 + h4_low,
                    dig_H5: h5_high * 16 + h5_low,
                    dig_H6: dig_H6
                }
            end

            # Reinterprets an unsigned 16-bit value as two's-complement signed:
            # anything above 0x7FFF has 0x10000 subtracted, everything else is returned as is.
            defp signed(value) do
                if value > 0x7FFF, do: value - 0x10000, else: value
            end

            # Normalises the result of an I2C read: binaries pass through untouched,
            # lists are converted to a binary. Any other term matches no clause.
            defp ensure_binary(bytes) when is_binary(bytes), do: bytes
            defp ensure_binary(bytes) when is_list(bytes), do: :erlang.list_to_binary(bytes)
            
        export MIX_TARGET=rpi3a
        ./upload.sh 192.168.x.x
        ssh 192.168.x.x


        If you want to just test the comm.ex and config.ex
        alias Bme280.Comm
        alias Bme280.Config

        bus_name = "i2c-1"          # or whatever your I²C bus is
        {_bus, sensor} = Comm.discover()     # returns a 2-tuple; the second element is the sensor address (0x76 in the author's setup)

        i2c = Comm.open(bus_name)    # open the bus
        config = Config.new()        # default config

        Comm.write_config(config, i2c, sensor)  # write the three configuration bytes
        Comm.read(i2c, sensor)                  # read raw values: {pressure, temperature, humidity}

        If you want to test the GenServer in bme280.ex
        Bme280.start_link()
        Bme280.get_measurement()