|
|
|
@ -28,7 +28,18 @@
|
|
|
|
|
define("webodf/editor/server/pullbox/OperationRouter", [], function () {
|
|
|
|
|
"use strict";
|
|
|
|
|
|
|
|
|
|
// TODO: these eventid strings should be defined at OperationRouter interface
|
|
|
|
|
var /**@const @type {!string}*/
|
|
|
|
|
EVENT_BEFORESAVETOFILE = "beforeSaveToFile",
|
|
|
|
|
/**@const @type {!string}*/
|
|
|
|
|
EVENT_SAVEDTOFILE = "savedToFile",
|
|
|
|
|
/**@const @type {!string}*/
|
|
|
|
|
EVENT_HASLOCALUNSYNCEDOPERATIONSCHANGED = "hasLocalUnsyncedOperationsChanged",
|
|
|
|
|
/**@const @type {!string}*/
|
|
|
|
|
EVENT_HASSESSIONHOSTCONNECTIONCHANGED = "hasSessionHostConnectionChanged";
|
|
|
|
|
|
|
|
|
|
runtime.loadClass("ops.OperationTransformer");
|
|
|
|
|
runtime.loadClass("core.EventNotifier");
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* route operations in a networked collaborative manner.
|
|
|
|
@ -43,11 +54,11 @@ define("webodf/editor/server/pullbox/OperationRouter", [], function () {
|
|
|
|
|
* @constructor
|
|
|
|
|
* @implements ops.OperationRouter
|
|
|
|
|
*/
|
|
|
|
|
return function PullBoxOperationRouter(sessionId, memberId, server, odfContainer) {
|
|
|
|
|
return function PullBoxOperationRouter(sessionId, memberId, server, odfContainer, errorCallback) {
|
|
|
|
|
"use strict";
|
|
|
|
|
|
|
|
|
|
var operationFactory,
|
|
|
|
|
/**@type{function(!ops.Operation)}*/
|
|
|
|
|
/**@type{function(!ops.Operation):boolean}*/
|
|
|
|
|
playbackFunction,
|
|
|
|
|
idleTimeout = null,
|
|
|
|
|
syncOpsTimeout = null,
|
|
|
|
@ -58,7 +69,7 @@ define("webodf/editor/server/pullbox/OperationRouter", [], function () {
|
|
|
|
|
/**@type{!boolean}*/
|
|
|
|
|
isSyncCallRunning = false,
|
|
|
|
|
/**@type{!boolean}*/
|
|
|
|
|
hasUnresolvableConflict = false,
|
|
|
|
|
hasError = false,
|
|
|
|
|
/**@type{!boolean}*/
|
|
|
|
|
syncingBlocked = false,
|
|
|
|
|
/** @type {!string} id of latest op stack state known on the server */
|
|
|
|
@ -71,8 +82,20 @@ define("webodf/editor/server/pullbox/OperationRouter", [], function () {
|
|
|
|
|
unplayedServerOpspecQueue = [],
|
|
|
|
|
/** @type {!Array.<!Function>} sync request callbacks which should be called after the received ops have been applied server */
|
|
|
|
|
uncalledSyncRequestCallbacksQueue = [],
|
|
|
|
|
/** @type {!Array.<!function(!boolean):undefined>} ops created since the last sync call to the server */
|
|
|
|
|
hasLocalUnsyncedOpsStateSubscribers = [],
|
|
|
|
|
/**@type{!boolean}*/
|
|
|
|
|
hasLocalUnsyncedOps = false,
|
|
|
|
|
/** @type {!Array.<!function(!boolean):undefined>} */
|
|
|
|
|
hasSessionHostConnectionStateSubscribers = [],
|
|
|
|
|
/**@type{!boolean}*/
|
|
|
|
|
hasSessionHostConnection = true,
|
|
|
|
|
eventNotifier = new core.EventNotifier([
|
|
|
|
|
EVENT_BEFORESAVETOFILE,
|
|
|
|
|
EVENT_SAVEDTOFILE,
|
|
|
|
|
EVENT_HASLOCALUNSYNCEDOPERATIONSCHANGED,
|
|
|
|
|
EVENT_HASSESSIONHOSTCONNECTIONCHANGED
|
|
|
|
|
]),
|
|
|
|
|
/**@type{!boolean} tells if any local ops have been modifying ops */
|
|
|
|
|
hasPushedModificationOps = false,
|
|
|
|
|
operationTransformer = new ops.OperationTransformer(),
|
|
|
|
@ -84,7 +107,8 @@ define("webodf/editor/server/pullbox/OperationRouter", [], function () {
|
|
|
|
|
* @return {undefined}
|
|
|
|
|
*/
|
|
|
|
|
function updateHasLocalUnsyncedOpsState() {
|
|
|
|
|
var hasLocalUnsyncedOpsNow = (unsyncedClientOpspecQueue.length > 0);
|
|
|
|
|
var i,
|
|
|
|
|
hasLocalUnsyncedOpsNow = (unsyncedClientOpspecQueue.length > 0);
|
|
|
|
|
|
|
|
|
|
// no change?
|
|
|
|
|
if (hasLocalUnsyncedOps === hasLocalUnsyncedOpsNow) {
|
|
|
|
@ -92,6 +116,23 @@ define("webodf/editor/server/pullbox/OperationRouter", [], function () {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
hasLocalUnsyncedOps = hasLocalUnsyncedOpsNow;
|
|
|
|
|
eventNotifier.emit(EVENT_HASLOCALUNSYNCEDOPERATIONSCHANGED, hasLocalUnsyncedOps);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param {!boolean} hasConnection
|
|
|
|
|
* @return {undefined}
|
|
|
|
|
*/
|
|
|
|
|
function updateHasSessionHostConnectionState(hasConnection) {
|
|
|
|
|
var i;
|
|
|
|
|
|
|
|
|
|
// no change?
|
|
|
|
|
if (hasSessionHostConnection === hasConnection) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
hasSessionHostConnection = hasConnection;
|
|
|
|
|
eventNotifier.emit(EVENT_HASSESSIONHOSTCONNECTIONCHANGED, hasSessionHostConnection);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -122,9 +163,16 @@ define("webodf/editor/server/pullbox/OperationRouter", [], function () {
|
|
|
|
|
op = operationFactory.create(opspec);
|
|
|
|
|
runtime.log(" op in: "+runtime.toJson(opspec));
|
|
|
|
|
if (op !== null) {
|
|
|
|
|
playbackFunction(op);
|
|
|
|
|
if (!playbackFunction(op)) {
|
|
|
|
|
hasError = true;
|
|
|
|
|
errorCallback("opExecutionFailure");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
hasError = true;
|
|
|
|
|
runtime.log("ignoring invalid incoming opspec: " + opspec);
|
|
|
|
|
errorCallback("unknownOpReceived");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -214,7 +262,7 @@ define("webodf/editor/server/pullbox/OperationRouter", [], function () {
|
|
|
|
|
}, syncOpsDelay);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (isSyncCallRunning || hasUnresolvableConflict) {
|
|
|
|
|
if (isSyncCallRunning || hasError) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
// TODO: hack, remove
|
|
|
|
@ -243,13 +291,25 @@ runtime.log("OperationRouter: sending sync_ops call");
|
|
|
|
|
client_ops: syncedClientOpspecs
|
|
|
|
|
}
|
|
|
|
|
}, function(responseData) {
|
|
|
|
|
var response = /** @type{{result:string, head_seq:string, ops:Array.<!Object>}} */(runtime.fromJson(responseData));
|
|
|
|
|
var response,
|
|
|
|
|
/**@type{!boolean}*/
|
|
|
|
|
hasUnresolvableConflict = false;
|
|
|
|
|
|
|
|
|
|
updateHasSessionHostConnectionState(true);
|
|
|
|
|
|
|
|
|
|
// TODO: hack, remove
|
|
|
|
|
if (syncingBlocked) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
response = /** @type{{result:string, head_seq:string, ops:Array.<!Object>}} */(runtime.fromJson(responseData));
|
|
|
|
|
} catch (e) {
|
|
|
|
|
hasError = true;
|
|
|
|
|
runtime.log("Could not parse reply: "+responseData);
|
|
|
|
|
errorCallback("unknownServerReply");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
// TODO: hack, remove
|
|
|
|
|
runtime.log("sync_ops reply: " + responseData);
|
|
|
|
|
|
|
|
|
|
// just new ops?
|
|
|
|
@ -290,20 +350,28 @@ runtime.log("OperationRouter: sending sync_ops call");
|
|
|
|
|
if (!hasUnresolvableConflict) {
|
|
|
|
|
isInstantSyncRequested = true;
|
|
|
|
|
}
|
|
|
|
|
} else if (response.result === "error") {
|
|
|
|
|
runtime.log("server reports an error: "+response.error);
|
|
|
|
|
hasError = true;
|
|
|
|
|
errorCallback(
|
|
|
|
|
response.error === "ENOSESSION" ? "sessionDoesNotExist":
|
|
|
|
|
response.error === "ENOMEMBER" ? "notMemberOfSession":
|
|
|
|
|
"unknownServerReply"
|
|
|
|
|
);
|
|
|
|
|
return;
|
|
|
|
|
} else {
|
|
|
|
|
runtime.assert(false, "Unexpected result on sync-ops call: "+response.result);
|
|
|
|
|
hasError = true;
|
|
|
|
|
runtime.log("Unexpected result on sync-ops call: "+response.result);
|
|
|
|
|
errorCallback("unknownServerReply");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// unlock
|
|
|
|
|
isSyncCallRunning = false;
|
|
|
|
|
|
|
|
|
|
if (hasUnresolvableConflict) {
|
|
|
|
|
// TODO: offer option to reload session automatically?
|
|
|
|
|
runtime.assert(false,
|
|
|
|
|
"Sorry to tell:\n" +
|
|
|
|
|
"we hit a pair of operations in a state which yet need to be supported for transformation against each other.\n" +
|
|
|
|
|
"Client disconnected from session, no further editing accepted.\n\n" +
|
|
|
|
|
"Please reconnect manually for now.");
|
|
|
|
|
hasError = true;
|
|
|
|
|
errorCallback("unresolvableConflictingOps");
|
|
|
|
|
} else {
|
|
|
|
|
// prepare next sync
|
|
|
|
|
if (isInstantSyncRequested) {
|
|
|
|
@ -318,6 +386,22 @@ runtime.log("OperationRouter: sending sync_ops call");
|
|
|
|
|
}
|
|
|
|
|
playUnplayedServerOpSpecs();
|
|
|
|
|
}
|
|
|
|
|
}, function() {
|
|
|
|
|
runtime.log("meh, server cannot be reached ATM.");
|
|
|
|
|
// signal connection problem, but do not give up for now
|
|
|
|
|
updateHasSessionHostConnectionState(false);
|
|
|
|
|
// put the (not) send ops back into the outgoing queue
|
|
|
|
|
unsyncedClientOpspecQueue = syncedClientOpspecs.concat(unsyncedClientOpspecQueue);
|
|
|
|
|
syncRequestCallbacksQueue = syncRequestCallbacksArray.concat(syncRequestCallbacksQueue);
|
|
|
|
|
// unlock
|
|
|
|
|
isSyncCallRunning = false;
|
|
|
|
|
// nothing on client to sync?
|
|
|
|
|
if (unsyncedClientOpspecQueue.length === 0) {
|
|
|
|
|
idleTimeout = runtime.getWindow().setTimeout(startSyncOpsTimeout, idleDelay);
|
|
|
|
|
} else {
|
|
|
|
|
startSyncOpsTimeout();
|
|
|
|
|
}
|
|
|
|
|
playUnplayedServerOpSpecs();
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -381,7 +465,7 @@ runtime.log("OperationRouter: instant opsSync requested");
|
|
|
|
|
/**
|
|
|
|
|
* Sets the method which should be called to apply operations.
|
|
|
|
|
*
|
|
|
|
|
* @param {!function(!ops.Operation)} playback_func
|
|
|
|
|
* @param {!function(!ops.Operation):boolean} playback_func
|
|
|
|
|
* @return {undefined}
|
|
|
|
|
*/
|
|
|
|
|
this.setPlaybackFunction = function (playback_func) {
|
|
|
|
@ -395,7 +479,10 @@ runtime.log("OperationRouter: instant opsSync requested");
|
|
|
|
|
* @return {undefined}
|
|
|
|
|
*/
|
|
|
|
|
this.push = function (operations) {
|
|
|
|
|
if (hasUnresolvableConflict) {
|
|
|
|
|
var i, op, opspec,
|
|
|
|
|
timestamp = (new Date()).getTime();
|
|
|
|
|
|
|
|
|
|
if (hasError) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
// TODO: should be an assert in the future
|
|
|
|
@ -406,22 +493,27 @@ runtime.log("OperationRouter: instant opsSync requested");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
operations.forEach(function(op) {
|
|
|
|
|
var timedOp,
|
|
|
|
|
opspec = op.spec();
|
|
|
|
|
for (i = 0; i < operations.length; i += 1) {
|
|
|
|
|
op = operations[i];
|
|
|
|
|
opspec = op.spec();
|
|
|
|
|
|
|
|
|
|
// note if any local ops modified
|
|
|
|
|
hasPushedModificationOps = hasPushedModificationOps || op.isEdit;
|
|
|
|
|
|
|
|
|
|
// apply locally
|
|
|
|
|
opspec.timestamp = (new Date()).getTime();
|
|
|
|
|
timedOp = operationFactory.create(opspec);
|
|
|
|
|
// add timestamp TODO: improve the useless recreation of the op
|
|
|
|
|
opspec.timestamp = timestamp;
|
|
|
|
|
op = operationFactory.create(opspec);
|
|
|
|
|
|
|
|
|
|
playbackFunction(timedOp);
|
|
|
|
|
// apply locally
|
|
|
|
|
if (!playbackFunction(op)) {
|
|
|
|
|
hasError = true;
|
|
|
|
|
errorCallback("opExecutionFailure");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// send to server
|
|
|
|
|
unsyncedClientOpspecQueue.push(opspec);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
triggerPushingOps();
|
|
|
|
|
|
|
|
|
@ -434,25 +526,65 @@ runtime.log("OperationRouter: instant opsSync requested");
|
|
|
|
|
* A callback is called on success.
|
|
|
|
|
*/
|
|
|
|
|
this.close = function (cb) {
|
|
|
|
|
function cbDoneSaving(err) {
|
|
|
|
|
eventNotifier.emit(EVENT_SAVEDTOFILE, null);
|
|
|
|
|
cb(err);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function cbSuccess(fileData) {
|
|
|
|
|
server.writeSessionStateToFile(sessionId, memberId, lastServerSeq, fileData, cb);
|
|
|
|
|
server.writeSessionStateToFile(sessionId, memberId, lastServerSeq, fileData, cbDoneSaving);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function doClose() {
|
|
|
|
|
syncingBlocked = true;
|
|
|
|
|
if (hasPushedModificationOps) {
|
|
|
|
|
odfContainer.createByteArray(cbSuccess, cb);
|
|
|
|
|
eventNotifier.emit(EVENT_BEFORESAVETOFILE, null);
|
|
|
|
|
|
|
|
|
|
odfContainer.createByteArray(cbSuccess, cbDoneSaving);
|
|
|
|
|
} else {
|
|
|
|
|
cb();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (hasLocalUnsyncedOps) {
|
|
|
|
|
if (hasError) {
|
|
|
|
|
cb();
|
|
|
|
|
} else if (hasLocalUnsyncedOps) {
|
|
|
|
|
requestInstantOpsSync(doClose);
|
|
|
|
|
} else {
|
|
|
|
|
doClose();
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param {!string} eventId
|
|
|
|
|
* @param {!Function} cb
|
|
|
|
|
* @return {undefined}
|
|
|
|
|
*/
|
|
|
|
|
this.subscribe = function (eventId, cb) {
|
|
|
|
|
eventNotifier.subscribe(eventId, cb);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param {!string} eventId
|
|
|
|
|
* @param {!Function} cb
|
|
|
|
|
* @return {undefined}
|
|
|
|
|
*/
|
|
|
|
|
this.unsubscribe = function (eventId, cb) {
|
|
|
|
|
eventNotifier.unsubscribe(eventId, cb);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @return {!boolean}
|
|
|
|
|
*/
|
|
|
|
|
this.hasLocalUnsyncedOps = function () {
|
|
|
|
|
return hasLocalUnsyncedOps;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @return {!boolean}
|
|
|
|
|
*/
|
|
|
|
|
this.hasSessionHostConnection = function () {
|
|
|
|
|
return hasSessionHostConnection;
|
|
|
|
|
};
|
|
|
|
|
};
|
|
|
|
|
});
|
|
|
|
|