and follow the prompts.
1"""
2Simple example of detecting (and verifying) the IRQ (interrupt) pin on the
3nRF24L01
4
5See documentation at https://nRF24.github.io/RF24
6"""
7
8import time
9from RF24 import (
10 RF24,
11 RF24_PA_LOW,
12 RF24_DRIVER,
13 RF24_TX_DF,
14 RF24_TX_DS,
15 RF24_RX_DR,
16 RF24_IRQ_ALL,
17)
18
19try:
20 import gpiod
21 from gpiod.line import Edge
22except ImportError as exc:
23 raise ImportError(
24 "This script requires gpiod installed for observing the IRQ pin. Please run\n"
25 "\n pip install gpiod\n\nMore details at https://pypi.org/project/gpiod/"
26 ) from exc
27
28try:
29 chip_path = "/dev/gpiochip4"
30 chip = gpiod.Chip(chip_path)
31except FileNotFoundError:
32 chip_path = "/dev/gpiochip0"
33 chip = gpiod.Chip(chip_path)
34finally:
35 print(__file__)
36
37 info = chip.get_info()
38 print(f"Using {info.name} [{info.label}] ({info.num_lines} lines)")
39
40
41
47CSN_PIN = 0
48if RF24_DRIVER == "MRAA":
49 CE_PIN = 15
50elif RF24_DRIVER == "wiringPi":
51 CE_PIN = 3
52else:
53 CE_PIN = 22
54radio =
RF24(CE_PIN, CSN_PIN)
55
56
57IRQ_PIN = 24
58
59
60
61address = [b"1Node", b"2Node"]
62
63
64
65
66
67
68radio_number = bool(
69 int(input("Which radio is this? Enter '0' or '1'. Defaults to '0' ") or 0)
70)
71
72
73if not radio.begin():
74 raise OSError("nRF24L01 hardware isn't responding")
75
76
77
78radio.enableDynamicPayloads()
79radio.enableAckPayload()
80
81
82
83radio.setPALevel(RF24_PA_LOW)
84
85
86radio.stopListening(address[radio_number])
87
88
89radio.openReadingPipe(1, address[not radio_number])
90
91
92
93
94
95
96
97
98
99
100pl_iterator = [0]
101tx_payloads = (b"Ping ", b"Pong ", b"Radio", b"1FAIL")
102ack_payloads = (b"Yak ", b"Back", b" ACK")
103
104
105def interrupt_handler():
106 """This function is called when IRQ pin is detected active LOW"""
107 print("\tIRQ pin went active LOW.")
108 flags = radio.clearStatusFlags()
109
110
111
112 print("\t", end="", flush=True)
113 radio.printStatus(flags)
114 if pl_iterator[0] == 0:
115 print(
116 " 'data ready' event test",
117 ("passed" if flags & RF24_RX_DR else "failed"),
118 )
119 elif pl_iterator[0] == 1:
120 print(
121 " 'data sent' event test", ("passed" if flags & RF24_TX_DS else "failed")
122 )
123 elif pl_iterator[0] == 2:
124 print(
125 " 'data fail' event test", ("passed" if flags & RF24_TX_DF else "failed")
126 )
127
128
129
130irq_line = gpiod.request_lines(
131 path=chip_path,
132 consumer="RF24_interrupt_py-example",
133 config={IRQ_PIN: gpiod.LineSettings(edge_detection=Edge.FALLING)},
134)
135
136
137def _wait_for_irq(timeout: float = 5):
138 """Wait till IRQ_PIN goes active (LOW).
139 IRQ pin is LOW when activated. Otherwise it is always HIGH
140 """
141
142 if not irq_line.wait_edge_events(timeout):
143 print(f"\tInterrupt event not detected for {timeout} seconds!")
144 return False
145
146 for event in irq_line.read_edge_events():
147 if event.line_offset == IRQ_PIN and event.event_type is event.Type.FALLING_EDGE:
148 return True
149 return False
150
151
152def master():
153 """Transmits 4 times and reports results
154
155 1. successfully receive ACK payload first
156 2. successfully transmit on second
157 3. send a third payload to fill RX node's RX FIFO
158 (supposedly making RX node unresponsive)
159 4. intentionally fail transmit on the fourth
160 """
161 radio.stopListening()
162
163
164 print("\nConfiguring IRQ pin to only ignore 'on data sent' event")
165 radio.setStatusFlags(RF24_RX_DR | RF24_TX_DF)
166 print(" Pinging slave node for an ACK payload...")
167 pl_iterator[0] = 0
168 radio.startFastWrite(tx_payloads[0], False)
169 if _wait_for_irq():
170 interrupt_handler()
171
172
173 print("\nConfiguring IRQ pin to only ignore 'on data ready' event")
174 radio.setStatusFlags(RF24_TX_DS | RF24_TX_DF)
175 print(" Pinging slave node again...")
176 pl_iterator[0] = 1
177 radio.startFastWrite(tx_payloads[1], False)
178 if _wait_for_irq():
179 interrupt_handler()
180
181
182 print("\nSending one extra payload to fill RX FIFO on slave node.")
183 print("Disabling IRQ pin for all events.")
184 radio.setStatusFlags()
185 if radio.write(tx_payloads[2]):
186 print("Slave node should not be listening anymore.")
187 else:
188 print("Slave node was unresponsive.")
189
190
191 print("\nConfiguring IRQ pin to go active for all events.")
192 radio.setStatusFlags(RF24_IRQ_ALL)
193 print(" Sending a ping to inactive slave node...")
194 radio.flush_tx()
195 pl_iterator[0] = 2
196 radio.startFastWrite(tx_payloads[3], False)
197 if _wait_for_irq():
198 interrupt_handler()
199 radio.flush_tx()
200
201
202 print("\nComplete RX FIFO:", radio.read(12))
203
204
205def slave(timeout=6):
206 """Only listen for 3 payload from the master node"""
207
208
209
210 print("\nDisabling IRQ pin for all events.")
211 radio.setStatusFlags()
212
213 radio.writeAckPayload(1, ack_payloads[0])
214 radio.writeAckPayload(1, ack_payloads[1])
215 radio.writeAckPayload(1, ack_payloads[2])
216 radio.startListening()
217 start_timer = time.monotonic()
218 while not radio.rxFifoFull() and time.monotonic() - start_timer < timeout:
219
220 pass
221 time.sleep(0.5)
222 radio.stopListening()
223 if radio.available():
224
225
226 print("Complete RX FIFO:", radio.read(15))
227
228
229def set_role():
230 """Set the role using stdin stream. Timeout arg for slave() can be
231 specified using a space delimiter (e.g. 'R 10' calls `slave(10)`)
232
233 :return:
234 - True when role is complete & app should continue running.
235 - False when app should exit
236 """
237 user_input = (
238 input(
239 f"Make sure the IRQ pin is connected to the GPIO{IRQ_PIN}\n"
240 "*** Enter 'R' for receiver role.\n"
241 "*** Enter 'T' for transmitter role.\n"
242 "*** Enter 'Q' to quit example.\n"
243 )
244 or "?"
245 )
246 user_input = user_input.split()
247 if user_input[0].upper().startswith("R"):
248 slave(*[int(x) for x in user_input[1:2]])
249 return True
250 if user_input[0].upper().startswith("T"):
251 master()
252 return True
253 if user_input[0].upper().startswith("Q"):
254 radio.powerDown()
255 return False
256 print(user_input[0], "is an unrecognized input. Please try again.")
257 return set_role()
258
259
260if __name__ == "__main__":
261 try:
262 while set_role():
263 pass
264 except KeyboardInterrupt:
265 print(" Keyboard Interrupt detected. Exiting...")
266 radio.powerDown()
267else:
268 print(
269 f"Make sure the IRQ pin is connected to the GPIO{IRQ_PIN}",
270 "Run slave() on receiver",
271 "Run master() on transmitter",
272 sep="\n",
273 )
Driver class for nRF24L01(+) 2.4GHz Wireless Transceiver.