/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/**
 * A non-blocking server implementation that operates a set of I/O threads (by
 * default only one) and either does processing »in-line« or off-loads it to a
 * task pool.
 *
 * It *requires* TFramedTransport to be used on the client side, as it expects
 * a 4 byte length indicator and writes out responses using the same framing.
 *
 * Because I/O is done asynchronously (event-based), TServerTransport
 * unfortunately cannot be used.
 *
 * This implementation is based on the C++ one, with the exception of request
 * timeouts and the drain task queue overload handling strategy not being
 * implemented yet.
 */
// This really should use a D non-blocking I/O library, once one becomes
// available.
module thrift.server.nonblocking;

version(NeedTServer):

import core.atomic : atomicLoad, atomicStore, atomicOp;
import core.exception : onOutOfMemoryError;
import core.memory : GC;
import core.sync.mutex;
import core.stdc.stdlib : free, realloc;
import core.time : Duration, dur;
import core.thread : Thread, ThreadGroup;
import deimos.event2.event;
import std.array : empty;
import std.conv : emplace, to;
import std.exception : enforce;
import std.parallelism : TaskPool, task;
import std.socket : Socket, socketPair, SocketAcceptException,
  SocketException, TcpSocket;
import std.variant : Variant;
import thrift.base;
import thrift.internal.endian;
import thrift.internal.socket;
import thrift.internal.traits;
import thrift.protocol.base;
import thrift.protocol.binary;
import thrift.protocol.processor;
import thrift.server.base;
import thrift.server.transport.socket;
import thrift.transport.base;
import thrift.transport.memory;
import thrift.transport.range;
import thrift.transport.socket;
import thrift.util.cancellation;

/**
 * Possible actions taken on new incoming connections when the server is
 * overloaded.
 */
enum TOverloadAction {
  /// Do not take any special actions while the server is overloaded, just
  /// continue accepting connections.
  NONE,

  /// Immediately drop new connections after they have been accepted if the
  /// server is overloaded.
  CLOSE_ON_ACCEPT
}

///
class TNonblockingServer : TServer {
  ///
  this(TProcessor processor, ushort port, TTransportFactory transportFactory,
    TProtocolFactory protocolFactory, TaskPool taskPool = null
  ) {
    this(new TSingletonProcessorFactory(processor), port, transportFactory,
      transportFactory, protocolFactory, protocolFactory, taskPool);
  }

  ///
  this(TProcessorFactory processorFactory, ushort port,
    TTransportFactory transportFactory, TProtocolFactory protocolFactory,
    TaskPool taskPool = null
  ) {
    this(processorFactory, port, transportFactory, transportFactory,
      protocolFactory, protocolFactory, taskPool);
  }

  ///
  this(
    TProcessor processor,
    ushort port,
    TTransportFactory inputTransportFactory,
    TTransportFactory outputTransportFactory,
    TProtocolFactory inputProtocolFactory,
    TProtocolFactory outputProtocolFactory,
    TaskPool taskPool = null
  ) {
    this(new TSingletonProcessorFactory(processor), port,
      inputTransportFactory, outputTransportFactory,
      inputProtocolFactory, outputProtocolFactory, taskPool);
  }

  ///
  this(
    TProcessorFactory processorFactory,
    ushort port,
    TTransportFactory inputTransportFactory,
    TTransportFactory outputTransportFactory,
    TProtocolFactory inputProtocolFactory,
    TProtocolFactory outputProtocolFactory,
    TaskPool taskPool = null
  ) {
    super(processorFactory, null, inputTransportFactory,
      outputTransportFactory, inputProtocolFactory, outputProtocolFactory);
    port_ = port;

    this.taskPool = taskPool;

    connectionMutex_ = new Mutex;

    connectionStackLimit = DEFAULT_CONNECTION_STACK_LIMIT;
    maxActiveProcessors = DEFAULT_MAX_ACTIVE_PROCESSORS;
    maxConnections = DEFAULT_MAX_CONNECTIONS;
    overloadHysteresis = DEFAULT_OVERLOAD_HYSTERESIS;
    overloadAction = DEFAULT_OVERLOAD_ACTION;
    writeBufferDefaultSize = DEFAULT_WRITE_BUFFER_DEFAULT_SIZE;
    idleReadBufferLimit = DEFAULT_IDLE_READ_BUFFER_LIMIT;
    idleWriteBufferLimit = DEFAULT_IDLE_WRITE_BUFFER_LIMIT;
    resizeBufferEveryN = DEFAULT_RESIZE_BUFFER_EVERY_N;
    maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
    numIOThreads_ = DEFAULT_NUM_IO_THREADS;
  }

  override void serve(TCancellation cancellation = null) {
    if (cancellation && cancellation.triggered) return;

    // Initialize the listening socket.
    // TODO: SO_KEEPALIVE, TCP_LOW_MIN_RTO, etc.
    listenSocket_ = makeSocketAndListen(port_, TServerSocket.ACCEPT_BACKLOG,
      BIND_RETRY_LIMIT, BIND_RETRY_DELAY, 0, 0, ipv6Only_);
    listenSocket_.blocking = false;

    logInfo("Using %s I/O thread(s).", numIOThreads_);
    if (taskPool_) {
      logInfo("Using task pool with size: %s.", taskPool_.size);
    }

    assert(numIOThreads_ > 0);
    assert(ioLoops_.empty);
    foreach (id; 0 .. numIOThreads_) {
      // The IO loop on the first IO thread (this thread, i.e. the one serve()
      // is called from) also accepts new connections.
      auto listenSocket = (id == 0 ? listenSocket_ : null);
      ioLoops_ ~= new IOLoop(this, listenSocket);
    }

    if (cancellation) {
      cancellation.triggering.addCallback({
        foreach (i, loop; ioLoops_) loop.stop();

        // Stop accepting new connections right away.
        listenSocket_.close();
        listenSocket_ = null;
      });
    }

    // Start the IO helper threads for all but the first loop, which we will run
    // ourselves. Note that the threads run forever, only terminating if stop()
    // is called.
    auto threads = new ThreadGroup();
    foreach (loop; ioLoops_[1 .. $]) {
      auto t = new Thread(&loop.run);
      threads.add(t);
      t.start();
    }

    if (eventHandler) eventHandler.preServe();

    // Run the primary (listener) IO thread loop in our main thread; this will
    // block until the server is shutting down.
    ioLoops_[0].run();

    // Ensure all threads are finished before leaving serve().
    threads.joinAll();

    ioLoops_ = null;
  }

  /**
   * Returns the number of currently active connections, i.e. open sockets.
   */
  size_t numConnections() const @property {
    return numConnections_;
  }

  /**
   * Returns the number of connection objects allocated, but not in use.
   */
  size_t numIdleConnections() const @property {
    return connectionStack_.length;
  }

  /**
   * Returns the number of connections which are currently processing.
   *
   * This is defined as a connection where all data has been received, and the
   * processor was invoked but has not yet completed.
   */
  size_t numActiveProcessors() const @property {
    return numActiveProcessors_;
  }

  /// Number of bind() retries.
  enum BIND_RETRY_LIMIT = 0;

  /// Duration between bind() retries.
  enum BIND_RETRY_DELAY = dur!"hnsecs"(0);

  /// Whether to listen on IPv6 only, if IPv6 support is detected
  /// (default: false).
  void ipv6Only(bool value) @property {
    ipv6Only_ = value;
  }

  /**
   * The task pool to use for processing requests. If null, no additional
   * threads are used and requests are processed »inline«.
   *
   * Can safely be set even when the server is already running.
   */
  TaskPool taskPool() @property {
    return taskPool_;
  }

  /// ditto
  void taskPool(TaskPool pool) @property {
    taskPool_ = pool;
  }

  /**
   * Hysteresis for the overload state.
   *
   * This is the fraction of the overload value that needs to be reached
   * before the overload state is cleared. It must be between 0 and 1;
   * practical choices probably lie between 0.5 and 0.9.
   */
  double overloadHysteresis() const @property {
    return overloadHysteresis_;
  }

  /// Ditto
  void overloadHysteresis(double value) @property {
    enforce(0 < value && value <= 1,
      "Invalid value for overload hysteresis: " ~ to!string(value));
    overloadHysteresis_ = value;
  }

  /// Ditto
  enum DEFAULT_OVERLOAD_HYSTERESIS = 0.8;

  /**
   * The action which will be taken on overload.
   */
  TOverloadAction overloadAction;

  /// Ditto
  enum DEFAULT_OVERLOAD_ACTION = TOverloadAction.NONE;

  /**
   * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
   * and found to be exceeded, reinitialized) to this size.
   */
  size_t writeBufferDefaultSize;

  /// Ditto
  enum size_t DEFAULT_WRITE_BUFFER_DEFAULT_SIZE = 1024;

  /**
   * Max read buffer size for an idle Connection. When we place an idle
   * Connection into connectionStack_ or on every resizeBufferEveryN_ calls,
   * we will free the buffer (such that it will be reinitialized by the next
   * received frame) if it has exceeded this limit. 0 disables this check.
   */
  size_t idleReadBufferLimit;

  /// Ditto
  enum size_t DEFAULT_IDLE_READ_BUFFER_LIMIT = 1024;

  /**
   * Max write buffer size for an idle connection.
   * When we place an idle Connection into connectionStack_ or on every
   * resizeBufferEveryN_ calls, we ensure that its write buffer is at most
   * this size; otherwise we replace it with a new one of
   * writeBufferDefaultSize_ bytes to ensure that idle connections don't hog
   * memory. 0 disables this check.
   */
  size_t idleWriteBufferLimit;

  /// Ditto
  enum size_t DEFAULT_IDLE_WRITE_BUFFER_LIMIT = 1024;

  /**
   * Every N calls we check the buffer size limits on a connected Connection.
   * 0 disables the periodic check (i.e. the limits are only checked when a
   * connection closes).
   */
  uint resizeBufferEveryN;

  /// Ditto
  enum uint DEFAULT_RESIZE_BUFFER_EVERY_N = 512;

  /// Limit for how many Connection objects to cache.
  size_t connectionStackLimit;

  /// Ditto
  enum size_t DEFAULT_CONNECTION_STACK_LIMIT = 1024;

  /// Limit for the number of open connections before the server goes into
  /// overload state.
  size_t maxConnections;

  /// Ditto
  enum size_t DEFAULT_MAX_CONNECTIONS = int.max;

  /// Limit for the number of connections processing or waiting to process.
  size_t maxActiveProcessors;

  /// Ditto
  enum size_t DEFAULT_MAX_ACTIVE_PROCESSORS = int.max;

  /// Maximum frame size, in bytes.
  ///
  /// If a client tries to send a message larger than this limit, its
  /// connection will be closed. This helps to avoid allocating huge buffers
  /// on bogus input.
  uint maxFrameSize;

  /// Ditto
  enum uint DEFAULT_MAX_FRAME_SIZE = 256 * 1024 * 1024;

  /// The number of I/O threads the server uses (must be at least one).
  size_t numIOThreads() @property {
    return numIOThreads_;
  }

  /// Ditto
  void numIOThreads(size_t value) @property {
    enforce(value >= 1, new TException("Must use at least one I/O thread."));
    numIOThreads_ = value;
  }

  /// Ditto
  enum DEFAULT_NUM_IO_THREADS = 1;

private:
  /**
   * C callback wrapper around acceptConnections(). Expects the custom argument
   * to be the this pointer of the associated server instance.
   */
  extern(C) static void acceptConnectionsCallback(int fd, short which,
    void* serverThis
  ) {
    (cast(TNonblockingServer)serverThis).acceptConnections(fd, which);
  }

  /**
   * Called by libevent (IO loop 0/serve() thread only) when something
   * happened on the listening socket.
   */
  void acceptConnections(int fd, short eventFlags) {
    if (atomicLoad(ioLoops_[0].shuttingDown_)) return;

    assert(!!listenSocket_,
      "Server should be shutting down if listen socket is null.");
    assert(fd == listenSocket_.handle);
    assert(eventFlags & EV_READ);

    // Accept as many new clients as possible, even though libevent signaled
    // only one. This helps reduce the number of calls into libevent space.
    while (true) {
      // It is lame to use exceptions for regular control flow (failing is
      // expected due to the non-blocking mode of operation), but that's the
      // interface std.socket offers…
      Socket clientSocket;
      try {
        clientSocket = listenSocket_.accept();
      } catch (SocketAcceptException e) {
        if (e.errorCode != WOULD_BLOCK_ERRNO) {
          logError("Error accepting connection: %s", e);
        }
        break;
      }

      // If the server is overloaded, this is the point to take the specified
      // action.
      if (overloadAction != TOverloadAction.NONE && checkOverloaded()) {
        nConnectionsDropped_++;
        nTotalConnectionsDropped_++;
        if (overloadAction == TOverloadAction.CLOSE_ON_ACCEPT) {
          clientSocket.close();
          return;
        }
      }

      try {
        clientSocket.blocking = false;
      } catch (SocketException e) {
        logError("Couldn't set client socket to non-blocking mode: %s", e);
        clientSocket.close();
        return;
      }

      // Create a new Connection for this client socket.
      Connection conn = void;
      IOLoop loop = void;
      bool thisThread = void;
      synchronized (connectionMutex_) {
        // Assign an I/O loop to the connection (round-robin).
        assert(nextIOLoop_ >= 0);
        assert(nextIOLoop_ < ioLoops_.length);
        auto selectedThreadIdx = nextIOLoop_;
        nextIOLoop_ = (nextIOLoop_ + 1) % ioLoops_.length;

        loop = ioLoops_[selectedThreadIdx];
        thisThread = (selectedThreadIdx == 0);

        // Check the connection stack to see if we can re-use an existing one.
        if (connectionStack_.empty) {
          ++numConnections_;
          conn = new Connection(clientSocket, loop);

          // Make sure the connection does not get collected while it is active,
          // i.e. hooked up with libevent.
          GC.addRoot(cast(void*)conn);
        } else {
          conn = connectionStack_[$ - 1];
          connectionStack_ = connectionStack_[0 .. $ - 1];
          connectionStack_.assumeSafeAppend();
          conn.init(clientSocket, loop);
        }
      }

      loop.addConnection();

      // Either notify the ioThread that is assigned this connection to
      // start processing, or if it is us, we'll just ask this
      // connection to do its initial state change here.
      //
      // (We need to avoid writing to our own notification pipe, to
      // avoid possible deadlocks if the pipe is full.)
      if (thisThread) {
        conn.transition();
      } else {
        loop.notifyCompleted(conn);
      }
    }
  }

  /// Increment the count of connections currently processing.
  void incrementActiveProcessors() {
    atomicOp!"+="(numActiveProcessors_, 1);
  }

  /// Decrement the count of connections currently processing.
  void decrementActiveProcessors() {
    assert(numActiveProcessors_ > 0);
    atomicOp!"-="(numActiveProcessors_, 1);
  }

  /**
   * Determines if the server is currently overloaded.
   *
   * If the number of open connections or »processing« connections is over the
   * respective limit, the server will enter overload handling mode and a
   * warning will be logged. If the numbers fall below the hysteresis
   * thresholds again, the server will leave the overloaded state.
   *
   * Returns: Whether the server is currently overloaded.
   */
  bool checkOverloaded() {
    auto activeConnections = numConnections_ - connectionStack_.length;
    if (numActiveProcessors_ > maxActiveProcessors ||
      activeConnections > maxConnections) {
      if (!overloaded_) {
        logInfo("Entering overloaded state.");
        overloaded_ = true;
      }
    } else {
      if (overloaded_ &&
        (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors) &&
        (activeConnections <= overloadHysteresis_ * maxConnections))
      {
        logInfo("Exiting overloaded state, %s connection(s) dropped (%s total).",
          nConnectionsDropped_, nTotalConnectionsDropped_);
        nConnectionsDropped_ = 0;
        overloaded_ = false;
      }
    }

    return overloaded_;
  }
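
  // Worked example for the thresholds used in checkOverloaded() above: with
  // maxActiveProcessors = 1000 and the default overloadHysteresis of 0.8, the
  // server enters the overloaded state as soon as more than 1000 connections
  // are waiting on the processor, and only leaves it again once that number
  // has dropped to 0.8 * 1000 = 800 or below (with the analogous condition
  // holding for the number of open connections vs. maxConnections).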

  /**
   * Marks a connection as inactive and either puts it back into the
   * connection pool or leaves it for garbage collection.
   */
  void disposeConnection(Connection connection) {
    synchronized (connectionMutex_) {
      if (!connectionStackLimit ||
        (connectionStack_.length < connectionStackLimit))
      {
        connection.checkIdleBufferLimit(idleReadBufferLimit,
          idleWriteBufferLimit);
        connectionStack_ ~= connection;
      } else {
        assert(numConnections_ > 0);
        --numConnections_;

        // Leave the connection object for collection now.
        GC.removeRoot(cast(void*)connection);
      }
    }
  }

  /// Socket used to listen for and accept new connections.
  Socket listenSocket_;

  /// Port to listen on.
  ushort port_;

  /// Whether to listen on IPv6 only.
  bool ipv6Only_;

  /// The total number of connections existing, both active and idle.
  size_t numConnections_;

  /// The number of connections which are currently waiting for the processor
  /// to return.
  shared size_t numActiveProcessors_;

  /// Hysteresis for leaving overload state.
  double overloadHysteresis_;

  /// Whether the server is currently overloaded.
  bool overloaded_;

  /// Number of connections dropped since the server entered the current
  /// overloaded state.
  uint nConnectionsDropped_;

  /// Number of connections dropped due to overload since the server started.
  ulong nTotalConnectionsDropped_;

  /// The task pool used for processing requests.
  TaskPool taskPool_;

  /// Number of IO threads this server will use (>= 1).
  size_t numIOThreads_;

  /// The IOLoops among which socket handling work is distributed.
  IOLoop[] ioLoops_;

  /// The index of the loop in ioLoops_ which will handle the next accepted
  /// connection.
  size_t nextIOLoop_;

  /// All the connection objects which have been created but are not currently
  /// in use. When a connection is closed, it is placed here to enable object
  /// (resp. buffer) reuse.
  Connection[] connectionStack_;

  /// This mutex protects the connection stack.
  Mutex connectionMutex_;
}

private {
  /*
   * Encapsulates a libevent event loop.
   *
   * The design is a bit of a mess, since the first loop is actually run on the
   * server thread itself and is special because it is the only instance for
   * which listenSocket_ is not null.
   */
  final class IOLoop {
    /**
     * Creates a new instance and sets up the event base.
     *
     * If listenSocket is not null, the thread will also accept new
     * connections itself.
     */
    this(TNonblockingServer server, Socket listenSocket) {
      server_ = server;
      listenSocket_ = listenSocket;
      initMutex_ = new Mutex;
    }

    /**
     * Runs the event loop; only returns after a call to stop().
     */
    void run() {
      assert(!atomicLoad(initialized_), "IOLoop already running?!");

      synchronized (initMutex_) {
        if (atomicLoad(shuttingDown_)) return;
        atomicStore(initialized_, true);

        assert(!eventBase_);
        eventBase_ = event_base_new();

        if (listenSocket_) {
          // Log the libevent version and backend.
          logInfo("libevent version %s, using method %s.",
            to!string(event_get_version()),
            to!string(event_base_get_method(eventBase_)));

          // Register the event for the listening socket.
          listenEvent_ = event_new(eventBase_, cast(int)listenSocket_.handle,
            cast(short)(EV_READ | EV_PERSIST | EV_ET),
            assumeNothrow(&TNonblockingServer.acceptConnectionsCallback),
            cast(void*)server_);

          if (event_add(listenEvent_, null) == -1) {
            throw new TException("event_add for the listening socket event failed.");
          }
        }

        auto pair = socketPair();
        foreach (s; pair) s.blocking = false;
        completionSendSocket_ = pair[0];
        completionReceiveSocket_ = pair[1];

        // Register an event for the task completion notification socket.
        completionEvent_ = event_new(eventBase_,
          cast(int)completionReceiveSocket_.handle,
          cast(short)(EV_READ | EV_PERSIST | EV_ET),
          assumeNothrow(&completedCallback), cast(void*)this);

        if (event_add(completionEvent_, null) == -1) {
          throw new TException("event_add for the notification socket failed.");
        }
      }

      // Run the libevent engine; this only returns after stop() has been
      // called.
      event_base_dispatch(eventBase_);

      if (listenEvent_) {
        event_free(listenEvent_);
        listenEvent_ = null;
      }

      event_free(completionEvent_);
      completionEvent_ = null;

      completionSendSocket_.close();
      completionSendSocket_ = null;

      completionReceiveSocket_.close();
      completionReceiveSocket_ = null;

      event_base_free(eventBase_);
      eventBase_ = null;

      atomicStore(shuttingDown_, false);

      initialized_ = false;
    }

    /**
     * Adds a new connection handled by this loop.
     */
    void addConnection() {
      ++numActiveConnections_;
    }

    /**
     * Disposes a connection object (typically after it has been closed).
     */
    void disposeConnection(Connection conn) {
      server_.disposeConnection(conn);
      assert(numActiveConnections_ > 0);
      --numActiveConnections_;
      if (numActiveConnections_ == 0) {
        if (atomicLoad(shuttingDown_)) {
          event_base_loopbreak(eventBase_);
        }
      }
    }

    /**
     * Notifies the event loop that the current step (initialization,
     * processing of a request) on a certain connection has been completed.
     *
     * This function is thread-safe, but should never be called from the
     * thread running the loop itself.
     */
    void notifyCompleted(Connection conn) {
      assert(!!completionSendSocket_);
      auto bytesSent = completionSendSocket_.send(cast(ubyte[])((&conn)[0 .. 1]));

      if (bytesSent != Connection.sizeof) {
        logError("Sending completion notification failed, connection will " ~
          "not be properly terminated.");
      }
    }

    /**
     * Exits the event loop after all currently active connections have been
     * closed.
     *
     * This function is thread-safe.
     */
    void stop() {
      // There is a bug in either libevent or its documentation: having no
      // events registered doesn't actually terminate the loop, because
      // event_base_new() registers some internal one by calling
      // evthread_make_base_notifiable().
      // Due to this, we can't simply remove all events and expect the event
      // loop to terminate. Instead, we ping the event loop using a null
      // completion message. This way, we make sure to wake up the libevent
      // thread if it is not currently processing any connections. It will
      // break out of the loop in disposeConnection() after the last active
      // connection has been closed.
      synchronized (initMutex_) {
        atomicStore(shuttingDown_, true);
        if (atomicLoad(initialized_)) notifyCompleted(null);
      }
    }

  private:
    /**
     * C callback to call completed() from libevent.
     *
     * Expects the custom argument to be the this pointer of the associated
     * IOLoop instance.
     */
    extern(C) static void completedCallback(int fd, short what, void* loopThis) {
      assert(what & EV_READ);
      auto loop = cast(IOLoop)loopThis;
      assert(fd == loop.completionReceiveSocket_.handle);
      loop.completed();
    }

    /**
     * Reads from the completion receive socket, appropriately transitions
     * the connections, and shuts down the loop if requested.
     */
    void completed() {
      Connection connection;
      ptrdiff_t bytesRead;
      while (true) {
        bytesRead = completionReceiveSocket_.receive(
          cast(ubyte[])((&connection)[0 .. 1]));
        if (bytesRead < 0) {
          auto errno = getSocketErrno();

          if (errno != WOULD_BLOCK_ERRNO) {
            logError("Reading from completion socket failed, some connection " ~
              "will never be properly terminated: %s", socketErrnoString(errno));
          }
        }

        if (bytesRead != Connection.sizeof) break;

        if (!connection) {
          // A null connection is the shutdown ping sent from stop().
          assert(atomicLoad(shuttingDown_));
          if (numActiveConnections_ == 0) {
            event_base_loopbreak(eventBase_);
          }
          continue;
        }

        connection.transition();
      }

      if (bytesRead > 0) {
        logError("Unexpected partial read from completion socket " ~
          "(%s bytes instead of %s).", bytesRead, Connection.sizeof);
      }
    }

    /// The associated server.
    TNonblockingServer server_;

    /// The managed listening socket, if any.
    Socket listenSocket_;

    /// The libevent event base for the loop.
    event_base* eventBase_;

    /// Triggered on listen socket events.
    event* listenEvent_;

    /// Triggered on completion receive socket events.
    event* completionEvent_;

    /// Socket used to send completion notification messages. Paired with
    /// completionReceiveSocket_.
    Socket completionSendSocket_;

    /// Socket used to receive completion notification messages. Paired with
    /// completionSendSocket_.
    Socket completionReceiveSocket_;

    /// Whether the server is currently shutting down (i.e. the cancellation
    /// has been triggered, but not all client connections have been closed
    /// yet).
    shared bool shuttingDown_;

    /// The number of currently active client connections.
    size_t numActiveConnections_;

    /// Guards loop startup so that the loop can be reliably shut down even if
    /// another thread has just started to execute run(). Locked during
    /// initialization in run(). When unlocked, the completion mechanism is
    /// expected to be fully set up.
    Mutex initMutex_;
    shared bool initialized_; /// Ditto
  }

  /*
   * I/O states a socket can be in.
   */
  enum SocketState {
    RECV_FRAME_SIZE, /// The frame size is received.
    RECV,            /// The payload is received.
    SEND             /// The response is written back out.
  }

  /*
   * States a connection can be in.
   */
  enum ConnectionState {
    INIT,            /// The connection will be initialized.
    READ_FRAME_SIZE, /// The four frame size bytes are being read.
    READ_REQUEST,    /// The request payload itself is being read.
    WAIT_PROCESSOR,  /// The connection waits for the processor to finish.
    SEND_RESULT      /// The result is written back out.
  }
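
  /*
   * Rough overview of the per-connection state machine implemented by
   * Connection.transition() and Connection.workSocket() below:
   *
   *   INIT            -> READ_FRAME_SIZE   initial transition(); registers
   *                                        EV_READ and waits for the 4 byte
   *                                        frame length
   *   READ_FRAME_SIZE -> READ_REQUEST      frame length received; the payload
   *                                        buffer is set up and filled
   *   READ_REQUEST    -> WAIT_PROCESSOR    payload complete; the request is
   *                                        handed to the task pool (or
   *                                        processed in-line, falling through
   *                                        immediately)
   *   WAIT_PROCESSOR  -> SEND_RESULT       a response is pending; registers
   *                                        EV_WRITE
   *                   -> READ_FRAME_SIZE   one-way call, nothing to send
   *   SEND_RESULT     -> READ_FRAME_SIZE   response fully written; wait for
   *                                        the next frame
   */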

  /*
   * A connection that is handled via libevent.
   *
   * Data received is buffered until the request is complete (returning back to
   * libevent if not), at which point the processor is invoked.
   */
  final class Connection {
    /**
     * Constructs a new instance.
     *
     * To reuse a connection object later on, the init() function can be used
     * to the same effect on the internal state.
     */
    this(Socket socket, IOLoop loop) {
      // The input and output transport objects are reused between client
      // connections, so initialize them here rather than in init().
      inputTransport_ = new TInputRangeTransport!(ubyte[])([]);
      outputTransport_ = new TMemoryBuffer(loop.server_.writeBufferDefaultSize);

      init(socket, loop);
    }

    /**
     * Initializes the connection.
     *
     * Params:
     *   socket = The client socket to work on.
     *   loop = The I/O loop this connection is assigned to.
     */
    void init(Socket socket, IOLoop loop) {
      import thrift.server.base;
      // TODO: This allocation could be avoided.
      socket_ = new TSocket(socket);

      loop_ = loop;
      server_ = loop_.server_;
      connState_ = ConnectionState.INIT;
      eventFlags_ = 0;

      readBufferPos_ = 0;
      readWant_ = 0;

      writeBuffer_ = null;
      writeBufferPos_ = 0;
      largestWriteBufferSize_ = 0;

      socketState_ = SocketState.RECV_FRAME_SIZE;
      callsSinceResize_ = 0;

      factoryInputTransport_ =
        server_.inputTransportFactory_.getTransport(inputTransport_);
      factoryOutputTransport_ =
        server_.outputTransportFactory_.getTransport(outputTransport_);

      inputProtocol_ =
        server_.inputProtocolFactory_.getProtocol(factoryInputTransport_);
      outputProtocol_ =
        server_.outputProtocolFactory_.getProtocol(factoryOutputTransport_);

      if (server_.eventHandler) {
        connectionContext_ =
          server_.eventHandler.createContext(inputProtocol_, outputProtocol_);
      }

      auto info = TConnectionInfo(inputProtocol_, outputProtocol_, socket_);
      processor_ = server_.processorFactory_.getProcessor(info);
    }

    ~this() {
      free(readBuffer_);
      if (event_) {
        event_free(event_);
        event_ = null;
      }
    }

    /**
     * Checks the buffers against the size limits and shrinks them if exceeded.
     *
     * Params:
     *   readLimit = Read buffer size limit (in bytes, 0 to ignore).
     *   writeLimit = Write buffer size limit (in bytes, 0 to ignore).
     */
    void checkIdleBufferLimit(size_t readLimit, size_t writeLimit) {
      if (readLimit > 0 && readBufferSize_ > readLimit) {
        free(readBuffer_);
        readBuffer_ = null;
        readBufferSize_ = 0;
      }

      if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
        // Just start over with a fresh default-sized buffer.
        outputTransport_.reset(server_.writeBufferDefaultSize);
        largestWriteBufferSize_ = 0;
      }
    }

    /**
     * Transitions the connection to the next state.
     *
     * This is called e.g. when the request has been read completely or all
     * the data has been written back.
     */
    void transition() {
      assert(!!loop_);
      assert(!!server_);

      // Switch upon the state that we are currently in and move to a new state.
      final switch (connState_) {
        case ConnectionState.READ_REQUEST:
          // We are done reading the request, package the read buffer into the
          // transport and get back some data from the dispatch function.
          inputTransport_.reset(readBuffer_[0 .. readBufferPos_]);
          outputTransport_.reset();

          // Prepend four bytes of blank space to the buffer so we can
          // write the frame size there later.
          // Strictly speaking, we wouldn't have to write anything, just
          // increment the TMemoryBuffer writeOffset_. This would yield a tiny
          // performance gain.
          ubyte[4] space = void;
          outputTransport_.write(space);

          server_.incrementActiveProcessors();

          taskPool_ = server_.taskPool;
          if (taskPool_) {
            // Create a new task and add it to the task pool queue.
            auto processingTask = task!processRequest(this);
            connState_ = ConnectionState.WAIT_PROCESSOR;
            taskPool_.put(processingTask);

            // We don't want to process any more data while the task is active.
            unregisterEvent();
            return;
          }

          // Just process it right now if there is no task pool set.
          processRequest(this);
          goto case;
        case ConnectionState.WAIT_PROCESSOR:
          // We have now finished processing the request, set the frame size
          // for the outputTransport_ contents and set everything up to write
          // it out via libevent.
          server_.decrementActiveProcessors();

          // Acquire the data written to the transport.
          // KLUDGE: To avoid copying, we simply cast the const away and
          // modify the internal buffer of the TMemoryBuffer – this works with
          // the current implementation, but isn't exactly beautiful.
          writeBuffer_ = cast(ubyte[])outputTransport_.getContents();

          assert(writeBuffer_.length >= 4, "The write buffer should contain " ~
            "at least the initially added dummy length bytes.");
          if (writeBuffer_.length == 4) {
            // The request was one-way, no response to write.
            goto case ConnectionState.INIT;
          }

          // Write the frame size into the four bytes reserved for it.
          auto size = hostToNet(cast(uint)(writeBuffer_.length - 4));
          writeBuffer_[0 .. 4] = cast(ubyte[])((&size)[0 .. 1]);

          writeBufferPos_ = 0;
          socketState_ = SocketState.SEND;
          connState_ = ConnectionState.SEND_RESULT;
          registerEvent(EV_WRITE | EV_PERSIST);

          return;
        case ConnectionState.SEND_RESULT:
          // The result has been sent back to the client, we don't need the
          // buffers anymore.
          if (writeBuffer_.length > largestWriteBufferSize_) {
            largestWriteBufferSize_ = writeBuffer_.length;
          }

          if (server_.resizeBufferEveryN > 0 &&
            ++callsSinceResize_ >= server_.resizeBufferEveryN
          ) {
            checkIdleBufferLimit(server_.idleReadBufferLimit,
              server_.idleWriteBufferLimit);
            callsSinceResize_ = 0;
          }

          goto case;
        case ConnectionState.INIT:
          writeBuffer_ = null;
          writeBufferPos_ = 0;
          socketState_ = SocketState.RECV_FRAME_SIZE;
          connState_ = ConnectionState.READ_FRAME_SIZE;
          readBufferPos_ = 0;
          registerEvent(EV_READ | EV_PERSIST);

          return;
        case ConnectionState.READ_FRAME_SIZE:
          // We just read the request length, set up the buffers for reading
          // the payload.
          if (readWant_ > readBufferSize_) {
            // The current buffer is too small, exponentially grow the buffer
            // until it is big enough.

            if (readBufferSize_ == 0) {
              readBufferSize_ = 1;
            }

            auto newSize = readBufferSize_;
            while (readWant_ > newSize) {
              newSize *= 2;
            }

            auto newBuffer = cast(ubyte*)realloc(readBuffer_, newSize);
            if (!newBuffer) onOutOfMemoryError();

            readBuffer_ = newBuffer;
            readBufferSize_ = newSize;
          }

          readBufferPos_ = 0;

          socketState_ = SocketState.RECV;
          connState_ = ConnectionState.READ_REQUEST;

          return;
      }
    }

  private:
    /**
     * C callback to call workSocket() from libevent.
     *
     * Expects the custom argument to be the this pointer of the associated
     * connection.
     */
    extern(C) static void workSocketCallback(int fd, short flags, void* connThis) {
      auto conn = cast(Connection)connThis;
      assert(fd == conn.socket_.socketHandle);
      conn.workSocket();
    }

    /**
     * Invoked by libevent when something happens on the socket.
     */
    void workSocket() {
      final switch (socketState_) {
        case SocketState.RECV_FRAME_SIZE:
          // If some bytes have already been read, they have been kept in
          // readWant_.
          auto frameSize = readWant_;

          try {
            // Read from the socket.
            auto bytesRead = socket_.read(
              (cast(ubyte[])((&frameSize)[0 .. 1]))[readBufferPos_ .. $]);
            if (bytesRead == 0) {
              // Couldn't read anything, but we have been notified – client
              // has disconnected.
              close();
              return;
            }

            readBufferPos_ += bytesRead;
          } catch (TTransportException te) {
            logError("Failed to read frame size from client connection: %s", te);
            close();
            return;
          }

          if (readBufferPos_ < frameSize.sizeof) {
            // Frame size not complete yet, save the current buffer in
            // readWant_ so that the remaining bytes can be read later.
            readWant_ = frameSize;
            return;
          }

          auto size = netToHost(frameSize);
          if (size > server_.maxFrameSize) {
            logError("Frame size too large (%s > %s), client %s not using " ~
              "TFramedTransport?", size, server_.maxFrameSize,
              socket_.getPeerAddress().toHostNameString());
            close();
            return;
          }
          readWant_ = size;

          // Now we know the frame size, set everything up for reading the
          // payload.
          transition();
          return;

        case SocketState.RECV:
          // If we already got all the data, we should be in the SEND state.
          assert(readBufferPos_ < readWant_);

          size_t bytesRead;
          try {
            // Read as much as possible from the socket.
            bytesRead = socket_.read(readBuffer_[readBufferPos_ .. readWant_]);
          } catch (TTransportException te) {
            logError("Failed to read from client socket: %s", te);
            close();
            return;
          }

          if (bytesRead == 0) {
            // We were notified, but no bytes could be read -> the client
            // disconnected.
            close();
            return;
          }

          readBufferPos_ += bytesRead;
          assert(readBufferPos_ <= readWant_);

          if (readBufferPos_ == readWant_) {
            // The payload has been read completely, move on.
            transition();
          }

          return;
        case SocketState.SEND:
          assert(writeBufferPos_ <= writeBuffer_.length);

          if (writeBufferPos_ == writeBuffer_.length) {
            // Nothing left to send – this shouldn't happen, just move on.
            logInfo("WARNING: In send state, but no data to send.\n");
            transition();
            return;
          }

          size_t bytesSent;
          try {
            bytesSent = socket_.writeSome(writeBuffer_[writeBufferPos_ .. $]);
          } catch (TTransportException te) {
            logError("Failed to write to client socket: %s", te);
            close();
            return;
          }

          writeBufferPos_ += bytesSent;
          assert(writeBufferPos_ <= writeBuffer_.length);

          if (writeBufferPos_ == writeBuffer_.length) {
            // The whole response has been written out, we are done.
            transition();
          }

          return;
      }
    }

    /**
     * Registers a libevent event for workSocket() with the passed flags,
     * unregistering the previous one (if any).
     */
    void registerEvent(short eventFlags) {
      if (eventFlags_ == eventFlags) {
        // Nothing to do if the flags are the same.
        return;
      }

      // Delete the previously existing event.
      unregisterEvent();

      eventFlags_ = eventFlags;

      if (eventFlags == 0) return;

      if (!event_) {
        // If the event was not already allocated, do it now.
        event_ = event_new(loop_.eventBase_, cast(int)socket_.socketHandle,
          eventFlags_, assumeNothrow(&workSocketCallback), cast(void*)this);
      } else {
        event_assign(event_, loop_.eventBase_, cast(int)socket_.socketHandle,
          eventFlags_, assumeNothrow(&workSocketCallback), cast(void*)this);
      }

      // Add the event.
      if (event_add(event_, null) == -1) {
        logError("event_add() for client socket failed.");
      }
    }

    /**
     * Unregisters the current libevent event, if any.
     */
    void unregisterEvent() {
      if (event_ && eventFlags_ != 0) {
        eventFlags_ = 0;
        if (event_del(event_) == -1) {
          logError("event_del() for client socket failed.");
          return;
        }
      }
    }

    /**
     * Closes this connection and returns it back to the server.
     */
    void close() {
      unregisterEvent();

      if (server_.eventHandler) {
        server_.eventHandler.deleteContext(
          connectionContext_, inputProtocol_, outputProtocol_);
      }

      // Close the socket.
      socket_.close();

      // Close any factory-produced transports.
      factoryInputTransport_.close();
      factoryOutputTransport_.close();

      // This connection object can now be reused.
      loop_.disposeConnection(this);
    }

    /// The server this connection belongs to.
    TNonblockingServer server_;

    /// The task pool used for this connection. This is cached instead of
    /// directly using server_.taskPool to avoid confusion if it is changed in
    /// another thread while the request is processed.
    TaskPool taskPool_;

    /// The I/O thread handling this connection.
    IOLoop loop_;

    /// The socket managed by this connection.
    TSocket socket_;

    /// The libevent object used for registering the workSocketCallback.
    event* event_;

    /// Libevent flags currently in use.
    short eventFlags_;

    /// Socket mode.
    SocketState socketState_;

    /// Application state.
    ConnectionState connState_;

    /// The size of the frame to read. If still in READ_FRAME_SIZE state, some
    /// of the bytes might not have been received yet, and the value might
    /// still be in network byte order. An uint (not a size_t) because the
    /// frame size on the wire is specified as one.
    uint readWant_;

    /// The position in the read buffer, i.e. the number of payload bytes
    /// already received from the socket in READ_REQUEST state, resp. the
    /// number of size bytes in READ_FRAME_SIZE state.
    uint readBufferPos_;

    /// Read buffer.
    ubyte* readBuffer_;

    /// Read buffer size.
    size_t readBufferSize_;

    /// Write buffer.
    ubyte[] writeBuffer_;

    /// How far through writing are we?
    size_t writeBufferPos_;

    /// Largest size of the write buffer seen since the buffer was constructed.
    size_t largestWriteBufferSize_;

    /// Number of calls since the last time checkIdleBufferLimit has been
    /// invoked (see TServer.resizeBufferEveryN).
    uint callsSinceResize_;

    /// Base transports the processor reads from/writes to.
    TInputRangeTransport!(ubyte[]) inputTransport_;
    TMemoryBuffer outputTransport_;

    /// The actual transports passed to the processor, obtained via the
    /// transport factories.
    TTransport factoryInputTransport_;
    TTransport factoryOutputTransport_; /// Ditto

    /// Input/output protocols, connected to factory{Input, Output}Transport.
    TProtocol inputProtocol_;
    TProtocol outputProtocol_; /// Ditto

    /// Connection context optionally created by the server event handler.
    Variant connectionContext_;

    /// The processor used for this connection.
    TProcessor processor_;
  }
}

/*
 * The request processing function, which invokes the processor for the server
 * for all the RPC messages received over a connection.
 *
 * Must be public because it is passed as an alias to std.parallelism.task().
 */
void processRequest(Connection connection) {
  try {
    while (true) {
      with (connection) {
        if (server_.eventHandler) {
          server_.eventHandler.preProcess(connectionContext_, socket_);
        }

        if (!processor_.process(inputProtocol_, outputProtocol_,
          connectionContext_) || !inputProtocol_.transport.peek()
        ) {
          // Something went fundamentally wrong or there is nothing more to
          // process, close the connection.
          break;
        }
      }
    }
  } catch (TTransportException ttx) {
    logError("Client died: %s", ttx);
  } catch (Exception e) {
    logError("Uncaught exception: %s", e);
  }

  if (connection.taskPool_) connection.loop_.notifyCompleted(connection);
}

unittest {
  import thrift.internal.test.server;

  // Temporarily disable info log output in order not to spam the test results
  // with startup info messages.
  auto oldInfoLogSink = g_infoLogSink;
  g_infoLogSink = null;
  scope (exit) g_infoLogSink = oldInfoLogSink;

  // Test in-line processing shutdown with one as well as several I/O threads.
  testServeCancel!(TNonblockingServer)();
  testServeCancel!(TNonblockingServer)((TNonblockingServer s) {
    s.numIOThreads = 4;
  });

  // Test task pool processing shutdown with one as well as several I/O
  // threads.
  auto tp = new TaskPool(4);
  tp.isDaemon = true;
  testServeCancel!(TNonblockingServer)((TNonblockingServer s) {
    s.taskPool = tp;
  });
  testServeCancel!(TNonblockingServer)((TNonblockingServer s) {
    s.taskPool = tp;
    s.numIOThreads = 4;
  });
}
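
// A minimal usage sketch, kept out of the build via version(none) since it is
// illustrative only. It assumes a Thrift-generated TProcessor for the service
// is available – the makeProcessor() helper and the port number below are
// placeholders, not part of this module – and that clients connect through a
// TFramedTransport, as required by TNonblockingServer. The stock
// TTransportFactory (identity) and TBinaryProtocolFactory from the D Thrift
// library are assumed to be suitable factories here.
version(none) unittest {
  // Obtain a TProcessor for the service, e.g. the generated TServiceProcessor
  // wrapping a handler instance. Placeholder only.
  TProcessor processor = makeProcessor();

  // Optionally off-load request processing to a task pool instead of running
  // it in-line on the I/O threads.
  auto workers = new TaskPool(4);
  workers.isDaemon = true;

  auto server = new TNonblockingServer(processor, 9090,
    new TTransportFactory, new TBinaryProtocolFactory!(), workers);
  server.numIOThreads = 2;

  // Blocks until the (optional) TCancellation passed to serve() is triggered.
  server.serve();
}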