24#include "EventListener.h"
34using namespace fbcpp::impl;
/// Decodes a 32-bit unsigned integer stored in little-endian byte order.
///
/// The value is assembled byte by byte, so the result does not depend on the
/// endianness or alignment requirements of the host.
///
/// @param data Pointer to at least four readable bytes; data[0] is the least
///             significant byte and data[3] the most significant one.
/// @return The decoded 32-bit value.
static std::uint32_t readUint32LE(const std::uint8_t* data) noexcept
{
	// Widen each byte before shifting so the shift happens in 32-bit unsigned arithmetic.
	return static_cast<std::uint32_t>(data[0]) | (static_cast<std::uint32_t>(data[1]) << 8) |
		(static_cast<std::uint32_t>(data[2]) << 16) | (static_cast<std::uint32_t>(data[3]) << 24);
}
/// Encodes a 32-bit unsigned integer into four bytes in little-endian order.
///
/// The value is split byte by byte, so the output does not depend on the
/// endianness or alignment requirements of the host.
///
/// @param data  Pointer to at least four writable bytes; data[0] receives the
///              least significant byte and data[3] the most significant one.
/// @param value The value to encode.
static void writeUint32LE(std::uint8_t* data, std::uint32_t value) noexcept
{
	data[0] = static_cast<std::uint8_t>(value & 0xFF);
	data[1] = static_cast<std::uint8_t>((value >> 8) & 0xFF);
	data[2] = static_cast<std::uint8_t>((value >> 16) & 0xFF);
	data[3] = static_cast<std::uint8_t>((value >> 24) & 0xFF);
}
// NOTE(review): extracted fragment of the EventListener constructor. The signature, the braces
// and several statements are not visible in this listing, and every line still carries its
// original listing number as a prefix.
//
// Member initializers: store the attachment, fetch its client, copy the event names and bind
// the Firebird callback object to this listener.
53 : attachment{attachment},
54 client{attachment.getClient()},
55 eventNames{eventNames},
57 firebirdCallback{*this}
// Argument validation: every failure is reported as std::invalid_argument.
// At least one event name is required.
61 if (eventNames.empty())
62 throw std::invalid_argument{
"An EventListener requires at least one event"};
// NOTE(review): the condition guarding this throw is not visible; presumably it tests for an
// empty callback — confirm.
65 throw std::invalid_argument{
"EventListener callback must not be empty"};
// Per-name checks. Each name length is later stored in a single byte, hence the 255 limit.
67 for (
const auto& name : eventNames)
// NOTE(review): the condition guarding this throw is not visible; presumably name.empty() — confirm.
70 throw std::invalid_argument{
"Event names must not be empty"};
72 if (name.size() > std::numeric_limits<std::uint8_t>::max())
73 throw std::invalid_argument{
"Event names must be shorter than 256 bytes"};
// The number of events is limited to 255 as well.
76 if (eventNames.size() > std::numeric_limits<std::uint8_t>::max())
77 throw std::invalid_argument{
"Number of events must be smaller than 256"};
// Buffer size: one leading byte, then for each event one length byte, the name bytes and a
// 32-bit counter slot.
79 std::size_t bufferLength = 1;
81 for (
const auto& name : eventNames)
82 bufferLength += 1 + name.size() +
sizeof(std::uint32_t);
// Both buffers get the same size and start zero-filled, so every counter slot begins at 0.
84 eventBuffer.assign(bufferLength, 0);
85 resultBuffer.assign(bufferLength, 0);
// Fill eventBuffer sequentially through a write cursor.
87 auto* eventBufferPtr = eventBuffer.data();
// Leading byte is 1 — presumably the event parameter block version tag; confirm against the
// Firebird event block format.
88 *eventBufferPtr++ = 1;
89 countOffsets.resize(eventNames.size());
// For each event: write the length byte and the name, remember where its counter slot starts,
// then skip over the (already zeroed) slot.
91 for (std::size_t i = 0; i < eventNames.size(); ++i)
93 const auto& name = eventNames[i];
94 *eventBufferPtr++ =
static_cast<std::uint8_t
>(name.size());
95 std::memcpy(eventBufferPtr, name.data(), name.size());
96 eventBufferPtr += name.size();
97 countOffsets[i] =
static_cast<unsigned>(eventBufferPtr - eventBuffer.data());
98 eventBufferPtr +=
sizeof(std::uint32_t);
// Sanity check: the cursor must have consumed exactly the precomputed buffer length.
101 assert(
static_cast<std::size_t
>(eventBufferPtr - eventBuffer.data()) == eventBuffer.size());
// One decoded counter per event.
103 rawCounts.resize(eventNames.size());
// Register the event block with the attachment; the returned handle is kept in eventsHandle.
108 StatusWrapper statusWrapper{client, status.get()};
110 eventsHandle.reset(attachment.
getHandle()->queEvents(
111 &statusWrapper, &firebirdCallback,
static_cast<unsigned>(eventBuffer.size()), eventBuffer.data()));
// Start the dispatcher thread, which runs dispatchLoop() on this object.
113 dispatcher = std::thread{&EventListener::dispatchLoop,
this};
// NOTE(review): the function headers, braces and most control flow of this region are not
// visible in the listing. The listing numbers suggest line 118 belongs to one function and
// lines 124-153 to another (presumably isListening() and stop()) — confirm against the full file.
// Takes the listener mutex for the duration of the enclosing scope.
118 std::lock_guard mutexGuard{mutex};
// Takes the listener mutex with a unique_lock; what is done under it is not visible here.
124 std::unique_lock mutexGuard{mutex};
// Cancels the event registration currently held in eventsHandle (see cancelEventsHandle()).
134 cancelEventsHandle();
// Detaches the Firebird callback object; presumably so late callbacks no longer reach this
// listener — confirm against the callback class.
141 firebirdCallback.detach();
// Wakes every thread waiting on the condition variable (dispatchLoop() waits on it).
143 condition.notify_all();
// NOTE(review): the statement guarded by this check is not visible; presumably dispatcher.join().
145 if (dispatcher.joinable())
// Drops any notifications that were queued but not yet delivered.
149 pendingNotifications.clear();
// Releases the stored events handle.
153 eventsHandle.reset();
// Computes per-event deltas between the counters in resultBuffer and the baseline counters in
// eventBuffer, stores them in rawCounts, and then advances the baseline to the new values.
// NOTE(review): braces and some statements are missing from this listing.
156void EventListener::decodeEventCounts()
// NOTE(review): the statement guarded by this check is not visible; presumably an early return.
158 if (eventNames.empty())
// `current` is the read-only block of new values; `baseline` is mutable because it is
// updated in place below.
161 const auto* current = resultBuffer.data();
162 auto* baseline = eventBuffer.data();
164 for (std::size_t i = 0; i < eventNames.size(); ++i)
// Byte offset of this event's 32-bit counter slot.
166 const auto offset = countOffsets[i];
// Guard against a counter slot that would extend past the end of either buffer.
// NOTE(review): the body of this guard is not visible in the listing.
168 if (offset +
sizeof(std::uint32_t) > resultBuffer.size() || offset +
sizeof(std::uint32_t) > eventBuffer.size())
// Counters are read as little-endian 32-bit values at the same offset in both buffers.
174 const auto newValue = readUint32LE(current + offset);
175 const auto oldValue = readUint32LE(baseline + offset);
// Delta since the baseline; reported as 0 when the new value is below the old one, so the
// unsigned subtraction never wraps.
176 rawCounts[i] = newValue >= oldValue ? (newValue - oldValue) : 0;
// Store the new value as the baseline for the next comparison.
177 writeUint32LE(baseline + offset, newValue);
// Handles an event block delivered through the Firebird callback: copies it into resultBuffer,
// queues a notification built from rawCounts, and re-registers the event block with the
// attachment.
// NOTE(review): many lines (braces, conditions, declarations, error handling) are missing from
// this listing; the comments below describe only the visible statements.
//
// length: number of bytes available at `events`.
// events: the delivered event block.
181void EventListener::handleEvent(
unsigned length,
const std::uint8_t* events)
// Length passed to queEvents() when re-registering; taken from eventBuffer's size.
185 const auto eventBlockLength =
static_cast<unsigned>(eventBuffer.size());
186 unsigned copyLength = 0;
188 bool shouldRequeue =
false;
// Shared state is accessed under the listener mutex.
191 std::lock_guard mutexGuard{mutex};
// Clamp to the smallest of the delivered length and both buffer sizes.
// NOTE(review): the left-hand side of this expression is not visible; presumably it is
// assigned to copyLength — confirm.
197 static_cast<unsigned>(std::min<std::size_t>(length, std::min(eventBuffer.size(), resultBuffer.size())));
// Copy the delivered block into resultBuffer.
202 std::memcpy(resultBuffer.data(), events, copyLength);
// Build one EventCount per registered name from rawCounts.
// NOTE(review): the step that refreshes rawCounts is not visible here; presumably
// decodeEventCounts() is called before this point — confirm.
206 std::vector<EventCount> counts;
207 counts.reserve(eventNames.size());
209 for (std::size_t i = 0; i < eventNames.size(); ++i)
211 const auto value = rawCounts[i];
// NOTE(review): any condition between listing lines 211 and 214 (e.g. filtering on the value)
// is not visible.
214 counts.push_back(
EventCount{eventNames[i],
static_cast<std::uint32_t
>(value)});
// Queue the notification; dispatchLoop() takes it from the front of the queue.
221 pendingNotifications.emplace_back(std::move(counts));
// Re-registration is requested only while the listener is still listening.
228 shouldRequeue = listening;
// Wake one waiter on the condition variable.
232 condition.notify_one();
// Re-registration path: obtain the attachment handle first.
237 auto attachmentHandle = attachment.
getHandle();
// No attachment handle: take the mutex and wake all waiters.
// NOTE(review): the state change made under this lock is not visible.
239 if (!attachmentHandle)
241 std::lock_guard mutexGuard{mutex};
246 condition.notify_all();
// Register the same event block again.
// NOTE(review): the start of the statement that receives the queEvents() result is not visible.
253 StatusWrapper statusWrapper{client, status.get()};
259 attachmentHandle->queEvents(&statusWrapper, &firebirdCallback, eventBlockLength, eventBuffer.data()));
// NOTE(review): the condition and state change around this lock/notify pair are not visible;
// presumably a failure path — confirm.
264 std::lock_guard mutexGuard{mutex};
270 condition.notify_all();
// Under the mutex, install the new handle and move the old one into previousHandle.
279 std::lock_guard mutexGuard{mutex};
283 previousHandle = std::move(eventsHandle);
284 eventsHandle = std::move(newHandle);
// NOTE(review): the condition and state change around this final lock/notify pair are not visible.
294 std::lock_guard mutexGuard{mutex};
299 condition.notify_all();
// Body of the dispatcher thread: waits for queued notifications and passes each one to the
// user callback.
// NOTE(review): the loop construct, braces and exit statement are missing from this listing.
309void EventListener::dispatchLoop()
313 std::vector<EventCount> notification;
// Wait on the condition variable until a notification is queued or the listener is no longer
// listening.
316 std::unique_lock mutexGuard{mutex};
317 condition.wait(mutexGuard, [
this] {
return !pendingNotifications.empty() || !listening; });
// Woken with an empty queue, which per the wait predicate means `listening` is false.
// NOTE(review): the statement guarded here is not visible; presumably it leaves the loop.
319 if (pendingNotifications.empty())
// Move the oldest notification out of the queue.
327 notification = std::move(pendingNotifications.front());
328 pendingNotifications.pop_front();
// Deliver the notification to the user callback.
// NOTE(review): whether the mutex is released before this call is not visible — confirm.
333 callback(notification);
// Cancels the event registration referenced by eventsHandle.
// NOTE(review): the declaration of `handle`, the braces and any null check or error handling
// are missing from this listing.
342void EventListener::cancelEventsHandle()
// Read eventsHandle into the local `handle` under the mutex.
347 std::lock_guard mutexGuard{mutex};
348 handle = eventsHandle;
// Ask Firebird to cancel the registration, with status reported through the wrapper.
355 StatusWrapper statusWrapper{client, status.get()};
357 handle->cancel(&statusWrapper);
Represents a connection to a Firebird database.
FbRef< fb::IAttachment > getHandle() noexcept
Returns the internal Firebird IAttachment handle.
bool isValid() noexcept
Returns whether the Attachment object is valid.
FbUniquePtr< fb::IStatus > newStatus()
Creates and returns a Firebird IStatus instance.
EventListener(Attachment &attachment, const std::vector< std::string > &eventNames, Callback callback)
Creates an event listener for the specified attachment and event names using the provided callback.
bool isListening() noexcept
Returns true if the listener is currently registered for event notifications.
void stop()
Cancels event notifications and releases related resources.
std::function< void(const std::vector< EventCount > &counts)> Callback
Function invoked when new event counts are available.
Reference-counted smart pointer for Firebird objects using addRef/release semantics.
Represents the number of occurrences for a registered event delivered by Firebird.