blob: 5b0aa236901eddae88f7008aa2059e1b041ec99f [file] [log] [blame]
#
# Copyright 2020 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime, timedelta
from bluetooth_packets_python3 import RawBuilder
from cert.matchers import L2capMatchers
from cert.truth import assertThat
from cert.performance_test_logger import PerformanceTestLogger
from l2cap.classic.cert.cert_l2cap import CertL2cap
from l2cap.classic.cert.l2cap_test_lib import GeneralL2capTestBase
from l2cap.classic.facade_pb2 import RetransmissionFlowControlMode
from bluetooth_packets_python3.l2cap_packets import FcsType
from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction
class L2capPerformanceTestBase(GeneralL2capTestBase):
def setup_test(self, dut, cert):
GeneralL2capTestBase.setup_test(self, dut, cert)
self.performance_test_logger = PerformanceTestLogger()
def teardown_test(self):
GeneralL2capTestBase.teardown_test(self)
def _basic_mode_tx(self, mtu, packets):
"""
Send the specified number of packets and return the time interval in ms.
"""
self._setup_link_from_cert()
(dut_channel, cert_channel) = self._open_channel_from_cert()
self.performance_test_logger.start_interval("TX")
for _ in range(packets):
dut_channel.send(b'a' * mtu)
assertThat(cert_channel).emits(
L2capMatchers.Data(b'a' * mtu), at_least_times=packets, timeout=timedelta(seconds=60))
self.performance_test_logger.end_interval("TX")
duration = self.performance_test_logger.get_duration_of_intervals("TX")[0]
self.log.info("Duration: %s" % str(duration))
return duration
def _basic_mode_tx_fixed_interval(self, mtu, interval=timedelta(seconds=10), batch_size=20):
"""
Send packets as much as possible over a certain interval, and return the
number of packets sent
"""
self._setup_link_from_cert()
(dut_channel, cert_channel) = self._open_channel_from_cert()
start_time = datetime.now()
end_time = start_time + interval
packets_sent = 0
while datetime.now() < end_time:
for _ in range(batch_size):
dut_channel.send(b'a' * mtu)
packets_sent += batch_size
assertThat(cert_channel).emits(L2capMatchers.Data(b'a' * mtu), at_least_times=batch_size)
return packets_sent
def _basic_mode_rx(self, mtu, packets):
self._setup_link_from_cert()
(dut_channel, cert_channel) = self._open_channel_from_cert()
self.performance_test_logger.start_interval("RX")
data = b"a" * mtu
data_packet = RawBuilder([x for x in data])
for _ in range(packets):
cert_channel.send(data_packet)
assertThat(dut_channel).emits(
L2capMatchers.PacketPayloadRawData(data), at_least_times=packets, timeout=timedelta(seconds=60))
self.performance_test_logger.end_interval("RX")
duration = self.performance_test_logger.get_duration_of_intervals("RX")[0]
self.log.info("Duration: %s" % str(duration))
def _ertm_mode_tx(self, mtu, packets, tx_window_size=10):
"""
Send the specified number of packets and return the time interval in ms.
"""
# Make sure that number of packets is a multiple of tx_window_size
packets = packets // tx_window_size * tx_window_size
# For ERTM TX test, we have to do it sequentially because cert needs to ack
self._setup_link_from_cert()
config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=tx_window_size)
(dut_channel, cert_channel) = self._open_channel_from_cert(
mode=RetransmissionFlowControlMode.ERTM,
fcs=FcsType.NO_FCS,
req_config_options=config,
rsp_config_options=config)
self.performance_test_logger.start_interval("TX")
for i in range(packets):
dut_channel.send(b'a' * mtu)
if i % tx_window_size == tx_window_size - 1:
assertThat(cert_channel).emits(L2capMatchers.IFrame(payload=b'a' * mtu), at_least_times=tx_window_size)
cert_channel.send_s_frame(req_seq=(i + 1) % 64, s=SupervisoryFunction.RECEIVER_READY)
self.performance_test_logger.end_interval("TX")
duration = self.performance_test_logger.get_duration_of_intervals("TX")[0]
self.log.info("Duration: %s" % str(duration))
return duration
def _ertm_mode_rx(self, mtu, packets, tx_window_size=10):
# Make sure that number of packets is a multiple of tx_window_size
packets = packets // tx_window_size * tx_window_size
self._setup_link_from_cert()
config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=tx_window_size)
(dut_channel, cert_channel) = self._open_channel_from_cert(
mode=RetransmissionFlowControlMode.ERTM,
fcs=FcsType.NO_FCS,
req_config_options=config,
rsp_config_options=config)
data = b"a" * mtu
data_packet = RawBuilder([x for x in data])
self.performance_test_logger.start_interval("RX")
for i in range(packets):
cert_channel.send_i_frame(tx_seq=i % 64, req_seq=0, payload=data_packet)
if i % tx_window_size == (tx_window_size - 1):
assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=(i + 1) % 64))
self.performance_test_logger.end_interval("RX")
duration = self.performance_test_logger.get_duration_of_intervals("RX")[0]
self.log.info("Duration: %s" % str(duration))
def test_basic_mode_tx_672_100(self):
duration = self._basic_mode_tx(672, 100)
assertThat(duration).isWithin(timedelta(seconds=2))
def test_basic_mode_tx_100_100(self):
duration = self._basic_mode_tx(100, 100)
assertThat(duration).isWithin(timedelta(seconds=2))
def test_ertm_mode_tx_672_100(self):
duration = self._ertm_mode_tx(672, 100)
assertThat(duration).isWithin(timedelta(seconds=5))
def test_basic_mode_rx_672_100(self):
self._basic_mode_rx(672, 100)
def test_ertm_mode_rx_672_100(self):
self._ertm_mode_rx(672, 100)
def test_basic_mode_end_to_end_latency(self):
self._setup_link_from_cert()
(dut_channel, cert_channel) = self._open_channel_from_cert()
data = b"a" * 100
data_packet = RawBuilder([x for x in data])
for i in range(100):
self.performance_test_logger.start_interval("RX")
cert_channel.send(data_packet)
assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(data))
self.performance_test_logger.end_interval("RX")
duration = self.performance_test_logger.get_duration_of_intervals("RX")
mean = sum(duration, timedelta()) / len(duration)
self.log.info("Mean: %s" % str(mean))
def test_basic_mode_number_of_packets_10_seconds_672(self):
number_packets = self._basic_mode_tx_fixed_interval(672)
# Requiring that 500 packets (20ms period on average) are sent
self.log.info("Packets sent: %d" % number_packets)
assertThat(number_packets > 500).isTrue()