Optimized high speed nRF24L01+ driver class documentation v1.4.8
TMRh20 2020 - Optimized fork of the nRF24L01+ driver
Loading...
Searching...
No Matches
examples_linux/interrupt_configure.py

Written by 2bndy5 in 2020

This is a simple example of using the RF24 class on a Raspberry Pi to detect (and verify) the IRQ (interrupt) pin on the nRF24L01.

Remember to install the Python wrapper, then navigate to the "RF24/examples_linux" folder.
To run this example, enter

python3 interrupt_configure.py

and follow the prompts.

Note
this example requires python v3.7 or newer because it measures transmission time with time.monotonic_ns().
1"""
2This example uses Acknowledgement (ACK) payloads attached to ACK packets to
3demonstrate how the nRF24L01's IRQ (Interrupt Request) pin can be
4configured to detect when data is received, or when data has transmitted
5successfully, or when data has failed to transmit.
6
7This example was written to be used on 2 devices acting as "nodes".
8"""
9import sys
10import argparse
11import time
12import RPi.GPIO as GPIO
13from RF24 import RF24, RF24_PA_LOW
14
15
# Command-line interface. The module docstring is reused verbatim as the
# program description, so the raw formatter keeps its line breaks intact.
parser = argparse.ArgumentParser(
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=__doc__,
)
# -n/--node: which of the two radios this device is (0 or 1)
parser.add_argument(
    "-n",
    "--node",
    help="the identifying radio number (or node ID number)",
    choices=range(2),
    type=int,
)
# -r/--role: run once as transmitter (1) or receiver (0)
parser.add_argument(
    "-r",
    "--role",
    help="'1' specifies the TX role. '0' specifies the RX role.",
    choices=range(2),
    type=int,
)
33
34
# Pin assignments used to construct the radio object.
CE_PIN = 22  # connected to GPIO22
CSN_PIN = 0  # connected to GPIO8
radio = RF24(CE_PIN, CSN_PIN)

# Digital input pin that is wired to the IRQ pin on the nRF24L01.
IRQ_PIN = 12

# Each transmission (successful or not) uses a different payload, so the
# payloads live in tuples and an index selects the current one.
tx_payloads = (b"Ping ", b"Pong ", b"Radio", b"1FAIL")
ack_payloads = (b"Yak ", b"Back", b" ACK")
# A 1-item list holds the index so functions can update it without needing
# python's global keyword.
pl_iterator = [0]
59
60
def interrupt_handler(channel):
    """Report the radio's IRQ flags and grade the event test in progress.

    Registered as the callback for the FALLING edge on the IRQ pin.

    :param channel: The GPIO channel number handed to the callback.
    """
    print("IRQ pin", channel, "went active LOW.")
    # read the IRQ status flags from the radio
    tx_ds, tx_df, rx_dr = radio.whatHappened()
    if tx_df:
        radio.flush_tx()
    print(f"\ttx_ds: {tx_ds}, tx_df: {tx_df}, rx_dr: {rx_dr}")
    # pl_iterator[0] identifies the running test; each test is graded by
    # the one flag it expects to be set (index 2 has no graded event)
    graded_events = {
        0: (" 'data ready' event test", rx_dr),
        1: (" 'data sent' event test", tx_ds),
        3: (" 'data fail' event test", tx_df),
    }
    graded = graded_events.get(pl_iterator[0])
    if graded is not None:
        label, flag = graded
        print(label, ("passed" if flag else "failed"))
74
75
# Configure the IRQ GPIO pin: BCM numbering, input with pull-up, and call
# interrupt_handler() on every FALLING edge.
GPIO.setmode(GPIO.BCM)
GPIO.setup(IRQ_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.add_event_detect(
    IRQ_PIN,
    GPIO.FALLING,
    callback=interrupt_handler,
)
# IMPORTANT: do not call radio.available() before calling
# radio.whatHappened() when interrupt_handler() is triggered by the
# IRQ pin FALLING event. According to the datasheet, the pipe information
# is unreliable during the IRQ pin FALLING transition.
84
85
def _ping_n_wait(pl_iter):
    """Send one payload to the RX node, then pause while the IRQ is handled.

    :param int pl_iter: Index into the `tx_payloads` tuple of the buffer to
        send. The same index is stored in `pl_iterator[0]` so that
        interrupt_handler() can tell which event test is running.
    """
    # record the test index first; interrupt_handler() reads it to grade
    # the event
    pl_iterator[0] = pl_iter
    outgoing = tx_payloads[pl_iter]
    # the False argument means an ACK packet is expected
    radio.startFastWrite(outgoing, False)
    # allow 100 ms for interrupt_handler() to complete
    time.sleep(0.1)
99
100
def print_rx_fifo(pl_size: int):
    """Empty the RX FIFO and print everything it held as one string.

    :param int pl_size: the expected size of each payload
    """
    if not radio.rxFifoFull():
        # FIFO is only partly filled: collect payloads one at a time
        collected = bytearray()
        while radio.available():
            collected += radio.read(pl_size)
        if collected:  # stay silent when nothing was read
            print("Complete RX FIFO:", collected.decode("utf-8"))
        return
    # FIFO is full with 3 payloads, so a single read of 3 payloads' worth
    # of bytes also flushes it (assumes each payload is pl_size bytes)
    print("Complete RX FIFO:", radio.read(pl_size * 3).decode("utf-8"))
116
117
def master():
    """Run the four-transmission IRQ test sequence and report results.

    1. ping and expect a 'data ready' event from the received ACK payload
    2. ping again and expect a 'data sent' event
    3. send a third payload, with the IRQ pin masked, to fill the RX node's
       RX FIFO (supposedly making the RX node unresponsive)
    4. ping once more and expect a 'data fail' event
    """
    radio.stopListening()  # put radio in TX mode

    # step 1: "data ready" test
    print("\nConfiguring IRQ pin to only ignore 'on data sent' event")
    radio.maskIRQ(True, False, False)  # args = tx_ds, tx_df, rx_dr
    print(" Pinging slave node for an ACK payload...", end=" ")
    _ping_n_wait(0)

    # step 2: "data sent" test
    print("\nConfiguring IRQ pin to only ignore 'on data ready' event")
    radio.maskIRQ(False, False, True)  # args = tx_ds, tx_df, rx_dr
    print(" Pinging slave node again... ", end=" ")
    _ping_n_wait(1)

    # step 3: fill the slave node's RX FIFO so that it stops listening
    print("\nSending one extra payload to fill RX FIFO on slave node.")
    radio.maskIRQ(1, 1, 1)  # disable IRQ pin for this step
    delivered = radio.write(tx_payloads[2])
    if not delivered:
        radio.flush_tx()
        print("Transmission failed or timed out. Continuing anyway.")
    # NOTE(review): rxFifoFull() is queried on this (TX) radio; presumably a
    # FIFO full of ACK payloads here implies the RX node's FIFO is full as
    # well -- confirm against the RF24 documentation.
    elif radio.rxFifoFull():
        print("RX node's FIFO is full; it is not listening any more")
    else:
        print(
            "Transmission successful, but the RX node might still be listening."
        )

    # step 4: "data fail" test
    print("\nConfiguring IRQ pin to go active for all events.")
    radio.maskIRQ(False, False, False)  # args = tx_ds, tx_df, rx_dr
    print(" Sending a ping to inactive slave node...", end=" ")
    _ping_n_wait(3)

    # The CE pin is still HIGH, which consumes more power, and the example
    # is idle from here on, so make sure the CE pin is LOW.
    # stopListening() also calls flush_tx() when ACK payloads are enabled.
    radio.stopListening()

    print_rx_fifo(len(ack_payloads[0]))  # empty RX FIFO
156 print("\nConfiguring IRQ pin to go active for all events.")
157 radio.maskIRQ(False, False, False) # args = tx_ds, tx_df, rx_dr
158 print(" Sending a ping to inactive slave node...", end=" ")
159 _ping_n_wait(3)
160
161 # CE pin is still HIGH which consumes more power. Example is now idling so...
162 radio.stopListening() # ensure CE pin is LOW
163 # stopListening() also calls flush_tx() when ACK payloads are enabled
164
165 print_rx_fifo(len(ack_payloads[0])) # empty RX FIFO
166
167
def slave(timeout: int = 6):
    """Listen until the RX FIFO is full or the timeout elapses, then print it.

    :param int timeout: The maximum number of seconds to wait for the RX
        FIFO to fill before exiting the function.
    """
    # index 0 tells interrupt_handler() to grade a 'data_ready' event
    pl_iterator[0] = 0
    # preload the TX FIFO with the three ACK payloads for pipe 1
    for index in range(3):
        radio.writeAckPayload(1, ack_payloads[index])
    radio.startListening()  # start listening & clear status flags
    began = time.monotonic()  # start timer now
    # poll until the RX FIFO is full or the timeout is reached
    while True:
        if radio.rxFifoFull():
            break
        if time.monotonic() - began >= timeout:
            break
    time.sleep(0.1)  # wait for last ACK payload to transmit
    radio.stopListening()  # put radio in TX mode & discard any ACK payloads
    print_rx_fifo(len(tx_payloads[0]))
187
188
def set_role() -> bool:
    """Prompt on stdin for a role and run it.

    A timeout for slave() can be given after the role letter, separated by
    whitespace (e.g. 'R 10' calls `slave(10)`). The prompt repeats until a
    recognized role is entered; blank or whitespace-only input and a
    non-integer timeout are reported and prompted for again instead of
    raising.

    :return:
        - True when role is complete & app should continue running.
        - False when app should exit
    """
    # loop (rather than recurse) so repeated bad input cannot grow the stack
    while True:
        # apply the "?" fallback AFTER splitting: whitespace-only input
        # splits to an empty list, which would make tokens[0] fail
        tokens = input(
            "*** Enter 'R' for receiver role.\n"
            "*** Enter 'T' for transmitter role.\n"
            "*** Enter 'Q' to quit example.\n"
        ).split() or ["?"]
        choice = tokens[0].upper()
        if choice.startswith("R"):
            if len(tokens) > 1:
                try:
                    timeout = int(tokens[1])
                except ValueError:
                    print(tokens[1], "is not a valid timeout. Please try again.")
                    continue
                slave(timeout)
            else:
                slave()
            return True
        if choice.startswith("T"):
            master()
            return True
        if choice.startswith("Q"):
            radio.powerDown()
            return False
        print(tokens[0], "is an unrecognized input. Please try again.")
220
221
if __name__ == "__main__":

    args = parser.parse_args()  # parse any CLI args

    # Everything that touches the hardware runs inside one try block so the
    # GPIO channel and edge detection configured at module level are always
    # released on the way out (normal exit, Ctrl+C, or a radio that does
    # not respond).
    try:
        # initialize the nRF24L01 on the spi bus
        if not radio.begin():
            raise RuntimeError("radio hardware is not responding")

        # For this example, we will use different addresses
        # An address needs to be a buffer protocol object (bytearray)
        address = [b"1Node", b"2Node"]
        # It is very helpful to think of an address as a path instead of as
        # an identifying device destination

        print(sys.argv[0])  # print example name

        # to use different addresses on a pair of radios, we need a variable
        # to uniquely identify which address this radio will use to transmit
        # 0 uses address[0] to transmit, 1 uses address[1] to transmit
        radio_number = args.node  # uses default value from `parser`
        if args.node is None:  # if '--node' arg wasn't specified
            radio_number = bool(
                int(input("Which radio is this? Enter '0' or '1'. Defaults to '0' ") or 0)
            )

        # set the Power Amplifier level to -12 dBm since this test example is
        # usually run with nRF24L01 transceivers in close proximity of each
        # other
        radio.setPALevel(RF24_PA_LOW)  # RF24_PA_MAX is default

        # ACK payloads are dynamically sized.
        radio.enableDynamicPayloads()  # to use ACK payloads

        # this example uses the ACK payload to trigger the IRQ pin active for
        # the "on data received" event
        radio.enableAckPayload()  # enable ACK payloads

        # set the TX address of the RX node into the TX pipe
        radio.openWritingPipe(address[radio_number])  # always uses pipe 0

        # set the RX address of the TX node into a RX pipe
        radio.openReadingPipe(1, address[not radio_number])  # using pipe 1

        # for debugging, we have 2 options that print a large block of details
        # (smaller) function that prints raw register values
        # radio.printDetails()
        # (larger) function that prints human readable data
        # radio.printPrettyDetails()

        if args.role is None:  # if not specified with CLI arg '-r'
            while set_role():
                pass  # continue example until 'Q' is entered
        else:  # if role was set using CLI args
            # run role once and exit
            if bool(args.role):
                master()
            else:
                slave()
    except KeyboardInterrupt:
        print(" Keyboard Interrupt detected. Powering down radio.")
        radio.powerDown()
    finally:
        # release the IRQ pin and remove its event detection
        GPIO.cleanup()
283
Driver class for nRF24L01(+) 2.4GHz Wireless Transceiver.
Definition: RF24.h:116