Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some performance improvements #4431

Draft
wants to merge 6 commits into
base: 25.lts.1+
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 66 additions & 20 deletions base/message_loop/message_pump_io_starboard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,17 @@ MessagePumpIOStarboard::SocketWatcher::~SocketWatcher() {
bool MessagePumpIOStarboard::SocketWatcher::StopWatchingSocket() {
watcher_ = nullptr;
interests_ = kSbSocketWaiterInterestNone;
if (!SbSocketIsValid(socket_)) {
pump_ = nullptr;
// If this watcher is not watching anything, no-op and return success.
return true;
}

SbSocket socket = Release();
bool result = true;
if (SbSocketIsValid(socket)) {
DCHECK(pump_);
#if defined(STARBOARD)
// This may get called multiple times from TCPSocketStarboard.
if (pump_) {
result = pump_->StopWatching(socket);
result = pump_->UnregisterInterest(
socket, kSbSocketWaiterInterestRead || kSbSocketWaiterInterestWrite,
this);
}
#else
result = pump_->StopWatching(socket);
#endif
}
pump_ = nullptr;
return result;
Expand Down Expand Up @@ -109,27 +102,75 @@ MessagePumpIOStarboard::~MessagePumpIOStarboard() {
SbSocketWaiterDestroy(waiter_);
}

bool MessagePumpIOStarboard::UnregisterInterest(SbSocket socket,
int dropped_interests,
SocketWatcher* controller) {
DCHECK(SbSocketIsValid(socket));
DCHECK(controller);
DCHECK(dropped_interests == kSbSocketWaiterInterestRead ||
dropped_interests == kSbSocketWaiterInterestWrite ||
dropped_interests ==
(kSbSocketWaiterInterestRead | kSbSocketWaiterInterestWrite));
DCHECK_CALLED_ON_VALID_THREAD(watch_socket_caller_checker_);

// Make sure we don't pick up any funky internal masks.
int old_interest_mask =
controller->interests() &
(kSbSocketWaiterInterestRead | kSbSocketWaiterInterestWrite);
int interests = old_interest_mask & (~dropped_interests);
if (interests == old_interest_mask) {
// Interests didn't change, return.
return true;
}

SbSocket old_socket = controller->Release();
if (SbSocketIsValid(old_socket)) {
// It's illegal to use this function to listen on 2 separate fds with the
// same |controller|.
if (old_socket != socket) {
NOTREACHED() << "Sockets don't match" << old_socket << "!=" << socket;
return false;
}

// Must disarm the event before we can reuse it.
SbSocketWaiterRemove(waiter_, old_socket);
} else {
interests = kSbSocketWaiterInterestNone;
}

if (!SbSocketIsValid(socket)) {
NOTREACHED() << "Invalid socket" << socket;
return false;
}

if (interests) {
// Set current interest mask and waiter for this event.
if (!SbSocketWaiterAdd(waiter_, socket, controller,
OnSocketWaiterNotification, interests,
controller->persistent())) {
return false;
}
controller->Init(socket, controller->persistent());
}
return true;
}

bool MessagePumpIOStarboard::Watch(SbSocket socket,
bool persistent,
int mode,
int interests,
SocketWatcher* controller,
Watcher* delegate) {
DCHECK(SbSocketIsValid(socket));
DCHECK(controller);
DCHECK(delegate);
DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
DCHECK(interests == kSbSocketWaiterInterestRead ||
interests == kSbSocketWaiterInterestWrite ||
interests ==
(kSbSocketWaiterInterestRead | kSbSocketWaiterInterestWrite));
// Watch should be called on the pump thread. It is not threadsafe, and your
// watcher may never be registered.
DCHECK_CALLED_ON_VALID_THREAD(watch_socket_caller_checker_);

int interests = kSbSocketWaiterInterestNone;
if (mode & WATCH_READ) {
interests |= kSbSocketWaiterInterestRead;
}
if (mode & WATCH_WRITE) {
interests |= kSbSocketWaiterInterestWrite;
}

SbSocket old_socket = controller->Release();
if (SbSocketIsValid(old_socket)) {
// It's illegal to use this function to listen on 2 separate fds with the
Expand All @@ -151,6 +192,11 @@ bool MessagePumpIOStarboard::Watch(SbSocket socket,
SbSocketWaiterRemove(waiter_, old_socket);
}

if (!SbSocketIsValid(socket)) {
NOTREACHED() << "Invalid socket" << socket;
return false;
}

// Set current interest mask and waiter for this event.
if (!SbSocketWaiterAdd(waiter_, socket, controller,
OnSocketWaiterNotification, interests, persistent)) {
Expand Down
13 changes: 6 additions & 7 deletions base/message_loop/message_pump_io_starboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,6 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
base::WeakPtrFactory<SocketWatcher> weak_factory_;
};

enum Mode {
WATCH_READ = 1 << 0,
WATCH_WRITE = 1 << 1,
WATCH_READ_WRITE = WATCH_READ | WATCH_WRITE
};

MessagePumpIOStarboard();
virtual ~MessagePumpIOStarboard();

Expand All @@ -125,10 +119,15 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
// success. Must be called on the same thread the message_pump is running on.
bool Watch(SbSocket socket,
bool persistent,
int mode,
int interests,
SocketWatcher* controller,
Watcher* delegate);

// Removes an interest from a socket, and stops watching the socket if needed.
bool UnregisterInterest(SbSocket socket,
int dropped_interests,
SocketWatcher* controller);

// Stops watching the socket.
bool StopWatching(SbSocket socket);

Expand Down
10 changes: 8 additions & 2 deletions base/task/current_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,17 @@ MessagePumpForIO* CurrentIOThread::GetMessagePumpForIO() const {
#if defined(STARBOARD)
bool CurrentIOThread::Watch(SbSocket socket,
bool persistent,
int mode,
SbSocketWaiterInterest interests,
SocketWatcher* controller,
Watcher* delegate) {
return static_cast<MessagePumpIOStarboard*>(GetMessagePumpForIO())
->Watch(socket, persistent, mode, controller, delegate);
->Watch(socket, persistent, interests, controller, delegate);
}
bool CurrentIOThread::UnregisterInterest(SbSocket socket,
int dropped_interests,
SocketWatcher* controller) {
return static_cast<MessagePumpIOStarboard*>(GetMessagePumpForIO())
->UnregisterInterest(socket, dropped_interests, controller);
}
#elif BUILDFLAG(IS_WIN)
HRESULT CurrentIOThread::RegisterIOHandler(
Expand Down
9 changes: 4 additions & 5 deletions base/task/current_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,15 +275,14 @@ class BASE_EXPORT CurrentIOThread : public CurrentThread {
typedef base::MessagePumpIOStarboard::SocketWatcher SocketWatcher;
typedef base::MessagePumpIOStarboard::IOObserver IOObserver;

enum Mode{WATCH_READ = base::MessagePumpIOStarboard::WATCH_READ,
WATCH_WRITE = base::MessagePumpIOStarboard::WATCH_WRITE,
WATCH_READ_WRITE = base::MessagePumpIOStarboard::WATCH_READ_WRITE};

bool Watch(SbSocket socket,
bool persistent,
int mode,
SbSocketWaiterInterest interests,
SocketWatcher* controller,
Watcher* delegate);
bool UnregisterInterest(SbSocket socket,
int dropped_interests,
SocketWatcher* controller);
#elif BUILDFLAG(IS_WIN)
// Please see MessagePumpWin for definitions of these methods.
HRESULT RegisterIOHandler(HANDLE file, MessagePumpForIO::IOHandler* handler);
Expand Down
4 changes: 4 additions & 0 deletions base/task/sequence_manager/task_queue_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1402,6 +1402,9 @@ void TaskQueueImpl::ActivateDelayedFenceIfNeeded(const Task& task) {
main_thread_only().delayed_fence = absl::nullopt;
}

#if !defined(STARBOARD)
// We disable the "lifecycles" tracing group in Cobalt for performance
// reasons.
void TaskQueueImpl::MaybeReportIpcTaskQueuedFromMainThread(
const Task& pending_task) {
if (!pending_task.ipc_hash)
Expand Down Expand Up @@ -1502,6 +1505,7 @@ void TaskQueueImpl::ReportIpcTaskQueued(
&ctx, pending_task.posted_from));
});
}
#endif

void TaskQueueImpl::OnQueueUnblocked() {
DCHECK(IsQueueEnabled());
Expand Down
17 changes: 17 additions & 0 deletions base/task/sequence_manager/task_queue_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,22 @@ class BASE_EXPORT TaskQueueImpl {

// Reports the task if it was due to IPC and was posted to a disabled queue.
// This should be called after WillQueueTask has been called for the task.
#if defined(STARBOARD)
// We disable the "lifecycles" tracing group in Cobalt for performance
// reasons.
void MaybeReportIpcTaskQueuedFromMainThread(const Task& pending_task) {};
bool ShouldReportIpcTaskQueuedFromAnyThreadLocked(
base::TimeDelta* time_since_disabled)
EXCLUSIVE_LOCKS_REQUIRED(any_thread_lock_) {
return false;
}
void MaybeReportIpcTaskQueuedFromAnyThreadLocked(const Task& pending_task)
EXCLUSIVE_LOCKS_REQUIRED(any_thread_lock_) {}
void MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(const Task& pending_task) {
}
void ReportIpcTaskQueued(const Task& pending_task,
const base::TimeDelta& time_since_disabled) {}
#else // !defined(STARBOARD)
void MaybeReportIpcTaskQueuedFromMainThread(const Task& pending_task);
bool ShouldReportIpcTaskQueuedFromAnyThreadLocked(
base::TimeDelta* time_since_disabled)
Expand All @@ -545,6 +561,7 @@ class BASE_EXPORT TaskQueueImpl {
void MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(const Task& pending_task);
void ReportIpcTaskQueued(const Task& pending_task,
const base::TimeDelta& time_since_disabled);
#endif // !defined(STARBOARD)

// Invoked when the queue becomes enabled and not blocked by a fence.
void OnQueueUnblocked();
Expand Down
12 changes: 12 additions & 0 deletions base/threading/post_task_and_reply_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ bool PostTaskAndReplyImpl::PostTaskAndReply(const Location& from_here,
DCHECK(task) << from_here.ToString();
DCHECK(reply) << from_here.ToString();

#if defined(STARBOARD)
// This is a slight performance optimization for Starboard.
// With Starboard, HasCurrentDefault() and GetCurrentDefault() are quite
// expensive, and GetCurrentDefault() is safe to call and will return
// nullptr when needed.
const bool post_task_success = PostTask(
from_here, BindOnce(&PostTaskAndReplyRelay::RunTaskAndPostReply,
PostTaskAndReplyRelay(
from_here, std::move(task), std::move(reply),
SequencedTaskRunner::GetCurrentDefault())));
#else
const bool has_sequenced_context = SequencedTaskRunner::HasCurrentDefault();

const bool post_task_success = PostTask(
Expand All @@ -156,6 +167,7 @@ bool PostTaskAndReplyImpl::PostTaskAndReply(const Location& from_here,
// allowed when posting the task fails, to simplify calls during shutdown
// (https://crbug.com/922938).
CHECK(has_sequenced_context || !post_task_success);
#endif

return post_task_success;
}
Expand Down
18 changes: 17 additions & 1 deletion net/quic/platform/impl/quic_chromium_clock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@

namespace quic {

namespace {
QuicTime approximate_now_{QuicTime::Zero()};
int approximate_now_usage_counter_{0};
} // namespace

QuicChromiumClock* QuicChromiumClock::GetInstance() {
static base::NoDestructor<QuicChromiumClock> instance;
return instance.get();
Expand All @@ -19,16 +24,27 @@ QuicChromiumClock::QuicChromiumClock() = default;

QuicChromiumClock::~QuicChromiumClock() = default;

void QuicChromiumClock::ZeroApproximateNow() {
approximate_now_ = QuicTime::Zero();
approximate_now_usage_counter_ = 0;
};

QuicTime QuicChromiumClock::ApproximateNow() const {
// At the moment, Chrome does not have a distinct notion of ApproximateNow().
// We should consider implementing this using MessageLoop::recent_time_.
if (approximate_now_.IsInitialized() &&
++approximate_now_usage_counter_ < 16) {
return approximate_now_;
}
return Now();
}

QuicTime QuicChromiumClock::Now() const {
int64_t ticks = (base::TimeTicks::Now() - base::TimeTicks()).InMicroseconds();
DCHECK_GE(ticks, 0);
return CreateTimeFromMicroseconds(ticks);
approximate_now_ = CreateTimeFromMicroseconds(ticks);
approximate_now_usage_counter_ = 0;
return approximate_now_;
}

QuicWallTime QuicChromiumClock::WallNow() const {
Expand Down
2 changes: 2 additions & 0 deletions net/quic/platform/impl/quic_chromium_clock.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class NET_EXPORT_PRIVATE QuicChromiumClock : public QuicClock {

~QuicChromiumClock() override;

void ZeroApproximateNow();

// QuicClock implementation:
QuicTime ApproximateNow() const override;
QuicTime Now() const override;
Expand Down
8 changes: 6 additions & 2 deletions net/quic/quic_chromium_packet_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "base/task/single_thread_task_runner.h"
#include "net/base/net_errors.h"
#include "net/quic/address_utils.h"
#include "net/quic/platform/impl/quic_chromium_clock.h"
#include "net/third_party/quiche/src/quiche/quic/core/quic_clock.h"

namespace net {
Expand Down Expand Up @@ -92,6 +93,7 @@ int QuicChromiumPacketReader::StartReadingMultiplePackets() {
}

bool QuicChromiumPacketReader::ProcessMultiplePacketReadResult(int result) {
quic::QuicChromiumClock::GetInstance()->ZeroApproximateNow();
read_pending_ = false;
if (result <= 0 && net_log_.IsCapturing()) {
net_log_.AddEventWithIntParams(NetLogEventType::QUIC_READ_ERROR,
Expand Down Expand Up @@ -129,7 +131,7 @@ bool QuicChromiumPacketReader::ProcessMultiplePacketReadResult(int result) {
continue;
}
quic::QuicReceivedPacket packet(read_packet->buffer, read_packet->result,
clock_->Now());
clock_->ApproximateNow());
if (!(visitor_->OnPacket(packet, quick_local_address, quick_peer_address) &&
self)) {
return false;
Expand Down Expand Up @@ -202,6 +204,7 @@ void QuicChromiumPacketReader::StartReading() {
}

bool QuicChromiumPacketReader::ProcessReadResult(int result) {
quic::QuicChromiumClock::GetInstance()->ZeroApproximateNow();
read_pending_ = false;
if (result <= 0 && net_log_.IsCapturing()) {
net_log_.AddEventWithIntParams(NetLogEventType::QUIC_READ_ERROR,
Expand All @@ -221,7 +224,8 @@ bool QuicChromiumPacketReader::ProcessReadResult(int result) {
return visitor_->OnReadError(result, socket_);
}

quic::QuicReceivedPacket packet(read_buffer_->data(), result, clock_->Now());
quic::QuicReceivedPacket packet(read_buffer_->data(), result,
clock_->ApproximateNow());
IPEndPoint local_address;
IPEndPoint peer_address;
socket_->GetLocalAddress(&local_address);
Expand Down
6 changes: 6 additions & 0 deletions net/quic/quic_chromium_packet_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ void QuicChromiumPacketWriter::WritePacketToSocket(
}

quic::WriteResult QuicChromiumPacketWriter::WritePacketToSocketImpl() {
#if !defined(STARBOARD)
// Tracking the histogram takes 25% of the CPU time on some devices.
base::TimeTicks now = base::TimeTicks::Now();
#endif

int rv = socket_->Write(packet_.get(), packet_->size(), write_callback_,
kTrafficAnnotation);
Expand All @@ -168,12 +171,15 @@ quic::WriteResult QuicChromiumPacketWriter::WritePacketToSocketImpl() {
}
}

#if !defined(STARBOARD)
// Tracking the histogram here takes 25% of the CPU time on some devices.
base::TimeDelta delta = base::TimeTicks::Now() - now;
if (status == quic::WRITE_STATUS_OK) {
UMA_HISTOGRAM_TIMES("Net.QuicSession.PacketWriteTime.Synchronous", delta);
} else if (quic::IsWriteBlockedStatus(status)) {
UMA_HISTOGRAM_TIMES("Net.QuicSession.PacketWriteTime.Asynchronous", delta);
}
#endif

return quic::WriteResult(status, rv);
}
Expand Down
Loading
Loading