1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 /**
21  * Transports for reading from/writing to Thrift »log files«.
22  *
23  * These transports are not »stupid« sources and sinks just reading and
24  * writing bytes from a file verbatim, but organize the contents in the form
25  * of so-called »events«, which refers to the data written between two flush()
26  * calls.
27  *
 * Chunking is supported; events are guaranteed never to span chunk boundaries.
29  * As a consequence, an event can never be larger than the chunk size. The
30  * chunk size used is not saved with the file, so care has to be taken to make
31  * sure the same chunk size is used for reading and writing.
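 *
 * A minimal usage sketch, mirroring the unit tests at the end of this
 * module (the file name is just a placeholder):
 * ---
 * auto writer = new TFileWriterTransport("events.log");
 * writer.open();
 * writer.write([1, 2]);    // First event.
 * writer.write([3, 4, 5]); // Second event.
 * writer.flush();
 * writer.close();
 *
 * auto reader = new TFileReaderTransport("events.log");
 * reader.open();
 * auto buf = new ubyte[5];
 * reader.readAll(buf);     // buf now contains [1, 2, 3, 4, 5].
 * reader.close();
 * ---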
32  */
33 module thrift.transport.file;
34 
35 import core.thread : Thread;
36 import std.array : empty;
37 import std.algorithm : min, max;
38 import std.concurrency;
39 import std.conv : to;
import std.datetime : dur, Duration, msecs;
41 
42 static if (__VERSION__ >= 2079)
43   import std.datetime.stopwatch : AutoStart, StopWatch;
44 else
  import std.datetime : AutoStart, StopWatch;
46 
47 import std.exception;
48 import std.stdio : File;
49 import thrift.base;
50 import thrift.transport.base;
51 
52 /// The default chunk size, in bytes.
53 enum DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
54 
55 /// The type used to represent event sizes in the file.
56 alias uint EventSize;
57 
58 version (BigEndian) {
59   static assert(false,
60     "Little endian byte order is assumed in thrift.transport.file.");
61 }
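
/*
 * On-disk layout: each event is stored as a little-endian EventSize (four
 * bytes) length prefix followed by the payload. If an event would not fit
 * into the remainder of the current chunk, the writer pads the rest of the
 * chunk with zero bytes and starts the event at the next chunk boundary;
 * the reader treats a size of zero as such padding. For example, with a
 * chunk size of 10 bytes, three one-byte events followed by a two-byte
 * event are laid out as (cf. the chunking unit test below):
 *
 *   chunk 0: 01 00 00 00 de  01 00 00 00 ad
 *   chunk 1: 01 00 00 00 be  00 00 00 00 00   <- five bytes of zero padding
 *   chunk 2: 02 00 00 00 ef 12
 */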
62 
63 /**
 * A transport used to read log files. It can never be written to; calling
 * write() throws.
66  *
67  * Contrary to the C++ design, explicitly opening the transport/file before
68  * using is necessary to allow manually closing the file without relying on the
69  * object lifetime. Otherwise, it's a straight port of the C++ implementation.
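 *
 * A sketch of tailing a live log file, assuming a negative readTimeout is
 * used to enable tailing mode (logPath and process() are placeholders):
 * ---
 * auto reader = new TFileReaderTransport(logPath);
 * reader.readTimeout = dur!"msecs"(-500); // Poll for new data every 500 ms.
 * reader.open();
 *
 * auto buf = new ubyte[1024];
 * while (true) {
 *   auto bytesRead = reader.read(buf); // Blocks until new event data arrives.
 *   process(buf[0 .. bytesRead]);
 * }
 * ---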
70  */
71 final class TFileReaderTransport : TBaseTransport {
  /**
   * Creates a new file reader transport.
   *
   * Params:
   *   path = Path of the file to operate on.
   */
78   this(string path) {
79     path_ = path;
80     chunkSize_ = DEFAULT_CHUNK_SIZE;
81     readBufferSize_ = DEFAULT_READ_BUFFER_SIZE;
82     readTimeout_ = DEFAULT_READ_TIMEOUT;
83     corruptedEventSleepDuration_ = DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION;
84     maxEventSize = DEFAULT_MAX_EVENT_SIZE;
85   }
86 
87   override bool isOpen() @property {
88     return isOpen_;
89   }
90 
91   override bool peek() {
92     if (!isOpen) return false;
93 
94     // If there is no event currently processed, try fetching one from the
95     // file.
96     if (!currentEvent_) {
97       currentEvent_ = readEvent();
98 
99       if (!currentEvent_) {
100         // Still nothing there, couldn't read a new event.
101         return false;
102       }
103     }
104     // check if there is anything to read
105     return (currentEvent_.length - currentEventPos_) > 0;
106   }
107 
108   override void open() {
109     if (isOpen) return;
110     try {
111       file_ = File(path_, "rb");
112     } catch (Exception e) {
113       throw new TTransportException("Error on opening input file.",
114         TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
115     }
116     isOpen_ = true;
117   }
118 
119   override void close() {
120     if (!isOpen) return;
121 
122     file_.close();
123     isOpen_ = false;
124     readState_.resetAllValues();
125   }
126 
127   override size_t read(ubyte[] buf) {
128     enforce(isOpen, new TTransportException(
129       "Cannot read if file is not open.", TTransportException.Type.NOT_OPEN));
130 
131     // If there is no event currently processed, try fetching one from the
132     // file.
133     if (!currentEvent_) {
134       currentEvent_ = readEvent();
135 
136       if (!currentEvent_) {
137         // Still nothing there, couldn't read a new event.
138         return 0;
139       }
140     }
141 
142     auto len = buf.length;
143     auto remaining = currentEvent_.length - currentEventPos_;
144 
145     if (remaining <= len) {
146       // If less than the requested length is available, read as much as
147       // possible.
148       buf[0 .. remaining] = currentEvent_[currentEventPos_ .. $];
149       currentEvent_ = null;
150       currentEventPos_ = 0;
151       return remaining;
152     }
153 
154     // There will still be data left in the buffer after reading, pass out len
155     // bytes.
156     buf[] = currentEvent_[currentEventPos_ .. currentEventPos_ + len];
157     currentEventPos_ += len;
158     return len;
159   }
160 
161   ulong getNumChunks() {
    enforce(isOpen, new TTransportException(
      "Cannot get number of chunks if file is not open.",
      TTransportException.Type.NOT_OPEN));
165 
166     try {
167       auto fileSize = file_.size();
168       if (fileSize == 0) {
169         // Empty files have no chunks.
170         return 0;
171       }
172       return ((fileSize)/chunkSize_) + 1;
173     } catch (Exception e) {
174       throw new TTransportException("Error getting file size.", __FILE__,
175         __LINE__, e);
176     }
177   }
178 
179   ulong getCurChunk() {
180     return offset_ / chunkSize_;
181   }
182 
183   void seekToChunk(long chunk) {
    enforce(isOpen, new TTransportException(
      "Cannot seek to chunk if file is not open.",
      TTransportException.Type.NOT_OPEN));
187 
188     auto numChunks = getNumChunks();
189 
190     if (chunk < 0) {
191       // Count negative indices from the end.
192       chunk += numChunks;
193     }
194 
195     if (chunk < 0) {
196       logError("Incorrect chunk number for reverse seek, seeking to " ~
197        "beginning instead: %s", chunk);
198       chunk = 0;
199     }
200 
201     bool seekToEnd;
202     long minEndOffset;
203     if (chunk >= numChunks) {
204       logError("Trying to seek to non-existing chunk, seeking to " ~
205        "end of file instead: %s", chunk);
206       seekToEnd = true;
207       chunk = numChunks - 1;
      // This is the minimum offset up to which to process events.
209       minEndOffset = file_.size();
210     }
211 
212     readState_.resetAllValues();
213     currentEvent_ = null;
214 
215     try {
216       file_.seek(chunk * chunkSize_);
217       offset_ = chunk * chunkSize_;
218     } catch (Exception e) {
219       throw new TTransportException("Error seeking to chunk", __FILE__,
220         __LINE__, e);
221     }
222 
223     if (seekToEnd) {
      // Never wait at the end of the file for new content; we just want to
      // find the last event.
226       auto oldReadTimeout = readTimeout_;
227       scope (exit) readTimeout_ = oldReadTimeout;
228       readTimeout_ = dur!"hnsecs"(0);
229 
      // Keep on reading until the last event at the point of the seekToChunk call.
231       while ((offset_ + readState_.bufferPos_) < minEndOffset) {
232         if (readEvent() is null) {
233           break;
234         }
235       }
236     }
237   }
238 
239   void seekToEnd() {
240     seekToChunk(getNumChunks());
241   }
242 
243   /**
244    * The size of the chunks the file is divided into, in bytes.
245    */
246   ulong chunkSize() @property const {
247     return chunkSize_;
248   }
249 
250   /// ditto
251   void chunkSize(ulong value) @property {
252     enforce(!isOpen, new TTransportException(
253       "Cannot set chunk size after TFileReaderTransport has been opened."));
254     enforce(value > EventSize.sizeof, new TTransportException("Chunks must " ~
255       "be large enough to accommodate at least a single byte of payload data."));
256     chunkSize_ = value;
257   }
258 
259   /**
260    * If positive, wait the specified duration for new data when arriving at
261    * end of file. If negative, wait forever (tailing mode), waking up to check
262    * in the specified interval. If zero, do not wait at all.
263    *
264    * Defaults to 500 ms.
265    */
266   Duration readTimeout() @property const {
267     return readTimeout_;
268   }
269 
270   /// ditto
271   void readTimeout(Duration value) @property {
272     readTimeout_ = value;
273   }
274 
275   /// ditto
276   enum DEFAULT_READ_TIMEOUT = dur!"msecs"(500);
277 
278   /**
279    * Read buffer size, in bytes.
280    *
281    * Defaults to 1 MiB.
282    */
283   size_t readBufferSize() @property const {
284     return readBufferSize_;
285   }
286 
287   /// ditto
288   void readBufferSize(size_t value) @property {
289     if (readBuffer_) {
290       enforce(value <= readBufferSize_,
291         "Cannot shrink read buffer after first read.");
292       readBuffer_.length = value;
293     }
294     readBufferSize_ = value;
295   }
296 
297   /// ditto
298   enum DEFAULT_READ_BUFFER_SIZE = 1 * 1024 * 1024;
299 
300   /**
301    * Arbitrary event size limit, in bytes. Must be smaller than chunk size.
302    *
303    * Defaults to zero (no limit).
304    */
305   size_t maxEventSize() @property const {
306     return maxEventSize_;
307   }
308 
309   /// ditto
310   void maxEventSize(size_t value) @property {
    enforce(value <= chunkSize_ - EventSize.sizeof, "Events cannot span " ~
      "multiple chunks, maxEventSize must be smaller than the chunk size.");
313     maxEventSize_ = value;
314   }
315 
316   /// ditto
317   enum DEFAULT_MAX_EVENT_SIZE = 0;
318 
  /**
   * The duration to sleep between checks for the next chunk when waiting for
   * it to appear during corrupted-event recovery in tailing mode.
   *
   * Defaults to one second.
   */
325   Duration corruptedEventSleepDuration() const {
326     return corruptedEventSleepDuration_;
327   }
328 
329   /// ditto
330   void corruptedEventSleepDuration(Duration value) {
331     corruptedEventSleepDuration_ = value;
332   }
333 
334   /// ditto
335   enum DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION = dur!"seconds"(1);
336 
337   /**
338    * The maximum number of corrupted events tolerated before the whole chunk
339    * is skipped.
340    *
341    * Defaults to zero.
342    */
343   uint maxCorruptedEvents() @property const {
344     return maxCorruptedEvents_;
345   }
346 
347   /// ditto
348   void maxCorruptedEvents(uint value) @property {
349     maxCorruptedEvents_ = value;
350   }
351 
352   /// ditto
353   enum DEFAULT_MAX_CORRUPTED_EVENTS = 0;
354 
355 private:
356   ubyte[] readEvent() {
357     if (!readBuffer_) {
358       readBuffer_ = new ubyte[readBufferSize_];
359     }
360 
361     bool timeoutExpired;
362     while (1) {
363       // read from the file if read buffer is exhausted
364       if (readState_.bufferPos_ == readState_.bufferLen_) {
365         // advance the offset pointer
366         offset_ += readState_.bufferLen_;
367 
368         try {
369           // Need to clear eof flag before reading, otherwise tailing a file
370           // does not work.
371           file_.clearerr();
372 
373           auto usedBuf = file_.rawRead(readBuffer_);
374           readState_.bufferLen_ = usedBuf.length;
375         } catch (Exception e) {
376           readState_.resetAllValues();
377           throw new TTransportException("Error while reading from file",
378             __FILE__, __LINE__, e);
379         }
380 
381         readState_.bufferPos_ = 0;
382         readState_.lastDispatchPos_ = 0;
383 
384         if (readState_.bufferLen_ == 0) {
385           // Reached end of file.
386           if (readTimeout_ < dur!"hnsecs"(0)) {
387             // Tailing mode, sleep for the specified duration and try again.
388             Thread.sleep(-readTimeout_);
389             continue;
390           } else if (readTimeout_ == dur!"hnsecs"(0) || timeoutExpired) {
391             // Either no timeout set, or it has already expired.
392             readState_.resetState(0);
393             return null;
394           } else {
395             // Timeout mode, sleep for the specified amount of time and retry.
396             Thread.sleep(readTimeout_);
397             timeoutExpired = true;
398             continue;
399           }
400         }
401       }
402 
403       // Attempt to read an event from the buffer.
404       while (readState_.bufferPos_ < readState_.bufferLen_) {
405         if (readState_.readingSize_) {
406           if (readState_.eventSizeBuffPos_ == 0) {
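            // If the four size bytes would cross a chunk boundary, they cannot
            // belong to a valid event (events never span chunks), so skip
            // ahead byte by byte until the next chunk starts.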
407             if ((offset_ + readState_.bufferPos_)/chunkSize_ !=
408               ((offset_ + readState_.bufferPos_ + 3)/chunkSize_))
409             {
410               readState_.bufferPos_++;
411               continue;
412             }
413           }
414 
415           readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
416             readBuffer_[readState_.bufferPos_++];
417 
418           if (readState_.eventSizeBuffPos_ == 4) {
419             auto size = (cast(uint[])readState_.eventSizeBuff_)[0];
420 
421             if (size == 0) {
422               // This is part of the zero padding between chunks.
423               readState_.resetState(readState_.lastDispatchPos_);
424               continue;
425             }
426 
427             // got a valid event
428             readState_.readingSize_ = false;
429             readState_.eventLen_ = size;
430             readState_.eventPos_ = 0;
431 
432             // check if the event is corrupted and perform recovery if required
433             if (isEventCorrupted()) {
434               performRecovery();
435               // start from the top
436               break;
437             }
438           }
439         } else {
440           if (!readState_.event_) {
441             readState_.event_ = new ubyte[readState_.eventLen_];
442           }
443 
444           // take either the entire event or the remaining bytes in the buffer
445           auto reclaimBuffer = min(readState_.bufferLen_ - readState_.bufferPos_,
446             readState_.eventLen_ - readState_.eventPos_);
447 
448           // copy data from read buffer into event buffer
449           readState_.event_[
450             readState_.eventPos_ .. readState_.eventPos_ + reclaimBuffer
451           ] = readBuffer_[
452             readState_.bufferPos_ .. readState_.bufferPos_ + reclaimBuffer
453           ];
454 
455           // increment position ptrs
456           readState_.eventPos_ += reclaimBuffer;
457           readState_.bufferPos_ += reclaimBuffer;
458 
459           // check if the event has been read in full
460           if (readState_.eventPos_ == readState_.eventLen_) {
461             // Reset the read state and return the completed event.
462             auto completeEvent = readState_.event_;
463             readState_.event_ = null;
464             readState_.resetState(readState_.bufferPos_);
465             return completeEvent;
466           }
467         }
468       }
469     }
470   }
471 
472   bool isEventCorrupted() {
473     if ((maxEventSize_ > 0) && (readState_.eventLen_ > maxEventSize_)) {
      // Event size is larger than the user-specified maximum event size.
475       logError("Corrupt event read: Event size (%s) greater than max " ~
476         "event size (%s)", readState_.eventLen_, maxEventSize_);
477       return true;
478     } else if (readState_.eventLen_ > chunkSize_) {
479       // Event size is larger than chunk size
480       logError("Corrupt event read: Event size (%s) greater than chunk " ~
481         "size (%s)", readState_.eventLen_, chunkSize_);
482       return true;
483     } else if (((offset_ + readState_.bufferPos_ - EventSize.sizeof) / chunkSize_) !=
484       ((offset_ + readState_.bufferPos_ + readState_.eventLen_ - EventSize.sizeof) / chunkSize_))
485     {
486       // Size indicates that event crosses chunk boundary
487       logError("Read corrupt event. Event crosses chunk boundary. " ~
488         "Event size: %s. Offset: %s", readState_.eventLen_,
489         (offset_ + readState_.bufferPos_ + EventSize.sizeof)
490       );
491 
492       return true;
493     }
494 
495     return false;
496   }
497 
498   void performRecovery() {
499     // perform some kickass recovery
500     auto curChunk = getCurChunk();
501     if (lastBadChunk_ == curChunk) {
502       numCorruptedEventsInChunk_++;
503     } else {
504       lastBadChunk_ = curChunk;
505       numCorruptedEventsInChunk_ = 1;
506     }
507 
508     if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
509       // maybe there was an error in reading the file from disk
510       // seek to the beginning of chunk and try again
511       seekToChunk(curChunk);
512     } else {
      // Just skip ahead to the next chunk if we are not already at the last chunk.
514       if (curChunk != (getNumChunks() - 1)) {
515         seekToChunk(curChunk + 1);
516       } else if (readTimeout_ < dur!"hnsecs"(0)) {
517         // We are in tailing mode, wait until there is enough data to start
518         // the next chunk.
519         while(curChunk == (getNumChunks() - 1)) {
520           Thread.sleep(corruptedEventSleepDuration_);
521         }
522         seekToChunk(curChunk + 1);
523       } else {
524         // Pretty hosed at this stage, rewind the file back to the last
525         // successful point and punt on the error.
526         readState_.resetState(readState_.lastDispatchPos_);
527         currentEvent_ = null;
528         currentEventPos_ = 0;
529 
530         throw new TTransportException("File corrupted at offset: " ~
531           to!string(offset_ + readState_.lastDispatchPos_),
532           TTransportException.Type.CORRUPTED_DATA);
533       }
534     }
535   }
536 
537   string path_;
538   File file_;
539   bool isOpen_;
540   long offset_;
541   ubyte[] currentEvent_;
542   size_t currentEventPos_;
543   ulong chunkSize_;
544   Duration readTimeout_;
545   size_t maxEventSize_;
546 
547   // Read buffer – lazily allocated on the first read().
548   ubyte[] readBuffer_;
549   size_t readBufferSize_;
550 
551   static struct ReadState {
552     ubyte[] event_;
553     size_t eventLen_;
554     size_t eventPos_;
555 
556     // keep track of event size
557     ubyte[4] eventSizeBuff_;
558     ubyte eventSizeBuffPos_;
559     bool readingSize_ = true;
560 
561     // read buffer variables
562     size_t bufferPos_;
563     size_t bufferLen_;
564 
565     // last successful dispatch point
566     size_t lastDispatchPos_;
567 
568     void resetState(size_t lastDispatchPos) {
569       readingSize_ = true;
570       eventSizeBuffPos_ = 0;
571       lastDispatchPos_ = lastDispatchPos;
572     }
573 
574     void resetAllValues() {
575       resetState(0);
576       bufferPos_ = 0;
577       bufferLen_ = 0;
578       event_ = null;
579     }
580   }
581   ReadState readState_;
582 
583   ulong lastBadChunk_;
584   uint maxCorruptedEvents_;
585   uint numCorruptedEventsInChunk_;
586   Duration corruptedEventSleepDuration_;
587 }
588 
589 /**
 * A transport used to write log files. It can never be read from; calling
 * read() throws.
592  *
593  * Contrary to the C++ design, explicitly opening the transport/file before
594  * using is necessary to allow manually closing the file without relying on the
595  * object lifetime.
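 *
 * A configuration sketch (the values are arbitrary examples, not
 * recommendations):
 * ---
 * auto writer = new TFileWriterTransport("events.log");
 * writer.chunkSize = 4 * 1024 * 1024;         // Must be set before open().
 * writer.maxFlushInterval = dur!"seconds"(1); // Flush at least once a second.
 * writer.open();
 * scope (exit) writer.close();
 *
 * writer.write(someEventBytes); // Placeholder ubyte[], at most chunkSize - 4 bytes.
 * writer.flush();
 * ---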
596  */
597 final class TFileWriterTransport : TBaseTransport {
598   /**
599    * Creates a new file writer transport.
600    *
601    * Params:
   *   path = Path of the file to operate on.
603    */
604   this(string path) {
605     path_ = path;
606 
607     chunkSize_ = DEFAULT_CHUNK_SIZE;
608     eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE;
609     ioErrorSleepDuration = DEFAULT_IO_ERROR_SLEEP_DURATION;
610     maxFlushBytes_ = DEFAULT_MAX_FLUSH_BYTES;
611     maxFlushInterval_ = DEFAULT_MAX_FLUSH_INTERVAL;
612   }
613 
614   override bool isOpen() @property {
615     return isOpen_;
616   }
617 
618   /**
619    * A file writer transport can never be read from.
620    */
621   override bool peek() {
622     return false;
623   }
624 
625   override void open() {
626     if (isOpen) return;
627 
628     writerThread_ = spawn(
629       &writerThread,
630       path_,
631       chunkSize_,
632       maxFlushBytes_,
633       maxFlushInterval_,
634       ioErrorSleepDuration_
635     );
636     setMaxMailboxSize(writerThread_, eventBufferSize_, OnCrowding.block);
637     isOpen_ = true;
638   }
639 
640   /**
641    * Closes the transport, i.e. the underlying file and the writer thread.
642    */
643   override void close() {
644     if (!isOpen) return;
645 
646     prioritySend(writerThread_, ShutdownMessage(), thisTid); // FIXME: Should use normal send here.
647     receive((ShutdownMessage msg, Tid tid){});
648     isOpen_ = false;
649   }
650 
651   /**
652    * Enqueues the passed slice of data for writing and immediately returns.
653    * write() only blocks if the event buffer has been exhausted.
654    *
655    * The transport must be open when calling this.
656    *
657    * Params:
658    *   buf = Slice of data to write.
659    */
660   override void write(in ubyte[] buf) {
661     enforce(isOpen, new TTransportException(
662       "Cannot write to non-open file.", TTransportException.Type.NOT_OPEN));
663 
664     if (buf.empty) {
665       logError("Cannot write empty event, skipping.");
666       return;
667     }
668 
669     auto maxSize = chunkSize - EventSize.sizeof;
    enforce(buf.length <= maxSize, new TTransportException(
      "Cannot write more than " ~ to!string(maxSize) ~
      " bytes at once due to chunk size."));
673 
674     send(writerThread_, buf.idup);
675   }
676 
677   /**
678    * Flushes any pending data to be written.
679    *
680    * The transport must be open when calling this.
681    *
682    * Throws: TTransportException if an error occurs.
683    */
684   override void flush() {
685     enforce(isOpen, new TTransportException(
686       "Cannot flush file if not open.", TTransportException.Type.NOT_OPEN));
687 
688     send(writerThread_, FlushMessage(), thisTid);
689     receive((FlushMessage msg, Tid tid){});
690   }
691 
692   /**
693    * The size of the chunks the file is divided into, in bytes.
694    *
695    * A single event (write call) never spans multiple chunks – this
696    * effectively limits the event size to chunkSize - EventSize.sizeof.
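   *
   * For example, with the default 16 MiB chunk size, a single event can be
   * at most 16 * 1024 * 1024 - 4 bytes long.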
697    */
698   ulong chunkSize() @property {
699     return chunkSize_;
700   }
701 
702   /// ditto
703   void chunkSize(ulong value) @property {
704     enforce(!isOpen, new TTransportException(
705       "Cannot set chunk size after TFileWriterTransport has been opened."));
706     chunkSize_ = value;
707   }
708 
709   /**
710    * The maximum number of write() calls buffered, or zero for no limit.
711    *
712    * If the buffer is exhausted, write() will block until space becomes
713    * available.
714    */
715   size_t eventBufferSize() @property {
716     return eventBufferSize_;
717   }
718 
719   /// ditto
720   void eventBufferSize(size_t value) @property {
721     eventBufferSize_ = value;
722     if (isOpen) {
723       setMaxMailboxSize(writerThread_, value, OnCrowding.throwException);
724     }
725   }
726 
727   /// ditto
728   enum DEFAULT_EVENT_BUFFER_SIZE = 10_000;
729 
730   /**
731    * Maximum number of bytes buffered before writing and flushing the file
732    * to disk.
733    *
734    * Currently cannot be set after the first call to write().
735    */
736   size_t maxFlushBytes() @property {
737     return maxFlushBytes_;
738   }
739 
740   /// ditto
741   void maxFlushBytes(size_t value) @property {
742     maxFlushBytes_ = value;
743     if (isOpen) {
744       send(writerThread_, FlushBytesMessage(value));
745     }
746   }
747 
748   /// ditto
749   enum DEFAULT_MAX_FLUSH_BYTES = 1000 * 1024;
750 
751   /**
752    * Maximum interval between flushing the file to disk.
753    *
   * Currently cannot be set after the first call to write().
755    */
756   Duration maxFlushInterval() @property {
757     return maxFlushInterval_;
758   }
759 
760   /// ditto
761   void maxFlushInterval(Duration value) @property {
762     maxFlushInterval_ = value;
763     if (isOpen) {
764       send(writerThread_, FlushIntervalMessage(value));
765     }
766   }
767 
768   /// ditto
769   enum DEFAULT_MAX_FLUSH_INTERVAL = dur!"seconds"(3);
770 
  /**
   * When the writer thread encounters an I/O error, it pauses for a short
   * time before trying to reopen the output file. This controls the sleep
   * duration.
   */
776   Duration ioErrorSleepDuration() @property {
777     return ioErrorSleepDuration_;
778   }
779 
780   /// ditto
781   void ioErrorSleepDuration(Duration value) @property {
782     ioErrorSleepDuration_ = value;
783     if (isOpen) {
      send(writerThread_, IoErrorSleepDurationMessage(value));
785     }
786   }
787 
788   /// ditto
789   enum DEFAULT_IO_ERROR_SLEEP_DURATION = dur!"msecs"(500);
790 
791 private:
792   string path_;
793   ulong chunkSize_;
794   size_t eventBufferSize_;
795   Duration ioErrorSleepDuration_;
796   size_t maxFlushBytes_;
797   Duration maxFlushInterval_;
798   bool isOpen_;
799   Tid writerThread_;
800 }
801 
802 private {
803   // Signals that the file should be flushed on disk. Sent to the writer
804   // thread and sent back along with the tid for confirmation.
805   struct FlushMessage {}
806 
807   // Signals that the writer thread should close the file and shut down. Sent
808   // to the writer thread and sent back along with the tid for confirmation.
809   struct ShutdownMessage {}
810 
811   struct FlushBytesMessage {
812     size_t value;
813   }
814 
815   struct FlushIntervalMessage {
816     Duration value;
817   }
818 
819   struct IoErrorSleepDurationMessage {
820     Duration value;
821   }
822 
823   void writerThread(
824     string path,
825     ulong chunkSize,
826     size_t maxFlushBytes,
827     Duration maxFlushInterval,
828     Duration ioErrorSleepDuration
829   ) {
830     bool errorOpening;
831     File file;
832     ulong offset;
833     try {
834       // Open file in appending and binary mode.
835       file = File(path, "ab");
836       offset = file.tell();
837     } catch (Exception e) {
838       logError("Error on opening output file in writer thread: %s", e);
839       errorOpening = true;
840     }
841 
842     auto flushTimer = StopWatch(AutoStart.yes);
843     size_t unflushedByteCount;
844 
845     Tid shutdownRequestTid;
846     bool shutdownRequested;
847     while (true) {
848       if (shutdownRequested) break;
849 
850       bool forceFlush;
851       Tid flushRequestTid;
852       receiveTimeout(max(dur!"hnsecs"(0), maxFlushInterval - flushTimer.peek()),
853         (immutable(ubyte)[] data) {
854           while (errorOpening) {
855             logError("Writer thread going to sleep for %s µs due to IO errors",
856               ioErrorSleepDuration.total!"usecs");
857 
858             // Sleep for ioErrorSleepDuration, being ready to be interrupted
859             // by shutdown requests.
860             auto timedOut = receiveTimeout(ioErrorSleepDuration,
861               (ShutdownMessage msg, Tid tid){ shutdownRequestTid = tid; });
            if (!timedOut) {
              // We got a shutdown request; just drop all events and exit the
              // main loop so as not to block application shutdown with reopen
              // attempts that we must assume would fail.
              break;
            }
868 
869             try {
870               file = File(path, "ab");
871               unflushedByteCount = 0;
872               errorOpening = false;
873               logError("Output file %s reopened during writer thread error " ~
874                 "recovery", path);
875             } catch (Exception e) {
876               logError("Unable to reopen output file %s during writer " ~
877                 "thread error recovery", path);
878             }
879           }
880 
881           // Make sure the event does not cross the chunk boundary by writing
882           // a padding consisting of zeroes if it would.
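          // For example, with a chunk size of 10 and offset 15, a 2-byte
          // event needs 4 + 2 = 6 bytes and would end at offset 21, past the
          // boundary at 20; so 5 zero bytes are written first and the event
          // starts at offset 20 (cf. the chunking unit test below).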
883           auto chunk1 = offset / chunkSize;
884           auto chunk2 = (offset + EventSize.sizeof + data.length - 1) / chunkSize;
885 
886           if (chunk1 != chunk2) {
887             // TODO: The C++ implementation refetches the offset here to »keep
888             // in sync« – why would this be needed?
889             auto padding = cast(size_t)
890               ((((offset / chunkSize) + 1) * chunkSize) - offset);
891             auto zeroes = new ubyte[padding];
892             file.rawWrite(zeroes);
893             unflushedByteCount += padding;
894             offset += padding;
895           }
896 
897           // TODO: 2 syscalls here, is this a problem performance-wise?
898           // Probably abysmal performance on Windows due to rawWrite
899           // implementation.
900           uint len = cast(uint)data.length;
901           file.rawWrite(cast(ubyte[])(&len)[0..1]);
902           file.rawWrite(data);
903 
904           auto bytesWritten = EventSize.sizeof + data.length;
905           unflushedByteCount += bytesWritten;
906           offset += bytesWritten;
907         }, (FlushBytesMessage msg) {
908           maxFlushBytes = msg.value;
909         }, (FlushIntervalMessage msg) {
910           maxFlushInterval = msg.value;
911         }, (IoErrorSleepDurationMessage msg) {
912           ioErrorSleepDuration = msg.value;
913         }, (FlushMessage msg, Tid tid) {
914           forceFlush = true;
915           flushRequestTid = tid;
916         }, (OwnerTerminated msg) {
917           shutdownRequested = true;
918         }, (ShutdownMessage msg, Tid tid) {
919           shutdownRequested = true;
920           shutdownRequestTid = tid;
921         }
922       );
923 
924       if (errorOpening) continue;
925 
926       bool flush;
927       if (forceFlush || shutdownRequested || unflushedByteCount > maxFlushBytes) {
928         flush = true;
929       } else if (cast(Duration)flushTimer.peek() > maxFlushInterval) {
930         if (unflushedByteCount == 0) {
931           // If the flush timer is due, but no data has been written, don't
932           // needlessly fsync, but do reset the timer.
933           flushTimer.reset();
934         } else {
935           flush = true;
936         }
937       }
938 
939       if (flush) {
940         file.flush();
941         flushTimer.reset();
942         unflushedByteCount = 0;
943         if (forceFlush) send(flushRequestTid, FlushMessage(), thisTid);
944       }
945     }
946 
947     file.close();
948 
949     if (shutdownRequestTid != Tid.init) {
950       send(shutdownRequestTid, ShutdownMessage(), thisTid);
951     }
952   }
953 }
954 
955 version (unittest) {
956   import core.memory : GC;
957   import std.file;
958 }
959 
960 unittest {
961   void tryRemove(string fileName) {
962     try {
963       remove(fileName);
964     } catch (Exception) {}
965   }
966 
967   immutable fileName = "unittest.dat.tmp";
968   enforce(!exists(fileName), "Unit test output file " ~ fileName ~
969     " already exists.");
970 
971   /*
972    * Check the most basic reading/writing operations.
973    */
974   {
975     scope (exit) tryRemove(fileName);
976 
977     auto writer = new TFileWriterTransport(fileName);
978     writer.open();
979     scope (exit) writer.close();
980 
981     writer.write([1, 2]);
982     writer.write([3, 4]);
983     writer.write([5, 6, 7]);
984     writer.flush();
985 
986     auto reader = new TFileReaderTransport(fileName);
987     reader.open();
988     scope (exit) reader.close();
989 
990     auto buf = new ubyte[7];
991     reader.readAll(buf);
992     enforce(buf == [1, 2, 3, 4, 5, 6, 7]);
993   }
994 
995   /*
996    * Check that chunking works as expected.
997    */
998   {
999     scope (exit) tryRemove(fileName);
1000 
1001     static assert(EventSize.sizeof == 4);
1002     enum CHUNK_SIZE = 10;
1003 
1004     // Write some contents to the file.
1005     {
1006       auto writer = new TFileWriterTransport(fileName);
1007       writer.chunkSize = CHUNK_SIZE;
1008       writer.open();
1009       scope (exit) writer.close();
1010 
1011       writer.write([0xde]);
1012       writer.write([0xad]);
1013       // Chunk boundary here.
1014       writer.write([0xbe]);
1015       // The next write doesn't fit in the five bytes remaining, so we expect
1016       // padding zero bytes to be written.
1017       writer.write([0xef, 0x12]);
1018 
1019       try {
1020         writer.write(new ubyte[CHUNK_SIZE]);
1021         enforce(false, "Could write event not fitting in a single chunk.");
1022       } catch (TTransportException e) {}
1023 
1024       writer.flush();
1025     }
1026 
1027     // Check the raw contents of the file to see if chunk padding was written
1028     // as expected.
1029     auto file = File(fileName, "r");
1030     enforce(file.size == 26);
1031     auto written = new ubyte[26];
1032     file.rawRead(written);
1033     enforce(written == [
1034       1, 0, 0, 0, 0xde,
1035       1, 0, 0, 0, 0xad,
1036       1, 0, 0, 0, 0xbe,
1037       0, 0, 0, 0, 0,
1038       2, 0, 0, 0, 0xef, 0x12
1039     ]);
1040 
1041     // Read the data back in, getting all the events at once.
1042     {
1043       auto reader = new TFileReaderTransport(fileName);
1044       reader.chunkSize = CHUNK_SIZE;
1045       reader.open();
1046       scope (exit) reader.close();
1047 
1048       auto buf = new ubyte[5];
1049       reader.readAll(buf);
1050       enforce(buf == [0xde, 0xad, 0xbe, 0xef, 0x12]);
1051     }
1052   }
1053 
1054   /*
1055    * Make sure that close() exits "quickly", i.e. that there is no problem
1056    * with the worker thread waking up.
1057    */
1058   {
1059     import std.conv : text;
1060     enum NUM_ITERATIONS = 1000;
1061 
1062     uint numOver = 0;
1063     foreach (n; 0 .. NUM_ITERATIONS) {
1064       scope (exit) tryRemove(fileName);
1065 
1066       auto transport = new TFileWriterTransport(fileName);
1067       transport.open();
1068 
1069       // Write something so that the writer thread gets started.
1070       transport.write(cast(ubyte[])"foo");
1071 
      // Every other iteration, also call flush(), in case it has any effect
      // on how the writer thread wakes up.
1074       if (n & 0x1) {
1075         transport.flush();
1076       }
1077 
1078       // Time the call to close().
1079       auto sw = StopWatch(AutoStart.yes);
1080       transport.close();
1081       sw.stop();
1082 
      // If any attempt takes more than 1500 ms, treat that as a fatal failure
      // to avoid looping over a potentially very slow operation.
1085 
      static if (__VERSION__ >= 2079) {
        enforce(sw.peek().total!"msecs" < 1500,
          text("close() took ", sw.peek().total!"msecs", "ms."));
        // Normally, it takes less than 5ms on my dev box.
        // However, if the box is heavily loaded, some of the test runs can take
        // longer. Additionally, on a Windows Server 2008 instance running in
        // a VirtualBox VM, it has been observed that about a quarter of the runs
        // takes (217 ± 1) ms, for reasons not yet known.
        if (sw.peek().total!"msecs" > 50) {
          ++numOver;
        }
      } else {
        enforce(sw.peek().msecs < 1500,
          text("close() took ", sw.peek().msecs, "ms."));
        // Normally, it takes less than 5ms on my dev box.
        // However, if the box is heavily loaded, some of the test runs can take
        // longer. Additionally, on a Windows Server 2008 instance running in
        // a VirtualBox VM, it has been observed that about a quarter of the runs
        // takes (217 ± 1) ms, for reasons not yet known.
        if (sw.peek().msecs > 50) {
          ++numOver;
        }
      }

1114       // Force garbage collection runs every now and then to make sure we
1115       // don't run out of OS thread handles.
1116       if (!(n % 100)) GC.collect();
1117     }
1118 
    // Make sure fewer than a third of the runs took longer than 50 ms.
    enforce(numOver < NUM_ITERATIONS / 3,
      text(numOver, " iterations took more than 50 ms."));
1122   }
1123 }