/*
 * Copyright (C) 2019, HuntLabs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
module hunt.database.driver.postgresql.impl.pubsub.PostgreSQLSubscriberImpl;

/*
 * STATUS: this module currently declares no symbols. Everything below is
 * disabled with line comments.
 *
 * The disabled text is a LISTEN/NOTIFY subscriber (`PgSubscriberImpl`) that is
 * only partially translated to D: it already uses D spellings such as
 * `!is null`, `is null`, `~` for concatenation, `override` and
 * `Function!(Integer, Long)`, but still depends on Java-only constructs
 * (`io.vertx.*` / `java.util.*` imports, `implements`, `->` lambdas,
 * `new HashMap<>()`, `Objects::nonNull`, `PgSubscriberImpl.this`,
 * `synchronized` methods) and therefore cannot be enabled as-is.
 *
 * Note that the module is named `PostgreSQLSubscriberImpl` while the disabled
 * class is still named `PgSubscriberImpl`.
 *
 * Lines of the form `// // NOTE(review): ...` inside the disabled code are
 * review observations for whoever completes the port; they are written as
 * nested comments so that stripping the outer `// ` keeps them as comments.
 */

// import hunt.database.driver.postgresql.*;
// import hunt.database.driver.postgresql.pubsub.PgSubscriber;
// import hunt.database.driver.postgresql.pubsub.PgChannel;
// import io.vertx.core.*;
// import io.vertx.core.logging.Logger;
// import io.vertx.core.logging.LoggerFactory;
// import io.vertx.core.streams.ReadStream;

// import java.util.*;
// import java.util.function.Function;
// import java.util.stream.Collectors;

// class PgSubscriberImpl implements PgSubscriber {

//   private static Logger log = LoggerFactory.getLogger(PgSubscriberImpl.class);
//   // NOTE(review): the default policy returns -1, which checkReconnect treats
//   // as "do not reconnect" (only values >= 0 schedule a new attempt).
//   private static final Function!(Integer, Long) DEFAULT_RECONNECT_POLICY = count -> -1L;

//   private final Vertx vertx;
//   private final PgConnectOptions options;
//   private Map!(String, ChannelList) channels = new HashMap<>();
//   private Function!(Integer, Long) reconnectPolicy = DEFAULT_RECONNECT_POLICY;

//   private PgConnection conn;
//   private boolean connecting;
//   private boolean closed = true;
//   private VoidHandler closeHandler;

//   PgSubscriberImpl(Vertx vertx, PgConnectOptions options) {
//     this.vertx = vertx;
//     this.options = new PgConnectOptions(options);
//   }

//   // Identifiers in PostgreSQL are currently limited to NAMEDATALEN-1 = 63
//   // characters (see PostgreSQL lexical structure documentation)
//   static final int NAMEDATALEN = 64;
//   static final int MAX_CHANNEL_NAME_LENGTH = NAMEDATALEN - 1;
//   static String applyIdLengthLimit(String channelName) {
//     return channelName.length() > MAX_CHANNEL_NAME_LENGTH
//       ? channelName.substring(0, MAX_CHANNEL_NAME_LENGTH) : channelName;
//   }

//   // NOTE(review): handlers of non-paused subscriptions are collected while
//   // holding the lock and invoked only after the synchronized block ends.
//   private void handleNotification(PgNotification notif) {
//     List!(Handler!(String)) handlers = new ArrayList<>();
//     synchronized (this) {
//       ChannelList channel = channels.get(notif.getChannel());
//       if (channel !is null) {
//         channel.subs.forEach(sub -> {
//           if (!sub.paused) {
//             Handler!(String) handler = sub.eventHandler;
//             if (handler !is null) {
//               handlers.add(handler);
//             } else {
//               // Race ?
//             }
//           }
//         });
//       } else {
//         // Race ?
//       }
//     }
//     handlers.forEach(handler -> {
//       handler.handle(notif.getPayload());
//     });
//   }

//   override
//   synchronized PgSubscriber closeHandler(VoidHandler handler) {
//     closeHandler = handler;
//     return this;
//   }

//   override
//   synchronized PgSubscriber reconnectPolicy(Function!(Integer, Long) policy) {
//     if (policy is null) {
//       reconnectPolicy = DEFAULT_RECONNECT_POLICY;
//     } else {
//       reconnectPolicy = policy;
//     }
//     return this;
//   }

//   private synchronized void handleClose(Void v) {
//     conn = null;
//     checkReconnect(0);
//   }

//   // NOTE(review): while not closed, the policy result is passed to tryConnect
//   // as its delayMillis argument; a negative result marks the subscriber
//   // closed. In the closed path every registered endHandler is invoked, the
//   // channel map is cleared, and closeHandler (if set) is invoked last.
//   private void checkReconnect(int count) {
//     if (!closed) {
//       Long val = reconnectPolicy.apply(count);
//       if (val >= 0) {
//         tryConnect(val, ar -> {
//           if (ar.failed()) {
//             checkReconnect(count + 1);
//           }
//         });
//         return;
//       }
//       closed = true;
//     }
//     List!(VoidHandler) all = channels
//       .values()
//       .stream()
//       .flatMap(channel -> channel.subs.stream())
//       .map(sub -> sub.endHandler)
//       .filter(Objects::nonNull)
//       .collect(Collectors.toList());
//     channels.clear();
//     all.forEach(handler -> handler.handle(null));
//     VoidHandler handler = closeHandler;
//     if (handler !is null) {
//       handler.handle(null);
//     }
//   }

//   override
//   synchronized boolean closed() {
//     return closed;
//   }

//   override
//   synchronized PgConnection actualConnection() {
//     return conn;
//   }

//   // NOTE(review): connect is a no-op (and `handler` is never invoked) when
//   // the subscriber is not in the closed state.
//   override
//   synchronized PgSubscriber connect(VoidHandler handler) {
//     if (closed) {
//       closed = false;
//       tryConnect(0, handler);
//     }
//     return this;
//   }

//   // NOTE(review): when `connecting` is already true this returns without
//   // invoking `handler`.
//   private void tryConnect(long delayMillis, VoidHandler handler) {
//     if (!connecting) {
//       connecting = true;
//       if (delayMillis > 0) {
//         vertx.setTimer(delayMillis, v -> doConnect(handler));
//       } else {
//         doConnect(handler);
//       }
//     }
//   }

//   private void doConnect(VoidHandler completionHandler) {
//     PgConnection.connect(vertx, options, ar -> handleConnectResult(completionHandler, ar));
//   }

//   // NOTE(review): on success with existing channels, a single statement
//   // "LISTEN a;LISTEN b;..." is issued for all of them and completionHandler
//   // is called from that query's callback; otherwise completionHandler is
//   // called directly with the connect result.
//   private synchronized void handleConnectResult(VoidHandler completionHandler, AsyncResult!(PgConnection) ar1) {
//     connecting = false;
//     if (ar1.succeeded()) {
//       conn = ar1.result();
//       conn.notificationHandler(PgSubscriberImpl.this::handleNotification);
//       conn.closeHandler(PgSubscriberImpl.this::handleClose);
//       if (channels.size() > 0) {
//         List!(VoidHandler) handlers = channels.values()
//           .stream()
//           .flatMap(channel -> channel.subs.stream())
//           .map(sub -> sub.subscribeHandler)
//           .filter(Objects::nonNull)
//           .collect(Collectors.toList());
//         String sql = channels.values()
//           .stream()
//           .map(channel -> {
//             channel.subscribed = true;
//             return channel.quotedName;
//           })
//           .collect(Collectors.joining(";LISTEN ", "LISTEN ", ""));
//         conn.query(sql, ar2 -> {
//           if (ar2.failed()) {
//             log.error("Cannot LISTEN to channels", ar2.cause());
//             conn.close();
//           } else {
//             handlers.forEach(vertx::runOnContext);
//           }
//           completionHandler.handle(ar2.mapEmpty());
//         });
//         return;
//       }
//     }
//     completionHandler.handle(ar1.mapEmpty());
//   }

//   private class ChannelList {

//     final String name;
//     final String quotedName;
//     final ArrayList!(ChannelImpl) subs = new ArrayList<>();
//     boolean subscribed;

//     // NOTE(review): quotedName wraps the name in double quotes and doubles
//     // any embedded double quote before it is concatenated into LISTEN /
//     // UNLISTEN statements.
//     ChannelList(String name) {
//       this.name = name;
//       quotedName = "\"" ~ name.replace("\"", "\"\"") ~ "\"";
//     }

//     // NOTE(review): `subscribed` is set before the LISTEN query completes and
//     // is not reset in the failure branch (which only logs). When the channel
//     // is already subscribed, this method does not invoke the new
//     // subscription's subscribeHandler.
//     void add(ChannelImpl sub) {
//       subs.add(sub);
//       if (!subscribed) {
//         if (conn !is null) {
//           subscribed = true;
//           String sql = "LISTEN " ~ quotedName;
//           conn.query(sql, ar -> {
//             if (ar.succeeded()) {
//               VoidHandler handler = sub.subscribeHandler;
//               if (handler !is null) {
//                 handler.handle(null);
//               }
//             } else {
//               log.error("Cannot LISTEN to channel " ~ name, ar.cause());
//             }
//           });
//         }
//       }
//     }

//     // NOTE(review): removing the last subscription drops this list from
//     // `channels` and, if connected, issues UNLISTEN for the channel.
//     void remove(ChannelImpl sub) {
//       subs.remove(sub);
//       if (subs.isEmpty()) {
//         channels.remove(name, this);
//         if (conn !is null) {
//           conn.query("UNLISTEN " ~ quotedName, ar -> {
//             if (ar.failed()) {
//               log.error("Cannot UNLISTEN channel " ~ name, ar.cause());
//             }
//           });
//         }
//       }
//     }
//   }

//   private class ChannelImpl implements PgChannel {

//     private final String name;
//     private VoidHandler subscribeHandler;
//     private Handler!(String) eventHandler;
//     private VoidHandler endHandler;
//     private ChannelList channel;
//     private boolean paused;

//     // NOTE(review): the channel name is truncated to MAX_CHANNEL_NAME_LENGTH
//     // characters via applyIdLengthLimit.
//     ChannelImpl(String name) {
//       this.name = applyIdLengthLimit(name);
//     }

//     override
//     PgChannel subscribeHandler(VoidHandler handler) {
//       synchronized (PgSubscriberImpl.this) {
//         subscribeHandler = handler;
//       }
//       return this;
//     }

//     // NOTE(review): the supplied exception handler is ignored.
//     override
//     ChannelImpl exceptionHandler(Handler!(Throwable) handler) {
//       return this;
//     }

//     // NOTE(review): a non-null handler registers this subscription with the
//     // channel's ChannelList (created on demand); a null handler unregisters
//     // it and then invokes endHandler, if one is set.
//     override
//     ChannelImpl handler(Handler!(String) handler) {
//       synchronized (PgSubscriberImpl.this) {
//         if (handler !is null) {
//           eventHandler = handler;
//           if (channel is null) {
//             channel = channels.computeIfAbsent(name, ChannelList::new);
//             channel.add(this);
//           }
//         } else {
//           if (channel !is null) {
//             ChannelList ch = channel;
//             channel = null;
//             ch.remove(this);
//             VoidHandler _handler = endHandler;
//             if (_handler !is null) {
//               _handler.handle(null);
//             }
//           }
//         }
//       }
//       return this;
//     }

//     override
//     ChannelImpl endHandler(VoidHandler handler) {
//       synchronized (PgSubscriberImpl.this) {
//         endHandler = handler;
//       }
//       return this;
//     }

//     override
//     ChannelImpl pause() {
//       synchronized (PgSubscriberImpl.this) {
//         paused = true;
//       }
//       return this;
//     }

//     override
//     ChannelImpl resume() {
//       synchronized (PgSubscriberImpl.this) {
//         paused = false;
//       }
//       return this;
//     }

//     // Since Vert.x 3.6.0 : todo
//     ReadStream!(String) fetch(long amount) {
//       throw new UnsupportedOperationException();
//     }
//   }

//   override
//   void close() {
//     synchronized (PgSubscriberImpl.this) {
//       if (!closed) {
//         closed = true;
//         if (conn !is null) {
//           conn.close();
//         }
//       }
//     }
//   }

//   override
//   PgChannel channel(String name) {
//     return new ChannelImpl(name);
//   }
// }