Optimized high speed nRF24L01+ driver class documentation v1.4.10
TMRh20 2020 - Optimized fork of the nRF24L01+ driver
Loading...
Searching...
No Matches
examples_linux/interrupt_configure.py

Written by 2bndy5 in 2020

This is a simple example of using the RF24 class on a Raspberry Pi to detecting (and verifying) the IRQ (interrupt) pin on the nRF24L01.

Remember to install the Python wrapper, then navigate to the "RF24/examples_linux" folder.
To run this example, enter

python3 interrupt_configure.py

and follow the prompts.

Note
this example requires python v3.7 or newer because it measures transmission time with time.monotonic_ns().
1"""
2Simple example of detecting (and verifying) the IRQ (interrupt) pin on the
3nRF24L01
4
5See documentation at https://nRF24.github.io/RF24
6"""
7
8import time
9from RF24 import RF24, RF24_PA_LOW, RF24_DRIVER
10
11try:
12 import gpiod
13 from gpiod.line import Edge
14except ImportError as exc:
15 raise ImportError(
16 "This script requires gpiod installed for observing the IRQ pin. Please run\n"
17 "\n pip install gpiod\n\nMore details at https://pypi.org/project/gpiod/"
18 ) from exc
19
20try: # try RPi5 gpio chip first
21 chip_path = "/dev/gpiochip4"
22 chip = gpiod.Chip(chip_path)
23except FileNotFoundError: # fall back to gpio chip for RPi4 or older
24 chip_path = "/dev/gpiochip0"
25 chip = gpiod.Chip(chip_path)
26finally:
27 print(__file__) # print example name
28 # print gpio chip info
29 info = chip.get_info()
30 print(f"Using {info.name} [{info.label}] ({info.num_lines} lines)")
31
32
33
39CSN_PIN = 0 # GPIO8 aka CE0 on SPI bus 0: /dev/spidev0.0
40if RF24_DRIVER == "MRAA":
41 CE_PIN = 15 # for GPIO22
42elif RF24_DRIVER == "wiringPi":
43 CE_PIN = 3 # for GPIO22
44else:
45 CE_PIN = 22
46radio = RF24(CE_PIN, CSN_PIN)
47
48# select your digital input pin that's connected to the IRQ pin on the nRF24L01
49IRQ_PIN = 24
50
51# For this example, we will use different addresses
52# An address need to be a buffer protocol object (bytearray)
53address = [b"1Node", b"2Node"]
54# It is very helpful to think of an address as a path instead of as
55# an identifying device destination
56
57# to use different addresses on a pair of radios, we need a variable to
58# uniquely identify which address this radio will use to transmit
59# 0 uses address[0] to transmit, 1 uses address[1] to transmit
60radio_number = bool(
61 int(input("Which radio is this? Enter '0' or '1'. Defaults to '0' ") or 0)
62)
63
64# initialize the nRF24L01 on the spi bus
65if not radio.begin():
66 raise OSError("nRF24L01 hardware isn't responding")
67
68# this example uses the ACK payload to trigger the IRQ pin active for
69# the "on data received" event
70radio.enableDynamicPayloads() # ACK payloads are dynamically sized
71radio.enableAckPayload() # enable ACK payloads
72
73# set the Power Amplifier level to -12 dBm since this test example is
74# usually run with nRF24L01 transceivers in close proximity of each other
75radio.setPALevel(RF24_PA_LOW) # RF24_PA_MAX is default
76
77# set the TX address of the RX node into the TX pipe
78radio.openWritingPipe(address[radio_number]) # always uses pipe 0
79
80# set the RX address of the TX node into a RX pipe
81radio.openReadingPipe(1, address[not radio_number]) # using pipe 1
82
83# for debugging, we have 2 options that print a large block of details
84# (smaller) function that prints raw register values
85# radio.printDetails()
86# (larger) function that prints human readable data
87# radio.printPrettyDetails()
88
89# For this example, we'll be using a payload containing
90# a string that changes on every transmission. (successful or not)
91# Make a couple tuples of payloads & an iterator to traverse them
92pl_iterator = [0] # use a 1-item list instead of python's global keyword
93tx_payloads = (b"Ping ", b"Pong ", b"Radio", b"1FAIL")
94ack_payloads = (b"Yak ", b"Back", b" ACK")
95
96
97def interrupt_handler():
98 """This function is called when IRQ pin is detected active LOW"""
99 print("\tIRQ pin went active LOW.")
100 tx_ds, tx_df, rx_dr = radio.whatHappened() # update IRQ status flags
101 print(f"\ttx_ds: {tx_ds}, tx_df: {tx_df}, rx_dr: {rx_dr}")
102 if pl_iterator[0] == 0:
103 print(" 'data ready' event test", ("passed" if rx_dr else "failed"))
104 elif pl_iterator[0] == 1:
105 print(" 'data sent' event test", ("passed" if tx_ds else "failed"))
106 elif pl_iterator[0] == 2:
107 print(" 'data fail' event test", ("passed" if tx_df else "failed"))
108
109
110# setup IRQ GPIO pin
111irq_line = gpiod.request_lines(
112 path=chip_path,
113 consumer="RF24_interrupt_py-example", # optional
114 config={IRQ_PIN: gpiod.LineSettings(edge_detection=Edge.FALLING)},
115)
116
117
118def _wait_for_irq(timeout: float = 5):
119 """Wait till IRQ_PIN goes active (LOW).
120 IRQ pin is LOW when activated. Otherwise it is always HIGH
121 """
122 # wait up to ``timeout`` seconds for event to be detected.
123 if not irq_line.wait_edge_events(timeout):
124 print(f"\tInterrupt event not detected for {timeout} seconds!")
125 return False
126 # read event from kernel buffer
127 for event in irq_line.read_edge_events():
128 if event.line_offset == IRQ_PIN and event.event_type is event.Type.FALLING_EDGE:
129 return True
130 return False
131
132
133def master():
134 """Transmits 4 times and reports results
135
136 1. successfully receive ACK payload first
137 2. successfully transmit on second
138 3. send a third payload to fill RX node's RX FIFO
139 (supposedly making RX node unresponsive)
140 4. intentionally fail transmit on the fourth
141 """
142 radio.stopListening() # put radio in TX mode
143
144 # on data ready test
145 print("\nConfiguring IRQ pin to only ignore 'on data sent' event")
146 radio.maskIRQ(True, False, False) # args = tx_ds, tx_df, rx_dr
147 print(" Pinging slave node for an ACK payload...")
148 pl_iterator[0] = 0
149 radio.startFastWrite(tx_payloads[0], False) # False means expecting an ACK
150 if _wait_for_irq():
151 interrupt_handler()
152
153 # on "data sent" test
154 print("\nConfiguring IRQ pin to only ignore 'on data ready' event")
155 radio.maskIRQ(False, False, True) # args = tx_ds, tx_df, rx_dr
156 print(" Pinging slave node again...")
157 pl_iterator[0] = 1
158 radio.startFastWrite(tx_payloads[1], False) # False means expecting an ACK
159 if _wait_for_irq():
160 interrupt_handler()
161
162 # trigger slave node to exit by filling the slave node's RX FIFO
163 print("\nSending one extra payload to fill RX FIFO on slave node.")
164 print("Disabling IRQ pin for all events.")
165 radio.maskIRQ(True, True, True) # args = tx_ds, tx_df, rx_dr
166 if radio.write(tx_payloads[2]):
167 print("Slave node should not be listening anymore.")
168 else:
169 print("Slave node was unresponsive.")
170
171 # on "data fail" test
172 print("\nConfiguring IRQ pin to go active for all events.")
173 radio.maskIRQ(False, False, False) # args = tx_ds, tx_df, rx_dr
174 print(" Sending a ping to inactive slave node...")
175 radio.flush_tx() # just in case any previous tests failed
176 pl_iterator[0] = 2
177 radio.startFastWrite(tx_payloads[3], False) # False means expecting an ACK
178 if _wait_for_irq():
179 interrupt_handler()
180 radio.flush_tx() # flush artifact payload in TX FIFO from last test
181 # all 3 ACK payloads received were 4 bytes each, and RX FIFO is full
182 # so, fetching 12 bytes from the RX FIFO also flushes RX FIFO
183 print("\nComplete RX FIFO:", radio.read(12))
184
185
186def slave(timeout=6): # will listen for 6 seconds before timing out
187 """Only listen for 3 payload from the master node"""
188 # the "data ready" event will trigger in RX mode
189 # the "data sent" or "data fail" events will trigger when we
190 # receive with ACK payloads enabled (& loaded in TX FIFO)
191 print("\nDisabling IRQ pin for all events.")
192 radio.maskIRQ(True, True, True) # args = tx_ds, tx_df, rx_dr
193 # setup radio to receive pings, fill TX FIFO with ACK payloads
194 radio.writeAckPayload(1, ack_payloads[0])
195 radio.writeAckPayload(1, ack_payloads[1])
196 radio.writeAckPayload(1, ack_payloads[2])
197 radio.startListening() # start listening & clear irq_dr flag
198 start_timer = time.monotonic() # start timer now
199 while not radio.rxFifoFull() and time.monotonic() - start_timer < timeout:
200 # if RX FIFO is not full and timeout is not reached, then keep waiting
201 pass
202 time.sleep(0.5) # wait for last ACK payload to transmit
203 radio.stopListening() # put radio in TX mode & discard any ACK payloads
204 if radio.available(): # if RX FIFO is not empty (timeout did not occur)
205 # all 3 payloads received were 5 bytes each, and RX FIFO is full
206 # so, fetching 15 bytes from the RX FIFO also flushes RX FIFO
207 print("Complete RX FIFO:", radio.read(15))
208
209
210def set_role():
211 """Set the role using stdin stream. Timeout arg for slave() can be
212 specified using a space delimiter (e.g. 'R 10' calls `slave(10)`)
213
214 :return:
215 - True when role is complete & app should continue running.
216 - False when app should exit
217 """
218 user_input = (
219 input(
220 f"Make sure the IRQ pin is connected to the GPIO{IRQ_PIN}\n"
221 "*** Enter 'R' for receiver role.\n"
222 "*** Enter 'T' for transmitter role.\n"
223 "*** Enter 'Q' to quit example.\n"
224 )
225 or "?"
226 )
227 user_input = user_input.split()
228 if user_input[0].upper().startswith("R"):
229 slave(*[int(x) for x in user_input[1:2]])
230 return True
231 if user_input[0].upper().startswith("T"):
232 master()
233 return True
234 if user_input[0].upper().startswith("Q"):
235 radio.powerDown()
236 return False
237 print(user_input[0], "is an unrecognized input. Please try again.")
238 return set_role()
239
240
241if __name__ == "__main__":
242 try:
243 while set_role():
244 pass # continue example until 'Q' is entered
245 except KeyboardInterrupt:
246 print(" Keyboard Interrupt detected. Exiting...")
247 radio.powerDown()
248else:
249 print(
250 f"Make sure the IRQ pin is connected to the GPIO{IRQ_PIN}",
251 "Run slave() on receiver",
252 "Run master() on transmitter",
253 sep="\n",
254 )
Driver class for nRF24L01(+) 2.4GHz Wireless Transceiver.
Definition RF24.h:116