Boto3

Output transfer progress percentage

import os
import sys
import threading


class ProgressPercentage:
    """
    Report file-transfer progress as a percentage; meant to be passed as
    the Callback of a boto3 S3 transfer (upload_file / download_file).
    """
    def __init__(self, filename: str):
        self._filename = filename
        self._size = float(os.path.getsize(filename))
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount: int):
        # To simplify, assume this is hooked up to a single filename.
        # boto3 may call this from several threads, hence the lock.
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            sys.stdout.write(
                "\r%s  %s / %s  (%.2f%%)" % (
                    self._filename, 
                    self._seen_so_far, 
                    self._size,
                    percentage
                )
            )
            sys.stdout.flush()
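
A minimal usage sketch: boto3 invokes the callback with the number of bytes transferred after each chunk, so an instance can be passed straight to a transfer call. The file and bucket names below are hypothetical.

import boto3

# Hypothetical file and bucket names, for illustration only.
s3 = boto3.client('s3')
s3.upload_file(
    'data/report.csv',
    'my-bucket',
    'reports/report.csv',
    Callback=ProgressPercentage('data/report.csv'),
)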

Custom S3 connection and handler classes


#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import boto3
from boto3.s3.transfer import TransferConfig
from botocore.client import BaseClient
from botocore.config import Config
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Union, Dict, List, Any, Optional
from fcts_logging import customLogging
from pathlib import Path
from dataclasses import dataclass
from utils.s3_divers import ProgressPercentage
from pyarrow import fs
from concurrent.futures import ThreadPoolExecutor

log = customLogging(Path(__file__).stem)

# Most common return types for the classes/functions below.
JsonOrNone = Optional[Dict[str, Any]]
Json = Dict[str, Any]
StrOrNone = Optional[str]


@dataclass
class s3Connection(ABC):
    awsServiceName = os.getenv('AWS_SERVICE_NAME')
    s3AccessKeyId = os.getenv('S3_ACCESSKEYID')
    s3AccessKey = os.getenv('S3_ACCESSKEY')
    s3RegionName = os.getenv('S3_REGIONNAME')
    s3EndPoint = os.getenv('S3_ENDPOINT')
    proxyHttp = os.getenv('PROXY_HTTP')
    proxyHttps = os.getenv('PROXY_HTTPS')
    httpConfig = Config(proxies={'http': proxyHttp, 'https': proxyHttps})

    botoParams = {
        "service_name": awsServiceName,
        "aws_access_key_id": s3AccessKeyId,
        "aws_secret_access_key": s3AccessKey,
        "region_name": s3RegionName,
        "endpoint_url": s3EndPoint
    }

    pyArrowParams = {
        "access_key": s3AccessKeyId,
        "secret_key": s3AccessKey,
        "endpoint_override": s3EndPoint,
        "region": s3RegionName,
    }

    passProxy = bool(proxyHttp)  # only route through a proxy when one is configured

    @abstractmethod
    def getClient(self) -> Union[BaseClient, fs.S3FileSystem]:
        pass


class botoS3(s3Connection):

    def getClient(self) -> BaseClient:
        try:
            if self.passProxy:
                return boto3.client(**self.botoParams, config=self.httpConfig)
            else:
                return boto3.client(**self.botoParams)
        except Exception as e:
            log.error(str({'comment': f'botoS3 connection failed: {e}'}))

class pyArrowS3(s3Connection):

    def getClient(self) -> fs.S3FileSystem:
        try:
            if self.passProxy:
                return fs.S3FileSystem(**self.pyArrowParams, proxy_options=f'http://{self.proxyHttp}')
            else:
                return fs.S3FileSystem(**self.pyArrowParams)
        except Exception as e:
            log.error(str({'comment': f'pyArrowS3 connection failed: {e}'}))
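
Both classes read their settings from the environment (AWS_SERVICE_NAME, S3_ACCESSKEYID, S3_ACCESSKEY, S3_REGIONNAME, S3_ENDPOINT, plus optional PROXY_HTTP / PROXY_HTTPS) at class-definition time, so those variables must be set before the module is imported. A minimal usage sketch, with a hypothetical bucket name:

# Hypothetical usage; assumes the S3_* environment variables are set.
s3 = botoS3().getClient()
print([b['Name'] for b in s3.list_buckets()['Buckets']])

arrowFs = pyArrowS3().getClient()
print(arrowFs.get_file_info(fs.FileSelector('my-bucket', recursive=True)))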
            
            
# Boto3 Handler
class boto3Handler:

    def __init__(self)->None:
        self.client = botoS3().getClient()

    def fixFolderPath(self, folderPath: str) -> str:
        # Ensure the prefix ends with a single trailing slash
        return folderPath if folderPath.endswith('/') else f'{folderPath}/'

    def listBucketNames(self) -> List[str]:
        """
        Return the list of bucket names visible to the current S3 client
        """
        buckets = self.client.list_buckets()['Buckets']
        return [bucket['Name'] for bucket in buckets]

    def listBucketKeys(self, bucketName: str, prefix: str, maxKeys: int = 1000) -> List[str]:
        """
        Return a list of file names (keys) under a bucket
        bucketName: name of the bucket   (Eg. transcoded-com.dpdgroup.geocheck.v1.ttevent)
        prefix: path under the bucket    (Eg. '2021/11/01'); if "", return the
                complete paths of every file in the bucket
        """
        res = []
        paginator = self.client.get_paginator('list_objects')
        pageIterator = paginator.paginate(
            Bucket=bucketName,
            Prefix=prefix,
            MaxKeys=maxKeys
        )
        prefix_ = self.fixFolderPath(prefix)
        for page in pageIterator:
            if 'Contents' in page:
                for keyInfo in page['Contents']:
                    key = keyInfo['Key']
                    # skip the zero-byte "folder" placeholder object itself
                    if key != prefix_:
                        res.append(key)
        return res
    
    def checkFolderExists(self, bucket: str, folderPath: str) -> bool:
        folderPath = self.fixFolderPath(folderPath)
        result = self.client.list_objects(Bucket=bucket, Prefix=folderPath)
        return 'Contents' in result

    def downloadFile(self, bucketName:str, prefix:str, targetPath:StrOrNone=None)->bool:
        """
        Download all files under a specific folder on S3
        :bucketName: Bucket where to find the files to download
        :prefix: Path inside the bucket; expected to be a date path ('%Y/%m/%d')
                 when targetPath is not given
        :targetPath: Local folder to download into; defaults to download/<YYYYMMDD>
        :return: True if the files were downloaded successfully, else False
        Eg.  self.downloadFile('checked-com.dpdgroup.geodata.v410.avro.core.shpnot', '2021/11/15')
        """
        allFilesToDownload = self.listBucketKeys(bucketName, prefix)
        if targetPath:
            rootFolder = targetPath
        else:
            todayFolder = datetime.strptime(prefix, '%Y/%m/%d').strftime('%Y%m%d')
            rootFolder = f"download/{todayFolder}"

        if not os.path.exists(rootFolder):
            os.makedirs(rootFolder)

        def download(url):
            # Keep the last two dot-separated parts of the key and flatten any slashes
            fileName = ".".join(url.split(".")[-2:]).replace("/", "_")
            targetFile = f"{rootFolder}/{fileName}"
            self.client.download_file(
                bucketName, url, targetFile,
                Config=TransferConfig(max_concurrency=10, max_io_queue=800)
            )
        try:
            with ThreadPoolExecutor(max_workers=10) as executor:
                # list() forces evaluation so worker exceptions surface here;
                # executor.map alone raises them only when results are consumed
                list(executor.map(download, allFilesToDownload))
        except Exception as e:
            log.error(
                str({'comment': f'S3 boto3 download failed, error : {e}'}))
            return False
        return True

    def uploadFile(self, fileName:str, bucketName:str, prefix:StrOrNone=None, objectName:StrOrNone=None)->bool:
        """
        Upload a file to an S3 bucket
        :fileName: File name to upload
        :bucketName: Bucket to upload to
        :prefix: Path inside the bucket; if None, upload the file directly under the bucket
        :objectName: S3 object name. If not specified, fileName is used
        :return: True if the file was uploaded, else False
        Eg.  self.uploadFile('a.py', 'tests', 'Zeliang') / self.uploadFile('a.py', 'tests')
        """
        uploadConfig = TransferConfig(multipart_threshold=1024*25, max_concurrency=10)
        # If S3 object_name was not specified, use fileName
        targetS3Name = objectName if objectName else fileName
        if prefix:
            targetS3Path = prefix+'/'+targetS3Name
        else:
            targetS3Path = targetS3Name
        try:
            self.client.upload_file(
                fileName, bucketName, targetS3Path,
                Config=uploadConfig, Callback=ProgressPercentage(fileName))
        except Exception as e:
            log.error(str({'comment': f'S3 boto3 upload failed, error : {e}'}))
            return False
        return True

    def copyFile(self, originBucket:str, originPrefix:str, destBucket:str, destPrefix:str)->bool:
        '''
        Copy a single file from one bucket to another.
        :originBucket: source bucket name
        :originPrefix: source prefix and file name (Eg. '2021/11/04/xxx.parquet')
        :destBucket: target bucket name
        :destPrefix: target prefix and file name
        '''
        copySource = {
            'Bucket': originBucket, 'Key': originPrefix
        }
        try:
            self.client.copy(copySource, destBucket, destPrefix)
            return True
        except Exception as e:
            log.error(str({'comment': f'S3 boto3 copy file failed, error : {e}'}))
            return False


    def deleteFiles(self, bucketName:str, keys:List[str], maxKeys:int=1000)->bool:
        """
        Remove files inside a bucket; keys (files to remove) can be retrieved with self.listBucketKeys
        :bucketName: bucket name
        :keys: keys of the files to remove
        :maxKeys: batch size per delete_objects call (the S3 API caps it at 1000)
        """
        keys = list(set(keys))
        keySlices = [keys[i:i + maxKeys] for i in range(0, len(keys), maxKeys)]

        for keySlice in keySlices:
            keyList = [{'Key': key} for key in keySlice]
            response = self.client.delete_objects(
                Bucket=bucketName,
                Delete={'Objects': keyList}
            )
            deleted = response.get('Deleted', [])
            errors = response.get('Errors', [])
            if len(deleted) != len(keySlice) or errors:
                log.error(str({'comment': f'S3 boto3 delete file failed, error : {errors}'}))
                return False
        return True
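
A short end-to-end sketch of the handler; the bucket names, prefixes and file names below are hypothetical:

handler = boto3Handler()

# Hypothetical bucket, prefix and file names, for illustration only.
print(handler.listBucketNames())
keys = handler.listBucketKeys('my-bucket', '2021/11/15')

if handler.checkFolderExists('my-bucket', '2021/11/15'):
    handler.downloadFile('my-bucket', '2021/11/15')   # writes to download/20211115/
    handler.uploadFile('report.csv', 'my-bucket', prefix='2021/11/15')
    handler.copyFile('my-bucket', '2021/11/15/report.csv',
                     'backup-bucket', '2021/11/15/report.csv')
    handler.deleteFiles('my-bucket', keys)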
