00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "opensync.h"
00022 #include "opensync_internals.h"
00023
00024 #include "opensync-archive.h"
00025 #include "opensync-group.h"
00026 #include "opensync-engine.h"
00027 #include "opensync-client.h"
00028 #include "opensync-data.h"
00029 #include "opensync-mapping.h"
00030 #include "opensync-format.h"
00031 #include "opensync-merger.h"
00032 #include "opensync-plugin.h"
00033 #include "opensync-xmlformat.h"
00034
00035 #include "opensync_engine_internals.h"
00036 #include "opensync_sink_engine_internals.h"
00037 #include "opensync_mapping_entry_engine_internals.h"
00038 #include "opensync_status_internals.h"
00039
00040 #include "opensync_mapping_engine.h"
00041 #include "opensync_mapping_engine_internals.h"
00042
00043 #include "opensync_obj_engine.h"
00044 #include "opensync_obj_engine_internals.h"
00045
00046 #include "archive/opensync_archive_internals.h"
00047 #include "data/opensync_change_internals.h"
00048 #include "client/opensync_client_proxy_internals.h"
00049 #include "group/opensync_member_internals.h"
00050 #include "format/opensync_objformat_internals.h"
00051
00052 OSyncMappingEngine *_osync_obj_engine_create_mapping_engine(OSyncObjEngine *engine, OSyncError **error)
00053 {
00054
00055 OSyncMapping *mapping = osync_mapping_new(error);
00056 OSyncList *s = NULL;
00057 OSyncMappingEngine *mapping_engine = NULL;
00058 if (!mapping)
00059 goto error;
00060
00061 osync_mapping_set_id(mapping, osync_mapping_table_get_next_id(engine->mapping_table));
00062 osync_mapping_table_add_mapping(engine->mapping_table, mapping);
00063
00064 for (s = engine->sink_engines; s; s = s->next) {
00065 OSyncSinkEngine *sink_engine = s->data;
00066
00067 OSyncMember *member = osync_client_proxy_get_member(sink_engine->proxy);
00068
00069 OSyncMappingEntry *mapping_entry = osync_mapping_entry_new(error);
00070 osync_mapping_entry_set_member_id(mapping_entry, osync_member_get_id(member));
00071 osync_mapping_add_entry(mapping, mapping_entry);
00072 osync_mapping_entry_unref(mapping_entry);
00073 }
00074
00075 mapping_engine = osync_mapping_engine_new(engine, mapping, error);
00076 if (!mapping_engine)
00077 goto error_free_mapping;
00078 osync_mapping_unref(mapping);
00079
00080 return mapping_engine;
00081
00082 error_free_mapping:
00083 osync_mapping_unref(mapping);
00084 error:
00085 return NULL;
00086 }
00087
00088 static void _osync_obj_engine_connect_callback(OSyncClientProxy *proxy, void *userdata, osync_bool slowsync, OSyncError *error)
00089 {
00090 OSyncSinkEngine *sinkengine = userdata;
00091 OSyncObjEngine *engine = sinkengine->engine;
00092 OSyncError *locerror = NULL;
00093
00094 osync_trace(TRACE_ENTRY, "%s(%p, %p, %i, %p)", __func__, proxy, userdata, slowsync, error);
00095
00096 if (error) {
00097 osync_trace(TRACE_INTERNAL, "Obj Engine received connect error: %s", osync_error_print(&error));
00098 osync_obj_engine_set_error(engine, error);
00099 engine->sink_errors = engine->sink_errors | (0x1 << sinkengine->position);
00100 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_ERROR, engine->objtype, error);
00101 } else {
00102 engine->sink_connects = engine->sink_connects | (0x1 << sinkengine->position);
00103 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_CONNECTED, engine->objtype, NULL);
00104 }
00105
00106 if (slowsync) {
00107 osync_obj_engine_set_slowsync(engine, TRUE);
00108 osync_trace(TRACE_INTERNAL, "SlowSync requested during connect.");
00109 }
00110
00111 if (osync_bitcount(engine->sink_errors | engine->sink_connects) == osync_obj_engine_num_sinkengines(engine)) {
00112 if (osync_bitcount(engine->sink_errors)) {
00113 osync_error_set(&locerror, OSYNC_ERROR_GENERIC, "At least one sink_engine failed while connecting");
00114 osync_obj_engine_set_error(engine, locerror);
00115 }
00116
00117 osync_obj_engine_event(engine, OSYNC_ENGINE_EVENT_CONNECTED, locerror ? locerror : error);
00118 } else
00119 osync_trace(TRACE_INTERNAL, "Not yet: %i", osync_bitcount(engine->sink_errors | engine->sink_connects));
00120
00121 osync_sink_engine_unref(sinkengine);
00122 osync_trace(TRACE_EXIT, "%s", __func__);
00123 }
00124
00125 static void _osync_obj_engine_connect_done_callback(OSyncClientProxy *proxy, void *userdata, OSyncError *error)
00126 {
00127 OSyncSinkEngine *sinkengine = userdata;
00128 OSyncObjEngine *engine = sinkengine->engine;
00129 OSyncError *locerror = NULL;
00130
00131 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, proxy, userdata, error);
00132
00133 if (error) {
00134 osync_obj_engine_set_error(engine, error);
00135 engine->sink_errors = engine->sink_errors | (0x1 << sinkengine->position);
00136 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_ERROR, engine->objtype, error);
00137 } else {
00138 engine->sink_connect_done = engine->sink_connect_done | (0x1 << sinkengine->position);
00139 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_CONNECT_DONE, engine->objtype, NULL);
00140 }
00141
00142 if (osync_bitcount(engine->sink_errors | engine->sink_connect_done) == osync_obj_engine_num_sinkengines(engine)) {
00143 if (osync_bitcount(engine->sink_connect_done) < osync_bitcount(engine->sink_connects)) {
00144 osync_error_set(&locerror, OSYNC_ERROR_GENERIC, "Fewer sink_engines reported connect_done than connected");
00145 osync_obj_engine_set_error(engine, locerror);
00146 }
00147
00148 osync_obj_engine_event(engine, OSYNC_ENGINE_EVENT_CONNECT_DONE, locerror ? locerror : error);
00149 } else
00150 osync_trace(TRACE_INTERNAL, "Not yet: %i", osync_bitcount(engine->sink_errors | engine->sink_connect_done));
00151
00152 osync_sink_engine_unref(sinkengine);
00153 osync_trace(TRACE_EXIT, "%s", __func__);
00154 }
00155
00156 static void _osync_obj_engine_generate_event_disconnected(OSyncObjEngine *engine, OSyncError *error)
00157 {
00158 OSyncError *locerror = NULL;
00159 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
00160
00161 if (osync_bitcount(engine->sink_errors | engine->sink_disconnects) == osync_obj_engine_num_sinkengines(engine)) {
00162 if (osync_bitcount(engine->sink_disconnects) < osync_bitcount(engine->sink_connects)) {
00163 osync_error_set(&locerror, OSYNC_ERROR_GENERIC, "Fewer sink_engines disconnected than connected");
00164 osync_obj_engine_set_error(engine, locerror);
00165 osync_error_unref(&locerror);
00166 }
00167
00168
00169
00170
00171 osync_obj_engine_event(engine, OSYNC_ENGINE_EVENT_DISCONNECTED, NULL);
00172 } else
00173 osync_trace(TRACE_INTERNAL, "Not yet: %i", osync_bitcount(engine->sink_errors | engine->sink_disconnects));
00174
00175 osync_trace(TRACE_EXIT, "%s", __func__);
00176 }
00177
00178 static void _osync_obj_engine_disconnect_callback(OSyncClientProxy *proxy, void *userdata, OSyncError *error)
00179 {
00180 OSyncSinkEngine *sinkengine = userdata;
00181 OSyncObjEngine *engine = sinkengine->engine;
00182
00183 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, proxy, userdata, error);
00184
00185 if (error) {
00186 osync_obj_engine_set_error(engine, error);
00187 engine->sink_errors = engine->sink_errors | (0x1 << sinkengine->position);
00188 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_ERROR, engine->objtype, error);
00189 } else {
00190 engine->sink_disconnects = engine->sink_disconnects | (0x1 << sinkengine->position);
00191 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_DISCONNECTED, engine->objtype, NULL);
00192 }
00193
00194 _osync_obj_engine_generate_event_disconnected(engine, error);
00195
00196 osync_sink_engine_unref(sinkengine);
00197
00198 osync_trace(TRACE_EXIT, "%s", __func__);
00199 }
00200
00201
00202
00203
00204
00205 static OSyncConvCmpResult _osync_obj_engine_mapping_find(OSyncList *mapping_engines, OSyncChange *change, OSyncSinkEngine *sinkengine, OSyncMappingEngine **mapping_engine)
00206 {
00207 OSyncList *m = NULL;
00208 OSyncList *e = NULL;
00209 osync_bool found_similar = FALSE;
00210 OSyncConvCmpResult result = OSYNC_CONV_DATA_MISMATCH;
00211 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p, %p)", __func__, mapping_engines, change, sinkengine, mapping_engine);
00212
00213 for (m = mapping_engines; m; m = m->next) {
00214 *mapping_engine = m->data;
00215
00216
00217
00218
00219 for (e = (*mapping_engine)->entries; e; e = e->next) {
00220 OSyncMappingEntryEngine *entry_engine = e->data;
00221 OSyncChange *mapping_change = NULL;
00222
00223 if (entry_engine->sink_engine == sinkengine) {
00224 if (!found_similar)
00225 *mapping_engine = NULL;
00226
00227 break;
00228 }
00229
00230 mapping_change = osync_entry_engine_get_change(entry_engine);
00231 if (!mapping_change)
00232 continue;
00233
00234 result = osync_change_compare(mapping_change, change);
00235 if (result == OSYNC_CONV_DATA_MISMATCH) {
00236 if (!found_similar)
00237 *mapping_engine = NULL;
00238 } else if (result == OSYNC_CONV_DATA_SAME) {
00239 break;
00240 } else if (result == OSYNC_CONV_DATA_SIMILAR) {
00241 found_similar = TRUE;
00242 }
00243 }
00244
00245 if (*mapping_engine) {
00246 osync_trace(TRACE_EXIT, "%s: Found %p", __func__, *mapping_engine);
00247 return result;
00248 }
00249 }
00250
00251 osync_trace(TRACE_EXIT, "%s: Mismatch", __func__);
00252 return OSYNC_CONV_DATA_MISMATCH;
00253 }
00254
00255 osync_bool osync_obj_engine_map_changes(OSyncObjEngine *engine, OSyncError **error)
00256 {
00257 OSyncMappingEngine *mapping_engine = NULL;
00258 OSyncList *new_mappings = NULL, *v = NULL;
00259 OSyncList *unmapped_mappings = NULL;
00260 OSyncConvCmpResult result = 0;
00261 OSyncMappingEntryEngine *entry_engine = NULL;
00262
00263 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
00264
00265
00266
00267 for (v = engine->sink_engines; v; v = v->next) {
00268 OSyncSinkEngine *sinkengine = v->data;
00269
00270
00271
00272
00273
00274 long long int memberid = osync_member_get_id(osync_client_proxy_get_member(sinkengine->proxy));
00275 osync_trace(TRACE_INTERNAL, "Sinkengine of member %lli", memberid);
00276
00277 unmapped_mappings = osync_list_copy(new_mappings);
00278
00279
00280 while (sinkengine->unmapped) {
00281 OSyncChange *change = sinkengine->unmapped->data;
00282
00283 osync_trace(TRACE_INTERNAL, "Looking for mapping for change %s, changetype %i from member %lli", osync_change_get_uid(change), osync_change_get_changetype(change), memberid);
00284
00285
00286 result = _osync_obj_engine_mapping_find(unmapped_mappings, change, sinkengine, &mapping_engine);
00287 if (result == OSYNC_CONV_DATA_MISMATCH) {
00288
00289 mapping_engine = _osync_obj_engine_create_mapping_engine(engine, error);
00290 if (!mapping_engine)
00291 goto error;
00292
00293 osync_trace(TRACE_INTERNAL, "Unable to find mapping. Creating new mapping with id %lli", osync_mapping_get_id(mapping_engine->mapping));
00294
00295 new_mappings = osync_list_append(new_mappings, mapping_engine);
00296 unmapped_mappings = osync_list_append(unmapped_mappings, mapping_engine);
00297 } else if (result == OSYNC_CONV_DATA_SIMILAR) {
00298 mapping_engine->conflict = TRUE;
00299 } else if (result == OSYNC_CONV_DATA_SAME) {
00300 unmapped_mappings = osync_list_remove(unmapped_mappings, mapping_engine);
00301 }
00302
00303 entry_engine = osync_mapping_engine_get_entry(mapping_engine, sinkengine);
00304 osync_assert(entry_engine);
00305
00306 osync_entry_engine_update(entry_engine, change);
00307 sinkengine->unmapped = osync_list_remove(sinkengine->unmapped, sinkengine->unmapped->data);
00308 osync_change_unref(change);
00309 }
00310
00311 osync_list_free(unmapped_mappings);
00312
00313 }
00314
00315 engine->mapping_engines = osync_list_concat(engine->mapping_engines, new_mappings);
00316
00317
00318 osync_trace(TRACE_EXIT, "%s", __func__);
00319 return TRUE;
00320
00321 error:
00322 osync_trace_enable();
00323 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00324 return FALSE;
00325 }
00326
00327 static void _osync_obj_engine_read_ignored_callback(OSyncClientProxy *proxy, void *userdata, OSyncError *error)
00328 {
00329 OSyncSinkEngine *sinkengine = userdata;
00330 OSyncObjEngine *engine = sinkengine->engine;
00331
00332
00333
00334 osync_sink_engine_unref(sinkengine);
00335 }
00336
00337
00338 static void _osync_obj_engine_read_callback(OSyncClientProxy *proxy, void *userdata, OSyncError *error)
00339 {
00340 OSyncSinkEngine *sinkengine = userdata;
00341 OSyncObjEngine *engine = sinkengine->engine;
00342 OSyncError *locerror = NULL;
00343
00344 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, proxy, userdata, error);
00345
00346 if (error) {
00347 osync_obj_engine_set_error(engine, error);
00348 engine->sink_errors = engine->sink_errors | (0x1 << sinkengine->position);
00349 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_ERROR, engine->objtype, error);
00350 } else {
00351 engine->sink_get_changes = engine->sink_get_changes | (0x1 << sinkengine->position);
00352 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_READ, engine->objtype, NULL);
00353 }
00354
00355 if (osync_bitcount(engine->sink_errors | engine->sink_get_changes) == osync_obj_engine_num_sinkengines(engine)) {
00356
00357 if (osync_bitcount(engine->sink_get_changes) < osync_bitcount(engine->sink_connects)) {
00358 osync_error_set(&locerror, OSYNC_ERROR_GENERIC, "Fewer sink_engines reported get_changes than connected");
00359 osync_obj_engine_set_error(engine, locerror);
00360 }
00361
00362 osync_obj_engine_event(engine, OSYNC_ENGINE_EVENT_READ, locerror ? locerror : error);
00363 } else
00364 osync_trace(TRACE_INTERNAL, "Not yet: %i", osync_bitcount(engine->sink_errors | engine->sink_get_changes));
00365
00366 osync_sink_engine_unref(sinkengine);
00367 osync_trace(TRACE_EXIT, "%s", __func__);
00368 }
00369
00370 osync_bool osync_obj_engine_receive_change(OSyncObjEngine *objengine, OSyncClientProxy *proxy, OSyncChange *change, OSyncError **error)
00371 {
00372 OSyncSinkEngine *sinkengine = NULL;
00373 OSyncList *e = NULL;
00374
00375 osync_assert(objengine);
00376
00377 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p, %p)", __func__, objengine, proxy, change, error);
00378
00379
00380 sinkengine = osync_obj_engine_find_proxy_sinkengine(objengine, proxy);
00381
00382 if (!sinkengine) {
00383 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find sinkengine");
00384 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00385 return FALSE;
00386 }
00387
00388
00389 for (e = sinkengine->entries; e; e = e->next) {
00390 OSyncMappingEntryEngine *mapping_engine = e->data;
00391
00392 if (osync_entry_engine_matches(mapping_engine, change)) {
00393 osync_entry_engine_update(mapping_engine, change);
00394
00395 osync_status_update_change(sinkengine->engine->parent, change, osync_client_proxy_get_member(proxy), mapping_engine->mapping_engine->mapping, OSYNC_CHANGE_EVENT_READ, NULL);
00396
00397 osync_trace(TRACE_EXIT, "%s: Updated", __func__);
00398 return TRUE;
00399 }
00400 }
00401
00402 osync_status_update_change(sinkengine->engine->parent, change, osync_client_proxy_get_member(proxy), NULL, OSYNC_CHANGE_EVENT_READ, NULL);
00403
00404
00405
00406 sinkengine->unmapped = osync_list_append(sinkengine->unmapped, change);
00407 osync_change_ref(change);
00408
00409 osync_trace(TRACE_EXIT, "%s: Unmapped", __func__);
00410 return TRUE;
00411 }
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424 static void _osync_obj_engine_generate_written_event(OSyncObjEngine *engine, OSyncError *error)
00425 {
00426 osync_bool dirty = FALSE;
00427 OSyncList *p = NULL;
00428 OSyncList *e = NULL;
00429 OSyncSinkEngine *sinkengine = NULL;
00430 OSyncError *locerror = NULL;
00431
00432 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, engine, error);
00433
00434
00435 for (p = engine->sink_engines; p; p = p->next) {
00436 OSyncMember *member = NULL;
00437 OSyncObjTypeSink *objtype_sink = NULL;
00438
00439 sinkengine = p->data;
00440 member = osync_client_proxy_get_member(sinkengine->proxy);
00441 objtype_sink = osync_member_find_objtype_sink(member, engine->objtype);
00442
00443
00444 if (!objtype_sink || !osync_objtype_sink_get_write(objtype_sink))
00445 break;
00446
00447 for (e = sinkengine->entries; e; e = e->next) {
00448 OSyncMappingEntryEngine *entry_engine = e->data;
00449 if (osync_entry_engine_is_dirty(entry_engine) == TRUE) {
00450 dirty = TRUE;
00451 break;
00452 }
00453 }
00454 if (dirty) {
00455 osync_trace(TRACE_EXIT, "%s: Still dirty", __func__);
00456 return;
00457 }
00458 }
00459 osync_trace(TRACE_INTERNAL, "%s: Not dirty anymore", __func__);
00460
00461
00462
00463
00464 if (osync_bitcount(engine->sink_errors | engine->sink_written) == osync_obj_engine_num_sinkengines(engine)) {
00465 if (osync_bitcount(engine->sink_written) < osync_bitcount(engine->sink_connects)) {
00466 osync_error_set(&locerror, OSYNC_ERROR_GENERIC, "Fewer sink_engines reported committed all than connected");
00467 osync_obj_engine_set_error(engine, locerror);
00468 } else if (osync_bitcount(engine->sink_errors)) {
00469
00470 osync_error_set(&locerror, OSYNC_ERROR_GENERIC, "At least one Sink Engine failed while committing");
00471 osync_obj_engine_set_error(engine, locerror);
00472 }
00473
00474 osync_obj_engine_event(engine, OSYNC_ENGINE_EVENT_WRITTEN, locerror ? locerror : error);
00475
00476 } else
00477 osync_trace(TRACE_INTERNAL, "Not yet: %i", osync_bitcount(engine->sink_errors | engine->sink_written));
00478
00479 osync_trace(TRACE_EXIT, "%s", __func__);
00480 }
00481
00482 void osync_obj_engine_commit_change_callback(OSyncClientProxy *proxy, void *userdata, const char *uid, OSyncError *error)
00483 {
00484 OSyncMappingEntryEngine *entry_engine = userdata;
00485 OSyncObjEngine *engine = entry_engine->objengine;
00486 OSyncSinkEngine *sinkengine = entry_engine->sink_engine;
00487 OSyncError *locerror = NULL;
00488 OSyncMapping *mapping = NULL;
00489 OSyncMember *member = NULL;
00490 OSyncMappingEntry *entry = NULL;
00491 const char *objtype = NULL;
00492 const char *objengine_objtype = NULL;
00493 long long int id = 0;
00494
00495 objengine_objtype = osync_obj_engine_get_objtype(engine);
00496
00497 osync_trace(TRACE_ENTRY, "%s(%p, %p, %s, %p)", __func__, proxy, userdata, uid, error);
00498
00499 osync_entry_engine_set_dirty(entry_engine, FALSE);
00500
00501 mapping = entry_engine->mapping_engine->mapping;
00502 member = osync_client_proxy_get_member(proxy);
00503 entry = entry_engine->entry;
00504 objtype = osync_change_get_objtype(entry_engine->change);
00505 id = osync_mapping_entry_get_id(entry);
00506
00507 if (error) {
00508
00509
00510
00511 osync_status_update_change(engine->parent, entry_engine->change, osync_client_proxy_get_member(proxy), entry_engine->mapping_engine->mapping, OSYNC_CHANGE_EVENT_ERROR, error);
00512 osync_status_update_mapping(engine->parent, entry_engine->mapping_engine, OSYNC_MAPPING_EVENT_ERROR, error);
00513
00514 osync_obj_engine_set_error(engine, error);
00515 engine->sink_errors = engine->sink_errors | (0x1 << sinkengine->position);
00516 goto error;
00517 }
00518
00519 if (uid)
00520 osync_change_set_uid(entry_engine->change, uid);
00521
00522 if (engine->archive) {
00523 if (osync_change_get_changetype(entry_engine->change) == OSYNC_CHANGE_TYPE_DELETED) {
00524
00525 osync_archive_delete_change(engine->archive, id, objtype, &locerror);
00526 } else {
00527
00528
00529 osync_archive_save_change(engine->archive, id, osync_change_get_uid(entry_engine->change), objtype, osync_mapping_get_id(mapping), osync_member_get_id(member), objengine_objtype, &locerror);
00530 }
00531 }
00532
00533 osync_assert(entry_engine->mapping_engine);
00534 osync_status_update_change(engine->parent, entry_engine->change, osync_client_proxy_get_member(proxy), entry_engine->mapping_engine->mapping, OSYNC_CHANGE_EVENT_WRITTEN, NULL);
00535 osync_entry_engine_update(entry_engine, NULL);
00536
00537 osync_trace(TRACE_EXIT, "%s", __func__);
00538 return;
00539
00540 error:
00541 _osync_obj_engine_generate_written_event(engine, error);
00542 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(&error));
00543 }
00544
00545 void osync_obj_engine_written_callback(OSyncClientProxy *proxy, void *userdata, OSyncError *error)
00546 {
00547 OSyncSinkEngine *sinkengine = userdata;
00548 OSyncObjEngine *engine = sinkengine->engine;
00549
00550 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, proxy, userdata, error);
00551
00552 if (error) {
00553 osync_obj_engine_set_error(engine, error);
00554 engine->sink_errors = engine->sink_errors | (0x1 << sinkengine->position);
00555 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_ERROR, engine->objtype, error);
00556 } else {
00557 engine->sink_written = engine->sink_written | (0x1 << sinkengine->position);
00558 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_WRITTEN, engine->objtype, NULL);
00559 }
00560
00561 _osync_obj_engine_generate_written_event(engine, error);
00562
00563 osync_trace(TRACE_EXIT, "%s", __func__);
00564 }
00565
00566 static void _osync_obj_engine_sync_done_callback(OSyncClientProxy *proxy, void *userdata, OSyncError *error)
00567 {
00568 OSyncSinkEngine *sinkengine = userdata;
00569 OSyncObjEngine *engine = sinkengine->engine;
00570 OSyncError *locerror = NULL;
00571
00572 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, proxy, userdata, error);
00573
00574 if (error) {
00575 osync_obj_engine_set_error(engine, error);
00576 engine->sink_errors = engine->sink_errors | (0x1 << sinkengine->position);
00577 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_ERROR, engine->objtype, error);
00578 } else {
00579 engine->sink_sync_done = engine->sink_sync_done | (0x1 << sinkengine->position);
00580 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_SYNC_DONE, engine->objtype, NULL);
00581 }
00582
00583 if (osync_bitcount(engine->sink_errors | engine->sink_sync_done) == osync_obj_engine_num_sinkengines(engine)) {
00584 if (osync_bitcount(engine->sink_sync_done) < osync_bitcount(engine->sink_connects)) {
00585 osync_error_set(&locerror, OSYNC_ERROR_GENERIC, "Fewer sink_engines reported sync_done than connected");
00586 osync_obj_engine_set_error(engine, locerror);
00587 }
00588
00589 osync_obj_engine_event(engine, OSYNC_ENGINE_EVENT_SYNC_DONE, locerror ? locerror : error);
00590 } else
00591 osync_trace(TRACE_INTERNAL, "Not yet: %i", osync_bitcount(engine->sink_errors | engine->sink_sync_done));
00592
00593 osync_sink_engine_unref(sinkengine);
00594 osync_trace(TRACE_EXIT, "%s", __func__);
00595 }
00596
00597 static osync_bool _create_mapping_engines(OSyncObjEngine *engine, OSyncError **error)
00598 {
00599 int i = 0;
00600 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, engine, error);
00601
00602 for (i = 0; i < osync_mapping_table_num_mappings(engine->mapping_table); i++) {
00603 OSyncMapping *mapping = osync_mapping_table_nth_mapping(engine->mapping_table, i);
00604
00605 OSyncMappingEngine *mapping_engine = osync_mapping_engine_new(engine, mapping, error);
00606 if (!mapping_engine)
00607 goto error;
00608
00609 engine->mapping_engines = osync_list_append(engine->mapping_engines, mapping_engine);
00610 }
00611
00612 osync_trace(TRACE_EXIT, "%s", __func__);
00613 return TRUE;
00614
00615 error:
00616 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00617 return FALSE;
00618 }
00619
00620 static osync_bool _inject_changelog_entries(OSyncObjEngine *engine, OSyncError **error) {
00621 OSyncList *ids = NULL;
00622 OSyncList *changetypes = NULL;
00623 OSyncList *j = NULL, *t = NULL;
00624
00625 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
00626
00627 osync_assert(engine);
00628 osync_assert(engine->archive);
00629 osync_assert(engine->objtype);
00630
00631 if (!osync_archive_load_ignored_conflicts(engine->archive, engine->objtype, &ids, &changetypes, error)) {
00632 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00633 return FALSE;
00634 }
00635
00636 t = changetypes;
00637 for (j = ids; j; j = j->next) {
00638 long long int id = (long long int)GPOINTER_TO_INT(j->data);
00639
00640 OSyncMapping *ignored_mapping = osync_mapping_table_find_mapping(engine->mapping_table, id);
00641
00642 OSyncList *e;
00643 for (e = engine->mapping_engines; e; e = e->next) {
00644 OSyncMappingEngine *mapping_engine = e->data;
00645
00646 if (mapping_engine->mapping == ignored_mapping) {
00647 OSyncList *m;
00648 for (m = mapping_engine->entries; m; m = m->next) {
00649 OSyncMappingEntryEngine *entry = m->data;
00650 OSyncChangeType changetype = (OSyncChangeType) t->data;
00651 OSyncChange *ignored_change = osync_change_new(error);
00652 OSyncObjFormat *dummyformat = NULL;
00653 OSyncData *data = NULL;
00654
00655 osync_change_set_changetype(ignored_change, changetype);
00656 osync_entry_engine_update(entry, ignored_change);
00657
00658 dummyformat = osync_objformat_new("plain", engine->objtype, NULL);
00659 data = osync_data_new(NULL, 0, dummyformat, NULL);
00660 osync_change_set_data(ignored_change, data);
00661 osync_objformat_unref(dummyformat);
00662
00663 osync_change_set_uid(ignored_change, osync_mapping_entry_get_uid(entry->entry));
00664
00665 osync_trace(TRACE_INTERNAL, "CHANGE: %p", entry->change);
00666 }
00667 break;
00668 }
00669 }
00670
00671 t = t->next;
00672 }
00673
00674 osync_list_free(ids);
00675 osync_list_free(changetypes);
00676
00677 osync_trace(TRACE_EXIT, "%s", __func__);
00678 return TRUE;
00679 }
00680
00681
00682 OSyncObjEngine *osync_obj_engine_new(OSyncEngine *parent, const char *objtype, OSyncFormatEnv *formatenv, OSyncError **error)
00683 {
00684 OSyncObjEngine *engine = NULL;
00685 osync_assert(parent);
00686 osync_assert(objtype);
00687
00688 osync_trace(TRACE_ENTRY, "%s(%p, %s, %p, %p)", __func__, parent, objtype, formatenv, error);
00689
00690 engine = osync_try_malloc0(sizeof(OSyncObjEngine), error);
00691 if (!engine)
00692 goto error;
00693 engine->ref_count = 1;
00694 engine->slowsync = FALSE;
00695 engine->written = FALSE;
00696
00697
00698
00699 engine->parent = parent;
00700
00701 engine->objtype = osync_strdup(objtype);
00702 engine->formatenv = osync_format_env_ref(formatenv);
00703
00704 engine->mapping_table = osync_mapping_table_new(error);
00705 if (!engine->mapping_table)
00706 goto error_free_engine;
00707
00708 engine->archive = osync_engine_get_archive(parent);
00709
00710 osync_trace(TRACE_EXIT, "%s: %p", __func__, engine);
00711 return engine;
00712
00713 error_free_engine:
00714 osync_obj_engine_unref(engine);
00715 error:
00716 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00717 return NULL;
00718 }
00719
00720 OSyncObjEngine *osync_obj_engine_ref(OSyncObjEngine *engine)
00721 {
00722 osync_assert(engine);
00723
00724 g_atomic_int_inc(&(engine->ref_count));
00725
00726 return engine;
00727 }
00728
00729 void osync_obj_engine_unref(OSyncObjEngine *engine)
00730 {
00731 osync_assert(engine);
00732
00733 if (g_atomic_int_dec_and_test(&(engine->ref_count))) {
00734 while (engine->sink_engines) {
00735 OSyncSinkEngine *sinkengine = engine->sink_engines->data;
00736 osync_sink_engine_unref(sinkengine);
00737
00738 engine->sink_engines = osync_list_remove(engine->sink_engines, sinkengine);
00739 }
00740
00741 while (engine->mapping_engines) {
00742 OSyncMappingEngine *mapping_engine = engine->mapping_engines->data;
00743 osync_mapping_engine_unref(mapping_engine);
00744
00745 engine->mapping_engines = osync_list_remove(engine->mapping_engines, mapping_engine);
00746 }
00747
00748 if (engine->error)
00749 osync_error_unref(&engine->error);
00750
00751 if (engine->objtype)
00752 osync_free(engine->objtype);
00753
00754 if (engine->mapping_table)
00755 osync_mapping_table_unref(engine->mapping_table);
00756
00757 if (engine->formatenv)
00758 osync_format_env_unref(engine->formatenv);
00759
00760 osync_free(engine);
00761 }
00762 }
00763
00764 static int _osync_obj_engine_num_write_sinks(OSyncObjEngine *objengine) {
00765 int num = 0;
00766 OSyncList *p = NULL;
00767 OSyncSinkEngine *sink;
00768
00769 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, objengine);
00770
00771 for (p = objengine->sink_engines; p; p = p->next) {
00772 sink = p->data;
00773
00774
00775 }
00776
00777 osync_trace(TRACE_EXIT, "%s: %i", __func__, num);
00778 return num;
00779 }
00780
00781 osync_bool osync_obj_engine_initialize(OSyncObjEngine *engine, OSyncError **error)
00782 {
00783 const char *objtype = NULL;
00784 int num = 0;
00785 int i = 0;
00786
00787 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, engine, error);
00788
00789 osync_trace(TRACE_INTERNAL, "Loaded %i mappings", osync_mapping_table_num_mappings(engine->mapping_table));
00790
00791 objtype = osync_obj_engine_get_objtype(engine);
00792
00793 num = osync_engine_num_proxies(engine->parent);
00794 for (i = 0; i < num; i++) {
00795 OSyncClientProxy *proxy = osync_engine_nth_proxy(engine->parent, i);
00796 OSyncObjTypeSink *sink = osync_client_proxy_find_objtype_sink(proxy, objtype);
00797 OSyncSinkEngine *sinkengine = NULL;
00798
00799
00800 sinkengine = osync_sink_engine_new(i, proxy, engine, error);
00801 if (!sinkengine)
00802 goto error;
00803
00804 if (!sink)
00805 engine->dummy_sink_engines = osync_list_append(engine->dummy_sink_engines, sinkengine);
00806 else
00807 engine->active_sink_engines = osync_list_append(engine->active_sink_engines, sinkengine);
00808
00809 engine->sink_engines = osync_list_append(engine->sink_engines, sinkengine);
00810 }
00811
00812 if (engine->archive && engine->slowsync) {
00813 if (!osync_mapping_table_flush(engine->mapping_table, engine->archive, engine->objtype, error))
00814 goto error;
00815 }
00816
00817 if (engine->archive) {
00818 if (!osync_mapping_table_load(engine->mapping_table, engine->archive, engine->objtype, error))
00819 goto error;
00820 }
00821
00822 if (!_create_mapping_engines(engine, error))
00823 goto error;
00824
00825 osync_trace(TRACE_INTERNAL, "Created %i mapping engine", osync_list_length(engine->mapping_engines));
00826
00827 if (engine->archive) {
00828
00829 if (!_inject_changelog_entries(engine, error))
00830 goto error;
00831 }
00832
00833 osync_trace(TRACE_EXIT, "%s", __func__);
00834 return TRUE;
00835 error:
00836
00837 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00838 return FALSE;
00839 }
00840
00841 void osync_obj_engine_finalize(OSyncObjEngine *engine)
00842 {
00843 OSyncMappingEngine *mapping_engine;
00844 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
00845
00846 engine->slowsync = FALSE;
00847 engine->written = FALSE;
00848
00849 engine->sink_errors = 0;
00850 engine->sink_connects = 0;
00851 engine->sink_connect_done = 0;
00852 engine->sink_disconnects = 0;
00853 engine->sink_get_changes = 0;
00854 engine->sink_sync_done = 0;
00855 engine->sink_written = 0;
00856
00857 engine->conflicts_solved = 0;
00858
00859 while (engine->sink_engines) {
00860 OSyncSinkEngine *sinkengine = engine->sink_engines->data;
00861 osync_sink_engine_unref(sinkengine);
00862
00863 engine->sink_engines = osync_list_remove(engine->sink_engines, sinkengine);
00864 }
00865
00866 osync_list_free(engine->active_sink_engines);
00867 osync_list_free(engine->dummy_sink_engines);
00868 engine->active_sink_engines = NULL;
00869 engine->dummy_sink_engines = NULL;
00870
00871 while (engine->conflicts) {
00872 mapping_engine = engine->conflicts->data;
00873
00874
00875 engine->conflicts = osync_list_remove(engine->conflicts, mapping_engine);
00876 }
00877
00878 while (engine->mapping_engines) {
00879 mapping_engine = engine->mapping_engines->data;
00880 osync_mapping_engine_unref(mapping_engine);
00881
00882 engine->mapping_engines = osync_list_remove(engine->mapping_engines, mapping_engine);
00883 }
00884
00885 if (engine->mapping_table)
00886 osync_mapping_table_close(engine->mapping_table);
00887
00888 osync_trace(TRACE_EXIT, "%s", __func__);
00889 }
00890
00891 const char *osync_obj_engine_get_objtype(OSyncObjEngine *engine)
00892 {
00893 osync_return_val_if_fail(engine, "");
00894 return engine->objtype;
00895 }
00896
00897 void osync_obj_engine_set_slowsync(OSyncObjEngine *engine, osync_bool slowsync)
00898 {
00899 osync_assert(engine);
00900 engine->slowsync = slowsync;
00901 }
00902
00903 osync_bool osync_obj_engine_get_slowsync(OSyncObjEngine *engine)
00904 {
00905 osync_assert(engine);
00906 return engine->slowsync;
00907 }
00908
00909 osync_bool osync_obj_engine_command(OSyncObjEngine *engine, OSyncEngineCmd cmd, OSyncError **error)
00910 {
00911 OSyncList *p = NULL;
00912 OSyncList *o = NULL;
00913 OSyncSinkEngine *sinkengine = NULL;
00914
00915
00916 osync_trace(TRACE_ENTRY, "%s(%p:%s, %s, %p)", __func__,
00917 engine, osync_obj_engine_get_objtype(engine),
00918 osync_engine_get_cmdstr(cmd), error);
00919
00920 osync_assert(engine);
00921
00922 int write_sinks = 0;
00923 osync_bool proxy_disconnect = FALSE;
00924
00925 switch (cmd) {
00926 case OSYNC_ENGINE_COMMAND_CONNECT:
00927 for (p = engine->active_sink_engines; p; p = p->next) {
00928 sinkengine = p->data;
00929
00930 if (!osync_client_proxy_connect(sinkengine->proxy, _osync_obj_engine_connect_callback, sinkengine, engine->objtype, engine->slowsync, error))
00931 goto error;
00932 osync_sink_engine_ref(sinkengine);
00933 }
00934 break;
00935 case OSYNC_ENGINE_COMMAND_CONNECT_DONE:
00936 for (p = engine->active_sink_engines; p; p = p->next) {
00937 sinkengine = p->data;
00938
00939 if (!osync_client_proxy_connect_done(sinkengine->proxy, _osync_obj_engine_connect_done_callback, sinkengine, engine->objtype, error))
00940 goto error;
00941 osync_sink_engine_ref(sinkengine);
00942 }
00943 break;
00944 case OSYNC_ENGINE_COMMAND_READ:
00945 for (p = engine->active_sink_engines; p; p = p->next) {
00946 sinkengine = p->data;
00947
00948 for (o = sinkengine->entries; o; o = o->next) {
00949 OSyncMappingEntryEngine *entry = o->data;
00950 OSyncChange *change = entry->change;
00951
00952 if (!change)
00953 continue;
00954
00955 if (!osync_client_proxy_read(sinkengine->proxy, _osync_obj_engine_read_ignored_callback, sinkengine, change, error))
00956 goto error;
00957 osync_sink_engine_ref(sinkengine);
00958 }
00959 }
00960
00961 if (engine->archive) {
00962
00963 if (!osync_archive_flush_ignored_conflict(engine->archive, engine->objtype, error))
00964 goto error;
00965 }
00966
00967 write_sinks = _osync_obj_engine_num_write_sinks(engine);
00968
00969
00970 for (p = engine->active_sink_engines; p; p = p->next) {
00971 OSyncMember *member = NULL;
00972 OSyncObjTypeSink *objtype_sink = NULL;
00973
00974 sinkengine = p->data;
00975
00976 member = osync_client_proxy_get_member(sinkengine->proxy);
00977
00978 objtype_sink = osync_member_find_objtype_sink(member, engine->objtype);
00979
00980
00981 if (objtype_sink && osync_objtype_sink_get_write(objtype_sink) && write_sinks) {
00982 _osync_obj_engine_read_callback(sinkengine->proxy, sinkengine, *error);
00983 osync_trace(TRACE_INTERNAL, "no other writable sinks .... SKIP");
00984 continue;
00985 }
00986
00987 if (!osync_client_proxy_get_changes(sinkengine->proxy, _osync_obj_engine_read_callback, sinkengine, engine->objtype, engine->slowsync, error))
00988 goto error;
00989 osync_sink_engine_ref(sinkengine);
00990 }
00991
00992 break;
00993 case OSYNC_ENGINE_COMMAND_PREPARE_MAP:
00994
00995
00996
00997 osync_obj_engine_event(engine, OSYNC_ENGINE_EVENT_PREPARED_MAP, *error);
00998 break;
00999 case OSYNC_ENGINE_COMMAND_MAP:
01000
01001 if (osync_obj_engine_map_changes(engine, error)) {
01002 for (p = engine->mapping_engines; p; p = p->next) {
01003 OSyncMappingEngine *mapping_engine = p->data;
01004
01005
01006 if (mapping_engine->synced)
01007 continue;
01008
01009 if (!osync_mapping_engine_check_conflict(mapping_engine)) {
01010 osync_error_set(error, OSYNC_ERROR_GENERIC, "Error while resolving conflicts");
01011 break;
01012
01013
01014
01015
01016 }
01017 }
01018 }
01019
01020 osync_obj_engine_event(engine, OSYNC_ENGINE_EVENT_MAPPED, *error);
01021 if (*error) {
01022 osync_obj_engine_set_error(engine, *error);
01023 osync_error_unref(error);
01024 }
01025
01026 break;
01027 case OSYNC_ENGINE_COMMAND_END_CONFLICTS:
01028
01029 osync_trace(TRACE_INTERNAL, "Check for pending conflicts");
01030
01031 if (engine->conflicts) {
01032 osync_trace(TRACE_INTERNAL, "Delay. Total pending conflicts: %u", osync_list_length(engine->conflicts));
01033 break;
01034 }
01035
01036 if (engine->conflicts_solved) {
01037 osync_trace(TRACE_INTERNAL, "Conflicts already solved.");
01038 break;
01039 }
01040
01041 engine->conflicts_solved = TRUE;
01042
01043 osync_obj_engine_event(engine, OSYNC_ENGINE_EVENT_END_CONFLICTS, *error);
01044 if (*error) {
01045 osync_obj_engine_set_error(engine, *error);
01046 osync_error_unref(error);
01047 }
01048
01049 break;
01050 case OSYNC_ENGINE_COMMAND_MULTIPLY:
01051
01052
01053 osync_trace(TRACE_INTERNAL, "Multiplying %u mappings", osync_list_length(engine->mapping_engines));
01054 for (p = engine->mapping_engines; p; p = p->next) {
01055 OSyncMappingEngine *mapping_engine = p->data;
01056 if (!osync_mapping_engine_multiply(mapping_engine, error))
01057 break;
01058 }
01059
01060 osync_obj_engine_event(engine, OSYNC_ENGINE_EVENT_MULTIPLIED, *error);
01061
01062 break;
01063 case OSYNC_ENGINE_COMMAND_PREPARE_WRITE:
01064
01065 osync_obj_engine_prepare_write(engine, error);
01066
01067 osync_obj_engine_event(engine, OSYNC_ENGINE_EVENT_PREPARED_WRITE, *error);
01068 break;
01069 case OSYNC_ENGINE_COMMAND_WRITE:
01070 if (engine->conflicts) {
01071 osync_trace(TRACE_INTERNAL, "We still have conflict. Delaying write");
01072 break;
01073 }
01074
01075 if (engine->written) {
01076 osync_trace(TRACE_INTERNAL, "Already written");
01077 break;
01078 }
01079
01080 engine->written = TRUE;
01081
01082
01083 osync_trace(TRACE_INTERNAL, "Starting to write");
01084 if (!osync_obj_engine_write(engine, error))
01085 goto error;
01086
01087
01088
01089 break;
01090 case OSYNC_ENGINE_COMMAND_SYNC_DONE:
01091 for (p = engine->active_sink_engines; p; p = p->next) {
01092 sinkengine = p->data;
01093
01094 if (!osync_client_proxy_sync_done(sinkengine->proxy, _osync_obj_engine_sync_done_callback, sinkengine, engine->objtype, error))
01095 goto error;
01096 osync_sink_engine_ref(sinkengine);
01097 }
01098 break;
01099 case OSYNC_ENGINE_COMMAND_DISCONNECT:;
01100 for (p = engine->active_sink_engines; p; p = p->next) {
01101 sinkengine = p->data;
01102
01103
01104
01105
01106 if (!osync_sink_engine_is_connected(sinkengine))
01107 continue;
01108
01109 proxy_disconnect = TRUE;
01110
01111 if (!osync_client_proxy_disconnect(sinkengine->proxy, _osync_obj_engine_disconnect_callback, sinkengine, engine->objtype, error))
01112 goto error;
01113 osync_sink_engine_ref(sinkengine);
01114 }
01115
01116
01117
01118
01119 if (!proxy_disconnect)
01120 _osync_obj_engine_generate_event_disconnected(engine, NULL);
01121
01122 break;
01123 case OSYNC_ENGINE_COMMAND_SOLVE:
01124 case OSYNC_ENGINE_COMMAND_DISCOVER:
01125 case OSYNC_ENGINE_COMMAND_ABORT:
01126 break;
01127 }
01128
01129 osync_trace(TRACE_EXIT, "%s", __func__);
01130 return TRUE;
01131
01132 error:
01133 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
01134 return FALSE;
01135 }
01136
01137
01138 void osync_obj_engine_event(OSyncObjEngine *engine, OSyncEngineEvent event, OSyncError *error)
01139 {
01140 osync_trace(TRACE_ENTRY, "%s(%p, %s, %p)", __func__, engine, osync_engine_get_eventstr(event), error);
01141 osync_assert(engine);
01142
01143
01144 osync_assert_msg(event != OSYNC_ENGINE_EVENT_ERROR, "OSyncObjEngine isn't supposed to emit OSYNC_ENGINE_EVENT_ERROR events!");
01145
01146
01147
01148
01149
01150
01151
01152
01153
01154
01155
01156
01157
01158
01159
01160
01161
01162
01163 engine->callback(engine, event, error, engine->callback_userdata);
01164
01165 osync_trace(TRACE_EXIT, "%s", __func__);
01166 return;
01167 }
01168
01169 void osync_obj_engine_set_callback(OSyncObjEngine *engine, OSyncObjEngineEventCallback callback, void *userdata)
01170 {
01171 osync_assert(engine);
01172 engine->callback = callback;
01173 engine->callback_userdata = userdata;
01174 }
01175
01176 void osync_obj_engine_set_error(OSyncObjEngine *engine, OSyncError *error)
01177 {
01178 osync_assert(engine);
01179 if (engine->error) {
01180 osync_error_stack(&error, &engine->error);
01181 osync_error_unref(&engine->error);
01182 }
01183 engine->error = error;
01184 osync_error_ref(&error);
01185 }
01186
01187 OSyncSinkEngine *osync_obj_engine_find_proxy_sinkengine(OSyncObjEngine *engine, OSyncClientProxy *proxy)
01188 {
01189 OSyncList *s;
01190 OSyncSinkEngine *sinkengine;
01191 osync_return_val_if_fail(engine, NULL);
01192 osync_return_val_if_fail(proxy, NULL);
01193
01194 for (s = engine->sink_engines; s; s = s->next) {
01195 sinkengine = s->data;
01196 if (sinkengine->proxy == proxy)
01197 break;
01198 sinkengine = NULL;
01199 }
01200
01201 return sinkengine;
01202 }
01203
01204 OSyncSinkEngine *osync_obj_engine_nth_sinkengine(OSyncObjEngine *engine, unsigned int nth)
01205 {
01206 osync_return_val_if_fail(engine, NULL);
01207 return osync_list_nth_data(engine->sink_engines, nth);
01208 }
01209
01210 unsigned int osync_obj_engine_num_sinkengines(OSyncObjEngine *engine)
01211 {
01212 osync_assert(engine);
01213 return osync_list_length(engine->active_sink_engines);
01214 }
01215
01216 unsigned int osync_obj_engine_num_mapping_engines(OSyncObjEngine *engine)
01217 {
01218 osync_assert(engine);
01219 return osync_list_length(engine->mapping_engines);
01220 }
01221
01222 unsigned int osync_obj_engine_num_members(OSyncObjEngine *engine)
01223 {
01224 osync_assert(engine);
01225 return osync_obj_engine_num_sinkengines(engine);
01226 }
01227
01228 OSyncMember *osync_obj_engine_nth_member(OSyncObjEngine *engine, unsigned int nth)
01229 {
01230 OSyncSinkEngine *sinkengine;
01231 osync_return_val_if_fail(engine, NULL);
01232 sinkengine = osync_list_nth_data(engine->active_sink_engines, nth);
01233 osync_assert(sinkengine);
01234 return osync_sink_engine_get_member(sinkengine);
01235 }
01236
01237 const OSyncList *osync_obj_engine_get_mapping_entry_engines_of_member(OSyncObjEngine *engine, OSyncMember *member)
01238 {
01239 OSyncList *s;
01240 osync_return_val_if_fail(engine, NULL);
01241 osync_return_val_if_fail(member, NULL);
01242
01243 for (s = engine->active_sink_engines; s; s = s->next) {
01244 OSyncSinkEngine *sinkengine = s->data;
01245
01246 if (member != osync_sink_engine_get_member(sinkengine))
01247 continue;
01248
01249 return osync_sink_engine_get_mapping_entry_engines(sinkengine);
01250 }
01251
01252 return NULL;
01253 }
01254
01255 osync_bool osync_obj_engine_prepare_write(OSyncObjEngine *engine, OSyncError **error)
01256 {
01257 OSyncList *p;
01258 osync_bool merger_enabled, converter_enabled;
01259 OSyncGroup *group;
01260
01261 osync_assert(engine);
01262
01263 group = osync_engine_get_group(engine->parent);
01264 merger_enabled = osync_group_get_merger_enabled(group);
01265 converter_enabled = osync_group_get_converter_enabled(group);
01266
01267 if (!merger_enabled && !converter_enabled)
01268 return TRUE;
01269
01270 for (p = engine->active_sink_engines; p; p = p->next) {
01271 OSyncSinkEngine *sinkengine = p->data;
01272
01273 if (merger_enabled
01274 && !osync_sink_engine_demerge(sinkengine, engine->archive, error))
01275 goto error;
01276
01277 if (converter_enabled
01278 && !osync_sink_engine_convert_to_dest(sinkengine, engine->formatenv, error))
01279 goto error;
01280
01281 }
01282
01283 return TRUE;
01284
01285 error:
01286 return FALSE;
01287 }
01288
01289 osync_bool osync_obj_engine_write(OSyncObjEngine *engine, OSyncError **error)
01290 {
01291 OSyncList *p;
01292 OSyncMember *member;
01293 OSyncObjTypeSink *objtype_sink;
01294
01295 osync_assert(engine);
01296
01297 for (p = engine->active_sink_engines; p; p = p->next) {
01298 OSyncSinkEngine *sinkengine = p->data;
01299
01300 member = osync_client_proxy_get_member(sinkengine->proxy);
01301 objtype_sink = osync_member_find_objtype_sink(member, engine->objtype);
01302
01303
01304 osync_assert(objtype_sink);
01305
01306
01307 if (!osync_objtype_sink_get_write(objtype_sink))
01308 continue;
01309
01310 if (!osync_sink_engine_write(sinkengine, engine->archive, error))
01311 goto error;
01312 }
01313
01314 return TRUE;
01315
01316 error:
01317 return FALSE;
01318 }
01319