Sqlite3 & Df
A simple class to create, read , insert dataframe to Sqlite3
import pandas as pd
import numpy as np
import sqlite3
"""
Custom class to connect a Sqlite3 database
Return in format Datframe or cursor
"""
class DatabaseSqlite3:
def __init__(self,db_name):
"""Create a connection"""
self.db_name =db_name
self.status=False
try:
self.connection = sqlite3.connect(self.db_name)
print(f"Connected to << {self.db_name}>>")
self.status = True
except(Exception,sqlite3.Error) as error:
print("Error while trying connect",error)
def close_connection(self):
""" Close a connction """
if self.status:
self.connection.close()
print(f"Connection for << {self.db_name} >> is closed")
else:
print(f"Connection for << {self.db_name} >> is already closed")
def __enter__(self):
return self
def __exit__(self,exc_type,exc_value,exc_tb):
if exc_type:
print(f"Error occurred: {exc_type},{exc_value}")
self.close_connection()
def read_database_version(self):
""" Get current database version """
try:
cursor = self.connection.cursor()
cursor.execute("select sqlite_version();")
db_version = cursor.fetchone()
print(f"<< {self.db_name} >> 's version is {db_version}")
except(Exception,sqlite3.Error) as error:
print(f"Error while getting data",error)
def get_table_names(self):
"""Return all table names in the current database"""
try:
cursor = self.connection.cursor()
query = cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
records =cursor.fetchall()
cols = [column[0] for column in query.description]
cursor.close()
except sqlite3.Error as error:
print(f"Failed to read data from sqlite table",error)
results = pd.DataFrame.from_records(data=records,columns=cols).rename(columns={'name':'Table Name'})
return results
def read_table_with_df(self,table_name,conditions=None,limit=None):
"""
Get a table in current database, return the table with format Dataframe
conditions: SQL query
limit: number of rows returns
"""
extra_conditon = ""
if conditions:
extra_conditon=f" WHERE {conditions}"
try:
if limit==None:
sqlite_query = f"""SELECT * from {table_name}"""+extra_conditon
else:
sqlite_query = f"""SELECT * from {table_name} LIMIT {limit}"""+extra_conditon
df = pd.read_sql(sqlite_query,self.connection)
except sqlite3.Error as error:
print("Failed to retrive data from sqlite table")
return df
def get_column_names_from_table(self,table_name):
"""Return a list of column names from a table in database"""
columns_names=list()
try:
cursor =self.connection.cursor()
table_column_names = 'PRAGMA table_info('+table_name+');'
cursor.execute(table_column_names)
records = cursor.fetchall()
for name in records:
columns_names.append(name[1])
cursor.close()
except sqlite3.Error as error:
print("Failed to get data",error)
return columns_names
def replace_table_with_df(self,table_name,df,replace=False):
"""
Replace the selected table with Dataframe
replace=False:append data to the table
replace=True:replace all data with df
"""
try:
if table_name in list(self.get_table_names()['Table Name']):
print(f"Found table <<{table_name}>> in Database <<{self.db_name}>>")
else:
print(f"Attention , creating new table <<{table_name}>> in Database <<{self.db_name}>> ")
if replace:
df.to_sql(name=table_name,con=self.connection,if_exists="replace", index=False)
else:
df.to_sql(name=table_name,con=self.connection,if_exists="append", index=False)
print("Sql insert process finished.")
except sqlite3.Error as error:
print("Failed to update",error)
print("If it's a creation, be careful with columns format and value types")
def __getitem__(self,table_name):
try:
return self.read_table_with_df(table_name)
except:
raise KeyValueError(f"{table_name} not found in database.")
def update_table(self,table_name,update_values,conditions):
"""
Update a table with new values and conditons
update_values:list of update values
conditions: string / list of SQL expression
"""
updated = update_values
cond = conditions
if isinstance(updated,list):
updated = ", ".join(update_values)
if isinstance(conditions,list):
cond = " AND ".join(conditions)
sqlite_query = f"UPDATE {table_name} SET {updated} WHERE {cond};"
print(sqlite_query)
try:
cursor = self.connection.cursor()
cursor.execute(sqlite_query)
self.connection.commit()
cursor.close()
print(f"Update table << {table_name} >> success.")
except sqlite3.Error as error:
print(f"Failed to update table {table_name}",error)
def delete_table(self,table_name):
"""Remove a table in the current database"""
try:
cursor =self.connection.cursor()
sqlite_query = f"DROP TABLE {table_name};"
cursor.execute(sqlite_query)
self.connection.commit()
cursor.close()
print(f"Drop table << {table_name} >> success.")
except sqlite3.Error as error:
print(f"Failed to delete table <<{table_name}>>",error)
def back_up_to(self,dest):
current_path = os.getcwd()
os.chdir(dest)
new_name ="Backup"+datetime.now().strftime("%d-%m-%Y")+self.db_name
bck = sqlite3.connect(new_name)
self.connection.backup(bck)
bck.close()
print("Back Up finished.")
os.chdir(current_path)
Last updated