Home Posts Post Search Tag Search

Weather App 08 - BME280 (Humidity, Temperature, Pressure)
Published on: 2025-08-20 Tags: elixir, Blog, Side Project, Libraries, Nerves, Weather App, Poncho
BME280 (Humidity, Temperature, Air Pressure)
    Don't forget to add the {:circuits_i2c, "~> 1.1"} to the BME280 project and then add the :Bme280 to the sensor_but mix.exs

    config.ex
        defstruct mode: :normal,
                    osrs_t: :osrs_2x,
                    osrs_p: :osrs_16x,
                    osrs_h: :osrs_1x,
                    standby_time: :standby_0_5_ms,
                    filter: :filter_16,
                    spi3w_en: false

        def new, do: struct(__MODULE__)

        def new(opts) do
            struct(__MODULE__, opts)
        end

        @doc """
        This will be responsible for the: Humidity oversampling.
        """
        def to_ctrl_hum_byte(%__MODULE__{osrs_h: osrs_h}) do
            osrs_h_bit = osrs_h_to_bit(osrs_h)
            0b00000000 ||| osrs_h_bit
        end

        @doc """
        This will be responsible for the: Pressure oversampling, Temperature oversampling and
        the Mode that the sensor is currently operating in.
        """
        def to_ctrl_meas_byte(%__MODULE__{osrs_t: osrs_t, osrs_p: osrs_p, mode: mode}) do
            osrs_t_bit = osrs_t_to_bit(osrs_t)
            osrs_p_bit = osrs_p_to_bit(osrs_p)
            mode_bit = mode_to_bit(mode)
            0b00000000 ||| osrs_t_bit <<< 5 ||| osrs_p_bit <<< 2 ||| mode_bit
        end

        @doc """
        This will be responsable for the: filtering, standby time and SPI 3-wire mode.
        """
        def to_config_byte(%__MODULE__{standby_time: standby_time, filter: filter, spi3w_en: spi3w_en}) do
            standby_time_bit = standby_time_to_bit(standby_time)
            filter_bit = filter_to_bit(filter)
            spi3w_en_bit = spi3w_en_bit(spi3w_en)

            0b00000000 ||| standby_time_bit <<< 5 ||| filter_bit <<< 2 ||| spi3w_en_bit
        end

        @spec integration_ms(%__MODULE__{}) :: non_neg_integer()
        def integration_ms(%__MODULE__{
                mode: mode,
                osrs_t: osrs_t,
                osrs_p: osrs_p,
                osrs_h: osrs_h,
                standby_time: standby_time
            }) do
            # Convert oversampling atoms to approximate times in ms
            t = osrs_to_ms(osrs_t)
            p = osrs_to_ms(osrs_p)
            h = osrs_to_ms(osrs_h)

            # Base measurement overhead (ms)
            base = 1

            # Standby time (ms)
            standby_ms =
            case mode do
                :normal -> standby_time_to_ms(standby_time)
                _ -> 0
            end

            # Total integration time
            total = t + p + h + base + standby_ms

            # Always round up to the nearest integer
            max(1, round(total))
        end

    comm.ex
        @ctrl_hum 0xF2
        @ctrl_meas 0xF4
        @config_register 0xF5
        @data_register 0xF7

        def write_config(config, i2c, sensor) do
            ctrl_hum_byte = Config.to_ctrl_hum_byte(config)
            ctrl_meas_byte = Config.to_ctrl_meas_byte(config)
            config_byte = Config.to_config_byte(config)

            # Write CTRL_HUM register (1 byte)
            I2C.write(i2c, sensor, <<@ctrl_hum, ctrl_hum_byte>>)

            :timer.sleep(100)  # Small delay to ensure the sensor is ready

            # Write CTRL_MEAS register (1 byte)
            I2C.write(i2c, sensor, <<@ctrl_meas, ctrl_meas_byte>>)

            # Write CONFIG register (1 byte)
            I2C.write(i2c, sensor, <<@config_register, config_byte>>)
        end

        def read(i2c, sensor) do
            <<press_msb, press_lsb, press_xlsb, temp_msb, temp_lsb, temp_xlsb, hum_msb, hum_lsb>> =
            I2C.write_read!(i2c, sensor, <<@data_register>>, 8)

            combined_pressure = (press_msb <<< 12) ||| (press_lsb <<< 4) ||| (press_xlsb >>> 4)
            combined_temperature = (temp_msb <<< 12) ||| (temp_lsb <<< 4) ||| (temp_xlsb >>> 4)
            combined_humidity = (hum_msb <<< 8) ||| hum_lsb

            {combined_pressure, combined_temperature, combined_humidity}
        end

    bme280.ex

    calibration.ex
        alias Bme280.Calibration

        @spec convert({integer, integer, integer}, Calibration.t()) ::
                %{temperature_c: float, pressure_pa: float, humidity_rh: float}
        def convert({adc_P, adc_T, adc_H}, calib) do
            {temp_c, t_fine} = compensate_temp(adc_T, calib)
            press_pa = compensate_press(adc_P, t_fine, calib)
            hum_pct = compensate_hum(adc_H, t_fine, calib)

            %{
            temperature_c: temp_c,
            pressure_pa: press_pa,
            humidity_rh: hum_pct
            }
        end

        def compensate_temp(adc_T, calib) do
            var1 =
            (adc_T / 16384.0 - calib.dig_T1 / 1024.0) * calib.dig_T2

            var2 =
            (adc_T / 131_072.0 - calib.dig_T1 / 8192.0) *
                (adc_T / 131_072.0 - calib.dig_T1 / 8192.0) *
                calib.dig_T3

            t_fine = var1 + var2
            temperature = t_fine / 5120.0
            {temperature, t_fine}
        end

        def compensate_press(adc_P, t_fine, calib) do
            var1 = t_fine / 2.0 - 64000.0
            var2 = var1 * var1 * calib.dig_P6 / 32768.0
            var2 = var2 + var1 * calib.dig_P5 * 2.0
            var2 = var2 / 4.0 + calib.dig_P4 * 65536.0
            var3 = calib.dig_P3 * var1 * var1 / 524_288.0
            var1 = (var3 + calib.dig_P2 * var1) / 524_288.0
            var1 = (1.0 + var1 / 32768.0) * calib.dig_P1

            if var1 == 0 do
            0
            else
            p = 1_048_576.0 - adc_P
            p = (p - var2 / 4096.0) * 6250.0 / var1
            var1 = calib.dig_P9 * p * p / 2_147_483_648.0
            var2 = p * calib.dig_P8 / 32768.0
            p + (var1 + var2 + calib.dig_P7) / 16.0
            end
        end

        def compensate_hum(adc_H, t_fine, calib) do
            h = t_fine - 76800.0

            h =
            (adc_H - (calib.dig_H4 * 64.0 + calib.dig_H5 / 16384.0 * h)) *
                (calib.dig_H2 / 65536.0) *
                (1.0 +
                calib.dig_H6 / 67_108_864.0 * h *
                    (1.0 + calib.dig_H3 / 67_108_864.0 * h))

            cond do
            h > 100.0 -> 100.0
            h < 0.0 -> 0.0
            true -> h
            end
        end
    converter.ex
        defstruct [
            :dig_T1,
            :dig_T2,
            :dig_T3,
            :dig_P1,
            :dig_P2,
            :dig_P3,
            :dig_P4,
            :dig_P5,
            :dig_P6,
            :dig_P7,
            :dig_P8,
            :dig_P9,
            :dig_H1,
            :dig_H2,
            :dig_H3,
            :dig_H4,
            :dig_H5,
            :dig_H6
        ]

        @temp_press_start 0x88
        # 0x88..0xA1
        @temp_press_len 26
        @hum_start 0xE1
        # 0xE1..0xE7
        @hum_len 7

        def read_all(i2c, sensor) do
            # Read temperature & pressure calibration (0x88..0xA0 = 25 bytes)
            t_p_data =
            I2C.write_read!(i2c, sensor, <<@temp_press_start>>, @temp_press_len)
            |> ensure_binary()

            <<
            dig_T1_lsb,
            dig_T1_msb,
            dig_T2_lsb,
            dig_T2_msb,
            dig_T3_lsb,
            dig_T3_msb,
            dig_P1_lsb,
            dig_P1_msb,
            dig_P2_lsb,
            dig_P2_msb,
            dig_P3_lsb,
            dig_P3_msb,
            dig_P4_lsb,
            dig_P4_msb,
            dig_P5_lsb,
            dig_P5_msb,
            dig_P6_lsb,
            dig_P6_msb,
            dig_P7_lsb,
            dig_P7_msb,
            dig_P8_lsb,
            dig_P8_msb,
            dig_P9_lsb,
            dig_P9_msb,
            _reserved,
            dig_H1
            >> = t_p_data

            # Read humidity calibration (0xE1..0xE7 = 7 bytes)
            h_data =
            I2C.write_read!(i2c, sensor, <<@hum_start>>, @hum_len)
            |> ensure_binary()

            <<dig_H2_lsb, dig_H2_msb, dig_H3, h4_lsb, h4_msb_bits, h5_msb_bits, dig_H6>> = h_data

            %__MODULE__{
            dig_T1: dig_T1_msb <<< 8 ||| dig_T1_lsb,
            dig_T2: signed(dig_T2_msb <<< 8 ||| dig_T2_lsb),
            dig_T3: signed(dig_T3_msb <<< 8 ||| dig_T3_lsb),
            dig_P1: dig_P1_msb <<< 8 ||| dig_P1_lsb,
            dig_P2: signed(dig_P2_msb <<< 8 ||| dig_P2_lsb),
            dig_P3: signed(dig_P3_msb <<< 8 ||| dig_P3_lsb),
            dig_P4: signed(dig_P4_msb <<< 8 ||| dig_P4_lsb),
            dig_P5: signed(dig_P5_msb <<< 8 ||| dig_P5_lsb),
            dig_P6: signed(dig_P6_msb <<< 8 ||| dig_P6_lsb),
            dig_P7: signed(dig_P7_msb <<< 8 ||| dig_P7_lsb),
            dig_P8: signed(dig_P8_msb <<< 8 ||| dig_P8_lsb),
            dig_P9: signed(dig_P9_msb <<< 8 ||| dig_P9_lsb),
            dig_H1: dig_H1,
            dig_H2: signed(dig_H2_msb <<< 8 ||| dig_H2_lsb),
            dig_H3: dig_H3,
            dig_H4: signed(h4_lsb <<< 4 ||| (h4_msb_bits &&& 0x0F)),
            dig_H5: signed((h5_msb_bits &&& 0xF0) >>> 4 ||| h5_msb_bits <<< 4),
            dig_H6: signed(dig_H6)
            }
        end

        defp signed(value) when value > 0x7FFF, do: value - 0x10000
        defp signed(value), do: value

        # Convert list to binary if needed
        defp ensure_binary(data) when is_list(data), do: :erlang.list_to_binary(data)
        defp ensure_binary(data) when is_binary(data), do: data

    export MIX_TARGET=rpi3a
    ./upload.sh 192.168.x.x
    ssh 192.168.x.x


    If you want to just test the comm.ex and config.ex
    alias Bme280.Comm
    alias Bme280.Config

    bus_name = "i2c-1"          # or whatever your I²C bus is
    {_bus, sensor} = Comm.discover()     # returns 0x76 (the sensor address)

    i2c = Comm.open(bus_name)    # open the bus
    config = Config.new()        # default config

    Comm.write_config(config, i2c, sensor)  # write config
    Comm.read(i2c, sensor)                  # read raw values

    if you want to test the GenServer on Bme280.ex
    Bme280.start_link()
    Bme280.get_measurement()