Custom Class


class dremioBaseHttpRequests:
    """Minimal client for the Dremio REST API built on ``requests``.

    Connection settings come from the ``DREMIO_*`` environment variables and
    a login token is requested as soon as an instance is created. Every
    ``api*`` helper targets ``<server>/api/v3/<endpoint>`` and sends
    ``self.dremioHeader`` (which carries the token once login succeeded).
    """

    def __init__(self)->None:
        """Read the connection settings from the environment and log in.

        Variables that are not set are read as ``None`` (``os.getenv``
        default), so the resulting server URL is only usable when
        ``DREMIO_SERVER`` and ``DREMIO_PORT`` are both defined.
        """
        self.dremioConn = os.getenv('DREMIO_CONNECTOR')
        self.dremioUser = os.getenv('DREMIO_USER')
        self.dremioPwd = os.getenv('DREMIO_PASSWORD')
        self.dremioPort = os.getenv('DREMIO_PORT')
        self.dremioServer = f"http://{os.getenv('DREMIO_SERVER')}:{self.dremioPort}"
        self.dremioToken = None
        self.dremioHeader = {'content-type': 'application/json'}
        self.generateToken()

    def generateToken(self)->None:
        """Log in and store the session token.

        On success the token is kept in ``self.dremioToken`` and added to
        ``self.dremioHeader`` as ``authorization: _dremio<token>``. Any
        failure (network error, non-JSON reply, reply without ``token``) is
        logged and swallowed, leaving ``self.dremioToken`` as ``None``.
        """
        loginUrl = f"{self.dremioServer}/apiv2/login"
        loginData = {'userName': self.dremioUser, 'password': self.dremioPwd}

        try:
            response = requests.post(
                loginUrl, headers=self.dremioHeader, data=json.dumps(loginData))
            token = json.loads(response.text)['token']
            self.dremioHeader['authorization'] = f"_dremio{token}"
            self.dremioToken = token
        except Exception as e:
            # Best effort by design: construction must not fail on a bad login.
            log.error(str({'comment': f"Can't get a valid token: {e}"}))

    def __str__(self)->str:
        """Describe the connection without exposing the session token."""
        # The authorization header holds a live credential; printing or
        # logging the instance must not leak it.
        safeHeader = {
            key: ('***' if key.lower() == 'authorization' else value)
            for key, value in self.dremioHeader.items()
        }
        return f"Header: {safeHeader}, Server: {self.dremioServer}, User: {self.dremioUser}"

    def _apiUrl(self, endpoint:str)->str:
        """Return the full v3 API URL for ``endpoint``."""
        return f'{self.dremioServer}/api/v3/{endpoint}'

    def apiGet(self, endpoint:str)->JsonOrNone:
        """GET ``endpoint`` and return the decoded JSON body.

        Returns ``None`` (after logging) when the status code is not 200 or
        when the body is not valid JSON.
        """
        response = requests.get(self._apiUrl(endpoint), headers=self.dremioHeader)
        statusCode = response.status_code
        if statusCode != 200:
            log.error(str({'comment': f'statusCode={statusCode}'}))
            return None
        try:
            return json.loads(response.text)
        except ValueError:
            # json.JSONDecodeError is a ValueError; catching only that keeps
            # KeyboardInterrupt/SystemExit from being swallowed here.
            log.error(str({'comment': f'failed json.loads({response.text})'}))
            return None

    def apiPut(self, endpoint:str, body=None)->Json:
        """PUT ``body`` (JSON-encoded) to ``endpoint`` and return the decoded reply.

        The status code is not checked; a reply that is not valid JSON
        (including an empty one) raises ``json.JSONDecodeError``.
        """
        text = requests.put(
            self._apiUrl(endpoint), headers=self.dremioHeader, data=json.dumps(body)
        ).text
        return json.loads(text)

    def apiDelete(self, endpoint:str)->"requests.Response":
        """DELETE ``endpoint`` and return the raw ``requests`` response object.

        Unlike the other helpers the body is not decoded.
        """
        return requests.delete(self._apiUrl(endpoint), headers=self.dremioHeader)

    def apiPost(self, endpoint:str, body=None)->JsonOrNone:
        """POST ``body`` (JSON-encoded) to ``endpoint``.

        Returns the decoded JSON reply, or ``None`` when the reply body is
        empty. The status code is not checked; a non-empty body that is not
        valid JSON raises ``json.JSONDecodeError``.
        """
        text = requests.post(
            self._apiUrl(endpoint),
            headers=self.dremioHeader,
            data=json.dumps(body)
        ).text
        if text:
            return json.loads(text)
        return None

    def getCatalogEndpoint(self, endpoint:str='')->JsonOrNone:
        """GET ``catalog/<endpoint>`` (the catalog root when ``endpoint`` is empty)."""
        return self.apiGet(f"catalog/{endpoint}")

    def getReflectionEndpoint(self, endpoint:str='')->JsonOrNone:
        """GET ``reflection/<endpoint>``."""
        return self.apiGet(f"reflection/{endpoint}")

class dremioJobs(dremioBaseHttpRequests):
    """Submit SQL to Dremio and follow the resulting jobs through the v3 job API."""

    # Seconds to sleep between two polls of a job's state.
    JOB_STATE_CHECK_SEC = 1
    # States from which a job never reaches 'COMPLETED'. Polling must stop on
    # them, otherwise waitJobCompletion would loop forever (e.g. on a job
    # that was cancelled).
    JOB_FAILURE_STATES = ('FAILED', 'CANCELED')

    def __init__(self)->None:
        """Initialise the base client and set the result paging limits.

        A failure of the base initialisation is logged, not raised.
        """
        try:
            super().__init__()
        except Exception as e:
            log.error(str({'comment': f"Can't create dremioJobs instance, error : {e}"}))
        # Exclusive upper bound on the offsets requested by getJobResult.
        self.resultMaxOffset = 100000000
        # Page size (``limit``) used by getJobResult.
        self.resultMaxRecords = 500

    def _getJobState(self, endpoint:str)->str:
        """Return the ``jobState`` field of the job at ``endpoint``.

        Raises ``TypeError`` when the GET failed (``apiGet`` returned
        ``None``) and ``KeyError`` when the reply has no ``jobState``.
        """
        getResult = self.apiGet(endpoint)
        return getResult['jobState']

    def getJobLogs(self, jobId:str)->Json:
        """Return the job description served at ``job/<jobId>`` (``None`` on failure)."""
        return self.apiGet(f'job/{jobId}')

    def waitJobCompletion(self, jobId:str)->bool:
        """Poll the job until it reaches a terminal state.

        Returns ``True`` once the state is ``'COMPLETED'``. Returns ``False``
        (after logging) when the job ends in one of ``JOB_FAILURE_STATES``
        or when polling itself raises.
        """
        endpoint = f'job/{jobId}'
        try:
            jobState = self._getJobState(endpoint)
            while jobState != 'COMPLETED':
                if jobState in self.JOB_FAILURE_STATES:
                    log.error(str({'comment': f'waitJobCompletion {endpoint} jobState=={jobState}'}))
                    return False
                time.sleep(self.JOB_STATE_CHECK_SEC)
                jobState = self._getJobState(endpoint)
            return True
        except Exception as e:
            log.error(str({'comment': f'waitJobCompletion {endpoint} failed ({e})'}))
            return False

    def getJobResult(self, jobId:str)->List[Json]:
        """Collect all result rows of a job, page by page.

        Pages of ``self.resultMaxRecords`` rows are requested until an empty
        page is returned or ``self.resultMaxOffset`` is reached. If any page
        request fails, ``None`` is returned and the rows fetched so far are
        discarded.
        """
        allRows = []
        for offset in range(0, self.resultMaxOffset, self.resultMaxRecords):
            endpoint = f'job/{jobId}/results?offset={offset}&limit={self.resultMaxRecords}'
            jobResult = self.apiGet(endpoint)

            if jobResult is None:
                return None

            rows = jobResult['rows']
            if not rows:
                break
            allRows.extend(rows)
        return allRows

    def sqlQueryJobId(self, sqlQuery:str)->str:
        """
        Get the job id for an SQL query post.

        Raises ``KeyError`` when the reply carries no ``id`` (for instance an
        error reply) and whatever ``requests`` raises on transport or
        decoding problems.
        """
        response = requests.post(
            f'{self.dremioServer}/api/v3/sql',
            headers=self.dremioHeader,
            data=json.dumps({'sql': sqlQuery})
        ).json()
        return response['id']

    def runSqlQuery(self, sqlQuery:str)->Union[List[Json], None]:
        """
        Post SQL query and collect result of the corresponding job.

        Returns the rows from ``getJobResult`` when the job completed, and
        ``None`` when it did not. Errors raised while submitting the query
        propagate to the caller.
        """
        jobId = self.sqlQueryJobId(sqlQuery)
        if self.waitJobCompletion(jobId):
            return self.getJobResult(jobId)
        return None

    def getJobDurations(self, jobLogs:Json)->Json:
        """Break a job's run time down into phases, in seconds.

        ``jobLogs`` must provide ``rowCount`` plus the timestamps
        ``startedAt``, ``resourceSchedulingStartedAt``,
        ``resourceSchedulingEndedAt`` and ``endedAt``, each formatted as
        ``%Y-%m-%dT%H:%M:%S.%fZ`` (``strptime`` raises ``ValueError``
        otherwise).

        Returns a dict with ``rowCount``, ``planning`` (start to scheduling
        start), ``queued`` (scheduling start to scheduling end), ``running``
        (scheduling end to job end) and ``total`` (their sum rounded to four
        decimals).
        """
        timeFormat = '%Y-%m-%dT%H:%M:%S.%fZ'
        rowCount = jobLogs['rowCount']
        startedAt = datetime.strptime(jobLogs['startedAt'], timeFormat)
        rSchedStartedAt = datetime.strptime(jobLogs['resourceSchedulingStartedAt'], timeFormat)
        rSchedEndedAt = datetime.strptime(jobLogs['resourceSchedulingEndedAt'], timeFormat)
        endedAt = datetime.strptime(jobLogs['endedAt'], timeFormat)
        planning = (rSchedStartedAt - startedAt).total_seconds()
        queued = (rSchedEndedAt - rSchedStartedAt).total_seconds()
        running = (endedAt - rSchedEndedAt).total_seconds()
        return {
            'rowCount': rowCount,
            'planning': planning,
            'queued': queued,
            'running': running,
            'total': round((planning + queued + running), 4),
        }


class dremioDatasets(dremioBaseHttpRequests):
    """Catalog helpers to look up and refresh Dremio datasets."""

    def __init__(self)->None:
        """Initialise the base client; a failure is logged, not raised."""
        try:
            super().__init__()
        except Exception as e:
            log.error(str({'comment': f"Can't create dremioDatasets instance, error : {e}"}))

    def datasetInfoFromName(self, datasetName:str)->str:
        """Return the catalog entry served at ``catalog/by-path/<path>``.

        The path is derived from ``datasetName`` with ``nameToPath``.
        NOTE: despite the ``str`` annotation, the value is whatever
        ``apiGet`` returns: the decoded JSON reply, or ``None`` on failure.
        """
        pdsPath = nameToPath(datasetName)
        return self.apiGet(f'catalog/by-path/{pdsPath}')

    def datasetInfoFromId(self, datasetId:str)->str:
        """Return the catalog entry served at ``catalog/<datasetId>``.

        NOTE: despite the ``str`` annotation, the value is whatever
        ``apiGet`` returns: the decoded JSON reply, or ``None`` on failure.
        """
        return self.apiGet(f'catalog/{datasetId}')

    def datasetIdFromName(self, datasetName:str)->str:
        """Return the ``id`` of the named dataset, or ``None`` when the lookup failed."""
        datasetInfo = self.datasetInfoFromName(datasetName)
        if datasetInfo is not None:
            return datasetInfo['id']
        # Explicit: the lookup failed, so there is no id to report.
        return None

    def catalogRefresh(self, pdsId:str)->None:
        """POST to ``catalog/<pdsId>/refresh``; the reply is ignored."""
        self.apiPost(f'catalog/{pdsId}/refresh')


class dremioReflections(dremioJobs):
    """Reflection lookups run as SQL jobs through the parent job helpers."""

    def __init__(self)->None:
        """Initialise the parent class; a failure is logged, not raised."""
        try:
            super().__init__()
        except Exception as initError:
            log.error(str({'comment': f"Can't create dremioReflections instance, error : {initError}"}))

    def datasetReflectionsInfo(self, datasetName:str)->dict:
        '''
        Query ``sys.reflections`` for the rows whose ``dataset`` column equals
        the system-table name derived from ``datasetName``.

        Returns whatever ``runSqlQuery`` returns for that query.
        '''
        # NOTE(review): the name is interpolated into the SQL text as-is;
        # confirm nameToSysTblName output can never contain a single quote.
        reflectionsQuery = f"""
            SELECT * FROM sys.reflections
            WHERE dataset='{nameToSysTblName(datasetName)}'
        """
        return self.runSqlQuery(reflectionsQuery)

Last updated