Details
-
Bug
-
Status: Resolved
-
Not specified
-
Resolution: Duplicate
-
None
-
None
-
None
Description
Generating a dataset in ECL and passing it into embedded C++ code results in the entire dataset being coalesced and passed into the embedded function. However, when a dataset is generated in C++ and passed into another embedded function, only the first partition of the dataset is available. Note that this is not the local partition of the dataset but the first partition.
See ECL test code & test output below:
------ C++ DS Count: 200, ECL DS Count: 200 ------
Info C++ Generated Dataset 0 SLAVE #2 [192.168.56.102:20100]: Records Processed: 100 Starting at Index: 1 0 0
Info C++ Generated Dataset 0 SLAVE #1 [192.168.56.103:20100]: Records Processed: 100 Starting at Index: 1 0 0
Info ECL Generated Dataset 0 SLAVE #1 [192.168.56.103:20100]: Records Processed: 200 Starting at Index: 1 0 0
Info ECL Generated Dataset 0 SLAVE #2 [192.168.56.102:20100]: Records Processed: 200 Starting at Index: 1 0 0
ECL Code:
// Record layout shared by both tests: two 8-byte integers (key, fill).
rec := {integer key, integer fill};

// Consumes a streamed dataset on each node and reports, as a workunit
// message tagged with `name`, how many rows this node received and the key
// of the first row it saw. Returns an empty stream.
STREAMED DATASET(rec) testDistributedWrite(STREAMED DATASET(rec) ds, VARSTRING name) := EMBED(C++ : distributed)
#include <sstream>

// Empty result stream: nextRow() immediately signals end-of-data.
class DummyDataset : public RtlCInterface, implements IRowStream
{
public:
    DummyDataset() {}
    RTLIMPLEMENT_IINTERFACE

    const void* nextRow() override { return NULL; }
    void stop() override {}
};

#body
    int64_t startingIndex = -1;
    size_t count = 0;
    const void* row = ds->nextRow();
    while (row != NULL)
    {
        // The first field of `rec` is the 8-byte integer key.
        const int64_t* dat = static_cast<const int64_t*>(row);
        if (count == 0)
        {
            startingIndex = *dat;
        }
        // Each row obtained from nextRow() must be released once it has been
        // read; the original loop leaked every row.
        rtlReleaseRow(row);
        row = ds->nextRow();
        count++;
    }
    std::stringstream ss;
    ss << "Records Processed: " << count << " Starting at Index: " << startingIndex;
    ctx->addWuException(ss.str().c_str(), 0, 0, name);
    return new DummyDataset();
ENDEMBED;

//------------------------------------------------------------------------------
// Test #1 - C++ Generation
//------------------------------------------------------------------------------
// Each node generates 100 rows (key = 1..100, fill = 0) from C++.
STREAMED DATASET(rec) testGenDS() := EMBED(C++ : context,distributed,time)
#include <sstream>

class StreamDataset : public RtlCInterface, implements IRowStream
{
    unsigned _count = 0;
    Linked<IEngineRowAllocator> _row_allocator;
public:
    StreamDataset(IEngineRowAllocator* alloc) : _row_allocator(alloc) {}
    RTLIMPLEMENT_IINTERFACE

    const void* nextRow() override
    {
        if (_count >= 100)
        {
            return NULL;
        }
        _count++;
        unsigned recordLength = sizeof(int64_t) * 2;
        RtlDynamicRowBuilder rowBuilder(_row_allocator);
        byte* row = rowBuilder.ensureCapacity(recordLength, NULL);
        int64_t* rowData = reinterpret_cast<int64_t*>(row);
        rowData[0] = _count;
        rowData[1] = 0;
        return rowBuilder.finalizeRowClear(recordLength);
    }
    void stop() override {}
};

#body
    return new StreamDataset(_resultAllocator);
ENDEMBED;

ds := testGenDS();
testDistributedWrite(ds, 'C++ Generated Dataset');

//------------------------------------------------------------------------------
// Test #2 - ECL Generation
//------------------------------------------------------------------------------
ds2 := DATASET(100 * CLUSTERSIZE,
               TRANSFORM(rec,
                         SELF.key := COUNTER,
                         SELF.fill := 0,
                         SELF := []),
               DISTRIBUTED);
testDistributedWrite(ds2, 'ECL Generated Dataset');

output('C++ DS Count: ' + count(ds) + ', ECL DS Count: ' + count(ds2));
Attachments
Issue Links
- relates to
-
HPCC-13036 Introduce a new activity for a streaming output
-
- Resolved
-