00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00031
00032 #ifdef HAVE_CONFIG_H
00033 #include "autoconfig.h"
00034 #endif
00035
00036 #include "buffered_io_handler.h"
00037 #include "tools.h"
00038
00039 using namespace zmm;
00040
00041 BufferedIOHandler::BufferedIOHandler(Ref<IOHandler> underlyingHandler, size_t bufSize, size_t maxChunkSize, size_t initialFillSize) : IOHandlerBufferHelper(bufSize, initialFillSize)
00042 {
00043 if (underlyingHandler == nil)
00044 throw _Exception(_("underlyingHandler must not be nil"));
00045 if (maxChunkSize <= 0)
00046 throw _Exception(_("maxChunkSize must be positive"));
00047 this->underlyingHandler = underlyingHandler;
00048 this->maxChunkSize = maxChunkSize;
00049
00050
00051
00052 }
00053
00054 void BufferedIOHandler::open(IN enum UpnpOpenFileMode mode)
00055 {
00056
00057 underlyingHandler->open(mode);
00058 IOHandlerBufferHelper::open(mode);
00059 }
00060
00061 void BufferedIOHandler::close()
00062 {
00063 IOHandlerBufferHelper::close();
00064
00065 underlyingHandler->close();
00066 }
00067
00068 void BufferedIOHandler::threadProc()
00069 {
00070 int readBytes;
00071 size_t maxWrite;
00072
00073 #ifdef TOMBDEBUG
00074 struct timespec last_log;
00075 bool first_log = true;
00076 #endif
00077
00078 AUTOLOCK(mutex);
00079 do
00080 {
00081
00082 #ifdef TOMBDEBUG
00083 if (first_log || getDeltaMillis(&last_log) > 1000)
00084 {
00085 if (first_log)
00086 first_log = false;
00087 getTimespecNow(&last_log);
00088 float percentFillLevel = 0;
00089 if (! empty)
00090 {
00091 int currentFillSize = b - a;
00092 if (currentFillSize <= 0)
00093 currentFillSize += bufSize;
00094 percentFillLevel = ((float)currentFillSize / (float)bufSize) * 100;
00095 }
00096 log_debug("buffer fill level: %3.2f%% (bufSize: %d; a: %d; b: %d)\n", percentFillLevel, bufSize, a, b);
00097 }
00098 #endif
00099 if (empty)
00100 a = b = 0;
00101
00102 if (doSeek && ! empty &&
00103 (
00104 seekWhence == SEEK_SET ||
00105 (seekWhence == SEEK_CUR && seekOffset > 0)
00106 )
00107 )
00108 {
00109 int currentFillSize = b - a;
00110 if (currentFillSize <= 0)
00111 currentFillSize += bufSize;
00112
00113 int relSeek = seekOffset;
00114 if (seekWhence == SEEK_SET)
00115 relSeek -= posRead;
00116
00117 if (relSeek <= currentFillSize)
00118 {
00119 a += relSeek;
00120 posRead += relSeek;
00121 if (a >= bufSize)
00122 a -= bufSize;
00123 if (a == b)
00124 {
00125 empty = true;
00126 a = b = 0;
00127 }
00128
00130
00131 doSeek = false;
00132 cond->signal();
00133 }
00134 }
00135
00136
00137
00138
00139 if (doSeek)
00140 {
00141 try
00142 {
00143 underlyingHandler->seek(seekOffset, seekWhence);
00144 empty = true;
00145 a = b = 0;
00146 }
00147 catch (Exception e)
00148 {
00149 log_error("Error while seeking in buffer: %s\n", e.getMessage().c_str());
00150 e.printStackTrace();
00151 }
00152
00154 waitForInitialFillSize = (initialFillSize > 0);
00155
00156 doSeek = false;
00157 cond->signal();
00158 }
00159
00160 maxWrite = (empty ? bufSize : (a < b ? bufSize - b : a - b));
00161 if (maxWrite == 0)
00162 {
00163 cond->wait();
00164 }
00165 else
00166 {
00167 AUTOUNLOCK();
00168 size_t chunkSize = (maxChunkSize > maxWrite ? maxWrite : maxChunkSize);
00169 readBytes = underlyingHandler->read(buffer + b, chunkSize);
00170 AUTORELOCK();
00171 if (readBytes > 0)
00172 {
00173 b += readBytes;
00174 assert(b <= bufSize);
00175 if (b == bufSize)
00176 b = 0;
00177 if (empty)
00178 {
00179 empty = false;
00180 cond->signal();
00181 }
00182 if (waitForInitialFillSize)
00183 {
00184 int currentFillSize = b - a;
00185 if (currentFillSize <= 0)
00186 currentFillSize += bufSize;
00187 if ((size_t)currentFillSize >= initialFillSize)
00188 {
00189 log_debug("buffer: initial fillsize reached\n");
00190 waitForInitialFillSize = false;
00191 cond->signal();
00192 }
00193 }
00194 }
00195 else if (readBytes == CHECK_SOCKET)
00196 {
00197 checkSocket = true;
00198 cond->signal();
00199 }
00200 }
00201 }
00202 while((maxWrite == 0 || readBytes > 0 || readBytes == CHECK_SOCKET) && ! threadShutdown);
00203 if (! threadShutdown)
00204 {
00205 if (readBytes == 0)
00206 eof = true;
00207 if (readBytes < 0)
00208 readError = true;
00209 }
00210
00211 cond->signal();
00212 }