# Define your custom method
def read_digital():
    """Placeholder for the periodic digital-read task.

    Replace the body with the real logic (for example, reading a sensor or
    some internal state). As written it only prints a marker so it is
    visible when the task runs.
    """
    print("Running read_digital")
# Create a TCP socket listening on port 80 on all interfaces.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', 80))
s.listen(5)
# A default (blocking) socket makes accept() wait forever, so the OSError
# branch below would never run. With a 1-second timeout accept() raises
# OSError when no client shows up, which is what lets read_digital() run.
s.settimeout(1)

while True:
    try:
        # Try to accept an incoming connection (waits at most 1 second).
        conn, addr = s.accept()
    except OSError:
        # No incoming connection: run read_digital(). The accept timeout
        # already paces the loop, so no extra sleep is needed here.
        read_digital()
        continue

    try:
        print('Got a connection from %s' % str(addr))
        request = conn.recv(1024).decode()
        print('Content = %s' % request)

        # Simple router for HTTP GET request. Only the request line is
        # inspected so that "/open" appearing in a header or body cannot
        # trigger the pin.
        request_line = request.split('\n', 1)[0]
        if "/open" in request_line:
            print("Activating pin 13")
            pin.value(1)
            time.sleep(1)
            pin.value(0)
            response = 'HTTP/1.1 200 OK\r\n\r\nPin 13 activated!'
        else:
            response = 'HTTP/1.1 404 Not Found\r\n\r\nThe URL requested is not available.'

        # sendall() writes the whole reply; send() may stop part-way.
        conn.sendall(response.encode())
    except OSError as e:
        # A misbehaving client must not take the server loop down.
        print('Connection error:', e)
    finally:
        # Always release the client socket, even when the exchange failed.
        conn.close()
# Define the read_digital function
def read_digital():
    """Routine task to run while no HTTP request is pending.

    This is a stub: add the digital reading logic here. For now it only
    prints a marker line.
    """
    print("Running read_digital")
# Set up socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', 80))
s.listen(5)
s.setblocking(False)  # Set the socket to non-blocking mode

while True:
    # Use select to check socket status
    rlist, _, _ = select.select([s], [], [], 1)  # Timeout set to 1 second

    if not rlist:
        # Run the routine task if no HTTP requests
        read_digital()
        continue

    try:
        conn, addr = s.accept()
    except OSError:
        # The pending connection went away between select() and accept().
        continue

    try:
        # NOTE(review): on some network stacks the accepted socket inherits
        # the listener's non-blocking mode, in which case recv() fails
        # immediately if the request has not arrived yet. An explicit
        # timeout avoids that and bounds how long a silent client can stall
        # the loop.
        conn.settimeout(2)

        print('Got a connection from %s' % str(addr))
        request = conn.recv(1024).decode()
        print('Content = %s' % request)

        # Simple router for HTTP GET request. Only the request line is
        # inspected so that "/open" appearing in a header or body cannot
        # trigger the pin.
        request_line = request.split('\n', 1)[0]
        if "/open" in request_line:
            print("Activating pin 13")
            pin.value(1)
            time.sleep(1)
            pin.value(0)
            response = 'HTTP/1.1 200 OK\r\n\r\nPin 13 activated!'
        else:
            response = 'HTTP/1.1 404 Not Found\r\n\r\nThe URL requested is not available.'

        # sendall() writes the whole reply; send() may stop part-way.
        conn.sendall(response.encode())
    except OSError as e:
        # A misbehaving client must not take the server loop down.
        print('Connection error:', e)
    finally:
        # Always release the client socket, even when the exchange failed.
        conn.close()
import select
import socket
import time

import machine
import network
import urequests
def setup_network(ssid, password, timeout=None):
    """Initialize the station interface and connect to a WiFi network.

    Args:
        ssid: Name of the network to join.
        password: Passphrase for that network.
        timeout: Maximum number of seconds to wait for the connection.
            ``None`` (the default) waits indefinitely.

    Returns:
        The connected ``network.WLAN`` station object.

    Raises:
        OSError: If ``timeout`` is given and the connection is not
            established within that many seconds.
    """
    poll_interval = 0.1  # seconds between connection checks

    station = network.WLAN(network.STA_IF)
    station.active(True)
    station.connect(ssid, password)

    waited = 0
    while not station.isconnected():
        if timeout is not None and waited >= timeout:
            raise OSError('WiFi connection to %r timed out' % ssid)
        # Sleep between checks instead of spinning in a tight loop.
        time.sleep(poll_interval)
        waited += poll_interval

    print('Connection successful')
    print(station.ifconfig())
    return station
def read_digital():
    """Monitor GP14 and send an HTTP request when the pin goes from low to high.

    Configures pin 14 as an input with pull-down and then polls it every
    0.1 s in an endless loop, so this function never returns. On each
    low-to-high transition it issues a GET to ``http://opal:9999`` and
    prints the response text. Request failures are printed and monitoring
    continues.
    """
    pin = machine.Pin(14, machine.Pin.IN, machine.Pin.PULL_DOWN)
    previous_state = pin.value()

    while True:
        current_state = pin.value()
        if current_state != previous_state:  # Check if the pin state has changed
            if current_state == 1:  # Check if the new state is high
                try:
                    response = urequests.get('http://opal:9999')
                    try:
                        print('Request sent, response:', response.text)
                    finally:
                        # Close the response on every path so repeated
                        # triggers do not leak its underlying socket.
                        response.close()
                except Exception as e:
                    # Best effort: report the failure and keep monitoring.
                    print('Failed to send request:', str(e))
            previous_state = current_state  # Update the previous state to the current state
        time.sleep(0.1)  # Add a short delay to reduce CPU usage
# Initialize the pin and set the initial previous state
pin = machine.Pin(14, machine.Pin.IN, machine.Pin.PULL_DOWN)  # pin 14 as input with pull-down
previous_state = pin.value()  # Set the initial state globally
def read_digital():
    """Check pin GP14 and send an HTTP request when the pin goes from low to high.

    Performs one check per call (no internal loop): reads the module-level
    ``pin`` and compares it with the module-level ``previous_state``. On a
    low-to-high transition it issues a GET to ``http://opal:9999`` and
    prints the response text. Request failures are printed, not raised.
    ``previous_state`` is updated whenever the reading changes.
    """
    # The docstring must come before ``global``; placed after it, the string
    # is an ordinary expression and the function has no __doc__.
    global previous_state

    current_state = pin.value()
    if current_state != previous_state:  # Check if the pin state has changed
        if current_state == 1:  # Check if the new state is high
            try:
                response = urequests.get('http://opal:9999')
                try:
                    print('Request sent, response:', response.text)
                finally:
                    # Close the response even if reading its text fails, so
                    # the underlying socket is not leaked.
                    response.close()
            except Exception as e:
                # Best effort: report the failure and carry on.
                print('Failed to send request:', str(e))
        previous_state = current_state  # Update the previous state
# Assign the callback function to the client
client.set_callback(sub_cb)

# Connect to the MQTT broker
client.connect()

# Subscribe to the topic
client.subscribe(MQTT_TOPIC)
print("Connected to %s, subscribed to %s topic" % (MQTT_BROKER, MQTT_TOPIC))

try:
    while True:
        # Check for new messages on the topic
        client.wait_msg()
except KeyboardInterrupt:
    print("Disconnected from MQTT Broker")
finally:
    # Disconnect the client on every exit path, including errors raised by
    # wait_msg().
    client.disconnect()