1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 /**
21  * A non-blocking server implementation that operates a set of I/O threads (by
22  * default only one) and either does processing »in-line« or off-loads it to a
23  * task pool.
24  *
25  * It *requires* TFramedTransport to be used on the client side, as it expects
26  * a 4 byte length indicator and writes out responses using the same framing.
27  *
28  * Because I/O is done asynchronous/event based, unfortunately
29  * TServerTransport can't be used.
30  *
31  * This implementation is based on the C++ one, with the exception of request
32  * timeouts and the drain task queue overload handling strategy not being
33  * implemented yet.
34  */
35 // This really should use a D non-blocking I/O library, once one becomes
36 // available.
37 module thrift.server.nonblocking;
38 version(NeedTServer):
39 import core.atomic : atomicLoad, atomicStore, atomicOp;
40 import core.exception : onOutOfMemoryError;
41 import core.memory : GC;
42 import core.sync.mutex;
43 import core.stdc.stdlib : free, realloc;
44 import core.time : Duration, dur;
45 import core.thread : Thread, ThreadGroup;
46 import deimos.event2.event;
47 import std.array : empty;
48 import std.conv : emplace, to;
49 import std.exception : enforce;
50 import std.parallelism : TaskPool, task;
51 import std.socket : Socket, socketPair, SocketAcceptException,
52   SocketException, TcpSocket;
53 import std.variant : Variant;
54 import thrift.base;
55 import thrift.internal.endian;
56 import thrift.internal.socket;
57 import thrift.internal.traits;
58 import thrift.protocol.base;
59 import thrift.protocol.binary;
60 import thrift.protocol.processor;
61 import thrift.server.base;
62 import thrift.server.transport.socket;
63 import thrift.transport.base;
64 import thrift.transport.memory;
65 import thrift.transport.range;
66 import thrift.transport.socket;
67 import thrift.util.cancellation;
68 
69 /**
70  * Possible actions taken on new incoming connections when the server is
71  * overloaded.
72  */
73 enum TOverloadAction {
74   /// Do not take any special actions while the server is overloaded, just
75   /// continue accepting connections.
76   NONE,
77 
78   /// Immediately drop new connections after they have been accepted if the
79   /// server is overloaded.
80   CLOSE_ON_ACCEPT
81 }
82 
83 ///
84 class TNonblockingServer : TServer {
85   ///
86   this(TProcessor processor, ushort port, TTransportFactory transportFactory,
87     TProtocolFactory protocolFactory, TaskPool taskPool = null
88   ) {
89     this(new TSingletonProcessorFactory(processor), port, transportFactory,
90       transportFactory, protocolFactory, protocolFactory, taskPool);
91   }
92 
93   ///
94   this(TProcessorFactory processorFactory, ushort port,
95     TTransportFactory transportFactory, TProtocolFactory protocolFactory,
96     TaskPool taskPool = null
97   ) {
98     this(processorFactory, port, transportFactory, transportFactory,
99       protocolFactory, protocolFactory, taskPool);
100   }
101 
102   ///
103   this(
104     TProcessor processor,
105     ushort port,
106     TTransportFactory inputTransportFactory,
107     TTransportFactory outputTransportFactory,
108     TProtocolFactory inputProtocolFactory,
109     TProtocolFactory outputProtocolFactory,
110     TaskPool taskPool = null
111   ) {
112     this(new TSingletonProcessorFactory(processor), port,
113       inputTransportFactory, outputTransportFactory,
114       inputProtocolFactory, outputProtocolFactory, taskPool);
115   }
116 
117   ///
118   this(
119     TProcessorFactory processorFactory,
120     ushort port,
121     TTransportFactory inputTransportFactory,
122     TTransportFactory outputTransportFactory,
123     TProtocolFactory inputProtocolFactory,
124     TProtocolFactory outputProtocolFactory,
125     TaskPool taskPool = null
126   ) {
127     super(processorFactory, null, inputTransportFactory,
128       outputTransportFactory, inputProtocolFactory, outputProtocolFactory);
129     port_ = port;
130 
131     this.taskPool = taskPool;
132 
133     connectionMutex_ = new Mutex;
134 
135     connectionStackLimit = DEFAULT_CONNECTION_STACK_LIMIT;
136     maxActiveProcessors = DEFAULT_MAX_ACTIVE_PROCESSORS;
137     maxConnections = DEFAULT_MAX_CONNECTIONS;
138     overloadHysteresis = DEFAULT_OVERLOAD_HYSTERESIS;
139     overloadAction = DEFAULT_OVERLOAD_ACTION;
140     writeBufferDefaultSize = DEFAULT_WRITE_BUFFER_DEFAULT_SIZE;
141     idleReadBufferLimit = DEFAULT_IDLE_READ_BUFFER_LIMIT;
142     idleWriteBufferLimit = DEFAULT_IDLE_WRITE_BUFFER_LIMIT;
143     resizeBufferEveryN = DEFAULT_RESIZE_BUFFER_EVERY_N;
144     maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
145     numIOThreads_ = DEFAULT_NUM_IO_THREADS;
146   }
147 
148   override void serve(TCancellation cancellation = null) {
149     if (cancellation && cancellation.triggered) return;
150 
151     // Initialize the listening socket.
152     // TODO: SO_KEEPALIVE, TCP_LOW_MIN_RTO, etc.
153     listenSocket_ = makeSocketAndListen(port_, TServerSocket.ACCEPT_BACKLOG,
154       BIND_RETRY_LIMIT, BIND_RETRY_DELAY, 0, 0, ipv6Only_);
155     listenSocket_.blocking = false;
156 
157     logInfo("Using %s I/O thread(s).", numIOThreads_);
158     if (taskPool_) {
159       logInfo("Using task pool with size: %s.", numIOThreads_, taskPool_.size);
160     }
161 
162     assert(numIOThreads_ > 0);
163     assert(ioLoops_.empty);
164     foreach (id; 0 .. numIOThreads_) {
165       // The IO loop on the first IO thread (this thread, i.e. the one serve()
166       // is called from) also accepts new connections.
167       auto listenSocket = (id == 0 ? listenSocket_ : null);
168       ioLoops_ ~= new IOLoop(this, listenSocket);
169     }
170 
171     if (cancellation) {
172       cancellation.triggering.addCallback({
173         foreach (i, loop; ioLoops_) loop.stop();
174 
175         // Stop accepting new connections right away.
176         listenSocket_.close();
177         listenSocket_ = null;
178       });
179     }
180 
181     // Start the IO helper threads for all but the first loop, which we will run
182     // ourselves. Note that the threads run forever, only terminating if stop()
183     // is called.
184     auto threads = new ThreadGroup();
185     foreach (loop; ioLoops_[1 .. $]) {
186       auto t = new Thread(&loop.run);
187       threads.add(t);
188       t.start();
189     }
190 
191     if (eventHandler) eventHandler.preServe();
192 
193     // Run the primary (listener) IO thread loop in our main thread; this will
194     // block until the server is shutting down.
195     ioLoops_[0].run();
196 
197     // Ensure all threads are finished before leaving serve().
198     threads.joinAll();
199 
200     ioLoops_ = null;
201   }
202 
203   /**
204    * Returns the number of currently active connections, i.e. open sockets.
205    */
206   size_t numConnections() const @property {
207     return numConnections_;
208   }
209 
210   /**
211    * Returns the number of connection objects allocated, but not in use.
212    */
213   size_t numIdleConnections() const @property {
214     return connectionStack_.length;
215   }
216 
217   /**
218    * Return count of number of connections which are currently processing.
219    *
220    * This is defined as a connection where all data has been received, and the
221    * processor was invoked but has not yet completed.
222    */
223   size_t numActiveProcessors() const @property {
224     return numActiveProcessors_;
225   }
226 
227   /// Number of bind() retries.
228   enum BIND_RETRY_LIMIT = 0;
229 
230   /// Duration between bind() retries.
231   enum BIND_RETRY_DELAY = dur!"hnsecs"(0);
232 
233   /// Whether to listen on IPv6 only, if IPv6 support is detected
234   // (default: false).
235   void ipv6Only(bool value) @property {
236     ipv6Only_ = value;
237   }
238 
239   /**
240    * The task pool to use for processing requests. If null, no additional
241    * threads are used and request are processed »inline«.
242    *
243    * Can safely be set even when the server is already running.
244    */
245   TaskPool taskPool() @property {
246     return taskPool_;
247   }
248 
249   /// ditto
250   void taskPool(TaskPool pool) @property {
251     taskPool_ = pool;
252   }
253 
254   /**
255    * Hysteresis for overload state.
256    *
257    * This is the fraction of the overload value that needs to be reached
258    * before the overload state is cleared. It must be between 0 and 1,
259    * practical choices probably lie between 0.5 and 0.9.
260    */
261   double overloadHysteresis() const @property {
262     return overloadHysteresis_;
263   }
264 
265   /// Ditto
266   void overloadHysteresis(double value) @property {
267     enforce(0 < value && value <= 1,
268       "Invalid value for overload hysteresis: " ~ to!string(value));
269     overloadHysteresis_ = value;
270   }
271 
272   /// Ditto
273   enum DEFAULT_OVERLOAD_HYSTERESIS = 0.8;
274 
275   /**
276    * The action which will be taken on overload.
277    */
278   TOverloadAction overloadAction;
279 
280   /// Ditto
281   enum DEFAULT_OVERLOAD_ACTION = TOverloadAction.NONE;
282 
283   /**
284    * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
285    * and found to be exceeded, reinitialized) to this size.
286    */
287   size_t writeBufferDefaultSize;
288 
289   /// Ditto
290   enum size_t DEFAULT_WRITE_BUFFER_DEFAULT_SIZE = 1024;
291 
292   /**
293    * Max read buffer size for an idle Connection. When we place an idle
294    * Connection into connectionStack_ or on every resizeBufferEveryN_ calls,
295    * we will free the buffer (such that it will be reinitialized by the next
296    * received frame) if it has exceeded this limit. 0 disables this check.
297    */
298   size_t idleReadBufferLimit;
299 
300   /// Ditto
301   enum size_t DEFAULT_IDLE_READ_BUFFER_LIMIT = 1024;
302 
303   /**
304    * Max write buffer size for an idle connection.  When we place an idle
305    * Connection into connectionStack_ or on every resizeBufferEveryN_ calls,
306    * we ensure that its write buffer is <= to this size; otherwise we
307    * replace it with a new one of writeBufferDefaultSize_ bytes to ensure that
308    * idle connections don't hog memory. 0 disables this check.
309    */
310   size_t idleWriteBufferLimit;
311 
312   /// Ditto
313   enum size_t DEFAULT_IDLE_WRITE_BUFFER_LIMIT = 1024;
314 
315   /**
316    * Every N calls we check the buffer size limits on a connected Connection.
317    * 0 disables (i.e. the checks are only done when a connection closes).
318    */
319   uint resizeBufferEveryN;
320 
321   /// Ditto
322   enum uint DEFAULT_RESIZE_BUFFER_EVERY_N = 512;
323 
324   /// Limit for how many Connection objects to cache.
325   size_t connectionStackLimit;
326 
327   /// Ditto
328   enum size_t DEFAULT_CONNECTION_STACK_LIMIT = 1024;
329 
330   /// Limit for number of open connections before server goes into overload
331   /// state.
332   size_t maxConnections;
333 
334   /// Ditto
335   enum size_t DEFAULT_MAX_CONNECTIONS = int.max;
336 
337   /// Limit for number of connections processing or waiting to process
338   size_t maxActiveProcessors;
339 
340   /// Ditto
341   enum size_t DEFAULT_MAX_ACTIVE_PROCESSORS = int.max;
342 
343   /// Maximum frame size, in bytes.
344   ///
345   /// If a client tries to send a message larger than this limit, its
346   /// connection will be closed. This helps to avoid allocating huge buffers
347   /// on bogous input.
348   uint maxFrameSize;
349 
350   /// Ditto
351   enum uint DEFAULT_MAX_FRAME_SIZE = 256 * 1024 * 1024;
352 
353 
354   size_t numIOThreads() @property {
355     return numIOThreads_;
356   }
357 
358   void numIOThreads(size_t value) @property {
359     enforce(value >= 1, new TException("Must use at least one I/O thread."));
360     numIOThreads_ = value;
361   }
362 
363   enum DEFAULT_NUM_IO_THREADS = 1;
364 
365 private:
366   /**
367    * C callback wrapper around acceptConnections(). Expects the custom argument
368    * to be the this pointer of the associated server instance.
369    */
370   extern(C) static void acceptConnectionsCallback(int fd, short which,
371     void* serverThis
372   ) {
373     (cast(TNonblockingServer)serverThis).acceptConnections(fd, which);
374   }
375 
376   /**
377    * Called by libevent (IO loop 0/serve() thread only) when something
378    * happened on the listening socket.
379    */
380   void acceptConnections(int fd, short eventFlags) {
381     if (atomicLoad(ioLoops_[0].shuttingDown_)) return;
382 
383     assert(!!listenSocket_,
384       "Server should be shutting down if listen socket is null.");
385     assert(fd == listenSocket_.handle);
386     assert(eventFlags & EV_READ);
387 
388     // Accept as many new clients as possible, even though libevent signaled
389     // only one. This helps the number of calls into libevent space.
390     while (true) {
391       // It is lame to use exceptions for regular control flow (failing is
392       // excepted due to non-blocking mode of operation), but that's the
393       // interface std.socket offers…
394       Socket clientSocket;
395       try {
396         clientSocket = listenSocket_.accept();
397       } catch (SocketAcceptException e) {
398         if (e.errorCode != WOULD_BLOCK_ERRNO) {
399           logError("Error accepting connection: %s", e);
400         }
401         break;
402       }
403 
404       // If the server is overloaded, this is the point to take the specified
405       // action.
406       if (overloadAction != TOverloadAction.NONE && checkOverloaded()) {
407         nConnectionsDropped_++;
408         nTotalConnectionsDropped_++;
409         if (overloadAction == TOverloadAction.CLOSE_ON_ACCEPT) {
410           clientSocket.close();
411           return;
412         }
413       }
414 
415       try {
416         clientSocket.blocking = false;
417       } catch (SocketException e) {
418         logError("Couldn't set client socket to non-blocking mode: %s", e);
419         clientSocket.close();
420         return;
421       }
422 
423       // Create a new Connection for this client socket.
424       Connection conn = void;
425       IOLoop loop = void;
426       bool thisThread = void;
427       synchronized (connectionMutex_) {
428         // Assign an I/O loop to the connection (round-robin).
429         assert(nextIOLoop_ >= 0);
430         assert(nextIOLoop_ < ioLoops_.length);
431         auto selectedThreadIdx = nextIOLoop_;
432         nextIOLoop_ = (nextIOLoop_ + 1) % ioLoops_.length;
433 
434         loop = ioLoops_[selectedThreadIdx];
435         thisThread = (selectedThreadIdx == 0);
436 
437         // Check the connection stack to see if we can re-use an existing one.
438         if (connectionStack_.empty) {
439           ++numConnections_;
440           conn = new Connection(clientSocket, loop);
441 
442           // Make sure the connection does not get collected while it is active,
443           // i.e. hooked up with libevent.
444           GC.addRoot(cast(void*)conn);
445         } else {
446           conn = connectionStack_[$ - 1];
447           connectionStack_ = connectionStack_[0 .. $ - 1];
448           connectionStack_.assumeSafeAppend();
449           conn.init(clientSocket, loop);
450         }
451       }
452 
453       loop.addConnection();
454 
455       // Either notify the ioThread that is assigned this connection to
456       // start processing, or if it is us, we'll just ask this
457       // connection to do its initial state change here.
458       //
459       // (We need to avoid writing to our own notification pipe, to
460       // avoid possible deadlocks if the pipe is full.)
461       if (thisThread) {
462         conn.transition();
463       } else {
464         loop.notifyCompleted(conn);
465       }
466     }
467   }
468 
469   /// Increment the count of connections currently processing.
470   void incrementActiveProcessors() {
471     atomicOp!"+="(numActiveProcessors_, 1);
472   }
473 
474   /// Decrement the count of connections currently processing.
475   void decrementActiveProcessors() {
476     assert(numActiveProcessors_ > 0);
477     atomicOp!"-="(numActiveProcessors_, 1);
478   }
479 
480   /**
481    * Determines if the server is currently overloaded.
482    *
483    * If the number of open connections or »processing« connections is over the
484    * respective limit, the server will enter overload handling mode and a
485    * warning will be logged. If below values are below the hysteresis curve,
486    * this will cause the server to exit it again.
487    *
488    * Returns: Whether the server is currently overloaded.
489    */
490   bool checkOverloaded() {
491     auto activeConnections = numConnections_ - connectionStack_.length;
492     if (numActiveProcessors_ > maxActiveProcessors ||
493         activeConnections > maxConnections) {
494       if (!overloaded_) {
495         logInfo("Entering overloaded state.");
496         overloaded_ = true;
497       }
498     } else {
499       if (overloaded_ &&
500         (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors) &&
501         (activeConnections <= overloadHysteresis_ * maxConnections))
502       {
503         logInfo("Exiting overloaded state, %s connection(s) dropped (% total).",
504           nConnectionsDropped_, nTotalConnectionsDropped_);
505         nConnectionsDropped_ = 0;
506         overloaded_ = false;
507       }
508     }
509 
510     return overloaded_;
511   }
512 
513   /**
514    * Marks a connection as inactive and either puts it back into the
515    * connection pool or leaves it for garbage collection.
516    */
517   void disposeConnection(Connection connection) {
518     synchronized (connectionMutex_) {
519       if (!connectionStackLimit ||
520         (connectionStack_.length < connectionStackLimit))
521       {
522         connection.checkIdleBufferLimit(idleReadBufferLimit,
523           idleWriteBufferLimit);
524         connectionStack_ ~= connection;
525       } else {
526         assert(numConnections_ > 0);
527         --numConnections_;
528 
529         // Leave the connection object for collection now.
530         GC.removeRoot(cast(void*)connection);
531       }
532     }
533   }
534 
535   /// Socket used to listen for connections and accepting them.
536   Socket listenSocket_;
537 
538   /// Port to listen on.
539   ushort port_;
540 
541   /// Whether to listen on IPv6 only.
542   bool ipv6Only_;
543 
544   /// The total number of connections existing, both active and idle.
545   size_t numConnections_;
546 
547   /// The number of connections which are currently waiting for the processor
548   /// to return.
549   shared size_t numActiveProcessors_;
550 
551   /// Hysteresis for leaving overload state.
552   double overloadHysteresis_;
553 
554   /// Whether the server is currently overloaded.
555   bool overloaded_;
556 
557   /// Number of connections dropped since the server entered the current
558   /// overloaded state.
559   uint nConnectionsDropped_;
560 
561   /// Number of connections dropped due to overload since the server started.
562   ulong nTotalConnectionsDropped_;
563 
564   /// The task pool used for processing requests.
565   TaskPool taskPool_;
566 
567   /// Number of IO threads this server will use (>= 1).
568   size_t numIOThreads_;
569 
570   /// The IOLoops among which socket handling work is distributed.
571   IOLoop[] ioLoops_;
572 
573   /// The index of the loop in ioLoops_ which will handle the next accepted
574   /// connection.
575   size_t nextIOLoop_;
576 
577   /// All the connection objects which have been created but are not currently
578   /// in use. When a connection is closed, it it placed here to enable object
579   /// (resp. buffer) reuse.
580   Connection[] connectionStack_;
581 
582   /// This mutex protects the connection stack.
583   Mutex connectionMutex_;
584 }
585 
586 private {
587   /*
588    * Encapsulates a libevent event loop.
589    *
590    * The design is a bit of a mess, since the first loop is actually run on the
591    * server thread itself and is special because it is the only instance for
592    * which listenSocket_ is not null.
593    */
594   final class IOLoop {
595     /**
596      * Creates a new instance and set up the event base.
597      *
598      * If listenSocket is not null, the thread will also accept new
599      * connections itself.
600      */
601     this(TNonblockingServer server, Socket listenSocket) {
602       server_ = server;
603       listenSocket_ = listenSocket;
604       initMutex_ = new Mutex;
605     }
606 
607     /**
608      * Runs the event loop; only returns after a call to stop().
609      */
610     void run() {
611       assert(!atomicLoad(initialized_), "IOLoop already running?!");
612 
613       synchronized (initMutex_) {
614         if (atomicLoad(shuttingDown_)) return;
615         atomicStore(initialized_, true);
616 
617         assert(!eventBase_);
618         eventBase_ = event_base_new();
619 
620         if (listenSocket_) {
621           // Log the libevent version and backend.
622           logInfo("libevent version %s, using method %s.",
623             to!string(event_get_version()), to!string(event_base_get_method(eventBase_)));
624 
625           // Register the event for the listening socket.
626           listenEvent_ = event_new(eventBase_, cast(int)listenSocket_.handle,
627             cast(short) (EV_READ | EV_PERSIST | EV_ET),
628             assumeNothrow(&TNonblockingServer.acceptConnectionsCallback),
629             cast(void*)server_);
630           if (event_add(listenEvent_, null) == -1) {
631             throw new TException("event_add for the listening socket event failed.");
632           }
633         }
634 
635         auto pair = socketPair();
636         foreach (s; pair) s.blocking = false;
637         completionSendSocket_ = pair[0];
638         completionReceiveSocket_ = pair[1];
639 
640         // Register an event for the task completion notification socket.
641         completionEvent_ = event_new(eventBase_, cast(int)completionReceiveSocket_.handle,
642           cast(short)(EV_READ | EV_PERSIST | EV_ET), assumeNothrow(&completedCallback),
643           cast(void*)this);
644 
645         if (event_add(completionEvent_, null) == -1) {
646           throw new TException("event_add for the notification socket failed.");
647         }
648       }
649 
650       // Run libevent engine, returns only after stop().
651       event_base_dispatch(eventBase_);
652 
653       if (listenEvent_) {
654         event_free(listenEvent_);
655         listenEvent_ = null;
656       }
657 
658       event_free(completionEvent_);
659       completionEvent_ = null;
660 
661       completionSendSocket_.close();
662       completionSendSocket_ = null;
663 
664       completionReceiveSocket_.close();
665       completionReceiveSocket_ = null;
666 
667       event_base_free(eventBase_);
668       eventBase_ = null;
669 
670       atomicStore(shuttingDown_, false);
671 
672       initialized_ = false;
673     }
674 
675     /**
676      * Adds a new connection handled by this loop.
677      */
678     void addConnection() {
679       ++numActiveConnections_;
680     }
681 
682     /**
683      * Disposes a connection object (typically after it has been closed).
684      */
685     void disposeConnection(Connection conn) {
686       server_.disposeConnection(conn);
687       assert(numActiveConnections_ > 0);
688       --numActiveConnections_;
689       if (numActiveConnections_ == 0) {
690         if (atomicLoad(shuttingDown_)) {
691           event_base_loopbreak(eventBase_);
692         }
693       }
694     }
695 
696     /**
697      * Notifies the event loop that the current step (initialization,
698      * processing of a request) on a certain connection has been completed.
699      *
700      * This function is thread-safe, but should never be called from the
701      * thread running the loop itself.
702      */
703     void notifyCompleted(Connection conn) {
704       assert(!!completionSendSocket_);
705       auto bytesSent = completionSendSocket_.send(cast(ubyte[])((&conn)[0 .. 1]));
706 
707       if (bytesSent != Connection.sizeof) {
708         logError("Sending completion notification failed, connection will " ~
709           "not be properly terminated.");
710       }
711     }
712 
713     /**
714      * Exits the event loop after all currently active connections have been
715      * closed.
716      *
717      * This function is thread-safe.
718      */
719     void stop() {
720       // There is a bug in either libevent or its documentation, having no
721       // events registered doesn't actually terminate the loop, because
722       // event_base_new() registers some internal one by calling
723       // evthread_make_base_notifiable().
724       // Due to this, we can't simply remove all events and expect the event
725       // loop to terminate. Instead, we ping the event loop using a null
726       // completion message. This way, we make sure to wake up the libevent
727       // thread if it not currently processing any connections. It will break
728       // out of the loop in disposeConnection() after the last active
729       // connection has been closed.
730       synchronized (initMutex_) {
731         atomicStore(shuttingDown_, true);
732         if (atomicLoad(initialized_)) notifyCompleted(null);
733       }
734     }
735 
736   private:
737     /**
738      * C callback to call completed() from libevent.
739      *
740      * Expects the custom argument to be the this pointer of the associated
741      * IOLoop instance.
742      */
743     extern(C) static void completedCallback(int fd, short what, void* loopThis) {
744       assert(what & EV_READ);
745       auto loop = cast(IOLoop)loopThis;
746       assert(fd == loop.completionReceiveSocket_.handle);
747       loop.completed();
748     }
749 
750     /**
751      * Reads from the completion receive socket and appropriately transitions
752      * the connections and shuts down the loop if requested.
753      */
754     void completed() {
755       Connection connection;
756       ptrdiff_t bytesRead;
757       while (true) {
758         bytesRead = completionReceiveSocket_.receive(
759           cast(ubyte[])((&connection)[0 .. 1]));
760         if (bytesRead < 0) {
761           auto errno = getSocketErrno();
762 
763           if (errno != WOULD_BLOCK_ERRNO) {
764             logError("Reading from completion socket failed, some connection " ~
765               "will never be properly terminated: %s", socketErrnoString(errno));
766           }
767         }
768 
769         if (bytesRead != Connection.sizeof) break;
770 
771         if (!connection) {
772           assert(atomicLoad(shuttingDown_));
773           if (numActiveConnections_ == 0) {
774             event_base_loopbreak(eventBase_);
775           }
776           continue;
777         }
778 
779         connection.transition();
780       }
781 
782       if (bytesRead > 0) {
783         logError("Unexpected partial read from completion socket " ~
784           "(%s bytes instead of %s).", bytesRead, Connection.sizeof);
785       }
786     }
787 
788     /// associated server
789     TNonblockingServer server_;
790 
791     /// The managed listening socket, if any.
792     Socket listenSocket_;
793 
794     /// The libevent event base for the loop.
795     event_base* eventBase_;
796 
797     /// Triggered on listen socket events.
798     event* listenEvent_;
799 
800     /// Triggered on completion receive socket events.
801     event* completionEvent_;
802 
803     /// Socket used to send completion notification messages. Paired with
804     /// completionReceiveSocket_.
805     Socket completionSendSocket_;
806 
807     /// Socket used to send completion notification messages. Paired with
808     /// completionSendSocket_.
809     Socket completionReceiveSocket_;
810 
811     /// Whether the server is currently shutting down (i.e. the cancellation has
812     /// been triggered, but not all client connections have been closed yet).
813     shared bool shuttingDown_;
814 
815     /// The number of currently active client connections.
816     size_t numActiveConnections_;
817 
818     /// Guards loop startup so that the loop can be reliably shut down even if
819     /// another thread has just started to execute run(). Locked during
820     /// initialization in run(). When unlocked, the completion mechanism is
821     /// expected to be fully set up.
822     Mutex initMutex_;
823     shared bool initialized_; /// Ditto
824   }
825 
826   /*
827    * I/O states a socket can be in.
828    */
829   enum SocketState {
830     RECV_FRAME_SIZE, /// The frame size is received.
831     RECV, /// The payload is received.
832     SEND /// The response is written back out.
833   }
834 
835   /*
836    * States a connection can be in.
837    */
838   enum ConnectionState {
839     INIT, /// The connection will be initialized.
840     READ_FRAME_SIZE, /// The four frame size bytes are being read.
841     READ_REQUEST, /// The request payload itself is being read.
842     WAIT_PROCESSOR, /// The connection waits for the processor to finish.
843     SEND_RESULT /// The result is written back out.
844   }
845 
846   /*
847    * A connection that is handled via libevent.
848    *
849    * Data received is buffered until the request is complete (returning back to
850    * libevent if not), at which point the processor is invoked.
851    */
852   final class Connection {
853     /**
854      * Constructs a new instance.
855      *
856      * To reuse a connection object later on, the init() function can be used
857      * to the same effect on the internal state.
858      */
859     this(Socket socket, IOLoop loop) {
860       // The input and output transport objects are reused between clients
861       // connections, so initialize them here rather than in init().
862       inputTransport_ = new TInputRangeTransport!(ubyte[])([]);
863       outputTransport_ = new TMemoryBuffer(loop.server_.writeBufferDefaultSize);
864 
865       init(socket, loop);
866     }
867 
868     /**
869      * Initializes the connection.
870      *
871      * Params:
872      *   socket = The socket to work on.
873      *   eventFlags = Any flags to pass to libevent.
874      *   s = The server this connection is part of.
875      */
876     void init(Socket socket, IOLoop loop) {
877       import thrift.server.base;
878       // TODO: This allocation could be avoided.
879       socket_ = new TSocket(socket);
880 
881       loop_ = loop;
882       server_ = loop_.server_;
883       connState_ = ConnectionState.INIT;
884       eventFlags_ = 0;
885 
886       readBufferPos_ = 0;
887       readWant_ = 0;
888 
889       writeBuffer_ = null;
890       writeBufferPos_ = 0;
891       largestWriteBufferSize_ = 0;
892 
893       socketState_ = SocketState.RECV_FRAME_SIZE;
894       callsSinceResize_ = 0;
895 
896       factoryInputTransport_ =
897         server_.inputTransportFactory_.getTransport(inputTransport_);
898       factoryOutputTransport_ =
899         server_.outputTransportFactory_.getTransport(outputTransport_);
900 
901       inputProtocol_ =
902         server_.inputProtocolFactory_.getProtocol(factoryInputTransport_);
903       outputProtocol_ =
904         server_.outputProtocolFactory_.getProtocol(factoryOutputTransport_);
905 
906       if (server_.eventHandler) {
907         connectionContext_ =
908           server_.eventHandler.createContext(inputProtocol_, outputProtocol_);
909       }
910 
911       auto info = TConnectionInfo(inputProtocol_, outputProtocol_, socket_);
912       processor_ = server_.processorFactory_.getProcessor(info);
913     }
914 
915     ~this() {
916       free(readBuffer_);
917       if (event_) {
918         event_free(event_);
919         event_ = null;
920       }
921     }
922 
923     /**
924      * Check buffers against the size limits and shrink them if exceeded.
925      *
926      * Params:
927      *   readLimit = Read buffer size limit (in bytes, 0 to ignore).
928      *   writeLimit = Write buffer size limit (in bytes, 0 to ignore).
929      */
930     void checkIdleBufferLimit(size_t readLimit, size_t writeLimit) {
931       if (readLimit > 0 && readBufferSize_ > readLimit) {
932         free(readBuffer_);
933         readBuffer_ = null;
934         readBufferSize_ = 0;
935       }
936 
937       if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
938         // just start over
939         outputTransport_.reset(server_.writeBufferDefaultSize);
940         largestWriteBufferSize_ = 0;
941       }
942     }
943 
944     /**
945      * Transitions the connection to the next state.
946      *
947      * This is called e.g. when the request has been read completely or all
948      * the data has been written back.
949      */
950     void transition() {
951       assert(!!loop_);
952       assert(!!server_);
953 
954       // Switch upon the state that we are currently in and move to a new state
955       final switch (connState_) {
956         case ConnectionState.READ_REQUEST:
957           // We are done reading the request, package the read buffer into transport
958           // and get back some data from the dispatch function
959           inputTransport_.reset(readBuffer_[0 .. readBufferPos_]);
960           outputTransport_.reset();
961 
962           // Prepend four bytes of blank space to the buffer so we can
963           // write the frame size there later.
964           // Strictly speaking, we wouldn't have to write anything, just
965           // increment the TMemoryBuffer writeOffset_. This would yield a tiny
966           // performance gain.
967           ubyte[4] space = void;
968           outputTransport_.write(space);
969 
970           server_.incrementActiveProcessors();
971 
972           taskPool_ = server_.taskPool;
973           if (taskPool_) {
974             // Create a new task and add it to the task pool queue.
975             auto processingTask = task!processRequest(this);
976             connState_ = ConnectionState.WAIT_PROCESSOR;
977             taskPool_.put(processingTask);
978 
979             // We don't want to process any more data while the task is active.
980             unregisterEvent();
981             return;
982           }
983 
984           // Just process it right now if there is no task pool set.
985           processRequest(this);
986           goto case;
987         case ConnectionState.WAIT_PROCESSOR:
988           // We have now finished processing the request, set the frame size
989           // for the outputTransport_ contents and set everything up to write
990           // it out via libevent.
991           server_.decrementActiveProcessors();
992 
993           // Acquire the data written to the transport.
994           // KLUDGE: To avoid copying, we simply cast the const away and
995           // modify the internal buffer of the TMemoryBuffer – works with the
996           // current implementation, but isn't exactly beautiful.
997           writeBuffer_ = cast(ubyte[])outputTransport_.getContents();
998 
999           assert(writeBuffer_.length >= 4, "The write buffer should have " ~
1000             "least the initially added dummy length bytes.");
1001           if (writeBuffer_.length == 4) {
1002             // The request was one-way, no response to write.
1003             goto case ConnectionState.INIT;
1004           }
1005 
1006           // Write the frame size into the four bytes reserved for it.
1007           auto size = hostToNet(cast(uint)(writeBuffer_.length - 4));
1008           writeBuffer_[0 .. 4] = cast(ubyte[])((&size)[0 .. 1]);
1009 
1010           writeBufferPos_ = 0;
1011           socketState_ = SocketState.SEND;
1012           connState_ = ConnectionState.SEND_RESULT;
1013           registerEvent(EV_WRITE | EV_PERSIST);
1014 
1015           return;
1016         case ConnectionState.SEND_RESULT:
1017           // The result has been sent back to the client, we don't need the
1018           // buffers anymore.
1019           if (writeBuffer_.length > largestWriteBufferSize_) {
1020             largestWriteBufferSize_ = writeBuffer_.length;
1021           }
1022 
1023           if (server_.resizeBufferEveryN > 0 &&
1024               ++callsSinceResize_ >= server_.resizeBufferEveryN
1025           ) {
1026             checkIdleBufferLimit(server_.idleReadBufferLimit,
1027               server_.idleWriteBufferLimit);
1028             callsSinceResize_ = 0;
1029           }
1030 
1031           goto case;
1032         case ConnectionState.INIT:
1033           writeBuffer_ = null;
1034           writeBufferPos_ = 0;
1035           socketState_ = SocketState.RECV_FRAME_SIZE;
1036           connState_ = ConnectionState.READ_FRAME_SIZE;
1037           readBufferPos_ = 0;
1038           registerEvent(EV_READ | EV_PERSIST);
1039 
1040           return;
1041         case ConnectionState.READ_FRAME_SIZE:
1042           // We just read the request length, set up the buffers for reading
1043           // the payload.
1044           if (readWant_ > readBufferSize_) {
1045             // The current buffer is too small, exponentially grow the buffer
1046             // until it is big enough.
1047 
1048             if (readBufferSize_ == 0) {
1049               readBufferSize_ = 1;
1050             }
1051 
1052             auto newSize = readBufferSize_;
1053             while (readWant_ > newSize) {
1054               newSize *= 2;
1055             }
1056 
1057             auto newBuffer = cast(ubyte*)realloc(readBuffer_, newSize);
1058             if (!newBuffer) onOutOfMemoryError();
1059 
1060             readBuffer_ = newBuffer;
1061             readBufferSize_ = newSize;
1062           }
1063 
1064           readBufferPos_= 0;
1065 
1066           socketState_ = SocketState.RECV;
1067           connState_ = ConnectionState.READ_REQUEST;
1068 
1069           return;
1070       }
1071     }
1072 
1073   private:
1074     /**
1075      * C callback to call workSocket() from libevent.
1076      *
1077      * Expects the custom argument to be the this pointer of the associated
1078      * connection.
1079      */
1080     extern(C) static void workSocketCallback(int fd, short flags, void* connThis) {
1081       auto conn = cast(Connection)connThis;
1082       assert(fd == conn.socket_.socketHandle);
1083       conn.workSocket();
1084     }
1085 
1086     /**
1087      * Invoked by libevent when something happens on the socket.
1088      */
1089     void workSocket() {
1090       final switch (socketState_) {
1091         case SocketState.RECV_FRAME_SIZE:
1092           // If some bytes have already been read, they have been kept in
1093           // readWant_.
1094           auto frameSize = readWant_;
1095 
1096           try {
1097             // Read from the socket
1098             auto bytesRead = socket_.read(
1099               (cast(ubyte[])((&frameSize)[0 .. 1]))[readBufferPos_ .. $]);
1100             if (bytesRead == 0) {
1101               // Couldn't read anything, but we have been notified – client
1102               // has disconnected.
1103               close();
1104               return;
1105             }
1106 
1107             readBufferPos_ += bytesRead;
1108           } catch (TTransportException te) {
1109             logError("Failed to read frame size from client connection: %s", te);
1110             close();
1111             return;
1112           }
1113 
1114           if (readBufferPos_ < frameSize.sizeof) {
1115             // Frame size not complete yet, save the current buffer in
1116             // readWant_ so that the remaining bytes can be read later.
1117             readWant_ = frameSize;
1118             return;
1119           }
1120 
1121           auto size = netToHost(frameSize);
1122           if (size > server_.maxFrameSize) {
1123             logError("Frame size too large (%s > %s), client %s not using " ~
1124               "TFramedTransport?", size, server_.maxFrameSize,
1125               socket_.getPeerAddress().toHostNameString());
1126             close();
1127             return;
1128           }
1129           readWant_ = size;
1130 
1131           // Now we know the frame size, set everything up for reading the
1132           // payload.
1133           transition();
1134           return;
1135 
1136         case SocketState.RECV:
1137           // If we already got all the data, we should be in the SEND state.
1138           assert(readBufferPos_ < readWant_);
1139 
1140           size_t bytesRead;
1141           try {
1142             // Read as much as possible from the socket.
1143             bytesRead = socket_.read(readBuffer_[readBufferPos_ .. readWant_]);
1144           } catch (TTransportException te) {
1145             logError("Failed to read from client socket: %s", te);
1146             close();
1147             return;
1148           }
1149 
1150           if (bytesRead == 0) {
1151             // We were notified, but no bytes could be read -> the client
1152             // disconnected.
1153             close();
1154             return;
1155           }
1156 
1157           readBufferPos_ += bytesRead;
1158           assert(readBufferPos_ <= readWant_);
1159 
1160           if (readBufferPos_ == readWant_) {
1161             // The payload has been read completely, move on.
1162             transition();
1163           }
1164 
1165           return;
1166         case SocketState.SEND:
1167           assert(writeBufferPos_ <= writeBuffer_.length);
1168 
1169           if (writeBufferPos_ == writeBuffer_.length) {
1170             // Nothing left to send – this shouldn't happen, just move on.
1171             logInfo("WARNING: In send state, but no data to send.\n");
1172             transition();
1173             return;
1174           }
1175 
1176           size_t bytesSent;
1177           try {
1178             bytesSent = socket_.writeSome(writeBuffer_[writeBufferPos_ .. $]);
1179           } catch (TTransportException te) {
1180             logError("Failed to write to client socket: %s", te);
1181             close();
1182             return;
1183           }
1184 
1185           writeBufferPos_ += bytesSent;
1186           assert(writeBufferPos_ <= writeBuffer_.length);
1187 
1188           if (writeBufferPos_ == writeBuffer_.length) {
1189             // The whole response has been written out, we are done.
1190             transition();
1191           }
1192 
1193           return;
1194       }
1195     }
1196 
1197     /**
1198      * Registers a libevent event for workSocket() with the passed flags,
1199      * unregistering the previous one (if any).
1200      */
1201     void registerEvent(short eventFlags) {
1202       if (eventFlags_ == eventFlags) {
1203         // Nothing to do if flags are the same.
1204         return;
1205       }
1206 
1207       // Delete the previously existing event.
1208       unregisterEvent();
1209 
1210       eventFlags_ = eventFlags;
1211 
1212       if (eventFlags == 0) return;
1213 
1214       if (!event_) {
1215         // If the event was not already allocated, do it now.
1216         event_ = event_new(loop_.eventBase_, cast(int)socket_.socketHandle,
1217           eventFlags_, assumeNothrow(&workSocketCallback), cast(void*)this);
1218       } else {
1219         event_assign(event_, loop_.eventBase_, cast(int)socket_.socketHandle,
1220           eventFlags_, assumeNothrow(&workSocketCallback), cast(void*)this);
1221       }
1222 
1223       // Add the event
1224       if (event_add(event_, null) == -1) {
1225         logError("event_add() for client socket failed.");
1226       }
1227     }
1228 
1229     /**
1230      * Unregisters the current libevent event, if any.
1231      */
1232     void unregisterEvent() {
1233       if (event_ && eventFlags_ != 0) {
1234         eventFlags_ = 0;
1235         if (event_del(event_) == -1) {
1236           logError("event_del() for client socket failed.");
1237           return;
1238         }
1239       }
1240     }
1241 
1242     /**
1243      * Closes this connection and returns it back to the server.
1244      */
1245     void close() {
1246       unregisterEvent();
1247 
1248       if (server_.eventHandler) {
1249         server_.eventHandler.deleteContext(
1250           connectionContext_, inputProtocol_, outputProtocol_);
1251       }
1252 
1253       // Close the socket
1254       socket_.close();
1255 
1256       // close any factory produced transports.
1257       factoryInputTransport_.close();
1258       factoryOutputTransport_.close();
1259 
1260       // This connection object can now be reused.
1261       loop_.disposeConnection(this);
1262     }
1263 
1264     /// The server this connection belongs to.
1265     TNonblockingServer server_;
1266 
1267     /// The task pool used for this connection. This is cached instead of
1268     /// directly using server_.taskPool to avoid confusion if it is changed in
1269     /// another thread while the request is processed.
1270     TaskPool taskPool_;
1271 
1272     /// The I/O thread handling this connection.
1273     IOLoop loop_;
1274 
1275     /// The socket managed by this connection.
1276     TSocket socket_;
1277 
1278     /// The libevent object used for registering the workSocketCallback.
1279     event* event_;
1280 
1281     /// Libevent flags
1282     short eventFlags_;
1283 
1284     /// Socket mode
1285     SocketState socketState_;
1286 
1287     /// Application state
1288     ConnectionState connState_;
1289 
1290     /// The size of the frame to read. If still in READ_FRAME_SIZE state, some
1291     /// of the bytes might not have been written, and the value might still be
1292     /// in network byte order. An uint (not a size_t) because the frame size on
1293     /// the wire is specified as one.
1294     uint readWant_;
1295 
1296     /// The position in the read buffer, i.e. the number of payload bytes
1297     /// already received from the socket in READ_REQUEST state, resp. the
1298     /// number of size bytes in READ_FRAME_SIZE state.
1299     uint readBufferPos_;
1300 
1301     /// Read buffer
1302     ubyte* readBuffer_;
1303 
1304     /// Read buffer size
1305     size_t readBufferSize_;
1306 
1307     /// Write buffer
1308     ubyte[] writeBuffer_;
1309 
1310     /// How far through writing are we?
1311     size_t writeBufferPos_;
1312 
1313     /// Largest size of write buffer seen since buffer was constructed
1314     size_t largestWriteBufferSize_;
1315 
1316     /// Number of calls since the last time checkIdleBufferLimit has been
1317     /// invoked (see TServer.resizeBufferEveryN).
1318     uint callsSinceResize_;
1319 
1320     /// Base transports the processor reads from/writes to.
1321     TInputRangeTransport!(ubyte[]) inputTransport_;
1322     TMemoryBuffer outputTransport_;
1323 
1324     /// The actual transports passed to the processor obtained via the
1325     /// transport factory.
1326     TTransport factoryInputTransport_;
1327     TTransport factoryOutputTransport_; /// Ditto
1328 
1329     /// Input/output protocols, connected to factory{Input, Output}Transport.
1330     TProtocol inputProtocol_;
1331     TProtocol outputProtocol_; /// Ditto.
1332 
1333     /// Connection context optionally created by the server event handler.
1334     Variant connectionContext_;
1335 
1336     /// The processor used for this connection.
1337     TProcessor processor_;
1338   }
1339 }
1340 
1341 /*
1342  * The request processing function, which invokes the processor for the server
1343  * for all the RPC messages received over a connection.
1344  *
1345  * Must be public because it is passed as alias to std.parallelism.task().
1346  */
1347 void processRequest(Connection connection) {
1348   try {
1349     while (true) {
1350       with (connection) {
1351         if (server_.eventHandler) {
1352           server_.eventHandler.preProcess(connectionContext_, socket_);
1353         }
1354 
1355         if (!processor_.process(inputProtocol_, outputProtocol_,
1356           connectionContext_) || !inputProtocol_.transport.peek()
1357         ) {
1358           // Something went fundamentally wrong or there is nothing more to
1359           // process, close the connection.
1360           break;
1361         }
1362       }
1363     }
1364   } catch (TTransportException ttx) {
1365     logError("Client died: %s", ttx);
1366   } catch (Exception e) {
1367     logError("Uncaught exception: %s", e);
1368   }
1369 
1370   if (connection.taskPool_) connection.loop_.notifyCompleted(connection);
1371 }
1372 
1373 unittest {
1374   import thrift.internal.test.server;
1375 
1376   // Temporarily disable info log output in order not to spam the test results
1377   // with startup info messages.
1378   auto oldInfoLogSink = g_infoLogSink;
1379   g_infoLogSink = null;
1380   scope (exit) g_infoLogSink = oldInfoLogSink;
1381 
1382   // Test in-line processing shutdown with one as well as several I/O threads.
1383   testServeCancel!(TNonblockingServer)();
1384   testServeCancel!(TNonblockingServer)((TNonblockingServer s) {
1385     s.numIOThreads = 4;
1386   });
1387 
1388   // Test task pool processing shutdown with one as well as several I/O threads.
1389   auto tp = new TaskPool(4);
1390   tp.isDaemon = true;
1391   testServeCancel!(TNonblockingServer)((TNonblockingServer s) {
1392     s.taskPool = tp;
1393   });
1394   testServeCancel!(TNonblockingServer)((TNonblockingServer s) {
1395     s.taskPool = tp;
1396     s.numIOThreads = 4;
1397   });
1398 }