Custom Class
class dremioBaseHttpRequests:
    """Thin HTTP client around a Dremio server's REST API.

    Connection settings are read from the environment variables
    ``DREMIO_CONNECTOR``, ``DREMIO_USER``, ``DREMIO_PASSWORD``,
    ``DREMIO_PORT`` and ``DREMIO_SERVER``. A login token is requested when the
    instance is created and, on success, stored in the request header that
    every other call reuses.
    """

    def __init__(self) -> None:
        """Load the connection settings from the environment and log in."""
        self.dremioConn = os.getenv('DREMIO_CONNECTOR')
        self.dremioUser = os.getenv('DREMIO_USER')
        self.dremioPwd = os.getenv('DREMIO_PASSWORD')
        self.dremioPort = os.getenv('DREMIO_PORT')
        self.dremioServer = f"http://{os.getenv('DREMIO_SERVER')}:{self.dremioPort}"
        # Stays None when the login attempt in generateToken() fails.
        self.dremioToken = None
        self.dremioHeader = {'content-type': 'application/json'}
        self.generateToken()

    def generateToken(self) -> None:
        """Log in and store the returned token on the instance.

        On success the token is kept in ``self.dremioToken`` and added to
        ``self.dremioHeader`` under the ``authorization`` key. Any failure
        (network error, non-JSON reply, missing ``token`` key) is logged and
        swallowed, leaving the instance without a token.
        """
        loginUrl = f"{self.dremioServer}/apiv2/login"
        loginData = {'userName': self.dremioUser, 'password': self.dremioPwd}
        try:
            response = requests.post(
                loginUrl, headers=self.dremioHeader, data=json.dumps(loginData))
            token = json.loads(response.text)['token']
        except Exception as e:
            # Deliberately best-effort: construction must not fail on login.
            log.error(str({'comment': f"Can't get a valid token: {e}"}))
            return
        self.dremioHeader['authorization'] = f"_dremio{token}"
        self.dremioToken = token

    def __str__(self) -> str:
        """Describe the connection (header, server URL and user name)."""
        # NOTE: after a successful login the header holds the auth token, so
        # this string should not be written to shared logs.
        return f"Header: {self.dremioHeader}, Server: {self.dremioServer}, User: {self.dremioUser}"

    def apiGet(self, endpoint: str) -> JsonOrNone:
        """GET ``/api/v3/<endpoint>`` and return the decoded JSON body.

        Returns None (after logging) when the status code is not 200 or when
        the body is not valid JSON.
        """
        url = f'{self.dremioServer}/api/v3/{endpoint}'
        response = requests.get(url, headers=self.dremioHeader)
        statusCode = response.status_code
        if statusCode != 200:
            log.error(str({'comment': f'statusCode={statusCode}'}))
            return None
        try:
            return json.loads(response.text)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass; a bare except
            # would also trap KeyboardInterrupt / SystemExit.
            log.error(str({'comment': f'failed json.loads({response.text})'}))
            return None

    def apiPut(self, endpoint: str, body=None) -> JsonOrNone:
        """PUT ``body`` (JSON-encoded) to ``/api/v3/<endpoint>``.

        Returns the decoded JSON reply, or None when the reply body is empty
        (same convention as apiPost). The status code is not checked.
        """
        url = f'{self.dremioServer}/api/v3/{endpoint}'
        text = requests.put(
            url, headers=self.dremioHeader, data=json.dumps(body)
        ).text
        if text:
            return json.loads(text)
        return None

    def apiDelete(self, endpoint: str) -> "requests.Response":
        """DELETE ``/api/v3/<endpoint>`` and return the raw response object."""
        url = f'{self.dremioServer}/api/v3/{endpoint}'
        return requests.delete(url, headers=self.dremioHeader)

    def apiPost(self, endpoint: str, body=None) -> JsonOrNone:
        """POST ``body`` (JSON-encoded) to ``/api/v3/<endpoint>``.

        Returns the decoded JSON reply, or None when the reply body is empty.
        The status code is not checked.
        """
        url = f'{self.dremioServer}/api/v3/{endpoint}'
        text = requests.post(
            url,
            headers=self.dremioHeader,
            data=json.dumps(body),
        ).text
        if text:
            return json.loads(text)
        return None

    def getCatalogEndpoint(self, endpoint: str = '') -> JsonOrNone:
        """GET ``catalog/<endpoint>`` through apiGet."""
        return self.apiGet(f"catalog/{endpoint}")

    def getReflectionEndpoint(self, endpoint: str = '') -> JsonOrNone:
        """GET ``reflection/<endpoint>`` through apiGet."""
        return self.apiGet(f"reflection/{endpoint}")
class dremioJobs(dremioBaseHttpRequests):
    """Submit SQL queries and follow the jobs they create."""

    # Seconds to sleep between two consecutive job-state polls.
    JOB_STATE_CHECK_SEC = 1
    # Terminal states after which a job can never reach COMPLETED; polling
    # past them would loop forever.
    JOB_FAILURE_STATES = ('FAILED', 'CANCELED')

    def __init__(self) -> None:
        """Initialise the HTTP client and the result-paging limits."""
        try:
            super().__init__()
        except Exception as e:
            log.error(str({'comment': f"Can't create dremioJobs instance, error : {e}"}))
        # Upper bound on the paging offset and page size used by getJobResult.
        self.resultMaxOffset = 100000000
        self.resultMaxRecords = 500

    def _getJobState(self, endpoint: str) -> str:
        """Return the ``jobState`` field of the job at ``endpoint``.

        Raises:
            TypeError: if apiGet returned None (request failed).
            KeyError: if the reply has no ``jobState`` key.
        """
        getResult = self.apiGet(endpoint)
        return getResult['jobState']

    def getJobLogs(self, jobId: str) -> JsonOrNone:
        """Return the job description fetched from ``job/<jobId>``."""
        return self.apiGet(f'job/{jobId}')

    def waitJobCompletion(self, jobId: str) -> bool:
        """Poll the job until it reaches a terminal state.

        Returns True once the state is ``COMPLETED``. Returns False (after
        logging) when the job ends in one of JOB_FAILURE_STATES or when
        polling itself raises.
        """
        endpoint = f'job/{jobId}'
        try:
            jobState = self._getJobState(endpoint)
            while jobState != 'COMPLETED':
                if jobState in self.JOB_FAILURE_STATES:
                    log.error(str({'comment': f'waitJobCompletion {endpoint} jobState=={jobState}'}))
                    return False
                time.sleep(self.JOB_STATE_CHECK_SEC)
                jobState = self._getJobState(endpoint)
            return True
        except Exception as e:
            log.error(str({'comment': f'waitJobCompletion {endpoint} failed ({e})'}))
            return False

    def getJobResult(self, jobId: str) -> Union[List[Json], None]:
        """Collect every result row of a job, page by page.

        Pages of ``self.resultMaxRecords`` rows are requested until an empty
        page comes back. Returns None as soon as one page request fails.
        """
        allRows = []
        for offset in range(0, self.resultMaxOffset, self.resultMaxRecords):
            endpoint = f'job/{jobId}/results?offset={offset}&limit={self.resultMaxRecords}'
            jobResult = self.apiGet(endpoint)
            if jobResult is None:
                return None
            rows = jobResult['rows']
            if not rows:
                break
            allRows.extend(rows)
        return allRows

    def sqlQueryJobId(self, sqlQuery: str) -> str:
        """
        Get the job id for an SQL query post.

        Raises:
            KeyError: if the reply carries no ``id`` field.
        """
        response = requests.post(
            f'{self.dremioServer}/api/v3/sql',
            headers=self.dremioHeader,
            data=json.dumps({'sql': sqlQuery}),
        ).json()
        return response['id']

    def runSqlQuery(self, sqlQuery: str) -> Union[List[Json], None]:
        """
        Post SQL query and collect result of the corresponding job.

        Returns None when the job did not complete successfully.
        """
        jobId = self.sqlQueryJobId(sqlQuery)
        if not self.waitJobCompletion(jobId):
            return None
        return self.getJobResult(jobId)

    def getJobDurations(self, jobLogs: Json) -> Json:
        """Break a job's wall-clock time into planning/queued/running seconds.

        ``jobLogs`` must provide ``rowCount`` plus the timestamps
        ``startedAt``, ``resourceSchedulingStartedAt``,
        ``resourceSchedulingEndedAt`` and ``endedAt`` formatted as
        ``%Y-%m-%dT%H:%M:%S.%fZ``. ``total`` is the sum of the three phases
        rounded to 4 decimals.
        """
        timeFormat = '%Y-%m-%dT%H:%M:%S.%fZ'
        rowCount = jobLogs['rowCount']
        startedAt = datetime.strptime(jobLogs['startedAt'], timeFormat)
        rSchedStartedAt = datetime.strptime(jobLogs['resourceSchedulingStartedAt'], timeFormat)
        rSchedEndedAt = datetime.strptime(jobLogs['resourceSchedulingEndedAt'], timeFormat)
        endedAt = datetime.strptime(jobLogs['endedAt'], timeFormat)
        planning = (rSchedStartedAt - startedAt).total_seconds()
        queued = (rSchedEndedAt - rSchedStartedAt).total_seconds()
        running = (endedAt - rSchedEndedAt).total_seconds()
        return {
            'rowCount': rowCount,
            'planning': planning,
            'queued': queued,
            'running': running,
            'total': round((planning + queued + running), 4),
        }
class dremioDatasets(dremioBaseHttpRequests):
    """Catalog lookups and refreshes for datasets."""

    def __init__(self) -> None:
        """Initialise the HTTP client; a failure is logged, not raised."""
        try:
            super().__init__()
        except Exception as e:
            log.error(str({'comment': f"Can't create dremioDatasets instance, error : {e}"}))

    def datasetInfoFromName(self, datasetName: str) -> JsonOrNone:
        """Return the catalog entry found at the dataset's path.

        The path is produced by ``nameToPath`` (defined elsewhere). Returns
        None when the request fails.
        """
        pdsPath = nameToPath(datasetName)
        return self.apiGet(f'catalog/by-path/{pdsPath}')

    def datasetInfoFromId(self, datasetId: str) -> JsonOrNone:
        """Return the catalog entry with the given id, or None on failure."""
        return self.apiGet(f'catalog/{datasetId}')

    def datasetIdFromName(self, datasetName: str) -> Union[str, None]:
        """Return the ``id`` of the named dataset, or None if lookup failed."""
        datasetInfo = self.datasetInfoFromName(datasetName)
        if datasetInfo is None:
            return None
        return datasetInfo['id']

    def catalogRefresh(self, pdsId: str) -> None:
        """POST to ``catalog/<pdsId>/refresh``; the reply is discarded."""
        self.apiPost(f'catalog/{pdsId}/refresh')
class dremioReflections(dremioJobs):
    """Queries about the reflections defined on datasets."""

    def __init__(self) -> None:
        """Initialise the jobs client; a failure is logged, not raised."""
        try:
            super().__init__()
        except Exception as e:
            log.error(str({'comment': f"Can't create dremioReflections instance, error : {e}"}))

    def datasetReflectionsInfo(self, datasetName: str) -> Union[List[Json], None]:
        """
        Based on the Dremio sys tables, collects ALL the information on given dataset.

        Runs ``SELECT * FROM sys.reflections`` filtered on the dataset name
        produced by ``nameToSysTblName`` (defined elsewhere). Returns the
        result rows, or None when the query job did not complete.
        """
        sysDataset = nameToSysTblName(datasetName)
        # Double any single quote so the name cannot terminate the SQL string
        # literal early (a name such as O'Brien would otherwise break the query).
        quotedDataset = str(sysDataset).replace("'", "''")
        sqlQuery = f"""
            SELECT * FROM sys.reflections
            WHERE dataset='{quotedDataset}'
        """
        return self.runSqlQuery(sqlQuery)
Last updated