Examples

RF24 examples

getting_started

A simple example of using the RF24 class

examples/getting_started.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""
Simple example of using the RF24 class.

See documentation at https://nRF24.github.io/pyRF24
"""

import time
import struct
from pyrf24 import RF24, RF24_PA_LOW, RF24_DRIVER

print(__file__)  # print example name

########### USER CONFIGURATION ###########
# See https://github.com/TMRh20/RF24/blob/master/pyRF24/readme.md
# Radio CE Pin, CSN Pin, SPI Speed
# CE Pin uses GPIO number with BCM and SPIDEV drivers, other platforms use
# their own pin numbering
# CS Pin addresses the SPI bus number at /dev/spidev<a>.<b>
# ie: RF24 radio(<ce_pin>, <a>*10+<b>); spidev1.0 is 10, spidev1.1 is 11 etc..
CSN_PIN = 0  # aka CE0 on SPI bus 0: /dev/spidev0.0
if RF24_DRIVER == "MRAA":
    CE_PIN = 15  # for GPIO22
elif RF24_DRIVER == "wiringPi":
    CE_PIN = 3  # for GPIO22
else:
    CE_PIN = 22
radio = RF24(CE_PIN, CSN_PIN)

# using the python keyword global is bad practice. Instead we'll use a 1 item
# list to store our float number for the payloads sent
payload = [0.0]

# For this example, we will use different addresses
# An address need to be a buffer protocol object (bytearray)
address = [b"1Node", b"2Node"]
# It is very helpful to think of an address as a path instead of as
# an identifying device destination

# to use different addresses on a pair of radios, we need a variable to
# uniquely identify which address this radio will use to transmit
# 0 uses address[0] to transmit, 1 uses address[1] to transmit
radio_number = bool(
    int(input("Which radio is this? Enter '0' or '1'. Defaults to '0' ") or 0)
)

# initialize the nRF24L01 on the spi bus
if not radio.begin():
    raise OSError("nRF24L01 hardware isn't responding")

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceivers in close proximity of each other
radio.set_pa_level(RF24_PA_LOW)  # RF24_PA_MAX is default

# set TX address of RX node into the TX pipe
radio.open_tx_pipe(address[radio_number])  # always uses pipe 0

# set RX address of TX node into an RX pipe
radio.open_rx_pipe(1, address[not radio_number])  # using pipe 1

# To save time during transmission, we'll set the payload size to be only what
# we need. A float value occupies 4 bytes in memory using struct.calcsize()
# "<f" means a little endian unsigned float
radio.payload_size = struct.calcsize("<f")

# for debugging
# radio.print_details()
# or for human readable data
# radio.print_pretty_details()


def master(count: int = 5):  # count = 5 will only transmit 5 packets
    """Transmits an incrementing float every second"""
    radio.listen = False  # ensures the nRF24L01 is in TX mode

    while count:
        # use struct.pack() to pack your data into a usable payload
        # into a usable payload
        buffer = struct.pack("<f", payload[0])
        # "<f" means a single little endian (4 byte) float value.
        start_timer = time.monotonic_ns()  # start timer
        result = radio.write(buffer)
        end_timer = time.monotonic_ns()  # end timer
        if not result:
            print("Transmission failed or timed out")
        else:
            print(
                "Transmission successful! Time to Transmit:",
                f"{(end_timer - start_timer) / 1000} us. Sent: {payload[0]}",
            )
            payload[0] += 0.01
        time.sleep(1)
        count -= 1


def slave(timeout: int = 6):
    """Polls the radio and prints the received value. This method expires
    after 6 seconds of no received transmission."""
    radio.listen = True  # put radio into RX mode and power up

    start = time.monotonic()
    while (time.monotonic() - start) < timeout:
        has_payload, pipe_number = radio.available_pipe()
        if has_payload:
            length = radio.payload_size  # grab the payload length
            # fetch 1 payload from RX FIFO
            received = radio.read(length)  # also clears radio.irq_dr status flag
            # expecting a little endian float, thus the format string "<f"
            # received[:4] truncates padded 0s in case dynamic payloads are disabled
            payload[0] = struct.unpack("<f", received[:4])[0]
            # print details about the received packet
            print(f"Received {length} bytes on pipe {pipe_number}: {payload[0]}")
            start = time.monotonic()  # reset the timeout timer

    # recommended behavior is to keep in TX mode while idle
    radio.listen = False  # put the nRF24L01 is in TX mode


acknowledgement_payloads

A simple example of using the RF24 class to attach an Acknowledgement (ACK) payload the automatically generated ACK packet.

examples/acknowledgement_payloads.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""
Simple example of using the library to transmit
and retrieve custom automatic acknowledgment payloads.

See documentation at https://nRF24.github.io/pyRF24
"""

import time
from pyrf24 import RF24, RF24_PA_LOW, RF24_DRIVER

print(__file__)  # print example name

########### USER CONFIGURATION ###########
# See https://github.com/TMRh20/RF24/blob/master/pyRF24/readme.md
# Radio CE Pin, CSN Pin, SPI Speed
# CE Pin uses GPIO number with BCM and SPIDEV drivers, other platforms use
# their own pin numbering
# CS Pin addresses the SPI bus number at /dev/spidev<a>.<b>
# ie: RF24 radio(<ce_pin>, <a>*10+<b>); spidev1.0 is 10, spidev1.1 is 11 etc..
CSN_PIN = 0  # aka CE0 on SPI bus 0: /dev/spidev0.0
if RF24_DRIVER == "MRAA":
    CE_PIN = 15  # for GPIO22
elif RF24_DRIVER == "wiringPi":
    CE_PIN = 3  # for GPIO22
else:
    CE_PIN = 22
radio = RF24(CE_PIN, CSN_PIN)

# using the python keyword global is bad practice. Instead we'll use a 1 item
# list to store our integer number for the payloads' counter
counter = [0]

# For this example, we will use different addresses
# An address need to be a buffer protocol object (bytearray)
address = [b"1Node", b"2Node"]
# It is very helpful to think of an address as a path instead of as
# an identifying device destination

# to use different addresses on a pair of radios, we need a variable to
# uniquely identify which address this radio will use to transmit
# 0 uses address[0] to transmit, 1 uses address[1] to transmit
radio_number = bool(
    int(input("Which radio is this? Enter '0' or '1'. Defaults to '0' ") or 0)
)

# initialize the nRF24L01 on the spi bus
if not radio.begin():
    raise OSError("nRF24L01 hardware isn't responding")

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceivers in close proximity of each other
radio.set_pa_level(RF24_PA_LOW)  # RF24_PA_MAX is default

# ACK payloads are dynamically sized, so we need to enable that feature also
radio.dynamic_payloads = True

# to enable the custom ACK payload feature
radio.ack_payloads = True

# set TX address of RX node into the TX pipe
radio.open_tx_pipe(address[radio_number])  # always uses pipe 0

# set RX address of TX node into an RX pipe
radio.open_rx_pipe(1, address[not radio_number])  # using pipe 1

# for debugging
# radio.print_pretty_details()


def master(count: int = 5):  # count = 5 will only transmit 5 packets
    """Transmits a payload every second and prints the ACK payload"""
    radio.listen = False  # put radio in TX mode

    while count:
        # construct a payload to send
        buffer = b"Hello \x00" + bytes([counter[0]])

        # send the payload and prompt
        start_timer = time.monotonic_ns()  # start timer
        result = radio.write(buffer)  # save the report
        end_timer = time.monotonic_ns()  # stop timer
        if result:
            # print timer results upon transmission success
            print(
                "Transmission successful! Time to transmit:",
                f"{int((end_timer - start_timer) / 1000)} us. Sent:",
                f"{buffer[:6].decode('utf-8')}{counter[0]}",
                end=" ",
            )
            if radio.available():
                # print the received ACK that was automatically sent
                result = radio.read(radio.get_dynamic_payload_size())
                print(f" Received: {result[:6].decode('utf-8')}{result[7:8][0]}")
                counter[0] += 1  # increment payload counter
            else:
                print(" Received an empty ACK packet")
        else:
            print("Transmission failed or timed out")
        time.sleep(1)  # let the RX node prepare a new ACK payload
        count -= 1


def slave(timeout: int = 6):
    """Prints the received value and sends an ACK payload"""
    radio.listen = True  # put radio into RX mode, power it up

    # setup the first transmission's ACK payload
    buffer = b"World \x00" + bytes([counter[0]])
    # we must set the ACK payload data and corresponding
    # pipe number [0,5]
    radio.write_ack_payload(1, buffer)  # load ACK for first response

    start = time.monotonic()  # start timer
    while (time.monotonic() - start) < timeout:
        has_payload, pipe_number = radio.available_pipe()
        if has_payload:
            length = radio.get_dynamic_payload_size()  # grab the payload length
            received = radio.read(length)  # fetch 1 payload from RX FIFO
            # increment counter from received payload
            counter[0] = received[7:8][0] + 1
            print(
                f"Received {length} bytes on pipe {pipe_number}:",
                f"{received[:6].decode('utf-8')}{received[7:8][0]} Sent:",
                f"{buffer[:6].decode('utf-8')}{counter[0]}",
            )
            start = time.monotonic()  # reset timer

            # build a new ACK payload
            buffer = b"World \x00" + bytes([counter[0]])
            radio.write_ack_payload(1, buffer)  # load ACK for next response

    # recommended behavior is to keep in TX mode while idle
    radio.listen = False  # put radio in TX mode & flush unused ACK payloads


manual_acknowledgements

A simple example of using the RF24 class to receive a payload and then send a responding payload as an Acknowledgement. This functionality is similar to using the nRF24L01’s ACK payload feature, but doe not append ACK payloads to the ACK packets.

examples/manual_acknowledgements.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""
Simple example of using the RF24 class to transmit and respond with
acknowledgment (ACK) transmissions. Notice that the auto-ack feature is
enabled, but this example doesn't use automatic ACK payloads because automatic
ACK payloads' data will always be outdated by 1 transmission. Instead, this
example uses a call and response paradigm.

See documentation at https://nRF24.github.io/pyRF24
"""

import time
import struct
from pyrf24 import RF24, RF24_PA_LOW, RF24_DRIVER

print(__file__)  # print example name

########### USER CONFIGURATION ###########
# See https://github.com/TMRh20/RF24/blob/master/pyRF24/readme.md
# Radio CE Pin, CSN Pin, SPI Speed
# CE Pin uses GPIO number with BCM and SPIDEV drivers, other platforms use
# their own pin numbering
# CS Pin addresses the SPI bus number at /dev/spidev<a>.<b>
# ie: RF24 radio(<ce_pin>, <a>*10+<b>); spidev1.0 is 10, spidev1.1 is 11 etc..
CSN_PIN = 0  # aka CE0 on SPI bus 0: /dev/spidev0.0
if RF24_DRIVER == "MRAA":
    CE_PIN = 15  # for GPIO22
elif RF24_DRIVER == "wiringPi":
    CE_PIN = 3  # for GPIO22
else:
    CE_PIN = 22
radio = RF24(CE_PIN, CSN_PIN)

# For this example, we will use different addresses
# An address need to be a buffer protocol object (bytearray)
address = [b"1Node", b"2Node"]
# It is very helpful to think of an address as a path instead of as
# an identifying device destination

# to use different addresses on a pair of radios, we need a variable to
# uniquely identify which address this radio will use to transmit
# 0 uses address[radio_number] to transmit, 1 uses address[not radio_number] to transmit
radio_number = bool(
    int(input("Which radio is this? Enter '0' or '1'. Defaults to '0' ") or 0)
)

# initialize the nRF24L01 on the spi bus
if not radio.begin():
    raise OSError("nRF24L01 hardware isn't responding")

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceivers in close proximity of each other
radio.set_pa_level(RF24_PA_LOW)  # RF24_PA_MAX is default

# set TX address of RX node into the TX pipe
radio.open_tx_pipe(address[radio_number])  # always uses pipe 0

# set RX address of TX node into an RX pipe
radio.open_rx_pipe(1, address[not radio_number])  # using pipe 1

# To save time during transmission, we'll set the payload size to be only what
# we need. A float value occupies 4 bytes in memory using struct.calcsize()
# "<b" means a little endian unsigned byte
# we also need an addition 7 bytes for the payload message
radio.payload_size = struct.calcsize("<b") + 7

# for debugging
radio.print_pretty_details()

# using the python keyword global is bad practice. Instead we'll use a 1 item
# list to store our float number for the payloads sent
payload = [0]


def master(count: int = 10):
    """Transmits a message and an incrementing integer every second"""
    radio.listen = False  # ensures the nRF24L01 is in TX mode

    while count:  # only transmit `count` packets
        # use struct.pack() to pack your data into a usable payload
        # "<b" means a single little endian unsigned byte.
        # NOTE we added a b"\x00" byte as a c-string's NULL terminating 0
        buffer = b"Hello \x00" + struct.pack("<b", payload[0])
        start_timer = time.monotonic_ns()  # start timer
        result = radio.write(buffer)
        if not result:
            print("Transmission failed or timed out")
        else:
            radio.listen = True
            timout = time.monotonic() * 1000 + 200  # use 200 ms timeout
            ack = b"\x00" * len(buffer)  # variable used for the response
            while ack[0] == 0 and time.monotonic() * 1000 < timout:
                if radio.available():
                    # get the response & save it to ack variable
                    ack = radio.read(radio.payload_size)
            end_timer = time.monotonic_ns()  # end timer
            radio.listen = False
            print(
                "Transmission successful. Sent: ",
                f"{buffer[:6].decode('utf-8')}{payload[0]}.",
                end=" ",
            )
            if ack[0] == 0:
                print("No response received.")
            else:
                # decode response's text as an string
                # NOTE ack[:6] ignores the NULL terminating 0
                response = ack[:6].decode("utf-8")
                # use struct.unpack() to get the repsonse's appended int
                # NOTE ack[7:] discards NULL terminating 0, and
                # "<b" means its a single little endian unsigned byte
                payload[0] = struct.unpack("<b", ack[7:])[0]
                print(
                    f"Received: {response}{payload[0]}. Roundtrip delay:",
                    f"{(end_timer - start_timer) / 1000} us.",
                )
        time.sleep(1)  # make example readable by slowing down transmissions
        count -= 1


def slave(timeout: int = 6):
    """Polls the radio and prints the received value. This method expires
    after 6 seconds of no received transmission"""
    radio.listen = True  # put radio into RX mode and power up

    start_timer = time.monotonic()  # start a timer to detect timeout
    while (time.monotonic() - start_timer) < timeout:
        # receive payloads or wait 6 seconds till timing out
        has_payload, pipe_number = radio.available_pipe()
        if has_payload:
            length = radio.payload_size  # grab the payload length
            received = radio.read(length)  # fetch 1 payload from RX FIFO
            # use struct.unpack() to get the payload's appended int
            # NOTE received[7:] discards NULL terminating 0, and
            # "<b" means its a single little endian unsigned byte
            payload[0] = struct.unpack("<b", received[7:])[0] + 1
            # use bytes() to pack our data into a usable payload
            # NOTE b"\x00" byte is a c-string's NULL terminating 0
            buffer = b"World \x00" + bytes([payload[0]])
            radio.listen = False  # set radio to TX mode
            radio.write_fast(buffer)  # load payload into radio's RX buffer
            # keep retrying to send response for 150 milliseconds
            response = radio.tx_standby(150)  # save response's result
            radio.listen = True  # set radio back into RX mode
            # print the payload received and the response's payload
            print(
                f"Received {length} bytes on pipe {pipe_number}:",
                f"{received[:6].decode('utf-8')}{payload[0] - 1}.",
                end=" ",
            )
            if response:
                print(f"Sent: {buffer[:6].decode('utf-8')}{payload[0]}")
            else:
                print("Response failed or timed out")
            start_timer = time.monotonic()  # reset the timeout timer

    # recommended behavior is to keep in TX mode while idle
    radio.listen = False  # put the nRF24L01 is in TX mode


multiceiver_demo

A simple example of using the RF24 class to receive data from multiple transmitting nRF24L01 transceivers. This technique is trademarked by Nordic Semiconductors as “MulticeiverTM

examples/multiceiver_demo.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
"""
Simple example of using 1 nRF24L01 to receive data from up to 6 other
transceivers. This technique is called "multiceiver" in the datasheet.

See documentation at https://nRF24.github.io/pyRF24
"""

import time
import struct
from pyrf24 import RF24, RF24_PA_LOW, RF24_DRIVER

print(__file__)  # print example name

########### USER CONFIGURATION ###########
# See https://github.com/TMRh20/RF24/blob/master/pyRF24/readme.md
# Radio CE Pin, CSN Pin, SPI Speed
# CE Pin uses GPIO number with BCM and SPIDEV drivers, other platforms use
# their own pin numbering
# CS Pin addresses the SPI bus number at /dev/spidev<a>.<b>
# ie: RF24 radio(<ce_pin>, <a>*10+<b>); spidev1.0 is 10, spidev1.1 is 11 etc..
CSN_PIN = 0  # aka CE0 on SPI bus 0: /dev/spidev0.0
if RF24_DRIVER == "MRAA":
    CE_PIN = 15  # for GPIO22
elif RF24_DRIVER == "wiringPi":
    CE_PIN = 3  # for GPIO22
else:
    CE_PIN = 22
radio = RF24(CE_PIN, CSN_PIN)

# setup the addresses for all transmitting nRF24L01 nodes
addresses = [
    b"\x78" * 5,
    b"\xf1\xb6\xb5\xb4\xb3",
    b"\xcd\xb6\xb5\xb4\xb3",
    b"\xa3\xb6\xb5\xb4\xb3",
    b"\x0f\xb6\xb5\xb4\xb3",
    b"\x05\xb6\xb5\xb4\xb3",
]
# It is very helpful to think of an address as a path instead of as
# an identifying device destination

# initialize the nRF24L01 on the spi bus
if not radio.begin():
    raise OSError("nRF24L01 hardware isn't responding")

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceivers in close proximity of each other
radio.set_pa_level(RF24_PA_LOW)  # RF24_PA_MAX is default

# To save time during transmission, we'll set the payload size to be only what
# we need.
# 2 int occupy 8 bytes in memory using len(struct.pack())
# "<ii" means 2x little endian unsigned int
radio.payload_size = struct.calcsize("<ii")

# for debugging
radio.print_pretty_details()


def master(node_number: int = 0, count: int = 6):
    """start transmitting to the base station.

    :param int node_number: the node's identifying index (from the
        the `addresses` list). This is a required parameter
    :param int count: the number of times that the node will transmit
        to the base station.
    """
    # According to the datasheet, the auto-retry features's delay value should
    # be "skewed" to allow the RX node to receive 1 transmission at a time.
    # So, use varying delay between retry attempts and 15 (at most) retry attempts
    radio.set_retries(((node_number * 3) % 12) + 3, 15)  # max value is 15 for both args

    radio.listen = False
    # set the TX address to the address of the base station.
    radio.open_tx_pipe(addresses[node_number])
    counter = 0
    # use the node_number to identify where the payload came from
    while counter < count:
        counter += 1
        # payloads will include the node_number and a payload ID character
        payload = struct.pack("<ii", node_number, counter)
        start_timer = time.monotonic_ns()
        report = radio.write(payload)
        end_timer = time.monotonic_ns()
        # show something to see it isn't frozen
        if report:
            print(
                f"Transmission of payloadID {counter} as node {node_number}",
                f"successfull! Transmission time: {(end_timer - start_timer) / 1000}",
                "us",
            )
        else:
            print("Transmission failed or timed out")
        time.sleep(0.5)  # slow down the test for readability


def slave(timeout=10):
    """Use the nRF24L01 as a base station for listening to all nodes"""
    # write the addresses to all pipes.
    for pipe_n, addr in enumerate(addresses):
        radio.open_rx_pipe(pipe_n, addr)
    radio.listen = True  # put base station into RX mode
    start_timer = time.monotonic()  # start timer
    while time.monotonic() - start_timer < timeout:
        has_payload, pipe_number = radio.available_pipe()
        if has_payload:
            length = radio.payload_size  # grab the payload length
            # unpack payload
            node_id, payload_id = struct.unpack("<ii", radio.read(radio.payload_size))
            # show the pipe number that received the payload
            print(
                f"Received {length} bytes on pipe {pipe_number} from node {node_id}.",
                f"PayloadID: {payload_id}",
            )
            start_timer = time.monotonic()  # reset timer with every payload
    radio.listen = False


streaming_data

A simple example of using the RF24 class to stream data.

examples/streaming_data.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""
Example of library usage for streaming multiple payloads.

See documentation at https://nRF24.github.io/pyRF24
"""

import time
from pyrf24 import RF24, RF24_PA_LOW, RF24_DRIVER

print(__file__)  # print example name

########### USER CONFIGURATION ###########
# See https://github.com/TMRh20/RF24/blob/master/pyRF24/readme.md
# Radio CE Pin, CSN Pin, SPI Speed
# CE Pin uses GPIO number with BCM and SPIDEV drivers, other platforms use
# their own pin numbering
# CS Pin addresses the SPI bus number at /dev/spidev<a>.<b>
# ie: RF24 radio(<ce_pin>, <a>*10+<b>); spidev1.0 is 10, spidev1.1 is 11 etc..
CSN_PIN = 0  # aka CE0 on SPI bus 0: /dev/spidev0.0
if RF24_DRIVER == "MRAA":
    CE_PIN = 15  # for GPIO22
elif RF24_DRIVER == "wiringPi":
    CE_PIN = 3  # for GPIO22
else:
    CE_PIN = 22
radio = RF24(CE_PIN, CSN_PIN)

# For this example, we will use different addresses
# An address need to be a buffer protocol object (bytearray)
address = [b"1Node", b"2Node"]
# It is very helpful to think of an address as a path instead of as
# an identifying device destination

# to use different addresses on a pair of radios, we need a variable to
# uniquely identify which address this radio will use to transmit
# 0 uses address[0] to transmit, 1 uses address[1] to transmit
radio_number = bool(
    int(input("Which radio is this? Enter '0' or '1'. Defaults to '0' ") or 0)
)

# initialize the nRF24L01 on the spi bus
if not radio.begin():
    raise OSError("nRF24L01 hardware isn't responding")

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceivers in close proximity of each other
radio.set_pa_level(RF24_PA_LOW)  # RF24_PA_MAX is default

# set TX address of RX node into the TX pipe
radio.open_tx_pipe(address[radio_number])  # always uses pipe 0

# set RX address of TX node into an RX pipe
radio.open_rx_pipe(1, address[not radio_number])  # using pipe 1

# for debugging
radio.print_pretty_details()


def make_buffer(buf_iter: int, size: int = 32):
    """return a list of payloads"""
    # we'll use `size` for the number of payloads in the list and the
    # payloads' length
    # prefix payload with a sequential letter to indicate which
    # payloads were lost (if any)
    buff = bytes([buf_iter + (65 if 0 <= buf_iter < 26 else 71)])
    for j in range(size - 1):
        char = bool(j >= (size - 1) / 2 + abs((size - 1) / 2 - buf_iter))
        char |= bool(j < (size - 1) / 2 - abs((size - 1) / 2 - buf_iter))
        buff += bytes([char + 48])
    return buff


def master(count: int = 1, size: int = 32):
    """Uses all 3 levels of the TX FIFO `RF24.writeFast()`"""
    if size < 6:
        print("setting size to 6;", size, "is not allowed for this test.")
        size = 6

    # save on transmission time by setting the radio to only transmit the
    #  number of bytes we need to transmit
    radio.payload_size = size  # the default is the maximum 32 bytes

    radio.listen = False  # ensures the nRF24L01 is in TX mode
    for cnt in range(count):  # transmit the same payloads this many times
        radio.flush_tx()  # clear the TX FIFO so we can use all 3 levels
        # NOTE the write_only parameter does not initiate sending
        buf_iter = 0  # iterator of payloads for the while loop
        failures = 0  # keep track of manual retries
        start_timer = time.monotonic() * 1000  # start timer
        while buf_iter < size:  # cycle through all the payloads
            buf = make_buffer(buf_iter, size)  # make a payload
            if not radio.write_fast(buf):
                # reception failed; we need to reset the irq_rf flag
                failures += 1  # increment manual retries
                radio.reuse_tx()
                if failures > 99 and buf_iter < 7 and cnt < 2:
                    # we need to prevent an infinite loop
                    print("Make sure slave() node is listening. Quitting master_fifo()")
                    buf_iter = size + 1  # be sure to exit the while loop
                    radio.flush_tx()  # discard all payloads in TX FIFO
                    break
            buf_iter += 1
        end_timer = time.monotonic() * 1000  # end timer
        print(
            f"Transmission took {end_timer - start_timer} ms with",
            f"{failures} failures detected.",
        )


def slave(timeout: int = 5, size: int = 32):
    """Stops listening after a `timeout` with no response"""
    # set address of TX node into an RX pipe. NOTE you MUST specify

    # save on transmission time by setting the radio to only transmit the
    #  number of bytes we need to transmit
    radio.payload_size = size  # the default is the maximum 32 bytes

    radio.listen = True  # put radio into RX mode and power up
    count = 0  # keep track of the number of received payloads
    start_timer = time.monotonic()  # start timer
    while time.monotonic() < start_timer + timeout:
        if radio.available():
            count += 1
            # retrieve the received packet's payload
            length = radio.get_dynamic_payload_size()
            receive_payload = radio.read(length)
            print(f"Received: {receive_payload} - {count}")
            start_timer = time.monotonic()  # reset timer on every RX payload

    # recommended behavior is to keep in TX mode while idle
    radio.listen = False  # put the nRF24L01 is in TX mode


interrupt_configure

A simple example of using the nRF24L01’s IRQ pin to catch different interrupt request events.

examples/interrupt_configure.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
"""
Simple example of detecting (and verifying) the IRQ (interrupt) pin on the
nRF24L01

See documentation at https://nRF24.github.io/pyRF24
"""

import time
from pyrf24 import RF24, RF24_PA_LOW, RF24_DRIVER

try:
    import gpiod
    from gpiod.line import Edge
except ImportError as exc:
    raise ImportError(
        "This script requires gpiod installed for observing the IRQ pin. Please run\n"
        "\n    pip install gpiod\n\nMore details at https://pypi.org/project/gpiod/"
    ) from exc

try:  # try RPi5 gpio chip first
    chip_path = "/dev/gpiochip4"
    chip = gpiod.Chip(chip_path)
except FileNotFoundError:  # fall back to gpio chip for RPi4 or older
    chip_path = "/dev/gpiochip0"
    chip = gpiod.Chip(chip_path)
finally:
    print(__file__)  # print example name
    # print gpio chip info
    info = chip.get_info()
    print(f"Using {info.name} [{info.label}] ({info.num_lines} lines)")


########### USER CONFIGURATION ###########
# See https://github.com/TMRh20/RF24/blob/master/pyRF24/readme.md
# Radio CE Pin, CSN Pin, SPI Speed
# CE Pin uses GPIO number with BCM and SPIDEV drivers, other platforms use
# their own pin numbering
# CS Pin addresses the SPI bus number at /dev/spidev<a>.<b>
# ie: RF24 radio(<ce_pin>, <a>*10+<b>); spidev1.0 is 10, spidev1.1 is 11 etc..
CSN_PIN = 0  # aka CE0 on SPI bus 0: /dev/spidev0.0
if RF24_DRIVER == "MRAA":
    CE_PIN = 15  # for GPIO22
elif RF24_DRIVER == "wiringPi":
    CE_PIN = 3  # for GPIO22
else:
    CE_PIN = 22
radio = RF24(CE_PIN, CSN_PIN)

# select your digital input pin that's connected to the IRQ pin on the nRF24L01
IRQ_PIN = 24

# For this example, we will use different addresses
# An address need to be a buffer protocol object (bytearray)
address = [b"1Node", b"2Node"]
# It is very helpful to think of an address as a path instead of as
# an identifying device destination

# to use different addresses on a pair of radios, we need a variable to
# uniquely identify which address this radio will use to transmit
# 0 uses address[0] to transmit, 1 uses address[1] to transmit
radio_number = bool(
    int(input("Which radio is this? Enter '0' or '1'. Defaults to '0' ") or 0)
)

# initialize the nRF24L01 on the spi bus
if not radio.begin():
    raise OSError("nRF24L01 hardware isn't responding")

# this example uses the ACK payload to trigger the IRQ pin active for
# the "on data received" event
radio.ack_payloads = True  # enable ACK payloads
radio.dynamic_payloads = True  # ACK payloads are dynamically sized

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceivers in close proximity of each other
radio.set_pa_level(RF24_PA_LOW)  # RF24_PA_MAX is default

# set TX address of RX node into the TX pipe
radio.open_tx_pipe(address[radio_number])  # always uses pipe 0

# set RX address of TX node into an RX pipe
radio.open_rx_pipe(1, address[not radio_number])  # using pipe 1

# for debugging
# radio.print_pretty_details()

# For this example, we'll be using a payload containing
# a string that changes on every transmission. (successful or not)
# Make a couple tuples of payloads & an iterator to traverse them
pl_iterator = [0]  # use a 1-item list instead of python's global keyword
tx_payloads = (b"Ping ", b"Pong ", b"Radio", b"1FAIL")
ack_payloads = (b"Yak ", b"Back", b" ACK")


def interrupt_handler():
    """This function is called when IRQ pin is detected active LOW"""
    print("\tIRQ pin went active LOW.")
    tx_ds, tx_df, rx_dr = radio.what_happened()  # update IRQ status flags
    print(f"\ttx_ds: {tx_ds}, tx_df: {tx_df}, rx_dr: {rx_dr}")
    if pl_iterator[0] == 0:
        print("'data ready' event test", ("passed" if rx_dr else "failed"))
    elif pl_iterator[0] == 1:
        print("'data sent' event test", ("passed" if tx_ds else "failed"))
    elif pl_iterator[0] == 2:
        print("'data fail' event test", ("passed" if tx_df else "failed"))


# setup IRQ GPIO pin
irq_line = gpiod.request_lines(
    path=chip_path,
    consumer="pyrf24/examples/interrupt",  # optional
    config={IRQ_PIN: gpiod.LineSettings(edge_detection=Edge.FALLING)},
)


def _wait_for_irq(timeout: float = 5):
    """Wait till IRQ_PIN goes active (LOW).
    IRQ pin is LOW when activated. Otherwise it is always HIGH
    """
    # wait up to ``timeout`` seconds for event to be detected.
    if not irq_line.wait_edge_events(timeout):
        print(f"\tInterrupt event not detected for {timeout} seconds!")
        return False
    # read event from kernel buffer
    for event in irq_line.read_edge_events():
        if event.line_offset == IRQ_PIN and event.event_type is event.Type.FALLING_EDGE:
            return True
    return False


def master():
    """Transmits 4 times and reports results

    1. successfully receive ACK payload first
    2. successfully transmit on second
    3. send a third payload to fill RX node's RX FIFO
       (supposedly making RX node unresponsive)
    4. intentionally fail transmit on the fourth
    """
    radio.listen = False  # put radio in TX mode

    # on data ready test
    print("\nConfiguring IRQ pin to only ignore 'on data sent' event")
    radio.mask_irq(True, False, False)  # args = tx_ds, tx_df, rx_dr
    print("    Pinging slave node for an ACK payload...")
    pl_iterator[0] = 0
    radio.start_fast_write(tx_payloads[0])
    if _wait_for_irq():
        interrupt_handler()

    # on "data sent" test
    print("\nConfiguring IRQ pin to only ignore 'on data ready' event")
    radio.mask_irq(False, False, True)  # args = tx_ds, tx_df, rx_dr
    print("    Pinging slave node again...")
    pl_iterator[0] = 1
    radio.start_fast_write(tx_payloads[1])
    if _wait_for_irq():
        interrupt_handler()

    # trigger slave node to exit by filling the slave node's RX FIFO
    print("\nSending one extra payload to fill RX FIFO on slave node.")
    print("Disabling IRQ pin for all events.")
    radio.mask_irq(True, True, True)  # args = tx_ds, tx_df, rx_dr
    if radio.write(tx_payloads[2]):
        print("Slave node should not be listening anymore.")
    else:
        print("Slave node was unresponsive.")

    # on "data fail" test
    print("\nConfiguring IRQ pin to go active for all events.")
    radio.mask_irq(False, False, False)  # args = tx_ds, tx_df, rx_dr
    print("    Sending a ping to inactive slave node...")
    radio.flush_tx()  # just in case any previous tests failed
    pl_iterator[0] = 2
    radio.start_fast_write(tx_payloads[3])
    if _wait_for_irq():
        interrupt_handler()
    radio.flush_tx()  # flush artifact payload in TX FIFO from last test
    # all 3 ACK payloads received were 4 bytes each, and RX FIFO is full
    # so, fetching 12 bytes from the RX FIFO also flushes RX FIFO
    print("\nComplete RX FIFO:", radio.read(12))


def slave(timeout=6):  # will listen for 6 seconds before timing out
    """Only listen for 3 payload from the master node"""
    # the "data ready" event will trigger in RX mode
    # the "data sent" or "data fail" events will trigger when we
    # receive with ACK payloads enabled (& loaded in TX FIFO)
    print("\nDisabling IRQ pin for all events.")
    radio.mask_irq(True, True, True)  # args = tx_ds, tx_df, rx_dr
    # setup radio to receive pings, fill TX FIFO with ACK payloads
    radio.write_ack_payload(1, ack_payloads[0])
    radio.write_ack_payload(1, ack_payloads[1])
    radio.write_ack_payload(1, ack_payloads[2])
    radio.listen = True  # start listening & clear irq_dr flag
    start_timer = time.monotonic()  # start timer now
    while not radio.is_fifo(False, False) and time.monotonic() - start_timer < timeout:
        # if RX FIFO is not full and timeout is not reached, then keep waiting
        pass
    time.sleep(0.5)  # wait for last ACK payload to transmit
    radio.listen = False  # put radio in TX mode & discard any ACK payloads
    if radio.available():  # if RX FIFO is not empty (timeout did not occur)
        # all 3 payloads received were 5 bytes each, and RX FIFO is full
        # so, fetching 15 bytes from the RX FIFO also flushes RX FIFO
        print("Complete RX FIFO:", radio.read(15))


def set_role():
    """Set the role using stdin stream. Timeout arg for slave() can be
    specified using a space delimiter (e.g. 'R 10' calls `slave(10)`)

    :return:
        - True when role is complete & app should continue running.
        - False when app should exit
    """
    user_input = (
        input(
            "*** Enter 'R' for receiver role.\n"
            "*** Enter 'T' for transmitter role.\n"
            "*** Enter 'Q' to quit example.\n"
        )
        or "?"
    )
    user_input = user_input.split()
    if user_input[0].upper().startswith("R"):
        slave(*[int(x) for x in user_input[1:2]])
        return True
    if user_input[0].upper().startswith("T"):
        master()
        return True
    if user_input[0].upper().startswith("Q"):
        radio.power = False
        return False
    print(user_input[0], "is an unrecognized input. Please try again.")
    return set_role()


if __name__ == "__main__":
    try:
        while set_role():
            pass  # continue example until 'Q' is entered
    except KeyboardInterrupt:
        print(" Keyboard Interrupt detected. Exiting...")
        radio.power = False
else:
    print(
        f"Make sure the IRQ pin is connected to the GPIO{IRQ_PIN}",
        "Run slave() on receiver",
        "Run master() on transmitter",
        sep="\n",
    )

Fake BLE example

A simple example of using the nRF24L01 as fake BLE beacon.

examples/fake_ble_test.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
"""
This example uses the nRF24L01 as a 'fake' BLE Beacon

See documentation at https://nRF24.github.io/pyRF24
"""

import time
from pyrf24 import (
    RF24,
    RF24_PA_LOW,
    RF24_DRIVER,
    FakeBLE,
    chunk,
    address_repr,
    UrlServiceData,
    BatteryServiceData,
    TemperatureServiceData,
)

print(__file__)  # print example name

########### USER CONFIGURATION ###########
# See https://github.com/TMRh20/RF24/blob/master/pyRF24/readme.md
# Radio CE Pin, CSN Pin, SPI Speed
# CE Pin uses GPIO number with BCM and SPIDEV drivers, other platforms use
# their own pin numbering
# CS Pin addresses the SPI bus number at /dev/spidev<a>.<b>
# ie: RF24 radio(<ce_pin>, <a>*10+<b>); spidev1.0 is 10, spidev1.1 is 11 etc..
CSN_PIN = 0  # aka CE0 on SPI bus 0: /dev/spidev0.0
if RF24_DRIVER == "MRAA":
    CE_PIN = 15  # for GPIO22
elif RF24_DRIVER == "wiringPi":
    CE_PIN = 3  # for GPIO22
else:
    CE_PIN = 22
radio = RF24(CE_PIN, CSN_PIN)
ble = FakeBLE(radio)

if not ble.begin():
    raise OSError("radio hardware not responding")

# the name parameter is going to be its broadcasted BLE name
# this can be changed at any time using the `name` attribute
# ble.name = b"foobar"

# you can optionally set the arbitrary MAC address to be used as the
# BLE device's MAC address. Otherwise this is randomly generated upon
# instantiation of the FakeBLE object.
# ble.mac = b"\x19\x12\x14\x26\x09\xE0"

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceiver in close proximity to the
# BLE scanning application
radio.pa_level = RF24_PA_LOW


def _prompt(remaining):
    if remaining % 5 == 0 or remaining < 5:
        if remaining - 1:
            print(remaining, "advertisements left to go!")
        else:
            print(remaining, "advertisement left to go!")


# create an object for manipulating the battery level data
battery_service = BatteryServiceData()
# battery level data is 1 unsigned byte representing a percentage
battery_service.data = 85


def master(count=50):
    """Sends out the device information."""
    # using the "with" statement is highly recommended if the nRF24L01 is
    # to be used for more than a BLE configuration
    ble.name = b"nRF24L01"
    # include the radio's pa_level attribute in the payload
    ble.show_pa_level = True
    print(
        "available bytes in next payload:",
        ble.len_available(chunk(battery_service.buffer)),
    )  # using chunk() gives an accurate estimate of available bytes
    radio.listen = False  # ensure radio is in TX mode
    for i in range(count):  # advertise data this many times
        if ble.len_available(chunk(battery_service.buffer)) >= 0:
            _prompt(count - i)  # something to show that it isn't frozen
            # broadcast the device name, MAC address, &
            # battery charge info; 0x16 means service data
            ble.advertise(battery_service.buffer, data_type=0x16)
            # channel hoping is recommended per BLE specs
            ble.hop_channel()
            time.sleep(0.5)  # wait till next broadcast
    ble.show_pa_level, ble.name = (False, None)


# create an object for manipulating temperature measurements
temperature_service = TemperatureServiceData()
# temperature's float data has up to 2 decimal places of percision
temperature_service.data = 42.0


def send_temp(count=50):
    """Sends out a fake temperature."""
    ble.name = b"nRF24L01"
    print(
        "available bytes in next payload:",
        ble.len_available(chunk(temperature_service.buffer)),
    )
    radio.listen = False  # ensure radio is in TX mode
    for i in range(count):
        if ble.len_available(chunk(temperature_service.buffer)) >= 0:
            _prompt(count - i)
            # broadcast a temperature measurement; 0x16 means service data
            ble.advertise(temperature_service.buffer, data_type=0x16)
            ble.hop_channel()
            time.sleep(0.2)
    ble.name = None


# use the Eddystone protocol from Google to broadcast a URL as
# service data. We'll need an object to manipulate that also
url_service = UrlServiceData()
# the data attribute converts a URL string into a simplified
# bytes object using byte codes defined by the Eddystone protocol.
url_service.data = "http://www.google.com"
# Eddystone protocol requires an estimated TX PA level at 1 meter
# lower this estimate since we lowered the actual `ble.pa_level`
url_service.pa_level_at_1_meter = -45  # defaults to -25 dBm


def send_url(count=50):
    """Sends out a URL."""
    print(
        "available bytes in next payload:",
        ble.len_available(chunk(url_service.buffer)),
    )
    # NOTE we did NOT set a device name in this with block
    radio.listen = False  # ensure radio is in TX mode
    for i in range(count):
        # URLs easily exceed the nRF24L01's max payload length
        if ble.len_available(chunk(url_service.buffer)) >= 0:
            _prompt(count - i)
            ble.advertise(url_service.buffer, 0x16)
            ble.hop_channel()
            time.sleep(0.2)


def slave(timeout=6):
    """read and decipher BLE payloads for `timeout` seconds."""
    radio.listen = True  # ensure radio is in RX mode
    end_timer = time.monotonic() + timeout
    while time.monotonic() <= end_timer:
        if ble.available():
            result = ble.read()
            print(
                "received payload from MAC address",
                address_repr(result.mac, delimit=":"),
            )
            if result.name is not None:
                print("\tdevice name:", result.name)
            if result.pa_level is not None:
                print("\tdevice transmitting PA Level:", result.pa_level, "dbm")
            for service_data in result.data:
                if isinstance(service_data, (bytearray, bytes)):
                    print("\traw buffer:", address_repr(service_data, False, " "))
                else:
                    print("\t" + repr(service_data))
    radio.listen = False  # ensure radio is in TX mode (preferred when idling)


Networking examples

RF24Network examples

A simple example of using the nRF24L01 as network child (non-master) node.

examples/network_helloworld_tx.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#!/usr/bin/env python
"""
Simplest possible example of using RF24Network.

TRANSMITTER NODE
Sends messages from to receiver.

See documentation at https://nRF24.github.io/pyRF24
"""

import struct
import time
from pyrf24 import RF24, RF24Network, RF24NetworkHeader, RF24_DRIVER

print(__file__)  # print example name

CSN_PIN = 0  # aka CE0 on SPI bus 0: /dev/spidev0.0
if RF24_DRIVER == "MRAA":
    CE_PIN = 15  # for GPIO22
elif RF24_DRIVER == "wiringPi":
    CE_PIN = 3  # for GPIO22
else:
    CE_PIN = 22
radio = RF24(CE_PIN, CSN_PIN)
network = RF24Network(radio)

# Address of our node in Octal format (01,021, etc)
THIS_NODE = 0o1

# Address of the other node
OTHER_NODE = 0o0

# milliseconds - How long to wait before sending the next message
INTERVAL = 2000


# initialize the nRF24L01 on the spi bus
if not radio.begin():
    raise OSError("nRF24L01 hardware isn't responding")

radio.channel = 90
network.begin(THIS_NODE)
radio.print_pretty_details()

PACKETS_SENT = 0
LAST_SENT = time.monotonic_ns() / 1000000

try:
    while True:
        network.update()
        now = int(time.monotonic_ns() / 1000000)

        # If it's time to send a message, send it!
        if now - LAST_SENT >= INTERVAL:
            LAST_SENT = now
            PACKETS_SENT += 1
            payload = struct.pack("<LL", now, PACKETS_SENT)
            ok = network.write(RF24NetworkHeader(OTHER_NODE), payload)
            print(f"Sending {PACKETS_SENT}...", ("ok" if ok else "failed"))
except KeyboardInterrupt:
    print("powering down radio and exiting.")
    radio.power = False

A simple example of using the nRF24L01 as a network’s master node.

examples/network_helloworld_rx.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#!/usr/bin/env python
"""
Simplest possible example of using RF24Network

RECEIVER NODE
Listens for messages from the transmitter and prints them out.

See documentation at https://nRF24.github.io/pyRF24
"""

import struct
from pyrf24 import RF24, RF24Network, RF24_DRIVER

print(__file__)  # print example name

CSN_PIN = 0  # aka CE0 on SPI bus 0: /dev/spidev0.0
if RF24_DRIVER == "MRAA":
    CE_PIN = 15  # for GPIO22
elif RF24_DRIVER == "wiringPi":
    CE_PIN = 3  # for GPIO22
else:
    CE_PIN = 22
radio = RF24(CE_PIN, CSN_PIN)
network = RF24Network(radio)

# Address of our node in Octal format (01, 021, etc)
THIS_NODE = 0o0  # make this node behave like the network master node

# initialize the nRF24L01 on the spi bus
if not radio.begin():
    raise OSError("nRF24L01 hardware isn't responding")

radio.channel = 90
network.begin(THIS_NODE)
radio.print_pretty_details()

EXPECTED_SIZE = struct.calcsize("<LL")

try:
    while True:
        network.update()
        while network.available():
            header, payload = network.read()
            print("payload length ", len(payload))
            millis, packet_count = struct.unpack("<LL", payload[:EXPECTED_SIZE])
            print(
                f"Received payload {packet_count} at (origin's timestamp) {millis}.",
                f"Header details {header.to_string()}",
            )
except KeyboardInterrupt:
    print("powering down radio and exiting.")
    radio.power = False

RF24Mesh examples

A simple example of using the nRF24L01 as mesh network child (non-master) node.

examples/mesh_example.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""
Simplest RF24Mesh example that transmits a time stamp (in milliseconds) 1 per second.

See documentation at https://nRF24.github.io/pyRF24
"""

import sys
import time
import struct
from pyrf24 import RF24, RF24Network, RF24Mesh, MESH_DEFAULT_ADDRESS, RF24_DRIVER

print(__file__)  # print example name

start = time.monotonic()


def millis():
    """:Returns: Delta time since started example in milliseconds. Wraps value around
    the width of a ``long`` integer."""
    return int((time.monotonic() - start) * 1000) % (2**32)


CSN_PIN = 0  # aka CE0 on SPI bus 0: /dev/spidev0.0
if RF24_DRIVER == "MRAA":
    CE_PIN = 15  # for GPIO22
elif RF24_DRIVER == "wiringPi":
    CE_PIN = 3  # for GPIO22
else:
    CE_PIN = 22
radio = RF24(CE_PIN, CSN_PIN)
network = RF24Network(radio)
mesh = RF24Mesh(radio, network)

mesh.node_id = 4
print("starting nodeID", mesh.node_id)
if not mesh.begin():
    if radio.is_chip_connected:
        try:
            print("Could not connect to network.\nConnecting to mesh...")
            while mesh.renew_address() == MESH_DEFAULT_ADDRESS:
                print("Could not connect to network.\nConnecting to mesh...")
        except KeyboardInterrupt:
            radio.power = False
            sys.exit()
    else:
        raise OSError("Radio hardware not responding or could not connect to mesh.")
radio.print_pretty_details()

TIMER = 0

try:
    while True:
        # Call mesh.update to keep the network updated
        mesh.update()

        if (millis() - TIMER) >= 1000:
            TIMER = millis()

            if not mesh.write(struct.pack("L", TIMER), ord("M")):
                # If a write fails, check connectivity to the mesh network
                if not mesh.check_connection():
                    # The address could be refreshed per a specified time frame
                    # or only when sequential writes fail, etc.
                    print("Send fail. Renewing Address...")
                    while mesh.renew_address() == MESH_DEFAULT_ADDRESS:
                        print("Renewing Address...")
                else:
                    print("Send fail, Test OK")
            else:
                print("Send OK:", TIMER)
        time.sleep(0.001)  # delay 1 ms
except KeyboardInterrupt:
    print("powering down radio and exiting.")
    radio.power = False

A simple example of using the nRF24L01 as a mesh network’s master node.

examples/mesh_example_master.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
"""
Example of using the rf24_mesh module to operate the nRF24L01 transceiver as
a Mesh network master node.

See documentation at https://nRF24.github.io/pyRF24
"""

from pyrf24 import RF24, RF24Network, RF24Mesh, RF24_DRIVER

print(__file__)  # print example name

CSN_PIN = 0  # aka CE0 on SPI bus 0: /dev/spidev0.0
if RF24_DRIVER == "MRAA":
    CE_PIN = 15  # for GPIO22
elif RF24_DRIVER == "wiringPi":
    CE_PIN = 3  # for GPIO22
else:
    CE_PIN = 22
radio = RF24(CE_PIN, CSN_PIN)
network = RF24Network(radio)
mesh = RF24Mesh(radio, network)
mesh.node_id = 0
if not mesh.begin():
    # if mesh.begin() returns false for a master node,
    # then radio.begin() returned false.
    raise OSError("Radio hardware not responding.")
radio.print_pretty_details()

try:
    while True:
        mesh.update()
        mesh.dhcp()

        while network.available():
            header, payload = network.read()
            print(f"Received message {header.to_string()}")
except KeyboardInterrupt:
    print("powering down radio and exiting.")
    radio.power = False

Generic Network example

A generic example demonstrating the differences in code for using RF24Network or RF24Mesh API.

examples/general_network_test.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
"""
An all-purpose example of using the nRF24L01 transceiver in a network of nodes.

See documentation at https://nRF24.github.io/pyRF24
"""

import time
import struct
from pyrf24 import (
    RF24,
    RF24Network,
    RF24NetworkHeader,  # only need to construct frame headers for RF24Network.write()
    RF24Mesh,
    MAX_PAYLOAD_SIZE,
    MESH_DEFAULT_ADDRESS,
    RF24_PA_LOW,
    RF24_DRIVER,
)

print(__file__)  # print example name

CSN_PIN = 0  # aka CE0 on SPI bus 0: /dev/spidev0.0
if RF24_DRIVER == "MRAA":
    CE_PIN = 15  # for GPIO22
elif RF24_DRIVER == "wiringPi":
    CE_PIN = 3  # for GPIO22
else:
    CE_PIN = 22
radio = RF24(CE_PIN, CSN_PIN)
network = RF24Network(radio)
mesh = RF24Mesh(radio, network)

IS_MESH = (
    (
        input("Would you like to run as a mesh network node (y/n)? Defaults to 'Y' ")
        or "Y"
    )
    .upper()
    .startswith("Y")
)

# to use different addresses on a set of radios, we need a variable to
# uniquely identify this radio.
THIS_NODE = 0
print(
    "Remember, the master node always uses `0` as the mesh_address and node_id."
    "\nWhich node is this? Enter",
    end=" ",
)
if IS_MESH:
    # node_id must be less than 256
    THIS_NODE = int(input("a unique int. Defaults to '0' ") or "0") & 0xFF
else:
    # logical node_address is in octal
    THIS_NODE = int(input("an octal int. Defaults to '0' ") or "0", 8)

if not radio.begin():
    # this is done with mesh.begin(), but it helps to check if the radio hardware is ok
    raise OSError("radio hardware not responding")
print("    radio hardware initialized")
if IS_MESH:
    mesh.node_id = THIS_NODE
    if THIS_NODE:
        print(f"Connecting to mesh network as node_id {THIS_NODE}...", end=" ")
    else:
        print("Acting as mesh master node.")
    # RF24Mesh C++ library uses channel 97 by default
    if not mesh.begin():
        print("failed. Try again with mesh.renew_address()")
    elif THIS_NODE:
        print(f"ok. Using assigned address {oct(mesh.mesh_address)}")
else:
    # C++ examples use channel 90 for RF24Network library
    radio.channel = 90
    network.begin(THIS_NODE)
    if THIS_NODE:
        print(f"Using network address {oct(THIS_NODE)}.")
    else:
        print("Acting as network master node.")

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceivers in close proximity
radio.pa_level = RF24_PA_LOW

# This example covers fragmented payloads also. Set a sentinel for readability.
MAX_FRAG_SIZE = 24

# using the python keyword global is bad practice. Instead we'll use a 1 item
# list to store our number of the payloads sent
packets_sent = [0]


def idle(timeout: int = 30, strict_timeout: bool = False):
    """Listen for any payloads and print the transaction

    :param int timeout: The number of seconds to wait (with no transmission)
        until exiting function.
    :param bool strict_timeout: If set to True, then the timer is not reset when
        processing incoming traffic
    """
    print("idling for", timeout, "seconds")
    start_timer = time.monotonic()
    while (time.monotonic() - start_timer) < timeout:
        if IS_MESH and mesh.update() and not mesh.node_id:
            # if this is a mesh master node and update() returned a non-zero value
            mesh.dhcp()  # only needed for master node
        else:
            network.update()  # keep the network layer current
        while network.available():
            if not strict_timeout:
                start_timer = time.monotonic()  # reset timer
            header, payload = network.read()
            payload_len = len(payload)
            print("Received payload", end=" ")
            # C++ examples only use 1 or 2 long ints as small messages
            if payload_len < MAX_FRAG_SIZE and payload_len % 4 == 0:
                # if not a large fragmented message and multiple of 4 bytes
                fmt = "<" + "L" * int(payload_len / 4)
                print(struct.unpack(fmt, payload), end=" ")
            print(header.to_string(), "length", payload_len)


def emit(
    node: int = not THIS_NODE, frag: bool = False, count: int = 5, interval: int = 1
):
    """Transmits 1 (or 2) integers or a large buffer

    :param int node: The target node for network transmissions.
        If using RF24Mesh, this is a unique node_id.
        If using RF24Network, this is the node's logical address.
    :param bool frag: Only use fragmented messages?
    :param int count: The max number of messages to transmit.
    :param int interval: time (in seconds) between transmitting messages.
    """
    while count:
        # for this example, we use idle() to update() the network/mesh layer
        idle(interval, True)  # idle till its time to emit
        count -= 1
        packets_sent[0] += 1
        # TMRh20's RF24Mesh examples use 1 long int containing a timestamp (in ms)
        message = struct.pack("<L", int(time.monotonic() * 1000))
        if frag:
            message = bytes(range((packets_sent[0] + MAX_FRAG_SIZE) % MAX_PAYLOAD_SIZE))
        elif not IS_MESH:  # if using RF24Network
            # TMRh20's RF24Network examples use 2 long ints, so add another
            message += struct.pack("<L", packets_sent[0])
        result = False
        start = time.monotonic_ns()
        if IS_MESH:  # write() is a little different for RF24Mesh vs RF24Network
            result = mesh.write(message, ord("M"), node)
        else:
            result = network.write(RF24NetworkHeader(node, ord("T")), message)
        end = time.monotonic_ns()
        print(
            f"Sending {packets_sent[0]} (len {len(message)})...",
            "ok." if result else "failed.",
            f"Transmission took {int((end - start) / 1000000)} ms",
        )