/* * Copyright 2017 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ #include "rtc_tools/network_tester/packet_sender.h" #include #include #include #include #include #include #include #include "absl/functional/any_invocable.h" #include "api/environment/environment.h" #include "api/scoped_refptr.h" #include "api/sequence_checker.h" #include "api/task_queue/pending_task_safety_flag.h" #include "api/task_queue/task_queue_base.h" #include "api/units/time_delta.h" #include "rtc_tools/network_tester/config_reader.h" #include "rtc_tools/network_tester/test_controller.h" #include "system_wrappers/include/clock.h" namespace webrtc { namespace { absl::AnyInvocable SendPacketTask( Clock* clock, PacketSender* packet_sender, scoped_refptr task_safety_flag, int64_t target_time_ms) { return [clock, target_time_ms, packet_sender, task_safety_flag = std::move(task_safety_flag)]() mutable { if (task_safety_flag->alive() && packet_sender->IsSending()) { packet_sender->SendPacket(); target_time_ms += packet_sender->GetSendIntervalMs(); int64_t delay_ms = std::max(static_cast(0), target_time_ms - clock->TimeInMilliseconds()); TaskQueueBase::Current()->PostDelayedTask( SendPacketTask(clock, packet_sender, std::move(task_safety_flag), target_time_ms), TimeDelta::Millis(delay_ms)); } }; } absl::AnyInvocable UpdateTestSettingTask( PacketSender* packet_sender, std::unique_ptr config_reader, scoped_refptr task_safety_flag) { return [packet_sender, config_reader = std::move(config_reader), task_safety_flag = std::move(task_safety_flag)]() mutable { if (!task_safety_flag->alive()) { return; } if (std::optional config = config_reader->GetNextConfig()) { packet_sender->UpdateTestSetting(config->packet_size, config->packet_send_interval_ms); TaskQueueBase::Current()->PostDelayedTask( UpdateTestSettingTask(packet_sender, std::move(config_reader), std::move(task_safety_flag)), TimeDelta::Millis(config->execution_time_ms)); } else { packet_sender->StopSending(); } }; } } // namespace PacketSender::PacketSender( const Environment& env, TestController* test_controller, TaskQueueBase* worker_queue, scoped_refptr task_safety_flag, const std::string& config_file_path) : env_(env), packet_size_(0), send_interval_ms_(0), sequence_number_(0), sending_(false), config_file_path_(config_file_path), test_controller_(test_controller), worker_queue_(worker_queue), task_safety_flag_(task_safety_flag) {} PacketSender::~PacketSender() = default; void PacketSender::StartSending() { worker_queue_checker_.Detach(); worker_queue_->PostTask(SafeTask(task_safety_flag_, [this]() { RTC_DCHECK_RUN_ON(&worker_queue_checker_); sending_ = true; })); worker_queue_->PostTask(UpdateTestSettingTask( this, std::make_unique(config_file_path_), task_safety_flag_)); worker_queue_->PostTask(SendPacketTask(&env_.clock(), this, task_safety_flag_, env_.clock().TimeInMilliseconds())); } void PacketSender::StopSending() { RTC_DCHECK_RUN_ON(&worker_queue_checker_); sending_ = false; test_controller_->OnTestDone(); } bool PacketSender::IsSending() const { RTC_DCHECK_RUN_ON(&worker_queue_checker_); return sending_; } void PacketSender::SendPacket() { RTC_DCHECK_RUN_ON(&worker_queue_checker_); NetworkTesterPacket packet; packet.set_type(NetworkTesterPacket::TEST_DATA); packet.set_sequence_number(sequence_number_++); packet.set_send_timestamp(env_.clock().TimeInMicroseconds()); test_controller_->SendData(packet, packet_size_); } int64_t PacketSender::GetSendIntervalMs() const { RTC_DCHECK_RUN_ON(&worker_queue_checker_); return send_interval_ms_; } void PacketSender::UpdateTestSetting(size_t packet_size, int64_t send_interval_ms) { RTC_DCHECK_RUN_ON(&worker_queue_checker_); send_interval_ms_ = send_interval_ms; packet_size_ = packet_size; } } // namespace webrtc