00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "opensync.h"
00022 #include "opensync_internals.h"
00023
00024 #include "opensync-archive.h"
00025 #include "opensync-group.h"
00026 #include "opensync-engine.h"
00027 #include "opensync-data.h"
00028 #include "opensync-format.h"
00029 #include "opensync-mapping.h"
00030
00031 #include "opensync_obj_engine_internals.h"
00032 #include "opensync_sink_engine_internals.h"
00033 #include "opensync_mapping_entry_engine_internals.h"
00034 #include "opensync_mapping_engine_internals.h"
00035
00036 #include "archive/opensync_archive_internals.h"
00037 #include "client/opensync_client_proxy_internals.h"
00038 #include "format/opensync_objformat_internals.h"
00039
00040 OSyncSinkEngine *osync_sink_engine_new(int position, OSyncClientProxy *proxy, OSyncObjEngine *objengine, OSyncError **error)
00041 {
00042 OSyncSinkEngine *sinkengine = NULL;
00043 osync_trace(TRACE_ENTRY, "%s(%i, %p, %p, %p)", __func__, position, proxy, objengine, error);
00044 osync_assert(proxy);
00045 osync_assert(objengine);
00046
00047 sinkengine = osync_try_malloc0(sizeof(OSyncSinkEngine), error);
00048 if (!sinkengine)
00049 goto error;
00050 sinkengine->ref_count = 1;
00051 sinkengine->position = position;
00052
00053
00054
00055 sinkengine->proxy = proxy;
00056
00057 sinkengine->engine = objengine;
00058 osync_obj_engine_ref(objengine);
00059
00060 osync_trace(TRACE_EXIT, "%s: %p", __func__, sinkengine);
00061 return sinkengine;
00062
00063 error:
00064 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00065 return NULL;
00066 }
00067
00068 OSyncSinkEngine *osync_sink_engine_ref(OSyncSinkEngine *engine)
00069 {
00070 osync_assert(engine);
00071
00072 g_atomic_int_inc(&(engine->ref_count));
00073
00074 return engine;
00075 }
00076
00077 void osync_sink_engine_unref(OSyncSinkEngine *engine)
00078 {
00079 osync_assert(engine);
00080
00081 if (g_atomic_int_dec_and_test(&(engine->ref_count))) {
00082 while (engine->unmapped) {
00083 OSyncChange *change = engine->unmapped->data;
00084 osync_change_unref(change);
00085
00086 engine->unmapped = osync_list_remove(engine->unmapped, engine->unmapped->data);
00087 }
00088
00089 while (engine->entries) {
00090 OSyncMappingEntryEngine *entry = engine->entries->data;
00091 osync_entry_engine_unref(entry);
00092
00093 engine->entries = osync_list_remove(engine->entries, engine->entries->data);
00094 }
00095
00096 osync_obj_engine_unref(engine->engine);
00097
00098 osync_free(engine);
00099 }
00100 }
00101
00102 osync_bool osync_sink_engine_is_connected(OSyncSinkEngine *engine)
00103 {
00104 OSyncObjEngine *objengine = NULL;
00105 osync_assert(engine);
00106
00107 objengine = engine->engine;
00108
00109 if (!objengine)
00110 return FALSE;
00111
00112 return !!(objengine->sink_connects & (1 << engine->position));
00113 }
00114
00115 const OSyncList *osync_sink_engine_get_mapping_entry_engines(OSyncSinkEngine *engine)
00116 {
00117 osync_return_val_if_fail(engine, NULL);
00118 return engine->entries;
00119 }
00120
00121 OSyncMember *osync_sink_engine_get_member(OSyncSinkEngine *engine)
00122 {
00123 osync_return_val_if_fail(engine, NULL);
00124 osync_return_val_if_fail(engine->proxy, NULL);
00125 return osync_client_proxy_get_member(engine->proxy);
00126 }
00127
00128 osync_bool osync_sink_engine_demerge(OSyncSinkEngine *engine, OSyncArchive *archive, OSyncError **error)
00129 {
00130 OSyncList *o;
00131 OSyncMember *member;
00132 OSyncCapabilities *caps;
00133
00134 osync_assert(engine);
00135 osync_assert(archive);
00136
00137 member = osync_client_proxy_get_member(engine->proxy);
00138 osync_assert(member);
00139 caps = osync_member_get_capabilities(member);
00140
00141 if (!caps)
00142 return TRUE;
00143
00144 for (o = engine->entries; o; o = o->next) {
00145 OSyncMappingEntryEngine *entry_engine = o->data;
00146 osync_assert(entry_engine);
00147
00148 if (entry_engine->change == NULL)
00149 continue;
00150
00151 if (osync_change_get_changetype(entry_engine->change) == OSYNC_CHANGE_TYPE_DELETED)
00152 continue;
00153
00154 if (!osync_objformat_has_merger(osync_change_get_objformat(entry_engine->change)))
00155 continue;
00156
00157 if (!osync_entry_engine_demerge(entry_engine, archive, caps, error))
00158 goto error;
00159
00160 }
00161
00162 return TRUE;
00163 error:
00164 return FALSE;
00165 }
00166
00167 osync_bool osync_sink_engine_convert_to_dest(OSyncSinkEngine *engine, OSyncFormatEnv *formatenv, OSyncError **error)
00168 {
00169 OSyncList *o;
00170 OSyncMember *member;
00171 OSyncObjTypeSink *objtype_sink;
00172 const char *objtype;
00173 OSyncFormatConverterPath *path = NULL;
00174
00175 osync_assert(engine);
00176 osync_assert(formatenv);
00177
00178 member = osync_client_proxy_get_member(engine->proxy);
00179 osync_assert(member);
00180
00181 objtype = osync_obj_engine_get_objtype(engine->engine);
00182 objtype_sink = osync_member_find_objtype_sink(member, objtype);
00183 osync_assert(objtype_sink);
00184
00185 for (o = engine->entries; o; o = o->next) {
00186 OSyncMappingEntryEngine *entry_engine = o->data;
00187 osync_assert(entry_engine);
00188
00189 if (entry_engine->change == NULL)
00190 continue;
00191
00192 if (osync_change_get_changetype(entry_engine->change) == OSYNC_CHANGE_TYPE_DELETED)
00193 continue;
00194
00195 if (!osync_entry_engine_convert(entry_engine, formatenv, objtype_sink, &path, error))
00196 goto error;
00197 }
00198
00199 if (path)
00200 osync_converter_path_unref(path);
00201
00202
00203 return TRUE;
00204
00205 error:
00206 if (path)
00207 osync_converter_path_unref(path);
00208
00209 return FALSE;
00210 }
00211
00212 osync_bool osync_sink_engine_write(OSyncSinkEngine *engine, OSyncArchive *archive, OSyncError **error)
00213 {
00214 OSyncList *o;
00215 const char *objtype;
00216 OSyncMember *member;
00217
00218 osync_assert(engine);
00219 osync_assert(archive);
00220
00221 objtype = osync_obj_engine_get_objtype(engine->engine);
00222 member = osync_client_proxy_get_member(engine->proxy);
00223
00224 for (o = engine->entries; o; o = o->next) {
00225 OSyncMappingEntryEngine *entry_engine = o->data;
00226 osync_assert(entry_engine);
00227
00228 if (osync_entry_engine_is_dirty(entry_engine)) {
00229 OSyncChange *change = osync_entry_engine_get_change(entry_engine);
00230 osync_assert(change);
00231
00232 osync_trace(TRACE_INTERNAL, "Writing change %s, changetype %i, format %s , objtype %s from member %lli",
00233 osync_change_get_uid(change),
00234 osync_change_get_changetype(change),
00235 osync_objformat_get_name(osync_change_get_objformat(change)),
00236 osync_change_get_objtype(change),
00237 osync_member_get_id(member));
00238
00239 if (!osync_client_proxy_commit_change(engine->proxy,
00240 osync_obj_engine_commit_change_callback,
00241 entry_engine, change, error))
00242 goto error;
00243
00244 } else if (entry_engine->change) {
00245 OSyncMapping *mapping = entry_engine->mapping_engine->mapping;
00246 OSyncMappingEntry *entry = entry_engine->entry;
00247
00248
00249
00250
00251 if (osync_change_get_changetype(entry_engine->change) == OSYNC_CHANGE_TYPE_DELETED) {
00252 if (!osync_archive_delete_change(archive, osync_mapping_entry_get_id(entry),
00253 osync_change_get_objtype(entry_engine->change), error))
00254 goto error;
00255 } else {
00256 if (!osync_archive_save_change(archive,
00257 osync_mapping_entry_get_id(entry),
00258 osync_change_get_uid(entry_engine->change),
00259 osync_change_get_objtype(entry_engine->change),
00260 osync_mapping_get_id(mapping),
00261 osync_member_get_id(member), objtype, error))
00262 goto error;
00263 }
00264 }
00265 }
00266
00267 if (!osync_client_proxy_committed_all(engine->proxy, osync_obj_engine_written_callback, engine, objtype, error))
00268 goto error;
00269
00270 return TRUE;
00271
00272 error:
00273 return FALSE;
00274 }
00275