24#include "EventListener.h"
34using namespace fbcpp::impl;
/// Decodes an unsigned 32-bit integer stored in little-endian byte order.
///
/// @param data Pointer to at least four readable bytes; `data[0]` holds the
///             least significant byte and `data[3]` the most significant one.
/// @return The decoded value, independent of the host byte order.
static std::uint32_t readUint32LE(const std::uint8_t* data) noexcept
{
	// Each byte is widened to 32 bits before shifting so that the shift never
	// happens on a promoted (signed) int.
	return static_cast<std::uint32_t>(data[0]) | (static_cast<std::uint32_t>(data[1]) << 8) |
		(static_cast<std::uint32_t>(data[2]) << 16) | (static_cast<std::uint32_t>(data[3]) << 24);
}
/// Encodes an unsigned 32-bit integer into four bytes in little-endian order.
///
/// @param data  Pointer to at least four writable bytes; `data[0]` receives the
///              least significant byte and `data[3]` the most significant one.
/// @param value The value to store.
static void writeUint32LE(std::uint8_t* data, std::uint32_t value) noexcept
{
	data[0] = static_cast<std::uint8_t>(value & 0xFF);
	data[1] = static_cast<std::uint8_t>((value >> 8) & 0xFF);
	data[2] = static_cast<std::uint8_t>((value >> 16) & 0xFF);
	data[3] = static_cast<std::uint8_t>((value >> 24) & 0xFF);
}
// Constructor fragment: the signature and some statements are not visible in
// this listing. The visible initializers bind the attachment, fetch its
// client, copy the event names and construct firebirdCallback from *this.
53 : attachment{attachment},
54 client{attachment.getClient()},
55 eventNames{eventNames},
57 firebirdCallback{*this}
// Argument validation: every failure is reported as std::invalid_argument.
61 if (eventNames.empty())
62 throw std::invalid_argument{
"An EventListener requires at least one event"};
// NOTE(review): the condition guarding this throw is not visible here;
// judging by the message it presumably tests the callback for emptiness.
65 throw std::invalid_argument{
"EventListener callback must not be empty"};
67 for (
const auto& name : eventNames)
// NOTE(review): the guard of this throw is not visible; presumably name.empty().
70 throw std::invalid_argument{
"Event names must not be empty"};
// The name length is later written into a single byte of the event buffer,
// so names longer than 255 bytes are rejected.
72 if (name.size() > std::numeric_limits<std::uint8_t>::max())
73 throw std::invalid_argument{
"Event names must be shorter than 256 bytes"};
// At most 255 events are accepted.
76 if (eventNames.size() > std::numeric_limits<std::uint8_t>::max())
77 throw std::invalid_argument{
"Number of events must be smaller than 256"};
// Buffer size: one leading byte, then for each event one length byte, the
// name bytes and a 4-byte counter slot.
79 std::size_t bufferLength = 1;
81 for (
const auto& name : eventNames)
82 bufferLength += 1 + name.size() +
sizeof(std::uint32_t);
// Both buffers get the same length and start zero-filled.
84 eventBuffer.assign(bufferLength, 0);
85 resultBuffer.assign(bufferLength, 0);
87 auto* eventBufferPtr = eventBuffer.data();
// Leading byte is set to 1 (presumably the event block version - TODO confirm).
88 *eventBufferPtr++ = 1;
89 countOffsets.resize(eventNames.size());
// Serialize each event as: length byte, name bytes, 4-byte counter slot.
91 for (std::size_t i = 0; i < eventNames.size(); ++i)
93 const auto& name = eventNames[i];
94 *eventBufferPtr++ =
static_cast<std::uint8_t
>(name.size());
95 std::memcpy(eventBufferPtr, name.data(), name.size());
96 eventBufferPtr += name.size();
// Remember where this event's counter starts, relative to the buffer start.
97 countOffsets[i] =
static_cast<unsigned>(eventBufferPtr - eventBuffer.data());
// Skip the counter slot; its bytes keep the zero written by assign() above.
98 eventBufferPtr +=
sizeof(std::uint32_t);
// Sanity check: the write cursor must end exactly at the end of the buffer.
101 assert(
static_cast<std::size_t
>(eventBufferPtr - eventBuffer.data()) == eventBuffer.size());
103 rawCounts.resize(eventNames.size());
108 StatusWrapper statusWrapper{client, status.get()};
// Calls queEvents on the attachment handle with firebirdCallback and the
// serialized event buffer; the returned handle is stored in eventsHandle.
110 eventsHandle.reset(attachment.
getHandle()->queEvents(
111 &statusWrapper, &firebirdCallback,
static_cast<unsigned>(eventBuffer.size()), eventBuffer.data()));
// Starts the dispatcher thread, which runs dispatchLoop() on this object.
113 dispatcher = std::thread{&EventListener::dispatchLoop,
this};
118 std::lock_guard mutexGuard{mutex};
// NOTE(review): fragment of a shutdown routine (presumably EventListener::stop());
// its signature and several statements are not visible in this listing.
124 std::unique_lock mutexGuard{mutex};
// Cancels the queued event request via cancelEventsHandle().
134 cancelEventsHandle();
141 firebirdCallback.detach();
// Wakes every thread waiting on the condition variable.
143 condition.notify_all();
// NOTE(review): the statement guarded by this check is not visible; presumably a join().
145 if (dispatcher.joinable())
// Drops any notifications that were queued but not yet handed to the callback.
149 pendingNotifications.clear();
153 eventsHandle.reset();
// Computes, for every registered event, the difference between the counter
// found in resultBuffer and the one stored in eventBuffer, stores it in
// rawCounts and then copies the new counter into eventBuffer as the baseline
// for the next call.
156void EventListener::decodeEventCounts()
// NOTE(review): the statement guarded by this check is not visible in this
// listing; presumably an early return.
158 if (eventNames.empty())
161 const auto* current = resultBuffer.data();
162 auto* baseline = eventBuffer.data();
164 for (std::size_t i = 0; i < eventNames.size(); ++i)
166 const auto offset = countOffsets[i];
// Bounds check: the 4-byte counter must lie inside both buffers.
// NOTE(review): the branch body is not visible in this listing.
168 if (offset +
sizeof(std::uint32_t) > resultBuffer.size() || offset +
sizeof(std::uint32_t) > eventBuffer.size())
174 const auto newValue = readUint32LE(current + offset);
175 const auto oldValue = readUint32LE(baseline + offset);
// A counter lower than the baseline is reported as 0 instead of wrapping.
176 rawCounts[i] = newValue >= oldValue ? (newValue - oldValue) : 0;
// The new counter becomes the baseline for the next comparison.
177 writeUint32LE(baseline + offset, newValue);
// Handles an event block delivered by Firebird: copies it into resultBuffer,
// queues a vector of EventCount for the dispatcher and, if still listening,
// calls queEvents again. Several statements of this function are not visible
// in this listing.
//
// length: number of bytes available at `events`.
// events: pointer to the delivered event block.
181void EventListener::handleEvent(
unsigned length,
const std::uint8_t* events)
183 const auto eventBlockLength =
static_cast<unsigned>(eventBuffer.size());
184 unsigned copyLength = 0;
186 bool shouldRequeue =
false;
189 std::lock_guard mutexGuard{mutex};
// Smallest of `length` and the two buffer sizes.
// NOTE(review): the left-hand side of this expression is not visible;
// presumably it is assigned to copyLength.
195 static_cast<unsigned>(std::min<std::size_t>(length, std::min(eventBuffer.size(), resultBuffer.size())));
200 std::memcpy(resultBuffer.data(), events, copyLength);
// Builds one EventCount per registered event from rawCounts.
204 std::vector<EventCount> counts;
205 counts.reserve(eventNames.size());
207 for (std::size_t i = 0; i < eventNames.size(); ++i)
209 const auto value = rawCounts[i];
212 counts.push_back(
EventCount{eventNames[i],
static_cast<std::uint32_t
>(value)});
// Queues the counts for dispatchLoop().
219 pendingNotifications.emplace_back(std::move(counts));
// Re-queue only while the listener is still marked as listening.
226 shouldRequeue = listening;
230 condition.notify_one();
235 auto attachmentHandle = attachment.
getHandle();
// NOTE(review): the handling of a missing attachment handle is only partly
// visible: it takes the mutex and wakes all waiters.
237 if (!attachmentHandle)
239 std::lock_guard mutexGuard{mutex};
244 condition.notify_all();
251 StatusWrapper statusWrapper{client, status.get()};
// Calls queEvents again with the same callback and event buffer.
// NOTE(review): the start of this statement is not visible in this listing.
257 attachmentHandle->queEvents(&statusWrapper, &firebirdCallback, eventBlockLength, eventBuffer.data()));
262 std::lock_guard mutexGuard{mutex};
268 condition.notify_all();
277 std::lock_guard mutexGuard{mutex};
// Swaps handles: the current one moves into previousHandle and newHandle
// becomes eventsHandle.
281 previousHandle = std::move(eventsHandle);
282 eventsHandle = std::move(newHandle);
// Body of the dispatcher thread: waits for queued notifications and passes
// them to the user callback. The enclosing loop construct and its exit path
// are not visible in this listing.
287void EventListener::dispatchLoop()
291 std::vector<EventCount> notification;
294 std::unique_lock mutexGuard{mutex};
// Sleeps until a notification is queued or `listening` becomes false.
295 condition.wait(mutexGuard, [
this] {
return !pendingNotifications.empty() || !listening; });
// An empty queue after the wait means the wake-up came from `listening`
// being false. NOTE(review): the branch body is not visible here.
297 if (pendingNotifications.empty())
// Takes the oldest queued notification off the front of the queue.
305 notification = std::move(pendingNotifications.front());
306 pendingNotifications.pop_front();
// NOTE(review): presumably invoked after the lock is released - the unlock
// is not visible in this listing; confirm.
311 callback(notification);
// Copies the current events handle while holding the mutex and then calls
// cancel() on that copy. The declaration of `handle` and any null check are
// not visible in this listing.
320void EventListener::cancelEventsHandle()
325 std::lock_guard mutexGuard{mutex};
// Copy taken under the lock; eventsHandle itself is left as it is.
326 handle = eventsHandle;
333 StatusWrapper statusWrapper{client, status.get()};
335 handle->cancel(&statusWrapper);
Represents a connection to a Firebird database.
FbRef< fb::IAttachment > getHandle() noexcept
Returns the internal Firebird IAttachment handle.
bool isValid() noexcept
Returns whether the Attachment object is valid.
FbUniquePtr< fb::IStatus > newStatus()
Creates and returns a Firebird IStatus instance.
EventListener(Attachment &attachment, const std::vector< std::string > &eventNames, Callback callback)
Creates an event listener for the specified attachment and event names using the provided callback.
bool isListening() noexcept
Returns true if the listener is currently registered for event notifications.
void stop()
Cancels event notifications and releases related resources.
std::function< void(const std::vector< EventCount > &counts)> Callback
Function invoked when new event counts are available.
Represents the number of occurrences for a registered event delivered by Firebird.