00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "opensync.h"
00022 #include "opensync_internals.h"
00023
00024 #include "opensync-context.h"
00025 #include "opensync-plugin.h"
00026 #include "plugin/opensync_plugin_internals.h"
00027 #include "plugin/opensync_objtype_sink_internals.h"
00028
00029 #include "opensync-ipc.h"
00030 #include "ipc/opensync_serializer_internals.h"
00031 #include "ipc/opensync_message_internals.h"
00032 #include "ipc/opensync_queue_internals.h"
00033
00034 #include "opensync-format.h"
00035
00036 #include "opensync-version.h"
00037 #include "version/opensync_version_internals.h"
00038
00039 #include "opensync-merger.h"
00040
00041 #include "opensync-client.h"
00042 #include "opensync_client_internals.h"
00043 #include "opensync_client_private.h"
00044
00045 #ifdef OPENSYNC_UNITTESTS
00046 #include "plugin/opensync_plugin_info_private.h"
00047 #endif
00048
00049 typedef struct callContext {
00050 OSyncClient *client;
00051 OSyncMessage *message;
00052 OSyncChange *change;
00053 } callContext;
00054
00055 static OSyncContext *_create_context(OSyncClient *client, OSyncMessage *message, OSyncContextCallbackFn callback, OSyncChange *change, OSyncError **error)
00056 {
00057 OSyncContext *context = NULL;
00058 callContext *baton = NULL;
00059 context = osync_context_new(error);
00060 if (!context)
00061 goto error;
00062
00063 baton = osync_try_malloc0(sizeof(callContext), error);
00064 if (!baton)
00065 goto error_free_context;
00066
00067 baton->client = client;
00068 osync_client_ref(baton->client);
00069
00070 baton->message = message;
00071 osync_message_ref(message);
00072
00073 baton->change = change;
00074 if (baton->change)
00075 osync_change_ref(baton->change);
00076
00077 osync_context_set_callback(context, callback, baton);
00078 return context;
00079
00080 error_free_context:
00081 osync_context_unref(context);
00082 error:
00083 return FALSE;
00084 }
00085
00086 static void _free_baton(callContext *baton)
00087 {
00088 osync_client_unref(baton->client);
00089 osync_message_unref(baton->message);
00090
00091 if (baton->change)
00092 osync_change_unref(baton->change);
00093
00094 osync_free(baton);
00095 }
00096
00097 static void _osync_client_connect_callback(void *data, OSyncError *error)
00098 {
00099 OSyncError *locerror = NULL;
00100 callContext *baton = NULL;
00101 OSyncMessage *message = NULL;
00102 OSyncClient *client = NULL;
00103 char *objtype = NULL;
00104 OSyncObjTypeSink *sink = NULL;
00105 int slowsync = 0;
00106 OSyncMessage *reply = NULL;
00107
00108 baton = data;
00109 message = baton->message;
00110 client = baton->client;
00111 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, data, error);
00112 osync_message_read_string(message, &objtype);
00113
00114 if (objtype) {
00115
00116 sink = osync_plugin_info_find_objtype(client->plugin_info, objtype);
00117
00118 if (!sink) {
00119 osync_error_set(&locerror, OSYNC_ERROR_GENERIC, "Unable to find sink for %s", objtype);
00120 osync_free(objtype);
00121 goto error;
00122 }
00123
00124 osync_free(objtype);
00125 } else {
00126
00127 sink = osync_plugin_info_get_sink(client->plugin_info);
00128 }
00129 slowsync = osync_objtype_sink_get_slowsync(sink);
00130 osync_trace(TRACE_INTERNAL, "%s: slowsync %i", __func__, slowsync);
00131
00132 if (!osync_error_is_set(&error)) {
00133 reply = osync_message_new_reply(message, &locerror);
00134 if (!reply)
00135 goto error;
00136
00137
00138 osync_message_write_int(reply, slowsync);
00139
00140 } else {
00141 reply = osync_message_new_errorreply(message, error, &locerror);
00142 }
00143 if (!reply)
00144 goto error;
00145 osync_trace(TRACE_INTERNAL, "Reply id %lli", osync_message_get_id(reply));
00146
00147 _free_baton(baton);
00148
00149 if (!osync_queue_send_message(client->outgoing, NULL, reply, &locerror))
00150 goto error_free_message;
00151
00152 osync_message_unref(reply);
00153
00154 osync_trace(TRACE_EXIT, "%s", __func__);
00155 return;
00156
00157 error_free_message:
00158 osync_message_unref(reply);
00159 error:
00160 _free_baton(baton);
00161 osync_client_error_shutdown(client, locerror);
00162 osync_error_unref(&locerror);
00163 osync_trace(TRACE_EXIT, "%s", __func__);
00164 return;
00165 }
00166
00167 static void _osync_client_connect_done_callback(void *data, OSyncError *error)
00168 {
00169 OSyncError *locerror = NULL;
00170 callContext *baton = NULL;
00171 OSyncMessage *message = NULL;
00172 OSyncClient *client = NULL;
00173 OSyncMessage *reply = NULL;
00174
00175 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, data, error);
00176 baton = data;
00177
00178 message = baton->message;
00179 client = baton->client;
00180
00181 if (!osync_error_is_set(&error)) {
00182 reply = osync_message_new_reply(message, &locerror);
00183 } else {
00184 reply = osync_message_new_errorreply(message, error, &locerror);
00185 }
00186 if (!reply)
00187 goto error;
00188 osync_trace(TRACE_INTERNAL, "Reply id %lli", osync_message_get_id(reply));
00189
00190 _free_baton(baton);
00191
00192 if (!osync_queue_send_message(client->outgoing, NULL, reply, &locerror))
00193 goto error_free_message;
00194
00195 osync_message_unref(reply);
00196
00197 osync_trace(TRACE_EXIT, "%s", __func__);
00198 return;
00199
00200 error_free_message:
00201 osync_message_unref(reply);
00202 error:
00203 _free_baton(baton);
00204 osync_client_error_shutdown(client, locerror);
00205 osync_error_unref(&locerror);
00206 osync_trace(TRACE_EXIT, "%s", __func__);
00207 return;
00208 }
00209
00210 static void _osync_client_disconnect_callback(void *data, OSyncError *error)
00211 {
00212 OSyncError *locerror = NULL;
00213 callContext *baton = NULL;
00214 OSyncMessage *message = NULL;
00215 OSyncClient *client = NULL;
00216 OSyncMessage *reply = NULL;
00217 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, data, error);
00218
00219 baton = data;
00220 message = baton->message;
00221 client = baton->client;
00222
00223 if (!osync_error_is_set(&error)) {
00224 reply = osync_message_new_reply(message, &locerror);
00225 } else {
00226 reply = osync_message_new_errorreply(message, error, &locerror);
00227 }
00228 if (!reply)
00229 goto error;
00230 osync_trace(TRACE_INTERNAL, "Reply id %lli", osync_message_get_id(reply));
00231
00232 _free_baton(baton);
00233
00234 if (!osync_queue_send_message(client->outgoing, NULL, reply, &locerror))
00235 goto error_free_message;
00236
00237 osync_message_unref(reply);
00238
00239 osync_trace(TRACE_EXIT, "%s", __func__);
00240 return;
00241
00242 error_free_message:
00243 osync_message_unref(reply);
00244 error:
00245 _free_baton(baton);
00246 osync_client_error_shutdown(client, locerror);
00247 osync_error_unref(&locerror);
00248 osync_trace(TRACE_EXIT, "%s", __func__);
00249 return;
00250 }
00251
00252 static void _osync_client_get_changes_callback(void *data, OSyncError *error)
00253 {
00254 OSyncError *locerror = NULL;
00255 callContext *baton = NULL;
00256 OSyncMessage *message = NULL;
00257 OSyncClient *client = NULL;
00258 OSyncMessage *reply = NULL;
00259 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, data, error);
00260
00261 baton = data;
00262 message = baton->message;
00263 client = baton->client;
00264
00265 if (!osync_error_is_set(&error)) {
00266 reply = osync_message_new_reply(message, &locerror);
00267
00268 } else {
00269 reply = osync_message_new_errorreply(message, error, &locerror);
00270 }
00271 if (!reply)
00272 goto error;
00273 osync_trace(TRACE_INTERNAL, "Reply id %lli", osync_message_get_id(reply));
00274
00275 _free_baton(baton);
00276
00277 if (!osync_queue_send_message(client->outgoing, NULL, reply, &locerror))
00278 goto error_free_message;
00279
00280 osync_message_unref(reply);
00281
00282 osync_trace(TRACE_EXIT, "%s", __func__);
00283 return;
00284
00285 error_free_message:
00286 osync_message_unref(reply);
00287 error:
00288 _free_baton(baton);
00289 osync_client_error_shutdown(client, locerror);
00290 osync_error_unref(&locerror);
00291 osync_trace(TRACE_EXIT, "%s", __func__);
00292 return;
00293 }
00294
00295 static void _osync_client_change_callback(OSyncChange *change, void *data)
00296 {
00297 callContext *baton = NULL;
00298 OSyncError *locerror = NULL;
00299 OSyncClient *client = NULL;
00300 OSyncMessage *message = NULL;
00301
00302 baton = data;
00303 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, change, data);
00304
00305 client = baton->client;
00306 message = osync_message_new(OSYNC_MESSAGE_NEW_CHANGE, 0, &locerror);
00307 if (!message)
00308 goto error;
00309
00310 if (!osync_marshal_change(message, change, &locerror))
00311 goto error_free_message;
00312
00313 if (!osync_queue_send_message(client->outgoing, NULL, message, &locerror))
00314 goto error_free_message;
00315
00316 osync_message_unref(message);
00317
00318 osync_trace(TRACE_EXIT, "%s", __func__);
00319 return;
00320
00321 error_free_message:
00322 osync_message_unref(message);
00323 error:
00324 _free_baton(baton);
00325 osync_client_error_shutdown(client, locerror);
00326 osync_error_unref(&locerror);
00327 osync_trace(TRACE_EXIT, "%s", __func__);
00328 return;
00329 }
00330
00331 static void _osync_client_ignored_conflict_callback(OSyncChange *change, void *data)
00332 {
00333 callContext *baton = NULL;
00334 OSyncError *locerror = NULL;
00335 OSyncClient *client = NULL;
00336 OSyncMessage *message = NULL;
00337
00338 baton = data;
00339 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, change, data);
00340
00341 client = baton->client;
00342 message = osync_message_new(OSYNC_MESSAGE_READ_CHANGE, 0, &locerror);
00343 if (!message)
00344 goto error;
00345
00346 if (!osync_marshal_change(message, change, &locerror))
00347 goto error_free_message;
00348
00349 if (!osync_queue_send_message(client->outgoing, NULL, message, &locerror))
00350 goto error_free_message;
00351
00352 osync_message_unref(message);
00353
00354 osync_trace(TRACE_EXIT, "%s", __func__);
00355 return;
00356
00357 error_free_message:
00358 osync_message_unref(message);
00359 error:
00360 _free_baton(baton);
00361 osync_client_error_shutdown(client, locerror);
00362 osync_error_unref(&locerror);
00363 osync_trace(TRACE_EXIT, "%s", __func__);
00364 return;
00365 }
00366
00367 static void _osync_client_read_callback(void *data, OSyncError *error)
00368 {
00369 OSyncError *locerror = NULL;
00370 callContext *baton = NULL;
00371 OSyncMessage *message = NULL;
00372 OSyncClient *client = NULL;
00373 OSyncMessage *reply = NULL;
00374
00375 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, data, error);
00376 baton = data;
00377
00378 message = baton->message;
00379 client = baton->client;
00380
00381 osync_trace(TRACE_INTERNAL, "ignored chnaged: %p", baton->change);
00382
00383 if (!osync_error_is_set(&error)) {
00384 reply = osync_message_new_reply(message, &locerror);
00385 if (!reply)
00386 goto error;
00387
00388
00389 osync_message_write_string(reply, osync_change_get_uid(baton->change));
00390 } else {
00391 reply = osync_message_new_errorreply(message, error, &locerror);
00392 }
00393 if (!reply)
00394 goto error;
00395 osync_trace(TRACE_INTERNAL, "Reply id %lli", osync_message_get_id(reply));
00396
00397
00398 if (!osync_queue_send_message(client->outgoing, NULL, reply, &locerror))
00399 goto error_free_message;
00400
00401 _osync_client_ignored_conflict_callback(baton->change, baton);
00402
00403 _free_baton(baton);
00404
00405 osync_message_unref(reply);
00406
00407 osync_trace(TRACE_EXIT, "%s", __func__);
00408 return;
00409
00410 error_free_message:
00411 osync_message_unref(reply);
00412 error:
00413 _free_baton(baton);
00414 osync_client_error_shutdown(client, locerror);
00415 osync_error_unref(&locerror);
00416 osync_trace(TRACE_EXIT, "%s", __func__);
00417 return;
00418 }
00419
00420 static void _osync_client_commit_change_callback(void *data, OSyncError *error)
00421 {
00422 OSyncError *locerror = NULL;
00423 callContext *baton = NULL;
00424 OSyncMessage *message = NULL;
00425 OSyncClient *client = NULL;
00426 OSyncMessage *reply = NULL;
00427
00428 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, data, error);
00429 baton = data;
00430
00431 message = baton->message;
00432 client = baton->client;
00433
00434 if (!osync_error_is_set(&error)) {
00435 reply = osync_message_new_reply(message, &locerror);
00436 if (!reply)
00437 goto error;
00438
00439
00440 osync_message_write_string(reply, osync_change_get_uid(baton->change));
00441 } else {
00442 reply = osync_message_new_errorreply(message, error, &locerror);
00443 }
00444 if (!reply)
00445 goto error;
00446 osync_trace(TRACE_INTERNAL, "Reply id %lli", osync_message_get_id(reply));
00447
00448 _free_baton(baton);
00449
00450 if (!osync_queue_send_message(client->outgoing, NULL, reply, &locerror))
00451 goto error_free_message;
00452
00453 osync_message_unref(reply);
00454
00455 osync_trace(TRACE_EXIT, "%s", __func__);
00456 return;
00457
00458 error_free_message:
00459 osync_message_unref(reply);
00460 error:
00461 _free_baton(baton);
00462 osync_client_error_shutdown(client, locerror);
00463 osync_error_unref(&locerror);
00464 osync_trace(TRACE_EXIT, "%s", __func__);
00465 return;
00466 }
00467
00468 static void _osync_client_committed_all_callback(void *data, OSyncError *error)
00469 {
00470 OSyncError *locerror = NULL;
00471 callContext *baton = NULL;
00472 OSyncMessage *message = NULL;
00473 OSyncClient *client = NULL;
00474 OSyncMessage *reply = NULL;
00475
00476 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, data, error);
00477 baton = data;
00478
00479 message = baton->message;
00480 client = baton->client;
00481
00482 if (!osync_error_is_set(&error)) {
00483 reply = osync_message_new_reply(message, &locerror);
00484
00485 } else {
00486 reply = osync_message_new_errorreply(message, error, &locerror);
00487 }
00488 if (!reply)
00489 goto error;
00490 osync_trace(TRACE_INTERNAL, "Reply id %lli", osync_message_get_id(reply));
00491
00492 _free_baton(baton);
00493
00494 if (!osync_queue_send_message(client->outgoing, NULL, reply, &locerror))
00495 goto error_free_message;
00496
00497 osync_message_unref(reply);
00498
00499 osync_trace(TRACE_EXIT, "%s", __func__);
00500 return;
00501
00502 error_free_message:
00503 osync_message_unref(reply);
00504 error:
00505 _free_baton(baton);
00506 osync_client_error_shutdown(client, locerror);
00507 osync_error_unref(&locerror);
00508 osync_trace(TRACE_EXIT, "%s", __func__);
00509 return;
00510 }
00511
00512 static void _osync_client_sync_done_callback(void *data, OSyncError *error)
00513 {
00514 OSyncError *locerror = NULL;
00515 callContext *baton = NULL;
00516 OSyncMessage *message = NULL;
00517 OSyncClient *client = NULL;
00518 OSyncMessage *reply = NULL;
00519
00520 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, data, error);
00521 baton = data;
00522
00523 message = baton->message;
00524 client = baton->client;
00525
00526 if (!osync_error_is_set(&error)) {
00527 reply = osync_message_new_reply(message, &locerror);
00528
00529 } else {
00530 reply = osync_message_new_errorreply(message, error, &locerror);
00531 }
00532 if (!reply)
00533 goto error;
00534 osync_trace(TRACE_INTERNAL, "Reply id %lli", osync_message_get_id(reply));
00535
00536 _free_baton(baton);
00537
00538 if (!osync_queue_send_message(client->outgoing, NULL, reply, &locerror))
00539 goto error_free_message;
00540
00541 osync_message_unref(reply);
00542
00543 osync_trace(TRACE_EXIT, "%s", __func__);
00544 return;
00545
00546 error_free_message:
00547 osync_message_unref(reply);
00548 error:
00549 _free_baton(baton);
00550 osync_client_error_shutdown(client, locerror);
00551 osync_error_unref(&locerror);
00552 osync_trace(TRACE_EXIT, "%s", __func__);
00553 return;
00554 }
00555
00556 static osync_bool _osync_client_handle_initialize(OSyncClient *client, OSyncMessage *message, OSyncError **error)
00557 {
00558 OSyncMessage *reply = NULL;
00559 char *enginepipe = NULL;
00560 char *pluginname = NULL;
00561 char *plugindir = NULL;
00562 char *groupname = NULL;
00563 char *configdir = NULL;
00564 char *formatdir = NULL;
00565 int haspluginconfig = 0;
00566 OSyncPluginConfig *config = NULL;
00567 OSyncQueue *outgoing = NULL;
00568 OSyncList *r = NULL;
00569 OSyncPluginResource *res = NULL;
00570 OSyncObjTypeSink *sink = NULL, *main_sink = NULL;
00571 const char *objtype = NULL;
00572 const char *preferred_format = NULL;
00573 OSyncList *o = NULL;
00574 OSyncObjFormatSink *format_sink = NULL;
00575 unsigned int n, num_sinks;
00576 osync_bool couldinit;
00577
00578 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);
00579
00580 osync_message_read_string(message, &enginepipe);
00581 osync_message_read_string(message, &formatdir);
00582 osync_message_read_string(message, &plugindir);
00583 osync_message_read_string(message, &pluginname);
00584 osync_message_read_string(message, &groupname);
00585 osync_message_read_string(message, &configdir);
00586 osync_message_read_int(message, &haspluginconfig);
00587
00588 if (haspluginconfig && !osync_demarshal_pluginconfig(message, &config, error))
00589 goto error;
00590
00591 osync_trace(TRACE_INTERNAL, "enginepipe %s, formatdir %s, plugindir %s, pluginname %s", enginepipe, formatdir, plugindir, pluginname);
00592
00593
00594 if (enginepipe) {
00595 outgoing = osync_queue_new(enginepipe, error);
00596 if (!outgoing)
00597 goto error;
00598
00599 osync_trace(TRACE_INTERNAL, "connecting to engine");
00600
00601 if (!osync_queue_connect(outgoing, OSYNC_QUEUE_SENDER, error)) {
00602 osync_queue_unref(outgoing);
00603 goto error;
00604 }
00605
00606 osync_client_set_outgoing_queue(client, outgoing);
00607 osync_queue_cross_link(client->incoming, client->outgoing);
00608 osync_queue_unref(outgoing);
00609 osync_trace(TRACE_INTERNAL, "done connecting to engine");
00610 }
00611
00612 if (!client->plugin) {
00613 client->plugin_env = osync_plugin_env_new(error);
00614 if (!client->plugin_env)
00615 goto error;
00616
00617 if (!osync_plugin_env_load(client->plugin_env, plugindir, error))
00618 goto error;
00619
00620 client->plugin = osync_plugin_env_find_plugin(client->plugin_env, pluginname);
00621 if (!client->plugin) {
00622 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find plugin %s", pluginname);
00623 goto error;
00624 }
00625 osync_plugin_ref(client->plugin);
00626 }
00627
00628 client->format_env = osync_format_env_new(error);
00629 if (!client->format_env)
00630 goto error;
00631
00632 if (!osync_format_env_load_plugins(client->format_env, formatdir, error))
00633 goto error;
00634
00635 client->plugin_info = osync_plugin_info_new(error);
00636 if (!client->plugin_info)
00637 goto error;
00638
00639 osync_plugin_info_set_configdir(client->plugin_info, configdir);
00640 osync_plugin_info_set_loop(client->plugin_info, client->context);
00641 osync_plugin_info_set_format_env(client->plugin_info, client->format_env);
00642 osync_plugin_info_set_groupname(client->plugin_info, groupname);
00643
00644 if (config)
00645 osync_plugin_info_set_config(client->plugin_info, config);
00646
00647 #ifdef OPENSYNC_UNITTESTS
00648 {
00649 long long int memberid;
00650 osync_message_read_long_long_int(message, &memberid);
00651 client->plugin_info->memberid = memberid;
00652 }
00653 #endif
00654
00655
00656
00657 if (config)
00658 r = osync_plugin_config_get_resources(config);
00659
00660 for (; r; r = r->next) {
00661 res = r->data;
00662
00663
00664 if (!osync_plugin_resource_is_enabled(res))
00665 continue;
00666
00667 objtype = osync_plugin_resource_get_objtype(res);
00668
00669 if (!(sink = osync_plugin_info_find_objtype(client->plugin_info, objtype))) {
00670 sink = osync_objtype_sink_new(objtype, error);
00671 if (!sink)
00672 goto error_finalize;
00673
00674 osync_plugin_info_add_objtype(client->plugin_info, sink);
00675 osync_objtype_sink_unref(sink);
00676 } else {
00677 osync_error_set(error, OSYNC_ERROR_MISCONFIGURATION, "Duplicate sink objtype \"%s\" configured in plugin %s", objtype, pluginname);
00678 goto error;
00679 }
00680
00681 preferred_format = osync_plugin_resource_get_preferred_format(res);
00682 osync_objtype_sink_set_preferred_format(sink, preferred_format);
00683 o = osync_plugin_resource_get_objformat_sinks(res);
00684 for (; o; o = o->next) {
00685 format_sink = (OSyncObjFormatSink *) o->data;
00686 osync_objtype_sink_add_objformat_sink(sink, format_sink);
00687 }
00688 }
00689
00690 couldinit = osync_plugin_initialize(client->plugin, &(client->plugin_data), client->plugin_info, error);
00691 if (!couldinit) {
00692 if (!osync_error_is_set(error))
00693 osync_error_set(error, OSYNC_ERROR_GENERIC, "Plugin \"%s\" failed to initialize but gave no reason", pluginname);
00694 goto error;
00695 }
00696
00697 num_sinks = osync_plugin_info_num_objtypes(client->plugin_info);
00698 for (n = 0; n < num_sinks; n++) {
00699
00700 sink = osync_plugin_info_nth_objtype(client->plugin_info, n);
00701 if (!osync_objtype_sink_load_anchor(sink, client->plugin_info, error)) {
00702 goto error_finalize;
00703 }
00704 }
00705
00706 main_sink = osync_plugin_info_get_main_sink(client->plugin_info);
00707 if (main_sink) {
00708 if (!osync_objtype_sink_load_anchor(main_sink,
00709 client->plugin_info, error))
00710 goto error_finalize;
00711
00712 }
00713
00714 reply = osync_message_new_reply(message, error);
00715 if (!reply)
00716 goto error_finalize;
00717
00718 if (!osync_queue_send_message(client->outgoing, NULL, reply, error))
00719 goto error_free_message;
00720
00721 osync_message_unref(reply);
00722
00723 osync_free(enginepipe);
00724 osync_free(pluginname);
00725 osync_free(configdir);
00726 osync_free(plugindir);
00727 osync_free(groupname);
00728 osync_free(formatdir);
00729
00730 if (config)
00731 osync_plugin_config_unref(config);
00732
00733 osync_trace(TRACE_EXIT, "%s", __func__);
00734 return TRUE;
00735
00736 error_free_message:
00737 osync_message_unref(reply);
00738 error_finalize:
00739 osync_plugin_finalize(client->plugin, client->plugin_data);
00740 error:
00741 osync_free(enginepipe);
00742 osync_free(pluginname);
00743 osync_free(configdir);
00744 osync_free(plugindir);
00745 osync_free(groupname);
00746 osync_free(formatdir);
00747
00748 if (config)
00749 osync_plugin_config_unref(config);
00750
00751 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00752 return FALSE;
00753 }
00754
00755 static osync_bool _osync_client_handle_finalize(OSyncClient *client, OSyncMessage *message, OSyncError **error)
00756 {
00757 OSyncMessage *reply = NULL;
00758 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);
00759
00760 if (client->plugin) {
00761 if (client->plugin_data)
00762 osync_plugin_finalize(client->plugin, client->plugin_data);
00763
00764 osync_plugin_unref(client->plugin);
00765 client->plugin = NULL;
00766 }
00767
00768 if (client->plugin_env) {
00769 osync_plugin_env_unref(client->plugin_env);
00770 client->plugin_env = NULL;
00771 }
00772
00773 if (client->plugin_info) {
00774 osync_plugin_info_unref(client->plugin_info);
00775 client->plugin_info = NULL;
00776 }
00777
00778 if (client->format_env) {
00779 osync_format_env_unref(client->format_env);
00780 client->format_env = NULL;
00781 }
00782
00783 if (!client->outgoing) {
00784 osync_error_set(error, OSYNC_ERROR_GENERIC, "No outgoing queue yet");
00785 goto error;
00786 }
00787
00788 reply = osync_message_new_reply(message, NULL);
00789 if (!reply)
00790 goto error;
00791
00792 if (!osync_queue_send_message(client->outgoing, NULL, reply, NULL))
00793 goto error_free_message;
00794
00795 osync_message_unref(reply);
00796
00797 osync_trace(TRACE_EXIT, "%s", __func__);
00798 return TRUE;
00799
00800 error_free_message:
00801 osync_message_unref(reply);
00802 error:
00803 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00804 return FALSE;
00805 }
00806
00807 static osync_bool _osync_client_handle_discover(OSyncClient *client, OSyncMessage *message, OSyncError **error)
00808 {
00809 OSyncMessage *reply = NULL;
00810 unsigned int i = 0;
00811 OSyncPluginConfig *config = NULL;
00812 OSyncList *res = NULL;
00813 unsigned int numobjs = 0;
00814 unsigned int avail = 0;
00815 OSyncObjTypeSink *sink = NULL;
00816 OSyncVersion *version = NULL;
00817 OSyncCapabilities *capabilities = NULL;
00818 char* buffer = NULL;
00819 int size = 0;
00820 unsigned int num_res = 0;
00821 OSyncPluginResource *resource = NULL;
00822
00823 config = osync_plugin_info_get_config(client->plugin_info);
00824 res = osync_plugin_config_get_resources(config);
00825
00826 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);
00827
00828 if (!osync_plugin_discover(client->plugin, client->plugin_data, client->plugin_info, error))
00829 goto error;
00830
00831 reply = osync_message_new_reply(message, error);
00832 if (!reply)
00833 goto error;
00834
00835 if (osync_plugin_info_get_main_sink(client->plugin_info))
00836 osync_message_write_int(reply, 1);
00837 else
00838 osync_message_write_int(reply, 0);
00839
00840 numobjs = osync_plugin_info_num_objtypes(client->plugin_info);
00841 for (i = 0; i < numobjs; i++) {
00842 sink = osync_plugin_info_nth_objtype(client->plugin_info, i);
00843 if (osync_objtype_sink_is_available(sink)) {
00844 avail++;
00845 }
00846 }
00847
00848 osync_message_write_uint(reply, avail);
00849
00850 for (i = 0; i < numobjs; i++) {
00851 sink = osync_plugin_info_nth_objtype(client->plugin_info, i);
00852 if (osync_objtype_sink_is_available(sink)) {
00853 if (!osync_marshal_objtype_sink(reply, sink, error))
00854 goto error_free_message;
00855 }
00856 }
00857
00858 version = osync_plugin_info_get_version(client->plugin_info);
00859 if (version) {
00860 osync_message_write_int(reply, 1);
00861 osync_message_write_string(reply, osync_version_get_plugin(version));
00862 osync_message_write_string(reply, osync_version_get_priority(version));
00863 osync_message_write_string(reply, osync_version_get_vendor(version));
00864 osync_message_write_string(reply, osync_version_get_modelversion(version));
00865 osync_message_write_string(reply, osync_version_get_firmwareversion(version));
00866 osync_message_write_string(reply, osync_version_get_softwareversion(version));
00867 osync_message_write_string(reply, osync_version_get_hardwareversion(version));
00868 osync_message_write_string(reply, osync_version_get_identifier(version));
00869 }else
00870 osync_message_write_int(reply, 0);
00871
00872
00873 capabilities = osync_plugin_info_get_capabilities(client->plugin_info);
00874 if (capabilities) {
00875 osync_message_write_int(reply, 1);
00876 if(!osync_capabilities_assemble(capabilities, &buffer, &size))
00877 goto error_free_message;
00878 osync_message_write_string(reply, buffer);
00879 g_free(buffer);
00880 }else
00881 osync_message_write_int(reply, 0);
00882
00883
00884 res = osync_plugin_config_get_resources(config);
00885 num_res = osync_list_length(res);
00886
00887 osync_message_write_uint(reply, num_res);
00888 for (; res; res = res->next) {
00889 resource = res->data;
00890 if (!osync_marshal_pluginresource(reply, resource, error))
00891 goto error_free_message;
00892 }
00893
00894 if (!osync_queue_send_message(client->outgoing, NULL, reply, error))
00895 goto error_free_message;
00896
00897 osync_message_unref(reply);
00898
00899 osync_trace(TRACE_EXIT, "%s", __func__);
00900 return TRUE;
00901
00902 error_free_message:
00903 osync_message_unref(reply);
00904 error:
00905 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00906 return FALSE;
00907 }
00908
00909 static osync_bool _osync_client_handle_connect(OSyncClient *client, OSyncMessage *message, OSyncError **error)
00910 {
00911 char *objtype = NULL;
00912 int slowsync;
00913 OSyncMessage *reply = NULL;
00914 OSyncObjTypeSink *sink = NULL;
00915 OSyncContext *context = NULL;
00916
00917 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);
00918
00919
00920
00921
00922
00923
00924
00925 osync_message_read_string(message, &objtype);
00926 osync_message_read_int(message, &slowsync);
00927 osync_message_write_string(message, objtype);
00928 osync_trace(TRACE_INTERNAL, "Searching sink for %s", objtype);
00929
00930 if (objtype) {
00931 sink = osync_plugin_info_find_objtype(client->plugin_info, objtype);
00932
00933 if (!sink) {
00934 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find sink for %s", objtype);
00935 osync_free(objtype);
00936 goto error;
00937 }
00938
00939 osync_free(objtype);
00940 } else
00941 sink = osync_plugin_info_get_main_sink(client->plugin_info);
00942
00943 if (!sink) {
00944 reply = osync_message_new_reply(message, error);
00945 if (!reply)
00946 goto error;
00947
00948
00949 osync_message_write_int(reply, FALSE);
00950
00951 if (!osync_queue_send_message(client->outgoing, NULL, reply, error))
00952 goto error_free_reply;
00953
00954 osync_message_unref(reply);
00955 } else {
00956
00957
00958
00959
00960 if (slowsync)
00961 osync_objtype_sink_set_slowsync(sink, TRUE);
00962 else
00963 osync_objtype_sink_set_slowsync(sink, FALSE);
00964
00965 context = _create_context(client, message, _osync_client_connect_callback, NULL, error);
00966 if (!context)
00967 goto error;
00968
00969 osync_plugin_info_set_sink(client->plugin_info, sink);
00970 osync_objtype_sink_connect(sink, client->plugin_data, client->plugin_info, context);
00971
00972 osync_context_unref(context);
00973 }
00974
00975 osync_trace(TRACE_EXIT, "%s", __func__);
00976 return TRUE;
00977
00978 error_free_reply:
00979 osync_message_unref(reply);
00980 error:
00981 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00982 return FALSE;
00983 }
00984
00985 static osync_bool _osync_client_handle_connect_done(OSyncClient *client, OSyncMessage *message, OSyncError **error)
00986 {
00987 char *objtype = NULL;
00988 OSyncMessage *reply = NULL;
00989 OSyncObjTypeSink *sink = NULL;
00990 OSyncContext *context = NULL;
00991
00992 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);
00993
00994 osync_message_read_string(message, &objtype);
00995 osync_trace(TRACE_INTERNAL, "Searching sink for %s", objtype);
00996
00997 if (objtype) {
00998 sink = osync_plugin_info_find_objtype(client->plugin_info, objtype);
00999
01000 if (!sink) {
01001 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find sink for %s", objtype);
01002 osync_free(objtype);
01003 goto error;
01004 }
01005
01006 osync_free(objtype);
01007 } else
01008 sink = osync_plugin_info_get_main_sink(client->plugin_info);
01009
01010 if (!sink) {
01011 reply = osync_message_new_reply(message, error);
01012 if (!reply)
01013 goto error;
01014
01015 if (!osync_queue_send_message(client->outgoing, NULL, reply, error))
01016 goto error_free_reply;
01017
01018 osync_message_unref(reply);
01019 } else {
01020 context = _create_context(client, message, _osync_client_connect_done_callback, NULL, error);
01021 if (!context)
01022 goto error;
01023
01024 osync_plugin_info_set_sink(client->plugin_info, sink);
01025 osync_objtype_sink_connect_done(sink, client->plugin_data, client->plugin_info, context);
01026
01027 osync_context_unref(context);
01028 }
01029 osync_trace(TRACE_EXIT, "%s", __func__);
01030 return TRUE;
01031
01032 error_free_reply:
01033 osync_message_unref(reply);
01034 error:
01035 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
01036 return FALSE;
01037 }
01038
01039 static osync_bool _osync_client_handle_disconnect(OSyncClient *client, OSyncMessage *message, OSyncError **error)
01040 {
01041 char *objtype = NULL;
01042 OSyncMessage *reply = NULL;
01043 OSyncObjTypeSink *sink = NULL;
01044 OSyncContext *context = NULL;
01045
01046 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);
01047
01048 osync_message_read_string(message, &objtype);
01049 osync_trace(TRACE_INTERNAL, "Searching sink for %s", objtype);
01050
01051 if (objtype) {
01052 sink = osync_plugin_info_find_objtype(client->plugin_info, objtype);
01053
01054 if (!sink) {
01055 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find sink for %s", objtype);
01056 osync_free(objtype);
01057 goto error;
01058 }
01059
01060 osync_free(objtype);
01061 } else
01062 sink = osync_plugin_info_get_main_sink(client->plugin_info);
01063
01064 if (!sink) {
01065 reply = osync_message_new_reply(message, error);
01066 if (!reply)
01067 goto error;
01068
01069 if (!osync_queue_send_message(client->outgoing, NULL, reply, error))
01070 goto error_free_reply;
01071
01072 osync_message_unref(reply);
01073 } else {
01074 context = _create_context(client, message, _osync_client_disconnect_callback, NULL, error);
01075 if (!context)
01076 goto error;
01077
01078 osync_plugin_info_set_sink(client->plugin_info, sink);
01079 osync_objtype_sink_disconnect(sink, client->plugin_data, client->plugin_info, context);
01080
01081 osync_context_unref(context);
01082 }
01083 osync_trace(TRACE_EXIT, "%s", __func__);
01084 return TRUE;
01085
01086 error_free_reply:
01087 osync_message_unref(reply);
01088 error:
01089 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
01090 return FALSE;
01091 }
01092
01093 static osync_bool _osync_client_handle_get_changes(OSyncClient *client, OSyncMessage *message, OSyncError **error)
01094 {
01095 int slowsync;
01096 char *objtype = NULL;
01097 OSyncMessage *reply = NULL;
01098 OSyncObjTypeSink *sink = NULL;
01099 OSyncContext *context = NULL;
01100
01101 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);
01102
01103 osync_message_read_string(message, &objtype);
01104 osync_message_read_int(message, &slowsync);
01105 osync_trace(TRACE_INTERNAL, "Searching sink for %s (slowsync: %i)", objtype, slowsync);
01106
01107 if (objtype) {
01108 sink = osync_plugin_info_find_objtype(client->plugin_info, objtype);
01109
01110 if (!sink) {
01111 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find sink for %s", objtype);
01112 osync_free(objtype);
01113 goto error;
01114 }
01115
01116 osync_free(objtype);
01117 } else
01118 sink = osync_plugin_info_get_main_sink(client->plugin_info);
01119
01120 if (!sink) {
01121 reply = osync_message_new_reply(message, error);
01122 if (!reply)
01123 goto error;
01124
01125 if (!osync_queue_send_message(client->outgoing, NULL, reply, error))
01126 goto error_free_reply;
01127
01128 osync_message_unref(reply);
01129 } else {
01130
01131
01132 if (slowsync)
01133 osync_objtype_sink_set_slowsync(sink, TRUE);
01134 else
01135 osync_objtype_sink_set_slowsync(sink, FALSE);
01136
01137 context = _create_context(client, message, _osync_client_get_changes_callback, NULL, error);
01138 if (!context)
01139 goto error;
01140 osync_context_set_changes_callback(context, _osync_client_change_callback);
01141
01142 osync_plugin_info_set_sink(client->plugin_info, sink);
01143
01144 osync_objtype_sink_get_changes(sink, client->plugin_data, client->plugin_info, context);
01145
01146 osync_context_unref(context);
01147 }
01148 osync_trace(TRACE_EXIT, "%s", __func__);
01149 return TRUE;
01150
01151 error_free_reply:
01152 osync_message_unref(reply);
01153 error:
01154 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
01155 return FALSE;
01156 }
01157
01158 static osync_bool _osync_client_handle_read_change(OSyncClient *client, OSyncMessage *message, OSyncError **error)
01159 {
01160 const char *objtype = NULL;
01161 OSyncMessage *reply = NULL;
01162 OSyncChange *change = NULL;
01163 OSyncObjTypeSink *sink = NULL;
01164 OSyncContext *context = NULL;
01165
01166
01167 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);
01168
01169 if (!osync_demarshal_change(message, &change, client->format_env, error))
01170 goto error;
01171
01172 osync_trace(TRACE_INTERNAL, "Change %p", change);
01173
01174 objtype = osync_data_get_objtype(osync_change_get_data(change));
01175
01176 if (objtype) {
01177 sink = osync_plugin_info_find_objtype(client->plugin_info, objtype);
01178
01179 if (!sink) {
01180 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find sink for %s", objtype);
01181 goto error;
01182 }
01183 } else {
01184 sink = osync_plugin_info_get_main_sink(client->plugin_info);
01185 }
01186
01187 if (!sink) {
01188 reply = osync_message_new_reply(message, error);
01189 if (!reply)
01190 goto error;
01191
01192 if (!osync_queue_send_message(client->outgoing, NULL, reply, error))
01193 goto error_free_reply;
01194
01195 osync_message_unref(reply);
01196 } else {
01197 context = _create_context(client, message, _osync_client_read_callback, change, error);
01198 if (!context)
01199 goto error;
01200
01201 osync_plugin_info_set_sink(client->plugin_info, sink);
01202
01203 osync_objtype_sink_read_change(sink, client->plugin_data, client->plugin_info, change, context);
01204
01205 osync_context_unref(context);
01206 }
01207
01208 osync_trace(TRACE_EXIT, "%s", __func__);
01209 return TRUE;
01210
01211 error_free_reply:
01212 osync_message_unref(reply);
01213 error:
01214 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
01215 return FALSE;
01216 }
01217
01218
01219 static osync_bool _osync_client_handle_commit_change(OSyncClient *client, OSyncMessage *message, OSyncError **error)
01220 {
01221 OSyncChange *change = NULL;
01222 OSyncData *data = NULL;
01223 OSyncObjTypeSink *sink = NULL;
01224 OSyncContext *context = NULL;
01225
01226 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);
01227
01228 if (!osync_demarshal_change(message, &change, client->format_env, error))
01229 goto error;
01230
01231 osync_trace(TRACE_INTERNAL, "Change %p", change);
01232
01233 data = osync_change_get_data(change);
01234
01235 osync_trace(TRACE_INTERNAL, "Searching sink for %s", osync_data_get_objtype(data));
01236
01237 sink = osync_plugin_info_find_objtype(client->plugin_info, osync_data_get_objtype(data));
01238
01239 if (!sink) {
01240 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find sink for %s", osync_data_get_objtype(data));
01241 osync_change_unref(change);
01242 goto error;
01243 }
01244
01245 context = _create_context(client, message, _osync_client_commit_change_callback, change, error);
01246 if (!context)
01247 goto error;
01248
01249 osync_plugin_info_set_sink(client->plugin_info, sink);
01250 osync_objtype_sink_commit_change(sink, client->plugin_data, client->plugin_info, change, context);
01251
01252 osync_change_unref(change);
01253 osync_context_unref(context);
01254
01255 osync_trace(TRACE_EXIT, "%s", __func__);
01256 return TRUE;
01257
01258 error:
01259 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
01260 return FALSE;
01261 }
01262
01263 static osync_bool _osync_client_handle_committed_all(OSyncClient *client, OSyncMessage *message, OSyncError **error)
01264 {
01265 char *objtype = NULL;
01266 OSyncMessage *reply = NULL;
01267 OSyncObjTypeSink *sink = NULL;
01268 OSyncContext *context = NULL;
01269
01270 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);
01271
01272 osync_message_read_string(message, &objtype);
01273 osync_trace(TRACE_INTERNAL, "Searching sink for %s", objtype);
01274
01275 if (objtype) {
01276 sink = osync_plugin_info_find_objtype(client->plugin_info, objtype);
01277
01278 if (!sink) {
01279 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find sink for %s", objtype);
01280 osync_free(objtype);
01281 goto error;
01282 }
01283
01284 osync_free(objtype);
01285 } else
01286 sink = osync_plugin_info_get_main_sink(client->plugin_info);
01287
01288 if (!sink) {
01289 reply = osync_message_new_reply(message, error);
01290 if (!reply)
01291 goto error;
01292
01293 if (!osync_queue_send_message(client->outgoing, NULL, reply, error))
01294 goto error_free_reply;
01295
01296 osync_message_unref(reply);
01297 } else {
01298 context = _create_context(client, message, _osync_client_committed_all_callback, NULL, error);
01299 if (!context)
01300 goto error;
01301
01302 osync_plugin_info_set_sink(client->plugin_info, sink);
01303 osync_objtype_sink_committed_all(sink, client->plugin_data, client->plugin_info, context);
01304
01305 osync_context_unref(context);
01306 }
01307 osync_trace(TRACE_EXIT, "%s", __func__);
01308 return TRUE;
01309
01310 error_free_reply:
01311 osync_message_unref(reply);
01312 error:
01313 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
01314 return FALSE;
01315 }
01316
01317 static osync_bool _osync_client_handle_sync_done(OSyncClient *client, OSyncMessage *message, OSyncError **error)
01318 {
01319 char *objtype = NULL;
01320 OSyncMessage *reply = NULL;
01321 OSyncObjTypeSink *sink = NULL;
01322 OSyncContext *context = NULL;
01323
01324 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);
01325
01326 osync_message_read_string(message, &objtype);
01327 osync_trace(TRACE_INTERNAL, "Searching sink for %s", objtype);
01328
01329 if (objtype) {
01330 sink = osync_plugin_info_find_objtype(client->plugin_info, objtype);
01331
01332 if (!sink) {
01333 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find sink for %s", objtype);
01334 osync_free(objtype);
01335 goto error;
01336 }
01337
01338 osync_free(objtype);
01339 } else
01340 sink = osync_plugin_info_get_main_sink(client->plugin_info);
01341
01342 if (!sink) {
01343 reply = osync_message_new_reply(message, error);
01344 if (!reply)
01345 goto error;
01346
01347 if (!osync_queue_send_message(client->outgoing, NULL, reply, error))
01348 goto error_free_reply;
01349
01350 osync_message_unref(reply);
01351 } else {
01352 context = _create_context(client, message, _osync_client_sync_done_callback, NULL, error);
01353 if (!context)
01354 goto error;
01355
01356 osync_plugin_info_set_sink(client->plugin_info, sink);
01357 osync_objtype_sink_sync_done(sink, client->plugin_data, client->plugin_info, context);
01358
01359 osync_context_unref(context);
01360 }
01361 osync_trace(TRACE_EXIT, "%s", __func__);
01362 return TRUE;
01363
01364 error_free_reply:
01365 osync_message_unref(reply);
01366 error:
01367 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
01368 return FALSE;
01369 }
01370
01371 static void _osync_client_message_handler(OSyncMessage *message, void *user_data)
01372 {
01373 OSyncClient *client = NULL;
01374 OSyncError *error = NULL;
01375 OSyncError *locerror = NULL;
01376 OSyncMessage *errorreply = NULL;
01377
01378 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data);
01379 client = user_data;
01380 osync_trace(TRACE_INTERNAL, "plugin received command %i (%s)", osync_message_get_command(message), osync_message_get_commandstr(message));
01381
01382 switch (osync_message_get_command(message)) {
01383 case OSYNC_MESSAGE_NOOP:
01384 case OSYNC_MESSAGE_REPLY:
01385 case OSYNC_MESSAGE_ERRORREPLY:
01386 case OSYNC_MESSAGE_NEW_CHANGE:
01387 case OSYNC_MESSAGE_SYNCHRONIZE:
01388 case OSYNC_MESSAGE_ENGINE_CHANGED:
01389 case OSYNC_MESSAGE_MAPPING_CHANGED:
01390 case OSYNC_MESSAGE_MAPPINGENTRY_CHANGED:
01391
01392 break;
01393 case OSYNC_MESSAGE_QUEUE_ERROR:
01394 case OSYNC_MESSAGE_ERROR:
01395 case OSYNC_MESSAGE_QUEUE_HUP:
01396
01397 break;
01398
01399 case OSYNC_MESSAGE_INITIALIZE:
01400 if (!_osync_client_handle_initialize(client, message, &error))
01401 goto error;
01402 break;
01403
01404 case OSYNC_MESSAGE_FINALIZE:
01405 if (!_osync_client_handle_finalize(client, message, &error))
01406 goto error;
01407 break;
01408
01409 case OSYNC_MESSAGE_DISCOVER:
01410 if (!_osync_client_handle_discover(client, message, &error))
01411 goto error;
01412 break;
01413
01414 case OSYNC_MESSAGE_CONNECT:
01415 if (!_osync_client_handle_connect(client, message, &error))
01416 goto error;
01417 break;
01418
01419 case OSYNC_MESSAGE_CONNECT_DONE:
01420 if (!_osync_client_handle_connect_done(client, message, &error))
01421 goto error;
01422 break;
01423
01424 case OSYNC_MESSAGE_DISCONNECT:
01425 if (!_osync_client_handle_disconnect(client, message, &error))
01426 goto error;
01427 break;
01428
01429 case OSYNC_MESSAGE_GET_CHANGES:
01430 if (!_osync_client_handle_get_changes(client, message, &error))
01431 goto error;
01432 break;
01433
01434 case OSYNC_MESSAGE_COMMIT_CHANGE:
01435 if (!_osync_client_handle_commit_change(client, message, &error))
01436 goto error;
01437 break;
01438
01439 case OSYNC_MESSAGE_SYNC_DONE:
01440 if (!_osync_client_handle_sync_done(client, message, &error))
01441 goto error;
01442 break;
01443
01444 case OSYNC_MESSAGE_COMMITTED_ALL:
01445 if (!_osync_client_handle_committed_all(client, message, &error))
01446 goto error;
01447 break;
01448
01449 case OSYNC_MESSAGE_READ_CHANGE:
01450 if (!_osync_client_handle_read_change(client, message, &error))
01451 goto error;
01452 break;
01453
01454 case OSYNC_MESSAGE_CALL_PLUGIN:
01455
01456
01457
01458
01459
01460
01461
01462
01463
01464
01465
01466
01467
01468
01469
01470
01471
01472
01473
01474 break;
01475 }
01476
01477 osync_trace(TRACE_EXIT, "%s", __func__);
01478 return;
01479
01480 error:;
01481 if (!client->outgoing) {
01482 client->thread = NULL;
01483 osync_client_shutdown(client);
01484 osync_trace(TRACE_EXIT_ERROR, "%s: Unable to notify parent. no outgoing queue: %s", __func__, osync_error_print(&error));
01485 osync_error_unref(&error);
01486 return;
01487 }
01488
01489 errorreply = osync_message_new_errorreply(message, error, &locerror);
01490 if (!errorreply) {
01491 osync_client_error_shutdown(client, locerror);
01492 osync_error_unref(&error);
01493 osync_trace(TRACE_EXIT_ERROR, "%s: Error while sending error: %s", __func__, osync_error_print(&locerror));
01494 osync_error_unref(&locerror);
01495 return;
01496 }
01497
01498 if (!osync_queue_send_message(client->outgoing, NULL, errorreply, &locerror)) {
01499 osync_client_error_shutdown(client, locerror);
01500 osync_error_unref(&error);
01501 osync_trace(TRACE_EXIT_ERROR, "%s: Error while sending error: %s", __func__, osync_error_print(&locerror));
01502 osync_error_unref(&locerror);
01503 return;
01504 }
01505
01506 osync_message_unref(errorreply);
01507
01508 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(&error));
01509 osync_error_unref(&error);
01510 }
01511
01514 static void _osync_client_hup_handler(OSyncMessage *message, void *user_data)
01515 {
01516 OSyncError *error = NULL;
01517 OSyncClient *client = NULL;
01518 client = user_data;
01519 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data);
01520
01521 osync_trace(TRACE_INTERNAL, "plugin received command %i on sending queue", osync_message_get_command(message));
01522
01523 if ( (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_ERROR)
01524 || (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP) ) {
01525
01526
01527 if (!osync_queue_disconnect(client->outgoing, &error))
01528 osync_error_unref(&error);
01529
01530 if (!osync_queue_disconnect(client->incoming, &error))
01531 osync_error_unref(&error);
01532
01533 if (client->syncloop) {
01534 g_main_loop_quit(client->syncloop);
01535 }
01536 } else {
01537
01538 osync_trace(TRACE_ERROR, "received neither a hup, nor a error on a sending queue...");
01539 }
01540
01541 osync_trace(TRACE_EXIT, "%s", __func__);
01542 return;
01543 }
01544
01545 OSyncClient *osync_client_new(OSyncError **error)
01546 {
01547 OSyncClient *client = NULL;
01548 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, error);
01549
01550 client = osync_try_malloc0(sizeof(OSyncClient), error);
01551 if (!client) {
01552 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
01553 return NULL;
01554 }
01555
01556 client->ref_count = 1;
01557 client->context = g_main_context_new();
01558
01559 osync_trace(TRACE_EXIT, "%s: %p", __func__, client);
01560 return client;
01561 }
01562
01563 OSyncClient *osync_client_ref(OSyncClient *client)
01564 {
01565 osync_assert(client);
01566
01567 g_atomic_int_inc(&(client->ref_count));
01568
01569 return client;
01570 }
01571
01572 void osync_client_unref(OSyncClient *client)
01573 {
01574 osync_assert(client);
01575
01576 if (g_atomic_int_dec_and_test(&(client->ref_count))) {
01577 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, client);
01578
01579 if(client->disconnectThread) {
01580 g_thread_join(client->disconnectThread);
01581 client->disconnectThread = NULL;
01582 }
01583
01584 if (client->incoming) {
01585 if (osync_queue_is_connected(client->incoming))
01586 osync_queue_disconnect(client->incoming, NULL);
01587 osync_queue_remove(client->incoming, NULL);
01588 osync_queue_unref(client->incoming);
01589 }
01590
01591 if (client->outgoing) {
01592 if (osync_queue_is_connected(client->outgoing))
01593 osync_queue_disconnect(client->outgoing, NULL);
01594 osync_queue_unref(client->outgoing);
01595 }
01596
01597 if (client->plugin)
01598 osync_plugin_unref(client->plugin);
01599
01600 if (client->thread)
01601 osync_thread_unref(client->thread);
01602
01603 osync_free(client);
01604
01605 osync_trace(TRACE_EXIT, "%s", __func__);
01606 }
01607 }
01608
01609 void osync_client_set_incoming_queue(OSyncClient *client, OSyncQueue *incoming)
01610 {
01611 osync_queue_set_message_handler(incoming, _osync_client_message_handler, client);
01612 osync_queue_setup_with_gmainloop(incoming, client->context);
01613 client->incoming = osync_queue_ref(incoming);
01614 osync_queue_set_pending_limit(incoming, OSYNC_QUEUE_PENDING_LIMIT);
01615 }
01616
01617 void osync_client_set_outgoing_queue(OSyncClient *client, OSyncQueue *outgoing)
01618 {
01619 osync_queue_set_message_handler(outgoing, _osync_client_hup_handler, client);
01620 osync_queue_setup_with_gmainloop(outgoing, client->context);
01621 client->outgoing = osync_queue_ref(outgoing);
01622 }
01623
01624 void osync_client_run_and_block(OSyncClient *client)
01625 {
01626 client->syncloop = g_main_loop_new(client->context, TRUE);
01627 g_main_loop_run(client->syncloop);
01628 }
01629
01630 osync_bool osync_client_run(OSyncClient *client, OSyncError **error)
01631 {
01632 client->thread = osync_thread_new(client->context, error);
01633 if (!client->thread)
01634 return FALSE;
01635
01636 osync_thread_start(client->thread);
01637
01638 return TRUE;
01639 }
01640
01641 static gboolean osyncClientConnectCallback(gpointer data)
01642 {
01643 OSyncClient *client = NULL;
01644 client = data;
01645 osync_trace(TRACE_INTERNAL, "About to connect to the incoming queue");
01646
01647
01648 if (!osync_queue_connect(client->incoming, OSYNC_QUEUE_RECEIVER, NULL))
01649 return TRUE;
01650
01651 return FALSE;
01652 }
01653
01654
01655 osync_bool osync_client_run_external(OSyncClient *client, char *pipe_path, OSyncPlugin *plugin, OSyncError **error)
01656 {
01657 OSyncQueue *incoming = NULL;
01658 GSource *source = NULL;
01659 osync_trace(TRACE_ENTRY, "%s(%p, %s, %p, %p)", __func__, client, pipe_path, plugin, error);
01660
01661 incoming = osync_queue_new(pipe_path, error);
01662 if (!incoming)
01663 goto error;
01664
01665 if (!osync_queue_create(incoming, error))
01666 goto error_free_queue;
01667
01668 osync_client_set_incoming_queue(client, incoming);
01669
01670 client->thread = osync_thread_new(client->context, error);
01671 if (!client->thread)
01672 goto error_remove_queue;
01673
01674 osync_thread_start(client->thread);
01675
01676 client->plugin = plugin;
01677 osync_plugin_ref(client->plugin);
01678
01679 source = g_idle_source_new();
01680 g_source_set_callback(source, osyncClientConnectCallback, client, NULL);
01681 g_source_attach(source, client->context);
01682
01683 osync_queue_unref(incoming);
01684
01685 osync_trace(TRACE_EXIT, "%s", __func__);
01686 return TRUE;
01687
01688 error_remove_queue:
01689 osync_queue_remove(incoming, NULL);
01690 error_free_queue:
01691 osync_queue_unref(incoming);
01692 error:
01693 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
01694 return FALSE;
01695 }
01696
01697 static gboolean osyncClientDisconnectCallback(gpointer data)
01698 {
01699 OSyncClient *client = data;
01700
01701
01702
01703
01704 osync_queue_disconnect(client->incoming, NULL);
01705
01706 if (client->outgoing) {
01707
01708 while (osync_queue_is_connected(client->outgoing)) { g_usleep(100); }
01709
01710
01711 g_usleep(200);
01712
01713 osync_queue_disconnect(client->outgoing, NULL);
01714 }
01715
01716 return FALSE;
01717 }
01718
01719
01720 static void client_disconnect_workerthread(gpointer data)
01721 {
01722 OSyncClient *client = data;
01723 GMainContext *context = g_main_context_new();
01724 GSource *source = NULL;
01725 OSyncThread *thread = NULL;
01726 source = g_idle_source_new();
01727 g_source_set_callback(source, osyncClientDisconnectCallback, client, NULL);
01728 g_source_attach(source, context);
01729 thread = osync_thread_new(context, NULL);
01730 osync_thread_start(thread);
01731
01732 osync_thread_stop(thread);
01733 osync_thread_unref(thread);
01734 thread = NULL;
01735 g_source_unref(source);
01736 }
01737
01738
01739 void osync_client_disconnect(OSyncClient *client)
01740 {
01741 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, client);
01742
01743 client->disconnectThread = g_thread_create((GThreadFunc)client_disconnect_workerthread, client, TRUE, NULL);
01744
01745 osync_trace(TRACE_EXIT, "%s", __func__);
01746 }
01747
01748 void osync_client_shutdown(OSyncClient *client)
01749 {
01750 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, client);
01751 osync_assert(client);
01752
01753 osync_client_disconnect(client);
01754
01755 if (client->syncloop) {
01756 if (g_main_loop_is_running(client->syncloop)) {
01757
01758
01759 g_main_loop_quit(client->syncloop);
01760 }
01761
01762 g_main_loop_unref(client->syncloop);
01763 client->syncloop = NULL;
01764 } else if (client->thread) {
01765 osync_thread_stop(client->thread);
01766 osync_thread_unref(client->thread);
01767 client->thread = NULL;
01768 }
01769
01770 osync_trace(TRACE_EXIT, "%s", __func__);
01771 }
01772
01773 void osync_client_error_shutdown(OSyncClient *client, OSyncError *error)
01774 {
01775 OSyncMessage *message = NULL;
01776 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, client, error);
01777
01778 message = osync_message_new_error(error, NULL);
01779 if (message) {
01780 osync_queue_send_message(client->outgoing, NULL, message, NULL);
01781 osync_message_unref(message);
01782 }
01783
01784 osync_client_shutdown(client);
01785
01786 osync_trace(TRACE_EXIT, "%s", __func__);
01787 }
01788