Optimized high speed nRF24L01+ driver class documentation v1.4.11
TMRh20 2020 - Optimized fork of the nRF24L01+ driver
Loading...
Searching...
No Matches
examples_linux/streaming_data.py

Written by 2bndy5 in 2020

This is a simple example of using the RF24 class on a Raspberry Pi for streaming multiple payloads.

Remember to install the Python wrapper, then navigate to the "RF24/examples_linux" folder.
To run this example, enter

python3 streaming_data.py

and follow the prompts.

Note
This example requires Python v3.7 or newer because it measures transmission time with time.monotonic_ns().
1"""
2A simple example of streaming data from 1 nRF24L01 transceiver to another.
3
4This example was written to be used on 2 devices acting as 'nodes'.
5
6See documentation at https://nRF24.github.io/RF24
7"""
8
9import time
10from RF24 import RF24, RF24_PA_LOW, RF24_DRIVER
11
12print(__file__) # print example name
13
14
CSN_PIN = 0  # GPIO8 aka CE0 on SPI bus 0: /dev/spidev0.0
# The CE line is on GPIO22. The MRAA and wiringPi drivers each use their own
# number for that pin; any other driver takes the GPIO number directly.
CE_PIN = {"MRAA": 15, "wiringPi": 3}.get(RF24_DRIVER, 22)
radio = RF24(CE_PIN, CSN_PIN)

# initialize the nRF24L01 on the SPI bus; give up early if it does not answer
if not radio.begin():
    raise RuntimeError("radio hardware is not responding")
32
# For this example, we will use different addresses.
# An address needs to be a buffer protocol object (bytes or bytearray).
# It is very helpful to think of an address as a path instead of as
# an identifying device destination.
address = [b"1Node", b"2Node"]

# To use different addresses on a pair of radios, we need a variable to
# uniquely identify which address this radio will use to transmit:
# 0 uses address[0] to transmit, 1 uses address[1] to transmit.
# An empty answer falls back to 0; any non-zero integer counts as 1.
radio_number = (
    int(input("Which radio is this? Enter '0' or '1'. Defaults to '0' ") or 0) != 0
)

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceivers in close proximity of each other
radio.setPALevel(RF24_PA_LOW)  # RF24_PA_MAX is default

# set the TX address of the RX node into the TX pipe
radio.openWritingPipe(address[radio_number])  # always uses pipe 0

# set the RX address of the TX node into a RX pipe
# (`1 - radio_number` selects the address this radio does NOT transmit on)
radio.openReadingPipe(1, address[1 - radio_number])  # using pipe 1


# Specify the number of bytes in the payload. This is also used to
# specify the number of payloads in 1 stream of data
SIZE = 32  # this is the default maximum payload size

# To save time during transmission, we'll set the payload size to be only
# what we need. For this example, we'll be using the default maximum 32
radio.payloadSize = SIZE

# for debugging, we have 2 options that print a large block of details
# (smaller) function that prints raw register values
# radio.printDetails()
# (larger) function that prints human readable data
# radio.printPrettyDetails()
70
71
def make_buffer(buf_iter: int, size: "int | None" = None) -> bytes:
    """Return a dynamically created payload.

    The first byte is a sequential letter ('A'-'Z' for positions 0-25, then
    'a' onwards) so that the receiver can tell which payloads were lost (if
    any). The remaining ``size - 1`` bytes are ASCII '0' or '1' characters
    forming a pattern that depends on the payload's position in the stream.

    :param int buf_iter: The position of the payload in the data stream
    :param int size: The payload length in bytes (also the number of payloads
        in a stream). Defaults to the module-level ``SIZE``.
    :return: A payload that is ``size`` bytes long.
    """
    if size is None:
        size = SIZE  # the number of payloads in the stream & the payloads' length
    # prefix payload with a sequential letter to indicate which
    # payloads were lost (if any)
    prefix = buf_iter + (65 if 0 <= buf_iter < 26 else 71)
    # These bounds do not depend on the byte index, so compute them once.
    half = (size - 1) / 2
    spread = abs(half - buf_iter)
    upper = half + spread
    lower = half - spread
    # A byte is '1' (48 + True) outside the [lower, upper) band, else '0'.
    pattern = bytes(48 + (j >= upper or j < lower) for j in range(size - 1))
    return bytes([prefix]) + pattern
87
88
def master(count: int = 1):
    """Uses all 3 levels of the TX FIFO to send a stream of data

    One stream is ``SIZE`` payloads produced by `make_buffer()`. When
    ``radio.writeFast()`` reports a failure, the payload in the top level of
    the TX FIFO is re-sent and the failure is counted. If too many failures
    pile up early on, the whole transmission (not just the current stream) is
    aborted. The elapsed time and failure count are printed in either case.

    :param int count: how many times to transmit the stream of data.
    """
    radio.stopListening()  # put radio in TX mode
    radio.flush_tx()  # clear the TX FIFO so we can use all 3 levels
    failures = 0  # keep track of manual retries
    aborted = False  # set when we give up, so that the outer loop stops too
    start_timer = time.monotonic_ns()  # start timer
    for multiplier in range(count):  # repeat transmit the same data stream
        buf_iter = 0  # iterator of payloads for the while loop
        while buf_iter < SIZE:  # cycle through all the payloads
            buffer = make_buffer(buf_iter)  # make a payload

            if radio.writeFast(buffer):  # transmission succeeded
                buf_iter += 1
                continue

            failures += 1  # increment manual retry count
            if failures > 99 and buf_iter < 7 and multiplier < 2:
                # we need to prevent an infinite loop
                print("Too many failures detected. Aborting at payload ", buffer[0])
                # NOTE: re-assigning `multiplier` would not stop the `for`
                # loop (it is re-bound on the next iteration), so use a flag.
                aborted = True
                break  # exit the while loop
            radio.reUseTX()  # resend payload in top level of TX FIFO
        if aborted:
            break  # exit the for loop
    end_timer = time.monotonic_ns()  # end timer
    print(
        f"Time to transmit data = {(end_timer - start_timer) / 1000} us.",
        f"Detected {failures} failures.",
    )
118
119
def slave(timeout: int = 6):
    """Print every payload that arrives, each followed by a running total.

    :param int timeout: The number of seconds to wait (with no transmission)
        until exiting function.
    """
    radio.startListening()  # put radio in RX mode
    received = 0  # number of payloads received so far
    last_activity = time.monotonic()  # moment of the most recent RX payload
    while time.monotonic() - last_activity < timeout:
        if not radio.available():
            continue  # nothing waiting in the RX FIFO yet; poll again
        received += 1
        # retrieve the received packet's payload
        payload = radio.read(radio.payloadSize)
        print("Received:", payload, "-", received)
        last_activity = time.monotonic()  # restart the idle countdown

    print("Nothing received in", timeout, "seconds. Leaving RX role")
    # recommended behavior is to keep in TX mode while idle
    radio.stopListening()  # put the radio in TX mode
141
142
def set_role() -> bool:
    """Set the role using stdin stream. Role args can be specified using space
    delimiters (e.g. 'R 10' calls `slave(10)` & 'T 3' calls `master(3)`)

    The prompt is repeated until a recognized role is entered. Blank (or
    whitespace-only) input and a role argument that is not an integer are
    reported and prompted for again instead of raising an exception.

    :return:
        - True when role is complete & app should continue running.
        - False when app should exit
    """
    while True:  # loop (rather than recurse) until the input is usable
        # `split()` yields an empty list for blank/whitespace-only input;
        # substitute "?" so that it is reported as unrecognized below.
        tokens = input(
            "*** Enter 'R' for receiver role.\n"
            "*** Enter 'T' for transmitter role.\n"
            "*** Enter 'Q' to quit example.\n"
        ).split() or ["?"]
        role = tokens[0].upper()

        if role.startswith(("R", "T")):
            action = slave if role.startswith("R") else master
            if len(tokens) > 1:
                try:
                    arg = int(tokens[1])
                except ValueError:
                    print(tokens[1], "is not a valid integer. Please try again.")
                    continue
                action(arg)
            else:
                action()
            return True
        if role.startswith("Q"):
            radio.powerDown()
            return False
        print(tokens[0], "is an unrecognized input. Please try again.")
177
178
if __name__ != "__main__":
    # imported (e.g. from a REPL): just remind the user what can be called
    print("    Run slave() on receiver\n    Run master() on transmitter")
else:
    try:
        # keep prompting for a role until 'Q' is entered
        while set_role():
            pass
    except KeyboardInterrupt:
        print(" Keyboard Interrupt detected. Powering down radio.")
        radio.powerDown()
Driver class for nRF24L01(+) 2.4GHz Wireless Transceiver.
Definition RF24.h:136