# Useful functions

```python
import numpy as np
import pandas as pd
import glob
import shutil
import os
import sqlite3
from copy import deepcopy
from datetime import datetime,timedelta,date
```

### About dataframe

```python
# Get memory used 
def get_mem_usage(df):
    """Print the deep memory usage of ``df``, in units of 1024**2 bytes.

    The figure comes from ``df.memory_usage(deep=True)`` summed over all
    columns and the index; nothing is returned.
    """
    total_bytes = df.memory_usage(deep=True).sum()
    in_megabytes = total_bytes / 1024 ** 2
    print(f"{in_megabytes:3.2f}Mb")


# Get basic info
def get_general_info(df):
    """Print a summary of ``df``: shape, dtypes, missing values and memory use.

    The name shown in the header is found by searching this module's globals
    for a variable bound to ``df`` (identity match). When ``df`` is not bound
    to a global name here (a local variable, a temporary, a frame from another
    module) the generic label ``df`` is printed instead of raising IndexError.

    Args:
        df: The pandas DataFrame to describe.
    """
    separator = "======================================="

    # First global whose value *is* this frame; fall back to a generic label.
    name = next((key for key, value in globals().items() if value is df), "df")

    # Count nulls once and reuse the result for both reports below.
    missing_counts = df.isnull().sum()

    print(f"Dataframe << {name}>>has {df.shape[0]} rows, {df.shape[1]} columns")
    print(separator)
    print("Column Types:\n")
    print(df.dtypes)
    print(separator)
    print("Missing values per column: ")
    percent_missing = missing_counts*100 / len(df)
    missing_value_df = pd.DataFrame({'column_name':df.columns,'percent_missing':percent_missing})
    missing_value_df['percent_missing'] = ["{:.2f}%".format(x) for x in missing_value_df['percent_missing'] ]
    print(missing_value_df)
    print(separator)
    print(f"Memory Use: {df.memory_usage(deep=True).sum()/1024 **2:3.2f}Mb")
    print(separator)
    print("Missing Values in columns: ")
    print(missing_counts)
    
# Change col format
def change_col_format(df, target_type):
    """Cast every column of ``df`` to ``target_type`` and return ``df``.

    Columns are reassigned one by one on the frame that was passed in, so the
    caller's object is modified and the same object is returned.
    """
    for column in df.columns:
        converted = df[column].astype(target_type)
        df[column] = converted
    return df
    
# Optimize a df
def get_optimize_df(df):
    """Return a copy of ``df`` with low-cardinality columns stored as ``category``.

    A column is converted when fewer than half of its values are distinct
    (``nunique / row count < 0.5``). The input frame is left unmodified.

    Args:
        df: The pandas DataFrame to optimize.

    Returns:
        A new DataFrame with the selected columns cast to ``category``.
    """
    n_rows = len(df)
    if n_rows == 0:
        # No rows to measure cardinality against; also avoids a 0/0 division.
        return df.copy()

    to_category = {
        col: 'category'
        for col in df.columns
        if df[col].nunique() / n_rows < 0.5
    }
    # astype returns a new frame rather than converting in place, so its
    # result (not the untouched input) is what must be returned.
    return df.astype(to_category)
    

# Save / read pickle
def save_as_pickle(df,name,path=None):
    """Pickle ``df`` to ``<name>.pkl`` in ``path`` (default: current directory).

    This is a best-effort save: failures are reported on stdout, together
    with the underlying error, instead of being raised.

    Args:
        df: Object exposing ``to_pickle`` (a pandas DataFrame).
        name: File name without the ``.pkl`` extension.
        path: Target directory; ``None`` writes to the working directory.
    """
    try:
        if path is None:
            df.to_pickle(f"{name}.pkl")
            print(f"Dataframe saved as pickle in => {os.getcwd()}")
        else:
            target = f"{path}/{name}.pkl"
            df.to_pickle(target)
            print(f"Dataframe saved as pickle in => {target}")
    except Exception as exc:
        # Exception rather than a bare except, so Ctrl-C / SystemExit still
        # propagate; the reason is printed so the failure can be diagnosed.
        print("Save failed. Make sure it's a dataframe or the path is correct")
        print(f"Reason: {exc!r}")

def read_pickle_as_df(path=None):
    """Load every ``*.pkl`` file in a directory into a dict keyed by file stem.

    Args:
        path: Directory to scan; ``None`` means the current working directory.

    Returns:
        dict mapping each file name without its ``.pkl`` suffix to the
        unpickled object.

    Raises:
        FileNotFoundError: If ``path`` is not an existing directory.

    NOTE: unpickling can execute arbitrary code -- only point this at
    directories whose contents you trust.
    """
    target_path = os.getcwd() if path is None else path
    if not os.path.isdir(target_path):
        raise FileNotFoundError(f"Not a directory: {target_path}")

    # Glob inside the directory instead of chdir-ing into it, so a failed
    # read can never leave the process in a different working directory.
    pattern = os.path.join(glob.escape(target_path), "*.pkl")

    result = {}
    for file_path in glob.glob(pattern):
        # splitext keeps dots inside the stem ("a.b.pkl" -> "a.b"), so files
        # whose names share a prefix before the first dot no longer collide.
        stem = os.path.splitext(os.path.basename(file_path))[0]
        result[stem] = pd.read_pickle(file_path)
    return result

# Read big csv file    
def read_large_csv(name,chunkSize=1000000,encoding='utf-8'):
    """Read a CSV file in pieces of ``chunkSize`` rows and return one DataFrame.

    Args:
        name: Path of the CSV file, passed to ``pd.read_csv``.
        chunkSize: Number of rows requested per ``get_chunk`` call.
        encoding: Text encoding passed to ``pd.read_csv``.

    Returns:
        The concatenation of all chunks, with a fresh 0..n-1 index.
    """
    reader = pd.read_csv(name, iterator=True, encoding=encoding)
    pieces = []
    while True:
        try:
            pieces.append(reader.get_chunk(chunkSize))
        except StopIteration:
            # The reader signals that the file is exhausted by raising.
            break
    return pd.concat(pieces, ignore_index=True)

```

### Files

```python
# Return a list of file with assigned extension
def get_files(extension):
    """Return the names in the current working directory matching ``*.<extension>``."""
    pattern = f"*.{extension}"
    matches = glob.glob(pattern)
    return matches

# Remove the folder if exist and create new one with same name 
def create_clean_dir(name):
    """Create an empty directory ``name`` and make it the working directory.

    An existing directory of that name is deleted with all of its contents
    first. The new working directory is printed afterwards.
    """
    already_there = os.path.isdir(name)
    if already_there:
        shutil.rmtree(name)
    # Both the fresh and the replaced case end with a newly created directory.
    os.makedirs(name)
    os.chdir(name)
    print(f"Current working dir => :{os.getcwd()}")
    
    
# Create a temporary dir
import tempfile
from contextlib import contextmanager


@contextmanager
def tmpdir(path=None):
    """Context manager that yields a ``tempfile.TemporaryDirectory`` object.

    Args:
        path: Parent directory for the temporary directory; a falsy value
            (``None`` or ``""``) lets ``tempfile`` pick its default location.

    The directory name is printed once the ``with`` body has finished without
    raising. The directory is always cleaned up on exit; an ``OSError`` raised
    by the cleanup is printed instead of propagated.
    """
    parent = path if path else None
    holder = tempfile.TemporaryDirectory(dir=parent)
    try:
        yield holder
        print(holder.name)
    finally:
        try:
            holder.cleanup()
        except OSError as err:
            print(err)
                
# `time` is needed for the sleep below and is not imported elsewhere in this snippet.
import time

# Parent directory in which the temporary folder is created.
default_path="F:\\"

with tmpdir(default_path) as tmp:
    # Do sth
    # Placeholder for real work; the directory is cleaned up when the block exits.
    time.sleep(5)
```

### Time

```python
def get_n_days_ago(n=0,time_format="%d-%m-%Y"):
    """Return the local date/time ``n`` days before now as a formatted string.

    Args:
        n: Number of days to go back (0 means now).
        time_format: ``strftime`` pattern used for the result.
    """
    offset = timedelta(days=n)
    moment = datetime.now() - offset
    return moment.strftime(time_format)
    
 
# strftime("%d/%m/%Y, %H:%M:%S")  
# 13/05/2019, 13:38:14
```

### Test iterable

```python
def is_iterable(obj):
    """Return True when ``iter(obj)`` succeeds, False when it raises TypeError."""
    try:
        iter(obj)
    except TypeError:
        return False
    else:
        return True
        
        
# Demo: an int cannot be iterated, so the error branch is the one that runs.
obj = 100
if not is_iterable(obj):
    print('Error: obj is not iterable')
else:
    for i in obj:
        print(i)

# Output:
# Error: obj is not iterable
```

### Get df from list of tuples

```python
# Template: build a DataFrame from namedtuple instances. The names
# `namedtuple_instance1`, `namedtuple_instance2` and `namedtuple_type` are
# placeholders for your own instances and their namedtuple class.
df = pd.DataFrame.from_records(
   [namedtuple_instance1, namedtuple_instance2],
   # `_fields` of the namedtuple class supplies the column labels
   columns=namedtuple_type._fields
)
```

### Split a list into groups of almost equal total size

```python
# Both names are used below; without these imports the `def` raises NameError.
import heapq
from typing import List


def makeGroup(lst:List,n:int)->List:
    """Greedily split ``lst`` into ``n`` groups with similar totals.

    Values are taken in input order and each one is appended to the group
    whose running total is currently the smallest (ties go to the lowest
    group index).

    Args:
        lst: Values to distribute; each must support ``+`` with a number.
        n: Number of groups to create.

    Returns:
        A list of ``n`` lists holding the distributed values.
    """
    lists = [[] for _ in range(n)]
    # Min-heap of (running total, group index): the root is the lightest group.
    totals = [(0, i) for i in range(n)]
    heapq.heapify(totals)
    for value in lst:
        total, index = heapq.heappop(totals)
        lists[index].append(value)
        heapq.heappush(totals, (total + value, index))
    return lists
```

### Add extra params in command

```python
import os
from glob import glob
import heapq

def filterOutEmptyFiles(files):
    """Return the paths from ``files`` whose size on disk is non-zero, in order."""
    kept = []
    for path in files:
        if os.stat(path).st_size != 0:
            kept.append(path)
    return kept

# Split a list to different groups
def makeGroup(lst, n):
    """Distribute the values of ``lst`` over ``n`` groups with similar totals.

    Each value, in input order, goes to the group with the smallest running
    total so far (lowest group index on ties). Returns the list of groups.
    """
    groups = [[] for _ in range(n)]
    # Min-heap of (running total, group index); the root is the lightest group.
    heap = [(0, idx) for idx in range(n)]
    heapq.heapify(heap)
    for item in lst:
        lightest_total, idx = heapq.heappop(heap)
        groups[idx].append(item)
        heapq.heappush(heap, (lightest_total + item, idx))
    return groups
    

def groupFiles(pqFiles,numberGroup):
    """Split files into ``numberGroup`` groups of roughly equal total size.

    Files are taken in input order and each one is added to the group whose
    accumulated byte size is currently the smallest (lowest group index on
    ties). Paths are tracked directly, so every file lands in exactly one
    group even when several files have the same size.

    Args:
        pqFiles: Iterable of file paths; each is sized with ``os.stat``.
        numberGroup: Number of groups to create.

    Returns:
        A list of ``numberGroup`` lists of file paths.
    """
    groups = [[] for _ in range(numberGroup)]
    # Min-heap of (accumulated bytes, group index): root is the lightest group.
    heap = [(0, idx) for idx in range(numberGroup)]
    heapq.heapify(heap)
    for path in pqFiles:
        size = os.stat(path).st_size
        total, idx = heapq.heappop(heap)
        groups[idx].append(path)
        heapq.heappush(heap, (total + size, idx))
    return groups
    
    
```

```python
import os, sys, ast
import pandas as pd

def main():
    """Read six positional command-line arguments and echo them.

    Expected arguments, in order: start date, end date, a Python literal list
    of origin buckets (parsed with ``ast.literal_eval``), destination bucket,
    data type and a flatten flag (kept as the raw string). Prints every day
    in the date range as ``YYYYMMDD``, then each argument, then two
    comparisons against sample values.
    """
    argv = sys.argv
    start_date = argv[1]
    end_date = argv[2]
    # The bucket list arrives as text such as "['a','b']"; parse it safely.
    origin_buckets = ast.literal_eval(argv[3])
    destination_bucket = argv[4]
    data_type = argv[5]
    flatten = argv[6]

    for day in pd.date_range(start=start_date, end=end_date):
        print(day.strftime('%Y%m%d'))

    echoed = (
        start_date,
        end_date,
        origin_buckets,
        destination_bucket,
        data_type,
        flatten,
    )
    for value in echoed:
        print(value)

    print('a' == origin_buckets[0])
    print('ttevent' == data_type)


if __name__ == '__main__':
    main()

# In cmd:
#   python test.py 2021-09-01 2021-09-03 "['a','b']"  dest ttevent true
#
# Output:
#   20210901
#   20210902
#   20210903
#   2021-09-01
#   2021-09-03
#   ['a', 'b']
#   dest
#   ttevent
#   true
#   True
#   True
```

### Filter out empty files

```python
def filterOutEmptyFiles(files):
    """Keep only the paths in ``files`` that refer to files with at least one byte."""
    non_empty = [path for path in files if os.path.getsize(path) != 0]
    return non_empty
```

### Convert a file size in bytes to KB, MB, GB, ...

```python
# Needed by the return annotation; without it the `def` raises NameError.
from typing import Tuple, Union


def convertSize(sizeBytes:int)->Union[str, Tuple[int, str]]:
    """Express a byte count in the largest fitting binary unit.

    Args:
        sizeBytes: Size in bytes; must not be negative.

    Returns:
        ``"0B"`` when ``sizeBytes`` is 0; otherwise the tuple
        ``(sizeBytes, "<value> <unit>")`` with the value rounded to two
        decimals, e.g. ``(1536, "1.5 KB")``.

    Raises:
        ValueError: If ``sizeBytes`` is negative.
    """
    if sizeBytes == 0:
        return "0B"
    if sizeBytes < 0:
        raise ValueError("sizeBytes must be non-negative")
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    # Pick the unit by exact comparison instead of a floating-point log, which
    # can round to the wrong unit; sizes beyond the table stay in the last
    # unit rather than indexing past its end.
    i = 0
    while i < len(size_name) - 1 and sizeBytes >= 1024 ** (i + 1):
        i += 1
    s = round(sizeBytes / 1024 ** i, 2)
    return (sizeBytes, f"{s} {size_name[i]}")

```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://zeliang-yao.gitbook.io/my-note-zeliang-yao/useful/others.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
