Uploaded image for project: 'HPCC'
  1. HPCC
  2. HPCC-14654 Support parallel activity execution
  3. HPCC-397

Common up hthor/roxie/thor row streaming interfaces

    XMLWordPrintable

Details

    • Sub-task
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • None
    • 6.0.0
    • None
    • None

    Description

      @jakesmith @RussWhitehead @richardkchapman

      Currently hthor/roxie and thor use different interfaces for reading records. This makes it harder to get familiar
      with the code, and restricts the amount that code (e.g., activities and input buffering) can be shared between the
      engines.

      Currently HThor and roxie have the following structure

      ISimpleInputBase
      .IInputBase
      ..IHThorInput
      ..IRoxieInput (+IInterface) - means we have an extra vmt.

      And jlib/thorlcr have the following structure.
      IInterface
      .IRowStream - now used by several jlib functions.
      ..IExtRowStream - used for streaming from a file
      ..ITypedRowStream - prototype interface for c++ implemented activities
      ..IThorDataLink - thor lcr's record reading interface
      ..ISmartRowBuffer

      I would suggest we switch to some interfaces based on IRowStream (some of the intermediate interfaces could be combined):

      IInterface
      .IRowStream - now used by several jlib functions.
      ..IExtRowStream - used for streaming from a file
      ..ITypedRowStream
      ...ISteppedRowStream
      ....IEngineRowStream
      .....IHThorInput
      .....IRoxieInput
      .....IThorDataLink
      ..ISmartRowBuffer

      Here is a first cut of the suggested functions:

          // A stream of rows
          interface IRowStream : extends IInterface
          {
              // Fetch the next row; rows returned must be freed by the caller
              virtual const void *nextRow()=0;
              // Stop the stream; after stop has been called nextRow() returns NULL
              virtual void stop(bool abort) = 0;

              // Calls nextRow() and, if that yields NULL, tries exactly once more.
              // The result can still be NULL when two successive calls both return NULL.
              inline const void *ungroupedNextRow()
              {
                  if (const void *row = nextRow())
                      return row;
                  return nextRow();
              }
          };
          // Declaration only - the implementation is not part of this sketch.
          // NOTE(review): presumably fills `group` with the rows of the next group read from `stream`
          // and returns whether a group was read - confirm against the implementation.
          bool nextGroup(ConstPointerArray & group, IRowStream * stream);
      
          // A stream of rows with type information
          interface ITypedRowStream : extends IRowStream
          {
              // Metadata for the rows in this stream; isGrouped() copes with a NULL result
              virtual IOutputMetaData * queryOutputMeta() const = 0;

              // True only when metadata is available and that metadata reports grouping
              inline bool isGrouped()
              {
                  IOutputMetaData * outputMeta = queryOutputMeta();
                  if (!outputMeta)
                      return false;
                  return outputMeta->isGrouped();
              }
          };
      
          // All the functions needed to process rows once everything is initialized.
          // Both members have default bodies, so implementations only override what they need.
          interface ISteppedRowStream : extends ITypedRowStream
          {
              // Default: calls throwUnexpected().
              // NOTE(review): presumably returns the next row >= seek on the first numFields fields - confirm.
              virtual const void * nextRowGE(const void * seek, unsigned numFields) { throwUnexpected(); }
              // Default: does nothing
              virtual void resetEOF() { }
          };
      
          // All the functions needed to process a stream - including creation/destruction.
          // Named IEngineRowStream to match the proposed hierarchy and the interfaces that extend it.
          interface IEngineRowStream : extends ISteppedRowStream
          {
              virtual void prestart(unsigned parentExtractSize, const byte *parentExtract) = 0;
              virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused) = 0;
              // Default: returns NULL (no stepping meta supplied)
              virtual IInputSteppingMeta * querySteppingMeta() { return NULL; }
              // Default: returns false without touching the collector
              virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector) { return false; }
          };
      
          struct IHThorInput : public IEngineRowStream
          {
              //virtual void ready() = 0; use start()
              //virtual void done() = 0; use stop()
              virtual void updateProgress(IWUGraphProgress &progress) const = 0;
           ;
      
      
          // roxie's input interface, layered on the engine row stream
          interface IRoxieInput : extends IEngineRowStream
          {
          // Additional functions needed to cope with graphs.  I think these count as an implementation detail.
              // Default: a single concrete output
              virtual unsigned numConcreteOutputs() const
              {
                  return 1;
              }
              // Default: only index 0 is accepted (asserted), and it resolves to this input itself
              virtual IRoxieInput * queryConcreteInput(unsigned idx)
              {
                  assertex(idx==0);
                  return this;
              }

              virtual void reset() = 0;
              virtual void checkAbort() = 0;              // I'm not sure why this is in this interface
              virtual unsigned queryId() const = 0;

              virtual unsigned __int64 queryTotalCycles() const = 0;
              virtual unsigned __int64 queryLocalCycles() const = 0;
              virtual IRoxieInput *queryInput(unsigned idx) const = 0;
              virtual IRoxieServerActivity *queryActivity() = 0;
              virtual IIndexReadActivityInfo *queryIndexReadActivity() = 0;
          };
      
          // Row stream used for streaming from a file (per the hierarchy described above).
          // Adds offset reporting, prefetching and re-initialisation on top of IRowStream.
          interface IExtRowStream: extends IRowStream
          {
              virtual offset_t getOffset() = 0;
              // NOTE(review): this has a different signature from IRowStream::stop(bool abort), so it
              // hides the inherited stop(bool) for calls made through IExtRowStream, and implementations
              // still have to provide stop(bool) as well. Needs resolving before the interfaces are merged.
              virtual void stop(CRC32 *crcout=NULL) = 0;            // could cause problems with stop(bool)?
              // NOTE(review): sz presumably receives the size of the prefetched row when non-NULL - confirm.
              virtual const void *prefetchRow(size32_t *sz=NULL) = 0;
              virtual void prefetchDone() = 0;
              virtual void reinit(offset_t offset,offset_t len,unsigned __int64 maxrows) = 0;
          };
      
          // thor lcr's record reading interface (per the description above), rebased onto
          // IEngineRowStream. Everything declared here is pure virtual.
          interface IThorDataLink : extends IEngineRowStream
          {
          // information routines 
              virtual void getMetaInfo(ThorDataLinkMetaInfo &info) = 0;
              virtual CActivityBase *queryFromActivity() = 0; // activity that has this as an output
              virtual void dataLinkSerialize(MemoryBuffer &mb)=0;
              // Same signature as IRoxieInput::queryTotalCycles() in this sketch
              virtual unsigned __int64 queryTotalCycles() const=0;
          };
      
      
          //Not really sure what this is used for.
          // A row stream that additionally exposes an IRowWriter through queryWriter().
          // NOTE(review): presumably rows pushed through the writer are read back via nextRow() - confirm.
          interface ISmartRowBuffer: extends IRowStream
          {
              virtual IRowWriter *queryWriter() = 0;
          };
      
      

      Attachments

        Activity

          People

            richardkchapman Richard Chapman
            ghalliday Gavin Halliday
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: