1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
*/

/**
 * Defines the interface used for client-side handling of asynchronous
 * I/O operations, based on coroutines.
 *
 * The main piece of the »client side« (e.g. for TAsyncClient users) of the
 * API is TFuture, which represents an asynchronously executed operation,
 * which can have a return value, throw exceptions, and which can be waited
 * upon.
 *
 * On the »implementation side«, the idea is that by using a TAsyncTransport
 * instead of a normal TTransport and executing the work through a
 * TAsyncManager, the same code as for synchronous I/O can be used for
 * asynchronous operation as well, for example:
 *
 * ---
 * auto socket = new TAsyncSocket(someTAsyncSocketManager(), host, port);
 * // …
 * socket.asyncManager.execute(socket, {
 *   SomeThriftStruct s;
 *
 *   // Waiting for socket I/O will not block an entire thread but cause
 *   // the async manager to execute another task in the meantime, because
 *   // we are using TAsyncSocket instead of TSocket.
 *   s.read(socket);
 *
 *   // Do something with s, e.g. set a TPromise result to it.
 *   writeln(s);
 * });
 * ---
 */
module thrift.async.base;

import core.time : Duration, dur;
// The selective import list is kept commented out on purpose; the original
// author marks this as a workaround for DMD bug 314.
import std.socket/+ : Socket+/; // DMD @@BUG314@@
import thrift.base;
import thrift.transport.base;
import thrift.util.cancellation;
/**
 * Manages one or more asynchronous transport resources (e.g. sockets in the
 * case of TAsyncSocketManager) and allows work items to be submitted for them.
 *
 * Implementations will typically run one or more background threads for
 * executing the work, which is one of the reasons for a TAsyncManager to be
 * used. Each work item is run in its own fiber and is expected to yield() away
 * while waiting for time-consuming operations.
 *
 * The second important purpose of TAsyncManager is to serialize access to
 * the transport resources – without taking care of that, e.g. issuing multiple
 * RPC calls over the same connection in rapid succession would likely lead to
 * more than one request being written at the same time, causing only garbage
 * to arrive at the remote end.
 *
 * All methods are thread-safe.
 */
interface TAsyncManager {
  /**
   * Submits a work item to be executed asynchronously.
   *
   * Access to async transports is serialized – if two work items associated
   * with the same transport are submitted, the second delegate will not be
   * invoked until the first has returned, even if the first context-switches
   * away (because it is waiting for I/O) and the async manager is idle
   * otherwise.
   *
   * Optionally, a TCancellation instance can be specified. If present,
   * triggering it will be considered a request to cancel the work item, if it
   * is still waiting for the associated transport to become available.
   * Delegates which are already being processed (i.e. waiting for I/O) are not
   * affected because this would bring the connection into an undefined state
   * (as probably a half-written request or a half-read response would be left
   * behind).
   *
   * The in contract asserts that the transport's asyncManager is this very
   * instance.
   *
   * Params:
   *   transport = The TAsyncTransport the work delegate will operate on. Must
   *     be associated with this TAsyncManager instance.
   *   work = The operations to execute on the given transport. Must never
   *     throw, errors should be handled in another way. nothrow semantics are
   *     difficult to enforce in combination with fibers though, so currently
   *     exceptions are just swallowed by TAsyncManager implementations.
   *   cancellation = If set, can be used to request cancellation of this work
   *     item if it is still waiting to be executed.
   *
   * Note: The work item will likely be executed in a different thread, so make
   *   sure the code it relies on is thread-safe. The async transports
   *   themselves are an exception, as access to them is serialized as noted
   *   above.
   */
  void execute(TAsyncTransport transport, void delegate() work,
    TCancellation cancellation = null
  ) in {
    assert(transport.asyncManager is this,
      "The given transport must be associated with this TAsyncManager.");
  }

  /**
   * Submits a delegate to be executed after a certain amount of time has
   * passed.
   *
   * The actual amount of time elapsed can be higher if the async manager
   * instance is busy and thus should not be relied on.
   *
   * Params:
   *   duration = The amount of time to wait before starting to execute the
   *     work delegate.
   *   work = The code to execute after the specified amount of time has passed.
   *
   * Example:
   * ---
   * // A very basic example – usually, the actual work item would enqueue
   * // some async transport operation.
   * auto asyncManager = someAsyncManager();
   *
   * TFuture!int calculate() {
   *   // Create a promise and asynchronously set its value after three
   *   // seconds have passed.
   *   auto promise = new TPromise!int;
   *   asyncManager.delay(dur!"seconds"(3), {
   *     promise.succeed(42);
   *   });
   *
   *   // Immediately return it to the caller.
   *   return promise;
   * }
   *
   * // This will wait until the result is available and then print it.
   * writeln(calculate().waitGet());
   * ---
   */
  void delay(Duration duration, void delegate() work);

  /**
   * Shuts down all background threads or other facilities that might have
   * been started in order to execute work items. This function is typically
   * called during program shutdown.
   *
   * If there are still tasks to be executed when the timeout expires, any
   * currently executed work items will never receive any notifications
   * for async transports managed by this instance, queued work items will
   * be silently dropped, and implementations are allowed to leak resources.
   *
   * Params:
   *   waitFinishTimeout = If positive, waits for all work items to be
   *     finished for the specified amount of time, if negative, waits for
   *     completion without ever timing out, if zero, immediately shuts down
   *     the background facilities. Defaults to a negative duration, i.e.
   *     waiting without a timeout.
   *
   * Returns: NOTE(review): the meaning of the bool result is not specified
   *   here – presumably whether all work items were finished before shutting
   *   down; confirm against the implementations.
   */
  bool stop(Duration waitFinishTimeout = dur!"hnsecs"(-1));
}
/**
 * A TTransport which uses a TAsyncManager to schedule non-blocking operations.
 *
 * The actual type of device is not specified; typically, implementations will
 * depend on an interface derived from TAsyncManager to be notified of changes
 * in the transport state.
 *
 * The peeking, reading, writing and flushing methods must always be called
 * from within the associated async manager.
 */
interface TAsyncTransport : TTransport {
  /**
   * The TAsyncManager associated with this transport.
   */
  TAsyncManager asyncManager() @property;
}
/**
 * A TAsyncManager providing notifications for socket events.
 */
interface TAsyncSocketManager : TAsyncManager {
  /**
   * Adds a listener that is triggered once when an event of the specified type
   * occurs, and removed afterwards.
   *
   * The overload without a timeout parameter registers the listener without
   * specifying a timeout period.
   *
   * Params:
   *   socket = The socket to listen for events at.
   *   eventType = The type of the event to listen for.
   *   timeout = The period of time after which the listener will be called
   *     with TAsyncEventReason.TIMED_OUT if no event happened.
   *   listener = The delegate to call when an event happened.
   */
  void addOneshotListener(Socket socket, TAsyncEventType eventType,
    Duration timeout, TSocketEventListener listener);

  /// Ditto
  void addOneshotListener(Socket socket, TAsyncEventType eventType,
    TSocketEventListener listener);
}
/**
 * Types of events that can happen for an asynchronous transport.
 */
enum TAsyncEventType {
  READ, /// New data became available to read.
  WRITE /// The transport became ready to be written to.
}
/**
 * The type of the delegates used to register socket event handlers.
 *
 * The delegate receives the TAsyncEventReason it was called for as its only
 * argument and returns nothing.
 */
alias void delegate(TAsyncEventReason callReason) TSocketEventListener;
/**
 * The reason a listener was called.
 *
 * Stored as a byte.
 */
enum TAsyncEventReason : byte {
  NORMAL, /// The event listened for was triggered normally.
  TIMED_OUT /// A timeout for the event was set, and it expired.
}