#include "hip/hip_runtime.h" #include <thrust/host_vector.h> #include <thrust/device_vector.h> #include <thrust/sequence.h> #include <thrust/sort.h> #include <thrust/gather.h> #include <thrust/scan.h> #include <iostream> #include <cstdio> #include "lista.h" #include "bpreds.h" #define WARP_SIZE 32 #define TREE_NODE_SIZE WARP_SIZE #define TREE_FANOUT (TREE_NODE_SIZE + 1) #define N_MULTI_P 16 #define BLCK_PER_MP_create 256 // blocks per multiprocessor during tree creation #define BLCK_PER_MP_search 512 // blocks per multiprocessor during tree searching #define WAPRS_PER_BLCK_join 8//16 // blocks per multiprocessor during tree creation #define BLCK_PER_MP_join 512//256 // blocks per multiprocessor during tree searching #define THRD_PER_BLCK_create TREE_NODE_SIZE #define BLCK_PER_GRID_create (N_MULTI_P * BLCK_PER_MP_create) #define THRD_PER_BLCK_search TREE_NODE_SIZE #define BLCK_PER_GRID_search (N_MULTI_P * BLCK_PER_MP_search) #define THRD_PER_GRID_search (THRD_PER_BLCK_search * BLCK_PER_GRID_search) #define THRD_PER_BLCK_join (WARP_SIZE * WAPRS_PER_BLCK_join) #define BLCK_PER_GRID_join (N_MULTI_P * BLCK_PER_MP_join) #define THRD_PER_GRID_join (THRD_PER_BLCK_join * BLCK_PER_GRID_join) #define TEST_MAX 100 typedef int IKeyType; typedef int Record; typedef struct { int keys[TREE_NODE_SIZE]; } IDirectoryNode; typedef struct { Record records[TREE_NODE_SIZE]; } IDataNode; typedef struct { IDataNode* data; unsigned int nDataNodes; IDirectoryNode* dir; unsigned int nDirNodes; } CUDA_CSSTree; struct to_neg { __host__ __device__ bool operator()(const int &r1) { if(r1 < 0) return 1; return 0; } }; __host__ __device__ unsigned int uintCeilingLog(unsigned int base, unsigned int num) { unsigned int result = 0; for(unsigned int temp = 1; temp < num; temp *= base) result++; return result; } __host__ __device__ unsigned int uintCeilingDiv(unsigned int dividend, unsigned int divisor) { return (dividend + divisor - 1) / divisor; } __host__ __device__ unsigned int uintPower(unsigned int base, unsigned int pow) { unsigned int result = 1; for(; pow; pow--) result *= base; return result; } __device__ int getRightMostDescIdx(int tree_size, int nodeIdx) { int tmp = nodeIdx * TREE_NODE_SIZE + TREE_FANOUT; int n = uintCeilingLog(TREE_FANOUT, uintCeilingDiv(TREE_NODE_SIZE * tree_size + TREE_FANOUT, tmp)) - 1; int result = (tmp * uintPower(TREE_FANOUT, n) - TREE_FANOUT) / TREE_NODE_SIZE; return result; } __device__ int getDataArrayIdx(int dirSize, int tree_size, int bottom_start, int treeIdx) { int idx; if(treeIdx < dirSize) { idx = tree_size - bottom_start - 1; } else if( treeIdx < bottom_start ) { idx = tree_size - bottom_start + treeIdx - dirSize; } else { idx = treeIdx - bottom_start; } return idx; } // Binary Search __device__ int firstMatchingKeyInDirNode1(int keys[], int key) { int min = 0; int max = TREE_NODE_SIZE; int mid; int cut; while(max - min > 1) { mid = (min + max) / 2; cut = keys[mid]; if(key > cut) min = mid; else max = mid; } if(keys[min] >= key) return min; return max; } // Binary Search __device__ int firstMatchingKeyInDataNode2(Record records[], IKeyType key) { int min = 0; int max = TREE_NODE_SIZE; int mid; int cut; while(max - min > 1) { mid = (min + max) / 2; cut = records[mid]; if(key > cut) min = mid; else max = mid; } if(records[min] == key) return min; if(max < TREE_NODE_SIZE && records[max] == key) return max; return -1; } __global__ void gCreateIndex(IDataNode data[], IDirectoryNode dir[], int dirSize, int tree_size, int bottom_start, int nNodesPerBlock) { int startIdx = hipBlockIdx_x * nNodesPerBlock; int endIdx = startIdx + nNodesPerBlock; if(endIdx > dirSize) endIdx = dirSize; int keyIdx = hipThreadIdx_x; // Proceed only when in internal nodes for(int nodeIdx = startIdx; nodeIdx < endIdx; nodeIdx++) { int childIdx = nodeIdx * TREE_FANOUT + keyIdx + 1; // One step down to the left // Then look for the right most descendent int rightMostDesIdx; // Common cases if(childIdx < tree_size) { rightMostDesIdx = getRightMostDescIdx(tree_size, childIdx); } // versus the unusual case when the tree is incomplete and the node does not have the full set of children else { // pick the last node in the tree (largest element of the array) rightMostDesIdx = tree_size - 1; } int dataArrayIdx = getDataArrayIdx(dirSize, tree_size, bottom_start, rightMostDesIdx); dir[nodeIdx].keys[keyIdx] = data[dataArrayIdx].records[TREE_NODE_SIZE - 1]; } } __global__ void gSearchTree(IDataNode* data, int nDataNodes, IDirectoryNode* dir, int nDirNodes, int lvlDir, Record* arr, int locations[], int nSearchKeys, int nKeysPerThread, int tree_size, int bottom_start) { // Bringing the root node (visited by every tuple) to the faster shared memory __shared__ IKeyType RootNodeKeys[TREE_NODE_SIZE]; RootNodeKeys[hipThreadIdx_x] = dir->keys[hipThreadIdx_x]; __syncthreads(); int OverallThreadIdx = hipBlockIdx_x * THRD_PER_BLCK_search + hipThreadIdx_x; for(int keyIdx = OverallThreadIdx; keyIdx < nSearchKeys; keyIdx += THRD_PER_GRID_search) { IKeyType val = arr[keyIdx]; int loc = firstMatchingKeyInDirNode1(RootNodeKeys, val) + 1; for(int i = 1; i < lvlDir && loc < nDirNodes; i++) { int kid = firstMatchingKeyInDirNode1(dir[loc].keys, val); loc = loc * TREE_FANOUT + kid + 1; } if(loc >= tree_size) loc = nDataNodes - 1; else loc = getDataArrayIdx(nDirNodes, tree_size, bottom_start, loc); int offset = firstMatchingKeyInDataNode2(data[loc].records, val); locations[keyIdx] = (offset <0)?-1:(loc * TREE_NODE_SIZE + offset); } } /*Counts the number of times a row in 'S' is to be joined to a row in 'R'.*/ __global__ void gIndexJoin(int *R, int *S, int g_locations[], int sLen, int g_ResNums[]) { int s_cur = hipBlockIdx_x * hipBlockDim_x + hipThreadIdx_x; if(s_cur < sLen) { int count = 1; int r_cur = g_locations[s_cur]; int s_key; if(r_cur >= 0) { s_key = S[s_cur]; r_cur++; while(s_key == R[r_cur]) { count++; r_cur++; } g_ResNums[s_cur] = count; } } } /*Corrects 'gSearchTree' results when dealing with a negative multijoin. Uses the values found in 'g_locations' which indicate, for each row in 'R', if its going to be joined (positive number) or not (-1). Works by checking the additional columns to be joined (i.e. all except the two used by 'gSearchTree') and changing to -1 in 'g_locations' those rows that have equal values in the checked columns.*/ __global__ void gIndexMultiJoinNegative(int *R, int *S, int g_locations[], int rLen, int *p1, int *p2, int of1, int of2, int *mloc, int *sloc, int *muljoin, int wj) { extern __shared__ int shared[]; int r_cur = hipBlockIdx_x * hipBlockDim_x + hipThreadIdx_x; int posr, poss, x; if(hipThreadIdx_x < wj) shared[hipThreadIdx_x] = muljoin[hipThreadIdx_x]; __syncthreads(); if(r_cur < rLen) { int s_cur = g_locations[r_cur]; int r_key; if(s_cur >= 0) { r_key = R[r_cur]; if(mloc == NULL) posr = r_cur * of1; else posr = mloc[r_cur] * of1; while(r_key == S[s_cur]) { poss = sloc[s_cur] * of2; for(x = 0; x < wj; x += 2) { if(p1[posr + shared[x]] != p2[poss + shared[x+1]]) break; } if(x >= wj) return; s_cur++; } g_locations[r_cur] = -1; } } } /*Corrects 'gSearchTree' results when dealing with a multijoin. Uses the values found in 'g_locations' which indicate, for each row in 'S', if its going to be joined (positive number) or not (-1). Works by checking the additional columns to be joined (i.e. all except the two used by 'gSearchTree') and counting the number of times a row in 'S' is to be joined to its corresponding row in 'R', storing the new result in 'g_locations'.*/ __global__ void gIndexMultiJoin(int *R, int *S, int g_locations[], int sLen, int g_ResNums[], int *p1, int *p2, int of1, int of2, int *mloc, int *sloc, int *muljoin, int wj) { extern __shared__ int shared[]; int s_cur = hipBlockIdx_x * hipBlockDim_x + hipThreadIdx_x; int posr, poss, x; if(hipThreadIdx_x < wj) shared[hipThreadIdx_x] = muljoin[hipThreadIdx_x]; __syncthreads(); if(s_cur < sLen) { int count = 0; int r_cur = g_locations[s_cur]; int s_key; if(r_cur >= 0) { s_key = S[s_cur]; if(sloc == NULL) poss = s_cur * of2; else poss = sloc[s_cur] * of2; while(s_key == R[r_cur]) { posr = mloc[r_cur] * of1; for(x = 0; x < wj; x += 2) { if(p1[posr + shared[x]] != p2[poss + shared[x+1]]) break; } if(x >= wj) count++; r_cur++; } if(count > 0) g_ResNums[s_cur] = count; } } } /*Writes the result of the join and projects the necessary columns as defined by 'rule'. The difference between this function and 'gJoinWithWrite' is the comparison of the additional join columns.*/ __global__ void multiJoinWithWrite(int g_locations[], int sLen, int g_PrefixSums[], int g_joinResultBuffers[], int *p1, int *p2, int of1, int of2, int *rule, int halfrul, int lenrul, int *mloc, int *sloc, int wj) { extern __shared__ int shared[]; int *extjoins = &shared[lenrul]; int s_cur = hipBlockIdx_x * hipBlockDim_x + hipThreadIdx_x; if(hipThreadIdx_x < (lenrul + wj)) shared[hipThreadIdx_x] = rule[hipThreadIdx_x]; __syncthreads(); if(s_cur < sLen) { int r_cur = g_locations[s_cur]; if(r_cur >= 0) { int x, y, pos, posr, poss; int num1 = g_PrefixSums[s_cur]; int num2 = g_PrefixSums[s_cur+1]; int tmp1, tmp2; if(sloc == NULL) poss = s_cur * of2; else poss = sloc[s_cur] * of2; for(x = num1; x < num2; x++, r_cur++) { pos = mloc[r_cur] * of1; for(y = 0; y < wj; y += 2) /*Additional comparison*/ { tmp1 = p1[pos + extjoins[y]]; tmp2 = p2[poss + extjoins[y+1]]; if(tmp1 != tmp2) break; } if(y < wj) { x--; continue; } posr = x * lenrul; for(y = 0; y < halfrul; y++) g_joinResultBuffers[posr + y] = p1[pos + shared[y]]; for(; y < lenrul; y++) g_joinResultBuffers[posr + y] = p2[poss + shared[y]]; } } } } /*Writes the result of the join and projects the necessary columns as defined by 'rule'. The difference between this function and 'gJoinWithWrite2' is the comparison of the additional join columns.*/ __global__ void multiJoinWithWrite2(int g_locations[], int sLen, int g_PrefixSums[], int g_joinResultBuffers[], int *p1, int *p2, int of1, int of2, int *rule, int cols, int *mloc, int *sloc, int wj) { extern __shared__ int shared[]; int *extjoins = &shared[cols]; int s_cur = hipBlockIdx_x * hipBlockDim_x + hipThreadIdx_x; if(hipThreadIdx_x < (cols + wj)) shared[hipThreadIdx_x] = rule[hipThreadIdx_x]; __syncthreads(); if(s_cur < sLen) { int r_cur = g_locations[s_cur]; if(r_cur >= 0) { int x, y, pos, pos2, posr, cond; int num1 = g_PrefixSums[s_cur]; int num2 = g_PrefixSums[s_cur+1]; if(sloc == NULL) pos2 = s_cur * of2 - 1; else pos2 = sloc[s_cur] * of2 - 1; for(x = num1; x < num2; x++, r_cur++) { pos = mloc[r_cur] * of1 - 1; for(y = 0; y < wj; y += 2) /*Additional comparison*/ { if(p1[pos + extjoins[y] + 1] != p2[pos2 + extjoins[y+1] + 1]) break; } if(y < wj) { x--; continue; } posr = x * cols; for(y = 0; y < cols; y++) { cond = shared[y]; if(cond > 0) g_joinResultBuffers[posr + y] = p1[pos + cond]; else g_joinResultBuffers[posr + y] = p2[pos2 - cond]; } } } } } /*Writes the result of the join and projects the necessary columns as defined by 'rule'. The difference between this function and 'gJoinWithWrite2' is that only the columns in the positve predicate are projected.*/ __global__ void gJoinWithWriteNegative(int g_locations[], int rLen, int g_joinResultBuffers[], int *p1, int of1, int *rule, int halfrul, int *mloc) { extern __shared__ int shared[]; int r_cur = hipBlockIdx_x * hipBlockDim_x + hipThreadIdx_x; int posr; if(hipThreadIdx_x < halfrul) shared[hipThreadIdx_x] = rule[hipThreadIdx_x]; __syncthreads(); if(r_cur < rLen) { posr = g_locations[r_cur]; if(g_locations[r_cur+1] != posr) { int y, pos; if(mloc == NULL) pos = r_cur * of1; else pos = mloc[r_cur] * of1; posr *= halfrul; for(y = 0; y < halfrul; y++) g_joinResultBuffers[posr + y] = p1[pos + shared[y]]; } } } /*Writes the result of the join and projects the necessary columns as defined by 'rule'. The difference between this function and 'gJoinWithWrite' is that only the columns in the positve predicate are projected.*/ __global__ void gJoinWithWriteNegative2(int g_locations[], int rLen, int g_joinResultBuffers[], int *p1, int of1, int *rule, int cols, int *mloc) { extern __shared__ int shared[]; int r_cur = hipBlockIdx_x * hipBlockDim_x + hipThreadIdx_x; int posr; if(hipThreadIdx_x < cols) shared[hipThreadIdx_x] = rule[hipThreadIdx_x]; __syncthreads(); if(r_cur < rLen) { posr = g_locations[r_cur]; if(g_locations[r_cur+1] != posr) { int y, pos; if(mloc == NULL) pos = r_cur * of1 - 1; else pos = mloc[r_cur] * of1 - 1; posr *= cols; for(y = 0; y < cols; y++) g_joinResultBuffers[posr + y] = p1[pos + shared[y]]; } } } /*Writes the result of the join and projects the necessary columns as defined by 'rule'.*/ __global__ void gJoinWithWrite(int g_locations[], int sLen, int g_PrefixSums[], int g_joinResultBuffers[], int *p1, int *p2, int of1, int of2, int *rule, int halfrul, int lenrul, int *mloc, int *sloc) { extern __shared__ int shared[]; int s_cur = hipBlockIdx_x * hipBlockDim_x + hipThreadIdx_x; if(hipThreadIdx_x < lenrul) shared[hipThreadIdx_x] = rule[hipThreadIdx_x]; __syncthreads(); if(s_cur < sLen) { int r_cur = g_locations[s_cur]; if(r_cur >= 0) { int x, y, pos, posr, poss; int num1 = g_PrefixSums[s_cur]; int num2 = g_PrefixSums[s_cur+1]; if(sloc == NULL) poss = s_cur * of2; else poss = sloc[s_cur] * of2; for(x = num1; x < num2; x++, r_cur++) { pos = mloc[r_cur] * of1; posr = x * lenrul; for(y = 0; y < halfrul; y++) g_joinResultBuffers[posr + y] = p1[pos + shared[y]]; for(; y < lenrul; y++) g_joinResultBuffers[posr + y] = p2[poss + shared[y]]; } } } } /*Writes the result of the join and projects the necessary columns as defined by 'rule'. This version is used when performing the final join of the rule and its only difference is the projection, which is performed based on the variables in the head of the rule.*/ __global__ void gJoinWithWrite2(int g_locations[], int sLen, int g_PrefixSums[], int g_joinResultBuffers[], int *p1, int *p2, int of1, int of2, int *rule, int cols, int *mloc, int *sloc) { extern __shared__ int shared[]; int s_cur = hipBlockIdx_x * hipBlockDim_x + hipThreadIdx_x; if(hipThreadIdx_x < cols) shared[hipThreadIdx_x] = rule[hipThreadIdx_x]; __syncthreads(); if(s_cur < sLen) { int r_cur = g_locations[s_cur]; if(r_cur >= 0) { int x, y, pos, pos2, posr, cond; int num1 = g_PrefixSums[s_cur]; int num2 = g_PrefixSums[s_cur+1]; if(sloc == NULL) pos2 = s_cur * of2 - 1; else pos2 = sloc[s_cur] * of2 - 1; for(x = num1; x < num2; x++, r_cur++) { pos = mloc[r_cur] * of1 - 1; posr = x * cols; for(y = 0; y < cols; y++) { cond = shared[y]; if(cond > 0) g_joinResultBuffers[posr + y] = p1[pos + cond]; else g_joinResultBuffers[posr + y] = p2[pos2 - cond]; } } } } } /*Load part of column 'wj' of 'p' in 'R'. Which values are loaded is defined by the prefix sum results in 'pos'.*/ __global__ void llenar(int *p, int *R, int len, int of, int wj, int *pos, int *ids) { int id = hipBlockIdx_x * hipBlockDim_x + hipThreadIdx_x; int cond; if(id < len) { cond = pos[id]; if(pos[id+1] != cond) { R[cond] = p[id * of + wj]; ids[cond] = id; } } } /*Load an entire column from 'p' into 'R'.*/ __global__ void llenarnosel(int *p, int *R, int len, int of, int wj) { int id = hipBlockIdx_x * hipBlockDim_x + hipThreadIdx_x; if(id < len) R[id] = p[id * of + wj]; } __global__ void projectfinal(int *res, int rows, int cols, int *rule, int *out) { extern __shared__ int shared[]; int id = hipBlockIdx_x * hipBlockDim_x + hipThreadIdx_x; if(hipThreadIdx_x < cols) shared[hipThreadIdx_x] = rule[hipThreadIdx_x]; __syncthreads(); if(id < rows) { id *= cols; for(int y = 0; y < cols; y++) out[id + y] = res[id + shared[y]]; } } void project(int *res, int resrows, int numcols1, int numcols2, int *proj, int **ret, int type) { int z, *dcons, *d_Rout; int numthreads = 1024; //numthreads = 32; int blockllen = resrows / numthreads + 1; int sizepro = numcols2 * sizeof(int); reservar(&dcons, sizepro); if(type) { int *pt = (int *)malloc(sizepro); for(z = 0; z < numcols2; z++) pt[z] = proj[z] - 1; hipMemcpy(dcons, pt, sizepro, hipMemcpyHostToDevice); //hipDeviceSynchronize(); //Small cudaMemcpys are asynchronous, uncomment this line if the pointer is being liberated before it is copied. free(pt); } else hipMemcpy(dcons, proj, sizepro, hipMemcpyHostToDevice); reservar(&d_Rout, resrows * sizepro); hipLaunchKernel(HIP_KERNEL_NAME(projectfinal), dim3(blockllen), dim3(numthreads), sizepro, 0, res, resrows, numcols1, dcons, d_Rout); hipFree(dcons); hipFree(*ret); *ret = d_Rout; } __global__ void projectadd(int *dop1, int *dop2, int rows1, int rows2, int cols1, int cols2, int *dhead, int hsize, int *res) { extern __shared__ int shared[]; int id = hipBlockIdx_x * hipBlockDim_x + hipThreadIdx_x; int pos2, posr, x, y, cond; if(hipThreadIdx_x < hsize) shared[hipThreadIdx_x] = dhead[hipThreadIdx_x]; __syncthreads(); if(id < rows2) { posr = id * hsize * rows1; pos2 = id * cols2 - 1; for(x = 0; x < rows1; x++) { for(y = 0; y < hsize; y++) { cond = shared[y]; if(cond > 0) res[posr + y] = dop1[cond-1]; else res[posr + y] = dop2[pos2 - cond]; } posr += hsize; } } } void juntar(int *dop1, int *dop2, int rows1, int rows2, int cols1, int cols2, int *proj, int pcols, int **ret) { int sizepro, *dcons, *d_Rout; int numthreads = 1024; //numthreads = 32; int blockllen = rows2 / numthreads + 1; sizepro = pcols * sizeof(int); reservar(&dcons, sizepro); hipMemcpy(dcons, proj, sizepro, hipMemcpyHostToDevice); reservar(&d_Rout, rows1 * rows2 * sizepro); hipLaunchKernel(HIP_KERNEL_NAME(projectadd), dim3(blockllen), dim3(numthreads), sizepro, 0, dop1, dop2, rows1, rows2, cols1, cols2, dcons, pcols, d_Rout); hipFree(dcons); *ret = d_Rout; } /*Joins two predicates. Starts by performing all preliminary operations (selections, selfjoins, comparisons) on both predicates. Then a column pair is used to construct a CSS-Tree and that tree is searched for join positions. The positions are used in a prefix sum and its result allows us to write the result. Multijoins and negative predicates follow roughly the same process, but use different kernels.*/ int join(int *p1, int *p2, int rLen, int sLen, int of1, int of2, list<rulenode>::iterator rule, int pos, int bothops, int **ret, int ANDlogic) { int pos2 = pos + 1; int *sel1 = NULL, nsel1 = 0; int *sel2 = rule->select[pos2]; int nsel2 = rule->numsel[pos2]; int *proj = rule->project[pos]; int2 projp = rule->projpos[pos]; int *sjoin1 = NULL, nsj1 = 0; int *sjoin2 = rule->selfjoin[pos2]; int nsj2 = rule->numselfj[pos2]; int *pred1 = NULL; int2 npred1 = make_int2(0,0); int *pred2 = rule->preds[pos2]; int2 npred2 = rule->numpreds[pos2]; int npred2tot = npred2.x + npred2.y; int *wherej = rule->wherejoin[pos]; int numj = rule->numjoin[pos]; int negative = rule->negatives[pos2+1]; int flag; #ifdef ROCKIT ANDlogic = 0; #endif if(negative) ANDlogic = 1; #if TIMER cuda_stats.joins++; #endif int size, sizet, sizet2; if(bothops) { sel1 = rule->select[pos]; nsel1 = rule->numsel[pos]; sjoin1 = rule->selfjoin[pos]; nsj1 = rule->numselfj[pos]; pred1 = rule->preds[pos]; npred1 = rule->numpreds[pos]; sizet = maximo(10, of1, of2, nsel1, nsel2, projp.y + numj - 2, nsj1, nsj2, numj, npred1.x, npred2tot) * sizeof(int); } else sizet = maximo(7, of1, of2, nsel2, projp.y + numj - 2, nsj2, numj, npred2tot) * sizeof(int); int *dcons, *temp, *temp2 = NULL; int *d_R, *d_S; int blockllen, numthreads; int extraspace = TREE_NODE_SIZE - rLen % TREE_NODE_SIZE; int m32rLen = rLen + extraspace; int extraspaceS = TREE_NODE_SIZE - sLen % TREE_NODE_SIZE; int m32sLen = sLen + extraspaceS; if(m32rLen > m32sLen) sizet2 = (m32rLen + 1) * sizeof(int); else sizet2 = (m32sLen + 1) * sizeof(int); reservar(&dcons, sizet); reservar(&temp, sizet2); thrust::device_ptr<int> res = thrust::device_pointer_cast(temp); numthreads = 1024; //numthreads = 32; blockllen = sLen / numthreads + 1; int memSizeS, newLen = 0; int *posR = NULL, *posS = NULL; int sizem32S = 0, sizextra; #ifdef TIMER //cout << "INICIO" << endl; hipEvent_t start, stop; float time; hipEventCreate(&start); hipEventCreate(&stop); hipEventRecord(start, 0); #endif if(npred2.x > 0 || npred2.y > 0 || nsel2 > 0 || nsj2 > 0) { newLen = sLen + 1; hipMemsetAsync(temp, 0, newLen * sizeof(int)); } if(npred2.x > 0 || npred2.y > 0) { size = npred2tot * sizeof(int); hipMemcpy(dcons, pred2, size, hipMemcpyHostToDevice); if(npred2.y > 0) /*Fix case when a(X,Y),b(Y,Z),Z > Y*/ { reservar(&temp2, sizet2); hipMemsetAsync(temp2, 0, newLen * sizeof(int)); //res = thrust::device_pointer_cast(temp2); hipLaunchKernel(HIP_KERNEL_NAME(bpreds), dim3(blockllen), dim3(numthreads), size, 0, p1, p2, sLen, of1, of2, dcons, npred2tot, npred2.x, temp + 1, temp2 + 1); } else { if(negative) hipLaunchKernel(HIP_KERNEL_NAME(bpreds), dim3(blockllen), dim3(numthreads), size, 0, p1, p2, sLen, of1, of2, dcons, npred2tot, npred2.x, temp + 1, NULL); else hipLaunchKernel(HIP_KERNEL_NAME(bpredsOR), dim3(blockllen), dim3(numthreads), size, 0, p1, p2, sLen, of1, of2, dcons, npred2tot, npred2.x, temp + 1, NULL); } if(nsel2 > 0) { size = nsel2 * sizeof(int); hipMemcpy(dcons, sel2, size, hipMemcpyHostToDevice); hipLaunchKernel(HIP_KERNEL_NAME(marcar), dim3(blockllen), dim3(numthreads), size, 0, p2, sLen, of2, dcons, nsel2, temp + 1); } if(nsj2 > 0) { size = nsj2 * sizeof(int); hipMemcpy(dcons, sjoin2, size, hipMemcpyHostToDevice); hipLaunchKernel(HIP_KERNEL_NAME(samejoin), dim3(blockllen), dim3(numthreads), size, 0, p2, sLen, of2, dcons, nsj2, temp + 1); } } else { if(nsel2 > 0) { size = nsel2 * sizeof(int); hipMemcpy(dcons, sel2, size, hipMemcpyHostToDevice); hipLaunchKernel(HIP_KERNEL_NAME(marcar2), dim3(blockllen), dim3(numthreads), size, 0, p2, sLen, of2, dcons, nsel2, temp + 1); if(nsj2 > 0) { size = nsj2 * sizeof(int); hipMemcpy(dcons, sjoin2, size, hipMemcpyHostToDevice); hipLaunchKernel(HIP_KERNEL_NAME(samejoin), dim3(blockllen), dim3(numthreads), size, 0, p2, sLen, of2, dcons, nsj2, temp + 1); } } else { if(nsj2 > 0) { size = nsj2 * sizeof(int); hipMemcpy(dcons, sjoin2, size, hipMemcpyHostToDevice); hipLaunchKernel(HIP_KERNEL_NAME(samejoin2), dim3(blockllen), dim3(numthreads), size, 0, p2, sLen, of2, dcons, nsj2, temp + 1); } else { sizem32S = m32sLen * sizeof(int); reservar(&d_S, sizem32S); hipMemsetAsync(d_S + sLen, 0x7f, extraspaceS * sizeof(int)); hipLaunchKernel(HIP_KERNEL_NAME(llenarnosel), dim3(blockllen), dim3(numthreads), 0, 0, p2, d_S, sLen, of2, wherej[1]); } } } if(npred2.x > 0 || npred2.y > 0 || nsel2 > 0 || nsj2 > 0) { flag = 0; while(flag != 1) { try { thrust::inclusive_scan(res + 1, res + newLen, res + 1); flag = 1; } catch(std::bad_alloc &e) { limpiar("inclusive scan in join", 0); } } newLen = res[sLen]; if(newLen == 0) // && !negative) ARREGLAR { hipFree(temp); hipFree(dcons); return 0; } extraspaceS = TREE_NODE_SIZE - newLen % TREE_NODE_SIZE; sizextra = extraspaceS * sizeof(int); m32sLen = newLen + extraspaceS; sizem32S = m32sLen * sizeof(int); reservar(&d_S, sizem32S); reservar(&posS, sizem32S); hipMemsetAsync(d_S + newLen, 0x7f, sizextra); hipMemsetAsync(posS + newLen, 0x7f, sizextra); hipLaunchKernel(HIP_KERNEL_NAME(llenar), dim3(blockllen), dim3(numthreads), 0, 0, p2, d_S, sLen, of2, wherej[1], temp, posS); sLen = newLen; } #ifdef TIMER hipEventRecord(stop, 0); hipEventSynchronize(stop); hipEventElapsedTime(&time, start, stop); //cout << "Select1 = " << time << endl; cuda_stats.select1_time += time; hipEventDestroy(start); hipEventDestroy(stop); hipEventCreate(&start); hipEventCreate(&stop); hipEventRecord(start, 0); #endif blockllen = rLen / numthreads + 1; int sizem32; if(bothops) { if(temp2 != NULL) { hipFree(temp); temp = temp2; res = thrust::device_pointer_cast(temp); newLen = rLen + 1; if(nsel1 > 0) { size = nsel1 * sizeof(int); hipMemcpy(dcons, sel1, size, hipMemcpyHostToDevice); hipLaunchKernel(HIP_KERNEL_NAME(marcar), dim3(blockllen), dim3(numthreads), size, 0, p1, rLen, of1, dcons, nsel1, temp + 1); } if(nsj1 > 0) { size = nsj1 * sizeof(int); hipMemcpy(dcons, sjoin1, size, hipMemcpyHostToDevice); hipLaunchKernel(HIP_KERNEL_NAME(samejoin), dim3(blockllen), dim3(numthreads), size, 0, p1, rLen, of1, dcons, nsj1, temp + 1); } if(npred1.x > 0) { size = npred1.x * sizeof(int); hipMemcpy(dcons, pred1, size, hipMemcpyHostToDevice); if(ANDlogic) hipLaunchKernel(HIP_KERNEL_NAME(bpredsnormal), dim3(blockllen), dim3(numthreads), size, 0, p1, rLen, of1, dcons, npred1.x, temp + 1); else hipLaunchKernel(HIP_KERNEL_NAME(bpredsorlogic), dim3(blockllen), dim3(numthreads), size, 0, p1, rLen, of1, dcons, npred1.x, temp + 1); } } else { if(npred1.x > 0 || nsel1 > 0 || nsj1 > 0) { newLen = rLen + 1; hipMemsetAsync(temp, 0, newLen * sizeof(int)); } if(nsel1 > 0) { size = nsel1 * sizeof(int); hipMemcpy(dcons, sel1, size, hipMemcpyHostToDevice); hipLaunchKernel(HIP_KERNEL_NAME(marcar2), dim3(blockllen), dim3(numthreads), size, 0, p1, rLen, of1, dcons, nsel1, temp + 1); if(nsj1 > 0) { size = nsj1 * sizeof(int); hipMemcpy(dcons, sjoin1, size, hipMemcpyHostToDevice); hipLaunchKernel(HIP_KERNEL_NAME(samejoin), dim3(blockllen), dim3(numthreads), size, 0, p1, rLen, of1, dcons, nsj1, temp + 1); } if(npred1.x > 0) { size = npred1.x * sizeof(int); hipMemcpy(dcons, pred1, size, hipMemcpyHostToDevice); if(ANDlogic) hipLaunchKernel(HIP_KERNEL_NAME(bpredsnormal), dim3(blockllen), dim3(numthreads), size, 0, p1, rLen, of1, dcons, npred1.x, temp + 1); else hipLaunchKernel(HIP_KERNEL_NAME(bpredsorlogic), dim3(blockllen), dim3(numthreads), size, 0, p1, rLen, of1, dcons, npred1.x, temp + 1); } } else { if(nsj1 > 0) { size = nsj1 * sizeof(int); hipMemcpy(dcons, sjoin1, size, hipMemcpyHostToDevice); hipLaunchKernel(HIP_KERNEL_NAME(samejoin2), dim3(blockllen), dim3(numthreads), size, 0, p1, rLen, of1, dcons, nsj1, temp + 1); if(npred1.x > 0) { size = npred1.x * sizeof(int); hipMemcpy(dcons, pred1, size, hipMemcpyHostToDevice); if(ANDlogic) hipLaunchKernel(HIP_KERNEL_NAME(bpredsnormal), dim3(blockllen), dim3(numthreads), size, 0, p1, rLen, of1, dcons, npred1.x, temp + 1); else hipLaunchKernel(HIP_KERNEL_NAME(bpredsorlogic), dim3(blockllen), dim3(numthreads), size, 0, p1, rLen, of1, dcons, npred1.x, temp + 1); } } else { if(npred1.x > 0) { size = npred1.x * sizeof(int); hipMemcpy(dcons, pred1, size, hipMemcpyHostToDevice); if(ANDlogic) hipLaunchKernel(HIP_KERNEL_NAME(bpredsnormal2), dim3(blockllen), dim3(numthreads), size, 0, p1, rLen, of1, dcons, npred1.x, temp + 1); else hipLaunchKernel(HIP_KERNEL_NAME(bpredsorlogic2), dim3(blockllen), dim3(numthreads), size, 0, p1, rLen, of1, dcons, npred1.x, temp + 1); } } } } if(temp2 != NULL || npred1.x > 0 || nsel1 > 0 || nsj1 > 0) { thrust::inclusive_scan(res + 1, res + newLen, res + 1); newLen = res[rLen]; if(newLen == 0) { hipFree(temp); hipFree(dcons); hipFree(d_S); if(posS != NULL) hipFree(posS); return 0; } extraspace = TREE_NODE_SIZE - newLen % TREE_NODE_SIZE; sizextra = extraspace * sizeof(int); m32rLen = newLen + extraspace; sizem32 = m32rLen * sizeof(int); reservar(&d_R, sizem32); reservar(&posR, sizem32); hipMemsetAsync(d_R + newLen, 0x7f, sizextra); hipMemsetAsync(posR + newLen, 0x7f, sizextra); hipLaunchKernel(HIP_KERNEL_NAME(llenar), dim3(blockllen), dim3(numthreads), 0, 0, p1, d_R, rLen, of1, wherej[0], temp, posR); rLen = newLen; } else { sizem32 = m32rLen * sizeof(int); reservar(&d_R, sizem32); hipMemsetAsync(d_R + rLen, 0x7f, extraspace * sizeof(int)); hipLaunchKernel(HIP_KERNEL_NAME(llenarnosel), dim3(blockllen), dim3(numthreads), 0, 0, p1, d_R, rLen, of1, wherej[0]); } } else { sizem32 = m32rLen * sizeof(int); reservar(&d_R, sizem32); hipMemsetAsync(d_R + rLen, 0x7f, extraspace * sizeof(int)); hipLaunchKernel(HIP_KERNEL_NAME(llenarnosel), dim3(blockllen), dim3(numthreads), 0, 0, p1, d_R, rLen, of1, wherej[0]); } #ifdef TIMER hipEventRecord(stop, 0); hipEventSynchronize(stop); hipEventElapsedTime(&time, start, stop); //cout << "Select2 = " << time << endl; cuda_stats.select2_time += time; #endif #ifdef TIMER hipEventDestroy(start); hipEventDestroy(stop); hipEventCreate(&start); hipEventCreate(&stop); hipEventRecord(start, 0); #endif thrust::device_ptr<Record> dvp1; thrust::device_ptr<Record> permutation; if(negative) { dvp1 = thrust::device_pointer_cast(d_S); if(posS == NULL) { reservar(&posS, sizem32S); permutation = thrust::device_pointer_cast(posS); thrust::sequence(permutation, permutation + m32sLen); } else permutation = thrust::device_pointer_cast(posS); flag = 0; while(flag != 1) { try { thrust::stable_sort_by_key(dvp1, dvp1 + m32sLen, permutation); flag = 1; } catch(std::bad_alloc &e) { limpiar("inclusive scan in join", 0); } } } else { dvp1 = thrust::device_pointer_cast(d_R); if(posR == NULL) { reservar(&posR, sizem32); permutation = thrust::device_pointer_cast(posR); thrust::sequence(permutation, permutation + m32rLen); } else permutation = thrust::device_pointer_cast(posR); flag = 0; while(flag != 1) { try { thrust::stable_sort_by_key(dvp1, dvp1 + m32rLen, permutation); flag = 1; } catch(std::bad_alloc &e) { limpiar("inclusive scan in join", 0); } } } #ifdef TIMER hipEventRecord(stop, 0); hipEventSynchronize(stop); hipEventElapsedTime(&time, start, stop); //cout << "Sort = " << time << endl; cuda_stats.sort_time += time; hipEventDestroy(start); hipEventDestroy(stop); hipEventCreate(&start); hipEventCreate(&stop); hipEventRecord(start, 0); #endif IDataNode* d_data; IDirectoryNode* d_dir; unsigned int nDataNodes; if(negative) { nDataNodes = uintCeilingDiv(sLen, TREE_NODE_SIZE); d_data=(IDataNode *)d_S; } else { nDataNodes = uintCeilingDiv(rLen, TREE_NODE_SIZE); d_data=(IDataNode *)d_R; } unsigned int lvlDir = uintCeilingLog(TREE_FANOUT, nDataNodes); unsigned int nDirNodes = uintCeilingDiv(nDataNodes - 1, TREE_NODE_SIZE); unsigned int tree_size = nDirNodes + nDataNodes; unsigned int bottom_start = (uintPower(TREE_FANOUT, lvlDir) - 1) / TREE_NODE_SIZE; d_dir = (IDirectoryNode *)temp; unsigned int nNodesPerBlock = uintCeilingDiv(nDirNodes, BLCK_PER_GRID_create); dim3 Dbc(THRD_PER_BLCK_create, 1, 1); dim3 Dgc(BLCK_PER_GRID_create, 1, 1); hipLaunchKernel(HIP_KERNEL_NAME(gCreateIndex), dim3(Dgc), dim3(Dbc), 0, 0, d_data, d_dir, nDirNodes, tree_size, bottom_start, nNodesPerBlock); int *d_locations; int memSizeR; unsigned int nSearchKeys; if(negative) { memSizeR = (rLen + 1) * sizeof(int); reservar(&d_locations, memSizeR); hipMemsetAsync(d_locations, 0, sizeof(int)); nSearchKeys = rLen; } else { memSizeS = sLen * sizeof(int); reservar(&d_locations, memSizeS); nSearchKeys = sLen; } dim3 Dbs(THRD_PER_BLCK_search, 1, 1); dim3 Dgs(BLCK_PER_GRID_search, 1, 1); unsigned int nKeysPerThread = uintCeilingDiv(nSearchKeys, THRD_PER_GRID_search); if(negative) { hipLaunchKernel(HIP_KERNEL_NAME(gSearchTree), dim3(Dgs), dim3(Dbs), 0, 0, d_data, nDataNodes, d_dir, nDirNodes, lvlDir, d_R, d_locations + 1, nSearchKeys, nKeysPerThread, tree_size, bottom_start); hipMemsetAsync(temp, 0, memSizeR); } else { hipLaunchKernel(HIP_KERNEL_NAME(gSearchTree), dim3(Dgs), dim3(Dbs), 0, 0, d_data, nDataNodes, d_dir, nDirNodes, lvlDir, d_S, d_locations, nSearchKeys, nKeysPerThread, tree_size, bottom_start); hipMemsetAsync(temp, 0, memSizeS); } int muljoin = 0, muljoinsize = 0, sum; int *d_Rout; int resSize, sizepro; if(negative) { blockllen = rLen / numthreads + 1; if(numj > 2) { muljoin = numj - 2; muljoinsize = muljoin * sizeof(int); hipMemcpy(dcons, wherej + 2, muljoinsize, hipMemcpyHostToDevice); hipLaunchKernel(HIP_KERNEL_NAME(gIndexMultiJoinNegative), dim3(blockllen), dim3(numthreads), muljoinsize, 0, d_R, d_S, d_locations + 1, rLen, p1, p2, of1, of2, posR, posS, dcons, muljoin); } res = thrust::device_pointer_cast(d_locations); thrust::transform(res + 1, res + rLen + 1, res + 1, to_neg()); thrust::inclusive_scan(res + 1, res + rLen + 1, res + 1); sum = res[rLen]; if(pos == (rule->num_rows - 3)) { sizepro = rule->num_columns * sizeof(int); hipMemcpy(dcons, proj, sizepro, hipMemcpyHostToDevice); resSize = sum * sizepro; reservar(&d_Rout, resSize); hipLaunchKernel(HIP_KERNEL_NAME(gJoinWithWriteNegative2), dim3(blockllen), dim3(numthreads), sizepro, 0, d_locations, rLen, d_Rout, p1, of1, dcons, rule->num_columns, posR); } else { sizepro = projp.x * sizeof(int); hipMemcpy(dcons, proj, sizepro, hipMemcpyHostToDevice); resSize = sum * sizepro; reservar(&d_Rout, resSize); hipLaunchKernel(HIP_KERNEL_NAME(gJoinWithWriteNegative), dim3(blockllen), dim3(numthreads), sizepro, 0, d_locations, rLen, d_Rout, p1, of1, dcons, projp.x, posR); } hipFree(d_R); hipFree(d_S); } else { blockllen = sLen / numthreads + 1; if(numj > 2) { muljoin = numj - 2; muljoinsize = muljoin * sizeof(int); hipMemcpy(dcons, wherej + 2, muljoinsize, hipMemcpyHostToDevice); hipLaunchKernel(HIP_KERNEL_NAME(gIndexMultiJoin), dim3(blockllen), dim3(numthreads), muljoinsize, 0, d_R, d_S, d_locations, sLen, temp, p1, p2, of1, of2, posR, posS, dcons, muljoin); } else hipLaunchKernel(HIP_KERNEL_NAME(gIndexJoin), dim3(blockllen), dim3(numthreads), 0, 0, d_R, d_S, d_locations, sLen, temp); hipFree(d_R); hipFree(d_S); sum = res[sLen-1]; thrust::exclusive_scan(res, res + sLen, res); sum += res[sLen-1]; if(sum == 0) { hipFree(dcons); hipFree(d_locations); hipFree(temp); if(posS != NULL) hipFree(posS); if(posR != NULL) hipFree(posR); return 0; } res[sLen] = sum; if(pos == (rule->num_rows - 3)) { sizepro = rule->num_columns * sizeof(int); hipMemcpy(dcons, proj, sizepro, hipMemcpyHostToDevice); resSize = sum * sizepro; reservar(&d_Rout, resSize); if(numj > 2) { hipMemcpy(dcons + rule->num_columns, wherej + 2, muljoinsize, hipMemcpyHostToDevice); hipLaunchKernel(HIP_KERNEL_NAME(multiJoinWithWrite2), dim3(blockllen), dim3(numthreads), sizepro + muljoinsize, 0, d_locations, sLen, temp, d_Rout, p1, p2, of1, of2, dcons, rule->num_columns, posR, posS, muljoin); } else hipLaunchKernel(HIP_KERNEL_NAME(gJoinWithWrite2), dim3(blockllen), dim3(numthreads), sizepro, 0, d_locations, sLen, temp, d_Rout, p1, p2, of1, of2, dcons, rule->num_columns, posR, posS); } else { sizepro = projp.y * sizeof(int); hipMemcpy(dcons, proj, sizepro, hipMemcpyHostToDevice); resSize = sum * sizepro; reservar(&d_Rout, resSize); if(numj > 2) { hipMemcpy(dcons + projp.y, wherej + 2, muljoinsize, hipMemcpyHostToDevice); hipLaunchKernel(HIP_KERNEL_NAME(multiJoinWithWrite), dim3(blockllen), dim3(numthreads), sizepro + muljoinsize, 0, d_locations, sLen, temp, d_Rout, p1, p2, of1, of2, dcons, projp.x, projp.y, posR, posS, muljoin); } else hipLaunchKernel(HIP_KERNEL_NAME(gJoinWithWrite), dim3(blockllen), dim3(numthreads), sizepro, 0, d_locations, sLen, temp, d_Rout, p1, p2, of1, of2, dcons, projp.x, projp.y, posR, posS); } } hipFree(dcons); hipFree(d_locations); hipFree(temp); if(posS != NULL) hipFree(posS); if(posR != NULL) hipFree(posR); if(*ret != NULL) hipFree(*ret); *ret = d_Rout; #ifdef TIMER hipEventRecord(stop, 0); hipEventSynchronize(stop); hipEventElapsedTime(&time, start, stop); //cout << "Join = " << time << endl; //cout << "FIN" << endl; cuda_stats.join_time += time; #endif return sum; }