Language

The Free and Open Productivity Suite
Released: Apache OpenOffice 4.1.15

Streaming interfaces

OpenOffice.org

Abstract

This draft introduces the streaming interfaces (package com.sun.star.uno.io) and gives some examples about how they can be used.

Note: In some pictures there is a ignoreable 2 in the interface names (e.g. XInputStream2).

Contents

Introduction
The interfaces
  • The 1st level : XInputStream/XOutputStream
  • The 2nd level : XActiveDataSink/XActiveDataSource
  • Chaining
  • Piping
  • The 3rd level : XActiveDataControl
  • The 4th level : XConnectable
  • Errorhandling

Standard services

Introduction

This chapter introduces some notions that you will come across later in this document.
A datasource is an object, that has some data. A datasink is an object that gets some data. Note that no assumption is made about how data is transfered, only the direction of dataflow is fixed (Fig 1.). Note also, that one object can be datasource and datasink at the same time, for example a filter receives data from somewhere (=sink), modifies this data, and broadcasts this data again (=source).
Rahmen1
Datasources and datasinks can be active or passive.
1) A datasink is active when in the thread of execution the datasink calls a method of datasource to get some data. In this case, the datasource is passive.
Rahmen2
2) A datasource is active when in the thread of execution the datasource calls a method of datasink to write some data. In this case, the datasink is passive.
Datasink passive diagram
Streams offer fixed interfaces to do the data transfer. At the lowest level, streams transport bytes. Data does not need to be transported in one piece but can be sliced into multiple pieces. To reflect the two cases above, there exist inputstreams and outputstreams. Inputstreams are used in the active sink-passive source scenario and outputstreams are used in the active source-passive sink scenario.

The interfaces

The design of the streaming interfaces had three major goals :
Below you can find the results of the design.
There is a kind of a four-level hierarchy indicated by the dotted lines.
Stream interfaces
Fig 5. The new streaming interfaces in a 4 level structure

The 1st level : XInputStream/XOutputStream

A basic streaming facility can be implemented with XInputStream or XOutputStream alone.
Example (see also the introduction chapter) :
There is an object source that has some data, and an object sink that wants to get some data. There are two possibilities to fulfil the task :
Sample implementation ( for simplicity no error handling here ).
//simple scenario : 200 bytes shall be transfered from source to target
//active sink, passive source scenario.
class source : public XInputStream, public ....
{
     ...
     // set the actual position to zero
     source() { index = 0; }

     // XinputStream.readBytes()
     INT32 readBytes( Sequence<unsigned char> &seq , INT32 nBytes )
     {
          // beware eof !
	  INT32 nCopy = min( nBytes, 200-index );

          // allocate memory for the requested number of bytes
          seq.realloc( nCopy );

          // put the data into the sequence
          memcpy( seq.getArray() , &acData[index] , nCopy );

          // increase actual position
          index += nCopy;

          return nCopy;
     }

     // XInputStream.skipBytes()
     INT32 skipBytes( INT32 nBytes )
     {
          // beware eof !
          INT32 nCopy = min( nBytes, 200-index );

          // increase actual position
          index += nCopy;

          return nCopy;
      }

     // XInputStream.available()
     INT32 available()
     {
        return 200-index;
     }

     // XInputStream.closeInput()
     void closeInput()
     {
         // no special closing procedure in this simple example
     }

     ...
private:
     unsigned char acData[200];
     INT32 index;
};


// the active datasink
// No error or exception handling for simplicity
class sink : public XXXXXXX, .....
{
     // method of interface XXXXXXX
     void readFromStream( const Reference< XInputStream > &r )
     {
         Sequence<unsigned char> seq(10);

         BOOL bContinue = TRUE;

         // number of bytes that have been read
         INT32 nRead;

         while( bContinue ) {
             // read from the inputstream in 10 byte steps
             nRead = r->readBytes( seq , 10 )

             // ... do something with seq

             // check, if eof has been reached
	      bContinue = ( 10 == nRead );
         }
         // close the stream
         r->closeInput();
     }
}

///in main :
{
      Reference< XInterface > source;
      Reference< XInterface > sink;

      // instantiate source and sink through some service.
      ....

      // query the inputstream
      Reference< XInputStream > input_stream( source , UNO_QUERY );

      // query the sink
      Reference< XXXXXXX > some_interface( sink , UNO_QUERY );

      // do the reading
      some_interface->readFromStream( input_stream );
}

The 2nd level : XActiveDataSink2/XActiveDataSource2

To make the above sample code more flexible, the sink class may implement XActiveDataSink. Calling the setInputStream()-method informs the sink from where to read the data. Thus the code becomes more generic, (sink is only required to support the small XActiveDataSink rather than Reference< XXXXXXX > ).
When to start the data transfer, depends on the implementer of XActiveDataSink.
Here there are three possibilities.
  1. In the call to XActiveDataSink.setInputStream() the whole data transfer is done. This is the worst possibility, because a user may not expect a set-method to behave this way.
  2. The call to XActiveDataSink.setInputStream() starts a thread that does the data transfer. This is a better solution, because the call is not blocking. Nevertheless the user must still be aware, that everything is well prepared for transfer when the call is made.
  3. The call to XActiveDataSink2.setInputStream() just informs the object where to read from. The start of data transfer is done over another method. This may be the start()-method of XActiveDataContro2 (see below).
The 3rd solution is by far the best one, because 1) + 2) are more difficult to reuse and may lead to deadlocks, etc.
Note that everything said about XActiveDataSink also holds for XActiveDataSource.

2.2.1 Chaining

An interesting application of the first 2 interface-levels is the chaining concept.
Imagine you have written the XInputStream implementation OFile, that offers raw data stored in a file. Then you have an arbitrary application, that has implemented XActiveDataSource. After you have connected both objects via setInputStream(), you have the connection shown in Fig. 6a.
Rahmen6
Now a file shall be read, that uses a different file format. One could either modify OFile or application to do the conversion, but there is a better solution. An independent service OConverter can be used, it implements XActiveDataSink and XInputStream. Now the input stream of OFile can be plugged into OConverter and the input stream of OConverter can be plugged into application (Fig 6.b).
Note that it is not necessary to change either application or OFile, only the plugging before the start of data transfer must be adapted. If desired, this may be decided at runtime.
Note also, that you can easily insert more converters into the chain.
The following code samples show how plugging is done.
/// Plugging for Fig 6b (without error handling)
{
     Reference<XInterface > application;
     Reference<XInterface > OConverter;
     Reference<XInterface > OFile;

     // instantiate the three objects via services, put them into the above references
     .....

     // query the interfaces
     Reference< XInputStream > aFileInputStream( OFile , UNO_QUERY );
     Reference< XInputStream > aConverterInputStream( OConverter , UNO_QUERY );
     Reference< XActiveDataSink > aConverterSink( OConverter , UNO_QUERY );
     Reference< XActiveDataSink > aApplicationSink( application , UNO_QUERY );

     // plug the connection
     aConverterSink.setInputStream( aFileInputStream );
     aApplicationSink.setInputStream( aConverterInputStream );

     // start data transfer
     ...
}
The following figure shows the resulting calling sequence for a readBytes()-call for both cases.
Rahmen7
Rahmen8
Note that the order of plugging may be significant if data transfer is started with the setInputStream method. Again the last line of the above sample code :
a)
{
   // plug the connection
   aConverterSink.setInputStream( aFileInputStream );        // first connect passive parts.
   aApplicationSink.setInputStream( aConverterInputStream ); // active player is connected last,
                                                             // no problems can arise
}

b)
{
   // plug the connection
   aApplicationSink.setInputStream( aConverterInputStream ); // application starts to read but OConverter
   // is not connected to anything.
   aConverterSink.setInputStream( aFileInputStream );
}

The problem can be avoided :

  1. Use code sample a) instead of b) when plugging. ( Responsibility is left to the user ). When b) is used, OConverter will throw an exception.
  2. OConverter can be designed, so that it can cope with the problem. If it is not connected to anything, but its read()-method is called, it could wait in a condition up to the moment when connection is established. Note that this is not an ideal solution, if the connection never gets established there remains a dead thread without any sign of an error. It also must be implemented again and again for each converter.
  3. Do not start the data transfer with these set-methods. Better use XActiveDataControl (see below)
Note : It can be quite difficult to implement a OConverter, that modifies the number of bytes to read. The skipBytes(), readBytes() and available()-methods must always return the requested number of bytes, in such cases, converters must do at least some buffering on their own. For such applications it is in general easier to implement an output stream (and use pipes and threads to get the desired interface, see below). However this may be too expensive in some cases.

2.2.2 Piping

Motivation :
For a lot of applications it is necessary to have more than one active player in the game. Imagine you have a datasource that can only write actively and a datasink that can only read actively and you want to connect them. This can be done by a pipe. A pipe offers an OutputStream to write to and an InputStream to read from. Thus it must buffer the data that come in via XOutputStream.
Fig. 9 shows a concrete example for a pipe. A file x is copied into file y. OFileReader reads file x in certain slices. Each slice is written via the output stream to OPipe. OFileWriter gets actively data from OPipe and writes it directly into file y. The connection is plugged with standard components (which still need to be implemented), they can be used in other scenarios independent of each other.
Rahmen9
The following source code shows how the above connection is established (for simplicity without error handling).
{
     Reference< XInterface > OFileWriter, OPipe, OFileReader;

     // instantiate them via a standard service
     ....

     // get the interfaces
     Reference< XActiveDataSink > sink( OFileWriter, UNO_QUERY );
     Reference< XOutputStream > pipeOut( OPipe , UNO_QUERY );
     Reference< XInputStream > pipeIn( OPipe , UNO_QUERY );
     Reference< XActiveDataSource > source( OFileReader , UNO_QUERY );

     // do the plugging
     sink.setInputStream( pipeIn );
     source.setOutputStream( pipeOut );
     // now work with the connection ( sink and source )
}
Note : You can insert further filters and converters via chaining. For example you can insert an instances of OConverter (above example) between OFileWriter and OPipe.
Note : A pipe can also be seen as a mechanism to transform an output stream into an input stream at the costs of the buffer resources.
Note : By exchanging OFileWriter with an object that implements XActiveDataSink and XActiveDataSource, one could add a further pipe. This shows the flexibility of the concept.

The 3rd level : XActiveDataControl

The question 'when should data transfer start ?' is not solved sufficiently. One can say that as soon as the last connection has been plugged, the data starts to flow. That is not acceptable for some applications. An generic interface is needed, which can one use to initiate the data transfer. This is the role of XActiveDataControl.
XActiveDataControl should be implemented with XActiveDataSink or XActiveDataSource, thus each active player in the game should implement it.
XActiveDataControl offers a non-blocking start()-method to initiate data transfer. The terminate()-method should abort the data transfer before it is normally finished. Usually this should be done by calling the close()-method of the connected streams. See error handling for further information.
Rahmen10
To startup the above sample pipe, the following code is necessary :
{
     Reference< XInterface > OFileWriter, OPipe, OFileReader;

     // instantiate them via a standard service
     ....

     // get the interfaces for plugging
     Reference< XActiveDataSink > sink( OFileWriter, UNO_QUERY );
     Reference< XOutputStream > pipeOut( OPipe , UNO_QUERY );
     Reference< XInputStream > pipeIn( OPipe , UNO_QUERY );
     Reference< XActiveDataSource > source( OFileReader , UNO_QUERY );

     // plug ( data transfer is not started )
     sink.setInputStream( pipeIn );
     source.setOutputStream( pipeOut );

     // get the interfaces for starting
     Reference< XActiveDataControl > controlSink( OFileWriter );
     Reference< XActiveDataControl > controlSource( OFileReader );

     // start
     controlSink.start();   // order of starting is not important, the whole connection already exists.
     controlSource.start(); // it may be less expensive to start sink first, because OPipe
                            // may need less resources for buffering.
}

XActiveDataControl also offers a listener administration.

The 4th level : XConnectable

The held references in Fig. 10 are unidirectional. There is no generic way to get from OFileWriter to OFileReader. This maybe interesting for further streaming administration services.
Here now appears XConnectable on stage. Every class in the connection should implement XConnectable. Below (Fig.11 and Fig. 12) you find the extended version of the above examples.
Stream connected chain
Fig 11. Extended example (see fig. 6b). Each element in the connection implements XConnectable and connects to their neighbours. Thus it is possible to reach Application from OFile. The direction of the connection (successor/predecessor) is determined by the direction of dataflow.
Stream connected pipe
Fig 12. Extended example (see fig 10). Now every element of the connection is reachable with one arbitrary element.
The extra plugging work should not be left to the user. In order to get this working with the least effort for the user, the implementation of the XActiveDataSink.setInputStream() or XActiveDataSource2.setOutputStream() must be modified the following way:
OFileWriter::setInputStream( const Reference< XInputStream > &r )
{
     /// store the inputstream reference for later use
     m_in = r;

     /// does the instance provide a XDataSourceRef ?
     Reference< XConnectable > predecessor( r , UNO_QUERY );
     setPredecessor( predecessor );
}

application::setPredecessor( const Reference< XConnectable > &r )
{
     /// if the references match, nothing needs to be done
     if( m_pred != r ) {
         /// store the reference for later use
         m_pred = r;

         if( m_pred.is() ) {
              /// set this instance as the sink !
              m_pred.setSuccessor( this );
         }
     }
}

A similar implementation of for setSuccessor allows to get the desired references with no extra work for the user.
There now exists a ring reference, which raises a deallocation-problem. This can be solved sufficiently, when in close()-operations all references are explicitly deleted.
OConverter::closeInput()
{
     /// send the close to the chained partner
     getInputStream()->closeInput();

     //.... do some tiding up, that is necessary when closing.

     /// release all references
     setPredecessor( Reference< XConnectable >() );
     setSource( Reference< XConnectable >() );
     setInputStream( Reference< XInputStream >() );
}

Note that any object, which does not support this XConnectable mechanism will interrupt the connection.

The sense of XConnectable is maybe not very clear. Note that implementing this underlying connection of elements will allow to create standard services, that can help to establish and administrate connections of streams. This may be an important reason for using this new streaming interfaces, that's why you are strongly encouraged to implement them.
An use case may be a video stream, where the sink wants to change some settings at source, but the sink doesn't know the source. It can then use XConnectable to wander to the source, where it can query on a known interface.

2.5 Errorhandling

This chapter deals with abnormal conditions that may occur when using streams.
Stream connected pipe
Let's take the pipe example as a basis for discussion.
In the normal case both FileReader and FileWriter start to do their job. When FileReader has read all data from the file x and has written all data into the outputstream, it must call closeOutput on XOutputStream. This will disconnect OPipe and OFileReader, OFileReader may be deleted. When OFileWriter tries to read beyond eof, it realizes that it gets less bytes than questioned. This is interpreted by OFileWriter as eof sign. It then calls closeInput on the XInputStream and frees its own resources. The reference between OPipe and OFileWriter is removed, OFileWriter and OPipe may be deleted.
Possible error scenarios are :
  1. Error during reading file x (OFileReader). In that case OFileReader closes its internal resources, broadcasts an error to all listeners and calls closeOutput() on outputstream. Note that OPipe and OFileWriter cannot distinguish between a normal eof-close and a error close (unless they are listeners).
  2. Error during writing file y (OFileWriter). In that case, OFileWriter closes its internal resources, broadcasts an error to all listeners and calls closeInput() (which will disconnect OFileWriter from OPipe ) on inputstream. OFileReader is still writing on OPipe's outputstream. At any time after the closeInput() call, OPipe may throw an IOException, if OFileReader calls a write-method. OFileReader catches the exception, calls closeOutput() on the outputstream, which will disconnect OPipe from OFileReader. It may also broadcast an error to all listeners.
  3. Error in OPipe (e.g. buffer full). At any time after the exceptional state has occurred, OPipe will throw exceptions for every read call and every write call. OFileReader and OFileWriter must then call closeInput()/closeOutput() and broadcast an error.
  4. terminate()-method on OFileReader or OFileWriter is called. This can be handled very similar to the above error conditions, except that the terminated()-handler must be called.
Note that all above errors in general occur in a multi-threaded environment. Especially situations as that the terminate()-method of OFileWriter gets called when another thread is in the read-Method of OPipe must be handled correctly. In this case, terminate must call closeInput(), OPipe must return out of the reading thread with eof-signature (less bytes read than questioned) and OFileWriter must call only the terminated() but not the error() handler!.

Standard services

The following standard services have been implemented . Implementations can be found in udk/io/source/stm. The dll is stm.dll. All implementations are thread safe. However it is probably not very meaningful, if two threads read the same input stream at the same time which thread gets which data is then arbitrary.

Note: All streams must be explicitly closed in order to free their resources and release their ring references.

com.sun.star.uno.io.Pipe : implements XOutputStream, XInputStream and XConnectable

Basic implementation (see fig 12) to connect input stream and output stream. In general one thread is writing data into the pipe and another is reading from the pipe. The pipe does the necessary buffering in memory.

com.sun.star.uno.io.DataInputStream : implements XDataInputStream, XActiveDataSink and XConnectable

Provides the XDataInputStream functionality. Must be chained to another XInputStream (e.g. a pipe) to be meaningful.

com.sun.star.uno.io.DataOutputStream : implements XDataOutputStream, XActiveDataSource and XConnectable

Provides the XDataOutputStream functionality. Must be chained to another XOutputStream (e.g. a pipe) to be meaningful.

com.sun.star.uno.io.MarkableInputStream : implements XMarkableStream, XInputStream, XActiveDataSink and XConnectable

Provides the XMarkableStream functionality. When a mark is created, the implementations buffers all further read data. Must be chained to another inputstream to be meaningful.

com.sun.star.uno.io.MarkableOutputStream : implements XMarkableStream, XOutputStream, XActiveDataSource and XConnectable

Provides the XMarkableStream functionality. When a mark is created, the implementations buffers all further written data. The data is written further through the chain, when the mark is deleted. Even a flush will not have any effect. Must be chained to another outputstream to be meaningful.

com.sun.star.uno.io.ObjectInputStream : implements XObjectInputStream, XActiveDataSink and XConnectable

Provides XObjectInputStream functionality. All services of instances, that shall be read, must be registered with the global service manager. All services must implement XPersistObject. Must be chained to another XInputStream. Somewhere in the chain must exist an XMarkableStream. Be careful that all elements between this and XMarkableStream must not buffer data ( otherwise the markable stream becomes meaningless ) and one receives coincident results.

com.sun.star.uno.io.ObjectOutputStream : implements XObjectOutputStream, XActiveDataSource and XConnectable

Provides XObjectOutputStream functionality. All services, that shall be serialized, must implement XPersistObject. Must be chained to another XOutputStream. Somewhere in the chain must exist a XMarkableStream. Be careful that all elements between this and XMarkableStream must not buffer data ( otherwise the markable stream becomes meaningless) and one receives coincident results.

Author: Joerg Budischewski ($Date: 2004/11/27 07:50:15 $)
Copyright 2001 Sun Microsystems, Inc., 901 San Antonio Road, Palo Alto, CA 94303 USA.

Apache Software Foundation

Copyright & License | Privacy | Contact Us | Donate | Thanks

Apache, OpenOffice, OpenOffice.org and the seagull logo are registered trademarks of The Apache Software Foundation. The Apache feather logo is a trademark of The Apache Software Foundation. Other names appearing on the site may be trademarks of their respective owners.