1. 구글 코랩으로 Redshift 연동하기
ID/PW 입력하고 접속할 DB정보 입력
import psycopg2
# Redshift connection 함수
# 본인 ID/PW 사용!
def get_Redshift_connection():
host = ""
redshift_user = ""
redshift_pass = ""
port = 5439
dbname = "dev"
conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
dbname=dbname,
user=redshift_user,
password=redshift_pass,
host=host,
port=port
))
conn.set_session(autocommit=True)
return conn.cursor()
autocommit = True : SQL에서 테이블을 수정할 경우 바로 반영되는 것.
-> 방지하려면 transaction 사용하기
2. ETL 함수를 하나씩 정의
(1) extract 함수: url 정보가 들어오면 해당 문서의 내용을 text로 리턴
* 로그인 해야되는 문서라면 아래 코드로는 해결 불가..!
import requests
def extract(url):
f = requests.get(url)
return (f.text)
(2) transform 함수: 위에서 불러온 text를 name, gender로 파싱해서 리스트로 만들기
def transform(text):
#csv 파일 레코드별 하나의 아이템이 되는 리스트
records = []
lines = text.strip().split("\n")
#header 정보는 테이블에 로딩 x, 첫번째 레코드는 제외하고 반복문 실행
for l in lines[1:]:
#콤마를 기준으로 데이터 분리, 저장
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
return records
(3) load 함수: 위에서 records 받아오고 테이블에 적재하기
레코드 형태 : 리스트 안에 리스트 형식
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
테이블에 데이터 적재하기
데이터 파이프라인 업데이트 방법 2가지
1. Full refresh
2. Incremental update
Full refresh : 테이블을 항상 새로 만들어서 적재하는 것 -> 오류 발생할 수 있음 -> transaction 사용하기
트렌젝션 사용하면 중간에 오류 발생할 경우 다시 처음 상태로 돌아감 -> 데이터 정확성, 멱등성 보장됨.
def load(records):
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
#BEGIN transaction 열어주기, 에러나면 except 실행
#완료가 안되면 다시 처음 상태로 돌아가는 것 => 데이터 정확성 보장, 멱등성 보장
cur = get_Redshift_connection()
cur.execute("BEGIN;")
try:
#스키마에 테이블 만들기
cur.execute("""DROP TABLE IF EXISTS o1111lha.name_gender;
CREATE TABLE keeyong.name_gender (
name varchar(32) primary key,
gender varchar(8)
);""")
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = "INSERT INTO o1111lha.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
cur.execute(sql)
#모든게 잘 실행되면 끝내기
cur.execute("END") # cur.execute("COMMIT"); conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK")
실행 결과
data
name,gender
Adaleigh,F
Amryn,Unisex
Apurva,Unisex
Aryion,M
Alixia,F
Alyssarose,F
Arvell,M
Aibel,M
Atiyyah,F
.
Adaleigh,F
Amryn,Unisex
Apurva,Unisex
Aryion,M
Alixia,F
Alyssarose,F
Arvell,M
Aibel,M
Atiyyah,F
.
.
.
생략
lines = transform(data)
[['Adaleigh', 'F'],
['Amryn', 'Unisex'],
['Apurva', 'Unisex'],
['Aryion', 'M'],
['Alixia', 'F'],
['Alyssarose', 'F'],
['Arvell', 'M'],
['Aibel', 'M'],
['Atiyyah', 'F'],
['Adlie', 'F']]
load(lines)
Adaleigh - F Amryn - Unisex Apurva - Unisex Aryion - M Alixia - F Alyssarose - F Arvell - M Aibel - M Atiyyah - F Adlie - F Anyely - F Aamoni - F Ahman - M Arlane - F Armoney - F Atzhiry - F Antonette - F Akeelah - F Abdikadir - M Arinze - M Arshaun - M Alexandro - M Ayriauna - F Aqib - M Alleya - F Aavah - F Anesti - Unisex Adalaide - F Analena - F Alaeyah - F Albena - F Aimi - F Adwaith - M Arkady - M Astyn - Unisex Adelee - F Agata - F Alegna - F Altan - M Ahnaleigh - F Algie - Unisex Ashanti - F Aislyn - F Adaleine - F Anthnoy - M Algernon - M Aeryona - F Adrinne - F Addell - F Avril - F Ahni - F Aimon - M Adolpho - M Ahuva - F Aurielle - F Aveana - F Aliyia - F Alesander - M Adnrea - F Anjae - F Alvine - F Adorah - F Adlemi - F Alesi - F Alontae - M Antonny - M Adarah - F Ayreanna - F Antyon - M Andia - F Ashla - F Aspyn - F Antwanett - F Aundreia - F Audella - F Amari - Unisex Arsha - Unisex Aricella - F Adan - M Apasra - F Alaysha - F Anderson - M Aurelius - M Aerial - F Averleigh - F Aslean - F Arniesha - F Asyana - F Annjane - F Amabella - F Austinjohn - M Arloween - F Alula - M Anemone - F Amorina - F Anureet - F Arric - M Antonne - M Alyre - M Annaise - F
%%sql
SELECT COUNT(1)
FROM o1111lha.name_gender;
count
100 |
%%sql
SELECT *
FROM o1111lha.name_gender;
* postgresql://o1111lha:***@learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev
100 rows affected.
namegender
Adaleigh | F |
Amryn | Unisex |
Apurva | Unisex |
Aryion | M |
Alixia | F |
Alyssarose | F |
Arvell | M |
Aibel | M |
Atiyyah | F |
Adlie | F |
Anyely | F |
Aamoni | F |
%%sql
SELECT gender, COUNT(1) count
FROM o1111lha.name_gender
GROUP BY gender;
gendercount
F | 65 |
Unisex | 7 |
M | 28 |
'DA' 카테고리의 다른 글
KPI란? | Metrics vs. KPI | 좋은 지표의 기준 - 3A, SMART (1) | 2024.01.26 |
---|---|
Snowflake 실습 - AWS S3에 Bucket, IAM 생성 | Snowflake에서 DB, Schema, Table 생성 | COPY 하기 (0) | 2024.01.26 |
Snowflake소개 및 특징 (0) | 2024.01.23 |
데이퍼 파이프라인이란 | 데이터 파이프 라인 만들 때 고려할 점 (0) | 2024.01.23 |
데이터레이크|데이터웨어하우스|Airflow|ETL,ELT (2) | 2024.01.22 |