My Note / Zeliang YAO
  • Zeliang's Note
  • Dremio
    • Custom Class
  • 💕Python
    • Design Pattern
      • Creational
        • Abstract Factory
        • Factory Method
        • Singleton
        • Builder / Director
      • Structural
        • Adapter
    • Boto3
    • Typing
    • String
    • Requests
    • Iterator & Iterable
      • Consuming iterator manually
      • Lazy Iterable
    • Generators
    • itertools
    • Collections
    • Customization
      • Customize built-in
      • Logging
      • Hdf5
      • Sqlite3 & Df
    • Pandas
      • Basic
      • Data cleaning
      • Merge, Join, Concat
      • Useful tricks
      • Simple model
      • Pandas acceleration
    • Pandas time series
      • Date Range
      • Datetime Index
      • Holidays
      • Function_to_date_time
      • Period
      • Time zone
    • *args and **kwargs
    • Context Manager
    • Lambda
    • SHA
    • Multithreading
      • Threading
      • Speed Up
    • Email
    • Improvement
    • Useful functions
    • Python OOP
      • Basic
      • @static / @class method
      • attrs module
      • Dataclasses
      • Dataclasses example
      • Others
    • Design patterns
      • Creational Patterns
      • Structural Patterns
      • Behavioral Patterns
  • 🐣Git/Github
    • Commands
  • K8s
    • Useful commands
  • Linux
    • Chmod
Powered by GitBook
On this page
  • About dataframe
  • Files
  • Time
  • Test iterable
  • Get df from list of tuples
  • Split a list to groups with almost equal file size
  • Add extra params in command
  • Filter out empty files
  • Convert a file size to mb,gb...

Was this helpful?

  1. Python

Useful functions

Other useful functions, notes, class

import glob
import heapq
import math
import os
import shutil
import sqlite3
from copy import deepcopy
from datetime import datetime, timedelta, date

import numpy as np
import pandas as pd

About dataframe

# Get memory used 
def get_mem_usage(df):
    """Print the DataFrame's deep memory footprint, formatted in megabytes."""
    total_bytes = df.memory_usage(deep=True).sum()
    print(f"{total_bytes / 1024 ** 2:3.2f}Mb")


# Get basic info
def get_general_info(df):
    """Print a human-readable summary of *df*: shape, dtypes, missing-value
    percentages, deep memory usage, and raw missing counts.

    Fix: the original did ``[x for x in globals() if globals()[x] is df][0]``,
    which raises IndexError whenever *df* is not bound to a module-level
    global (e.g. a local variable or a function argument). We now fall back
    to the placeholder name "df" in that case.
    """
    # Best-effort reverse lookup of the variable's name in module globals.
    matches = [x for x in globals() if globals()[x] is df]
    name = matches[0] if matches else "df"
    print(f"Dataframe << {name} >> has {df.shape[0]} rows, {df.shape[1]} columns")
    print("=======================================")
    print("Column Types:\n")
    print(df.dtypes)
    print("=======================================")
    print("Missing values per column: ")
    percent_missing = df.isnull().sum()*100 / len(df)
    missing_value_df = pd.DataFrame({'column_name':df.columns,'percent_missing':percent_missing})
    missing_value_df['percent_missing'] = ["{:.2f}%".format(x) for x in missing_value_df['percent_missing'] ]
    print(missing_value_df)
    print("=======================================")
    print(f"Memory Use: {df.memory_usage(deep=True).sum()/1024 **2:3.2f}Mb")
    print("=======================================")
    print("Missing Values in columns: ")
    print(df.isnull().sum())
    
# Change col format
def change_col_format(df, target_type):
    """Cast every column of *df* to *target_type* in place and return *df*."""
    for column in df.columns:
        df[column] = df[column].astype(target_type)
    return df
    
# Optimize a df
def get_optimize_df(df):
    """Return a copy of *df* where low-cardinality columns (fewer than 50%
    unique values) are converted to the memory-efficient 'category' dtype.

    Fix: the original called ``df.astype(...)`` but discarded the result
    (astype returns a new frame), so it always returned *df* unchanged.
    """
    low_cardinality = {
        col: 'category'
        for col in df.columns
        if df[col].nunique() / df[col].shape[0] < 0.5
    }
    return df.astype(low_cardinality)
    

# Save / read pickle
def save_as_pickle(df, name, path=None):
    """Serialize *df* to ``<name>.pkl`` in *path* (default: current dir).

    Fixes: bare ``except:`` narrowed to ``except Exception`` (a bare except
    also swallows KeyboardInterrupt/SystemExit); ``== None`` replaced with
    ``is None``; removed the dead ``os.chdir`` bookkeeping — the original
    saved and restored a cwd it never actually changed.
    """
    try:
        if path is None:
            df.to_pickle(f"{name}.pkl")
            print(f"Dataframe saved as pickle in => {os.getcwd()}")
        else:
            df.to_pickle(os.path.join(path, f"{name}.pkl"))
            print(f"Dataframe saved as pickle in => {path}/{name}.pkl")
    except Exception:
        print("Save failed. Make sure it's a dataframe or the path is correct")

def read_pickle_as_df(path=None):
    """Load every ``*.pkl`` file in *path* (default: the current directory)
    and return a dict mapping the file stem to its DataFrame.

    Fix: the original worked by ``os.chdir``-ing into the target directory
    with no try/finally, so any exception while reading left the process in
    the wrong working directory. We glob with a full path instead and never
    touch the cwd.
    """
    target = os.getcwd() if path is None else path
    result = {}
    for pkl_path in glob.glob(os.path.join(target, "*.pkl")):
        stem = os.path.basename(pkl_path).split(".")[0]
        result[stem] = pd.read_pickle(pkl_path)
    return result

# Read big csv file    
def read_large_csv(name, chunkSize=1000000, encoding='utf-8'):
    """Read a potentially huge CSV in chunks and return one concatenated
    DataFrame.

    Fixes: the original ``TextFileReader`` was never closed (file-handle
    leak) and chunking was driven by catching StopIteration by hand. Using
    ``chunksize=`` makes the reader directly iterable, and the ``with``
    block guarantees the underlying file is closed.
    """
    with pd.read_csv(name, chunksize=chunkSize, encoding=encoding) as reader:
        chunks = [chunk for chunk in reader]
    return pd.concat(chunks, ignore_index=True)

Files

# Return a list of file with assigned extension
def get_files(extension):
    """List the files in the current directory matching ``*.<extension>``."""
    pattern = "*." + extension
    return glob.glob(pattern)

# Remove the folder if exist and create new one with same name 
def create_clean_dir(name):
    """Create an empty directory *name* and chdir into it.

    If the directory already exists it is removed first, so the caller
    always starts from a clean slate. Fix: the original duplicated the
    ``os.makedirs(name)`` call in both branches of the if/else.

    Side effect: changes the process working directory.
    """
    if os.path.isdir(name):
        shutil.rmtree(name)  # drop stale contents so the dir starts empty
    os.makedirs(name)
    os.chdir(name)
    print(f"Current working dir => :{os.getcwd()}")
    
    
# Create a temporary dir
import tempfile
from contextlib import contextmanager


@contextmanager
def tmpdir(path=None):
    """Context manager yielding a ``tempfile.TemporaryDirectory`` object.

    The directory is created under *path* when given, otherwise in the
    system default location. Its name is printed when the ``with`` body
    completes normally, and cleanup errors are printed rather than raised.
    """
    holder = (
        tempfile.TemporaryDirectory(dir=path)
        if path
        else tempfile.TemporaryDirectory()
    )
    try:
        yield holder
        print(holder.name)
    finally:
        try:
            holder.cleanup()
        except OSError as err:
            print(err)
                
# Example usage of the tmpdir() context manager.
# NOTE(review): `time` is never imported on this page, and "F:\" is a
# Windows-specific drive root — this snippet fails as-is; confirm intent.
default_path="F:\\"

with tmpdir(default_path) as tmp:
    # Do sth
    time.sleep(5)

Time

def get_n_days_ago(n=0, time_format="%d-%m-%Y"):
    """Return the local date/time *n* days ago, formatted per *time_format*."""
    moment = datetime.now() - timedelta(days=n)
    return moment.strftime(time_format)
    
 
# strftime("%d/%m/%Y, %H:%M:%S")  
# 05/13/2019, 13:38:14

Test iterable

def is_iterable(obj):
    """Return True when ``iter(obj)`` succeeds, False otherwise (EAFP check)."""
    try:
        iter(obj)
    except TypeError:
        return False
    return True
        
        
# Demo: an int is not iterable, so the else branch runs
# (matches the printed output shown below this snippet).
obj = 100
if is_iterable(obj):
    for i in obj:
        print(i)
else:
    print('Error: obj is not iterable')

Error: obj is not iterable

Get df from list of tuples

# Build a DataFrame from namedtuple records; column names come from the
# namedtuple type's _fields attribute.
# NOTE(review): namedtuple_instance1/2 and namedtuple_type are placeholders
# not defined on this page — the snippet is illustrative only.
df = pd.DataFrame.from_records(
   [namedtuple_instance1, namedtuple_instance2],
   columns=namedtuple_type._fields
)

Split a list to groups with almost equal file size

def makeGroup(lst: list, n: int) -> list:
    """Greedily partition the numbers in *lst* into *n* groups whose sums
    are as close to equal as the greedy heuristic allows.

    A min-heap of ``(running_total, group_index)`` pairs always surfaces
    the currently-lightest group, which receives the next value.

    Fix: the original annotated with ``List`` from ``typing`` without
    importing it, raising NameError at definition time; the builtin
    ``list`` annotation needs no import.
    """
    lists = [[] for _ in range(n)]
    totals = [(0, i) for i in range(n)]
    heapq.heapify(totals)
    for value in lst:
        total, index = heapq.heappop(totals)  # lightest group so far
        lists[index].append(value)
        heapq.heappush(totals, (total + value, index))
    return lists

Add extra params in command

import os
from glob import glob
import heapq

def filterOutEmptyFiles(files):
    """Return only the paths in *files* whose size on disk is non-zero."""
    nonempty = []
    for path in files:
        if os.stat(path).st_size != 0:
            nonempty.append(path)
    return nonempty

# Split a list to different groups
def makeGroup(lst, n):
    """Distribute the values of *lst* over *n* buckets so their sums stay
    roughly balanced (greedy: each value joins the lightest bucket)."""
    buckets = [[] for _ in range(n)]
    heap = [(0, idx) for idx in range(n)]
    heapq.heapify(heap)
    for item in lst:
        weight, idx = heapq.heappop(heap)
        buckets[idx].append(item)
        heapq.heappush(heap, (weight + item, idx))
    return buckets
    

def groupFiles(pqFiles, numberGroup):
    """Split *pqFiles* into *numberGroup* lists of paths whose total byte
    sizes are as balanced as a greedy heuristic allows.

    Each file is assigned to the currently-lightest group, tracked with a
    min-heap of ``(total_size, group_index)`` pairs.

    Fixes two defects in the original:
      * ``namedtuple`` was used without being imported;
      * files sharing the same byte size were duplicated across groups,
        because names were matched back to their sizes by equality after
        the sizes had been partitioned.
    """
    groups = [[] for _ in range(numberGroup)]
    totals = [(0, i) for i in range(numberGroup)]
    heapq.heapify(totals)
    for name in pqFiles:
        size = os.stat(name).st_size
        total, index = heapq.heappop(totals)  # lightest group so far
        groups[index].append(name)
        heapq.heappush(totals, (total + size, index))
    return groups
    
    
import os, sys, ast
import pandas as pd

def main():
    """Parse the six CLI arguments (start date, end date, bucket list
    literal, destination bucket, data type, flatten flag), echo each day
    of the requested date range as YYYYMMDD, then print every argument
    back for verification."""
    args = sys.argv
    start_date, end_date = args[1], args[2]
    origin_buckets = ast.literal_eval(args[3])  # e.g. "['a','b']" -> list
    destination_bucket, data_type, flatten = args[4], args[5], args[6]

    for day in pd.date_range(start=start_date, end=end_date):
        print(day.strftime('%Y%m%d'))

    for value in (start_date, end_date, origin_buckets,
                  destination_bucket, data_type, flatten):
        print(value)

    print('a' == origin_buckets[0])
    print('ttevent' == data_type)

if __name__ == '__main__':
    main()

# In cmd
python test.py 2021-09-01 2021-09-03 "['a','b']"  dest ttevent true

20210901
20210902
20210903
2021-09-01
2021-09-03
['a', 'b']
dest
ttevent
true
True
True

Filter out empty files

def filterOutEmptyFiles(files):
    """Drop zero-byte files, keeping only paths with content on disk."""
    return [f for f in files if os.path.getsize(f) > 0]

Convert a file size to mb,gb...

def convertSize(sizeBytes: int) -> "str | tuple[int, str]":
    """Convert a byte count into a human-readable unit string.

    Returns the string "0B" for zero; otherwise a
    ``(raw_bytes, "<value> <unit>")`` tuple — the mixed return types are
    preserved for backward compatibility with existing callers.

    Fixes: the original annotated the return with ``Union`` from ``typing``
    without importing it (NameError at definition time, and the annotation
    was wrong anyway — a tuple is returned), and used ``math`` without an
    import.
    """
    if sizeBytes == 0:
        return "0B"
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    # Unit index = how many times 1024 fits logarithmically into the size.
    i = int(math.floor(math.log(sizeBytes, 1024)))
    p = math.pow(1024, i)
    s = round(sizeBytes / p, 2)
    return (sizeBytes, f"{s} {size_name[i]}")
Previous: Improvement · Next: Python OOP

Last updated 3 years ago

Was this helpful?

💕
Page cover image