1. 자연어 이해(NLU)
* 자연어 이해(NLU, Natural Language Understanding)는 기계가 인간의 언어를 의미적으로 해석하고 이해하는 기술을 의미합니다.
* 이는 단순한 단어 분석을 넘어 문맥을 파악하고, 개체 인식(NER), 감성 분석, 의미적 유사성 판단, 문장 의도 분류 등 다양한 과업을 포함합니다.
* NLU는 기계 학습과 딥러닝 기술을 활용하여 문장의 의미를 추론하며, 챗봇, 음성 비서, 기계 번역, 자동 요약 등의 응용 분야에서 중요한 역할을 합니다.
2. 자연어 생성(NLG)
* 자연어 생성(NLG, Natural Language Generation)은 기계가 사람이 이해할 수 있는 자연스러운 텍스트를 생성하는 기술을 의미합니다.
* 이는 데이터를 분석하고 적절한 문맥을 반영하여 문장을 구성하는 과정으로, 보고서 작성, 뉴스 요약, 대화형 AI 응답 생성, 소설 및 시 작성 등 다양한 응용 분야에서 활용됩니다.
* NLG 시스템은 일반적으로 데이터 해석, 문장 계획, 언어 실현의 세 단계를 거치며, 최근에는 GPT와 같은 대규모 언어 모델이 등장하면서 더욱 자연스럽고 창의적인 텍스트 생성이 가능해졌습니다.
3. 시퀀스투시퀀스
* 시퀀스투시퀀스(Sequence-to-Sequence, Seq2Seq) 모델은 입력 시퀀스를 받아 출력 시퀀스를 생성하는 신경망 아키텍처로, 주로 자연어 처리 분야에서 번역, 요약, 질의응답 등에 활용됩니다.
* Seq2Seq 모델은 일반적으로 인코더(Encoder)와 디코더(Decoder) 두 부분으로 구성되며, 인코더는 입력 문장을 고정된 크기의 컨텍스트 벡터로 변환하고, 디코더는 이를 기반으로 출력 문장을 생성합니다.
* RNN, LSTM, GRU 등의 순환 신경망을 활용하며, 최근에는 어텐션 메커니즘과 트랜스포머 모델이 도입되어 더 긴 문맥을 효과적으로 처리할 수 있게 되었습니다.
논문 링크 주소 : https://arxiv.org/pdf/1409.3215
3-1. 인코더
* 인코더는 입력 시퀀스를 읽고 중요한 정보를 하나의 고정된 크기의 벡터(컨텍스트 벡터, Context Vector)로 변환하는 역할을 합니다.
* 쉽게 말하면, 입력 문장을 하나의 "압축된 의미"로 바꿔주는 역할을 합니다.
1. 입력 시퀀스를 처리
예를 들어, 한글 문장 "나는 학교에 간다"를 영어로 번역한다고 가정하면,
인코더는 문장을 단어 단위 또는 문자 단위로 받아들입니다.
2. 순환 신경망(RNN, LSTM, GRU) 처리
인코더는 RNN 계열(LSTM, GRU 등) 구조를 사용하여, 입력 단어를 하나씩 처리하면서 숨겨진 상태(hidden state)를 업데이트합니다.
3. 최종 상태 저장모든 단어를 처리한 후, 마지막 숨겨진 상태가 만들어지는데, 이를 컨텍스트 벡터(Context Vector)라고 합니다.
이 컨텍스트 벡터에는 입력 시퀀스의 중요한 정보가 압축되어 있습니다.
3-2. 디코더
* 디코더는 인코더가 생성한 컨텍스트 벡터를 받아서, 원하는 출력 시퀀스를 생성하는 역할을 합니다.
* 즉, 인코더가 "기억한 내용"을 바탕으로 새로운 문장을 생성하는 과정입니다.
1. 컨텍스트 벡터를 받음
* 디코더는 인코더에서 생성된 컨텍스트 벡터를 첫 번째 입력으로 받습니다.
2. 출력 시퀀스 생성
* 디코더는 한 단어씩 출력을 예측하면서, 이전에 생성된 단어를 다음 입력으로 사용합니다.
* 예를 들어, "나는 학교에 간다"라는 입력을 받았다면, 디코더는 순차적으로 "I" → "go" → "to" → "school"을 생성합니다.
3. RNN 계열 사용 디코더도 LSTM 또는 GRU와 같은 RNN 계열 모델을 사용하여 문장을 생성합니다.
4. 종료 토큰(EOS, End of Sequence)디코더는 문장이 끝났음을 알리는 종료 토큰(EOS)을 생성할 때까지 출력을 계속합니다.
> 시퀀스 투 시퀀스(Seq2Seq) 모델은 자연어 처리에서 널리 사용되지만 몇 가지 단점이 있습니다.
* 먼저, 인코더가 입력을 하나의 고정된 컨텍스트 벡터로 변환하기 때문에 긴 문장에서 정보 손실이 발생할 수 있으며, 이는 성능 저하로 이어집니다.
* 또한, RNN 기반 구조는 장기 의존성 문제로 인해 문장이 길어질수록 초반과 후반 단어 간의 관계를 잘 학습하지 못하는 한계가 있습니다.
* 병렬 처리가 어려워 속도가 느리고, 디코더가 단어를 순차적으로 생성해야 하므로 생성 속도가 느리며 예측 오류가 이후 결과에 영향을 미치는 오토리그레시브 문제도 존재합니다.
* 마지막으로, 훈련 데이터에 의존적이어서 새로운 도메인이나 문장에서 일반화가 어렵습니다.
* 이를 해결하기 위해 어텐션 기법, 트랜스포머 모델, 사전 훈련된 대규모 언어 모델(GPT, BERT 등)이 등장하며 자연어 처리 기술이 발전하고 있습니다.
예시 1)
import os
import requests
import zipfile
import torch
import torch.nn as nn
import torch.optim as optim
import random
import re
import unicodedata
from torch.utils.data import Dataset, DataLoader
from collections import Counter
from torch.nn.utils.rnn import pad_sequence
예시 2)
# 데이터 다운로드 및 압축 해제
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
예시 3)
import requests # requests 라이브러리를 가져옴 (HTTP 요청을 보내기 위해 필요)
# 주어진 URL에서 ZIP 파일을 다운로드하여 지정된 경로에 저장하는 함수
def download_zip(url, output_path):
response = requests.get(url, headers=headers, stream=True) # HTTP GET 요청을 보냄 (스트리밍 방식 사용)
if response.status_code == 200: # HTTP 응답 코드가 200(성공)일 경우
with open(output_path, 'wb') as f: # 지정된 경로에 바이너리 쓰기 모드('wb')로 파일을 엶
for chunk in response.iter_content(chunk_size=8192): # 응답 데이터를 8192바이트씩 반복하여 읽음
f.write(chunk) # 읽어온 데이터를 파일에 씀
print(f"ZIP file downloaded to {output_path}") # 다운로드 완료 메시지 출력
else:
print(f"Failed to download. HTTP Response Code: {response.status_code}") # 실패 메시지 출력
예시 4)
# url을 이용하여 다운
url = "http://www.manythings.org/anki/fra-eng.zip"
output_path = "fra-eng.zip"
download_zip(url, output_path)
예시 5)
# zipfile의 다운된 자기경로
path = os.getcwd()
path
zipfilename = os.path.join(path, output_path)
zipfilename
예시 6)
# ZIP 파일 압축 해제
with zipfile.ZipFile(zipfilename, 'r') as zip_ref: # ZIP 파일을 읽기 모드('r')로 엶
zip_ref.extractall(path) # ZIP 파일을 현재 작업 디렉토리에 압축 해제
print("ZIP file extracted to:", path) # 압축 해제 완료 메시지 출력
예시 7)
# 데이터 로드 및 전처리
# unicodedata.normalize('NFD', s) : 문자열을 NFD 형식으로 변환하는데
# 프랑스어 악센트(accent) 삭제 예시 : 'déjà diné' -> deja dine
# Mn(Mark, non-spacing) : 악센트와 같은 다이어크리틱 기호를 포함하는 문자
# 결론적으로 악센트를 제거하는 함수
def unicode_to_ascii(s):
return ''.join(c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn')
예시 8)
# Céline, Français을 악센트 제거된 후 출력
unicode_to_ascii('Céline, Français')
-->
Celine, Francais
예시 9)
# 문자열을 정규화하는 함수 (알파벳과 특정 기호만 유지)
# 모든 문자를 소문자로 변환하고 공백을 제거
# 알파벳(a-z, A-Z)과 특수기호(.!?)만 유지하고 나머지는 공백으로 대체
def normalize_string(s):
s = unicode_to_ascii(s.lower().strip()) # 문자열을 소문자로 변환하고 공백 제거
s = re.sub(r"[^a-zA-Z.!?]+", " ", s) # 알파벳과 특정 특수문자를 제외한 모든 문자를 공백으로 변경
return s
예시 10)
# 테스트 실행
normalize_string("J’aime le café!") # j aime le cafe!
예시 11)
normalize_string('C’est une bonne idée.') # c est une bonne idee.
예시 12)
# 파일에서 문장 쌍 데이터를 로드하는 함수
def load_data(filepath, num_samples=50000):
with open(filepath, encoding='utf-8') as f: # UTF-8 인코딩으로 파일을 엶
lines = f.read().strip().split("\n") # 파일의 모든 줄을 읽고 개행 문자를 기준으로 분할
# 각 줄을 탭 문자(\t)로 나누어 첫 번째와 두 번째 항목을 정규화하여 저장
pairs = [[normalize_string(s) for s in l.split('\t')[:2]] for l in lines[:num_samples]]
return pairs
예시 13)
# 파일 경로 설정 및 데이터 로드
file_path = os.path.join(path, 'fra.txt') # 현재 디렉토리에 있는 'fra.txt' 파일 경로 생성
pairs = load_data(file_path, num_samples=20000) # 20,000개의 문장 쌍을 로드
print(f"Loaded {len(pairs)} sentence pairs.") # 로드된 문장 쌍 개수 출력
예시 14)
pairs[:10] # 10개의 데이터 출력
-->
[['go.', 'va !'],
['go.', 'marche.'],
['go.', 'en route !'],
['go.', 'bouge !'],
['hi.', 'salut !'],
['hi.', 'salut.'],
['run!', 'cours !'],
['run!', 'courez !'],
['run!', 'prenez vos jambes a vos cous !'],
['run!', 'file !']]
예시 15)
# 단어 사전(Language Vocabulary) 생성 클래스
# <SOS>: 문장의 시작 토큰
# <EOS>: 문장의 끝 토큰
# <PAD>: 패딩 토큰 (길이 맞추기용)
class Lang:
def __init__(self):
self.word2index = {'<SOS>': 0, '<EOS>': 1, '<PAD>': 2} # 단어 → 인덱스 매핑
self.index2word = {0: '<SOS>', 1: '<EOS>', 2: '<PAD>'} # 인덱스 → 단어 매핑
self.word_count = Counter() # 단어 빈도수 저장
def add_sentence(self, sentence):
"""문장에서 단어를 추출하여 단어 빈도수를 증가시키는 함수"""
for word in sentence.split(): # 문장을 공백 기준으로 단어 분리
self.word_count[word] += 1 # 단어 빈도수 증가
def build_vocab(self, min_count=2):
"""빈도수가 min_count 이상인 단어만 사전에 추가"""
for word, count in self.word_count.items():
if count >= min_count: # 최소 빈도수를 만족하는 경우
index = len(self.word2index) # 현재 단어 사전의 크기를 인덱스로 사용
self.word2index[word] = index # 단어 → 인덱스 매핑 추가
self.index2word[index] = word # 인덱스 → 단어 매핑 추가
def sentence_to_indexes(self, sentence):
"""문장을 단어 인덱스 리스트로 변환하는 함수"""
return [self.word2index.get(word, self.word2index['<PAD>']) for word in sentence.split()] # 단어가 없으면 <PAD> 사용
예시 16)
# 단어 사전(Language Vocabulary) 생성 및 테스트
input_lang = Lang() # 새로운 Lang 객체 생성
input_lang.add_sentence('Hello World hello ai machine Ai ai hello') # 문장 추가하여 단어 빈도수 계산
input_lang.build_vocab(2) # 최소 2번 이상 등장한 단어만 추가
# 생성된 단어 사전 출력
print(input_lang.word2index) # {'<SOS>': 0, '<EOS>': 1, '<PAD>': 2, 'hello': 3, 'ai': 4}
--->
{'<SOS>': 0, '<EOS>': 1, '<PAD>': 2, 'hello': 3, 'ai': 4}
예시 17)
# 문장을 단어 인덱스 리스트로 변환
tokens = input_lang.sentence_to_indexes('hello ai world') # 단어를 사전의 인덱스로 변환
print(tokens) # [3, 4, 2]
예시 18)
# 언어 모델을 위한 Lang 클래스의 인스턴스 생성
input_lang = Lang() # 입력(소스) 언어를 위한 Lang 객체
target_lang = Lang() # 출력(타겟) 언어를 위한 Lang 객체
# 병렬 문장 쌍을 사용하여 단어장 구축
for src, tgt in pairs:
input_lang.add_sentence(src) # 소스 언어 문장을 단어장에 추가
target_lang.add_sentence(tgt) # 타겟 언어 문장을 단어장에 추가
# 단어장 구축 (단어를 인덱스로 매핑하고 고유한 단어 리스트 생성)
input_lang.build_vocab() # 소스 언어 단어장 구축
target_lang.build_vocab() # 타겟 언어 단어장 구축
예시 19)
# 데이터셋 및 DataLoader 생성
# 병렬 문장 쌍을 이용하여 데이터셋을 생성하는 클래스
class TranslationDataset(Dataset):
def __init__(self, pairs, input_lang, target_lang, max_length=20):
self.pairs = pairs # 소스-타겟 문장 쌍 리스트
self.input_lang = input_lang # 소스 언어 단어장 객체
self.target_lang = target_lang # 타겟 언어 단어장 객체
self.max_length = max_length # 최대 문장 길이
def __len__(self): # 데이터셋의 전체 샘플 개수를 반환
return len(self.pairs)
# 인덱스에 해당하는 소스-타겟 문장을 텐서 형태로 반환
def __getitem__(self, idx):
src, tgt = self.pairs[idx]
src_idx = self.input_lang.sentence_to_indexes(src)[:self.max_length] + [self.input_lang.word2index['<EOS>']]
tgt_idx = self.target_lang.sentence_to_indexes(tgt)[:self.max_length] + [self.target_lang.word2index['<EOS>']]
return torch.tensor(src_idx), torch.tensor(tgt_idx)
예제 20)
# DataLoader에서 batch 데이터를 패딩하여 반환하는 collate 함수
# DataLoader에서 batch 단위로 데이터를 가져올 때 길이를 맞추기 위해 패딩을 적용하는 함수
def collate_fn(batch):
src_batch, tgt_batch = zip(*batch) # batch 개수의 (src_tensor, tgt_tensor) 튜플형태
src_batch = pad_sequence(src_batch, batch_first=True, padding_value=input_lang.word2index['<PAD>']) # 소스 문장 패딩
tgt_batch = pad_sequence(tgt_batch, batch_first=True, padding_value=input_lang.word2index['<PAD>']) # 타겟 문장 패딩
return src_batch, tgt_batch
예제 21)
# 데이터셋 및 DataLoader 생성
dataset = TranslationDataset(pairs, input_lang, target_lang) # 번역 데이터셋 생성
dataloader = DataLoader(dataset, batch_size=64, shuffle=True, collate_fn=collate_fn) # DataLoader로 배치 단위로 데이터 로딩
예제 22)
# 인코더 모델 정의
class Encoder(nn.Module):
def __init__(self, input_size, embedding_size, hidden_size, num_layers=2, dropout=0.3):
super().__init__()
self.embedding = nn.Embedding(input_size, embedding_size)
self.rnn = nn.GRU(embedding_size, hidden_size, num_layers, dropout=dropout, bidirectional=True, batch_first=True)
self.fc = nn.Linear(hidden_size * 2, hidden_size)
# 순전파 수행
# x: 입력 텐서 (배치 크기, 시퀀스 길이)
def forward(self, x):
embedded = self.embedding(x) # (batch_size, seq_length, embedding_size)
outputs, hidden = self.rnn(embedded) # (batch_size, seq_length, hidden_size*2)
# 마지막 두 개의 은닉 상태 hidden[-2, :, :], hidden[-1, :, :]를 이어 붙임
# torch.cat(..., dim=1) -> (batch_size, hidden_size * 2) -> 값의 범위를 조정하여 안정적으로 학습
hidden = torch.tanh(self.fc(torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)))
return hidden.unsqueeze(0).repeat(2, 1, 1) # 인코더의 최종 은닉 상태 반환
예제 23)
# 디코더 모델 정의
class Decoder(nn.Module):
def __init__(self, output_size, embedding_size, hidden_size, num_layers=2, dropout=0.3):
super().__init__()
self.embedding = nn.Embedding(output_size, embedding_size)
self.rnn = nn.GRU(embedding_size, hidden_size, num_layers, dropout=dropout, batch_first=True)
self.fc = nn.Linear(hidden_size, output_size)
# 순전파 수행
def forward(self, x, hidden):
x = x.unsqueeze(1) # (batch_size, 1)
embedded = self.embedding(x) # (batch_size, 1, embedding_size)
output, hidden = self.rnn(embedded, hidden)
prediction = self.fc(output.squeeze(1)) # (batch_size, output_size)
return prediction, hidden
예제 24)
# 모델 학습 함수
# 인코더-디코더 모델 학습을 위한 함수
def train(encoder, decoder, dataloader, optimizer, criterion, device, num_epochs=50, teacher_forcing_ratio=0.3):
for epoch in range(num_epochs):
total_loss = 0
for src, tgt in dataloader:
src, tgt = src.to(device), tgt.to(device)
optimizer.zero_grad()
encode_hidden = encoder(src) # 인코더를 통해 입력 문장을 은닉 상태로 변환
decoder_input = torch.tensor([target_lang.word2index['<SOS>']] * src.shape[0], device=device) # 디코더의 초기 입력
decoder_hidden = encode_hidden # 인코더의 최종 은닉 상태를 디코더의 초기 은닉 상태로 사용
loss = 0
for t in range(tgt.shape[1]):
output, decoder_hidden = decoder(decoder_input, decoder_hidden) # 디코더 실행
loss += criterion(output, tgt[:, t]) # 출력과 타겟 비교하여 손실 계산
teacher_force = random.random() < teacher_forcing_ratio # 일정 확률로 정답을 다음 입력으로 사용
decoder_input = tgt[:, t] if teacher_force else output.argmax(1)
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f'Epoch {epoch+1}, Loss: {total_loss / len(dataloader)}')
예제 25)
# 모델 학습 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # GPU 사용 여부 확인
encoder = Encoder(len(input_lang.word2index), 512, 512).to(device) # 인코더 모델 초기화 및 GPU 이동
decoder = Decoder(len(target_lang.word2index), 512, 512).to(device) # 디코더 모델 초기화 및 GPU 이동
optimizer = optim.Adam(list(encoder.parameters()) + list(decoder.parameters()), lr=0.0005) # Adam 옵티마이저 설정
criterion = nn.CrossEntropyLoss() # 손실 함수로 교차 엔트로피 사용
예제 26)
# 모델 학습 실행
train(encoder, decoder, dataloader, optimizer, criterion, device, num_epochs=50) # 50 에포크 동안 학습 진행
예제 27)
# 번역 테스트 함수
def translate_sentence(sentence, encoder, decoder, input_lang, target_lang, device, max_length=20):
encoder.eval()
decoder.eval()
with torch.no_grad():
src_idx = input_lang.sentence_to_indexes(sentence) + [input_lang.word2index['<EOS>']] # 입력 문장을 인덱스로 변환
src_tensor = torch.tensor(src_idx, device=device).unsqueeze(0) # 텐서 변환 후 배치 차원 추가
encoder_hidden = encoder(src_tensor) # 인코더 실행하여 은닉 상태 획득
decoder_input = torch.tensor([target_lang.word2index['<SOS>']], device=device) # 디코더 시작 토큰 설정
decoder_hidden = encoder_hidden # 인코더의 은닉 상태를 디코더 초기 은닉 상태로 사용
translated_sentence = []
for _ in range(max_length):
output, decoder_hidden = decoder(decoder_input, decoder_hidden) # 디코더 실행
top_word_idx = output.argmax(1).item() # 가장 확률이 높은 단어 인덱스 선택
if top_word_idx == target_lang.word2index['<EOS>']: # 종료 토큰을 만나면 번역 종료
break
translated_sentence.append(target_lang.index2word[top_word_idx]) # 번역 결과 저장
decoder_input = torch.tensor([top_word_idx], device=device) # 다음 입력 설정
return ' '.join(translated_sentence) # 번역된 문장 반환
예제 28)
# 번역 실행 및 결과 출력
print(translate_sentence("hello how are you", encoder, decoder, input_lang, target_lang, device)) # 입력 문장을 번역하여 출력
-->
comment vas tu ?
예제 29)
# 여러 문장 번역 테스트
test_sentences = [
"good morning",
"i love you",
"where are you?",
"what is your name?",
"this is a pen"
]
for sentence in test_sentences:
translation = translate_sentence(sentence, encoder, decoder, input_lang, target_lang, device) # 문장 번역 수행
print(f"Input: {sentence}\nTranslated: {translation}\n") # 원본 문장과 번역 결과 출력
--->
Input: good morning
Translated: bon <PAD> !
Input: i love you
Translated: je vous aime aime.
Input: where are you?
Translated: ou etes vous ?
Input: what is your name?
Translated: quel est votre <PAD> ?
Input: this is a pen
Translated: c est une pied de <PAD>
'자연어 처리 > 자연어 처리' 카테고리의 다른 글
11. 트랜스포머 (2) | 2025.02.12 |
---|---|
10. 어텐션 메커니즘 (1) | 2025.02.12 |
7. LSTM과 GRU (0) | 2025.02.10 |
6-2. RNN 활용 (0) | 2025.02.10 |
6. RNN(Recurrent Neural Network, RNN) (0) | 2025.02.06 |