We can't find the internet
Attempting to reconnect
Something went wrong!
Hang in there while we get back on track
Post 62
Weather App 09 - SGP40 (VOC)
Published on: 2025-08-20
Tags:
elixir, Blog, Side Project, Libraries, Phoenix, Weather App
This sensor works a little differently from the others. Once the sensor has been initialized and has sent back proper test results, it keeps taking measurements at its normal re-measurement interval. You start the sensor with the code below, using the correct address. One thing to keep in mind: at the moment I have it set up with default temperature and humidity values, so the readings I get are only RELATIVE. Once you have the other sensors set up, you will need to request their temperature and humidity values and send them along with each measurement, so that the sensor gives the most accurate values.
# Boots the server once both the sensor address and the I2C bus name are known.
#
# Opens the bus, builds the sensor config from the optional `:temperature` and
# `:humidity` entries of `args`, initialises the sensor and schedules the first
# `:measure` message. Both reading fields start out as `:no_reading`.
@impl true
def init(%{address: address, i2c_bus_name: bus_name} = args) do
  i2c = Comm.open(bus_name)
  config = Config.new(Map.take(args, [:temperature, :humidity]))
  Comm.initialize_sensor(i2c, address)

  # The sensor normally needs ~30ms; wait a little longer before the first read.
  Process.send_after(self(), :measure, 100)

  {:ok,
   %{
     i2c: i2c,
     address: address,
     config: config,
     last_reading: :no_reading,
     last_raw_reading: :no_reading
   }}
end
# Fallback clause: reached when `args` does not already carry both `:address`
# and `:i2c_bus_name`. Missing entries are filled in with the defaults below and
# initialisation is handed over to the clause that matches on both keys.
#
# A value the caller did supply is kept (`Map.put_new/3`) instead of being
# silently replaced by the default, and the log line reports the values that
# are actually used. A keyword list is accepted as well as a map.
def init(args) do
  default_bus_name = "i2c-1"
  default_address = 0x59

  # `Map.put_new/3` needs a map; convert keyword-style options first.
  args = if is_list(args), do: Map.new(args), else: args

  defaults =
    args
    |> Map.put_new(:address, default_address)
    |> Map.put_new(:i2c_bus_name, default_bus_name)

  transport = "bus: #{defaults.i2c_bus_name}, address: #{defaults.address}"
  Logger.info("Starting SGP40. Please specify an address and a bus.")
  Logger.info("Starting on " <> transport)

  # Both keys are present now, so this call always lands in the matching clause.
  init(defaults)
end
# Periodic measurement tick.
#
# Asks the sensor for a reading using the stored config. On success the raw
# frame and its decoded form are stored in the state; on failure the state is
# left untouched. In both cases the next `:measure` tick is scheduled 50ms out.
@impl true
def handle_info(:measure, %{i2c: i2c, address: address} = state) do
  next_state =
    case Comm.measure(i2c, address, state.config) do
      {:ok, raw} ->
        Logger.info("SGP40 measurement taken: #{inspect(raw)}")
        # The decode result is stored as returned, i.e. still tagged.
        %{state | last_reading: CrcHelper.decode_voc(raw), last_raw_reading: raw}

      {:error, reason} ->
        Logger.error("SGP40 measurement failed: #{inspect(reason)}")
        state
    end

  Process.send_after(self(), :measure, 50)
  {:noreply, next_state}
end
# Shutdown hook: logs the reason and closes the I2C handle when the state
# holds a non-nil one. Always returns `:ok`.
@impl true
def terminate(reason, state) do
  Logger.info("SGP40 GenServer terminating. Reason: #{inspect(reason)}")

  # The state may not be a map with an `:i2c` key; treat that like "no handle".
  connection =
    case state do
      %{i2c: handle} -> handle
      _other -> nil
    end

  if is_nil(connection) do
    Logger.warning("No I2C connection found to close.")
  else
    Comm.close(connection)
    Logger.info("Closed I2C connection.")
  end

  :ok
end
# Replies with whatever is currently stored under `:last_reading`; the state
# itself is returned unchanged.
@impl true
def handle_call(:get_measurement, _caller, %{last_reading: reading} = state),
  do: {:reply, reading, state}
crc_helper.ex
# CRC-8 parameters used by `crc8/1`: generator polynomial and initial register value.
@polynomial 0x31
@init 0xFF

@doc """
Calculates the CRC-8 checksum of a binary or a list of bytes.

The register starts at `0xFF`; every input byte is XORed into it and then
shifted out most-significant bit first, applying the polynomial `0x31`
whenever the top bit is set. No final XOR is applied, so empty input yields
the initial value `0xFF`.

For example, `crc8(<<0xBE, 0xEF>>)` returns `0x92`.
"""
def crc8(data) when is_binary(data), do: crc8(:binary.bin_to_list(data))

def crc8(data) when is_list(data) do
  Enum.reduce(data, @init, fn byte, crc ->
    # `Bitwise.bxor/2` replaces the deprecated `Bitwise.^^^/2` operator.
    crc |> Bitwise.bxor(byte) |> crc_byte()
  end)
end

# Runs the eight shift/XOR steps for one byte that has already been XORed into
# the register. The result is masked to 8 bits after every step.
defp crc_byte(byte) do
  Enum.reduce(0..7, byte, fn _bit, crc ->
    shifted = Bitwise.bsl(crc, 1)

    if Bitwise.band(crc, 0x80) != 0 do
      shifted |> Bitwise.bxor(@polynomial) |> Bitwise.band(0xFF)
    else
      Bitwise.band(shifted, 0xFF)
    end
  end)
end
@doc """
Builds the 3-byte frame for one 16-bit word: the two data bytes followed by
their CRC-8 as computed by `crc8/1`.

Takes the word as a `{msb, lsb}` tuple and returns a binary.
"""
def encode_with_crc({msb, lsb}) do
  word = <<msb, lsb>>
  checksum = crc8(word)
  word <> <<checksum>>
end
@doc """
Decodes a 6-byte raw SGP40 frame into a scaled reading.

The first two bytes are read as a big-endian 16-bit word and the third byte
is its CRC, checked with `crc8/1`. The remaining three bytes are ignored.
Only binaries of exactly 6 bytes match.

Returns `{:ok, value}`, where `value` is the float `raw * 500 / 65_535`
(the 16-bit word mapped linearly onto 0.0..500.0), or
`{:error, :crc_mismatch}` when the checksum does not agree.

NOTE(review): this value has been described both as "ppb" and as a
"VOC index"; confirm the intended unit against the sensor documentation.
"""
def decode_voc(<<word::binary-size(2), crc, _ignored::binary-size(3)>>) do
  case crc8(word) do
    ^crc ->
      <<raw::unsigned-big-integer-size(16)>> = word
      # Linear rescale of the 16-bit word to the 0..500 range.
      {:ok, raw * 500 / 65_535}

    _mismatch ->
      {:error, :crc_mismatch}
  end
end
If you just want to test comm.ex and config.ex:
# Manual check of the Comm and Config modules, run step by step (e.g. in IEx).
alias Sgp40.Comm
alias Sgp40.Config
bus_name = "i2c-1" # or whatever your I²C bus is
{_bus, sensor} = Comm.discover() # returns a 2-tuple; the second element is used as the sensor address (0x59 expected)
i2c = Comm.open(bus_name) # open the bus
config = Config.new() # default config
Comm.write_config(config, i2c, sensor) # write config
Comm.read(i2c, sensor) # read raw values
If you want to test the GenServer in Sgp40.ex:
# Start the server without arguments.
# NOTE(review): presumably this ends up using the default bus and address — confirm in start_link.
Sgp40.start_link()
# Ask the running server for its most recent reading.
# NOTE(review): presumably served by the :get_measurement call handler — confirm.
Sgp40.get_measurement()