fb-cpp 0.0.2
A modern C++ wrapper for the Firebird database API
Loading...
Searching...
No Matches
EventListener.cpp
1/*
2 * MIT License
3 *
4 * Copyright (c) 2025 Adriano dos Santos Fernandes
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in all
13 * copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24#include "EventListener.h"
25#include "Exception.h"
26#include <algorithm>
27#include <cassert>
28#include <cstring>
29#include <stdexcept>
30#include <utility>
31#include <limits>
32
33using namespace fbcpp;
34using namespace fbcpp::impl;
35
36
// Decodes a 32-bit unsigned integer stored in little-endian byte order.
//
// `data` must point to at least four readable bytes; data[0] is the least
// significant byte and data[3] the most significant one. The result does not
// depend on the endianness of the host.
[[nodiscard]] static std::uint32_t readUint32LE(const std::uint8_t* data) noexcept
{
    std::uint32_t value = 0;

    // Fold from the most significant byte (index 3) down to the least significant (index 0).
    for (int index = 3; index >= 0; --index)
        value = (value << 8) | static_cast<std::uint32_t>(data[index]);

    return value;
}
42
// Encodes a 32-bit unsigned integer in little-endian byte order.
//
// `data` must point to at least four writable bytes; exactly four bytes are
// written, least significant byte first, regardless of host endianness.
static void writeUint32LE(std::uint8_t* data, std::uint32_t value) noexcept
{
    for (unsigned index = 0; index < 4; ++index)
        data[index] = static_cast<std::uint8_t>((value >> (8 * index)) & 0xFFu);
}
50
51
52EventListener::EventListener(Attachment& attachment, const std::vector<std::string>& eventNames, Callback callback)
53 : attachment{attachment},
54 client{attachment.getClient()},
55 eventNames{eventNames},
56 callback{callback},
57 firebirdCallback{*this}
58{
59 assert(attachment.isValid());
60
61 if (eventNames.empty())
62 throw std::invalid_argument{"An EventListener requires at least one event"};
63
64 if (!callback)
65 throw std::invalid_argument{"EventListener callback must not be empty"};
66
67 for (const auto& name : eventNames)
68 {
69 if (name.empty())
70 throw std::invalid_argument{"Event names must not be empty"};
71
72 if (name.size() > std::numeric_limits<std::uint8_t>::max())
73 throw std::invalid_argument{"Event names must be shorter than 256 bytes"};
74 }
75
76 if (eventNames.size() > std::numeric_limits<std::uint8_t>::max())
77 throw std::invalid_argument{"Number of events must be smaller than 256"};
78
79 std::size_t bufferLength = 1; // Event block version byte
80
81 for (const auto& name : eventNames)
82 bufferLength += 1 + name.size() + sizeof(std::uint32_t);
83
84 eventBuffer.assign(bufferLength, 0);
85 resultBuffer.assign(bufferLength, 0);
86
87 auto* eventBufferPtr = eventBuffer.data();
88 *eventBufferPtr++ = 1; // Event parameter block version.
89 countOffsets.resize(eventNames.size());
90
91 for (std::size_t i = 0; i < eventNames.size(); ++i)
92 {
93 const auto& name = eventNames[i];
94 *eventBufferPtr++ = static_cast<std::uint8_t>(name.size());
95 std::memcpy(eventBufferPtr, name.data(), name.size());
96 eventBufferPtr += name.size();
97 countOffsets[i] = static_cast<unsigned>(eventBufferPtr - eventBuffer.data());
98 eventBufferPtr += sizeof(std::uint32_t);
99 }
100
101 assert(static_cast<std::size_t>(eventBufferPtr - eventBuffer.data()) == eventBuffer.size());
102
103 rawCounts.resize(eventNames.size());
104 listening = true;
105 running = true;
106
107 StatusWrapper statusWrapper{client};
108
109 eventsHandle.reset(attachment.getHandle()->queEvents(
110 &statusWrapper, &firebirdCallback, static_cast<unsigned>(eventBuffer.size()), eventBuffer.data()));
111
112 dispatcher = std::thread{&EventListener::dispatchLoop, this};
113}
114
116{
117 std::lock_guard mutexGuard{mutex};
118 return listening;
119}
120
122{
123 std::unique_lock mutexGuard{mutex};
124
125 if (!running)
126 return;
127
128 listening = false;
129 mutexGuard.unlock();
130
131 try
132 {
133 cancelEventsHandle();
134 }
135 catch (...)
136 {
137 // swallow
138 }
139
140 firebirdCallback.detach();
141
142 condition.notify_all();
143
144 if (dispatcher.joinable())
145 dispatcher.join();
146
147 mutexGuard.lock();
148 pendingNotifications.clear();
149 running = false;
150 mutexGuard.unlock();
151
152 eventsHandle.reset();
153}
154
155void EventListener::decodeEventCounts()
156{
157 if (eventNames.empty())
158 return;
159
160 const auto* current = resultBuffer.data();
161 auto* baseline = eventBuffer.data();
162
163 for (std::size_t i = 0; i < eventNames.size(); ++i)
164 {
165 const auto offset = countOffsets[i];
166
167 if (offset + sizeof(std::uint32_t) > resultBuffer.size() || offset + sizeof(std::uint32_t) > eventBuffer.size())
168 {
169 rawCounts[i] = 0;
170 continue;
171 }
172
173 const auto newValue = readUint32LE(current + offset);
174 const auto oldValue = readUint32LE(baseline + offset);
175 rawCounts[i] = newValue >= oldValue ? (newValue - oldValue) : 0;
176 writeUint32LE(baseline + offset, newValue);
177 }
178}
179
180void EventListener::handleEvent(unsigned length, const std::uint8_t* events)
181{
182 try
183 {
184 const auto eventBlockLength = static_cast<unsigned>(eventBuffer.size());
185 unsigned copyLength = 0;
186 bool notify = false;
187 bool shouldRequeue = false;
188
189 { // scope
190 std::lock_guard mutexGuard{mutex};
191
192 if (!listening)
193 return;
194
195 copyLength =
196 static_cast<unsigned>(std::min<std::size_t>(length, std::min(eventBuffer.size(), resultBuffer.size())));
197
198 if (copyLength == 0)
199 return;
200
201 std::memcpy(resultBuffer.data(), events, copyLength);
202
203 decodeEventCounts();
204
205 std::vector<EventCount> counts;
206 counts.reserve(eventNames.size());
207
208 for (std::size_t i = 0; i < eventNames.size(); ++i)
209 {
210 const auto value = rawCounts[i];
211
212 if (value != 0)
213 counts.push_back(EventCount{eventNames[i], static_cast<std::uint32_t>(value)});
214 }
215
216 if (first)
217 {
218 if (!counts.empty())
219 {
220 pendingNotifications.emplace_back(std::move(counts));
221 notify = true;
222 }
223 }
224 else
225 first = true;
226
227 shouldRequeue = listening;
228 }
229
230 if (notify)
231 condition.notify_one();
232
233 if (!shouldRequeue)
234 return;
235
236 auto attachmentHandle = attachment.getHandle();
237
238 if (!attachmentHandle)
239 {
240 std::lock_guard mutexGuard{mutex};
241
242 if (listening)
243 {
244 listening = false;
245 condition.notify_all();
246 }
247
248 return;
249 }
250
251 StatusWrapper statusWrapper{client};
252 FbRef<fb::IEvents> newHandle;
253
254 try
255 {
256 newHandle.reset(
257 attachmentHandle->queEvents(&statusWrapper, &firebirdCallback, eventBlockLength, eventBuffer.data()));
258 }
259 catch (...)
260 {
261 { // scope
262 std::lock_guard mutexGuard{mutex};
263
264 if (!listening)
265 return;
266
267 listening = false;
268 condition.notify_all();
269 }
270
271 return;
272 }
273
274 FbRef<fb::IEvents> previousHandle;
275
276 {
277 std::lock_guard mutexGuard{mutex};
278
279 if (listening)
280 {
281 previousHandle = std::move(eventsHandle);
282 eventsHandle = std::move(newHandle);
283 }
284 }
285 }
286 catch (...)
287 {
288 // Prevent exceptions from escaping into Firebird's C API callback.
289 // If we can't handle the event, stop listening to avoid repeated failures.
290 try
291 {
292 std::lock_guard mutexGuard{mutex};
293
294 if (listening)
295 {
296 listening = false;
297 condition.notify_all();
298 }
299 }
300 catch (...)
301 {
302 // If we can't even acquire the mutex, there's nothing we can do.
303 }
304 }
305}
306
307void EventListener::dispatchLoop()
308{
309 while (true)
310 {
311 std::vector<EventCount> notification;
312
313 { // scope
314 std::unique_lock mutexGuard{mutex};
315 condition.wait(mutexGuard, [this] { return !pendingNotifications.empty() || !listening; });
316
317 if (pendingNotifications.empty())
318 {
319 if (!listening)
320 break;
321
322 continue;
323 }
324
325 notification = std::move(pendingNotifications.front());
326 pendingNotifications.pop_front();
327 }
328
329 try
330 {
331 callback(notification);
332 }
333 catch (...)
334 {
335 assert(false);
336 }
337 }
338}
339
340void EventListener::cancelEventsHandle()
341{
342 FbRef<fb::IEvents> handle;
343
344 { // scope
345 std::lock_guard mutexGuard{mutex};
346 handle = eventsHandle;
347 }
348
349 if (!handle)
350 return;
351
352 StatusWrapper statusWrapper{client};
353
354 handle->cancel(&statusWrapper);
355}
Represents a connection to a Firebird database.
Definition Attachment.h:213
FbRef< fb::IAttachment > getHandle() noexcept
Returns the internal Firebird IAttachment handle.
Definition Attachment.h:289
bool isValid() noexcept
Returns whether the Attachment object is valid.
Definition Attachment.h:273
EventListener(Attachment &attachment, const std::vector< std::string > &eventNames, Callback callback)
Creates an event listener for the specified attachment and event names using the provided callback.
bool isListening() noexcept
Returns true if the listener is currently registered for event notifications.
void stop()
Cancels event notifications and releases related resources.
std::function< void(const std::vector< EventCount > &counts)> Callback
Function invoked when new event counts are available.
Reference-counted smart pointer for Firebird objects using addRef/release semantics.
Definition SmartPtrs.h:70
fb-cpp namespace.
Definition Attachment.h:42
Represents the number of occurrences for a registered event delivered by Firebird.