Optimized high speed nRF24L01+ driver class documentation v1.4.8
TMRh20 2020 - Optimized fork of the nRF24L01+ driver
Loading...
Searching...
No Matches
examples_linux/interrupt_configure.py

Written by 2bndy5 in 2020

This is a simple example of using the RF24 class on a Raspberry Pi to detect (and verify) the IRQ (interrupt) pin on the nRF24L01.

Remember to install the Python wrapper, then navigate to the "RF24/examples_linux" folder.
To run this example, enter

python3 interrupt_configure.py

and follow the prompts.

Note
this example requires python v3.7 or newer because it measures transmission time with time.monotonic_ns().
1"""
2Simple example of detecting (and verifying) the IRQ (interrupt) pin on the
3nRF24L01
4
5See documentation at https://nRF24.github.io/RF24
6"""
7
8import time
9from RF24 import RF24, RF24_PA_LOW, RF24_DRIVER
10
11try:
12 import gpiod
13 from gpiod.line import Edge
14except ImportError as exc:
15 raise ImportError(
16 "This script requires gpiod installed for observing the IRQ pin. Please run\n"
17 "\n pip install gpiod\n\nMore details at https://pypi.org/project/gpiod/"
18 ) from exc
19
# Locate the GPIO character device to use for observing the IRQ pin.
try:  # try RPi5 gpio chip first
    chip_path = "/dev/gpiochip4"
    chip = gpiod.Chip(chip_path)
except FileNotFoundError:  # fall back to gpio chip for RPi4 or older
    chip_path = "/dev/gpiochip0"
    chip = gpiod.Chip(chip_path)

# NOTE: the reporting below deliberately lives outside of a ``finally``
# clause. If neither device exists, ``chip`` is never bound and touching it
# in ``finally`` would raise a NameError that hides the real
# FileNotFoundError from the user.
print(__file__)  # print example name
# print gpio chip info
info = chip.get_info()
print(f"Using {info.name} [{info.label}] ({info.num_lines} lines)")
31
32
33
# CSN pin: index of the SPI chip-select line used by the radio
CSN_PIN = 0  # GPIO8 aka CE0 on SPI bus 0: /dev/spidev0.0

# CE pin: every entry refers to GPIO22, but the number that must be passed
# depends on which driver the RF24 library reports via RF24_DRIVER.
# Any driver not listed here uses the plain GPIO number (22).
CE_PIN = {
    "MRAA": 15,  # for GPIO22
    "wiringPi": 3,  # for GPIO22
}.get(RF24_DRIVER, 22)

# instantiate the radio object with the pins selected above
radio = RF24(CE_PIN, CSN_PIN)
48
# select your digital input pin that's connected to the IRQ pin on the nRF24L01
IRQ_PIN = 24

# For this example, we will use different addresses.
# An address needs to be a buffer protocol object (bytearray);
# bytes literals are used here.
address = [b"1Node", b"2Node"]
# It is very helpful to think of an address as a path instead of as
# an identifying device destination

# to use different addresses on a pair of radios, we need a variable to
# uniquely identify which address this radio will use to transmit
# 0 uses address[0] to transmit, 1 uses address[1] to transmit
# An empty answer falls back to 0. The value is stored as a bool, which is
# later used directly as a list index (False -> 0, True -> 1).
# NOTE(review): a non-numeric answer makes int() raise ValueError here.
radio_number = bool(
    int(input("Which radio is this? Enter '0' or '1'. Defaults to '0' ") or 0)
)
64
# initialize the nRF24L01 on the spi bus
# (begin() returning a falsy value is treated as a fatal hardware error)
if not radio.begin():
    raise OSError("nRF24L01 hardware isn't responding")

# this example uses the ACK payload to trigger the IRQ pin active for
# the "on data received" event
radio.enableDynamicPayloads()  # ACK payloads are dynamically sized
radio.enableAckPayload()  # enable ACK payloads

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceivers in close proximity of each other
radio.setPALevel(RF24_PA_LOW)  # RF24_PA_MAX is default

# set the TX address of the RX node into the TX pipe
radio.openWritingPipe(address[radio_number])  # always uses pipe 0

# set the RX address of the TX node into a RX pipe
# (``not radio_number`` is the opposite bool, so it selects the other address)
radio.openReadingPipe(1, address[not radio_number])  # using pipe 1

# for debugging, we have 2 options that print a large block of details
# (smaller) function that prints raw register values
# radio.printDetails()
# (larger) function that prints human readable data
# radio.printPrettyDetails()

# For this example, we'll be using a payload containing
# a string that changes on every transmission. (successful or not)
# Make a couple tuples of payloads & an iterator to traverse them.
# pl_iterator[0] holds the index of the test currently running in master();
# interrupt_handler() reads it to decide which event it should verify.
pl_iterator = [0]  # use a 1-item list instead of python's global keyword
tx_payloads = (b"Ping ", b"Pong ", b"Radio", b"1FAIL")
ack_payloads = (b"Yak ", b"Back", b" ACK")
96
97
def interrupt_handler():
    """Report which radio event pulled the IRQ pin LOW.

    Reads (and thereby refreshes) the radio's IRQ status flags, prints them,
    and then reports whether the flag expected by the currently running test
    (selected by ``pl_iterator[0]``) was asserted. Nothing is reported for a
    test index other than 0, 1 or 2.
    """
    print("\tIRQ pin went active LOW.")
    tx_ds, tx_df, rx_dr = radio.whatHappened()  # update IRQ status flags
    print(f"\ttx_ds: {tx_ds}, tx_df: {tx_df}, rx_dr: {rx_dr}")

    # map each test index to its report label and the flag that must be set
    expectations = {
        0: (" 'data ready' event test", rx_dr),
        1: (" 'data sent' event test", tx_ds),
        2: (" 'data fail' event test", tx_df),
    }
    expected = expectations.get(pl_iterator[0])
    if expected is not None:
        label, flag = expected
        print(label, "passed" if flag else "failed")
109
110
# setup IRQ GPIO pin
# Request IRQ_PIN on the selected gpio chip with falling-edge detection.
# The returned request object is what _wait_for_irq() polls for edge events.
irq_line = gpiod.request_lines(
    path=chip_path,
    consumer="RF24_interrupt_py-example",  # optional
    config={IRQ_PIN: gpiod.LineSettings(edge_detection=Edge.FALLING)},
)
117
118
def _wait_for_irq(timeout: float = 5):
    """Block until the IRQ pin is seen going active (LOW).

    The IRQ pin idles HIGH and is driven LOW when an unmasked event occurs.

    :param timeout: Maximum number of seconds to wait for an edge event.
    :return: ``True`` if a falling edge on ``IRQ_PIN`` was read,
        ``False`` if the wait timed out or no matching event was found.
    """
    if irq_line.wait_edge_events(timeout):
        # pull the pending events out of the kernel buffer and look for a
        # falling edge that belongs to our IRQ line
        return any(
            evt.line_offset == IRQ_PIN and evt.event_type is evt.Type.FALLING_EDGE
            for evt in irq_line.read_edge_events()
        )
    print(f"\tInterrupt event not detected for {timeout} seconds!")
    return False
132
133
def master():
    """Transmits 4 times and reports results

    1. successfully receive ACK payload first
    2. successfully transmit on second
    3. send a third payload to fill RX node's RX FIFO
       (supposedly making RX node unresponsive)
    4. intentionally fail transmit on the fourth

    For tests 1, 2 and 4 the payload is queued with ``startFastWrite()``,
    then ``_wait_for_irq()`` is used to wait for the IRQ pin; when an
    interrupt is seen, ``interrupt_handler()`` reports whether the expected
    status flag was set. ``pl_iterator[0]`` is updated before each of those
    tests so the handler knows which flag to check.
    """
    radio.stopListening()  # put radio in TX mode

    # on data ready test
    # (only the "data sent" event is masked, see argument order below)
    print("\nConfiguring IRQ pin to only ignore 'on data sent' event")
    radio.maskIRQ(True, False, False)  # args = tx_ds, tx_df, rx_dr
    print(" Pinging slave node for an ACK payload...")
    pl_iterator[0] = 0  # tell interrupt_handler() to verify rx_dr
    radio.startFastWrite(tx_payloads[0], False)  # False means expecting an ACK
    if _wait_for_irq():
        interrupt_handler()

    # on "data sent" test
    # (only the "data ready" event is masked this time)
    print("\nConfiguring IRQ pin to only ignore 'on data ready' event")
    radio.maskIRQ(False, False, True)  # args = tx_ds, tx_df, rx_dr
    print(" Pinging slave node again...")
    pl_iterator[0] = 1  # tell interrupt_handler() to verify tx_ds
    radio.startFastWrite(tx_payloads[1], False)  # False means expecting an ACK
    if _wait_for_irq():
        interrupt_handler()

    # trigger slave node to exit by filling the slave node's RX FIFO
    # (no interrupt is expected here, so a blocking write() is used instead)
    print("\nSending one extra payload to fill RX FIFO on slave node.")
    print("Disabling IRQ pin for all events.")
    radio.maskIRQ(True, True, True)  # args = tx_ds, tx_df, rx_dr
    if radio.write(tx_payloads[2]):
        print("Slave node should not be listening anymore.")
    else:
        print("Slave node was unresponsive.")

    # on "data fail" test
    print("\nConfiguring IRQ pin to go active for all events.")
    radio.maskIRQ(False, False, False)  # args = tx_ds, tx_df, rx_dr
    print(" Sending a ping to inactive slave node...")
    radio.flush_tx()  # just in case any previous tests failed
    pl_iterator[0] = 2  # tell interrupt_handler() to verify tx_df
    radio.startFastWrite(tx_payloads[3], False)  # False means expecting an ACK
    if _wait_for_irq():
        interrupt_handler()
    radio.flush_tx()  # flush artifact payload in TX FIFO from last test
    # all 3 ACK payloads received were 4 bytes each, and RX FIFO is full
    # so, fetching 12 bytes from the RX FIFO also flushes RX FIFO
    print("\nComplete RX FIFO:", radio.read(12))
185
186
def slave(timeout=6):  # will listen for 6 seconds before timing out
    """Receive up to 3 payloads from the master node, then print them.

    The IRQ pin is disabled for every event on this node. Three ACK payloads
    are pre-loaded for pipe 1 before listening starts, and listening stops
    once the RX FIFO is full or ``timeout`` seconds have elapsed.

    :param timeout: Number of seconds to keep listening for payloads.
    """
    # the "data ready" event will trigger in RX mode
    # the "data sent" or "data fail" events will trigger when we
    # receive with ACK payloads enabled (& loaded in TX FIFO)
    print("\nDisabling IRQ pin for all events.")
    radio.maskIRQ(True, True, True)  # args = tx_ds, tx_df, rx_dr

    # setup radio to receive pings, fill TX FIFO with ACK payloads
    for slot in range(3):
        radio.writeAckPayload(1, ack_payloads[slot])
    radio.startListening()  # start listening & clear irq_dr flag

    began_at = time.monotonic()  # start timer now
    while True:
        # keep polling until the RX FIFO is full or the timeout is reached
        if radio.rxFifoFull():
            break
        if not (time.monotonic() - began_at < timeout):
            break

    time.sleep(0.5)  # wait for last ACK payload to transmit
    radio.stopListening()  # put radio in TX mode & discard any ACK payloads

    if not radio.available():  # RX FIFO is empty (timeout occurred)
        return
    # all 3 payloads received were 5 bytes each, and RX FIFO is full
    # so, fetching 15 bytes from the RX FIFO also flushes RX FIFO
    print("Complete RX FIFO:", radio.read(15))
209
210
def set_role():
    """Set the role using stdin stream. Timeout arg for slave() can be
    specified using a space delimiter (e.g. 'R 10' calls `slave(10)`)

    The prompt is repeated until a recognized role is entered. Blank or
    whitespace-only input is treated as the unrecognized input '?', and a
    timeout that is not an integer is rejected with a message instead of
    raising ``ValueError``.

    :return:
        - True when role is complete & app should continue running.
        - False when app should exit
    """
    # loop (rather than recurse) so repeated bad input cannot grow the stack
    while True:
        # ``.split()`` yields an empty list for blank/whitespace-only input,
        # so the fallback must be applied after splitting
        user_input = input(
            f"Make sure the IRQ pin is connected to the GPIO{IRQ_PIN}\n"
            "*** Enter 'R' for receiver role.\n"
            "*** Enter 'T' for transmitter role.\n"
            "*** Enter 'Q' to quit example.\n"
        ).split() or ["?"]
        role = user_input[0].upper()

        if role.startswith("R"):
            # at most one optional argument: the timeout passed to slave()
            try:
                timeout_args = [int(x) for x in user_input[1:2]]
            except ValueError:
                print(user_input[1], "is not a valid timeout. Please try again.")
                continue
            slave(*timeout_args)
            return True
        if role.startswith("T"):
            master()
            return True
        if role.startswith("Q"):
            radio.powerDown()
            return False
        print(user_input[0], "is an unrecognized input. Please try again.")
240
241
if __name__ != "__main__":
    # imported (e.g. from a REPL): just tell the user what can be called
    print(
        f"Make sure the IRQ pin is connected to the GPIO{IRQ_PIN}",
        "Run slave() on receiver",
        "Run master() on transmitter",
        sep="\n",
    )
else:
    # executed as a script: keep prompting for a role until 'Q' is entered
    try:
        while True:
            if not set_role():
                break
    except KeyboardInterrupt:
        print(" Keyboard Interrupt detected. Exiting...")
        radio.powerDown()
Driver class for nRF24L01(+) 2.4GHz Wireless Transceiver.
Definition RF24.h:116