Optimized high speed nRF24L01+ driver class documentation v1.4.8
TMRh20 2020 - Optimized fork of the nRF24L01+ driver
Loading...
Searching...
No Matches
examples_linux/streaming_data.py

Written by 2bndy5 in 2020

This is a simple example of using the RF24 class on a Raspberry Pi for streaming multiple payloads.

Remember to install the Python wrapper, then navigate to the "RF24/examples_linux" folder.
To run this example, enter

python3 streaming_data.py

and follow the prompts.

Note
this example requires python v3.7 or newer because it measures transmission time with time.monotonic_ns().
1"""
2A simple example of streaming data from 1 nRF24L01 transceiver to another.
3
4This example was written to be used on 2 devices acting as 'nodes'.
5
6See documentation at https://nRF24.github.io/RF24
7"""
8
9import time
10from RF24 import RF24, RF24_PA_LOW, RF24_DRIVER
11
12print(__file__) # print example name
13
14
21CSN_PIN = 0 # GPIO8 aka CE0 on SPI bus 0: /dev/spidev0.0
22if RF24_DRIVER == "MRAA":
23 CE_PIN = 15 # for GPIO22
24elif RF24_DRIVER == "wiringPi":
25 CE_PIN = 3 # for GPIO22
26else:
27 CE_PIN = 22
28radio = RF24(CE_PIN, CSN_PIN)
29
30# initialize the nRF24L01 on the spi bus
31if not radio.begin():
32 raise RuntimeError("radio hardware is not responding")
33
34# For this example, we will use different addresses
35# An address need to be a buffer protocol object (bytearray)
36address = [b"1Node", b"2Node"]
37# It is very helpful to think of an address as a path instead of as
38# an identifying device destination
39
40# to use different addresses on a pair of radios, we need a variable to
41# uniquely identify which address this radio will use to transmit
42# 0 uses address[0] to transmit, 1 uses address[1] to transmit
43radio_number = bool(
44 int(input("Which radio is this? Enter '0' or '1'. Defaults to '0' ") or 0)
45)
46
47# set the Power Amplifier level to -12 dBm since this test example is
48# usually run with nRF24L01 transceivers in close proximity of each other
49radio.setPALevel(RF24_PA_LOW) # RF24_PA_MAX is default
50
51# set the TX address of the RX node into the TX pipe
52radio.openWritingPipe(address[radio_number]) # always uses pipe 0
53
54# set the RX address of the TX node into a RX pipe
55radio.openReadingPipe(1, address[not radio_number]) # using pipe 1
56
57
58# Specify the number of bytes in the payload. This is also used to
59# specify the number of payloads in 1 stream of data
60SIZE = 32 # this is the default maximum payload size
61
62# To save time during transmission, we'll set the payload size to be only
63# what we need. For this example, we'll be using the default maximum 32
64radio.payloadSize = SIZE
65
66# for debugging, we have 2 options that print a large block of details
67# (smaller) function that prints raw register values
68# radio.printDetails()
69# (larger) function that prints human readable data
70# radio.printPrettyDetails()
71
72
73def make_buffer(buf_iter: int) -> bytes:
74 """Returns a dynamically created payloads
75
76 :param int buf_iter: The position of the payload in the data stream
77 """
78 # we'll use `SIZE` for the number of payloads in the list and the
79 # payloads' length
80 # prefix payload with a sequential letter to indicate which
81 # payloads were lost (if any)
82 buff = bytes([buf_iter + (65 if 0 <= buf_iter < 26 else 71)])
83 for j in range(SIZE - 1):
84 char = bool(j >= (SIZE - 1) / 2 + abs((SIZE - 1) / 2 - buf_iter))
85 char |= bool(j < (SIZE - 1) / 2 - abs((SIZE - 1) / 2 - buf_iter))
86 buff += bytes([char + 48])
87 return buff
88
89
90def master(count: int = 1):
91 """Uses all 3 levels of the TX FIFO to send a stream of data
92
93 :param int count: how many times to transmit the stream of data.
94 """
95 radio.stopListening() # put radio in TX mode
96 radio.flush_tx() # clear the TX FIFO so we can use all 3 levels
97 failures = 0 # keep track of manual retries
98 start_timer = time.monotonic_ns() # start timer
99 for multiplier in range(count): # repeat transmit the same data stream
100 buf_iter = 0 # iterator of payloads for the while loop
101 while buf_iter < SIZE: # cycle through all the payloads
102 buffer = make_buffer(buf_iter) # make a payload
103
104 if not radio.writeFast(buffer): # transmission failed
105 failures += 1 # increment manual retry count
106 if failures > 99 and buf_iter < 7 and multiplier < 2:
107 # we need to prevent an infinite loop
108 print("Too many failures detected. Aborting at payload ", buffer[0])
109 multiplier = count # be sure to exit the for loop
110 break # exit the while loop
111 radio.reUseTX() # resend payload in top level of TX FIFO
112 else: # transmission succeeded
113 buf_iter += 1
114 end_timer = time.monotonic_ns() # end timer
115 print(
116 f"Time to transmit data = {(end_timer - start_timer) / 1000} us.",
117 f"Detected {failures} failures.",
118 )
119
120
121def slave(timeout: int = 6):
122 """Listen for any payloads and print them out (suffixed with received
123 counter)
124
125 :param int timeout: The number of seconds to wait (with no transmission)
126 until exiting function.
127 """
128 radio.startListening() # put radio in RX mode
129 count = 0 # keep track of the number of received payloads
130 start_timer = time.monotonic() # start timer
131 while (time.monotonic() - start_timer) < timeout:
132 if radio.available():
133 count += 1
134 # retrieve the received packet's payload
135 receive_payload = radio.read(radio.payloadSize)
136 print("Received:", receive_payload, "-", count)
137 start_timer = time.monotonic() # reset timer on every RX payload
138
139 print("Nothing received in", timeout, "seconds. Leaving RX role")
140 # recommended behavior is to keep in TX mode while idle
141 radio.stopListening() # put the radio in TX mode
142
143
144def set_role() -> bool:
145 """Set the role using stdin stream. Role args can be specified using space
146 delimiters (e.g. 'R 10' calls `slave(10)` & 'T 3' calls `master(3)`)
147
148 :return:
149 - True when role is complete & app should continue running.
150 - False when app should exit
151 """
152 user_input = (
153 input(
154 "*** Enter 'R' for receiver role.\n"
155 "*** Enter 'T' for transmitter role.\n"
156 "*** Enter 'Q' to quit example.\n"
157 )
158 or "?"
159 )
160 user_input = user_input.split()
161 if user_input[0].upper().startswith("R"):
162 if len(user_input) > 1:
163 slave(int(user_input[1]))
164 else:
165 slave()
166 return True
167 if user_input[0].upper().startswith("T"):
168 if len(user_input) > 1:
169 master(int(user_input[1]))
170 else:
171 master()
172 return True
173 if user_input[0].upper().startswith("Q"):
174 radio.powerDown()
175 return False
176 print(user_input[0], "is an unrecognized input. Please try again.")
177 return set_role()
178
179
180if __name__ == "__main__":
181 try:
182 while set_role():
183 pass # continue example until 'Q' is entered
184 except KeyboardInterrupt:
185 print(" Keyboard Interrupt detected. Powering down radio.")
186 radio.powerDown()
187else:
188 print(" Run slave() on receiver\n Run master() on transmitter")
Driver class for nRF24L01(+) 2.4GHz Wireless Transceiver.
Definition RF24.h:116