1 /*
2  * Copyright (C) 2019, HuntLabs
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 
18 module hunt.database.base.impl.RowStreamImpl;
19 
20 import hunt.database.base.impl.PreparedQueryImpl;
21 
22 import hunt.database.base.Cursor;
23 import hunt.database.base.Common;
24 import hunt.database.base.RowSet;
25 import hunt.database.base.RowStream;
26 import hunt.database.base.Row;
27 import hunt.database.base.Tuple;
28 import hunt.database.base.AsyncResult;
29 
30 import hunt.Exceptions;
31 
32 import std.conv;
33 import std.range;
34 
/**
 * Push-style stream of rows backed by a server-side cursor.
 *
 * Installing a row handler opens a cursor on the prepared query and requests
 * the first batch of `_fetch` rows. Each batch is buffered in `result` and
 * handed to the row handler one row at a time, subject to `demand`
 * (flow control). When a batch is drained, either the next batch is requested
 * or, if the cursor reports no more rows, the end handler is invoked.
 *
 * All mutable state is guarded by the object's monitor (`synchronized (this)`).
 * User callbacks and cursor I/O are invoked outside the monitor.
 */
class RowStreamImpl : RowStream!(Row) { // , RowSetHandler

    private PreparedQueryImpl ps;
    private int _fetch;
    private Tuple params;

    private VoidHandler _endHandler;
    private EventHandler!(Row) rowHandler;
    private EventHandler!(Throwable) _exceptionHandler;
    // Number of rows the consumer is still willing to receive;
    // long.max means "unbounded" and is never decremented.
    private long demand;
    // True while one caller is running the emit loop in checkPending().
    private bool emitting;
    private Cursor cursor;

    // Rows of the current batch that have not been emitted yet (null when none).
    private InputRange!(Row) result;

    /**
     * Params:
     *   ps     = prepared query used to open the cursor
     *   fetch  = number of rows requested from the cursor per batch
     *   params = arguments bound to the prepared query
     */
    this(PreparedQueryImpl ps, int fetch, Tuple params) {
        this.ps = ps;
        this._fetch = fetch;
        this.params = params;
        this.demand = long.max; // start in flowing mode
    }

    /// Sets the handler invoked when a batch read fails. Returns this stream.
    override
    RowStream!(Row) exceptionHandler(ExceptionHandler handler) {
        synchronized (this) {
            _exceptionHandler = handler;
        }
        return this;
    }

    /**
     * Installs or removes the row handler.
     *
     * A non-null handler opens a cursor and requests the first batch; doing so
     * while a cursor is already open throws UnsupportedOperationException.
     * A null handler drops the cursor reference if one is open, otherwise it
     * clears the row handler.
     *
     * NOTE(review): the null-handler path forgets the cursor without closing
     * it - confirm whether the cursor should be closed here.
     */
    override
    RowStream!(Row) handler(EventHandler!(Row) handler) {
        Cursor c;
        synchronized (this) {
            if (handler !is null) {
                if (cursor is null) {
                    rowHandler = handler;
                    c = cursor = ps.cursor(params);
                } else {
                    throw new UnsupportedOperationException("Handle me gracefully");
                }
            } else {
                if (cursor !is null) {
                    cursor = null;
                } else {
                    rowHandler = null;
                }
                return this;
            }
        }
        // Issue the first read outside the monitor.
        c.read(_fetch, &handle);
        return this;
    }

    /// Stops row delivery by resetting demand to zero. Returns this stream.
    override
    RowStream!(Row) pause() {
        synchronized (this) {
            demand = 0L;
        }
        return this;
    }

    /**
     * Adds `amount` to the outstanding demand and resumes delivery of any
     * buffered rows.
     *
     * Throws: IllegalArgumentException if `amount` is negative.
     */
    RowStream!(Row) fetch(long amount) {
        if (amount < 0L) {
            throw new IllegalArgumentException("Invalid fetch amount " ~ amount.to!string());
        }
        synchronized (this) {
            demand += amount;
            // Signed overflow wraps in D; saturate at long.max ("unbounded").
            if (demand < 0L) {
                demand = long.max;
            }
            if (cursor is null) {
                return this;
            }
        }
        checkPending();
        return this;
    }

    /// Switches back to unbounded demand and resumes delivery.
    override
    RowStream!(Row) resume() {
        return fetch(long.max);
    }

    /// Sets the handler invoked once the cursor is exhausted. Returns this stream.
    override
    RowStream!(Row) endHandler(VoidHandler handler) {
        synchronized (this) {
            _endHandler = handler;
        }
        return this;
    }

    /**
     * Completion callback for a cursor read.
     *
     * On failure the cursor reference is dropped and the exception handler
     * (if any) receives the cause. On success the batch is buffered and
     * emission is attempted.
     */
    // override
    void handle(AsyncResult!(RowSet) ar) {
        if (ar.failed()) {
            ExceptionHandler handler;
            synchronized (this) {
                cursor = null;
                handler = _exceptionHandler;
            }
            if (handler !is null) {
                handler(ar.cause());
            }
        } else {
            InputRange!(Row) rows = ar.result().iterator();
            synchronized (this) {
                result = rows;
            }
            checkPending();
        }
    }

    /// Closes the underlying cursor, if one is open, without a completion callback.
    // override
    void close() {
        close(null);
    }

    /**
     * Closes the underlying cursor, if one is open.
     *
     * `completionHandler` is forwarded to the cursor's close; it is not
     * invoked when no cursor is open.
     */
    // override
    void close(AsyncVoidHandler completionHandler) {
        Cursor c;
        synchronized (this) {
            if ((c = cursor) is null) {
                return;
            }
            cursor = null;
        }
        c.close(completionHandler);
    }

    /**
     * Emits buffered rows while there is demand.
     *
     * Only one caller at a time runs the loop (guarded by `emitting`);
     * concurrent or re-entrant callers return immediately. When the current
     * batch is drained, the next batch is requested from the cursor, or the
     * end handler is invoked if the cursor has no more rows.
     */
    private void checkPending() {
        synchronized (this) {
            if (emitting) {
                return;
            }
            emitting = true;
        }

        while (true) {
            EventHandler!(Row) onRow;
            Row row;
            VoidHandler onEnd;
            Cursor more;
            bool batchDrained = false;

            synchronized (this) {
                if (demand == 0L || result is null) {
                    emitting = false;
                    return;
                }
                if (!result.empty) {
                    onRow = rowHandler;
                    row = result.front;
                    result.popFront();
                    if (demand != long.max) {
                        demand--;
                    }
                } else {
                    batchDrained = true;
                    result = null;
                    emitting = false;
                    // cursor may have been cleared by close() while rows
                    // were still buffered; then there is nothing left to do.
                    if (cursor !is null) {
                        // NOTE(review): hasMore() is taken from the original
                        // Java port - confirm it exists on Cursor.
                        if (cursor.hasMore()) {
                            more = cursor;
                        } else {
                            cursor = null;
                            onEnd = _endHandler;
                        }
                    }
                }
            }

            if (batchDrained) {
                // `emitting` is already released, so the read completion may
                // re-enter checkPending() even if it fires synchronously.
                if (more !is null) {
                    more.read(_fetch, &handle);
                } else if (onEnd !is null) {
                    // Assumes VoidHandler takes no arguments - TODO confirm.
                    onEnd();
                }
                return;
            }

            if (onRow !is null) {
                // Release the emit flag if the user handler throws, so the
                // stream is not left permanently stuck.
                scope (failure) {
                    synchronized (this) {
                        emitting = false;
                    }
                }
                onRow(row);
            }
        }
    }
}