fb-cpp 0.0.1
A modern C++ wrapper for the Firebird database API
Loading...
Searching...
No Matches
EventListener.cpp
1/*
2 * MIT License
3 *
4 * Copyright (c) 2025 Adriano dos Santos Fernandes
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in all
13 * copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24#include "EventListener.h"
25#include "Exception.h"
26#include <algorithm>
27#include <cassert>
28#include <cstring>
29#include <stdexcept>
30#include <utility>
31#include <limits>
32
33using namespace fbcpp;
34using namespace fbcpp::impl;
35
36
// Decodes an unsigned 32-bit integer stored least-significant byte first.
// `data` must point to at least four readable bytes.
static std::uint32_t readUint32LE(const std::uint8_t* data) noexcept
{
	std::uint32_t decoded = 0;

	// Fold from the most significant byte (index 3) down to index 0.
	for (int index = 3; index >= 0; --index)
		decoded = (decoded << 8) | static_cast<std::uint32_t>(data[index]);

	return decoded;
}
42
// Encodes `value` into the four bytes at `data`, least-significant byte first.
// `data` must point to at least four writable bytes.
static void writeUint32LE(std::uint8_t* data, std::uint32_t value) noexcept
{
	for (unsigned index = 0; index < sizeof(std::uint32_t); ++index)
	{
		data[index] = static_cast<std::uint8_t>(value & 0xFFu);
		value >>= 8; // Expose the next more significant byte.
	}
}
50
51
52EventListener::EventListener(Attachment& attachment, const std::vector<std::string>& eventNames, Callback callback)
53 : attachment{attachment},
54 client{attachment.getClient()},
55 eventNames{eventNames},
56 callback{callback},
57 firebirdCallback{*this}
58{
59 assert(attachment.isValid());
60
61 if (eventNames.empty())
62 throw std::invalid_argument{"An EventListener requires at least one event"};
63
64 if (!callback)
65 throw std::invalid_argument{"EventListener callback must not be empty"};
66
67 for (const auto& name : eventNames)
68 {
69 if (name.empty())
70 throw std::invalid_argument{"Event names must not be empty"};
71
72 if (name.size() > std::numeric_limits<std::uint8_t>::max())
73 throw std::invalid_argument{"Event names must be shorter than 256 bytes"};
74 }
75
76 if (eventNames.size() > std::numeric_limits<std::uint8_t>::max())
77 throw std::invalid_argument{"Number of events must be smaller than 256"};
78
79 std::size_t bufferLength = 1; // Event block version byte
80
81 for (const auto& name : eventNames)
82 bufferLength += 1 + name.size() + sizeof(std::uint32_t);
83
84 eventBuffer.assign(bufferLength, 0);
85 resultBuffer.assign(bufferLength, 0);
86
87 auto* eventBufferPtr = eventBuffer.data();
88 *eventBufferPtr++ = 1; // Event parameter block version.
89 countOffsets.resize(eventNames.size());
90
91 for (std::size_t i = 0; i < eventNames.size(); ++i)
92 {
93 const auto& name = eventNames[i];
94 *eventBufferPtr++ = static_cast<std::uint8_t>(name.size());
95 std::memcpy(eventBufferPtr, name.data(), name.size());
96 eventBufferPtr += name.size();
97 countOffsets[i] = static_cast<unsigned>(eventBufferPtr - eventBuffer.data());
98 eventBufferPtr += sizeof(std::uint32_t);
99 }
100
101 assert(static_cast<std::size_t>(eventBufferPtr - eventBuffer.data()) == eventBuffer.size());
102
103 rawCounts.resize(eventNames.size());
104 listening = true;
105 running = true;
106
107 const auto status = client.newStatus();
108 StatusWrapper statusWrapper{client, status.get()};
109
110 eventsHandle.reset(attachment.getHandle()->queEvents(
111 &statusWrapper, &firebirdCallback, static_cast<unsigned>(eventBuffer.size()), eventBuffer.data()));
112
113 dispatcher = std::thread{&EventListener::dispatchLoop, this};
114}
115
117{
118 std::lock_guard mutexGuard{mutex};
119 return listening;
120}
121
123{
124 std::unique_lock mutexGuard{mutex};
125
126 if (!running)
127 return;
128
129 listening = false;
130 mutexGuard.unlock();
131
132 try
133 {
134 cancelEventsHandle();
135 }
136 catch (...)
137 {
138 // swallow
139 }
140
141 firebirdCallback.detach();
142
143 condition.notify_all();
144
145 if (dispatcher.joinable())
146 dispatcher.join();
147
148 mutexGuard.lock();
149 pendingNotifications.clear();
150 running = false;
151 mutexGuard.unlock();
152
153 eventsHandle.reset();
154}
155
156void EventListener::decodeEventCounts()
157{
158 if (eventNames.empty())
159 return;
160
161 const auto* current = resultBuffer.data();
162 auto* baseline = eventBuffer.data();
163
164 for (std::size_t i = 0; i < eventNames.size(); ++i)
165 {
166 const auto offset = countOffsets[i];
167
168 if (offset + sizeof(std::uint32_t) > resultBuffer.size() || offset + sizeof(std::uint32_t) > eventBuffer.size())
169 {
170 rawCounts[i] = 0;
171 continue;
172 }
173
174 const auto newValue = readUint32LE(current + offset);
175 const auto oldValue = readUint32LE(baseline + offset);
176 rawCounts[i] = newValue >= oldValue ? (newValue - oldValue) : 0;
177 writeUint32LE(baseline + offset, newValue);
178 }
179}
180
181void EventListener::handleEvent(unsigned length, const std::uint8_t* events)
182{
183 const auto eventBlockLength = static_cast<unsigned>(eventBuffer.size());
184 unsigned copyLength = 0;
185 bool notify = false;
186 bool shouldRequeue = false;
187
188 {
189 std::lock_guard mutexGuard{mutex};
190
191 if (!listening)
192 return;
193
194 copyLength =
195 static_cast<unsigned>(std::min<std::size_t>(length, std::min(eventBuffer.size(), resultBuffer.size())));
196
197 if (copyLength == 0)
198 return;
199
200 std::memcpy(resultBuffer.data(), events, copyLength);
201
202 decodeEventCounts();
203
204 std::vector<EventCount> counts;
205 counts.reserve(eventNames.size());
206
207 for (std::size_t i = 0; i < eventNames.size(); ++i)
208 {
209 const auto value = rawCounts[i];
210
211 if (value != 0)
212 counts.push_back(EventCount{eventNames[i], static_cast<std::uint32_t>(value)});
213 }
214
215 if (first)
216 {
217 if (!counts.empty())
218 {
219 pendingNotifications.emplace_back(std::move(counts));
220 notify = true;
221 }
222 }
223 else
224 first = true;
225
226 shouldRequeue = listening;
227 }
228
229 if (notify)
230 condition.notify_one();
231
232 if (!shouldRequeue)
233 return;
234
235 auto attachmentHandle = attachment.getHandle();
236
237 if (!attachmentHandle)
238 {
239 std::lock_guard mutexGuard{mutex};
240
241 if (listening)
242 {
243 listening = false;
244 condition.notify_all();
245 }
246
247 return;
248 }
249
250 const auto status = client.newStatus();
251 StatusWrapper statusWrapper{client, status.get()};
252 FbRef<fb::IEvents> newHandle;
253
254 try
255 {
256 newHandle.reset(
257 attachmentHandle->queEvents(&statusWrapper, &firebirdCallback, eventBlockLength, eventBuffer.data()));
258 }
259 catch (...)
260 {
261 { // scope
262 std::lock_guard mutexGuard{mutex};
263
264 if (!listening)
265 return;
266
267 listening = false;
268 condition.notify_all();
269 }
270
271 return;
272 }
273
274 std::lock_guard mutexGuard{mutex};
275
276 if (listening)
277 eventsHandle = std::move(newHandle);
278}
279
280void EventListener::dispatchLoop()
281{
282 while (true)
283 {
284 std::vector<EventCount> notification;
285
286 { // scope
287 std::unique_lock mutexGuard{mutex};
288 condition.wait(mutexGuard, [this] { return !pendingNotifications.empty() || !listening; });
289
290 if (pendingNotifications.empty())
291 {
292 if (!listening)
293 break;
294
295 continue;
296 }
297
298 notification = std::move(pendingNotifications.front());
299 pendingNotifications.pop_front();
300 }
301
302 try
303 {
304 callback(notification);
305 }
306 catch (...)
307 {
308 assert(false);
309 }
310 }
311}
312
313void EventListener::cancelEventsHandle()
314{
315 FbRef<fb::IEvents> handle;
316
317 { // scope
318 std::lock_guard mutexGuard{mutex};
319 handle = eventsHandle;
320 }
321
322 if (!handle)
323 return;
324
325 const auto status = client.newStatus();
326 StatusWrapper statusWrapper{client, status.get()};
327
328 handle->cancel(&statusWrapper);
329}
Represents a connection to a Firebird database.
Definition Attachment.h:177
FbRef< fb::IAttachment > getHandle() noexcept
Returns the internal Firebird IAttachment handle.
Definition Attachment.h:238
bool isValid() noexcept
Returns whether the Attachment object is valid.
Definition Attachment.h:222
FbUniquePtr< fb::IStatus > newStatus()
Creates and returns a Firebird IStatus instance.
Definition Client.h:183
EventListener(Attachment &attachment, const std::vector< std::string > &eventNames, Callback callback)
Creates an event listener for the specified attachment and event names using the provided callback.
bool isListening() noexcept
Returns true if the listener is currently registered for event notifications.
void stop()
Cancels event notifications and releases related resources.
std::function< void(const std::vector< EventCount > &counts)> Callback
Function invoked when new event counts are available.
fb-cpp namespace.
Definition Attachment.h:42
Represents the number of occurrences for a registered event delivered by Firebird.