-
Notifications
You must be signed in to change notification settings - Fork 0
/
pipeline_00.py
59 lines (48 loc) · 2.29 KB
/
pipeline_00.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import os
import gdown
import duckdb
import pandas as pd
from sqlalchemy import create_engine
from dotenv import load_dotenv
from duckdb import DuckDBPyRelation
from pandas import DataFrame
load_dotenv()
def baixar_os_arquivos_do_google_drive(url_pasta, diretorio_local):
os.makedirs(diretorio_local, exist_ok=True)
gdown.download_folder(url_pasta, output=diretorio_local, quiet=False, use_cookies=False)
# Função para listar arquivos CSV no diretório especificado
def listar_arquivos_csv(diretorio):
arquivos_csv = []
todos_os_arquivos = os.listdir(diretorio)
for arquivo in todos_os_arquivos:
if arquivo.endswith(".csv"):
caminho_completo = os.path.join(diretorio, arquivo)
arquivos_csv.append(caminho_completo)
return arquivos_csv
# Função para ler um arquivo CSV e retornar um DataFrame duckdb
def ler_csv(caminho_do_arquivo):
dataframe_duckdb = duckdb.read_csv(caminho_do_arquivo)
return dataframe_duckdb
# Função para adicionar uma coluna de total de vendas
def transformar(df: DuckDBPyRelation) -> DataFrame:
# Executa a consulta SQL que inclui a nova coluna, operando sobre a tabela virtual
df_transformado = duckdb.sql("SELECT *, quantidade * valor AS total_vendas FROM df").df()
# Remove o registro da tabela virtual para limpeza
return df_transformado
# Função para converter o Duckdb em Pandas e salvar o DataFrame no PostgreSQL
def salvar_no_postgres(df_duckdb, tabela):
DATABASE_URL = os.getenv("DATABASE_URL") # Ex: 'postgresql://user:password@localhost:5432/database_name'
engine = create_engine(DATABASE_URL)
# Salvar o DataFrame no PostgreSQL
df_duckdb.to_sql(tabela, con=engine, if_exists='append', index=False)
if __name__ == "__main__":
url_pasta = 'https://drive.google.com/drive/folders/1C5by1vqyj1h3Dhxy8X6XyRx2sD99H4Et'
diretorio_local = './pasta_gdown'
#baixar_os_arquivos_do_google_drive(url_pasta, diretorio_local)
lista_de_arquivos = listar_arquivos_csv(diretorio_local)
for caminho_do_arquivo in lista_de_arquivos:
duckdb_db_df = ler_csv(caminho_do_arquivo)
pandas_df_transformado = transformar(duckdb_db_df)
salvar_no_postgres(pandas_df_transformado, "vendas_calculado")
data_frame_duckdb = ler_csv(caminho_do_arquivo)
transformar(data_frame_duckdb)