00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00031
00032 #ifdef HAVE_CONFIG_H
00033 #include "autoconfig.h"
00034 #endif
00035
00036 #include "io_handler_buffer_helper.h"
00037 #include "config_manager.h"
00038 #include "tools.h"
00039
00040 using namespace zmm;
00041
00042 IOHandlerBufferHelper::IOHandlerBufferHelper(size_t bufSize, size_t initialFillSize) : IOHandler()
00043 {
00044 if (bufSize <=0)
00045 throw _Exception(_("bufSize must be positive"));
00046 if (initialFillSize < 0 || initialFillSize > bufSize)
00047 throw _Exception(_("initialFillSize must be non-negative and must be lesser than or equal to the size of the buffer"));
00048
00049 mutex = Ref<Mutex>(new Mutex());
00050 cond = Ref<Cond>(new Cond(mutex));
00051
00052 this->bufSize = bufSize;
00053 this->initialFillSize = initialFillSize;
00054 waitForInitialFillSize = (initialFillSize > 0);
00055 buffer = NULL;
00056 isOpen = false;
00057 threadShutdown = false;
00058 eof = false;
00059 readError = false;
00060 a = b = posRead = 0;
00061 empty = true;
00062 signalAfterEveryRead = false;
00063 checkSocket = false;
00064
00065 seekEnabled = false;
00066 doSeek = false;
00067 }
00068
00069 void IOHandlerBufferHelper::open(IN enum UpnpOpenFileMode mode)
00070 {
00071 if (isOpen)
00072 throw _Exception(_("tried to reopen an open IOHandlerBufferHelper"));
00073 buffer = (char *)MALLOC(bufSize);
00074 if (buffer == NULL)
00075 throw _Exception(_("Failed to allocate memory for transcoding buffer!"));
00076
00077 startBufferThread();
00078 isOpen = true;
00079 }
00080
00081 IOHandlerBufferHelper::~IOHandlerBufferHelper()
00082 {
00083 if (isOpen)
00084 close();
00085 }
00086
00087 int IOHandlerBufferHelper::read(OUT char *buf, IN size_t length)
00088 {
00089
00090 assert(isOpen);
00091
00092 assert(length > 0);
00093
00094 AUTOLOCK(mutex);
00095
00096 while ((empty || waitForInitialFillSize) && ! (threadShutdown || eof || readError))
00097 {
00098 if (checkSocket)
00099 {
00100 checkSocket = false;
00101 return CHECK_SOCKET;
00102 }
00103 else
00104 cond->wait();
00105 }
00106
00107 if (readError || threadShutdown)
00108 return -1;
00109 if (empty && eof)
00110 return 0;
00111
00112 size_t bLocal = b;
00113 AUTOUNLOCK();
00114
00115
00116 int currentFillSize = bLocal - a;
00117 if (currentFillSize <= 0)
00118 currentFillSize += bufSize;
00119 size_t maxRead1 = (a < bLocal ? bLocal - a : bufSize - a);
00120 size_t read1 = (maxRead1 > length ? length : maxRead1);
00121 size_t maxRead2 = currentFillSize - read1;
00122 size_t read2 = (read1 < length ? length - read1 : 0);
00123 if (read2 > maxRead2)
00124 read2 = maxRead2;
00125
00126 memcpy(buf, buffer + a, read1);
00127 if (read2)
00128 memcpy(buf + read1, buffer, read2);
00129
00130 size_t didRead = read1+read2;
00131
00132 AUTORELOCK();
00133
00134 bool signalled = false;
00135
00136 if (signalAfterEveryRead || a == b)
00137 {
00138 cond->signal();
00139 signalled = true;
00140 }
00141
00142 a += didRead;
00143 if (a >= bufSize)
00144 a -= bufSize;
00145 if (a == b)
00146 {
00147 empty = true;
00148 if (! signalled)
00149 cond->signal();
00150 }
00151
00152 posRead += didRead;
00153 return didRead;
00154 }
00155
00156 void IOHandlerBufferHelper::seek(IN off_t offset, IN int whence)
00157 {
00158 log_debug("seek called: %lld %d\n", offset, whence);
00159 if (! seekEnabled)
00160 throw _Exception(_("seek currently disabled in this IOHandlerBufferHelper"));
00161
00162 assert(isOpen);
00163
00164
00165 assert(whence == SEEK_SET || whence == SEEK_CUR || whence == SEEK_END);
00166 assert(whence != SEEK_SET || offset >= 0);
00167 assert(whence != SEEK_END || offset <= 0);
00168
00169
00170 if (whence == SEEK_CUR && offset == 0)
00171 return;
00172
00173 AUTOLOCK(mutex);
00174
00175
00176
00177 doSeek = true;
00178 seekOffset = offset;
00179 seekWhence = whence;
00180
00181
00182 cond->signal();
00183
00184
00185 while(doSeek && ! (threadShutdown || eof || readError))
00186 cond->wait();
00187 }
00188
00189 void IOHandlerBufferHelper::close()
00190 {
00191 if (! isOpen)
00192 throw _Exception(_("close called on closed IOHandlerBufferHelper"));
00193 isOpen = false;
00194 stopBufferThread();
00195 FREE(buffer);
00196 buffer = NULL;
00197 }
00198
00199
00200
00201 void IOHandlerBufferHelper::startBufferThread()
00202 {
00203 pthread_create(
00204 &bufferThread,
00205 NULL,
00206 IOHandlerBufferHelper::staticThreadProc,
00207 this
00208 );
00209 }
00210
00211 void IOHandlerBufferHelper::stopBufferThread()
00212 {
00213 AUTOLOCK(mutex);
00214 threadShutdown = true;
00215 cond->signal();
00216 AUTOUNLOCK();
00217 if (bufferThread)
00218 pthread_join(bufferThread, NULL);
00219 bufferThread = 0;
00220 }
00221
00222 void *IOHandlerBufferHelper::staticThreadProc(void *arg)
00223 {
00224 log_debug("starting buffer thread... thread: %d\n", pthread_self());
00225 IOHandlerBufferHelper *inst = (IOHandlerBufferHelper *)arg;
00226 inst->threadProc();
00227 log_debug("buffer thread shut down. thread: %d\n", pthread_self());
00228 pthread_exit(NULL);
00229 return NULL;
00230 }