@ -28,7 +28,18 @@
define ( "webodf/editor/server/pullbox/OperationRouter" , [ ] , function ( ) {
"use strict" ;
// TODO: these eventid strings should be defined at OperationRouter interface
var /**@const @type {!string}*/
EVENT _BEFORESAVETOFILE = "beforeSaveToFile" ,
/**@const @type {!string}*/
EVENT _SAVEDTOFILE = "savedToFile" ,
/**@const @type {!string}*/
EVENT _HASLOCALUNSYNCEDOPERATIONSCHANGED = "hasLocalUnsyncedOperationsChanged" ,
/**@const @type {!string}*/
EVENT _HASSESSIONHOSTCONNECTIONCHANGED = "hasSessionHostConnectionChanged" ;
runtime . loadClass ( "ops.OperationTransformer" ) ;
runtime . loadClass ( "core.EventNotifier" ) ;
/ * *
* route operations in a networked collaborative manner .
@ -43,11 +54,11 @@ define("webodf/editor/server/pullbox/OperationRouter", [], function () {
* @ constructor
* @ implements ops . OperationRouter
* /
return function PullBoxOperationRouter ( sessionId , memberId , server , odfContainer ) {
return function PullBoxOperationRouter ( sessionId , memberId , server , odfContainer , errorCallback ) {
"use strict" ;
var operationFactory ,
/**@type{function(!ops.Operation) }*/
/**@type{function(!ops.Operation) :boolean }*/
playbackFunction ,
idleTimeout = null ,
syncOpsTimeout = null ,
@ -58,7 +69,7 @@ define("webodf/editor/server/pullbox/OperationRouter", [], function () {
/**@type{!boolean}*/
isSyncCallRunning = false ,
/**@type{!boolean}*/
has UnresolvableConflict = false ,
has Error = false ,
/**@type{!boolean}*/
syncingBlocked = false ,
/** @type {!string} id of latest op stack state known on the server */
@ -71,8 +82,20 @@ define("webodf/editor/server/pullbox/OperationRouter", [], function () {
unplayedServerOpspecQueue = [ ] ,
/** @type {!Array.<!Function>} sync request callbacks which should be called after the received ops have been applied server */
uncalledSyncRequestCallbacksQueue = [ ] ,
/** @type {!Array.<!function(!boolean):undefined>} ops created since the last sync call to the server */
hasLocalUnsyncedOpsStateSubscribers = [ ] ,
/**@type{!boolean}*/
hasLocalUnsyncedOps = false ,
/** @type {!Array.<!function(!boolean):undefined>} */
hasSessionHostConnectionStateSubscribers = [ ] ,
/**@type{!boolean}*/
hasSessionHostConnection = true ,
eventNotifier = new core . EventNotifier ( [
EVENT _BEFORESAVETOFILE ,
EVENT _SAVEDTOFILE ,
EVENT _HASLOCALUNSYNCEDOPERATIONSCHANGED ,
EVENT _HASSESSIONHOSTCONNECTIONCHANGED
] ) ,
/**@type{!boolean} tells if any local ops have been modifying ops */
hasPushedModificationOps = false ,
operationTransformer = new ops . OperationTransformer ( ) ,
@ -84,7 +107,8 @@ define("webodf/editor/server/pullbox/OperationRouter", [], function () {
* @ return { undefined }
* /
function updateHasLocalUnsyncedOpsState ( ) {
var hasLocalUnsyncedOpsNow = ( unsyncedClientOpspecQueue . length > 0 ) ;
var i ,
hasLocalUnsyncedOpsNow = ( unsyncedClientOpspecQueue . length > 0 ) ;
// no change?
if ( hasLocalUnsyncedOps === hasLocalUnsyncedOpsNow ) {
@ -92,6 +116,23 @@ define("webodf/editor/server/pullbox/OperationRouter", [], function () {
}
hasLocalUnsyncedOps = hasLocalUnsyncedOpsNow ;
eventNotifier . emit ( EVENT _HASLOCALUNSYNCEDOPERATIONSCHANGED , hasLocalUnsyncedOps ) ;
}
/ * *
* @ param { ! boolean } hasConnection
* @ return { undefined }
* /
function updateHasSessionHostConnectionState ( hasConnection ) {
var i ;
// no change?
if ( hasSessionHostConnection === hasConnection ) {
return ;
}
hasSessionHostConnection = hasConnection ;
eventNotifier . emit ( EVENT _HASSESSIONHOSTCONNECTIONCHANGED , hasSessionHostConnection ) ;
}
/ * *
@ -122,9 +163,16 @@ define("webodf/editor/server/pullbox/OperationRouter", [], function () {
op = operationFactory . create ( opspec ) ;
runtime . log ( " op in: " + runtime . toJson ( opspec ) ) ;
if ( op !== null ) {
playbackFunction ( op ) ;
if ( ! playbackFunction ( op ) ) {
hasError = true ;
errorCallback ( "opExecutionFailure" ) ;
return ;
}
} else {
hasError = true ;
runtime . log ( "ignoring invalid incoming opspec: " + opspec ) ;
errorCallback ( "unknownOpReceived" ) ;
return ;
}
}
@ -214,7 +262,7 @@ define("webodf/editor/server/pullbox/OperationRouter", [], function () {
} , syncOpsDelay ) ;
}
if ( isSyncCallRunning || has UnresolvableConflict ) {
if ( isSyncCallRunning || has Error ) {
return ;
}
// TODO: hack, remove
@ -243,13 +291,25 @@ runtime.log("OperationRouter: sending sync_ops call");
client _ops : syncedClientOpspecs
}
} , function ( responseData ) {
var response = /** @type{{result:string, head_seq:string, ops:Array.<!Object>}} */ ( runtime . fromJson ( responseData ) ) ;
var response ,
/**@type{!boolean}*/
hasUnresolvableConflict = false ;
updateHasSessionHostConnectionState ( true ) ;
// TODO: hack, remove
if ( syncingBlocked ) {
return ;
}
try {
response = /** @type{{result:string, head_seq:string, ops:Array.<!Object>}} */ ( runtime . fromJson ( responseData ) ) ;
} catch ( e ) {
hasError = true ;
runtime . log ( "Could not parse reply: " + responseData ) ;
errorCallback ( "unknownServerReply" ) ;
return ;
}
// TODO: hack, remove
runtime . log ( "sync_ops reply: " + responseData ) ;
// just new ops?
@ -290,20 +350,28 @@ runtime.log("OperationRouter: sending sync_ops call");
if ( ! hasUnresolvableConflict ) {
isInstantSyncRequested = true ;
}
} else if ( response . result === "error" ) {
runtime . log ( "server reports an error: " + response . error ) ;
hasError = true ;
errorCallback (
response . error === "ENOSESSION" ? "sessionDoesNotExist" :
response . error === "ENOMEMBER" ? "notMemberOfSession" :
"unknownServerReply"
) ;
return ;
} else {
runtime . assert ( false , "Unexpected result on sync-ops call: " + response . result ) ;
hasError = true ;
runtime . log ( "Unexpected result on sync-ops call: " + response . result ) ;
errorCallback ( "unknownServerReply" ) ;
return ;
}
// unlock
isSyncCallRunning = false ;
if ( hasUnresolvableConflict ) {
// TODO: offer option to reload session automatically?
runtime . assert ( false ,
"Sorry to tell:\n" +
"we hit a pair of operations in a state which yet need to be supported for transformation against each other.\n" +
"Client disconnected from session, no further editing accepted.\n\n" +
"Please reconnect manually for now." ) ;
hasError = true ;
errorCallback ( "unresolvableConflictingOps" ) ;
} else {
// prepare next sync
if ( isInstantSyncRequested ) {
@ -318,6 +386,22 @@ runtime.log("OperationRouter: sending sync_ops call");
}
playUnplayedServerOpSpecs ( ) ;
}
} , function ( ) {
runtime . log ( "meh, server cannot be reached ATM." ) ;
// signal connection problem, but do not give up for now
updateHasSessionHostConnectionState ( false ) ;
// put the (not) send ops back into the outgoing queue
unsyncedClientOpspecQueue = syncedClientOpspecs . concat ( unsyncedClientOpspecQueue ) ;
syncRequestCallbacksQueue = syncRequestCallbacksArray . concat ( syncRequestCallbacksQueue ) ;
// unlock
isSyncCallRunning = false ;
// nothing on client to sync?
if ( unsyncedClientOpspecQueue . length === 0 ) {
idleTimeout = runtime . getWindow ( ) . setTimeout ( startSyncOpsTimeout , idleDelay ) ;
} else {
startSyncOpsTimeout ( ) ;
}
playUnplayedServerOpSpecs ( ) ;
} ) ;
}
@ -381,7 +465,7 @@ runtime.log("OperationRouter: instant opsSync requested");
/ * *
* Sets the method which should be called to apply operations .
*
* @ param { ! function ( ! ops . Operation ) } playback _func
* @ param { ! function ( ! ops . Operation ) : boolean } playback _func
* @ return { undefined }
* /
this . setPlaybackFunction = function ( playback _func ) {
@ -395,7 +479,10 @@ runtime.log("OperationRouter: instant opsSync requested");
* @ return { undefined }
* /
this . push = function ( operations ) {
if ( hasUnresolvableConflict ) {
var i , op , opspec ,
timestamp = ( new Date ( ) ) . getTime ( ) ;
if ( hasError ) {
return ;
}
// TODO: should be an assert in the future
@ -406,22 +493,27 @@ runtime.log("OperationRouter: instant opsSync requested");
return ;
}
operations . forEach ( function ( op ) {
var timedOp ,
opspec = op . spec ( ) ;
for ( i = 0 ; i < operations . length ; i += 1 ) {
op = operations [ i ] ;
opspec = op . spec ( ) ;
// note if any local ops modified
hasPushedModificationOps = hasPushedModificationOps || op . isEdit ;
// a pply locally
opspec . timestamp = ( new Date ( ) ) . getTime ( ) ;
timedO p = operationFactory . create ( opspec ) ;
// a dd timestamp TODO: improve the useless recreation of the op
opspec . timestamp = timestamp ;
o p = operationFactory . create ( opspec ) ;
playbackFunction ( timedOp ) ;
// apply locally
if ( ! playbackFunction ( op ) ) {
hasError = true ;
errorCallback ( "opExecutionFailure" ) ;
return ;
}
// send to server
unsyncedClientOpspecQueue . push ( opspec ) ;
} ) ;
}
triggerPushingOps ( ) ;
@ -434,25 +526,65 @@ runtime.log("OperationRouter: instant opsSync requested");
* A callback is called on success .
* /
this . close = function ( cb ) {
function cbDoneSaving ( err ) {
eventNotifier . emit ( EVENT _SAVEDTOFILE , null ) ;
cb ( err ) ;
}
function cbSuccess ( fileData ) {
server . writeSessionStateToFile ( sessionId , memberId , lastServerSeq , fileData , cb ) ;
server . writeSessionStateToFile ( sessionId , memberId , lastServerSeq , fileData , cb DoneSaving ) ;
}
function doClose ( ) {
syncingBlocked = true ;
if ( hasPushedModificationOps ) {
odfContainer . createByteArray ( cbSuccess , cb ) ;
eventNotifier . emit ( EVENT _BEFORESAVETOFILE , null ) ;
odfContainer . createByteArray ( cbSuccess , cbDoneSaving ) ;
} else {
cb ( ) ;
}
}
if ( hasLocalUnsyncedOps ) {
if ( hasError ) {
cb ( ) ;
} else if ( hasLocalUnsyncedOps ) {
requestInstantOpsSync ( doClose ) ;
} else {
doClose ( ) ;
}
} ;
/ * *
* @ param { ! string } eventId
* @ param { ! Function } cb
* @ return { undefined }
* /
this . subscribe = function ( eventId , cb ) {
eventNotifier . subscribe ( eventId , cb ) ;
} ;
/ * *
* @ param { ! string } eventId
* @ param { ! Function } cb
* @ return { undefined }
* /
this . unsubscribe = function ( eventId , cb ) {
eventNotifier . unsubscribe ( eventId , cb ) ;
} ;
/ * *
* @ return { ! boolean }
* /
this . hasLocalUnsyncedOps = function ( ) {
return hasLocalUnsyncedOps ;
} ;
/ * *
* @ return { ! boolean }
* /
this . hasSessionHostConnection = function ( ) {
return hasSessionHostConnection ;
} ;
} ;
} ) ;