Optimized high speed nRF24L01+ driver class documentation v1.5.0
TMRh20 2020 - Optimized fork of the nRF24L01+ driver
Loading...
Searching...
No Matches
examples_linux/interrupt_configure.py

Written by 2bndy5 in 2020

This is a simple example of using the RF24 class on a Raspberry Pi to detect (and verify) the IRQ (interrupt) pin on the nRF24L01.

Remember to install the Python wrapper, then navigate to the "RF24/examples_linux" folder.
To run this example, enter

python3 interrupt_configure.py

and follow the prompts.

Note
this example requires python v3.7 or newer because it measures transmission time with time.monotonic_ns().
1"""
2Simple example of detecting (and verifying) the IRQ (interrupt) pin on the
3nRF24L01
4
5See documentation at https://nRF24.github.io/RF24
6"""
7
8import time
9from RF24 import (
10 RF24,
11 RF24_PA_LOW,
12 RF24_DRIVER,
13 RF24_TX_DF,
14 RF24_TX_DS,
15 RF24_RX_DR,
16 RF24_IRQ_ALL,
17)
18
19try:
20 import gpiod
21 from gpiod.line import Edge
22except ImportError as exc:
23 raise ImportError(
24 "This script requires gpiod installed for observing the IRQ pin. Please run\n"
25 "\n pip install gpiod\n\nMore details at https://pypi.org/project/gpiod/"
26 ) from exc
27
# Open the GPIO chip that the IRQ pin will be observed on.
try:  # try RPi5 gpio chip first
    chip_path = "/dev/gpiochip4"
    chip = gpiod.Chip(chip_path)
except FileNotFoundError:  # fall back to gpio chip for RPi4 or older
    chip_path = "/dev/gpiochip0"
    chip = gpiod.Chip(chip_path)

# Report only once a chip has actually been opened. In a ``finally`` clause
# this code would also run when neither chip could be opened, and the unbound
# ``chip`` name would then raise a NameError in place of the real error.
print(__file__)  # print example name
# print gpio chip info
info = chip.get_info()
print(f"Using {info.name} [{info.label}] ({info.num_lines} lines)")
39
40
41
# CSN pin: 0 means GPIO8 (aka CE0) on SPI bus 0, i.e. /dev/spidev0.0
CSN_PIN = 0
# The radio's CE pin is wired to GPIO22. The number that refers to GPIO22
# depends on the value of RF24_DRIVER; any other driver uses 22 directly.
CE_PIN = {"MRAA": 15, "wiringPi": 3}.get(RF24_DRIVER, 22)
radio = RF24(CE_PIN, CSN_PIN)

# the digital input pin that is connected to the IRQ pin on the nRF24L01
IRQ_PIN = 24
58
# This example uses a different address for each of the two radios.
# An address needs to be a buffer protocol object (bytes or bytearray).
# It is very helpful to think of an address as a path rather than as
# an identifying device destination.
address = [b"1Node", b"2Node"]

# To use different addresses on a pair of radios, this flag uniquely
# identifies which address this radio will use to transmit:
# False (0) transmits on address[0], True (1) transmits on address[1]
radio_number = bool(
    int(input("Which radio is this? Enter '0' or '1'. Defaults to '0' ") or "0")
)
71
# bring up the nRF24L01 on the SPI bus; abort if the hardware stays silent
if not radio.begin():
    raise OSError("nRF24L01 hardware isn't responding")

# The ACK payload is what triggers the IRQ pin for the "on data received"
# event in this example. ACK payloads are dynamically sized, so dynamic
# payloads are enabled first.
radio.enableDynamicPayloads()
radio.enableAckPayload()

# Use the -12 dBm Power Amplifier level because this test is usually run
# with the transceivers in close proximity (RF24_PA_MAX is the default).
radio.setPALevel(RF24_PA_LOW)

# set the TX address of the RX node for use on the TX pipe (pipe 0)
radio.stopListening(address[radio_number])

# set the RX address of the TX node into RX pipe 1
radio.openReadingPipe(1, address[not radio_number])

# For debugging, either of these prints a large block of details:
# raw register values (smaller output)
# radio.printDetails()
# human readable data (larger output)
# radio.printPrettyDetails()

# The payload is a string that changes on every transmission (successful
# or not). These tuples hold the payloads; pl_iterator tracks the test that
# is currently running.
pl_iterator = [0]  # a 1-item list is mutable without python's global keyword
tx_payloads = (b"Ping ", b"Pong ", b"Radio", b"1FAIL")
ack_payloads = (b"Yak ", b"Back", b" ACK")
103
104
def interrupt_handler():
    """Report the outcome of the IRQ test selected by ``pl_iterator[0]``.

    Meant to be called once the IRQ pin has been seen going active LOW.
    Clears the radio's status flags, prints them, and prints whether the
    flag expected by the current test (0: data ready, 1: data sent,
    2: data fail) was among them. Other test numbers print no verdict.
    """
    print("\tIRQ pin went active LOW.")
    # Clearing the status flags resets the IRQ pin to its inactive state
    # (HIGH). Resetting the tx_df flag is also required for continued TX
    # operations after a transmission fails.
    flags = radio.clearStatusFlags()
    print("\t", end="", flush=True)
    radio.printStatus(flags)

    # test number -> (label to print, flag that must be set to pass)
    expectations = {
        0: (" 'data ready' event test", RF24_RX_DR),
        1: (" 'data sent' event test", RF24_TX_DS),
        2: (" 'data fail' event test", RF24_TX_DF),
    }
    expected = expectations.get(pl_iterator[0])
    if expected is not None:
        label, required_flag = expected
        print(label, "passed" if flags & required_flag else "failed")
127
128
# Request the IRQ GPIO line with detection of falling edges
# (the IRQ pin is LOW when activated).
irq_line = gpiod.request_lines(
    path=chip_path,
    config={IRQ_PIN: gpiod.LineSettings(edge_detection=Edge.FALLING)},
    consumer="RF24_interrupt_py-example",  # optional
)
135
136
def _wait_for_irq(timeout: float = 5):
    """Wait until IRQ_PIN goes active (LOW) or ``timeout`` seconds pass.

    The IRQ pin is LOW when activated; otherwise it is always HIGH.

    :param timeout: The maximum number of seconds to wait for an edge event.
    :return: True if a falling edge event for IRQ_PIN was read; False if
        the wait timed out (a message is printed) or no such event was read.
    """
    if irq_line.wait_edge_events(timeout):
        # read the pending events from the kernel buffer and look for ours
        return any(
            event.line_offset == IRQ_PIN
            and event.event_type is event.Type.FALLING_EDGE
            for event in irq_line.read_edge_events()
        )
    print(f"\tInterrupt event not detected for {timeout} seconds!")
    return False
150
151
def master():
    """Transmit 4 payloads and report the result of each IRQ test.

    1. successfully receive ACK payload first
    2. successfully transmit on second
    3. send a third payload to fill RX node's RX FIFO
       (supposedly making RX node unresponsive)
    4. intentionally fail transmit on the fourth
    """

    def ping_and_check(test_number, payload):
        """Send ``payload``, then grade the IRQ event if one is detected."""
        # interrupt_handler() reads this to know which event to expect
        pl_iterator[0] = test_number
        radio.startFastWrite(payload, False)  # False means expecting an ACK
        if _wait_for_irq():
            interrupt_handler()

    radio.stopListening()  # put radio in TX mode

    # "data ready" test
    print("\nConfiguring IRQ pin to only ignore 'on data sent' event")
    radio.setStatusFlags(RF24_RX_DR | RF24_TX_DF)
    print(" Pinging slave node for an ACK payload...")
    ping_and_check(0, tx_payloads[0])

    # "data sent" test
    print("\nConfiguring IRQ pin to only ignore 'on data ready' event")
    radio.setStatusFlags(RF24_TX_DS | RF24_TX_DF)
    print(" Pinging slave node again...")
    ping_and_check(1, tx_payloads[1])

    # make the slave node exit by filling its RX FIFO
    print("\nSending one extra payload to fill RX FIFO on slave node.")
    print("Disabling IRQ pin for all events.")
    radio.setStatusFlags()
    print(
        "Slave node should not be listening anymore."
        if radio.write(tx_payloads[2])
        else "Slave node was unresponsive."
    )

    # "data fail" test
    print("\nConfiguring IRQ pin to go active for all events.")
    radio.setStatusFlags(RF24_IRQ_ALL)
    print(" Sending a ping to inactive slave node...")
    radio.flush_tx()  # just in case any previous tests failed
    ping_and_check(2, tx_payloads[3])
    radio.flush_tx()  # flush artifact payload in TX FIFO from last test

    # All 3 ACK payloads received were 4 bytes each and the RX FIFO is full,
    # so fetching 12 bytes from the RX FIFO also flushes it.
    print("\nComplete RX FIFO:", radio.read(12))
203
204
def slave(timeout=6):
    """Listen for 3 payloads from the master node, then print what arrived.

    :param timeout: The number of seconds to listen before giving up
        (6 by default).
    """
    # The "data ready" event triggers in RX mode. The "data sent" and
    # "data fail" events trigger when receiving with ACK payloads enabled
    # (& loaded in the TX FIFO).
    print("\nDisabling IRQ pin for all events.")
    radio.setStatusFlags()

    # get ready to receive pings: fill the TX FIFO with ACK payloads
    for index in range(3):
        radio.writeAckPayload(1, ack_payloads[index])
    radio.startListening()  # start listening & clear irq_dr flag

    began_at = time.monotonic()  # start timer now
    while True:
        # keep waiting until the RX FIFO is full or the timeout is reached
        if radio.rxFifoFull():
            break
        if time.monotonic() - began_at >= timeout:
            break

    time.sleep(0.5)  # wait for last ACK payload to transmit
    radio.stopListening()  # put radio in TX mode & discard any ACK payloads
    if radio.available():  # RX FIFO is not empty (timeout did not occur)
        # All 3 payloads received were 5 bytes each and the RX FIFO is full,
        # so fetching 15 bytes from the RX FIFO also flushes it.
        print("Complete RX FIFO:", radio.read(15))
227
228
def set_role():
    """Set the role using stdin stream. Timeout arg for slave() can be
    specified using a space delimiter (e.g. 'R 10' calls `slave(10)`)

    Unrecognized input, blank input, and a timeout that is not an integer
    are reported, and the prompt is shown again.

    :return:
        - True when role is complete & app should continue running.
        - False when app should exit
    """
    prompt = (
        f"Make sure the IRQ pin is connected to the GPIO{IRQ_PIN}\n"
        "*** Enter 'R' for receiver role.\n"
        "*** Enter 'T' for transmitter role.\n"
        "*** Enter 'Q' to quit example.\n"
    )
    # Loop (rather than recurse) so repeated bad input cannot grow the stack.
    while True:
        # ``split()`` yields an empty list for blank or whitespace-only
        # input, so apply the "?" placeholder after splitting.
        user_input = input(prompt).split() or ["?"]
        choice = user_input[0].upper()
        if choice.startswith("R"):
            try:
                # at most one optional argument: the timeout in seconds
                timeout_args = [int(x) for x in user_input[1:2]]
            except ValueError:
                print(user_input[1], "is not a valid timeout. Please try again.")
                continue
            slave(*timeout_args)
            return True
        if choice.startswith("T"):
            master()
            return True
        if choice.startswith("Q"):
            radio.powerDown()
            return False
        print(user_input[0], "is an unrecognized input. Please try again.")
258
259
if __name__ != "__main__":
    # imported rather than executed: show how to run the roles manually
    print(
        "\n".join(
            (
                f"Make sure the IRQ pin is connected to the GPIO{IRQ_PIN}",
                "Run slave() on receiver",
                "Run master() on transmitter",
            )
        )
    )
else:
    try:
        while set_role():
            pass  # keep prompting for a role until 'Q' is entered
    except KeyboardInterrupt:
        print(" Keyboard Interrupt detected. Exiting...")
        radio.powerDown()
Driver class for nRF24L01(+) 2.4GHz Wireless Transceiver.
Definition RF24.h:160