1 /*
2  * Copyright (C) 2019, HuntLabs
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 module hunt.database.driver.postgresql.impl.pubsub.PostgreSQLSubscriberImpl;
18 
19 // import hunt.database.driver.postgresql.*;
20 // import hunt.database.driver.postgresql.pubsub.PgSubscriber;
21 // import hunt.database.driver.postgresql.pubsub.PgChannel;
22 // import io.vertx.core.*;
23 // import io.vertx.core.logging.Logger;
24 // import io.vertx.core.logging.LoggerFactory;
25 // import io.vertx.core.streams.ReadStream;
26 
27 // import java.util.*;
28 // import java.util.function.Function;
29 // import java.util.stream.Collectors;
30 
31 // class PgSubscriberImpl implements PgSubscriber {
32 
33 //     private static Logger log = LoggerFactory.getLogger(PgSubscriberImpl.class);
34 //     private static final Function!(Integer, Long) DEFAULT_RECONNECT_POLICY = count -> -1L;
35 
36 //     private final Vertx vertx;
37 //     private final PgConnectOptions options;
38 //     private Map!(String, ChannelList) channels = new HashMap<>();
39 //     private Function!(Integer, Long) reconnectPolicy = DEFAULT_RECONNECT_POLICY;
40 
41 //     private PgConnection conn;
42 //     private boolean connecting;
43 //     private boolean closed = true;
44 //     private VoidHandler closeHandler;
45 
46 //     PgSubscriberImpl(Vertx vertx, PgConnectOptions options) {
47 //         this.vertx = vertx;
48 //         this.options = new PgConnectOptions(options);
49 //     }
50 
51 //     // Identifiers in PostgreSQL are limited by default to NAMEDATALEN-1 = 63
52 //     // bytes, not characters (see PostgreSQL lexical structure documentation).
53 //     static final int NAMEDATALEN = 64;
54 //     static final int MAX_CHANNEL_NAME_LENGTH = NAMEDATALEN - 1;
55 //     static String applyIdLengthLimit(String channelName) {
56 //         return channelName.length() > MAX_CHANNEL_NAME_LENGTH
57 //                 ? channelName.substring(0, MAX_CHANNEL_NAME_LENGTH) : channelName;
58 //     }
59 
60 //     private void handleNotification(PgNotification notif) {
61 //         List!(Handler!(String)) handlers = new ArrayList<>();
62 //         synchronized (this) {
63 //             ChannelList channel = channels.get(notif.getChannel());
64 //             if (channel !is null) {
65 //                 channel.subs.forEach(sub -> {
66 //                     if (!sub.paused) {
67 //                         Handler!(String) handler = sub.eventHandler;
68 //                         if (handler !is null) {
69 //                             handlers.add(handler);
70 //                         } else {
71 //                             // No event handler set on this subscription (possible race?): nothing is delivered to it.
72 //                         }
73 //                     }
74 //                 });
75 //             } else {
76 //                 // No subscription list for this channel (possible race?): the notification is dropped.
77 //             }
78 //         }
79 //         handlers.forEach(handler -> {
80 //             handler.handle(notif.getPayload());
81 //         });
82 //     }
83 
84 //     override
85 //     synchronized PgSubscriber closeHandler(VoidHandler handler) {
86 //         closeHandler = handler;
87 //         return this;
88 //     }
89 
90 //     override
91 //     synchronized PgSubscriber reconnectPolicy(Function!(Integer, Long) policy) {
92 //         if (policy is null) {
93 //             reconnectPolicy = DEFAULT_RECONNECT_POLICY;
94 //         } else {
95 //             reconnectPolicy = policy;
96 //         }
97 //         return this;
98 //     }
99 
100 //     private synchronized void handleClose(Void v) {
101 //         conn = null;
102 //         checkReconnect(0);
103 //     }
104 
105 //     private void checkReconnect(int count) {
106 //         if (!closed) {
107 //             Long val = reconnectPolicy.apply(count);
108 //             if (val >= 0) {
109 //                 tryConnect(val, ar -> {
110 //                     if (ar.failed()) {
111 //                         checkReconnect(count + 1);
112 //                     }
113 //                 });
114 //                 return;
115 //             }
116 //             closed = true;
117 //         }
118 //         List!(VoidHandler) all = channels
119 //             .values()
120 //             .stream()
121 //             .flatMap(channel -> channel.subs.stream())
122 //             .map(sub -> sub.endHandler)
123 //             .filter(Objects::nonNull)
124 //             .collect(Collectors.toList());
125 //         channels.clear();
126 //         all.forEach(handler -> handler.handle(null));
127 //         VoidHandler handler = closeHandler;
128 //         if (handler !is null) {
129 //             handler.handle(null);
130 //         }
131 //     }
132 
133 //     override
134 //     synchronized boolean closed() {
135 //         return closed;
136 //     }
137 
138 //     override
139 //     synchronized PgConnection actualConnection() {
140 //         return conn;
141 //     }
142 
143 //     override
144 //     synchronized PgSubscriber connect(VoidHandler handler) {
145 //         if (closed) {
146 //             closed = false;
147 //             tryConnect(0, handler);
148 //         }
149 //         return this;
150 //     }
151 
152 //     private void tryConnect(long delayMillis, VoidHandler handler) {
153 //         if (!connecting) {
154 //             connecting = true;
155 //             if (delayMillis > 0) {
156 //                 vertx.setTimer(delayMillis, v -> doConnect(handler));
157 //             } else {
158 //                 doConnect(handler);
159 //             }
160 //         }
161 //     }
162 
163 //     private void doConnect(VoidHandler completionHandler) {
164 //         PgConnection.connect(vertx, options, ar -> handleConnectResult(completionHandler, ar));
165 //     }
166 
167 //     private synchronized void handleConnectResult(VoidHandler completionHandler, AsyncResult!(PgConnection) ar1) {
168 //         connecting = false;
169 //         if (ar1.succeeded()) {
170 //             conn = ar1.result();
171 //             conn.notificationHandler(PgSubscriberImpl.this::handleNotification);
172 //             conn.closeHandler(PgSubscriberImpl.this::handleClose);
173 //             if (channels.size() > 0) {
174 //                 List!(VoidHandler) handlers = channels.values()
175 //                     .stream()
176 //                     .flatMap(channel -> channel.subs.stream())
177 //                     .map(sub -> sub.subscribeHandler)
178 //                     .filter(Objects::nonNull)
179 //                     .collect(Collectors.toList());
180 //                 String sql = channels.values()
181 //                     .stream()
182 //                     .map(channel -> {
183 //                         channel.subscribed = true;
184 //                         return channel.quotedName;
185 //                     })
186 //                     .collect(Collectors.joining(";LISTEN ", "LISTEN ", ""));
187 //                 conn.query(sql, ar2 -> {
188 //                     if (ar2.failed()) {
189 //                         log.error("Cannot LISTEN to channels", ar2.cause());
190 //                         conn.close();
191 //                     } else {
192 //                         handlers.forEach(vertx::runOnContext);
193 //                     }
194 //                     completionHandler.handle(ar2.mapEmpty());
195 //                 });
196 //                 return;
197 //             }
198 //         }
199 //         completionHandler.handle(ar1.mapEmpty());
200 //     }
201 
202 //     private class ChannelList {
203 
204 //         final String name;
205 //     final String quotedName;
206 //         final ArrayList!(ChannelImpl) subs = new ArrayList<>();
207 //         boolean subscribed;
208 
209 //         ChannelList(String name) {
210 //             this.name = name;
211 //         quotedName = "\"" ~ name.replace("\"", "\"\"") ~ "\"";
212 //         }
213 
214 //         void add(ChannelImpl sub) {
215 //             subs.add(sub);
216 //             if (!subscribed) {
217 //                 if (conn !is null) {
218 //                     subscribed = true;
219 //                     String sql = "LISTEN " ~ quotedName;
220 //                     conn.query(sql, ar -> {
221 //                         if (ar.succeeded()) {
222 //                             VoidHandler handler = sub.subscribeHandler;
223 //                             if (handler !is null) {
224 //                                 handler.handle(null);
225 //                             }
226 //                         } else {
227 //                             log.error("Cannot LISTEN to channel " ~ name, ar.cause());
228 //                         }
229 //                     });
230 //                 }
231 //             }
232 //         }
233 
234 //         void remove(ChannelImpl sub) {
235 //             subs.remove(sub);
236 //             if (subs.isEmpty()) {
237 //                 channels.remove(name, this);
238 //                 if (conn !is null) {
239 //                     conn.query("UNLISTEN " ~ quotedName, ar -> {
240 //                         if (ar.failed()) {
241 //                             log.error("Cannot UNLISTEN channel " ~ name, ar.cause());
242 //                         }
243 //                     });
244 //                 }
245 //             }
246 //         }
247 //     }
248 
249 //     private class ChannelImpl implements PgChannel {
250 
251 //         private final String name;
252 //         private VoidHandler subscribeHandler;
253 //         private Handler!(String) eventHandler;
254 //         private VoidHandler endHandler;
255 //         private ChannelList channel;
256 //         private boolean paused;
257 
258 //         ChannelImpl(String name) {
259 //             this.name = applyIdLengthLimit(name);
260 //         }
261 
262 //         override
263 //         PgChannel subscribeHandler(VoidHandler handler) {
264 //             synchronized (PgSubscriberImpl.this) {
265 //                 subscribeHandler = handler;
266 //             }
267 //             return this;
268 //         }
269 
270 //         override
271 //         ChannelImpl exceptionHandler(Handler!(Throwable) handler) {
272 //             return this;
273 //         }
274 
275 //         override
276 //         ChannelImpl handler(Handler!(String) handler) {
277 //             synchronized (PgSubscriberImpl.this) {
278 //                 if (handler !is null) {
279 //                     eventHandler = handler;
280 //                     if (channel is null) {
281 //                         channel = channels.computeIfAbsent(name, ChannelList::new);
282 //                         channel.add(this);
283 //                     }
284 //                 } else {
285 //                     if (channel !is null) {
286 //                         ChannelList ch = channel;
287 //                         channel = null;
288 //                         ch.remove(this);
289 //                         VoidHandler _handler = endHandler;
290 //                         if (_handler !is null) {
291 //                             _handler.handle(null);
292 //                         }
293 //                     }
294 //                 }
295 //             }
296 //             return this;
297 //         }
298 
299 //         override
300 //         ChannelImpl endHandler(VoidHandler handler) {
301 //             synchronized (PgSubscriberImpl.this) {
302 //                 endHandler = handler;
303 //             }
304 //             return this;
305 //         }
306 
307 //         override
308 //         ChannelImpl pause() {
309 //             synchronized (PgSubscriberImpl.this) {
310 //                 paused = true;
311 //             }
312 //             return this;
313 //         }
314 
315 //         override
316 //         ChannelImpl resume() {
317 //             synchronized (PgSubscriberImpl.this) {
318 //                 paused = false;
319 //             }
320 //             return this;
321 //         }
322 
323 //         // TODO: implement fetch(long), part of ReadStream since Vert.x 3.6.0 (currently unsupported).
324 //         ReadStream!(String) fetch(long amount) {
325 //             throw new UnsupportedOperationException();
326 //         }
327 //     }
328 
329 //     override
330 //     void close() {
331 //         synchronized (PgSubscriberImpl.this) {
332 //             if (!closed) {
333 //                 closed = true;
334 //                 if (conn !is null) {
335 //                     conn.close();
336 //                 }
337 //             }
338 //         }
339 //     }
340 
341 //     override
342 //     PgChannel channel(String name) {
343 //         return new ChannelImpl(name);
344 //     }
345 // }