00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00031
00032 #ifdef HAVE_CONFIG_H
00033 #include "autoconfig.h"
00034 #endif
00035
00036 #include "update_manager.h"
00037
00038 #include "upnp_cds.h"
00039 #include "storage.h"
00040 #include "tools.h"
00041 #include <sys/types.h>
00042 #include <signal.h>
00043
00044
// Target spacing, in milliseconds, between two flushes under FLUSH_SPEC:
// threadProc() sleeps for SPEC_INTERVAL minus the time since the last update.
00045 #define SPEC_INTERVAL 2000
// Shortest sleep, in milliseconds, that threadProc() will bother waiting for;
// anything smaller is treated as "send now".
00046 #define MIN_SLEEP 1
00047
// Number of pending object IDs at which a flush is forced (producers signal
// the worker and the worker stops sleeping once the hash reaches this size).
00048 #define MAX_OBJECT_IDS 1000
// Slack on top of MAX_OBJECT_IDS: containersChanged() switches to piecewise
// insertion when a batch would reach MAX_OBJECT_IDS + this value, and the
// constructor passes MAX_OBJECT_IDS + 2 * this value to DBRHash.
00049 #define MAX_OBJECT_IDS_OVERLOAD 30
// First argument handed to the DBRHash constructor
// (presumably the bucket count - TODO confirm against DBRHash).
00050 #define OBJECT_ID_HASH_CAPACITY 3109
00051
00052 using namespace zmm;
00053
// NOTE(review): presumably defines the mutex shared by the Singleton
// machinery for UpdateManager; the meaning of `false` is not visible here -
// confirm against the SINGLETON_MUTEX macro definition.
00054 SINGLETON_MUTEX(UpdateManager, false);
00055
00056 UpdateManager::UpdateManager() : Singleton<UpdateManager>()
00057 {
00058 objectIDHash = Ref<DBRHash<int> >(new DBRHash<int>(OBJECT_ID_HASH_CAPACITY, MAX_OBJECT_IDS + 2 * MAX_OBJECT_IDS_OVERLOAD, INVALID_OBJECT_ID, INVALID_OBJECT_ID_2));
00059 shutdownFlag = false;
00060 flushPolicy = FLUSH_SPEC;
00061 lastContainerChanged = INVALID_OBJECT_ID;
00062 cond = Ref<Cond>(new Cond(mutex));
00063 }
00064
00065 void UpdateManager::init()
00066 {
00067
00068
00069
00070
00071
00072
00073 pthread_create(
00074 &updateThread,
00075 NULL,
00076 UpdateManager::staticThreadProc,
00077 this
00078 );
00079
00080
00081
00082 }
00083
00084 void UpdateManager::shutdown()
00085 {
00086 log_debug("start\n");
00087 AUTOLOCK(mutex);
00088 shutdownFlag = true;
00089 log_debug("signalling...\n");
00090 cond->signal();
00091 AUTOUNLOCK();
00092 log_debug("waiting for thread\n");
00093 if (updateThread)
00094 pthread_join(updateThread, NULL);
00095 updateThread = 0;
00096 log_debug("end\n");
00097 }
00098
00099 void UpdateManager::containersChanged(Ref<IntArray> objectIDs, int flushPolicy)
00100 {
00101 if (objectIDs == nil)
00102 return;
00103 AUTOLOCK(mutex);
00104
00105
00106 bool signal = (! haveUpdates());
00107
00108
00109 if (flushPolicy > this->flushPolicy)
00110 {
00111 this->flushPolicy = flushPolicy;
00112 signal = true;
00113 }
00114 int size = objectIDs->size();
00115 int hashSize = objectIDHash->size();
00116 bool split = (hashSize + size >= MAX_OBJECT_IDS + MAX_OBJECT_IDS_OVERLOAD);
00117 for (int i = 0; i < size; i++)
00118 {
00119 int objectID = objectIDs->get(i);
00120 if (objectID != lastContainerChanged)
00121 {
00122
00123 objectIDHash->put(objectID);
00124 if (split && objectIDHash->size() > MAX_OBJECT_IDS)
00125 {
00126 while(objectIDHash->size() > MAX_OBJECT_IDS)
00127 {
00128 log_debug("in-between signalling...\n");
00129 cond->signal();
00130 AUTOUNLOCK();
00131 AUTORELOCK();
00132 }
00133 }
00134 }
00135 }
00136 if (objectIDHash->size() >= MAX_OBJECT_IDS)
00137 signal = true;
00138 if (signal)
00139 {
00140 log_debug("signalling...\n");
00141 cond->signal();
00142 }
00143 }
00144
00145 void UpdateManager::containerChanged(int objectID, int flushPolicy)
00146 {
00147 if (objectID == INVALID_OBJECT_ID)
00148 return;
00149 AUTOLOCK(mutex);
00150 if (objectID != lastContainerChanged || flushPolicy > this->flushPolicy)
00151 {
00152
00153
00154 bool signal = (! haveUpdates());
00155 log_debug("containerChanged. id: %d, signal: %d\n", objectID, signal);
00156 objectIDHash->put(objectID);
00157
00158
00159 if (objectIDHash->size() >= MAX_OBJECT_IDS)
00160 signal = true;
00161
00162
00163 lastContainerChanged = objectID;
00164
00165
00166
00167 if (flushPolicy > this->flushPolicy)
00168 {
00169 this->flushPolicy = flushPolicy;
00170 signal = true;
00171 }
00172 if (signal)
00173 {
00174 log_debug("signalling...\n");
00175 cond->signal();
00176 }
00177 }
00178 else
00179 {
00180 log_debug("last container changed!\n");
00181 }
00182 }
00183
00184
00185
00186 void UpdateManager::threadProc()
00187 {
00188 struct timespec lastUpdate;
00189 getTimespecNow(&lastUpdate);
00190
00191 AUTOLOCK(mutex);
00192
00193 while (! shutdownFlag)
00194 {
00195 if (haveUpdates())
00196 {
00197 long sleepMillis = 0;
00198 struct timespec now;
00199 getTimespecNow(&now);
00200 long timeDiff = getDeltaMillis(&lastUpdate, &now);
00201 switch (flushPolicy)
00202 {
00203 case FLUSH_SPEC:
00204 sleepMillis = SPEC_INTERVAL - timeDiff;
00205 break;
00206 case FLUSH_ASAP:
00207 sleepMillis = 0;
00208 break;
00209 }
00210 bool sendUpdates = true;
00211 if (sleepMillis >= MIN_SLEEP && objectIDHash->size() < MAX_OBJECT_IDS)
00212 {
00213 struct timespec timeout;
00214 getTimespecAfterMillis(sleepMillis, &timeout, &now);
00215 log_debug("threadProc: sleeping for %ld millis\n", sleepMillis);
00216
00217 int ret = cond->timedwait(&timeout);
00218
00219 if (! shutdownFlag)
00220 {
00221 if (ret != 0 && ret != ETIMEDOUT)
00222 {
00223 log_error("Fatal error: pthread_cond_timedwait returned errorcode %d\n", ret);
00224 log_error("Forcing MediaTomb shutdown.\n");
00225 print_backtrace();
00226 kill(0, SIGINT);
00227 }
00228 if (ret == ETIMEDOUT)
00229 sendUpdates = false;
00230 }
00231 else
00232 sendUpdates = false;
00233 }
00234
00235 if (sendUpdates)
00236 {
00237 log_debug("sending updates...\n");
00238 lastContainerChanged = INVALID_OBJECT_ID;
00239 flushPolicy = FLUSH_SPEC;
00240 String updateString;
00241
00242 try
00243 {
00244 hash_data_array_t<int> hash_data_array;
00245
00246
00247 objectIDHash->getAll(&hash_data_array);
00248 updateString = Storage::getInstance()->incrementUpdateIDs(hash_data_array.data,hash_data_array.size);
00249 objectIDHash->clear();
00250 }
00251 catch (Exception e)
00252 {
00253 e.printStackTrace();
00254 log_error("Fatal error when sending updates: %s\n", e.getMessage().c_str());
00255 log_error("Forcing MediaTomb shutdown.\n");
00256 kill(0, SIGINT);
00257 }
00258 AUTOUNLOCK();
00259 if (string_ok(updateString))
00260 {
00261 try
00262 {
00263 ContentDirectoryService::getInstance()->subscription_update(updateString);
00264 log_debug("updates sent.\n");
00265 getTimespecNow(&lastUpdate);
00266 }
00267 catch (Exception e)
00268 {
00269 log_error("Fatal error when sending updates: %s\n", e.getMessage().c_str());
00270 log_error("Forcing MediaTomb shutdown.\n");
00271 kill(0, SIGINT);
00272 }
00273 }
00274 else
00275 {
00276 log_debug("NOT sending updates (string empty or invalid).\n");
00277 }
00278 AUTORELOCK();
00279 }
00280 }
00281 else
00282 {
00283
00284 cond->wait();
00285 }
00286 }
00287 }
00288
00289 void *UpdateManager::staticThreadProc(void *arg)
00290 {
00291 log_debug("starting update thread... thread: %d\n", pthread_self());
00292 UpdateManager *inst = (UpdateManager *)arg;
00293 inst->threadProc();
00294 Storage::getInstance()->threadCleanup();
00295
00296 log_debug("update thread shut down. thread: %d\n", pthread_self());
00297 pthread_exit(NULL);
00298 return NULL;
00299 }