ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

树莓派科学小实验4B--04_温湿度传感器

2022-01-26 19:03:43  阅读:186  来源: 互联网

标签:__ pull 树莓 04 温湿度 self bytes up GPIO


小实验目录

树莓派科学小实验
001 点亮第一盏LED灯
002 点亮LED灯组
003_开关控制LED灯
004_获取温湿度


提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档

目录


前言

在这个实验中将接触到时钟信号,高低电平的接收。字节转换


提示:以下是本篇文章正文内容,下面案例可供参考

一、实验部件

1 实验元器件

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

2 连接的GPIO针脚

使用的针脚:4,17,27,19

在这里插入图片描述

二、 DHT11 和 单片机或树莓派通信的过程

由于python无法控制us的时钟的精度,所以在实际收取时会出现无法获取或获取信号不完整的现象。但是基本原理如图下。
在这里插入图片描述在这里插入图片描述

三、代码部分

在这里引用了github上的开源项目:https://github.com/szazo/DHT11_Python

1 DHT11的实际工作代码

# -*- coding: utf-8 -*-
"""
#这个是对DHT11的类,将完成传感器的初始化,获取值,以及将值转换为10进制并返回,文章参考了:https://github.com/szazo/DHT11_Python
 
version = '0.0.1'
make day=2022-01-26
"""
__docformat__ = "restructuredtext en"

__all__ = []

__license__ = "MIT license"

import time

import RPi.GPIO as GPIO
import RPi


# 定义显示类用来显示和输出
class DHT11Result:
    'DHT11 返回从DHT11.read()中读取的值'
    ERR_NO_ERROR = 0
    ERR_MISSING_DATA = 1
    ERR_CRC = 2
    error_code = ERR_NO_ERROR
    temperature = -1
    humidity = -1

    def __init__(self, error_code, temperature, humidity):
        self.error_code = error_code
        self.temperature = temperature
        self.humidity = humidity

    def is_valid(self):
        return self.error_code == DHT11Result.ERR_NO_ERROR


# 定义探头初始化和获取的类
class DHT11:
    __pin = 0 #定义Pin的值

    def __init__(self, pin):
        self.__pin = pin

    def __send_and_sleep(self, output, sleep):
        '''
        设定发送的电平值和休眠的时间
        :param output: 电平值
        :param sleep: 休眠时间
        :return:
        '''
        GPIO.output(self.__pin, output)
        time.sleep(sleep)

    def __collect_input(self):
        '''
         这部分完成了对输入数据的采集
        '''
        unchanged_count = 0
        max_unchanged_count = 100

        last=-1
        data=[]
        while True:
            curent=GPIO.input(self.__pin)
            data.append(curent)
            if last != curent:
                unchanged_count=0
                last=curent

            else:
                unchanged_count+=1
                if unchanged_count>max_unchanged_count:
                    break
         
        return data

    def __parse_data_pull_up_lengths(self, data):
        '''
         这部分完成了对数据的初步处理
        ''' 
        STATE_INIT_PULL_DOWN = 1
        STATE_INIT_PULL_UP = 2
        STATE_DATA_FIRST_PULL_DOWN = 3
        STATE_DATA_PULL_UP = 4
        STATE_DATA_PULL_DOWN = 5

        state = STATE_INIT_PULL_DOWN

        lengths = []  # will contain the lengths of data pull up periods
        current_length = 0  # will contain the length of the previous period

        for i in range(len(data)):

            current = data[i]
            current_length += 1
            print(f"id:{i},value:{current},state:{state}")
            if state == STATE_INIT_PULL_DOWN:
                if current == RPi.GPIO.LOW:
                    # ok, we got the initial pull down
                    state = STATE_INIT_PULL_UP
                    continue
                else:
                    continue
            if state == STATE_INIT_PULL_UP:
                if current == RPi.GPIO.HIGH:
                    # ok, we got the initial pull up
                    state = STATE_DATA_FIRST_PULL_DOWN
                    continue
                else:
                    continue
            if state == STATE_DATA_FIRST_PULL_DOWN:
                if current == RPi.GPIO.LOW:
                    # we have the initial pull down, the next will be the data pull up
                    state = STATE_DATA_PULL_UP
                    continue
                else:
                    continue
            if state == STATE_DATA_PULL_UP:
                if current == RPi.GPIO.HIGH:
                    # data pulled up, the length of this pull up will determine whether it is 0 or 1
                    current_length = 0
                    state = STATE_DATA_PULL_DOWN
                    continue
                else:
                    continue
            if state == STATE_DATA_PULL_DOWN:
                if current == RPi.GPIO.LOW:
                    # pulled down, we store the length of the previous pull up period
                    lengths.append(current_length)
                    state = STATE_DATA_PULL_UP
                    continue
                else:
                    continue
         
        return lengths

    def __calculate_bits(self, pull_up_lengths):
        '''
         这部分完成了对数据的整形
        ''' 
        # find shortest and longest period
        shortest_pull_up = 1000
        longest_pull_up = 0

        for i in range(0, len(pull_up_lengths)):
            length = pull_up_lengths[i]
           
            if length < shortest_pull_up:
                shortest_pull_up = length
            if length > longest_pull_up:
                longest_pull_up = length

        # use the halfway to determine whether the period it is long or short
        halfway = shortest_pull_up + (longest_pull_up - shortest_pull_up) / 2
       
        bits = []

        for i in range(0, len(pull_up_lengths)):
            bit = False
            if pull_up_lengths[i] > halfway:
                bit = True
            bits.append(bit)
      
        return bits

    def __bits_to_bytes(self, bits):
        '''
         bit转bytes
        ''' 
        the_bytes = []
        byte = 0

        for i in range(0, len(bits)):
            print(f"left before:{byte}")
            byte = byte << 1
            print(f"left after:{byte}")
            if (bits[i]):
                byte = byte | 1
            else:
                byte = byte | 0
            if ((i + 1) % 8 == 0):
                the_bytes.append(byte)
                byte = 0
        print(f"the_bytes:{the_bytes}")
        return the_bytes

    def __calculate_checksum(self, the_bytes):
        return the_bytes[0] + the_bytes[1] + the_bytes[2] + the_bytes[3] & 255

    def read(self):
       '''
         读取数据
        ''' 
        RPi.GPIO.setup(self.__pin, RPi.GPIO.OUT)

        # send initial high
        self.__send_and_sleep(RPi.GPIO.HIGH, 0.05)

        # pull down to low
        self.__send_and_sleep(RPi.GPIO.LOW, 0.02)

        # change to input using pull up
        RPi.GPIO.setup(self.__pin, RPi.GPIO.IN, RPi.GPIO.PUD_UP)

        # collect data into an array
        data = self.__collect_input()

        # parse lengths of all data pull up periods
        pull_up_lengths = self.__parse_data_pull_up_lengths(data)

        # if bit count mismatch, return error (4 byte data + 1 byte checksum)
        if len(pull_up_lengths) != 40:
            return DHT11Result(DHT11Result.ERR_MISSING_DATA, 0, 0)

        # calculate bits from lengths of the pull up periods
        bits = self.__calculate_bits(pull_up_lengths)

        # we have the bits, calculate bytes
        the_bytes = self.__bits_to_bytes(bits)

        # calculate checksum and check
        checksum = self.__calculate_checksum(the_bytes)
        if the_bytes[4] != checksum:
            return DHT11Result(DHT11Result.ERR_CRC, 0, 0)

        # ok, we have valid data

        # The meaning of the return sensor values
        # the_bytes[0]: humidity int
        # the_bytes[1]: humidity decimal
        # the_bytes[2]: temperature int
        # the_bytes[3]: temperature decimal

        temperature = the_bytes[2] + float(the_bytes[3]) / 10
        humidity = the_bytes[0] + float(the_bytes[1]) / 10

        return DHT11Result(DHT11Result.ERR_NO_ERROR, temperature, humidity)

 

2 外层调用代码

# -*- coding: utf-8 -*-
"""
#这个实验完成了使用DHT11来读取温度和相对湿度
author = "Derek Tian"
version = '0.0.1'
make day=2022-01-25
"""
__docformat__ = "restructuredtext en"

__all__ = []

__license__ = "MIT license"

import time

import RPi.GPIO as GPIO
import DHT11 as dht11
import time
import datetime

# initialize GPIO
GPIO.setwarnings(True)
GPIO.setmode(GPIO.BCM)
# read data using pin 14
instance = dht11.DHT11(pin=4)

try:
    while True:
        result = instance.read()
        if result.is_valid():
            print("Last valid input: " + str(datetime.datetime.now()))

            print("Temperature: %-3.1f C" % result.temperature)
            print("Humidity: %-3.1f %%" % result.humidity)

        time.sleep(6)

except KeyboardInterrupt:
    print("Cleanup")
    GPIO.cleanup()

四、 代码分析

1) 在主函数中:

在这里使用的RPi.GPIO的库,来直接操作GPIO

import RPi.GPIO as GPIO

设定使用BCM方式操作GPIO,设定DHT11 连接的针脚为4并初始化DHT11的工作类

GPIO.setwarnings(True)
GPIO.setmode(GPIO.BCM)
# read data using pin 14
instance = dht11.DHT11(pin=4)

使用循环来获取值:

 while True:
        result = instance.read()
        if result.is_valid():
            print("Last valid input: " + str(datetime.datetime.now()))

            print("Temperature: %-3.1f C" % result.temperature)
            print("Humidity: %-3.1f %%" % result.humidity)

2) 对DTH11.py 的分析

(1) 定义发送信号的函数,在这里同时定义了休眠时间(这个是关键,因为这是使用软件在模拟PWM的数字方式),因为需要多次调用,所以抽象为独立函数

 def __send_and_sleep(self, output, sleep):
        GPIO.output(self.__pin, output)
        time.sleep(sleep)

(2)__collect_input() 函数,接收采集到的数值

 def __collect_input(self):
        '''
         这部分完成了对输入数据的采集
        '''
        unchanged_count = 0
        max_unchanged_count = 100

        last=-1
        data=[]
        while True:
            curent=GPIO.input(self.__pin)
            
            data.append(curent)
            if last != curent:
                unchanged_count=0
                last=curent

            else:
                unchanged_count+=1
                if unchanged_count>max_unchanged_count:
                    break
         
        return data

通过循环来接收信号采集到数字值,并设定读取失败的最大次数为100次。采集到的是0 or 1的数字信号
可以通过添加print打印并显示出来
在这里插入图片描述(3)__parse_data_pull_up_lengths函数完成了对原始数据采集的类型整理
程序的关键是定义了5个状态

        STATE_INIT_PULL_DOWN = 1 #发送前的前置低电平
        STATE_INIT_PULL_UP = 2  #发送前的前置高电平,这里将告诉主机准备接收
        STATE_DATA_FIRST_PULL_DOWN = 3  # 发送低电平标志数据准备发送
        STATE_DATA_PULL_UP = 4  #发送数据
        STATE_DATA_PULL_DOWN = 5  #数据发送结束

通过循环处理接收到的数据包,挑出其中有效数据帧存储,下面是对于每一个bit值得处理输出:
在这里插入图片描述
当数据状态转换为“STATE_DATA_PULL_UP”并且新的bit位为高电平时,将初始化

 if state == STATE_DATA_PULL_UP:
                if current == RPi.GPIO.HIGH:
                    # data pulled up, the length of this pull up will determine whether it is 0 or 1
                    current_length = 0
                    state = STATE_DATA_PULL_DOWN
                    continue
                else:
                    continue

当数据状态为“STATE_DATA_PULL_DOWN”并且新的bit位为低电平值时,将count的值写入list中

            if state == STATE_DATA_PULL_DOWN:
                if current == RPi.GPIO.LOW:
                    # pulled down, we store the length of the previous pull up period
                    lengths.append(current_length)
                    state = STATE_DATA_PULL_UP
                    continue
                else:
                    continue

最终返回的值:
在这里插入图片描述(4)__calculate_bits函数完成了将数值从count值准换为二进制的工作,在这里作者采用了中间值来判断的方式。
设定初始的上下边界,0,1000.由于最大接收的字节有效bit不可能超过1000,所以这里设定为1000(个人因为比较浪费)

        shortest_pull_up = 1000
        longest_pull_up = 0

通过找出上下边界,计算中间值

 for i in range(0, len(pull_up_lengths)):
            length = pull_up_lengths[i]
            print(f"length:{length}|shortest_pull_up:{shortest_pull_up}|longest_pull_up:{longest_pull_up}")
            if length < shortest_pull_up:
                shortest_pull_up = length
            if length > longest_pull_up:
                longest_pull_up = length

        # use the halfway to determine whether the period it is long or short
        halfway = shortest_pull_up + (longest_pull_up - shortest_pull_up) / 2

在这里插入图片描述

根据中间值判断来生成二进制list

 for i in range(0, len(pull_up_lengths)):
            bit = False
            if pull_up_lengths[i] > halfway:
                bit = True
            bits.append(bit)

在这里插入图片描述

(5)__bits_to_bytes 通过按位计算来获取采集到的值:
在这里,作者通过对bit位的左移操作来完成了2进制对10进制的转换。对bit位的连续左移到第8个bit位时,数值将是10进制的值。

 for i in range(0, len(bits)):
            print(f"left before:{byte}")
            byte = byte << 1
            print(f"left after:{byte}")
            if (bits[i]):
                byte = byte | 1
            else:
                byte = byte | 0
            if ((i + 1) % 8 == 0):
                the_bytes.append(byte)

在这里插入图片描述
(6)__calculate_checksum 将生成的4位数值写入一个数组,这里就不多说了。

red()的读取过程
在函数初始化中,第一步是设定PIN针脚的工作方向:(输出、输入),这里设定为输出,用于激活DHT11模块

 RPi.GPIO.setup(self.__pin, RPi.GPIO.OUT)

调用 ”__send_and_sleep”函数来对针脚进行控制
输出一个高电平,并休眠0.05秒,激活DHT11模块

  self.__send_and_sleep(RPi.GPIO.HIGH, 0.05)

输出一个低电平,并休眠0.02秒,准备接收DHT11模块的输出值

 self.__send_and_sleep(RPi.GPIO.LOW, 0.02)

在休眠0.02秒后,将PIN针脚转为输入模式,并设定高电平为信号采集位

RPi.GPIO.setup(self.__pin, RPi.GPIO.IN, RPi.GPIO.PUD_UP)

接收PIN的输入

data = self.__collect_input() 

调用函数对接收到的数据帧进行状态转换

 pull_up_lengths = self.__parse_data_pull_up_lengths(data)

生成4个8字节的byte的数据和1个8byte的效验位数据,

 if len(pull_up_lengths) != 40:
            return DHT11Result(DHT11Result.ERR_MISSING_DATA, 0, 0)
 实际上这个效验位数据就是前4个byte数据的总和。

最后的63就是效验位
在这里插入图片描述
# calculate bits from lengths of the pull up periods
bits = self.__calculate_bits(pull_up_lengths)

    # we have the bits, calculate bytes
    the_bytes = self.__bits_to_bytes(bits)

    # calculate checksum and check
    checksum = self.__calculate_checksum(the_bytes)
    if the_bytes[4] != checksum:
        return DHT11Result(DHT11Result.ERR_CRC, 0, 0)

    # ok, we have valid data

    # The meaning of the return sensor values
    # the_bytes[0]: humidity int
    # the_bytes[1]: humidity decimal
    # the_bytes[2]: temperature int
    # the_bytes[3]: temperature decimal

    temperature = the_bytes[2] + float(the_bytes[3]) / 10
    humidity = the_bytes[0] + float(the_bytes[1]) / 10

    return DHT11Result(DHT11Result.ERR_NO_ERROR, temperature, humidity)

修改代码,在最外层调用函数中添加按键处理和LED灯显示(按下时点亮,有数据时点亮)

# -*- coding: utf-8 -*-
"""
#这个实验完成了使用DHT11来读取温度和相对湿度
author = "Derek Tian"
version = '0.0.1'
make day=2022-01-25
"""
__docformat__ = "restructuredtext en"

__all__ = []

__license__ = "MIT license"

import time
from signal import pause

import RPi.GPIO as GPIO
import DHT11 as dht11
import time
import datetime
from  gpiozero import LED,Button

# initialize GPIO
GPIO.setwarnings(True)
GPIO.setmode(GPIO.BCM)
# read data using pin 14
instance = dht11.DHT11(pin=4)

button=Button(19)
red=LED(17)
yellow=LED(27)
bl=LED(5)
bl.off()
def getValue():
    red.on()
    result = instance.read()
    if result.is_valid():
        print("Last valid input: " + str(datetime.datetime.now()))
        print("Temperature: %-3.1f C" % result.temperature)
        print("Humidity: %-3.1f %%" % result.humidity)
        yellow.on()
    else:
        yellow.off()
        print("not get value !!!!")

def fGetValue():
    red.off()
    yellow.off()


if __name__=='__main__':
 try:
    button.when_pressed = getValue
    button.when_released = fGetValue

    pause()
    # while True:
    #     result = instance.read()
    #     if result.is_valid():
    #         print("Last valid input: " + str(datetime.datetime.now()))
    #
    #         print("Temperature: %-3.1f C" % result.temperature)
    #         print("Humidity: %-3.1f %%" % result.humidity)
    #
    #     time.sleep(6)

 except KeyboardInterrupt:
     print("Cleanup")
     GPIO.cleanup()

最终效果:

按下按键后红灯点亮,通过成功获取数值(黄色灯点亮):
在这里插入图片描述
在这里插入图片描述
按下按键后红色灯点亮,获取数值失败,黄色灯没有点亮:
在这里插入图片描述
在这里插入图片描述

总结:

对于DHT11来说,获取信号的时钟信息时非常关键的,如果无法正确获取到时钟是无法获取值或激活传感器的。而python对于这种需要us来控制的代码不太擅长,如果使用c或golang比较好。

标签:__,pull,树莓,04,温湿度,self,bytes,up,GPIO
来源: https://blog.csdn.net/tt212/article/details/122693927

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有