DA

ETL 실습: Redshift에 연동하고, ETL에 필요한 함수 정의해 테이블에 적재하기 feat.transaction

ha_data 2024. 1. 25. 22:37

1. 구글 코랩으로 Redshift 연동하기 

ID/PW 입력하고 접속할 DB정보 입력 

import psycopg2

# Redshift connection 함수
# 본인 ID/PW 사용!
def get_Redshift_connection():
    host = ""
    redshift_user = ""
    redshift_pass = ""
    port = 5439
    dbname = "dev"
    conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
        dbname=dbname,
        user=redshift_user,
        password=redshift_pass,
        host=host,
        port=port
    ))
    conn.set_session(autocommit=True
    return conn.cursor()
autocommit = True : SQL에서 테이블을 수정할 경우 바로 반영되는 것.
-> 방지하려면 transaction 사용하기

 

2. ETL 함수를 하나씩 정의

(1) extract 함수: url 정보가 들어오면 해당 문서의 내용을 text로 리턴

* 로그인 해야되는 문서라면 아래 코드로는 해결 불가..!

import requests

def extract(url):
    f = requests.get(url)
    return (f.text)

 

(2) transform 함수: 위에서 불러온 text를 name, gender로 파싱해서 리스트로 만들기

def transform(text):
 #csv 파일 레코드별 하나의 아이템이 되는 리스트
    records = []  
    lines = text.strip().split("\n")
   
    #header 정보는 테이블에 로딩 x, 첫번째 레코드는 제외하고 반복문 실행
    for l in lines[1:]:
    #콤마를 기준으로 데이터 분리, 저장
      (name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
      records.append([name, gender])
    return records

 

(3) load 함수: 위에서 records 받아오고 테이블에 적재하기

 

레코드 형태 : 리스트 안에 리스트 형식 

    records = [
      [ "Keeyong", "M" ],
      [ "Claire", "F" ],
      ...
    ]

 

테이블에 데이터 적재하기

데이터 파이프라인 업데이트 방법 2가지
1. Full refresh
2. Incremental update 
Full refresh :  테이블을 항상 새로 만들어서 적재하는 것 -> 오류 발생할 수 있음 -> transaction 사용하기 
트렌젝션 사용하면 중간에 오류 발생할 경우 다시 처음 상태로 돌아감 -> 데이터 정확성, 멱등성 보장됨.
def load(records):
    """
    records = [
      [ "Keeyong", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
#BEGIN transaction 열어주기, 에러나면 except 실행
#완료가 안되면 다시 처음 상태로 돌아가는 것 => 데이터 정확성 보장, 멱등성 보장
    cur = get_Redshift_connection()
    cur.execute("BEGIN;")
    try:
    #스키마에 테이블 만들기
        cur.execute("""DROP TABLE IF EXISTS o1111lha.name_gender;
CREATE TABLE keeyong.name_gender (
   name varchar(32) primary key,
   gender varchar(8)
);""")
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = "INSERT INTO o1111lha.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
            cur.execute(sql)
    #모든게 잘 실행되면 끝내기
        cur.execute("END")   # cur.execute("COMMIT"); conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK")

 

실행 결과

 

data = extract(link)
data
 
name,gender
Adaleigh,F
Amryn,Unisex
Apurva,Unisex
Aryion,M
Alixia,F
Alyssarose,F
Arvell,M
Aibel,M
Atiyyah,F
.
.
.
생략
lines = transform(data)
[['Adaleigh', 'F'],
 ['Amryn', 'Unisex'],
 ['Apurva', 'Unisex'],
 ['Aryion', 'M'],
 ['Alixia', 'F'],
 ['Alyssarose', 'F'],
 ['Arvell', 'M'],
 ['Aibel', 'M'],
 ['Atiyyah', 'F'],
 ['Adlie', 'F']]
load(lines)
Adaleigh - F Amryn - Unisex Apurva - Unisex Aryion - M Alixia - F Alyssarose - F Arvell - M Aibel - M Atiyyah - F Adlie - F Anyely - F Aamoni - F Ahman - M Arlane - F Armoney - F Atzhiry - F Antonette - F Akeelah - F Abdikadir - M Arinze - M Arshaun - M Alexandro - M Ayriauna - F Aqib - M Alleya - F Aavah - F Anesti - Unisex Adalaide - F Analena - F Alaeyah - F Albena - F Aimi - F Adwaith - M Arkady - M Astyn - Unisex Adelee - F Agata - F Alegna - F Altan - M Ahnaleigh - F Algie - Unisex Ashanti - F Aislyn - F Adaleine - F Anthnoy - M Algernon - M Aeryona - F Adrinne - F Addell - F Avril - F Ahni - F Aimon - M Adolpho - M Ahuva - F Aurielle - F Aveana - F Aliyia - F Alesander - M Adnrea - F Anjae - F Alvine - F Adorah - F Adlemi - F Alesi - F Alontae - M Antonny - M Adarah - F Ayreanna - F Antyon - M Andia - F Ashla - F Aspyn - F Antwanett - F Aundreia - F Audella - F Amari - Unisex Arsha - Unisex Aricella - F Adan - M Apasra - F Alaysha - F Anderson - M Aurelius - M Aerial - F Averleigh - F Aslean - F Arniesha - F Asyana - F Annjane - F Amabella - F Austinjohn - M Arloween - F Alula - M Anemone - F Amorina - F Anureet - F Arric - M Antonne - M Alyre - M Annaise - F
%%sql

SELECT COUNT(1)
FROM o1111lha.name_gender;
count
100
%%sql

SELECT *
FROM o1111lha.name_gender;
 * postgresql://o1111lha:***@learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev
100 rows affected.
namegender
Adaleigh F
Amryn Unisex
Apurva Unisex
Aryion M
Alixia F
Alyssarose F
Arvell M
Aibel M
Atiyyah F
Adlie F
Anyely F
Aamoni F
%%sql

SELECT gender, COUNT(1) count
FROM o1111lha.name_gender
GROUP BY gender;
 
gendercount

 

F 65
Unisex 7
M 28