Use atomic list for event subscribers

Summary:
Replace the *copy on write* vector with an atomic pointer to a linked list.

This allows to publish without locking a mutex, at the cost of the slower traversal of a linked list (a vector has better locality).

At the moment, the typical use case is to have one subscriber, meaning that the afforementioned slower traversal is not a problem.

Adding subscribers is implemented as atomic *compare and swap.*

Reviewed By: SidharthGuglani

Differential Revision: D15546964

fbshipit-source-id: 41bfa41f1ac6be5c9b6bf4288ea3271ee995877e
This commit is contained in:
David Aurelio
2019-05-31 01:21:25 -07:00
committed by Facebook Github Bot
parent cea3865c74
commit f304990656

View File

@@ -5,51 +5,57 @@
* file in the root directory of this source tree.
*/
#include "event.h"
#include <atomic>
#include <memory>
#include <stdexcept>
#include <mutex>
#include <iostream>
namespace facebook {
namespace yoga {
namespace {
std::mutex& eventSubscribersMutex() {
static std::mutex subscribersMutex;
return subscribersMutex;
}
struct Node {
std::function<Event::Subscriber> subscriber = nullptr;
Node* next = nullptr;
std::shared_ptr<Event::Subscribers>& eventSubscribers() {
static auto subscribers = std::make_shared<Event::Subscribers>();
return subscribers;
Node(std::function<Event::Subscriber>&& subscriber)
: subscriber{std::move(subscriber)} {}
};
std::atomic<Node*> subscribers{nullptr};
Node* push(Node* newHead) {
Node* oldHead;
do {
oldHead = subscribers.load(std::memory_order_relaxed);
if (newHead != nullptr) {
newHead->next = oldHead;
}
} while (!subscribers.compare_exchange_weak(
oldHead, newHead, std::memory_order_release, std::memory_order_relaxed));
return oldHead;
}
} // namespace
void Event::reset() {
eventSubscribers() = std::make_shared<Event::Subscribers>();
auto head = push(nullptr);
while (head != nullptr) {
auto current = head;
head = head->next;
delete current;
}
}
void Event::subscribe(std::function<Subscriber>&& subscriber) {
std::lock_guard<std::mutex> guard(eventSubscribersMutex());
eventSubscribers() =
std::make_shared<Event::Subscribers>(*eventSubscribers());
eventSubscribers()->push_back(subscriber);
push(new Node{std::move(subscriber)});
}
void Event::publish(const YGNode& node, Type eventType, const Data& eventData) {
std::shared_ptr<Event::Subscribers> subscribers;
{
std::lock_guard<std::mutex> guard(eventSubscribersMutex());
subscribers = eventSubscribers();
}
for (auto& subscriber : *subscribers) {
if (subscriber) {
subscriber(node, eventType, eventData);
}
for (auto subscriber = subscribers.load(std::memory_order_relaxed);
subscriber != nullptr;
subscriber = subscriber->next) {
subscriber->subscriber(node, eventType, eventData);
}
}