写在前面

  • 树莓派很极客吧!
  • 我学不会吧
  • 根本不敢开始?怕什么!

HTTP Port for Pico W.

2024年7月8日
https://chatgpt.com/share/4b9190d8-4a50-4eb5-bdde-ebf457e2cdd5


HTTP Port for Pico W.

🧑‍🦰User

1
为 Raspberry Pi Pico W Zero 配置 HTTP 请求的端口

🤖 Assistant

要为 Raspberry Pi Pico W Zero 配置一个用于接收 HTTP 请求的端口,您需要按照以下步骤进行:

  1. 连接 Pico W Zero 到您的网络:

    • 确保您的 Pico W Zero 具备 WiFi 功能并已连接至本地网络。
  2. 设置 Pico W Zero 的 Web 服务器:

    • 您需要编写代码,让 Pico W Zero 能够处理 HTTP 请求。Micropython 是非常适合 Pico W Zero 这类微控制器的编程环境。
    • 如果还未安装 MicroPython,请先将其安装到您的 Pico W Zero 上。
    • 编写一个 Micropython 脚本以启动基础的 Web 服务器。以下是一个示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import network
import socket

ssid = 'YOUR_SSID'
password = 'YOUR_PASSWORD'

station = network.WLAN(network.STA_IF)
station.active(True)
station.connect(ssid, password)

while station.isconnected() == False:
pass

print('Connection successful')
print(station.ifconfig())

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', 80))
s.listen(5)

while True:
conn, addr = s.accept()
print('Got a connection from %s' % str(addr))
request = conn.recv(1024)
print('Content = %s' % str(request))
response = 'HTTP/1.1 200 OK\n\nHello World!'
conn.send(response)
conn.close()
  1. 在路由器上设置端口转发:

    • 如果您希望从外部网络访问 Pico W Zero,需要将外部端口转发到 Pico W Zero 的内部 IP 地址及端口。
    • 登录到路由器的管理界面,找到端口转发设置。
    • 创建一条规则,将外部端口(如 8080)转发到 Pico W Zero 的内部 IP 地址的 80 端口。
    • 确保您的 ISP 不会阻止您所使用的端口。
  2. 测试设置:

    • 在同一网络中的设备上访问 http://<Pico W IP 地址>/ 来测试服务器。
    • 从外部网络访问时,使用您的公共 IP 地址并添加转发的端口,例如 http://<公共 IP 地址>:8080/
  3. 安全性建议:

    • 在将设备直接暴露在互联网上时要格外小心。确保只开放必要的端口,并考虑使用防火墙或 VPN 增强安全性。

如果您需要更详细的指导,请告知我!

🧑‍🦰User

1
2
3
4
5
6
扩展程序:导入 `urequests` 模块并使其支持以下功能:

1. 尝试接受请求,如果没有请求传入,则运行自定义的 `read_digital()` 方法。
2. 如果有请求传入,则处理请求。如果请求路径为 `/open`,则在引脚 13 上写入 1 并保持 1 秒。

循环执行以上操作。

🤖 Assistant

这是一个扩展版的 Micropython 程序,适用于 Raspberry Pi Pico W Zero,能处理 HTTP 请求并控制 GPIO 引脚。程序会尝试接收 HTTP 请求,如果没有请求,它将调用自定义的 read_digital() 方法。若收到 /open 请求,它将在引脚 13 上写入 1 并保持 1 秒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import network
import socket
import machine
import time
import urequests

# Setup network
ssid = 'YOUR_SSID'
password = 'YOUR_PASSWORD'

station = network.WLAN(network.STA_IF)
station.active(True)
station.connect(ssid, password)

while not station.isconnected():
pass

print('Connection successful')
print(station.ifconfig())

# Setup pin
pin = machine.Pin(13, machine.Pin.OUT)

# Define your custom method
def read_digital():
# Implement your digital read logic here
# Example: read some sensor or internal state
print("Running read_digital")

# Create a socket and listen
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', 80))
s.listen(5)

while True:
try:
# Try to accept an incoming connection
conn, addr = s.accept()
print('Got a connection from %s' % str(addr))
request = conn.recv(1024).decode()
print('Content = %s' % request)

# Simple router for HTTP GET request
if "/open" in request:
print("Activating pin 13")
pin.value(1)
time.sleep(1)
pin.value(0)
response = 'HTTP/1.1 200 OK\n\nPin 13 activated!'
else:
response = 'HTTP/1.1 404 Not Found\n\nThe URL requested is not available.'

conn.send(response)
conn.close()

except OSError:
# If no incoming connection, run read_digital()
read_digital()
time.sleep(1) # Adding a small delay to the loop to avoid high CPU usage

代码说明:

  • 网络配置: 初始化 WiFi 连接。
  • GPIO 引脚配置: 将引脚 13 设置为输出,用于控制连接的设备。
  • 服务器循环: 服务器尝试接收连接。如果请求路径为 /open,引脚 13 将激活并保持 1 秒。
  • 错误处理: 如果没有接收到连接(即 conn.accept() 抛出 OSError),则调用 read_digital()

请将 YOUR_SSIDYOUR_PASSWORD 替换为实际的 WiFi 凭据,并根据需要调整 read_digital() 方法的内容。

🧑‍🦰User

1
不,我不希望它在错误发生时这样做。我希望它在没有请求时执行,所以它类似于多线程,但更加明确。

🤖 Assistant

要在像 Raspberry Pi Pico W 这样的微控制器上使用 Micropython 模拟“多线程”行为,即在处理 HTTP 请求时,能同时执行其他任务(如 read_digital()),可以使用非阻塞套接字。这将允许您在不阻塞的情况下处理 HTTP 请求,同时执行其他任务。

以下是更新后的程序,使用非阻塞套接字检查 HTTP 请求,如果没有请求,则运行 read_digital() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import network
import socket
import machine
import time
import select

# Network setup
ssid = 'YOUR_SSID'
password = 'YOUR_PASSWORD'

station = network.WLAN(network.STA_IF)
station.active(True)
station.connect(ssid, password)

while not station.isconnected():
pass

print('Connection successful')
print(station.ifconfig())

# Pin setup
pin = machine.Pin(13, machine.Pin.OUT)

# Define the read_digital function
def read_digital():
# Add your digital reading logic here
print("Running read_digital")

# Set up socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', 80))
s.listen(5)
s.setblocking(False) # Set the socket to non-blocking mode

while True:
# Use select to check socket status
rlist, _, _ = select.select([s], [], [], 1) # Timeout set to 1 second

if rlist: # If there is something to read
conn, addr = s.accept()
print('Got a connection from %s' % str(addr))
request = conn.recv(1024).decode()
print('Content = %s' % request)

# Simple router for HTTP GET request
if "/open" in request:
print("Activating pin 13")
pin.value(1)
time.sleep(1)
pin.value(0)
response = 'HTTP/1.1 200 OK\n\nPin 13 activated!'
else:
response = 'HTTP/1.1 404 Not Found\n\nThe URL requested is not available.'

conn.send(response)
conn.close()
else:
# Run the routine task if no HTTP requests
read_digital()

该版本如何工作:

  • 非阻塞套接字: 套接字被设置为非阻塞模式 (s.setblocking(False))。这意味着如果没有客户端连接,s.accept() 不会阻塞。
  • 选择函数: 使用 select.select() 函数检查在指定超时时间(此处为 1 秒)内是否有套接字活动。如果有连接请求,则处理请求;否则执行 read_digital() 方法。
  • 任务处理: 这种设置使设备可以间歇性地检查请求,在没有请求时执行其他任务,从而有效模拟多线程行为。

请确保替换 YOUR_SSIDYOUR_PASSWORD 为实际 WiFi 信息,并根据需要调整超时时间,以便在响应请求和执行任务之间取得平衡。

🧑‍🦰User

1
现在优化代码,将功能分离到函数中,并添加 `main()` 等主程序结构。

🤖 Assistant

为了优化代码,将其组织为模块化的结构,可以定义一些用于网络设置、引脚配置、HTTP 请求处理以及自定义函数 read_digital() 的函数,并在主执行块中调用这些函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import network
import socket
import machine
import time
import select

def setup_network(ssid, password):
"""Initialize and connect to the WiFi network."""
station = network.WLAN(network.STA_IF)
station.active(True)
station.connect(ssid, password)

while not station.isconnected():
pass

print('Connection successful')
print(station.ifconfig())
return station

def setup_pin(pin_number):
"""Configure GPIO pin."""
pin = machine.Pin(pin_number, machine.Pin.OUT)
return pin

def read_digital():
"""Add your digital reading logic here."""
print("Running read_digital")

def http_server(socket, pin):
"""Handle HTTP requests."""
rlist, _, _ = select.select([socket], [], [], 1) # 1 second timeout

if rlist:
conn, addr = socket.accept()
print('Got a connection from %s' % str(addr))
request = conn.recv(1024).decode()
print('Content = %s' % request)

if "/open" in request:
print("Activating pin")
pin.value(1)
time.sleep(1)
pin.value(0)
response = 'HTTP/1.1 200 OK\n\nPin activated!'
else:
response = 'HTTP/1.1 404 Not Found\n\nThe URL requested is not available.'

conn.send(response)
conn.close()
else:
read_digital()

def main():
"""Main function to initialize components and run the server loop."""
ssid = 'YOUR_SSID'
password = 'YOUR_PASSWORD'
pin_number = 13

# Setup network and pin
setup_network(ssid, password)
pin = setup_pin(pin_number)

# Setup socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', 80))
s.listen(5)
s.setblocking(False)

print("Server is running...")

while True:
http_server(s, pin)

if __name__ == "__main__":
main()

代码解读:

  • 函数化组织: setup_networksetup_pinhttp_server 等函数负责处理初始化和处理任务,使主程序(main())清晰且易于理解。
  • 模块化设计: 每个组件(网络、GPIO、HTTP 请求处理)由独立函数处理,使代码更易于维护和修改。
  • 非阻塞服务器模型: http_server 函数使用非阻塞套接字和 select 函数高效处理 HTTP 请求,并在无请求时执行 read_digital()
  • 主执行块: if __name__ == "__main__": 确保 main() 仅在直接执行脚本时被调用,而不会在作为模块导入时被执行。

这种结构使代码更清晰、更易维护,同时仍然满足在处理 HTTP 请求时执行其他任务的功能需求。请填充正确的 WiFi 信息并在 Pico W Zero 上运行代码。

🧑‍🦰User

1
现在让我们处理 `read_digital()` 方法。当前任务是读取引脚 GP 14。当引脚从低电平变为高电平时,发送 HTTP 请求到 `opal:9999`。下面是这个新函数的实现。

🤖 Assistant

要创建 read_digital() 函数,监控 GPIO 引脚(GP 14),并在引脚状态从低电平变为高电平时,向指定地址发送 HTTP 请求,你可以使用 machine 模块访问 GPIO,并使用 urequests 进行 HTTP 请求。以下是函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import machine
import urequests

def read_digital():
"""Monitor GP14 and send an HTTP request when the pin goes from low to high."""
pin = machine.Pin(14, machine.Pin.IN, machine.Pin.PULL_DOWN)
previous_state = pin.value()

while True:
current_state = pin.value()
if current_state != previous_state: # Check if the pin state has changed
if current_state == 1: # Check if the new state is high
try:
response = urequests.get('http://opal:9999')
print('Request sent, response:', response.text)
except Exception as e:
print('Failed to send request:', str(e))
previous_state = current_state # Update the previous state to the current state
time.sleep(0.1) # Add a short delay to reduce CPU usage

函数的关键内容:

  • 引脚初始化: 将 GP 14 设置为输入并启用下拉电阻,确保引脚未被驱动时默认为低电平。

🧑‍🦰User

1
不建议使用永久循环,因为还需处理 HTTP 请求。使用全局变量来跟踪状态。

🤖 Assistant

考虑到程序还需要处理 HTTP 请求,因此在 read_digital() 方法中避免使用阻塞的循环是明智的选择。可以通过使用全局变量来跟踪引脚的前一状态,确保状态切换时响应请求,而不阻塞主程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import machine
import urequests

# Initialize the pin and set the initial previous state
pin = machine.Pin(14, machine.Pin.IN, machine.Pin.PULL_DOWN)
previous_state = pin.value() # Set the initial state globally

def read_digital():
global previous_state
"""Check pin GP14 and send an HTTP request when the pin goes from low to high."""
current_state = pin.value()
if current_state != previous_state: # Check if the pin state has changed
if current_state == 1: # Check if the new state is high
try:
response = urequests.get('http://opal:9999')
print('Request sent, response:', response.text)
response.close()
except Exception as e:
print('Failed to send request:', str(e))
previous_state = current_state # Update the previous state

关键点:

  • 全局变量: previous_state 在函数外定义,以便在多次调用 read_digital() 时保留其状态。这样避免在函数中使用阻塞循环,确保函数能被定期调用。
  • HTTP 请求: 当引脚从低电平转为高电平时,发送一个 GET 请求到 http://opal:9999,并输出响应文本以便调试,关闭响应以释放资源。

使用建议:

定期在主循环中调用 read_digital(),例如:

1
2
3
While True:
Http_server (s, pin)
Read_digital ()

这种实现方式允许 read_digital() 周期性地检查引脚状态,而不会使用阻塞的循环,从而确保与现有 HTTP 服务器循环的良好协作。

🧑‍🦰User

1
Micropython 上是否可以使用 Paho MQTT 客户端?

🤖 Assistant

在 Micropython 上直接使用 Paho MQTT 客户端并不容易,因为 Paho 主要面向标准的 python 环境。然而,Micropython 自带的 MQTT 库(通常称为 umqtt)非常适合在微控制器上使用。这个库有两个版本:

  1. umqtt. Simple - 一个较简单的 MQTT 客户端,适用于基本的 MQTT 功能。
  2. umqtt. Robust - 提供了更强大的连接管理功能,在连接失败时会尝试重新连接。

以下是使用 umqtt.simple 库发布消息的基本指南,类似于 Paho 的一些常见用法:

使用 umqtt.simple 设置 MQTT

首先,确保您的 Micropython 固件中包含 umqtt 库。如果没有,可能需要手动安装。

以下是一个连接到 MQTT 代理并发布消息的简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from umqtt.simple import MQTTClient
import machine

def mqtt_connect_and_publish():
# Configuration
SERVER = 'your.mqtt.broker.address' # Replace with your MQTT broker's address
CLIENT_ID = 'umqtt_client'
TOPIC = 'test/topic'
MESSAGE = 'Hello MQTT'

# Create an MQTT client instance
client = MQTTClient(CLIENT_ID, SERVER)

# Connect to the MQTT server
client.connect()

# Publish a message
client.publish(TOPIC, MESSAGE)
print("Message Published")

# Disconnect the client
client.disconnect()

# Call the function
mqtt_connect_and_publish()

如何安装 umqtt 如果未包含

如果您的 Micropython 版本不包含 umqtt 库,可以从 Micropython 库存储库手动下载并传输到设备。以下是添加库的一般步骤:

  1. 下载库: 从 Micropython 的 GitHub 存储库 获取 umqtt 库文件(umqtt/simple.pyumqtt/robust.py)。
  2. 上传到设备: 使用 ampyrshell 或支持 Micropython 的 IDE(如 Thonny)将这些文件上传到设备。

使用注意事项

在微控制器上使用 MQTT 时,需注意设备的内存和处理能力限制。请勿频繁发送高频消息或大负载,以免导致设备不稳定或崩溃。

通过这种方式使用 MQTT,您可以将设备与各种 IoT 平台集成,实现与 Paho MQTT 客户端在标准 python 环境中类似的功能。

🧑‍🦰User

1
2
3
4
5
6
7
8
9
10
11
给我一个使用 `umqtt.robust` 的基本示例,执行以下任务:


MQTT_BROKER = "opal"
MQTT_PORT = 1883
MQTT_TOPIC = 'frigate/events'
MQTT_USERNAME = 'admin'
MQTT_PASSWORD = 'admin'


并且仅监听所有消息。

🤖 Assistant

要使用 Micropython 中的 umqtt.robust 库设置 MQTT 订阅客户端,该库提供了更可靠的连接管理功能,以下是一个基本示例。此示例将连接到指定的 MQTT 代理,订阅一个主题,并监听所有传入消息。

使用 umqtt.robust 设置 MQTT 订阅客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from umqtt.robust import MQTTClient
import time

# MQTT Configuration
MQTT_BROKER = "opal"
MQTT_PORT = 1883
MQTT_TOPIC = 'frigate/events'
MQTT_USERNAME = 'admin'
MQTT_PASSWORD = 'admin'

def sub_cb(topic, msg):
""" Callback function handles received messages """
print("Received message on topic: %s; message: %s" % (topic.decode(), msg.decode()))

def main():
# Create an MQTT client instance
client = MQTTClient("umqtt_robust_client", MQTT_BROKER, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD)

# Assign the callback function to the client
client.set_callback(sub_cb)

# Connect to the MQTT broker
client.connect()

# Subscribe to the topic
client.subscribe(MQTT_TOPIC)
print("Connected to %s, subscribed to %s topic" % (MQTT_BROKER, MQTT_TOPIC))

try:
while True:
# Check for new messages on the topic
client.wait_msg()
except KeyboardInterrupt:
print("Disconnected from MQTT Broker")
finally:
# Disconnect the client
client.disconnect()

if __name__ == "__main__":
main()

代码说明:

  • MQTT 配置: 设定代理、端口、主题、用户名和密码。
  • 回调函数(sub_cb): 当接收到新消息时调用此函数,它会打印主题和消息内容。
  • MQTT 客户端设置: 使用提供的凭据创建 umqtt.robust 模块中的 MQTTClient 实例。
  • 连接与订阅: 客户端连接到 MQTT 代理,设置回调函数并订阅指定主题。
  • 消息处理循环: 程序进入无限循环(client.wait_msg()),等待并处理接收到的消息。wait_msg() 是一个阻塞调用,等待接收消息后调用回调函数。
  • 清理退出: 在键盘中断(如按 CTRL+C)时,客户端将从 MQTT 代理断开连接。

运行代码:

在运行此代码前,请确保:

  • 您的设备上已安装 umqtt.robust 库。
  • 您的 MQTT 代理(如 “opal”)可通过设备所在的网络访问。
  • MQTT 代理允许使用指定用户名和密码连接,且设置了正确的主题权限。

这种设置将使您的 Micropython 设备成为指定主题的 MQTT 消息监听器,并在连接断开时自动重新连接。

🧑‍🦰User

1
如何在 Micropython 设备上安装 `umqtt` 库

🤖 Assistant

如果你想在 Micropython 设备上使用 umqtt 库,有几种方式可以选择,具体取决于你的开发环境和工具支持情况。以下是详细的安装步骤,重点介绍手动安装方法,因为 Micropython 中的自动包管理功能有限。

步骤 1:下载库文件

首先,你需要从 Micropython 的 GitHub 仓库中手动下载 umqtt 库文件:

  1. 访问 MicroPython-lib 仓库:micropython-lib
  2. 找到 umqtt.robustumqtt.simple 目录。
  3. 下载 robust.pysimple.py 文件,以及可能需要的其他依赖文件。

步骤 2:将文件上传到你的设备

接下来,你需要将下载的库文件上传到 Micropython 设备。可以使用 ampy 工具、rshell,或者通过支持 Micropython 的 IDE(如 Thonny 或 uPyCraft)来完成。

使用 ampy:

  • 确保 ampy 已安装在你的计算机上:
1
pip install adafruit-ampy
  • 连接你的 Micropython 设备到电脑,确定串口位置(例如,Windows 上通常是 COM 3,Linux 上通常是 /dev/ttyUSB 0)。

  • 使用 ampy 将文件传输到设备:

1
2
Ampy --port /dev/ttyUSB 0 put robust. Py
Ampy --port /dev/ttyUSB 0 put simple. Py

使用 Thonny:

  • 打开 Thonny,并连接到你的 Micropython 设备。
  • 通过 Thonny 的文件管理功能,将下载的 robust.pysimple.py 文件上传到设备。

步骤 3:确认安装成功

文件上传后,你可以通过以下步骤检查是否安装成功:

  • 连接到 Micropython 的 REPL 终端。
  • 尝试导入 umqtt 模块:
1
2
from umqtt. Robust import MQTTClient
from umqtt. Simple import MQTTClient

如果没有报错,则说明安装成功。

注意事项:

  • 如果你的 Micropython 设备具有网络连接,并且支持 upip(一个简单的包管理工具),你可以直接通过 upip 在线安装库:
1
2
3
import upip
upip. Install ('micropython-umqtt.robust')
ipip. Install ('micropython-umqtt.simple')

然而,并非所有 Micropython 环境都支持 upip,并且设备的网络连接可能不稳定,因此手动安装往往是更好的选择。