Details of the Main Classes

ActiveMqRemoteReceiver

Calling the Receive() method on this class attempts to retrieve a message from an ActiveMQ queue. The underlying ActiveMQ consumer Receive() method is synchronous (i.e. it blocks the calling thread until a message is received), which means cancelling the receive operation would not be possible until a message was received and control returned to the calling thread. This would prevent the class from being able to cancel, disconnect, and clean up cleanly. Hence the ActiveMQ Receive() method is called with a timeout parameter. If no message is received before the timeout expires, the receive operation returns control to the calling thread, and is called again in a loop. The loop only repeats if a cancel request has not been issued (via the class's CancelReceive() method).

The length of the timeout is configured via the 'connectLoopTimeout' parameter passed to the ActiveMqRemoteReceiver constructor. Varying the length of the timeout controls the balance between responsiveness to a cancel request and CPU usage of the class. For example if the timeout is set to 200ms, the class will take 100ms on average to respond to a cancel. If the timeout is reduced to 20ms, the average cancel response time will be reduced to 10ms, at the expense of greater CPU usage from iterating through the loop more often. Lower values are better if the CancelReceive() method will be regularly called during the lifetime of the instance of the class. Higher values are preferred if CancelReceive() is only called on shutdown or error.
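The timeout loop described above can be sketched in Java as follows. This is a minimal illustration rather than the actual ActiveMqRemoteReceiver code: the class name PollingReceiverSketch is hypothetical, and a java.util.concurrent.BlockingQueue stands in for the ActiveMQ consumer so the example runs without a broker.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the receiver: a BlockingQueue plays the role of the ActiveMQ consumer.
final class PollingReceiverSketch {
    private final BlockingQueue<String> queue;
    private final long connectLoopTimeoutMs;
    private volatile boolean cancelRequested = false;

    PollingReceiverSketch(BlockingQueue<String> queue, long connectLoopTimeoutMs) {
        this.queue = queue;
        this.connectLoopTimeoutMs = connectLoopTimeoutMs;
    }

    // Returns the next message, or null if a cancel was requested before one arrived.
    String receive() {
        while (!cancelRequested) {
            try {
                // Blocks for at most connectLoopTimeoutMs, then returns null if nothing arrived
                String message = queue.poll(connectLoopTimeoutMs, TimeUnit.MILLISECONDS);
                if (message != null) {
                    return message;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return null;
    }

    // The cancel flag is only checked between polls, so a cancel takes up to one timeout period to be noticed.
    void cancelReceive() {
        cancelRequested = true;
    }

    public static void main(String[] args) throws InterruptedException {
        PollingReceiverSketch receiver = new PollingReceiverSketch(new LinkedBlockingQueue<>(), 200);
        Thread worker = new Thread(() -> System.out.println("Received: " + receiver.receive()));
        worker.start();
        receiver.cancelReceive();
        worker.join();
    }
}
```

Because the flag is only examined once per loop iteration, a shorter timeout shortens the worst-case wait for a cancel but makes the loop spin more often, which is the trade-off described above.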

MethodInvocationRemoteSender

This class contains 2 methods to send remote method invocations: InvokeMethod() and InvokeVoidMethod(). Depending on the type of remote method being sent, the appropriate method should be called. In the case of InvokeMethod(), the return value of the remote method will be returned to the client. For InvokeVoidMethod(), the class will expect, and assert, that a void return value indicator is returned by the remote invocation.

MethodInvocationRemoteReceiver

Clients of this class should subscribe to the event 'MethodInvocationReceived'. Subsequently they should call the asynchronous Receive() method to start receiving incoming method invocations. The receive method starts a worker thread which continually checks for new method invocations, and triggers the 'MethodInvocationReceived' event when any are received. Client code should be aware that the 'MethodInvocationReceived' event will be triggered on the worker thread, and should handle any retrieved method invocations accordingly. For example, if the client code is a Windows Forms or Swing-based GUI application, appropriate methods should be called to ensure the GUI is updated in a thread safe way (e.g. using Form.Invoke() or SwingUtilities.invokeAndWait()). The SendReturnValue() method should be called after receiving and processing a method call, to send the return value back to the calling code. In the case of void methods, SendVoidReturn() must be called to send notification back to the calling code that processing has finished. The CancelReceive() method is called to stop the worker thread from receiving further method invocations.

MethodInvocationSerializer

The key to customizing the MethodInvocationRemoting framework is changing and enhancing the functionality of the MethodInvocationSerializer class (e.g. adding functionality to serialize and deserialize your own custom classes). This section explains the internals of this class in detail. The MethodInvocationSerializer constructor requires an object implementing the ISerializerOperationMap interface. ISerializerOperationMap classes are used to store mappings between a native type, its serialized representation, and methods which are used to serialize and deserialize the type. MethodInvocationSerializer itself contains methods to serialize and deserialize method invocations and their return values. The C# version of the class also includes a method AddIXmlSerializableOperations() which adds support for serializing and deserializing objects which implement the System.Xml.Serialization.IXmlSerializable interface.

The operation of the MethodInvocationSerializer class was inspired by the ReadXml() and WriteXml() methods in the IXmlSerializable interface. In IXmlSerializable, implementing classes must provide ReadXml() and WriteXml() methods which take a location pointer within an XML stream as input. When serializing or deserializing, the ReadXml() and WriteXml() methods only deal with the XML data at the location pointer, and know nothing about the previous sections of the XML document. A similar design is used in MethodInvocationSerializer. On the C# side the serialization code is specified via delegates 'XmlSerializationOperation' and 'XmlDeserializationOperation' which also take a location pointer within an XML stream as input and only serialize and deserialize within an isolated area of the XML document. In Java the serialization code is specified by implementing methods Serialize() and Deserialize() in the IObjectSerializer interface, and again these methods are passed a location pointer in an XML stream.

Supported Data Types - MethodInvocationSerializer natively supports the following data types...

Serialized Representation | C# Type           | Java Type
integer                   | System.Int32      | java.lang.Integer
string                    | System.String     | java.lang.String
signedByte                | System.SByte      | java.lang.Byte
shortInteger              | System.Int16      | java.lang.Short
longInteger               | System.Int64      | java.lang.Long
float                     | System.Single     | java.lang.Float
double                    | System.Double     | java.lang.Double
char                      | System.Char       | java.lang.Character
bool                      | System.Boolean    | java.lang.Boolean
decimal                   | System.Decimal    | java.math.BigDecimal
dateTime                  | System.DateTime   | java.util.GregorianCalendar
integerArray              | System.Int32[]    | java.lang.Integer[]
stringArray               | System.String[]   | java.lang.String[]
signedByteArray           | System.SByte[]    | java.lang.Byte[]
shortIntegerArray         | System.Int16[]    | java.lang.Short[]
longIntegerArray          | System.Int64[]    | java.lang.Long[]
floatArray                | System.Single[]   | java.lang.Float[]
doubleArray               | System.Double[]   | java.lang.Double[]
charArray                 | System.Char[]     | java.lang.Character[]
boolArray                 | System.Boolean[]  | java.lang.Boolean[]
decimalArray              | System.Decimal[]  | java.math.BigDecimal[]
dateTimeArray             | System.DateTime[] | java.util.GregorianCalendar[]

XML Samples - Samples of the native XML document format used by the MethodInvocationSerializer class appear below.

A sample of an invocation of a complete method (i.e. one which has parameters and is non-void) with a single integer parameter and decimal return type appears below...

    <?xml version="1.0" encoding="utf-8"?>
    <MethodInvocation>
      <MethodName>ApproximatePi</MethodName>
      <Parameters>
        <Parameter>
          <DataType>integer</DataType>
          <Data>10000</Data>
        </Parameter>
      </Parameters>
      <ReturnType>
        <DataType>decimal</DataType>
      </ReturnType>
    </MethodInvocation>

Parameterless and void methods produce similar XML documents, but with empty 'Parameters' and 'ReturnType' tags. Return values are represented by an XML document similar to the following...

    <?xml version="1.0" encoding="utf-8"?>
    <ReturnValue>
      <DataType>decimal</DataType>
      <Data>3.1496</Data>
    </ReturnValue>

A sample of an array element as part of a method invocation's parameters or return value is as follows (contained in the 'Parameters' or 'ReturnType' tags)...

    <DataType>integerArray</DataType>
    <Data>
      <ElementDataType>integer</ElementDataType>
      <Element>
        <DataType>integer</DataType>
        <Data>123</Data>
      </Element>
      <Element>
        <DataType>integer</DataType>
        <Data>-456</Data>
      </Element>
    </Data>

Serialization and Deserialization Process - To facilitate maximum reuse of routines, the code inside the MethodInvocationSerializer class is broken up into a number of private methods, each of which handle reading or writing specific sections of the XML document. This is outlined in the table below...

XML Section | Serialization Method | Deserialization Method
<?xml version="1.0" encoding="utf-8"?> <MethodInvocation> <MethodName>ApproximatePi</MethodName> | Serialize() | Deserialize()
<Parameters> | SerializeParameters() | DeserializeParameters()
<Parameter> <DataType>integer</DataType> | SerializeItem() | DeserializeItem()
<Data> | SerializeObject() | DeserializeObject()
10000 | SerializeInt32() | DeserializeInt32()
</Data> | SerializeObject() | DeserializeObject()*
</Parameter> | SerializeItem() | DeserializeItem()
</Parameters> | SerializeParameters() | DeserializeParameters()
<ReturnType> <DataType>decimal</DataType> </ReturnType> | SerializeReturnType() | DeserializeReturnType()
</MethodInvocation> | Serialize() | Deserialize()
<?xml version="1.0" encoding="utf-8"?> | SerializeReturnValue() | DeserializeReturnValue()
<ReturnValue> <DataType>decimal</DataType> | SerializeItem() | DeserializeItem()
<Data> | SerializeObject() | DeserializeObject()
3.1496 | SerializeDecimal() | DeserializeDecimal()
</Data> | SerializeObject() | DeserializeObject()*
</ReturnValue> | SerializeItem() | DeserializeItem()

* In the Java implementation the closing </Data> tag is consumed by the specific type deserialization routine (e.g. DeserializeInt32(), DeserializeDecimal()), not by DeserializeObject().
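The idea of giving each section of the document its own routine can be sketched with the standard javax.xml.stream.XMLStreamWriter class. This is only an illustration of the structure, not the MethodInvocationSerializer source: the class SectionedSerializerSketch, its method names, and the use of plain strings for the data are assumptions made for the example.

```java
import java.io.StringWriter;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

// Hypothetical sketch: each method writes one section of the document and knows nothing about the rest.
final class SectionedSerializerSketch {

    // Writes the document wrapper, then delegates each inner section to its own routine
    static String serializeInvocation(String methodName, String parameterDataType,
                                      String parameterData, String returnDataType) {
        StringWriter output = new StringWriter();
        try {
            XMLStreamWriter writer = XMLOutputFactory.newInstance().createXMLStreamWriter(output);
            writer.writeStartDocument("utf-8", "1.0");
            writer.writeStartElement("MethodInvocation");
            writeTextElement(writer, "MethodName", methodName);
            writer.writeStartElement("Parameters");
            serializeItem(writer, "Parameter", parameterDataType, parameterData);
            writer.writeEndElement(); // </Parameters>
            writer.writeStartElement("ReturnType");
            writeTextElement(writer, "DataType", returnDataType);
            writer.writeEndElement(); // </ReturnType>
            writer.writeEndElement(); // </MethodInvocation>
            writer.writeEndDocument();
            writer.close();
        } catch (XMLStreamException e) {
            throw new IllegalStateException(e);
        }
        return output.toString();
    }

    // Writes a single item (a <DataType> tag followed by a <Data> tag) inside the named element
    private static void serializeItem(XMLStreamWriter writer, String elementName,
                                      String dataType, String data) throws XMLStreamException {
        writer.writeStartElement(elementName);
        writeTextElement(writer, "DataType", dataType);
        writeTextElement(writer, "Data", data);
        writer.writeEndElement();
    }

    private static void writeTextElement(XMLStreamWriter writer, String name, String text)
            throws XMLStreamException {
        writer.writeStartElement(name);
        writer.writeCharacters(text);
        writer.writeEndElement();
    }

    public static void main(String[] args) {
        System.out.println(serializeInvocation("ApproximatePi", "integer", "10000", "decimal"));
    }
}
```

Because serializeItem() only receives the writer at its current position, the same routine could be reused for a parameter or a return value, which is the reuse the table above describes.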

C# and Java Implementation Differences - The serialization and deserialization code in MethodInvocationSerializer is highly dependent on the underlying classes and methods which read and write XML documents (System.Xml.XmlWriter and XmlReader in C#, and javax.xml.stream.XMLStreamWriter and XMLStreamReader in Java). As there are some basic differences in the way these underlying classes work, the MethodInvocationSerializer code in C# and Java has some slight differences...

Handling of self-closing XML tags

The Java XMLStreamReader class does not distinguish between the XML tags <Parameters></Parameters> and <Parameters />. The getEventType() method returns the same sequence of events (i.e. a START_ELEMENT followed by an END_ELEMENT) regardless of whether an empty element, or self-closing tags are used. However the IsEmptyElement property of the C# XmlReader class returns true for self-closing tags (e.g. <Parameters />), but false for tags of the format <Parameters></Parameters>. For this reason additional code is required in the C# methods to handle both empty elements and self-closing tags. See the below code from the private method DeserializeParameters() as an example...

    if (reader.IsEmptyElement == true)
    {
        // Consume parameters self closing tag (e.g. <Parameters />)
        reader.ReadElementString(parametersElementName);
    }
    else
    {
        // Consume parameters start tag (e.g. <Parameters>)
        reader.ReadStartElement(parametersElementName);
        // If IsStartElement() returns true there are parameters to read. If it returns false, the next tag is a parameters end tag (e.g. </Parameters>).
        if (IsStartElement(parameterElementName, reader) == true)
        {
            int baseDepth = reader.Depth;
            while (reader.Depth >= baseDepth)
            {
                object parameter = DeserializeItem(parameterElementName, reader);
                returnParameterArray.Add(parameter);
            }
        }
        // Consume parameters end tag (e.g. </Parameters>)
        reader.ReadEndElement();
    }

The equivalent Java code is simpler because there is no need for separate if/else statements to check for self-closing tags...

    // Consume parameters start tag (e.g. <Parameters>)
    reader.ReadStartElement(parametersElementName);
    // Attempt to consume parameter start tag (e.g. <Parameter>)
    // If IsNextNodeStartElement() call returns false then the closing </Parameters> tag is consumed
    if(reader.IsNextNodeStartElement(parameterElementName) == true) {
        // At this point the <Parameter> tag would have been already read, so need to set base depth 1 level lower than this at <Parameters>
        int baseDepth = (reader.getDepth() - 1);
        while(reader.getDepth() >= baseDepth) {
            Object parameter = DeserializeItem(reader);
            returnParameterArray.add(parameter);
            // Consume parameter start tag (e.g. <Parameter>)
            // If IsNextNodeStartElement() call returns false then the closing </Parameters> tag is consumed, depth becomes lower than baseDepth, and the loop will end
            reader.IsNextNodeStartElement(parameterElementName);
        }
    }
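The XMLStreamReader behaviour described in this section can be checked with a short standalone program. The sketch below uses only the standard javax.xml.stream API; the class name EmptyElementEvents and the helper eventTypes() are hypothetical names introduced for the example.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

final class EmptyElementEvents {

    // Returns the event types reported by next() for the given XML, excluding END_DOCUMENT
    static List<Integer> eventTypes(String xml) {
        List<Integer> events = new ArrayList<>();
        try {
            XMLStreamReader reader =
                XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml));
            while (reader.hasNext()) {
                int event = reader.next();
                if (event != XMLStreamConstants.END_DOCUMENT) {
                    events.add(event);
                }
            }
            reader.close();
        } catch (XMLStreamException e) {
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        List<Integer> emptyElement = eventTypes("<Parameters></Parameters>");
        List<Integer> selfClosing = eventTypes("<Parameters />");
        // Both forms report START_ELEMENT followed by END_ELEMENT
        System.out.println(emptyElement.equals(selfClosing));
    }
}
```

Since both inputs yield the same two events, Java deserialization code has no reason to branch on the tag form, whereas the C# code has to consult IsEmptyElement.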

Node 'look ahead'

The C# XmlReader class offers a 'look ahead' functionality, where it is able to return properties of the next XML node in the stream before actually reading the node. Take the following two XML samples as an example (these are both portions of XML created by the MethodInvocationSerializer class)...

    <Parameters>
      <Parameter>
        <DataType>integer</DataType>
        <Data>10000</Data>
      </Parameter>|
      <Parameter>
        <DataType>boolean</DataType>
        <Data>true</Data>
      </Parameter>
    </Parameters>

    <Parameters>
      <Parameter>
        <DataType>integer</DataType>
        <Data>10000</Data>
      </Parameter>|
    </Parameters>

Assume the XmlReader class has read up to the position of the pipe (|) character in the above XML streams. As the number of parameters in a method invocation is variable, at the position of the pipe, the next node could either be the start element of a new parameter, or the closing element of the whole parameters section. Using the XmlReader class, we can call the IsStartElement("Parameter") method to determine whether the next node is the start of a 'Parameter' element, before actually reading the element using the ReadStartElement() method. In the Java case however, the XMLStreamReader class does not have such 'look ahead' functionality. You cannot determine the type of a tag until after reading it using the next() method. The consequence is that there are some differences in the deserialization code between the C# and Java versions of the MethodInvocationSerializer class. For example in the code loop which reads the parameters in the above XML samples (the method DeserializeParameters()), the C# version can use the IsStartElement() method to look ahead and determine whether the next node is a <Parameter> tag, and hence whether to call the DeserializeItem() method to deserialize the individual parameter. However in the Java version, the next node must actually be read by the next() method (this is performed implicitly by the SimplifiedXMLStreamReader.IsNextNodeStartElement() method), before deciding whether to call DeserializeItem(). The follow-on consequence of this is that the C# version of DeserializeItem() always reads the 'Parameter' start tag as its first step, whereas the Java version does not. Code samples from the C# and Java versions of the two methods appear below...

C#

    private ArrayList DeserializeParameters(XmlReader reader)
    {
        ArrayList returnParameterArray = new ArrayList();
        if (reader.IsEmptyElement == true)
        {
            // Consume parameters self closing tag (e.g. <Parameters />)
            reader.ReadElementString(parametersElementName);
        }
        else
        {
            // Consume parameters start tag (e.g. <Parameters>)
            reader.ReadStartElement(parametersElementName);
            // If IsStartElement() returns true there are parameters to read. If it returns false, the next tag is a parameters end tag (e.g. </Parameters>).
            if (IsStartElement(parameterElementName, reader) == true)
            {
                int baseDepth = reader.Depth;
                while (reader.Depth >= baseDepth)
                {
                    object parameter = DeserializeItem(parameterElementName, reader);
                    returnParameterArray.Add(parameter);
                }
            }
            // Consume parameters end tag (e.g. </Parameters>)
            reader.ReadEndElement();
        }
        return returnParameterArray;
    }

Note that the DeserializeItem method always consumes the <Parameter> start element in C#...

    private object DeserializeItem(string elementName, XmlReader reader)
    {
        string datatype;
        object data = null;
        if (reader.IsEmptyElement == true)
        {
            // Consume parameter self closing tag (e.g. <Parameter />, <ReturnValue />). Item is null.
            reader.ReadElementString(elementName);
        }
        else
        {
            // Consume parameter start tag (e.g. <Parameter>, <ReturnValue>)
            reader.ReadStartElement(elementName);
            // If IsStartElement() returns true the item is non-null. If it returns false, the next tag is an end tag (e.g. </Parameter>, </ReturnValue>), and the item is null.
            if (IsStartElement(dataTypeElementName, reader) == true)
            {
                datatype = reader.ReadElementString(dataTypeElementName);
                data = DeserializeObject(datatype, reader);
            }
            // Consume parameter end tag (e.g. </Parameter>, </ReturnValue>)
            reader.ReadEndElement();
        }
        return data;
    }

Java

... whereas in Java it is consumed at the end of the DeserializeParameters() method.

    private ArrayList DeserializeParameters(SimplifiedXMLStreamReader reader) throws XMLStreamException, Exception {
        ArrayList returnParameterArray = new ArrayList();
        // Consume parameters start tag (e.g. <Parameters>)
        reader.ReadStartElement(parametersElementName);
        // Attempt to consume parameter start tag (e.g. <Parameter>)
        // If IsNextNodeStartElement() call returns false then the closing </Parameters> tag is consumed
        if(reader.IsNextNodeStartElement(parameterElementName) == true) {
            // At this point the <Parameter> tag would have been already read, so need to set base depth 1 level lower than this at <Parameters>
            int baseDepth = (reader.getDepth() - 1);
            while(reader.getDepth() >= baseDepth) {
                Object parameter = DeserializeItem(reader);
                returnParameterArray.add(parameter);
                // Consume parameter start tag (e.g. <Parameter>)
                // If IsNextNodeStartElement() call returns false then the closing </Parameters> tag is consumed, depth becomes lower than baseDepth, and the loop will end
                reader.IsNextNodeStartElement(parameterElementName);
            }
        }
        return returnParameterArray;
    }

    private Object DeserializeItem(SimplifiedXMLStreamReader reader) throws XMLStreamException, Exception {
        String datatype;
        Object data = null;
        if(reader.IsNextNodeStartElement(dataTypeElementName) == true) {
            datatype = reader.ReadString();
            // Consume data type end tag (e.g. </DataType>)
            reader.ReadEndElement();
            data = DeserializeObject(datatype, reader);
            // Consume parameter end tag (e.g. </Parameter>, </ReturnValue>)
            reader.ReadEndElement();
        }
        return data;
    }

Differences in SerializerOperationMap Implementation - Delegates vs Inner Classes

The purpose of the SerializerOperationMap class is to provide the MethodInvocationSerializer class with a mapping between an object type, and two methods which serialize and deserialize objects of that type. In C# these methods are represented by two delegates, and are passed into the SerializerOperationMap as two separate parameters. In Java, the two methods are represented by standard methods on classes which implement the IObjectSerializer interface, and a single IObjectSerializer compliant object is passed into the SerializerOperationMap. See the following examples of equivalent methods which serialize and deserialize integer data types...

C#

    private void SerializeInt32(object inputObject, XmlWriter writer)
    {
        Int32 inputInt32 = (Int32)inputObject;
        writer.WriteString(inputInt32.ToString(defaultCulture));
    }

    private object DeserializeInt32(XmlReader reader)
    {
        return Convert.ToInt32(reader.ReadString(), defaultCulture);
    }

Java

    private class IntegerSerializer implements IObjectSerializer<Integer> {
        @Override
        public void Serialize(Integer inputInteger, XMLStreamWriter writer) throws XMLStreamException {
            writer.writeCharacters(String.format(defaultLocale, "%d", inputInteger));
        }

        @Override
        public Integer Deserialize(SimplifiedXMLStreamReader reader) throws XMLStreamException {
            Integer returnInteger = Integer.parseInt(reader.ReadString());
            // Consume data end tag (e.g. </Data>)
            reader.ReadEndElement();
            return returnInteger;
        }
    }

Extending the Class

MethodInvocationSerializer is designed to be extensible, and allow users to easily add support for serializing their own objects. The class can be extended and customized in the following ways...

  • Serializing additional / custom classes - You can add support for serializing new classes by calling the AddMapping() method in the ISerializerOperationMap object which is injected into the MethodInvocationSerializer constructor. If new classes are supported in both C# and Java, the 'serializedType' parameter of the AddMapping() method must match on the C# and Java sides.
  • Overriding serialization of natively supported data types - You can override the serialization of the natively supported data types by using the UpdateMapping() method in the ISerializerOperationMap object which is injected into the MethodInvocationSerializer constructor.
  • Overriding XML tags - The XML tags written during serialization are contained in protected member variables which can be overridden if deriving from the MethodInvocationSerializer class.
  • C# Support for IXmlSerializable objects - For C#, the MethodInvocationSerializer class can serialize any class which implements the IXmlSerializable interface. Support for serializing these objects can be added using the AddIXmlSerializableOperations() method.

For a detailed example, see the included SampleApplication3 project.

FileRemoteSender and FileRemoteReceiver

These classes use the file system to transport serialized method invocations. Whilst their use between remote systems may be limited (they would need to use a shared directory accessible by both systems, and being disk-bound their performance would not be as good as the TCP or ActiveMQ senders and receivers), they are useful for development and testing. They are lightweight, have no dependency on underlying components (ActiveMQ broker or network), and can be easily substituted by other senders and receivers in production code.

Both classes require paths to 2 files to be provided in the constructor. The 'messageFilePath' parameter is used to specify the full path to the file which will be used to store the messages sent from sender to receiver. The parameter 'lockFilePath' specifies a file which is used as a lock indicator between the 2 classes, to prevent the FileRemoteReceiver trying to read from the message file before the FileRemoteSender has completed writing.

The FileRemoteReceiver class also requires a 'readLoopTimeout' parameter be provided to the constructor. This parameter specifies the number of milliseconds to wait between successive checks for a new incoming message (determined by the presence of the message file). By varying this parameter you can control the balance between responsiveness of the receiver, and CPU usage. Low values are suitable for situations requiring fast response, or high message throughput, at the expense of higher CPU (and disk) usage caused by the FileRemoteReceiver checking the file system for new message files more frequently.
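The message file / lock file handshake can be sketched as follows. This is a simplified illustration and not the FileRemoteSender or FileRemoteReceiver source: the class FileTransportSketch, its method names, the 'maxChecks' limit, and the exact order of file operations are assumptions made so the example is self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class FileTransportSketch {

    // Sender side: hold the lock file for as long as the message file is being written
    static void send(Path messageFile, Path lockFile, String message) {
        try {
            Files.createFile(lockFile);
            try {
                Files.write(messageFile, message.getBytes(StandardCharsets.UTF_8));
            } finally {
                Files.delete(lockFile);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Receiver side: check every readLoopTimeoutMs for a message file that is not locked.
    // Returns null if nothing arrives within maxChecks checks (an assumed limit for the sketch).
    static String receive(Path messageFile, Path lockFile, long readLoopTimeoutMs, int maxChecks) {
        try {
            for (int check = 0; check < maxChecks; check++) {
                if (Files.exists(messageFile) && !Files.exists(lockFile)) {
                    String message = new String(Files.readAllBytes(messageFile), StandardCharsets.UTF_8);
                    Files.delete(messageFile); // make room for the next message
                    return message;
                }
                Thread.sleep(readLoopTimeoutMs);
            }
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Sends and receives one message through a temporary directory
    static String roundTrip(String message) {
        try {
            Path directory = Files.createTempDirectory("file-transport-sketch");
            Path messageFile = directory.resolve("message.txt");
            Path lockFile = directory.resolve("message.lock");
            send(messageFile, lockFile, message);
            String received = receive(messageFile, lockFile, 10, 5);
            Files.delete(directory);
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("<Data>ABC</Data>"));
    }
}
```

The sleep between checks is where 'readLoopTimeout' takes effect: a smaller value finds a new message sooner but polls the file system more often.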

TcpRemoteSender and TcpRemoteReceiver

These classes provide transport of serialized method invocations via a TCP network. Both classes have in-built fault tolerance, and are able to recover from disconnects caused by network outages, temporarily disconnected network cables, and so forth.

Parameters

The TcpRemoteSender class requires constructor parameters 'ipAddress' and 'port', to specify the IP address and port to attempt to connect to. The TcpRemoteReceiver also requires the 'port' parameter to specify which port to listen on for incoming connections. Both classes require parameters 'connectRetryCount' and 'connectRetryInterval'. 'connectRetryCount' specifies the number of times the classes should attempt to retry when either initially connecting (as a result of the Connect() method), or reconnecting as a result of a network disconnect. The 'connectRetryInterval' parameter specifies the number of milliseconds to wait between each successive retry.

The TcpRemoteSender 'acknowledgementReceiveTimeout' parameter specifies the amount of time in milliseconds to wait for a message acknowledgment before reconnecting and retrying the send operation. Parameter 'acknowledgementReceiveRetryInterval' controls how long to wait (in milliseconds) between successive checks for the acknowledgement. As with FileRemoteReceiver parameter 'readLoopTimeout', adjusting the value of this parameter controls the balance between speed of response and CPU usage.

The TcpRemoteReceiver class additionally requires parameter 'receiveRetryInterval', specifying the number of milliseconds to wait between successive checks for a new incoming message. This parameter controls a balance between responsiveness and CPU usage similar to the FileRemoteReceiver 'readLoopTimeout' parameter. The Java TcpRemoteReceiver additionally requires parameter 'socketReadBufferSize'. In C#, because the amount of data buffered in the network can be discerned from the TcpClient.Available property before it is read, the size of the byte array the data is read into can be set dynamically based on the value returned from TcpClient.Available. The Java SocketChannel class cannot preemptively tell how much data is buffered in the network, and hence the size of the local buffer(s) which the data is read into must be predefined. This parameter controls the size of these buffers allocated to read the data from the network. The value of this parameter should be set according to the use case. If large messages (say 10K) are typically expected, the parameter should be set to match the approximate expected (delimited) message size (e.g. 10,254 bytes), to minimize the need to create additional buffers when reading and parsing messages.

Evolution of Design

Initially, the TcpRemoteSender class simply wrapped the serialized method invocation (message) with start and end delimiting bytes before sending via TCP. This meant that the contents of the message needed to be scanned, and any instances of the end delimiter within the message themselves delimited (to differentiate end delimiter bytes within the message from the actual end delimiter). This was initially implemented by simply repeating any end delimiter bytes found within the message. The issue with this design was that the underlying TCP subsystem does not provide any guarantee nor indication of when the complete delimited message had been received from the network. Hence when the TcpRemoteReceiver parsed the received message, it was impossible to tell whether an end delimiter byte was indeed the message end delimiter, or half of a repeated delimiter of the end delimiter within the message body, with additional bytes still to be received. The solution to this problem was to include a message header which indicated the size of the message body. This way the TcpRemoteReceiver would know exactly how many bytes of the message body to expect to read. The size header is implemented as an 8 byte long data type (encoded as little endian).

After real-world testing, it was discovered that the underlying TCP classes (TcpClient/TcpListener in C#, and SocketChannel/ServerSocketChannel in Java) would not immediately detect a disconnect in the underlying network, and would continue to accept read and write operations even though the network was disconnected. This was especially the case when the disconnect occurred at the remote rather than the local end of the network. It meant messages could be sent from the TcpRemoteSender class and then never arrive at the TcpRemoteReceiver, with no exception generated. The solution to this problem was to implement an acknowledgement. The acknowledgement consists of a single byte which is sent back to (and asserted by) the TcpRemoteSender from the TcpRemoteReceiver after receiving a complete message.

The only remaining problem was that if a disconnect occurred after the TcpRemoteReceiver received a message, but before it sent back an acknowledgement, the TcpRemoteSender would never receive the acknowledgement, and would retry the original message. Having assumed the acknowledgement was received successfully, the TcpRemoteReceiver would process the original message, only to subsequently receive a duplicate copy of it. To address this problem, the message header was altered to include a message sequence number. The TcpRemoteSender increments the sequence number after each successfully acknowledged send operation. The TcpRemoteReceiver stores the sequence number of the last message it received. If the sequence number of the currently received message matches the last, the current message is discarded. The sequence number is implemented as a 4 byte integer (encoded as little endian) immediately after the start delimiter.

The following diagram shows the byte layout of a complete delimited message (the string "<Data>ABC</Data>")...

[Diagram: TcpRemoteSender Message Format]
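The header layout described in this section can also be expressed in code. The sketch below is an illustration only: the class MessageFrameSketch is hypothetical, the delimiter byte values (0x02 and 0x03) are assumptions since the text does not state them, and placing the size header directly after the sequence number is inferred from the description rather than taken from the source.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

final class MessageFrameSketch {
    static final byte START_DELIMITER = 0x02; // assumed value
    static final byte END_DELIMITER = 0x03;   // assumed value

    // Layout: start delimiter | 4 byte sequence number | 8 byte body size | body | end delimiter
    static byte[] encode(int sequenceNumber, byte[] body) {
        ByteBuffer frame = ByteBuffer.allocate(1 + 4 + 8 + body.length + 1)
                                     .order(ByteOrder.LITTLE_ENDIAN);
        frame.put(START_DELIMITER);
        frame.putInt(sequenceNumber);
        frame.putLong(body.length);
        frame.put(body);
        frame.put(END_DELIMITER);
        return frame.array();
    }

    // Returns the message body, or null if the sequence number repeats lastSequenceNumber (a duplicate)
    static byte[] decode(byte[] frameBytes, int lastSequenceNumber) {
        ByteBuffer frame = ByteBuffer.wrap(frameBytes).order(ByteOrder.LITTLE_ENDIAN);
        if (frame.get() != START_DELIMITER) {
            throw new IllegalArgumentException("Missing start delimiter");
        }
        int sequenceNumber = frame.getInt();
        long bodySize = frame.getLong();
        // The size header says exactly how many body bytes precede the end delimiter
        if (bodySize != frame.remaining() - 1) {
            throw new IllegalArgumentException("Body size header does not match the bytes received");
        }
        byte[] body = new byte[(int) bodySize];
        frame.get(body);
        if (frame.get() != END_DELIMITER) {
            throw new IllegalArgumentException("Missing end delimiter");
        }
        return (sequenceNumber == lastSequenceNumber) ? null : body;
    }

    public static void main(String[] args) {
        byte[] frame = encode(1, "<Data>ABC</Data>".getBytes(StandardCharsets.UTF_8));
        System.out.println(frame.length); // 14 bytes of framing plus the 16 byte body
    }
}
```

With the size known up front, the receiver never has to guess whether a byte equal to the end delimiter belongs to the body, and comparing sequence numbers lets it drop a message that was resent after a lost acknowledgement.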

Fault Tolerance

In addition to the message header and acknowledgement mechanisms described above which are used to guarantee message delivery, both the TcpRemoteSender and TcpRemoteReceiver are able to recover from network disconnects. A network disconnect is determined by the TcpRemoteSender failing to receive a message acknowledgement, or by either class receiving an exception from the underlying TCP layer classes. Typically the exceptions caused by a network disconnect are the System.IO.IOException and System.Net.Sockets.SocketException exceptions in C#, and exceptions deriving from java.io.IOException in Java. Both classes are designed to handle all possible exceptions generated by the underlying TCP layer classes. In the case of a disconnect, the TcpRemoteSender will attempt to reconnect to the configured IP address and port, and then resend the message. The TcpRemoteReceiver will listen for, and accept a new inbound connection on the configured port. The number of reconnection attempts, and the time between them can be controlled by constructor parameters 'connectRetryCount' and 'connectRetryInterval'. If the connection cannot be re-established after the configured number of retries, the TcpRemoteSender or TcpRemoteReceiver class itself will throw an exception.
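The reconnect behaviour governed by 'connectRetryCount' and 'connectRetryInterval' can be sketched as a simple retry loop. This is not the TcpRemoteSender or TcpRemoteReceiver source: the class ConnectRetrySketch and the ConnectAttempt interface are hypothetical, and the sketch assumes the retry count is applied in addition to the initial attempt and that IllegalStateException stands in for whatever exception the real classes throw.

```java
import java.io.IOException;

final class ConnectRetrySketch {

    // Hypothetical callback representing one attempt to connect (or reconnect)
    interface ConnectAttempt {
        void run() throws IOException;
    }

    // Returns the number of retries that were needed before the attempt succeeded
    static int connectWithRetry(ConnectAttempt attempt, int connectRetryCount, long connectRetryIntervalMs) {
        IOException lastFailure = null;
        for (int retry = 0; retry <= connectRetryCount; retry++) {
            try {
                attempt.run();
                return retry;
            } catch (IOException e) {
                lastFailure = e; // treated as a disconnect; try again after the interval
            }
            if (retry < connectRetryCount) {
                try {
                    Thread.sleep(connectRetryIntervalMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to reconnect", e);
                }
            }
        }
        throw new IllegalStateException(
            "Failed to connect after " + connectRetryCount + " retries", lastFailure);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated connection which fails twice before succeeding
        int retries = connectWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("network down");
            }
        }, 5, 10);
        System.out.println("Connected after " + retries + " retries");
    }
}
```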

Limitations

The TcpRemoteReceiver continually checks for new inbound connections on the configured port. If a new connection is detected, it is assumed that a disconnect error occurred at the TcpRemoteSender's end of the network, and that the TcpRemoteSender is attempting to reconnect. As such, the TcpRemoteReceiver will always accept such a pending connection, and close the current connection. As the TCP subsystem is a shared resource, this does leave the TcpRemoteReceiver class open to interference from external processes and applications. If an external application attempts to connect to the port that the TcpRemoteReceiver is listening on, the TcpRemoteReceiver will accept that connection and abandon its current connection. In environments where external applications cannot be restricted, it is expected that appropriate firewalling or network restriction be put in place, especially if the TcpRemoteReceiver is used on a public or external network interface.

Once the TcpRemoteReceiver class starts reading part of a new message from the network, it enters into a while loop which continues reading until the complete message is read and parsed (inside private method SetupAndReadMessage() in both C# and Java). This while loop does not contain any wait interval between successive iterations of the loop. Hence, in the case the TcpRemoteSender sends just part of a message and then has a fatal exception, the TcpRemoteReceiver will get stuck inside this while loop. This could result in high CPU usage, and affect the responsiveness of the host machine.

C# and Java Implementation Differences

Checking the Network for Available Data

The major difference between C# and Java implementations is in the TcpRemoteReceiver class, and results from the aforementioned inability for the Java SocketChannel class to preemptively tell how much data is buffered in the network. In C# the TcpClient.Available property can be read to determine if any data has been received by the network, and this information used to decide whether to start reading and parsing the data, as per the following code in the Receive() method...

    // Check if any data has been received from the TcpRemoteSender, and handle and retry if an exception occurs
    int availableData = 0;
    try
    {
        availableData = client.Available;
    }
    catch(Exception e)
    {
        availableData = HandleExceptionAndCheckAvailableData(e);
    }
    // If data has been received, attempt to read and parse it, and handle and retry if an exception occurs
    if (availableData != 0)
    {
        MessageParseState parseState = MessageParseState.StartOfMessage;
        Queue<byte> messageBytes = null;  // Holds the bytes which form the body of the message received
        try
        {
            messageBytes = SetupAndReadMessage(ref parseState, ref messageSequenceNumber);
        }

In Java there is no equivalent of the 'Available' property, and it's only possible to tell if data is buffered in the network by calling the SocketChannel.read() method, and then accepting any returned data. This means that any data returned by this initial call to SocketChannel.read() has to be stored, and then passed on to the SetupAndReadMessage() method...

    // Check if any data has been received from the socket channel, and handle and retry if an exception occurs
    ByteBuffer initialReceivedBytes = ByteBuffer.allocate(socketReadBufferSize);
    int receivedDataCount = 0;
    try {
        receivedDataCount = socketChannel.read(initialReceivedBytes);
    }
    catch (Exception e) {
        receivedDataCount = HandleExceptionAndReadReceivedData(initialReceivedBytes, e);
    }

    // If data has been received, attempt to parse it and read and parse any remaining data, and handle and retry if an exception occurs
    if (receivedDataCount > 0){
        MessageParseState parseState = MessageParseState.StartOfMessage;
        ByteBuffer messageBytes = null;  // Holds the bytes which form the body of the message received
        SetupAndReadMessageParameters methodParameters = new SetupAndReadMessageParameters(parseState, messageSequenceNumber);

        try {
            messageBytes = SetupAndReadMessage(initialReceivedBytes, methodParameters);
        }

This also means that the SetupAndReadMessage() method is more complex in Java, as it needs to handle the initially received data passed into it. Also, as explained above, the lack of an equivalent to TcpClient.Available means that temporary buffers storing data read from the network have to be preallocated to a fixed size (specified by constructor parameter 'socketReadBufferSize'), rather than to the exact amount of data available. The consequence is that extra code is required in the SetupAndReadMessage() method to manage the allocation of new buffers, and to track the position within the buffer of data that has already been parsed (in C# this is not required, as the buffer is preallocated to the correct size, and hence the entire buffer can be parsed in a single operation)...

    // Parse the bytes in parameter initialReceivedBytes
    if (initialReceivedBytes != null) {
        ParseMessageData(initialReceivedBytes.array(), 0, initialReceivedBytes.position(), messageSequenceNumberBytes, messageSizeHeaderBytes, parseMessageDataParameters);
        // If the initial buffer is full, create a new one
        if (initialReceivedBytes.hasRemaining() == false) {
            tempBuffer = ByteBuffer.allocate(socketReadBufferSize);
            parseStartIndex = 0;
        }
        // Otherwise set the initially read ByteBuffer to member tempBuffer, and update the parse start index to the correct position
        else {
            tempBuffer = initialReceivedBytes;
            parseStartIndex = tempBuffer.position();
        }
        // Copy primitive parameters back to the parameter container
        methodParameters.parseState = parseMessageDataParameters.parseState;
    }
    else {
        tempBuffer = ByteBuffer.allocate(socketReadBufferSize);
    }

    // Continue to read until a complete message has been received, unless a cancel request has been received or there is a pending connection (i.e. TcpRemoteSender has reconnected due to an error)
    while ((cancelRequest == false) && (PendingConnectionExists() == false) && (methodParameters.parseState != MessageParseState.ReadCompleteMessage)) {
        parseLength = socketChannel.read(tempBuffer);
        ParseMessageData(tempBuffer.array(), parseStartIndex, parseLength, messageSequenceNumberBytes, messageSizeHeaderBytes, parseMessageDataParameters);
        // If the temporary buffer is full, create a new one
        if (tempBuffer.hasRemaining() == false) {
            tempBuffer = ByteBuffer.allocate(socketReadBufferSize);
            parseStartIndex = 0;
        }
        // Otherwise update the parse start index to the correct position
        else {
            parseStartIndex = parseStartIndex + parseLength;
        }
        // Copy primitive parameters back to the parameter container
        methodParameters.parseState = parseMessageDataParameters.parseState;
    }

Pass By Reference Method Parameters

In the TcpRemoteReceiver class, the code initiated by the SetupAndReadMessage() method steps through a number of nested while, if, and for statements as it reads data piecewise from the network and parses it. In C#, in order to make the code more readable, the read and parse operation is wrapped in a separate private method ReadAndParseMessageData(), which is called multiple times in a while loop in SetupAndReadMessage(), until the complete message is read. Parameters that are used in the reading process, which must be maintained between calls to ReadAndParseMessageData(), are passed as ref parameters (causing pass by reference).

In Java, parameters cannot be passed by reference. Hence to achieve the same effect, primitive data types are wrapped in container objects, and then passed into the relevant methods. The ParseMessageData() method (roughly equivalent to ReadAndParseMessageData() in C#) takes primitive parameters wrapped in private container class ParseMessageDataParameters. Similarly method SetupAndReadMessage() requires primitive type parameters be wrapped in an instance of private class SetupAndReadMessageParameters. Parameters must be wrapped in the container class before calling the method, and must also be copied from the container back into the relevant variables once the method call is complete. An example of this occurs where SetupAndReadMessage() is called from the TcpRemoteReceiver.Receive() method...

    // If data has been received, attempt to parse it and read and parse any remaining data, and handle and retry if an exception occurs
    if (receivedDataCount > 0){
        MessageParseState parseState = MessageParseState.StartOfMessage;
        ByteBuffer messageBytes = null;  // Holds the bytes which form the body of the message received
        SetupAndReadMessageParameters methodParameters = new SetupAndReadMessageParameters(parseState, messageSequenceNumber);

        try {
            messageBytes = SetupAndReadMessage(initialReceivedBytes, methodParameters);
        }
        catch (Exception e) {
            messageBytes = HandleExceptionAndRereadMessage(e, methodParameters);
        }
        // Copy primitive parameters back to their original variables
        parseState = methodParameters.parseState;
        messageSequenceNumber = methodParameters.messageSequenceNumber;
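The wrap, call, and copy-back pattern can also be shown in isolation. The following is a minimal sketch rather than the framework's own code: the ParseStateHolder class, its fields, and the readChunk() method are hypothetical names chosen purely for illustration.

```java
// Hypothetical container, playing the same role as SetupAndReadMessageParameters
class ParseStateHolder {
    int bytesParsed;
    int messageSequenceNumber;

    ParseStateHolder(int bytesParsed, int messageSequenceNumber) {
        this.bytesParsed = bytesParsed;
        this.messageSequenceNumber = messageSequenceNumber;
    }
}

class PassByReferenceSketch {

    // The reference to 'holder' is passed by value, so changes to its fields are visible to the caller
    static void readChunk(ParseStateHolder holder, int chunkLength) {
        holder.bytesParsed = holder.bytesParsed + chunkLength;
        holder.messageSequenceNumber = holder.messageSequenceNumber + 1;
    }

    // Reassigning a primitive parameter only changes the method's local copy
    static void readChunkPrimitive(int bytesParsed, int chunkLength) {
        bytesParsed = bytesParsed + chunkLength;
    }

    public static void main(String[] args) {
        int bytesParsed = 0;
        int messageSequenceNumber = 5;

        // The caller's variable is unaffected by this call
        readChunkPrimitive(bytesParsed, 10);
        System.out.println(bytesParsed);

        // Wrap the primitives in the container before the call...
        ParseStateHolder holder = new ParseStateHolder(bytesParsed, messageSequenceNumber);
        readChunk(holder, 10);
        // ...and copy them back to the original variables afterwards
        bytesParsed = holder.bytesParsed;
        messageSequenceNumber = holder.messageSequenceNumber;
        System.out.println(bytesParsed + " " + messageSequenceNumber);
    }
}
```

The copy-back step is easy to forget, which is one reason the container approach is more error prone than C# ref parameters.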

Unit Tests

Both classes, in particular the TcpRemoteReceiver, can have lengthy and complex series of interactions with the underlying TCP classes in unit tests. This is especially the case when testing disconnect and reconnect scenarios. In C#, the expected mock interactions are set sequentially using the NMock2.Expect class. A single call to an Expect class method defines both the expected values passed to a mock, and also the expected number of calls. As such, even though there are often many expected mock interactions and the test code is lengthy, it's relatively easy to follow through the expected sequence of mock interactions, and match against the code of the TcpRemoteReceiver class. For example...

    using (mocks.Ordered)
    {
        SetConnectExpectations();
        // Set expectations for receiving first message
        SetBeginMessageReceiveExpectations(testMessageByteArray.Length);
        Expect.Once.On(mockNetworkStream).Method("Read").With(new byte[testMessageByteArray.Length], 0, testMessageByteArray.Length).Will(new SetNamedParameterAction("buffer", testMessageByteArray), Return.Value(testMessageByteArray.Length));
        Expect.Once.On(mockTcpListener).Method("Pending").WithNoArguments().Will(Return.Value(false));
        Expect.Once.On(mockNetworkStream).Method("WriteByte").With((byte)6);
        // Set expectations for receiving duplicate message
        SetBeginMessageReceiveExpectations(testMessageByteArray.Length);
        Expect.Once.On(mockNetworkStream).Method("Read").With(new byte[testMessageByteArray.Length], 0, testMessageByteArray.Length).Will(new SetNamedParameterAction("buffer", testMessageByteArray), Return.Value(testMessageByteArray.Length));
        Expect.Once.On(mockTcpListener).Method("Pending").WithNoArguments().Will(Return.Value(false));
        Expect.Once.On(mockNetworkStream).Method("WriteByte").With((byte)6);
        // Set expectations for receiving next message
        SetBeginMessageReceiveExpectations(secondMessageByteArray.Length);
        Expect.Once.On(mockNetworkStream).Method("Read").With(new byte[secondMessageByteArray.Length], 0, secondMessageByteArray.Length).Will(new SetNamedParameterAction("buffer", secondMessageByteArray), Return.Value(secondMessageByteArray.Length));
        Expect.Once.On(mockTcpListener).Method("Pending").WithNoArguments().Will(Return.Value(false));
        Expect.Once.On(mockNetworkStream).Method("WriteByte").With((byte)6);
    }

The Mockito package used for mocking in the Java unit tests is different, in that the expected values passed to and returned from a mock, and the expected number of calls to mock methods, are specified using separate Mockito methods (when() and verify()). Also, if different return values are required from separate calls to a mock, the different return values must be specified in the same when() statement, even if the calls occur far apart in the TcpRemoteReceiver method being tested. The end result is that the Java unit tests, whilst extensive, can be hard to trace through and maintain. See the following example, where return values from sequential calls to the mockSocketChannel.read() method must be specified out of order. Additionally, the verify() statements must be specified separately...

    @Test
    public void ReceiveIOExceptionDuringSubsequentReadRereceiveSuccessTest() throws Exception {
        // Tests receiving a message, where an IO exception occurs during a second read operation

        when(mockServerSocketChannel.isOpen()).thenReturn(false);
        when(mockServerSocketChannel.accept())
            .thenReturn(mockSocketChannel)
            .thenReturn(null)
            .thenReturn(null)
            // The below return simulates the new pending connection
            .thenReturn(mockSocketChannel)
            .thenReturn(null);
        // Mock the 1st and 3rd calls to read(), first returning just 10 bytes of the message, and then returning the whole message (after reconnecting)
        when(mockSocketChannel.read(getByteBufferSubSet(testMessageByteArray, 0, 0, socketReadBufferSize, false)))
            .thenAnswer(new ReadMethodAnswer(getByteBufferSubSet(testMessageByteArray, 0, 10, socketReadBufferSize, true), 10))
            .thenAnswer(new ReadMethodAnswer(testMessageByteArray, testMessageByteArray.remaining()));
        // Mock the 2nd call to read(), causing an exception
        when(mockSocketChannel.read(getByteBufferSubSet(testMessageByteArray, 0, 10, socketReadBufferSize, false)))
            .thenThrow(new IOException("Mock IOException."));

        testTcpRemoteReceiver.Connect();
        String receivedMessage = testTcpRemoteReceiver.Receive();

        verify(mockServerSocketChannel, times(2)).isOpen();
        verify(mockServerSocketChannel, times(2)).open();
        verify(mockServerSocketChannel, times(2)).bind(new InetSocketAddress(testPort), 1);
        verify(mockServerSocketChannel, times(2)).configureBlocking(false);
        verify(mockServerSocketChannel, times(6)).accept();
        verify(mockSocketChannel, times(2)).configureBlocking(false);
        verify(mockSocketChannel, times(3)).read(any(ByteBuffer.class));
        verify(mockSocketChannel).close();
        verify(mockSocketChannel).write(ByteBuffer.wrap(new byte[] { 6 }));
        verifyNoMoreInteractions(mockServerSocketChannel);
        verifyNoMoreInteractions(mockSocketChannel);
        assertEquals("<Data>ABC</Data>", receivedMessage);
    }

The C# NetworkStream.Read() method and Java's SocketChannel.read() method are both passed a buffer of bytes which is modified within the method call, with the changes persisting after the call has finished. To reproduce this behaviour on mocks of those objects, the NMock2 SetNamedParameterAction class and an implementation of Mockito's Answer interface were required.
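The reason a plain stubbed return value is not enough is that the data of interest is written into the caller's buffer rather than returned. The sketch below shows that side effect without any mocking library, using a hand-written stub of the standard java.nio.channels.ReadableByteChannel interface. The StubReadChannel and StubReadChannelDemo classes are hypothetical and are not part of the framework or its tests, where the same job is done by an Answer implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical stub: each call to read() copies the next canned chunk into the caller's buffer
class StubReadChannel implements ReadableByteChannel {
    private final byte[][] chunks;
    private int nextChunk = 0;

    StubReadChannel(byte[]... chunks) {
        this.chunks = chunks;
    }

    @Override
    public int read(ByteBuffer destination) {
        if (nextChunk >= chunks.length) {
            return 0;  // No more data buffered
        }
        byte[] chunk = chunks[nextChunk];
        nextChunk++;
        // Side effect on the argument (assumes the chunk fits in the remaining space of the buffer)
        destination.put(chunk);
        return chunk.length;
    }

    @Override
    public boolean isOpen() {
        return true;
    }

    @Override
    public void close() {
    }
}

class StubReadChannelDemo {

    // Reads from the channel into a fixed-size buffer until a read returns no data
    static String readAll(ReadableByteChannel channel, int bufferSize) {
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        try {
            while (channel.read(buffer) > 0) {
                // Each iteration appends another chunk at the buffer's current position
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(buffer.array(), 0, buffer.position(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        StubReadChannel channel = new StubReadChannel(
            "<Data>".getBytes(StandardCharsets.UTF_8),
            "ABC</Data>".getBytes(StandardCharsets.UTF_8));
        System.out.println(readAll(channel, 32));
    }
}
```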

Use of TcpRemoteSender and TcpRemoteReceiver outside of MethodInvocationRemoting

As the parameters passed to the TcpRemoteSender.Send() and TcpRemoteReceiver.Receive() methods are standard strings, these classes may be useful outside the context of the MethodInvocationRemoting framework, where there is a need to be able to send data from one machine to another via TCP and be able to handle and recover from network disconnects.

RemoteSenderCompressor and RemoteReceiverDecompressor

These classes are used to compress and decompress serialized method invocations and return values before and after sending or receiving. Their implementation follows a decorator pattern approach, where their constructors require an implementation of the IRemoteSender or IRemoteReceiver interfaces. The RemoteSenderCompressor requires a class implementing interface IRemoteSender to be set on its constructor (e.g. one of the ActiveMQ, File, or TCP sender classes). When the Send() method is called, the RemoteSenderCompressor compresses the message using the gzip algorithm, before calling Send() on the underlying IRemoteSender class to physically send the message. As RemoteSenderCompressor itself implements IRemoteSender, it can be set on the constructor of the MethodInvocationRemoteSender and MethodInvocationRemoteReceiver classes. Similarly the constructor of RemoteReceiverDecompressor requires an implementation of IRemoteReceiver. When Receive() is called, the RemoteReceiverDecompressor calls Receive() on the underlying IRemoteReceiver class, before decompressing and returning the received message. RemoteReceiverDecompressor also itself implements IRemoteReceiver.
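The decorator arrangement can be sketched as follows. This is an illustration under stated assumptions, not the framework's implementation: SimpleSender and SimpleReceiver are simplified stand-ins for IRemoteSender and IRemoteReceiver (the real method signatures may differ), and the use of Base64 to carry the compressed bytes as a string is an assumption made so that the wrapped sender still receives a string.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Simplified stand-ins for IRemoteSender and IRemoteReceiver (signatures are assumptions)
interface SimpleSender { void send(String message); }
interface SimpleReceiver { String receive(); }

// Decorator: compresses the message, then delegates to the wrapped sender
class CompressingSender implements SimpleSender {
    private final SimpleSender underlying;

    CompressingSender(SimpleSender underlying) { this.underlying = underlying; }

    @Override
    public void send(String message) {
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) {
            gzip.write(message.getBytes(StandardCharsets.UTF_8));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Base64 encoding is an assumption, used to keep the payload a string
        underlying.send(Base64.getEncoder().encodeToString(compressed.toByteArray()));
    }
}

// Decorator: delegates to the wrapped receiver, then decompresses the result
class DecompressingReceiver implements SimpleReceiver {
    private final SimpleReceiver underlying;

    DecompressingReceiver(SimpleReceiver underlying) { this.underlying = underlying; }

    @Override
    public String receive() {
        byte[] compressed = Base64.getDecoder().decode(underlying.receive());
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[1024];
            int bytesRead;
            while ((bytesRead = gzip.read(buffer)) != -1) {
                result.write(buffer, 0, bytesRead);
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(result.toByteArray(), StandardCharsets.UTF_8);
    }
}

class CompressionDecoratorDemo {
    public static void main(String[] args) {
        // An in-memory queue stands in for the ActiveMQ, File, or TCP transport
        ArrayDeque<String> wire = new ArrayDeque<>();
        SimpleSender sender = new CompressingSender(wire::addLast);
        SimpleReceiver receiver = new DecompressingReceiver(wire::removeFirst);

        sender.send("<Data>ABC</Data>");
        System.out.println(receiver.receive());
    }
}
```

Because each decorator implements the same interface as the class it wraps, the code that uses the sender or receiver does not need to know whether compression is in place.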

RemoteReceiverDecompressor accepts an optional constructor parameter 'decompressionBufferSize', which specifies how much data (in bytes) should be read from the internal decompression stream in each read operation. If the buffer size is much lower than the total compressed message size, then new buffers must be created multiple times during the decompression process, which incurs a performance overhead. On the other hand, if the buffer size is much larger than the total compressed message size, then the memory allocated for the buffer is wasted. Hence, where possible, the buffer size should be set as close as possible to the expected size of the compressed messages.
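The effect of the buffer size on the number of read operations can be seen in a small experiment. The sketch below is not the RemoteReceiverDecompressor code: the BufferedDecompressionSketch class and its methods are hypothetical, and it reuses a single buffer for every read, which may differ from how the framework allocates buffers. It only illustrates that a smaller buffer forces more reads from the gzip stream for the same message.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class BufferedDecompressionSketch {

    // Number of read() calls which returned data during the most recent call to decompress()
    static int lastReadOperations = 0;

    static byte[] compress(String text) {
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) {
            gzip.write(text.getBytes(StandardCharsets.UTF_8));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return compressed.toByteArray();
    }

    // Reads at most decompressionBufferSize bytes from the decompression stream per read operation
    static String decompress(byte[] compressed, int decompressionBufferSize) {
        byte[] buffer = new byte[decompressionBufferSize];
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        lastReadOperations = 0;
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            int bytesRead;
            while ((bytesRead = gzip.read(buffer)) != -1) {
                result.write(buffer, 0, bytesRead);
                lastReadOperations++;
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(result.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        StringBuilder message = new StringBuilder();
        for (int i = 0; i < 100; i++) {
            message.append("<Data>ABC</Data>");
        }
        byte[] compressed = compress(message.toString());

        decompress(compressed, 16);
        System.out.println("Reads with a 16 byte buffer: " + lastReadOperations);
        decompress(compressed, 4096);
        System.out.println("Reads with a 4096 byte buffer: " + lastReadOperations);
    }
}
```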