/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/**
 * Transports for reading from/writing to Thrift »log files«.
 *
 * These transports are not »stupid« sources and sinks just reading and
 * writing bytes from a file verbatim, but organize the contents in the form
 * of so-called »events«. Each event holds the data passed to a single
 * write() call on the writer transport and is stored in the file prefixed
 * with its size (see EventSize).
 *
 * Chunking is supported, events are guaranteed to never span chunk boundaries.
 * As a consequence, an event (including its size prefix) can never be larger
 * than the chunk size. The chunk size used is not saved with the file, so care
 * has to be taken to make sure the same chunk size is used for reading and
 * writing.
 */
module thrift.transport.file;

import core.thread : Thread;
import std.array : empty;
import std.algorithm : min, max;
import std.concurrency;
import std.conv : to;
import std.datetime : dur, Duration, msecs;

static if (__VERSION__ >= 2079)
  import std.datetime.stopwatch : AutoStart, StopWatch;
else
  import std.datetime: AutoStart, StopWatch;

import std.exception;
import std.stdio : File;
import thrift.base;
import thrift.transport.base;

/// The default chunk size, in bytes.
enum DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;

/// The type used to represent event sizes in the file.
alias uint EventSize;

version (BigEndian) {
  static assert(false,
    "Little endian byte order is assumed in thrift.transport.file.");
}

/**
 * A transport used to read log files. It can never be written to, calling
 * write() throws.
 *
 * Contrary to the C++ design, explicitly opening the transport/file before
 * using is necessary to allow manually closing the file without relying on the
 * object lifetime. Otherwise, it's a straight port of the C++ implementation.
 */
final class TFileReaderTransport : TBaseTransport {
  /**
   * Creates a new file reader transport. The file is not opened until open()
   * is called.
   *
   * Params:
   *   path = Path of the file to operate on.
   */
  this(string path) {
    path_ = path;
    chunkSize_ = DEFAULT_CHUNK_SIZE;
    readBufferSize_ = DEFAULT_READ_BUFFER_SIZE;
    readTimeout_ = DEFAULT_READ_TIMEOUT;
    corruptedEventSleepDuration_ = DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION;
    maxCorruptedEvents_ = DEFAULT_MAX_CORRUPTED_EVENTS;
    maxEventSize = DEFAULT_MAX_EVENT_SIZE;
  }

  override bool isOpen() @property {
    return isOpen_;
  }

  /**
   * Returns whether there is data available to read, fetching the next event
   * from the file if none is currently being processed.
   *
   * Fetching an event may sleep for readTimeout at the end of the file (or
   * keep polling indefinitely in tailing mode, see readTimeout).
   */
  override bool peek() {
    if (!isOpen) return false;

    // If there is no event currently processed, try fetching one from the
    // file.
    if (!currentEvent_) {
      currentEvent_ = readEvent();

      if (!currentEvent_) {
        // Still nothing there, couldn't read a new event.
        return false;
      }
    }
    // check if there is anything to read
    return (currentEvent_.length - currentEventPos_) > 0;
  }

  /**
   * Opens the file for reading (binary mode).
   *
   * Throws: TTransportException (NOT_OPEN) if the file cannot be opened.
   */
  override void open() {
    if (isOpen) return;
    try {
      file_ = File(path_, "rb");
    } catch (Exception e) {
      throw new TTransportException("Error on opening input file.",
        TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
    }
    isOpen_ = true;
  }

  /**
   * Closes the file and discards all read state, so that a subsequent open()
   * starts reading from the beginning of the file with consistent offsets.
   */
  override void close() {
    if (!isOpen) return;

    file_.close();
    isOpen_ = false;
    readState_.resetAllValues();

    // The reopened file starts at position zero; stale offsets or a
    // half-consumed event from the previous session would otherwise corrupt
    // the chunk boundary computations and the data returned by read().
    currentEvent_ = null;
    currentEventPos_ = 0;
    offset_ = 0;
  }

  /**
   * Reads up to buf.length bytes from the current event, fetching the next
   * event from the file if none is currently being processed. Never returns
   * data from more than one event per call.
   *
   * Returns: The number of bytes copied into buf, 0 if no event is available.
   * Throws: TTransportException (NOT_OPEN) if the transport is not open.
   */
  override size_t read(ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot read if file is not open.", TTransportException.Type.NOT_OPEN));

    // If there is no event currently processed, try fetching one from the
    // file.
    if (!currentEvent_) {
      currentEvent_ = readEvent();

      if (!currentEvent_) {
        // Still nothing there, couldn't read a new event.
        return 0;
      }
    }

    auto len = buf.length;
    auto remaining = currentEvent_.length - currentEventPos_;

    if (remaining <= len) {
      // If less than the requested length is available, read as much as
      // possible.
      buf[0 .. remaining] = currentEvent_[currentEventPos_ .. $];
      currentEvent_ = null;
      currentEventPos_ = 0;
      return remaining;
    }

    // There will still be data left in the buffer after reading, pass out len
    // bytes.
    buf[] = currentEvent_[currentEventPos_ .. currentEventPos_ + len];
    currentEventPos_ += len;
    return len;
  }

  /**
   * Returns the number of chunks in the file, based on its current size.
   *
   * An empty file has no chunks. Like the C++ implementation, this computes
   * fileSize / chunkSize + 1, so a file whose size is an exact multiple of the
   * chunk size is reported with one additional (empty) chunk.
   *
   * Throws: TTransportException if the file is not open or its size cannot be
   *   determined.
   */
  ulong getNumChunks() {
    enforce(isOpen, new TTransportException(
      "Cannot get number of chunks if file not open.",
      TTransportException.Type.NOT_OPEN));

    try {
      auto fileSize = file_.size();
      if (fileSize == 0) {
        // Empty files have no chunks.
        return 0;
      }
      return ((fileSize)/chunkSize_) + 1;
    } catch (Exception e) {
      throw new TTransportException("Error getting file size.", __FILE__,
        __LINE__, e);
    }
  }

  /**
   * Returns the index of the chunk containing the file offset at which the
   * current read buffer starts.
   */
  ulong getCurChunk() {
    return offset_ / chunkSize_;
  }

  /**
   * Positions the reader at the beginning of the given chunk, discarding any
   * partially read event.
   *
   * Negative indices are counted from the end of the file. Indices before the
   * first chunk are clamped to the first chunk. Indices past the last chunk
   * seek to the last chunk and skip all events already in the file at the
   * time of the call (without waiting for new data), so that the next read
   * returns data appended afterwards.
   *
   * Params:
   *   chunk = Index of the chunk to seek to.
   *
   * Throws: TTransportException if the file is not open or seeking fails.
   */
  void seekToChunk(long chunk) {
    enforce(isOpen, new TTransportException(
      "Cannot seek to chunk if file is not open.",
      TTransportException.Type.NOT_OPEN));

    auto numChunks = getNumChunks();

    if (chunk < 0) {
      // Count negative indices from the end.
      chunk += numChunks;
    }

    if (chunk < 0) {
      logError("Incorrect chunk number for reverse seek, seeking to " ~
        "beginning instead: %s", chunk);
      chunk = 0;
    }

    bool seekToEnd;
    long minEndOffset;
    if (chunk >= numChunks) {
      logError("Trying to seek to non-existing chunk, seeking to " ~
        "end of file instead: %s", chunk);
      seekToEnd = true;
      // An empty file has no chunks at all – stay at offset zero instead of
      // seeking to a negative position.
      chunk = numChunks > 0 ? numChunks - 1 : 0;
      // this is the min offset to process events till
      minEndOffset = file_.size();
    }

    readState_.resetAllValues();
    currentEvent_ = null;

    try {
      file_.seek(chunk * chunkSize_);
      offset_ = chunk * chunkSize_;
    } catch (Exception e) {
      throw new TTransportException("Error seeking to chunk", __FILE__,
        __LINE__, e);
    }

    if (seekToEnd) {
      // Never wait on the end of the file for new content, we just want to
      // find the last one.
      auto oldReadTimeout = readTimeout_;
      scope (exit) readTimeout_ = oldReadTimeout;
      readTimeout_ = dur!"hnsecs"(0);

      // Keep on reading until the last event at point of seekToChunk call.
      while ((offset_ + readState_.bufferPos_) < minEndOffset) {
        if (readEvent() is null) {
          break;
        }
      }
    }
  }

  /**
   * Skips all events currently in the file, so that subsequent reads only
   * return data appended afterwards.
   */
  void seekToEnd() {
    seekToChunk(getNumChunks());
  }

  /**
   * The size of the chunks the file is divided into, in bytes.
   *
   * Can only be set before the transport is opened, and must be larger than
   * EventSize.sizeof.
   */
  ulong chunkSize() @property const {
    return chunkSize_;
  }

  /// ditto
  void chunkSize(ulong value) @property {
    enforce(!isOpen, new TTransportException(
      "Cannot set chunk size after TFileReaderTransport has been opened."));
    enforce(value > EventSize.sizeof, new TTransportException("Chunks must " ~
      "be large enough to accommodate at least a single byte of payload data."));
    chunkSize_ = value;
  }

  /**
   * If positive, wait the specified duration for new data when arriving at
   * end of file. If negative, wait forever (tailing mode), waking up to check
   * in the specified interval. If zero, do not wait at all.
   *
   * Defaults to 500 ms.
   */
  Duration readTimeout() @property const {
    return readTimeout_;
  }

  /// ditto
  void readTimeout(Duration value) @property {
    readTimeout_ = value;
  }

  /// ditto
  enum DEFAULT_READ_TIMEOUT = dur!"msecs"(500);

  /**
   * Read buffer size, in bytes.
   *
   * Once the buffer has been allocated (on the first read), it can only be
   * grown, as shrinking it could discard data already read from the file but
   * not consumed yet.
   *
   * Defaults to 1 MiB.
   */
  size_t readBufferSize() @property const {
    return readBufferSize_;
  }

  /// ditto
  void readBufferSize(size_t value) @property {
    if (readBuffer_) {
      enforce(value >= readBufferSize_,
        "Cannot shrink read buffer after first read.");
      readBuffer_.length = value;
    }
    readBufferSize_ = value;
  }

  /// ditto
  enum DEFAULT_READ_BUFFER_SIZE = 1 * 1024 * 1024;

  /**
   * Arbitrary event size limit, in bytes. Must not exceed
   * chunkSize - EventSize.sizeof.
   *
   * Defaults to zero (no limit).
   */
  size_t maxEventSize() @property const {
    return maxEventSize_;
  }

  /// ditto
  void maxEventSize(size_t value) @property {
    enforce(value <= chunkSize_ - EventSize.sizeof, "Events cannot span " ~
      "multiple chunks, maxEventSize must be smaller than chunk size.");
    maxEventSize_ = value;
  }

  /// ditto
  enum DEFAULT_MAX_EVENT_SIZE = 0;

  /**
   * In tailing mode, when a corrupted event in the last chunk forces the
   * reader to skip to the next chunk, the interval at which it wakes up to
   * check whether that chunk has been started yet.
   *
   * Defaults to one second.
   */
  Duration corruptedEventSleepDuration() const {
    return corruptedEventSleepDuration_;
  }

  /// ditto
  void corruptedEventSleepDuration(Duration value) {
    corruptedEventSleepDuration_ = value;
  }

  /// ditto
  enum DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION = dur!"seconds"(1);

  /**
   * The maximum number of corrupted events tolerated before the whole chunk
   * is skipped. While below this limit, the reader re-reads the chunk from
   * its beginning.
   *
   * Defaults to zero.
   */
  uint maxCorruptedEvents() @property const {
    return maxCorruptedEvents_;
  }

  /// ditto
  void maxCorruptedEvents(uint value) @property {
    maxCorruptedEvents_ = value;
  }

  /// ditto
  enum DEFAULT_MAX_CORRUPTED_EVENTS = 0;

private:
  /*
   * Reads the next complete event from the file, refilling the read buffer
   * as needed.
   *
   * Returns: The event payload, or null if the end of the file is reached
   *   before a complete event is available (after sleeping for readTimeout
   *   once, if it is positive). In tailing mode (negative readTimeout), this
   *   keeps polling until an event becomes available.
   *
   * Throws: TTransportException on read errors, or if corrupted data is
   *   encountered that cannot be recovered from (see performRecovery()).
   */
  ubyte[] readEvent() {
    if (!readBuffer_) {
      readBuffer_ = new ubyte[readBufferSize_];
    }

    bool timeoutExpired;
    while (1) {
      // read from the file if read buffer is exhausted
      if (readState_.bufferPos_ == readState_.bufferLen_) {
        // advance the offset pointer
        offset_ += readState_.bufferLen_;

        try {
          // Need to clear eof flag before reading, otherwise tailing a file
          // does not work.
          file_.clearerr();

          auto usedBuf = file_.rawRead(readBuffer_);
          readState_.bufferLen_ = usedBuf.length;
        } catch (Exception e) {
          readState_.resetAllValues();
          throw new TTransportException("Error while reading from file",
            __FILE__, __LINE__, e);
        }

        readState_.bufferPos_ = 0;
        readState_.lastDispatchPos_ = 0;

        if (readState_.bufferLen_ == 0) {
          // Reached end of file.
          if (readTimeout_ < dur!"hnsecs"(0)) {
            // Tailing mode, sleep for the specified duration and try again.
            Thread.sleep(-readTimeout_);
            continue;
          } else if (readTimeout_ == dur!"hnsecs"(0) || timeoutExpired) {
            // Either no timeout set, or it has already expired.
            readState_.resetState(0);
            return null;
          } else {
            // Timeout mode, sleep for the specified amount of time and retry.
            Thread.sleep(readTimeout_);
            timeoutExpired = true;
            continue;
          }
        }
      }

      // Attempt to read an event from the buffer.
      while (readState_.bufferPos_ < readState_.bufferLen_) {
        if (readState_.readingSize_) {
          if (readState_.eventSizeBuffPos_ == 0) {
            // The writer never lets an event (and thus its size prefix) span
            // a chunk boundary, so if a size prefix starting here would, the
            // remaining bytes of this chunk can only be padding – skip them.
            if ((offset_ + readState_.bufferPos_)/chunkSize_ !=
              ((offset_ + readState_.bufferPos_ + 3)/chunkSize_))
            {
              readState_.bufferPos_++;
              continue;
            }
          }

          readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
            readBuffer_[readState_.bufferPos_++];

          if (readState_.eventSizeBuffPos_ == 4) {
            auto size = (cast(uint[])readState_.eventSizeBuff_)[0];

            if (size == 0) {
              // This is part of the zero padding between chunks.
              readState_.resetState(readState_.lastDispatchPos_);
              continue;
            }

            // got a valid event
            readState_.readingSize_ = false;
            readState_.eventLen_ = size;
            readState_.eventPos_ = 0;

            // check if the event is corrupted and perform recovery if required
            if (isEventCorrupted()) {
              performRecovery();
              // start from the top
              break;
            }
          }
        } else {
          if (!readState_.event_) {
            readState_.event_ = new ubyte[readState_.eventLen_];
          }

          // take either the entire event or the remaining bytes in the buffer
          auto reclaimBuffer = min(readState_.bufferLen_ - readState_.bufferPos_,
            readState_.eventLen_ - readState_.eventPos_);

          // copy data from read buffer into event buffer
          readState_.event_[
            readState_.eventPos_ .. readState_.eventPos_ + reclaimBuffer
          ] = readBuffer_[
            readState_.bufferPos_ .. readState_.bufferPos_ + reclaimBuffer
          ];

          // increment position ptrs
          readState_.eventPos_ += reclaimBuffer;
          readState_.bufferPos_ += reclaimBuffer;

          // check if the event has been read in full
          if (readState_.eventPos_ == readState_.eventLen_) {
            // Reset the read state and return the completed event.
            auto completeEvent = readState_.event_;
            readState_.event_ = null;
            readState_.resetState(readState_.bufferPos_);
            return completeEvent;
          }
        }
      }
    }
  }

  /*
   * Checks the size of the event whose size prefix has just been read
   * (readState_.bufferPos_ points right after the prefix) for plausibility.
   *
   * Returns: true if the size exceeds maxEventSize (if set) or the chunk
   *   size, or if the event would cross a chunk boundary.
   */
  bool isEventCorrupted() {
    if ((maxEventSize_ > 0) && (readState_.eventLen_ > maxEventSize_)) {
      // Event size is larger than user-specified max-event size
      logError("Corrupt event read: Event size (%s) greater than max " ~
        "event size (%s)", readState_.eventLen_, maxEventSize_);
      return true;
    } else if (readState_.eventLen_ > chunkSize_) {
      // Event size is larger than chunk size
      logError("Corrupt event read: Event size (%s) greater than chunk " ~
        "size (%s)", readState_.eventLen_, chunkSize_);
      return true;
    } else if (((offset_ + readState_.bufferPos_ - EventSize.sizeof) / chunkSize_) !=
      ((offset_ + readState_.bufferPos_ + readState_.eventLen_ - 1) / chunkSize_))
    {
      // Compare the chunk of the first byte of the size prefix with the chunk
      // of the last payload byte – the same check the writer uses to decide
      // whether padding is needed.
      // Size indicates that event crosses chunk boundary
      logError("Read corrupt event. Event crosses chunk boundary. " ~
        "Event size: %s. Offset: %s", readState_.eventLen_,
        (offset_ + readState_.bufferPos_ + EventSize.sizeof)
      );

      return true;
    }

    return false;
  }

  /*
   * Recovers from a corrupted event in the current chunk.
   *
   * While fewer than maxCorruptedEvents corrupted events have been seen in
   * the chunk, it is re-read from its beginning. Otherwise, the reader skips
   * to the next chunk. In tailing mode it waits for that chunk to be started
   * if necessary. Outside tailing mode, when already in the last chunk, it
   * rewinds to the last successful dispatch point and throws.
   *
   * Throws: TTransportException (CORRUPTED_DATA) if no recovery is possible.
   */
  void performRecovery() {
    // perform some kickass recovery
    auto curChunk = getCurChunk();
    if (lastBadChunk_ == curChunk) {
      numCorruptedEventsInChunk_++;
    } else {
      lastBadChunk_ = curChunk;
      numCorruptedEventsInChunk_ = 1;
    }

    if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
      // maybe there was an error in reading the file from disk
      // seek to the beginning of chunk and try again
      seekToChunk(curChunk);
    } else {
      // Just skip ahead to the next chunk if we not already at the last chunk.
      if (curChunk != (getNumChunks() - 1)) {
        seekToChunk(curChunk + 1);
      } else if (readTimeout_ < dur!"hnsecs"(0)) {
        // We are in tailing mode, wait until there is enough data to start
        // the next chunk.
        while(curChunk == (getNumChunks() - 1)) {
          Thread.sleep(corruptedEventSleepDuration_);
        }
        seekToChunk(curChunk + 1);
      } else {
        // Pretty hosed at this stage, rewind the file back to the last
        // successful point and punt on the error.
        readState_.resetState(readState_.lastDispatchPos_);
        currentEvent_ = null;
        currentEventPos_ = 0;

        throw new TTransportException("File corrupted at offset: " ~
          to!string(offset_ + readState_.lastDispatchPos_),
          TTransportException.Type.CORRUPTED_DATA);
      }
    }
  }

  string path_;
  File file_;
  bool isOpen_;
  // File offset at which the current contents of readBuffer_ start.
  long offset_;
  // The event currently handed out by read()/peek(), and the position in it.
  ubyte[] currentEvent_;
  size_t currentEventPos_;
  ulong chunkSize_;
  Duration readTimeout_;
  size_t maxEventSize_;

  // Read buffer – lazily allocated on the first read().
  ubyte[] readBuffer_;
  size_t readBufferSize_;

  static struct ReadState {
    ubyte[] event_;
    size_t eventLen_;
    size_t eventPos_;

    // keep track of event size
    ubyte[4] eventSizeBuff_;
    ubyte eventSizeBuffPos_;
    bool readingSize_ = true;

    // read buffer variables
    size_t bufferPos_;
    size_t bufferLen_;

    // last successful dispatch point
    size_t lastDispatchPos_;

    /*
     * Prepares for reading the next event's size prefix, discarding any
     * partially read event so that the buffer for the next one is allocated
     * with the correct length.
     */
    void resetState(size_t lastDispatchPos) {
      readingSize_ = true;
      eventSizeBuffPos_ = 0;
      lastDispatchPos_ = lastDispatchPos;
      event_ = null;
    }

    /// Additionally discards the contents of the read buffer.
    void resetAllValues() {
      resetState(0);
      bufferPos_ = 0;
      bufferLen_ = 0;
    }
  }
  ReadState readState_;

  ulong lastBadChunk_;
  uint maxCorruptedEvents_;
  uint numCorruptedEventsInChunk_;
  Duration corruptedEventSleepDuration_;
}

/**
 * A transport used to write log files. It can never be read from, calling
 * read() throws.
 *
 * Each write() call is stored as a separate event. The actual file I/O is
 * performed asynchronously by a writer thread which is started by open() and
 * shut down by close().
 *
 * Contrary to the C++ design, explicitly opening the transport/file before
 * using is necessary to allow manually closing the file without relying on the
 * object lifetime.
 */
final class TFileWriterTransport : TBaseTransport {
  /**
   * Creates a new file writer transport. The file is not opened until open()
   * is called.
   *
   * Params:
   *   path = Path of the file to operate on.
   */
  this(string path) {
    path_ = path;

    chunkSize_ = DEFAULT_CHUNK_SIZE;
    eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE;
    ioErrorSleepDuration_ = DEFAULT_IO_ERROR_SLEEP_DURATION;
    maxFlushBytes_ = DEFAULT_MAX_FLUSH_BYTES;
    maxFlushInterval_ = DEFAULT_MAX_FLUSH_INTERVAL;
  }

  override bool isOpen() @property {
    return isOpen_;
  }

  /**
   * A file writer transport can never be read from.
   */
  override bool peek() {
    return false;
  }

  /**
   * Spawns the writer thread, which opens the file in append mode.
   *
   * Errors opening the file are handled (logged and retried) by the writer
   * thread, not reported here.
   */
  override void open() {
    if (isOpen) return;

    writerThread_ = spawn(
      &writerThread,
      path_,
      chunkSize_,
      maxFlushBytes_,
      maxFlushInterval_,
      ioErrorSleepDuration_
    );
    setMaxMailboxSize(writerThread_, eventBufferSize_, OnCrowding.block);
    isOpen_ = true;
  }

  /**
   * Closes the transport, i.e. the underlying file and the writer thread.
   *
   * Blocks until the writer thread has confirmed the shutdown.
   */
  override void close() {
    if (!isOpen) return;

    prioritySend(writerThread_, ShutdownMessage(), thisTid); // FIXME: Should use normal send here.
    receive((ShutdownMessage msg, Tid tid){});
    isOpen_ = false;
  }

  /**
   * Enqueues the passed slice of data for writing and immediately returns.
   * write() only blocks if the event buffer has been exhausted.
   *
   * The transport must be open when calling this. Empty slices are logged and
   * skipped.
   *
   * Params:
   *   buf = Slice of data to write.
   *
   * Throws: TTransportException if the transport is not open, or if buf is
   *   longer than chunkSize - EventSize.sizeof.
   */
  override void write(in ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot write to non-open file.", TTransportException.Type.NOT_OPEN));

    if (buf.empty) {
      logError("Cannot write empty event, skipping.");
      return;
    }

    auto maxSize = chunkSize - EventSize.sizeof;
    enforce(buf.length <= maxSize, new TTransportException(
      "Cannot write more than " ~ to!string(maxSize) ~
      " bytes at once due to chunk size."));

    send(writerThread_, buf.idup);
  }

  /**
   * Flushes any pending data to be written.
   *
   * Blocks until the writer thread has confirmed the flush. The transport
   * must be open when calling this.
   *
   * Throws: TTransportException if an error occurs.
   */
  override void flush() {
    enforce(isOpen, new TTransportException(
      "Cannot flush file if not open.", TTransportException.Type.NOT_OPEN));

    send(writerThread_, FlushMessage(), thisTid);
    receive((FlushMessage msg, Tid tid){});
  }

  /**
   * The size of the chunks the file is divided into, in bytes.
   *
   * A single event (write call) never spans multiple chunks – this
   * effectively limits the event size to chunkSize - EventSize.sizeof.
   * Can only be set before the transport is opened, and must be larger than
   * EventSize.sizeof.
   */
  ulong chunkSize() @property {
    return chunkSize_;
  }

  /// ditto
  void chunkSize(ulong value) @property {
    enforce(!isOpen, new TTransportException(
      "Cannot set chunk size after TFileWriterTransport has been opened."));
    enforce(value > EventSize.sizeof, new TTransportException("Chunks must " ~
      "be large enough to accommodate at least a single byte of payload data."));
    chunkSize_ = value;
  }

  /**
   * The maximum number of write() calls buffered, or zero for no limit.
   *
   * If the buffer is exhausted, write() will block until space becomes
   * available.
   */
  size_t eventBufferSize() @property {
    return eventBufferSize_;
  }

  /// ditto
  void eventBufferSize(size_t value) @property {
    eventBufferSize_ = value;
    if (isOpen) {
      // Same crowding policy as in open(), so write() blocks as documented.
      setMaxMailboxSize(writerThread_, value, OnCrowding.block);
    }
  }

  /// ditto
  enum DEFAULT_EVENT_BUFFER_SIZE = 10_000;

  /**
   * Maximum number of bytes buffered before writing and flushing the file
   * to disk.
   *
   * If changed while the transport is open, the new value is forwarded to the
   * writer thread.
   */
  size_t maxFlushBytes() @property {
    return maxFlushBytes_;
  }

  /// ditto
  void maxFlushBytes(size_t value) @property {
    maxFlushBytes_ = value;
    if (isOpen) {
      send(writerThread_, FlushBytesMessage(value));
    }
  }

  /// ditto
  enum DEFAULT_MAX_FLUSH_BYTES = 1000 * 1024;

  /**
   * Maximum interval between flushing the file to disk.
   *
   * If changed while the transport is open, the new value is forwarded to the
   * writer thread.
   */
  Duration maxFlushInterval() @property {
    return maxFlushInterval_;
  }

  /// ditto
  void maxFlushInterval(Duration value) @property {
    maxFlushInterval_ = value;
    if (isOpen) {
      send(writerThread_, FlushIntervalMessage(value));
    }
  }

  /// ditto
  enum DEFAULT_MAX_FLUSH_INTERVAL = dur!"seconds"(3);

  /**
   * When the writer thread fails to open the output file, it pauses for a
   * short time before trying to reopen it. This controls the sleep duration.
   *
   * If changed while the transport is open, the new value is forwarded to the
   * writer thread.
   */
  Duration ioErrorSleepDuration() @property {
    return ioErrorSleepDuration_;
  }

  /// ditto
  void ioErrorSleepDuration(Duration value) @property {
    ioErrorSleepDuration_ = value;
    if (isOpen) {
      send(writerThread_, IoErrorSleepDurationMessage(value));
    }
  }

  /// ditto
  enum DEFAULT_IO_ERROR_SLEEP_DURATION = dur!"msecs"(500);

private:
  string path_;
  ulong chunkSize_;
  size_t eventBufferSize_;
  Duration ioErrorSleepDuration_;
  size_t maxFlushBytes_;
  Duration maxFlushInterval_;
  bool isOpen_;
  Tid writerThread_;
}

private {
  // Signals that the file should be flushed on disk. Sent to the writer
  // thread and sent back along with the tid for confirmation.
  struct FlushMessage {}

  // Signals that the writer thread should close the file and shut down. Sent
  // to the writer thread and sent back along with the tid for confirmation.
  struct ShutdownMessage {}

  // Updates the writer thread's maxFlushBytes setting.
  struct FlushBytesMessage {
    size_t value;
  }

  // Updates the writer thread's maxFlushInterval setting.
  struct FlushIntervalMessage {
    Duration value;
  }

  // Updates the writer thread's ioErrorSleepDuration setting.
  struct IoErrorSleepDurationMessage {
    Duration value;
  }

  /*
   * Body of the thread spawned by TFileWriterTransport.open().
   *
   * Opens path in append/binary mode and writes every received
   * immutable(ubyte)[] message as one event (EventSize prefix followed by the
   * payload), inserting zero padding if the event would otherwise cross a
   * chunk boundary. The file is flushed when requested, on shutdown, when
   * more than maxFlushBytes are unflushed, or when maxFlushInterval has
   * elapsed with data pending.
   *
   * If the file cannot be opened, each received event triggers a retry loop
   * which sleeps for ioErrorSleepDuration between attempts, until the file
   * could be opened or shutdown is requested (in which case the event is
   * dropped).
   *
   * Exits on ShutdownMessage (confirmed back to the sender) or when the owner
   * thread terminates.
   */
  void writerThread(
    string path,
    ulong chunkSize,
    size_t maxFlushBytes,
    Duration maxFlushInterval,
    Duration ioErrorSleepDuration
  ) {
    bool errorOpening;
    File file;
    ulong offset;
    try {
      // Open file in appending and binary mode.
      file = File(path, "ab");
      offset = file.tell();
    } catch (Exception e) {
      logError("Error on opening output file in writer thread: %s", e);
      errorOpening = true;
    }

    auto flushTimer = StopWatch(AutoStart.yes);
    size_t unflushedByteCount;

    Tid shutdownRequestTid;
    bool shutdownRequested;
    while (true) {
      if (shutdownRequested) break;

      bool forceFlush;
      Tid flushRequestTid;
      receiveTimeout(max(dur!"hnsecs"(0), maxFlushInterval - flushTimer.peek()),
        (immutable(ubyte)[] data) {
          while (errorOpening) {
            logError("Writer thread going to sleep for %s µs due to IO errors",
              ioErrorSleepDuration.total!"usecs");

            // Sleep for ioErrorSleepDuration, being ready to be interrupted
            // by shutdown requests. Note that receiveTimeout returns true if
            // a message was received, and false if the timeout expired.
            immutable interrupted = receiveTimeout(ioErrorSleepDuration,
              (ShutdownMessage msg, Tid tid){ shutdownRequestTid = tid; },
              (OwnerTerminated msg){}
            );
            if (interrupted) {
              // We got a shutdown request, just drop all events and exit the
              // main loop as to not block application shutdown with our tries
              // which we must assume to fail.
              shutdownRequested = true;
              return;
            }

            try {
              file = File(path, "ab");
              // Resynchronize the offset, the file might already contain
              // data, and padding is computed relative to it.
              offset = file.tell();
              unflushedByteCount = 0;
              errorOpening = false;
              logError("Output file %s reopened during writer thread error " ~
                "recovery", path);
            } catch (Exception e) {
              logError("Unable to reopen output file %s during writer " ~
                "thread error recovery", path);
            }
          }

          // Make sure the event does not cross the chunk boundary by writing
          // a padding consisting of zeroes if it would.
          auto chunk1 = offset / chunkSize;
          auto chunk2 = (offset + EventSize.sizeof + data.length - 1) / chunkSize;

          if (chunk1 != chunk2) {
            // TODO: The C++ implementation refetches the offset here to »keep
            // in sync« – why would this be needed?
            auto padding = cast(size_t)
              ((((offset / chunkSize) + 1) * chunkSize) - offset);
            auto zeroes = new ubyte[padding];
            file.rawWrite(zeroes);
            unflushedByteCount += padding;
            offset += padding;
          }

          // TODO: 2 syscalls here, is this a problem performance-wise?
          // Probably abysmal performance on Windows due to rawWrite
          // implementation.
          uint len = cast(uint)data.length;
          file.rawWrite(cast(ubyte[])(&len)[0..1]);
          file.rawWrite(data);

          auto bytesWritten = EventSize.sizeof + data.length;
          unflushedByteCount += bytesWritten;
          offset += bytesWritten;
        }, (FlushBytesMessage msg) {
          maxFlushBytes = msg.value;
        }, (FlushIntervalMessage msg) {
          maxFlushInterval = msg.value;
        }, (IoErrorSleepDurationMessage msg) {
          ioErrorSleepDuration = msg.value;
        }, (FlushMessage msg, Tid tid) {
          forceFlush = true;
          flushRequestTid = tid;
        }, (OwnerTerminated msg) {
          shutdownRequested = true;
        }, (ShutdownMessage msg, Tid tid) {
          shutdownRequested = true;
          shutdownRequestTid = tid;
        }
      );

      if (errorOpening) {
        // The file has never been opened successfully, so no data can be
        // pending (a received event only returns from its retry loop once the
        // file is open or shutdown was requested). Still acknowledge flush
        // requests, otherwise flush() in the owner thread would block forever.
        if (forceFlush) send(flushRequestTid, FlushMessage(), thisTid);
        continue;
      }

      bool flush;
      if (forceFlush || shutdownRequested || unflushedByteCount > maxFlushBytes) {
        flush = true;
      } else if (cast(Duration)flushTimer.peek() > maxFlushInterval) {
        if (unflushedByteCount == 0) {
          // If the flush timer is due, but no data has been written, don't
          // needlessly flush, but do reset the timer.
          flushTimer.reset();
        } else {
          flush = true;
        }
      }

      if (flush) {
        file.flush();
        flushTimer.reset();
        unflushedByteCount = 0;
        if (forceFlush) send(flushRequestTid, FlushMessage(), thisTid);
      }
    }

    file.close();

    if (shutdownRequestTid != Tid.init) {
      send(shutdownRequestTid, ShutdownMessage(), thisTid);
    }
  }
}

version (unittest) {
  import core.memory : GC;
  import std.file;
}

unittest {
  void tryRemove(string fileName) {
    try {
      remove(fileName);
    } catch (Exception) {}
  }

  immutable fileName = "unittest.dat.tmp";
  enforce(!exists(fileName), "Unit test output file " ~ fileName ~
    " already exists.");

  /*
   * Check the most basic reading/writing operations.
   */
  {
    scope (exit) tryRemove(fileName);

    auto writer = new TFileWriterTransport(fileName);
    writer.open();
    scope (exit) writer.close();

    writer.write([1, 2]);
    writer.write([3, 4]);
    writer.write([5, 6, 7]);
    writer.flush();

    auto reader = new TFileReaderTransport(fileName);
    reader.open();
    scope (exit) reader.close();

    auto buf = new ubyte[7];
    reader.readAll(buf);
    enforce(buf == [1, 2, 3, 4, 5, 6, 7]);
  }

  /*
   * Check that chunking works as expected.
   */
  {
    scope (exit) tryRemove(fileName);

    static assert(EventSize.sizeof == 4);
    enum CHUNK_SIZE = 10;

    // Write some contents to the file.
    {
      auto writer = new TFileWriterTransport(fileName);
      writer.chunkSize = CHUNK_SIZE;
      writer.open();
      scope (exit) writer.close();

      writer.write([0xde]);
      writer.write([0xad]);
      // Chunk boundary here.
      writer.write([0xbe]);
      // The next write doesn't fit in the five bytes remaining, so we expect
      // padding zero bytes to be written.
      writer.write([0xef, 0x12]);

      try {
        writer.write(new ubyte[CHUNK_SIZE]);
        enforce(false, "Could write event not fitting in a single chunk.");
      } catch (TTransportException e) {}

      writer.flush();
    }

    // Check the raw contents of the file to see if chunk padding was written
    // as expected.
    auto file = File(fileName, "r");
    enforce(file.size == 26);
    auto written = new ubyte[26];
    file.rawRead(written);
    enforce(written == [
      1, 0, 0, 0, 0xde,
      1, 0, 0, 0, 0xad,
      1, 0, 0, 0, 0xbe,
      0, 0, 0, 0, 0,
      2, 0, 0, 0, 0xef, 0x12
    ]);

    // Read the data back in, getting all the events at once.
    {
      auto reader = new TFileReaderTransport(fileName);
      reader.chunkSize = CHUNK_SIZE;
      reader.open();
      scope (exit) reader.close();

      auto buf = new ubyte[5];
      reader.readAll(buf);
      enforce(buf == [0xde, 0xad, 0xbe, 0xef, 0x12]);
    }
  }

  /*
   * Make sure that close() exits "quickly", i.e. that there is no problem
   * with the worker thread waking up.
   */
  {
    import std.conv : text;
    enum NUM_ITERATIONS = 1000;

    uint numOver = 0;
    foreach (n; 0 .. NUM_ITERATIONS) {
      scope (exit) tryRemove(fileName);

      auto transport = new TFileWriterTransport(fileName);
      transport.open();

      // Write something so that the writer thread gets started.
      transport.write(cast(ubyte[])"foo");

      // Every other iteration, also call flush(), just in case that potentially
      // has any effect on how the writer thread wakes up.
      if (n & 0x1) {
        transport.flush();
      }

      // Time the call to close().
      auto sw = StopWatch(AutoStart.yes);
      transport.close();
      sw.stop();

      // If any attempt takes more than 1500ms, treat that as a fatal failure
      // to avoid looping over a potentially very slow operation.

      static if (__VERSION__ >= 2079)
      {
        enforce(sw.peek().total!"msecs" < 1500,
          text("close() took ", sw.peek().total!"msecs", "ms."));
        // Normally, it takes less than 5ms on my dev box.
        // However, if the box is heavily loaded, some of the test runs can take
        // longer. Additionally, on a Windows Server 2008 instance running in
        // a VirtualBox VM, it has been observed that about a quarter of the runs
        // takes (217 ± 1) ms, for reasons not yet known.
        if (sw.peek().total!"msecs" > 50) {
          ++numOver;
        }
      }
      else
      {
        enforce(sw.peek().msecs < 1500,
          text("close() took ", sw.peek().msecs, "ms."));
        // Normally, it takes less than 5ms on my dev box.
        // However, if the box is heavily loaded, some of the test runs can take
        // longer. Additionally, on a Windows Server 2008 instance running in
        // a VirtualBox VM, it has been observed that about a quarter of the runs
        // takes (217 ± 1) ms, for reasons not yet known.
        if (sw.peek().msecs > 50) {
          ++numOver;
        }
      }

      // Force garbage collection runs every now and then to make sure we
      // don't run out of OS thread handles.
      if (!(n % 100)) GC.collect();
    }

    // Make sure fewer than a third of the runs took longer than 50ms.
    enforce(numOver < NUM_ITERATIONS / 3,
      text(numOver, " iterations took more than 50 ms."));
  }
}