Useful functions
Other useful functions, notes, class
import numpy as np
import pandas as pd
import glob
import shutil
import os
import sqlite3
from copy import deepcopy
from datetime import datetime,timedelta,date
About dataframe
# Get memory used
def get_mem_usage(df):
    """Print the deep memory footprint of ``df`` in mebibytes, e.g. ``0.12Mb``."""
    total_bytes = df.memory_usage(deep=True).sum()
    megabytes = total_bytes / 1024 ** 2
    print(f"{megabytes:3.2f}Mb")
# Get basic info
def get_general_info(df):
    """Print a summary of ``df``: shape, dtypes, missing values and memory use.

    The variable name shown in the header is looked up among the globals of
    the module that defines this function. When the frame is not bound to a
    global there (a local variable, an expression, or a frame defined in
    another module), the placeholder ``unknown`` is shown instead of failing.

    Args:
        df: The pandas DataFrame to describe.
    """
    # Snapshot the globals so the lookup cannot be disturbed while iterating.
    name = next(
        (key for key, value in list(globals().items()) if value is df),
        "unknown",
    )
    print(f"Dataframe << {name}>>has {df.shape[0]} rows, {df.shape[1]} columns")
    print("=======================================")
    print("Column Types:\n")
    print(df.dtypes)
    print("=======================================")
    print("Missing values per column: ")
    # Share of null cells per column, rendered as a percentage string.
    percent_missing = df.isnull().sum()*100 / len(df)
    missing_value_df = pd.DataFrame({'column_name':df.columns,'percent_missing':percent_missing})
    missing_value_df['percent_missing'] = ["{:.2f}%".format(x) for x in missing_value_df['percent_missing'] ]
    print(missing_value_df)
    print("=======================================")
    print(f"Memory Use: {df.memory_usage(deep=True).sum()/1024 **2:3.2f}Mb")
    print("=======================================")
    # Absolute null counts, complementing the percentages above.
    print("Missing Values in columns: ")
    print(df.isnull().sum())
# Change col format
def change_col_format(df, target_type):
    """Cast every column of ``df`` to ``target_type`` in place and return ``df``."""
    for column in df.columns:
        converted = df[column].astype(target_type)
        df[column] = converted
    return df
# Optimize a df
def get_optimize_df(df):
    """Return a copy of ``df`` with low-cardinality columns stored as ``category``.

    A column is converted when its number of distinct values is less than
    half the number of rows. The input frame is not modified.

    Args:
        df: The pandas DataFrame to optimize.

    Returns:
        A new DataFrame with the qualifying columns cast to ``category``.
        An empty frame is returned as-is.
    """
    row_count = len(df)
    if row_count == 0:
        # No rows means no ratio to compute (and would divide by zero).
        return df
    categorical = {
        col: 'category'
        for col in df.columns
        if df[col].nunique() / row_count < 0.5
    }
    # astype returns a new frame; the result must be returned, not discarded.
    return df.astype(categorical)
# Save / read pickle
def save_as_pickle(df,name,path=None):
    """Pickle ``df`` to ``<name>.pkl`` and report the outcome on stdout.

    This is a best-effort helper: failures are reported with a printed
    message (including the underlying error) rather than raised.

    Args:
        df: Object exposing ``to_pickle`` (normally a pandas DataFrame).
        name: File name without the ``.pkl`` extension.
        path: Target directory. When ``None`` the file is written to the
            current working directory.
    """
    target = f"{name}.pkl" if path is None else f"{path}/{name}.pkl"
    try:
        df.to_pickle(target)
    except Exception as exc:
        # Catch Exception (not a bare except) so Ctrl-C / SystemExit still
        # propagate, and surface the cause instead of hiding it.
        print(f"Save failed. Make sure it's a dataframe or the path is correct ({exc})")
        return
    if path is None:
        print(f"Dataframe saved as pickle in => {os.getcwd()}")
    else:
        print(f"Dataframe saved as pickle in => {target}")
def read_pickle_as_df(path=None):
    """Load every ``*.pkl`` file in a directory into a dict.

    The working directory is never changed, so a failed read cannot leave
    the process in the wrong directory.

    Args:
        path: Directory to scan. When ``None`` the current working directory
            is used.

    Returns:
        Dict mapping each file's name without the ``.pkl`` extension to the
        unpickled object. Dots inside the name are preserved
        (``my.data.pkl`` -> ``"my.data"``).
    """
    directory = os.getcwd() if path is None else path
    # Escape the directory so glob metacharacters in it are taken literally.
    pattern = os.path.join(glob.escape(directory), "*.pkl")
    result = {}
    for file_path in glob.glob(pattern):
        name = os.path.splitext(os.path.basename(file_path))[0]
        # NOTE(security): unpickling executes arbitrary code; only point this
        # at directories whose contents are trusted.
        result[name] = pd.read_pickle(file_path)
    return result
# Read big csv file
def read_large_csv(name, chunkSize=1000000, encoding='utf-8'):
    """Read a CSV in pieces of ``chunkSize`` rows and return one DataFrame.

    Args:
        name: Path (or buffer) handed to ``pandas.read_csv``.
        chunkSize: Number of rows requested per chunk.
        encoding: Text encoding of the file.

    Returns:
        The concatenation of all chunks with a fresh 0..n-1 index.
    """
    reader = pd.read_csv(name, iterator=True, encoding=encoding)
    pieces = []
    while True:
        try:
            pieces.append(reader.get_chunk(chunkSize))
        except StopIteration:
            # The reader signals exhaustion by raising StopIteration.
            break
    return pd.concat(pieces, ignore_index=True)
Files
# Return a list of files with the given extension
def get_files(extension):
    """List files in the current working directory ending in ``.<extension>``."""
    pattern = "*.{}".format(extension)
    return glob.glob(pattern)
# Remove the folder if it exists and create a new one with the same name
def create_clean_dir(name):
    """Recreate directory ``name`` empty, change into it, and print the new cwd.

    Any existing directory of that name is deleted together with its contents.
    """
    already_there = os.path.isdir(name)
    if already_there:
        shutil.rmtree(name)
    # Both paths end the same way: a fresh directory that becomes the cwd.
    os.makedirs(name)
    os.chdir(name)
    print(f"Current working dir => :{os.getcwd()}")
# Create a temporary dir
import tempfile
from contextlib import contextmanager
@contextmanager
def tmpdir(path=None):
    """Context manager yielding a ``tempfile.TemporaryDirectory`` object.

    Args:
        path: Parent directory for the temporary directory; a falsy value
            selects the platform default location.

    Yields:
        The ``TemporaryDirectory`` instance (use ``.name`` for its path).
        Its path is printed when the ``with`` body finishes without error,
        and the directory is cleaned up on exit either way.
    """
    holder = tempfile.TemporaryDirectory(dir=path if path else None)
    try:
        yield holder
        print(holder.name)
    finally:
        try:
            holder.cleanup()
        except OSError as err:
            # Cleanup problems are reported, not raised.
            print(err)
# Usage example for tmpdir(): create a temporary directory under
# default_path, keep it for a few seconds, then let it be removed.
import time  # required by time.sleep below; it was never imported

default_path="F:\\"
with tmpdir(default_path) as tmp:
    # Do sth
    time.sleep(5)
Time
def get_n_days_ago(n=0, time_format="%d-%m-%Y"):
    """Return the local date/time ``n`` days before now, formatted as text.

    Args:
        n: Number of days to go back (0 means now).
        time_format: ``strftime`` pattern used for the result.
    """
    offset = timedelta(days=n)
    return (datetime.now() - offset).strftime(time_format)
# strftime("%d/%m/%Y, %H:%M:%S")
# 05/13/2019, 13:38:14
Test iterable
def is_iterable(obj):
    """Return True when ``iter(obj)`` succeeds, False when it raises TypeError."""
    try:
        iter(obj)
    except TypeError:
        return False
    else:
        return True
# Usage example: an int is not iterable, so the error branch runs.
obj = 100
if not is_iterable(obj):
    print('Error: obj is not iterable')
else:
    for i in obj:
        print(i)
Error: obj is not iterable
Get a DataFrame from a list of namedtuples
# Template: namedtuple_instance1/2 and namedtuple_type are placeholders for
# your own namedtuple instances and their class; the namedtuple's field names
# become the column labels.
df = pd.DataFrame.from_records(
    data=[namedtuple_instance1, namedtuple_instance2],
    columns=namedtuple_type._fields,
)
Split a list into groups with almost equal totals (e.g. file sizes)
import heapq
from typing import List


def makeGroup(lst:List,n:int)->List:
    """Greedily distribute numeric values over ``n`` groups of similar total.

    Values are taken in input order; each one goes to the group whose running
    total is currently the smallest (ties go to the lowest group index).

    Args:
        lst: Values to distribute; they must support ``+`` with an int.
        n: Number of groups to create.

    Returns:
        A list of ``n`` lists holding the values assigned to each group.
    """
    lists = [[] for _ in range(n)]
    # Min-heap of (running total, group index): the lightest group is on top.
    totals = [(0, i) for i in range(n)]
    heapq.heapify(totals)
    for value in lst:
        total, index = heapq.heappop(totals)
        lists[index].append(value)
        heapq.heappush(totals, (total + value, index))
    return lists
Add extra params in command
import os
from glob import glob
import heapq
def filterOutEmptyFiles(files):
    """Return the paths from ``files`` whose size on disk is not zero bytes."""
    non_empty = []
    for path in files:
        if os.path.getsize(path) != 0:
            non_empty.append(path)
    return non_empty
# Split a list to different groups
def makeGroup(lst, n):
    """Greedily spread numeric values over ``n`` buckets of similar total.

    Each value, in input order, is added to the bucket with the smallest
    running total so far (lowest bucket index wins ties).
    """
    buckets = [[] for _ in range(n)]
    heap = [(0, slot) for slot in range(n)]
    heapq.heapify(heap)
    for value in lst:
        # Peek at the lightest bucket, then swap its heap entry for the
        # updated total in a single heap operation.
        running_total, slot = heap[0]
        buckets[slot].append(value)
        heapq.heapreplace(heap, (running_total + value, slot))
    return buckets
def groupFiles(pqFiles,numberGroup):
    """Split file paths into ``numberGroup`` groups of similar total size.

    Files are taken in input order and each is assigned to the group whose
    accumulated byte size is currently the smallest (lowest group index wins
    ties). Every input path appears in exactly one group, even when several
    files share the same size.

    Args:
        pqFiles: Iterable of paths to existing files.
        numberGroup: Number of groups to produce.

    Returns:
        A list of ``numberGroup`` lists of file paths.
    """
    import heapq

    groups = [[] for _ in range(numberGroup)]
    # Min-heap of (accumulated bytes, group index).
    totals = [(0, i) for i in range(numberGroup)]
    heapq.heapify(totals)
    for file_name in pqFiles:
        size = os.stat(file_name).st_size
        total, index = heapq.heappop(totals)
        # Track the path itself rather than mapping sizes back to names
        # afterwards, which duplicated files of equal size.
        groups[index].append(file_name)
        heapq.heappush(totals, (total + size, index))
    return groups
import os, sys, ast
import pandas as pd
def main():
    """Echo the six command-line arguments and each day in the date range.

    Expected ``sys.argv`` layout: start date, end date, a Python literal list
    of origin buckets, destination bucket, data type, flatten flag.
    """
    argv = sys.argv
    start_date = argv[1]
    end_date = argv[2]
    # The bucket list arrives as text such as "['a','b']"; parse it safely.
    origin_buckets = ast.literal_eval(argv[3])
    destination_bucket = argv[4]
    data_type = argv[5]
    flatten = argv[6]

    for day in pd.date_range(start=start_date, end=end_date):
        print(day.strftime('%Y%m%d'))

    echoed = (
        start_date,
        end_date,
        origin_buckets,
        destination_bucket,
        data_type,
        flatten,
    )
    for value in echoed:
        print(value)

    # Show that the parsed values compare equal to plain string literals.
    print('a' == origin_buckets[0])
    print('ttevent' == data_type)


if __name__ == '__main__':
    main()
# In cmd
python test.py 2021-09-01 2021-09-03 "['a','b']" dest ttevent true
20210901
20210902
20210903
2021-09-01
2021-09-03
['a', 'b']
dest
ttevent
true
True
True
Filter out empty files
def filterOutEmptyFiles(files):
    """Keep only the paths in ``files`` that refer to non-empty files."""
    return list(filter(lambda path: os.stat(path).st_size != 0, files))
Convert a file size to KB, MB, GB, ...
from typing import Tuple, Union


def convertSize(sizeBytes:int)->Union[str, Tuple[int, str]]:
    """Render a byte count with a binary unit (B, KB, MB, ...).

    Args:
        sizeBytes: Non-negative size in bytes.

    Returns:
        The string ``"0B"`` when ``sizeBytes`` is 0; otherwise a tuple
        ``(sizeBytes, "<value> <unit>")`` with the value rounded to two
        decimals, e.g. ``(1536, "1.5 KB")``.

    Raises:
        ValueError: If ``sizeBytes`` is negative.
    """
    if sizeBytes == 0:
        return "0B"
    if sizeBytes < 0:
        raise ValueError("sizeBytes must be non-negative")
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    # Pick the unit with integer arithmetic: avoids floating-point log
    # rounding and clamps to the largest unit instead of running off the
    # end of size_name for huge values.
    i = 0
    remaining = int(sizeBytes)
    while remaining >= 1024 and i < len(size_name) - 1:
        remaining //= 1024
        i += 1
    s = round(sizeBytes / 1024 ** i, 2)
    return (sizeBytes, f"{s} {size_name[i]}")
Last updated