Boto3
Output download percentage
import os
import sys
import threading

class ProgressPercentage:
    """
    Progress callback that prints a completion percentage during file I/O
    (e.g. an S3 upload).
    """
    def __init__(self, filename: str):
        self._filename = filename
        self._size = float(os.path.getsize(filename))
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount: int):
        # To simplify, assume this is hooked up to a single filename
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            sys.stdout.write(
                "\r%s %s / %s (%.2f%%)" % (
                    self._filename,
                    self._seen_so_far,
                    self._size,
                    percentage
                )
            )
            sys.stdout.flush()
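A minimal usage sketch (the bucket, key, and file names below are placeholders, not from the original): boto3 invokes the callback once per transferred chunk with the number of bytes sent, so passing an instance as the Callback argument of upload_file prints a live percentage.

import boto3

# Hypothetical wiring of the callback into an upload; all names are placeholders.
client = boto3.client('s3')  # assumes credentials are configured elsewhere
client.upload_file(
    'report.csv',                               # local file
    'example-bucket',                           # target bucket
    'uploads/report.csv',                       # target key
    Callback=ProgressPercentage('report.csv')   # receives bytes sent per chunk
)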
Custom class
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import json
import requests
import boto3
import time
from boto3.s3.transfer import TransferConfig
from botocore.config import Config
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Union, Dict, List, Any
from fcts_logging import customLogging
from pathlib import Path
from dataclasses import dataclass
from utils.s3_divers import ProgressPercentage
from utils.dremio_divers import nameToPath, nameToSysTblName
from pyarrow import fs
from concurrent.futures import ThreadPoolExecutor

log = customLogging(Path(__file__).stem)

# Declare the most common return types used by the following classes/functions.
JsonOrNone = Union[None, Dict[str, Any]]
Json = Dict[str, Any]
StrOrNone = Union[str, None]
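# Expected environment variables for the connection classes below
# (the values shown are illustrative placeholders, not from the original):
#   AWS_SERVICE_NAME = 's3'
#   S3_ACCESSKEYID / S3_ACCESSKEY = credentials for the endpoint
#   S3_REGIONNAME  = e.g. 'eu-west-1'
#   S3_ENDPOINT    = e.g. 'https://s3.example.com'
#   PROXY_HTTP / PROXY_HTTPS = optional; when unset, no proxy is configured.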
@dataclass
class s3Connection(ABC):
    # Class-level connection settings, read once from the environment.
    awsServiceName = os.getenv('AWS_SERVICE_NAME')
    s3AccessKeyId = os.getenv('S3_ACCESSKEYID')
    s3AccessKey = os.getenv('S3_ACCESSKEY')
    s3RegionName = os.getenv('S3_REGIONNAME')
    s3EndPoint = os.getenv('S3_ENDPOINT')
    proxyHttp = os.getenv('PROXY_HTTP')
    proxyHttps = os.getenv('PROXY_HTTPS')
    httpConfig = Config(proxies={'http': proxyHttp, 'https': proxyHttps})
    botoParams = {
        "service_name": awsServiceName,
        "aws_access_key_id": s3AccessKeyId,
        "aws_secret_access_key": s3AccessKey,
        "region_name": s3RegionName,
        "endpoint_url": s3EndPoint
    }
    pyArrowParams = {
        "access_key": s3AccessKeyId,
        "secret_key": s3AccessKey,
        "endpoint_override": s3EndPoint,
        "region": s3RegionName,
    }
    # Only route through the proxy when PROXY_HTTP is actually set.
    passProxy = proxyHttp is not None and proxyHttp != ''

    @abstractmethod
    def getClient(self) -> Union[boto3.client, fs.S3FileSystem]:
        pass
class botoS3(s3Connection):
    def getClient(self) -> boto3.client:
        # Returns a boto3 client instance, or None if the connection fails.
        try:
            if self.passProxy:
                return boto3.client(**self.botoParams, config=self.httpConfig)
            else:
                return boto3.client(**self.botoParams)
        except Exception as e:
            log.error(str({'comment': f'botoS3 connection failed: {e}'}))

class pyArrowS3(s3Connection):
    def getClient(self) -> fs.S3FileSystem:
        # Returns a pyarrow S3FileSystem instance, or None if the connection fails.
        try:
            if self.passProxy:
                return fs.S3FileSystem(**self.pyArrowParams, proxy_options=f'http://{self.proxyHttp}')
            else:
                return fs.S3FileSystem(**self.pyArrowParams)
        except Exception as e:
            log.error(str({'comment': f'pyArrowS3 connection failed: {e}'}))
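# Hypothetical usage of the two backends (a sketch, not from the original):
#   botoClient = botoS3().getClient()   # low-level boto3 client, used below
#   arrowFs = pyArrowS3().getClient()   # pyarrow filesystem, e.g. for dataset reads
# Both return None when the connection fails, so callers should check the result.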
# Boto3 Handler
class boto3Handler:
    def __init__(self) -> None:
        self.client = botoS3().getClient()

    def fixFolderPath(self, folderPath: str) -> str:
        # Ensure the prefix ends with a trailing slash.
        return folderPath if folderPath.endswith('/') else f'{folderPath}/'

    def listBucketNames(self) -> List:
        """
        Return a list of bucket names visible to the current S3 client.
        """
        listBucketNames = self.client.list_buckets()['Buckets']
        return [bucket['Name'] for bucket in listBucketNames]
    def listBucketKeys(self, bucketName: str, prefix: str, maxKeys: int = 1000) -> List:
        """
        Return a list of file names (keys) under a bucket.
        bucketName: name of the bucket (e.g. transcoded-com.dpdgroup.geocheck.v1.ttevent)
        prefix: path under the bucket (e.g. '2021/11/01');
                if "", return the complete paths of every file under the bucket
        """
        res = []
        paginator = self.client.get_paginator('list_objects')
        pageIterator = paginator.paginate(
            Bucket=bucketName,
            Prefix=prefix,
            MaxKeys=maxKeys
        )
        # Skip the "folder marker" key that matches the prefix itself.
        prefix_ = self.fixFolderPath(prefix)
        for page in pageIterator:
            if 'Contents' in page:
                for keyInfo in page['Contents']:
                    key = keyInfo['Key']
                    if key != prefix_:
                        res += [key]
        return res
    def checkFolderExists(self, bucket: str, folderPath: str) -> bool:
        folderPath = self.fixFolderPath(folderPath)
        result = self.client.list_objects(Bucket=bucket, Prefix=folderPath)
        # The folder "exists" if at least one object lives under the prefix.
        return 'Contents' in result
    def downloadFile(self, bucketName: str, prefix: str, targetPath: StrOrNone = None) -> bool:
        """
        Download all files under a specific folder on S3.
        :bucketName: bucket where to find the files to download
        :prefix: path inside the bucket, as a '%Y/%m/%d' date string
        :return: True if the files were downloaded successfully, else False
        Eg. self.downloadFile('checked-com.dpdgroup.geodata.v410.avro.core.shpnot', '2021/11/15')
        """
        allFilesToDownload = self.listBucketKeys(bucketName, prefix)
        todayFolder = datetime.strptime(prefix, '%Y/%m/%d').strftime('%Y%m%d')
        rootFolder = targetPath if targetPath else f"download/{todayFolder}"
        if not os.path.exists(rootFolder):
            os.makedirs(rootFolder)

        def download(url):
            # Flatten the S3 key into a local file name, keeping the extension.
            fileName = ".".join(url.split(".")[-2:]).replace("/", "_")
            targetFolder = f"{rootFolder}/{fileName}"
            self.client.download_file(
                bucketName, url, targetFolder,
                Config=TransferConfig(max_concurrency=10, max_io_queue=800)
            )

        try:
            with ThreadPoolExecutor(max_workers=10) as executor:
                # Force evaluation of the lazy iterator so worker exceptions surface here.
                list(executor.map(download, allFilesToDownload))
        except Exception as e:
            log.error(
                str({'comment': f'S3 boto3 download failed, error : {e}'}))
            return False
        return True
    def uploadFile(self, fileName: str, bucketName: str, prefix: StrOrNone = None, objectName: StrOrNone = None) -> bool:
        """
        Upload a file to an S3 bucket.
        :fileName: file name to upload
        :bucketName: bucket to upload to
        :prefix: path inside the bucket; if None, the file lands directly under the bucket
        :objectName: S3 object name; if not specified, fileName is used
        :return: True if the file was uploaded, else False
        Eg. self.uploadFile('a.py', 'tests', 'Zeliang') / self.uploadFile('a.py', 'tests')
        """
        uploadConfig = TransferConfig(multipart_threshold=1024 * 25, max_concurrency=10)
        # If the S3 object name was not specified, use fileName
        targetS3Name = objectName if objectName else fileName
        if prefix:
            targetS3Path = prefix + '/' + targetS3Name
        else:
            targetS3Path = targetS3Name
        try:
            self.client.upload_file(
                fileName, bucketName, targetS3Path,
                Config=uploadConfig, Callback=ProgressPercentage(fileName))
        except Exception as e:
            log.error(str({'comment': f'S3 boto3 upload failed, error : {e}'}))
            return False
        return True
    def copyFile(self, originBucket: str, originPrefix: str, destBucket: str, destPrefix: str) -> bool:
        """
        Copy one file from a bucket to another bucket.
        :originBucket: source bucket name
        :originPrefix: source prefix and file name (e.g. "2021/11/04/xxx.parquet")
        :destBucket: target bucket name
        :destPrefix: target prefix and file name
        """
        copySource = {
            'Bucket': originBucket, 'Key': originPrefix
        }
        try:
            self.client.copy(copySource, destBucket, destPrefix)
            return True
        except Exception as e:
            log.error(str({'comment': f'S3 boto3 copy file failed, error : {e}'}))
            return False
    def deleteFiles(self, bucketName, keys, maxKeys=1000) -> bool:
        """
        Remove files inside a bucket; the keys (files to remove) can be retrieved
        with self.listBucketKeys.
        :bucketName: bucket name
        :keys: files to be removed
        """
        keys = list(set(keys))
        # delete_objects accepts at most 1000 keys per call, so delete in slices.
        keySlices = [keys[i:i + maxKeys] for i in range(0, len(keys), maxKeys)]
        for keySlice in keySlices:
            keyList = [{'Key': key} for key in keySlice]
            response = self.client.delete_objects(
                Bucket=bucketName,
                Delete={'Objects': keyList}
            )
            deleted = response.get('Deleted', [])
            errors = response.get('Errors', [])
            if len(deleted) != len(keySlice) or errors != []:
                log.error(str({'comment': f'S3 boto3 delete file failed, error : {errors}'}))
                return False
        return True
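A short end-to-end sketch of the handler (bucket names, dates, and file names are placeholders, not from the original):

# Hypothetical usage of boto3Handler; all names below are placeholders.
handler = boto3Handler()

print(handler.listBucketNames())                           # all visible buckets
keys = handler.listBucketKeys('example-bucket', '2021/11/01')

if handler.checkFolderExists('example-bucket', '2021/11/01'):
    handler.downloadFile('example-bucket', '2021/11/01')   # lands in download/20211101

handler.uploadFile('a.py', 'example-bucket', prefix='tests')
handler.deleteFiles('example-bucket', keys)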