/*
 * Copyright (C) 2019, HuntLabs
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

module hunt.database.base.impl.RowStreamImpl;

import hunt.database.base.impl.PreparedQueryImpl;

import hunt.database.base.Cursor;
import hunt.database.base.Common;
import hunt.database.base.RowSet;
import hunt.database.base.RowStream;
import hunt.database.base.Row;
import hunt.database.base.Tuple;
import hunt.database.base.AsyncResult;

import hunt.Exceptions;

import std.conv;
import std.range;

/**
 * Row stream backed by a cursor opened on a prepared query.
 *
 * The cursor is opened when a row handler is installed, and rows are requested
 * from it in batches of `fetch` rows. Mutable state (handlers, demand, cursor,
 * current batch) is guarded by this object's monitor; user callbacks and cursor
 * calls are made outside the monitor.
 *
 * NOTE(review): row emission (`checkPending`) is still a stub, so installed
 * row/end handlers are not invoked yet.
 */
class RowStreamImpl : RowStream!(Row) { // , RowSetHandler

    private PreparedQueryImpl ps;
    private int _fetch;
    private Tuple params;

    private VoidHandler _endHandler;
    private EventHandler!(Row) rowHandler;
    private EventHandler!(Throwable) _exceptionHandler;
    private long demand;
    private bool emitting;
    private Cursor cursor;

    private InputRange!(Row) result;

    /**
     * Params:
     *   ps     = prepared query used to open the cursor
     *   fetch  = number of rows requested from the cursor per read
     *   params = arguments passed to `ps.cursor` when the cursor is opened
     */
    this(PreparedQueryImpl ps, int fetch, Tuple params) {
        this.ps = ps;
        this._fetch = fetch;
        this.params = params;
        // Start with saturated demand; pause() drops it to zero.
        this.demand = long.max;
    }

    /**
     * Sets the callback invoked with the failure cause when a cursor read fails.
     *
     * Returns: this stream, for chaining.
     */
    override
    RowStream!(Row) exceptionHandler(ExceptionHandler handler) {
        // handle() reads this field under the monitor, so write it under the monitor too.
        synchronized (this) {
            _exceptionHandler = handler;
        }
        return this;
    }

    /**
     * Installs or removes the row handler.
     *
     * With a non-null handler and no open cursor, the handler is stored, a cursor
     * is opened via `ps.cursor(params)` and the first batch is requested.
     * With a null handler, the cursor reference is dropped if one exists,
     * otherwise the stored row handler is cleared.
     *
     * Returns: this stream, for chaining.
     * Throws: UnsupportedOperationException if a handler is supplied while a
     *         cursor is already open.
     */
    override
    RowStream!(Row) handler(EventHandler!(Row) handler) {
        Cursor c;
        synchronized (this) {
            if (handler !is null) {
                if (cursor is null) {
                    rowHandler = handler;
                    c = cursor = ps.cursor(params);
                } else {
                    throw new UnsupportedOperationException("Handle me gracefully");
                }
            } else {
                if (cursor !is null) {
                    // NOTE(review): the cursor is only forgotten here, not closed — confirm intended.
                    cursor = null;
                } else {
                    rowHandler = null;
                }
                return this;
            }
        }
        // Issue the read outside the monitor.
        c.read(_fetch, &handle);
        return this;
    }

    /**
     * Stops emission by setting the outstanding demand to zero.
     *
     * Returns: this stream, for chaining.
     */
    override
    RowStream!(Row) pause() {
        // fetch() updates demand under the monitor; keep this write consistent with it.
        synchronized (this) {
            demand = 0L;
        }
        return this;
    }

    /**
     * Adds `amount` to the outstanding demand, saturating at `long.max`, and
     * triggers emission when a cursor is open.
     *
     * Returns: this stream, for chaining.
     * Throws: IllegalArgumentException if `amount` is negative.
     */
    RowStream!(Row) fetch(long amount) {
        if (amount < 0L) {
            throw new IllegalArgumentException("Invalid fetch amount " ~ amount.to!string());
        }
        synchronized (this) {
            demand += amount;
            if (demand < 0L) {
                // Signed overflow wrapped around: clamp to the saturated value.
                demand = long.max;
            }
            if (cursor is null) {
                return this;
            }
        }
        checkPending();
        return this;
    }

    /**
     * Resumes emission with saturated demand; equivalent to `fetch(long.max)`.
     *
     * Returns: this stream, for chaining.
     */
    override
    RowStream!(Row) resume() {
        return fetch(long.max);
    }

    /**
     * Sets the end-of-stream callback.
     *
     * Returns: this stream, for chaining.
     */
    override
    RowStream!(Row) endHandler(VoidHandler handler) {
        // Guarded like the other handler fields.
        synchronized (this) {
            _endHandler = handler;
        }
        return this;
    }

    /**
     * Completion callback for cursor reads.
     *
     * On failure the cursor reference is dropped and the exception handler, if
     * any, is called with the cause. On success the batch's iterator becomes the
     * current result and emission is triggered.
     */
    // override
    void handle(AsyncResult!(RowSet) ar) {
        if (ar.failed()) {
            ExceptionHandler handler;
            synchronized (this) {
                cursor = null;
                handler = _exceptionHandler;
            }
            // Invoke the user callback outside the monitor.
            if (handler !is null) {
                handler(ar.cause());
            }
        } else {
            result = ar.result().iterator();
            checkPending();
        }
    }

    /** Closes the underlying cursor, if any, without a completion callback. */
    // override
    void close() {
        close(null);
    }

    /**
     * Closes the underlying cursor, if any, forwarding `completionHandler` to it.
     * When no cursor is open this returns without invoking `completionHandler`.
     */
    // override
    void close(AsyncVoidHandler completionHandler) {
        Cursor c;
        synchronized (this) {
            if ((c = cursor) is null) {
                return;
            }
            cursor = null;
        }
        c.close(completionHandler);
    }

    /**
     * Emits buffered rows while there is demand.
     *
     * The `emitting` flag prevents overlapping/re-entrant emission. The emission
     * loop itself has not been ported yet; the flag is always released on exit so
     * that later calls are not silently swallowed by the guard.
     */
    private void checkPending() {
        synchronized (this) {
            if (emitting) {
                return;
            }
            emitting = true;
        }
        // Release the guard however this function exits; previously it stayed set forever.
        scope (exit) {
            synchronized (this) {
                emitting = false;
            }
        }
        // NOTE(review): assumed to report the gap without throwing when passed false — confirm.
        implementationMissing(false);

        // Reference logic still to be ported (kept verbatim from the original file):
        // while (true) {
        //   synchronized (this) {
        //     if (demand == 0L || result is null) {
        //       emitting = false;
        //       break;
        //     }
        //     EventHandler!(Row) handler;
        //     Object event;
        //     if (result.hasNext()) {
        //       handler = rowHandler;
        //       event = result.next();
        //       if (demand != long.max) {
        //         demand--;
        //       }
        //     } else {
        //       result = null;
        //       emitting = false;
        //       if (cursor.hasMore()) {
        //         cursor.read(_fetch, this);
        //         break;
        //       } else {
        //         cursor = null;
        //         handler = _endHandler;
        //         event = null;
        //       }
        //     }
        //     if (handler !is null) {
        //       handler(event);
        //     }
        //   }
        // }
    }
}