fb-cpp 0.0.2
A modern C++ wrapper for the Firebird database API
Loading...
Searching...
No Matches
EventListener.cpp
1/*
2 * MIT License
3 *
4 * Copyright (c) 2025 Adriano dos Santos Fernandes
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in all
13 * copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24#include "EventListener.h"
25#include "Exception.h"
26#include <algorithm>
27#include <cassert>
28#include <cstring>
29#include <stdexcept>
30#include <utility>
31#include <limits>
32
33using namespace fbcpp;
34using namespace fbcpp::impl;
35
36
// Assembles a 32-bit unsigned value from four consecutive bytes, least significant byte first.
// `data` must point to at least four readable bytes.
static std::uint32_t readUint32LE(const std::uint8_t* data) noexcept
{
	std::uint32_t value = 0;

	for (unsigned byteIndex = 0; byteIndex < 4; ++byteIndex)
		value |= static_cast<std::uint32_t>(data[byteIndex]) << (8u * byteIndex);

	return value;
}
42
// Stores `value` into four consecutive bytes, least significant byte first.
// `data` must point to at least four writable bytes.
static void writeUint32LE(std::uint8_t* data, std::uint32_t value) noexcept
{
	for (unsigned byteIndex = 0; byteIndex < 4; ++byteIndex)
	{
		data[byteIndex] = static_cast<std::uint8_t>(value & 0xFF);
		value >>= 8;
	}
}
50
51
52EventListener::EventListener(Attachment& attachment, const std::vector<std::string>& eventNames, Callback callback)
53 : attachment{attachment},
54 client{attachment.getClient()},
55 eventNames{eventNames},
56 callback{callback},
57 firebirdCallback{*this}
58{
59 assert(attachment.isValid());
60
61 if (eventNames.empty())
62 throw std::invalid_argument{"An EventListener requires at least one event"};
63
64 if (!callback)
65 throw std::invalid_argument{"EventListener callback must not be empty"};
66
67 for (const auto& name : eventNames)
68 {
69 if (name.empty())
70 throw std::invalid_argument{"Event names must not be empty"};
71
72 if (name.size() > std::numeric_limits<std::uint8_t>::max())
73 throw std::invalid_argument{"Event names must be shorter than 256 bytes"};
74 }
75
76 if (eventNames.size() > std::numeric_limits<std::uint8_t>::max())
77 throw std::invalid_argument{"Number of events must be smaller than 256"};
78
79 std::size_t bufferLength = 1; // Event block version byte
80
81 for (const auto& name : eventNames)
82 bufferLength += 1 + name.size() + sizeof(std::uint32_t);
83
84 eventBuffer.assign(bufferLength, 0);
85 resultBuffer.assign(bufferLength, 0);
86
87 auto* eventBufferPtr = eventBuffer.data();
88 *eventBufferPtr++ = 1; // Event parameter block version.
89 countOffsets.resize(eventNames.size());
90
91 for (std::size_t i = 0; i < eventNames.size(); ++i)
92 {
93 const auto& name = eventNames[i];
94 *eventBufferPtr++ = static_cast<std::uint8_t>(name.size());
95 std::memcpy(eventBufferPtr, name.data(), name.size());
96 eventBufferPtr += name.size();
97 countOffsets[i] = static_cast<unsigned>(eventBufferPtr - eventBuffer.data());
98 eventBufferPtr += sizeof(std::uint32_t);
99 }
100
101 assert(static_cast<std::size_t>(eventBufferPtr - eventBuffer.data()) == eventBuffer.size());
102
103 rawCounts.resize(eventNames.size());
104 listening = true;
105 running = true;
106
107 const auto status = client.newStatus();
108 StatusWrapper statusWrapper{client, status.get()};
109
110 eventsHandle.reset(attachment.getHandle()->queEvents(
111 &statusWrapper, &firebirdCallback, static_cast<unsigned>(eventBuffer.size()), eventBuffer.data()));
112
113 dispatcher = std::thread{&EventListener::dispatchLoop, this};
114}
115
117{
118 std::lock_guard mutexGuard{mutex};
119 return listening;
120}
121
123{
124 std::unique_lock mutexGuard{mutex};
125
126 if (!running)
127 return;
128
129 listening = false;
130 mutexGuard.unlock();
131
132 try
133 {
134 cancelEventsHandle();
135 }
136 catch (...)
137 {
138 // swallow
139 }
140
141 firebirdCallback.detach();
142
143 condition.notify_all();
144
145 if (dispatcher.joinable())
146 dispatcher.join();
147
148 mutexGuard.lock();
149 pendingNotifications.clear();
150 running = false;
151 mutexGuard.unlock();
152
153 eventsHandle.reset();
154}
155
156void EventListener::decodeEventCounts()
157{
158 if (eventNames.empty())
159 return;
160
161 const auto* current = resultBuffer.data();
162 auto* baseline = eventBuffer.data();
163
164 for (std::size_t i = 0; i < eventNames.size(); ++i)
165 {
166 const auto offset = countOffsets[i];
167
168 if (offset + sizeof(std::uint32_t) > resultBuffer.size() || offset + sizeof(std::uint32_t) > eventBuffer.size())
169 {
170 rawCounts[i] = 0;
171 continue;
172 }
173
174 const auto newValue = readUint32LE(current + offset);
175 const auto oldValue = readUint32LE(baseline + offset);
176 rawCounts[i] = newValue >= oldValue ? (newValue - oldValue) : 0;
177 writeUint32LE(baseline + offset, newValue);
178 }
179}
180
181void EventListener::handleEvent(unsigned length, const std::uint8_t* events)
182{
183 try
184 {
185 const auto eventBlockLength = static_cast<unsigned>(eventBuffer.size());
186 unsigned copyLength = 0;
187 bool notify = false;
188 bool shouldRequeue = false;
189
190 { // scope
191 std::lock_guard mutexGuard{mutex};
192
193 if (!listening)
194 return;
195
196 copyLength =
197 static_cast<unsigned>(std::min<std::size_t>(length, std::min(eventBuffer.size(), resultBuffer.size())));
198
199 if (copyLength == 0)
200 return;
201
202 std::memcpy(resultBuffer.data(), events, copyLength);
203
204 decodeEventCounts();
205
206 std::vector<EventCount> counts;
207 counts.reserve(eventNames.size());
208
209 for (std::size_t i = 0; i < eventNames.size(); ++i)
210 {
211 const auto value = rawCounts[i];
212
213 if (value != 0)
214 counts.push_back(EventCount{eventNames[i], static_cast<std::uint32_t>(value)});
215 }
216
217 if (first)
218 {
219 if (!counts.empty())
220 {
221 pendingNotifications.emplace_back(std::move(counts));
222 notify = true;
223 }
224 }
225 else
226 first = true;
227
228 shouldRequeue = listening;
229 }
230
231 if (notify)
232 condition.notify_one();
233
234 if (!shouldRequeue)
235 return;
236
237 auto attachmentHandle = attachment.getHandle();
238
239 if (!attachmentHandle)
240 {
241 std::lock_guard mutexGuard{mutex};
242
243 if (listening)
244 {
245 listening = false;
246 condition.notify_all();
247 }
248
249 return;
250 }
251
252 const auto status = client.newStatus();
253 StatusWrapper statusWrapper{client, status.get()};
254 FbRef<fb::IEvents> newHandle;
255
256 try
257 {
258 newHandle.reset(
259 attachmentHandle->queEvents(&statusWrapper, &firebirdCallback, eventBlockLength, eventBuffer.data()));
260 }
261 catch (...)
262 {
263 { // scope
264 std::lock_guard mutexGuard{mutex};
265
266 if (!listening)
267 return;
268
269 listening = false;
270 condition.notify_all();
271 }
272
273 return;
274 }
275
276 FbRef<fb::IEvents> previousHandle;
277
278 {
279 std::lock_guard mutexGuard{mutex};
280
281 if (listening)
282 {
283 previousHandle = std::move(eventsHandle);
284 eventsHandle = std::move(newHandle);
285 }
286 }
287 }
288 catch (...)
289 {
290 // Prevent exceptions from escaping into Firebird's C API callback.
291 // If we can't handle the event, stop listening to avoid repeated failures.
292 try
293 {
294 std::lock_guard mutexGuard{mutex};
295
296 if (listening)
297 {
298 listening = false;
299 condition.notify_all();
300 }
301 }
302 catch (...)
303 {
304 // If we can't even acquire the mutex, there's nothing we can do.
305 }
306 }
307}
308
309void EventListener::dispatchLoop()
310{
311 while (true)
312 {
313 std::vector<EventCount> notification;
314
315 { // scope
316 std::unique_lock mutexGuard{mutex};
317 condition.wait(mutexGuard, [this] { return !pendingNotifications.empty() || !listening; });
318
319 if (pendingNotifications.empty())
320 {
321 if (!listening)
322 break;
323
324 continue;
325 }
326
327 notification = std::move(pendingNotifications.front());
328 pendingNotifications.pop_front();
329 }
330
331 try
332 {
333 callback(notification);
334 }
335 catch (...)
336 {
337 assert(false);
338 }
339 }
340}
341
342void EventListener::cancelEventsHandle()
343{
344 FbRef<fb::IEvents> handle;
345
346 { // scope
347 std::lock_guard mutexGuard{mutex};
348 handle = eventsHandle;
349 }
350
351 if (!handle)
352 return;
353
354 const auto status = client.newStatus();
355 StatusWrapper statusWrapper{client, status.get()};
356
357 handle->cancel(&statusWrapper);
358}
Represents a connection to a Firebird database.
Definition Attachment.h:177
FbRef< fb::IAttachment > getHandle() noexcept
Returns the internal Firebird IAttachment handle.
Definition Attachment.h:238
bool isValid() noexcept
Returns whether the Attachment object is valid.
Definition Attachment.h:222
FbUniquePtr< fb::IStatus > newStatus()
Creates and returns a Firebird IStatus instance.
Definition Client.h:199
EventListener(Attachment &attachment, const std::vector< std::string > &eventNames, Callback callback)
Creates an event listener for the specified attachment and event names using the provided callback.
bool isListening() noexcept
Returns true if the listener is currently registered for event notifications.
void stop()
Cancels event notifications and releases related resources.
std::function< void(const std::vector< EventCount > &counts)> Callback
Function invoked when new event counts are available.
Reference-counted smart pointer for Firebird objects using addRef/release semantics.
Definition SmartPtrs.h:70
fb-cpp namespace.
Definition Attachment.h:42
Represents the number of occurrences for a registered event delivered by Firebird.