24#include "EventListener.h"
34using namespace fbcpp::impl;
/// Decodes a 32-bit unsigned integer stored in little-endian byte order.
///
/// @param data Pointer to at least four readable bytes; data[0] is the least
///             significant byte and data[3] the most significant one.
/// @return The decoded value.
static std::uint32_t readUint32LE(const std::uint8_t* data) noexcept
{
	// Widen each byte to 32 bits before shifting so the shift is performed on an
	// unsigned 32-bit value instead of a promoted (signed) int.
	return static_cast<std::uint32_t>(data[0]) | (static_cast<std::uint32_t>(data[1]) << 8) |
		(static_cast<std::uint32_t>(data[2]) << 16) | (static_cast<std::uint32_t>(data[3]) << 24);
}
/// Encodes a 32-bit unsigned integer in little-endian byte order.
///
/// @param data  Pointer to at least four writable bytes; data[0] receives the
///              least significant byte and data[3] the most significant one.
/// @param value The value to encode.
static void writeUint32LE(std::uint8_t* data, std::uint32_t value) noexcept
{
	data[0] = static_cast<std::uint8_t>(value & 0xFF);
	data[1] = static_cast<std::uint8_t>((value >> 8) & 0xFF);
	data[2] = static_cast<std::uint8_t>((value >> 16) & 0xFF);
	data[3] = static_cast<std::uint8_t>((value >> 24) & 0xFF);
}
// EventListener constructor: member-initializer list and body (the signature
// line is not part of this span). The visible code validates the event names,
// builds eventBuffer/resultBuffer, registers the initial event request via
// queEvents() and starts the dispatcher thread.
// Throws std::invalid_argument when one of the validations below fails.
53 : attachment{attachment},
54 client{attachment.getClient()},
55 eventNames{eventNames},
57 firebirdCallback{*this}
// At least one event name is required.
61 if (eventNames.empty())
62 throw std::invalid_argument{
"An EventListener requires at least one event"};
// NOTE(review): the condition guarding this throw is not visible in this listing.
65 throw std::invalid_argument{
"EventListener callback must not be empty"};
// Per-name validation.
67 for (
const auto& name : eventNames)
// NOTE(review): the condition guarding this throw is not visible in this listing.
70 throw std::invalid_argument{
"Event names must not be empty"};
// Each name length is later written into a single byte of eventBuffer (see
// the std::uint8_t cast below), so it must fit in a uint8_t.
72 if (name.size() > std::numeric_limits<std::uint8_t>::max())
73 throw std::invalid_argument{
"Event names must be shorter than 256 bytes"};
76 if (eventNames.size() > std::numeric_limits<std::uint8_t>::max())
77 throw std::invalid_argument{
"Number of events must be smaller than 256"};
// Buffer size: one leading byte, then for every event one length byte, the
// name bytes and a 4-byte counter slot.
79 std::size_t bufferLength = 1;
81 for (
const auto& name : eventNames)
82 bufferLength += 1 + name.size() +
sizeof(std::uint32_t);
// Both buffers get the same size and start zero-filled.
84 eventBuffer.assign(bufferLength, 0);
85 resultBuffer.assign(bufferLength, 0);
87 auto* eventBufferPtr = eventBuffer.data();
// First byte is set to 1 - presumably a format/version tag expected by
// Firebird; TODO confirm against the Firebird event block documentation.
88 *eventBufferPtr++ = 1;
89 countOffsets.resize(eventNames.size());
// Serialize each event as: length byte, name bytes, 4-byte counter slot
// (the slot keeps the zero written by assign() above).
91 for (std::size_t i = 0; i < eventNames.size(); ++i)
93 const auto& name = eventNames[i];
94 *eventBufferPtr++ =
static_cast<std::uint8_t
>(name.size());
95 std::memcpy(eventBufferPtr, name.data(), name.size());
96 eventBufferPtr += name.size();
// Remember where this event's counter starts; decodeEventCounts() reads the
// counter back through countOffsets.
97 countOffsets[i] =
static_cast<unsigned>(eventBufferPtr - eventBuffer.data());
98 eventBufferPtr +=
sizeof(std::uint32_t);
// Sanity check: the write pointer must have consumed exactly bufferLength bytes.
101 assert(
static_cast<std::size_t
>(eventBufferPtr - eventBuffer.data()) == eventBuffer.size());
103 rawCounts.resize(eventNames.size());
107 StatusWrapper statusWrapper{client};
// Register the initial event request on the attachment, passing
// firebirdCallback as the callback object and eventBuffer as the event block.
109 eventsHandle.reset(attachment.
getHandle()->queEvents(
110 &statusWrapper, &firebirdCallback,
static_cast<unsigned>(eventBuffer.size()), eventBuffer.data()));
// Start the thread that runs dispatchLoop() on this object.
112 dispatcher = std::thread{&EventListener::dispatchLoop,
this};
117 std::lock_guard mutexGuard{mutex};
123 std::unique_lock mutexGuard{mutex};
133 cancelEventsHandle();
140 firebirdCallback.detach();
142 condition.notify_all();
144 if (dispatcher.joinable())
148 pendingNotifications.clear();
152 eventsHandle.reset();
// For every registered event, reads the 32-bit little-endian counter at
// countOffsets[i] from both resultBuffer (new value) and eventBuffer (previous
// value), stores the difference in rawCounts[i], and then overwrites the
// counter in eventBuffer with the new value so it becomes the next baseline.
155void EventListener::decodeEventCounts()
// NOTE(review): the statement executed when there are no events is not
// visible in this listing (presumably an early return).
157 if (eventNames.empty())
160 const auto* current = resultBuffer.data();
161 auto* baseline = eventBuffer.data();
163 for (std::size_t i = 0; i < eventNames.size(); ++i)
165 const auto offset = countOffsets[i];
// Bounds check: the 4-byte counter at this offset must lie inside both
// buffers. NOTE(review): the statements taken when it does not are not
// visible in this listing.
167 if (offset +
sizeof(std::uint32_t) > resultBuffer.size() || offset +
sizeof(std::uint32_t) > eventBuffer.size())
173 const auto newValue = readUint32LE(current + offset);
174 const auto oldValue = readUint32LE(baseline + offset);
// Report 0 instead of wrapping around when the new counter is smaller than
// the stored baseline.
175 rawCounts[i] = newValue >= oldValue ? (newValue - oldValue) : 0;
// Store the new counter as the baseline for the next comparison.
176 writeUint32LE(baseline + offset, newValue);
// Processes a block of event data of `length` bytes pointed to by `events`.
// Visible steps: copy the block into resultBuffer, build a list of EventCount
// entries from rawCounts and append it to pendingNotifications, wake the
// dispatcher, then register again with queEvents() and install the new handle.
// NOTE(review): many lines of this function (conditions, scopes, error
// handling) are missing from this listing; the comments below describe only
// the statements that are visible.
180void EventListener::handleEvent(
unsigned length,
const std::uint8_t* events)
184 const auto eventBlockLength =
static_cast<unsigned>(eventBuffer.size());
185 unsigned copyLength = 0;
187 bool shouldRequeue =
false;
190 std::lock_guard mutexGuard{mutex};
// Clamp to the smallest of the delivered length and the two buffer sizes.
196 static_cast<unsigned>(std::min<std::size_t>(length, std::min(eventBuffer.size(), resultBuffer.size())));
201 std::memcpy(resultBuffer.data(), events, copyLength);
// Build one notification holding an EventCount per event name.
205 std::vector<EventCount> counts;
206 counts.reserve(eventNames.size());
208 for (std::size_t i = 0; i < eventNames.size(); ++i)
210 const auto value = rawCounts[i];
213 counts.push_back(
EventCount{eventNames[i],
static_cast<std::uint32_t
>(value)});
// Queue the notification for the dispatcher thread.
220 pendingNotifications.emplace_back(std::move(counts));
// Only re-register while the listener is still marked as listening.
227 shouldRequeue = listening;
231 condition.notify_one();
// Re-registration needs the attachment handle; a null handle is treated
// separately (the statements of that branch are only partly visible).
236 auto attachmentHandle = attachment.
getHandle();
238 if (!attachmentHandle)
240 std::lock_guard mutexGuard{mutex};
245 condition.notify_all();
251 StatusWrapper statusWrapper{client};
// Register for the next event delivery using eventBuffer as the event block.
257 attachmentHandle->queEvents(&statusWrapper, &firebirdCallback, eventBlockLength, eventBuffer.data()));
262 std::lock_guard mutexGuard{mutex};
268 condition.notify_all();
277 std::lock_guard mutexGuard{mutex};
// Install the new handle; the old one is moved into previousHandle.
281 previousHandle = std::move(eventsHandle);
282 eventsHandle = std::move(newHandle);
292 std::lock_guard mutexGuard{mutex};
297 condition.notify_all();
// Entry point of the dispatcher thread (the constructor starts it with
// std::thread{&EventListener::dispatchLoop, this}). Waits for queued
// notifications and passes each one to `callback`.
// NOTE(review): the enclosing loop and scope braces are not visible in this
// listing.
307void EventListener::dispatchLoop()
311 std::vector<EventCount> notification;
314 std::unique_lock mutexGuard{mutex};
// Block until a notification is queued or `listening` has become false.
315 condition.wait(mutexGuard, [
this] {
return !pendingNotifications.empty() || !listening; });
// By the wait predicate, an empty queue here means `listening` is false.
// NOTE(review): the statement taken in that case is not visible in this listing.
317 if (pendingNotifications.empty())
// Take the oldest queued notification out of the queue.
325 notification = std::move(pendingNotifications.front());
326 pendingNotifications.pop_front();
// NOTE(review): confirm the mutex is released before invoking the user
// callback - the scope boundaries are not visible here.
331 callback(notification);
// Cancels the current event registration: copies eventsHandle into a local
// `handle` while holding the mutex, then calls cancel() on that copy.
// NOTE(review): the declaration of `handle`, any null check and the scope
// boundaries are not visible in this listing.
340void EventListener::cancelEventsHandle()
345 std::lock_guard mutexGuard{mutex};
// Copy (not move) the handle, so eventsHandle itself stays set.
346 handle = eventsHandle;
352 StatusWrapper statusWrapper{client};
354 handle->cancel(&statusWrapper);
Represents a connection to a Firebird database.
FbRef< fb::IAttachment > getHandle() noexcept
Returns the internal Firebird IAttachment handle.
bool isValid() noexcept
Returns whether the Attachment object is valid.
EventListener(Attachment &attachment, const std::vector< std::string > &eventNames, Callback callback)
Creates an event listener for the specified attachment and event names using the provided callback.
bool isListening() noexcept
Returns true if the listener is currently registered for event notifications.
void stop()
Cancels event notifications and releases related resources.
std::function< void(const std::vector< EventCount > &counts)> Callback
Function invoked when new event counts are available.
Reference-counted smart pointer for Firebird objects using addRef/release semantics.
Represents the number of occurrences for a registered event delivered by Firebird.