낼름낼름 동동이

뷰티 쇼츠 광고 조회수 예측 2: 음원 데이터 수집 본문

데이터분석/프로젝트

뷰티 쇼츠 광고 조회수 예측 2: 음원 데이터 수집

인죠인간 2024. 6. 15. 15:43

 

음원 데이터 수집

유튜브 영상 ID만 있다면 메타 데이터는 youtube data api를 사용해서 수집 가능하며, 시청각 데이터는 pytube, youtube_DL 패키지와 librosa 패키지를 활용하면 수집하여 전처리도 가능해진다.

1. 환경 설치 + Import

# !pip install pytube
# !pip install pydub 
# 두가지 활용해보고 youtube-dl이 가장 에러 없이 처리가 되어 이렇게 진행

!pip install ffmpeg
!pip3 install youtube-dl
!pip3 install librosa

!pip install matplotlib
!pip3 install soundfilex

from yt_dlp import YoutubeDL

import time
import pandas as pd
import numpy as np
import csv
import matplotlib.pyplot as plt
import os

1. 음원(WAV형식) 데이터 받기

# 메타데이터, youtube_id가 저장된 파일 불러오기
from google.colab import drive
drive.mount('/content/drive')

data = pd.read_csv('data.csv')
#다운 받을 음원 형식
audio_format = 'wav'

# 다운 받을 폴더
output_folder = 'OUTPUT'

max_retries = 3  # 최대 재시도 횟수
retry_delay = 5  # 재시도 사이의 대기 시간 (초)

ffmpeg_path = '/opt/homebrew/bin/ffmpeg'  # FFmpeg 설치 경로
cookies_file = 'path/to/your/cookies.txt'  # Cookies file path

## 폴더 이름이 없었다면 만들기
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

### 유튜브 음원 파일 다운로드 함수
def download_audio(video_id):
    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': os.path.join(output_folder, f'{video_id}.%(ext)s'),
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': audio_format,
            'preferredquality': '192',
        }],
        'ffmpeg_location': ffmpeg_path,  # FFmpeg 설치 경로로 변경
        'cookies': cookies_file,  # Path to cookies file
        'retries': max_retries,  # 최대 재시도 횟수 설정
        'http_chunk_size': 10485760,  # 10 MB per chunk
        'timeout': 60  # 타임아웃 설정
    }
    video_url = f"<https://www.youtube.com/watch?v={video_id}>"
    retry = 0
    while retry < max_retries:
        try:
            with YoutubeDL(ydl_opts) as ydl:
                ydl.download([video_url])
            return os.path.join(output_folder, f'{video_id}.{audio_format}')
        except Exception as e:
            print(f"Error downloading {video_id}: {e}")
            retry += 1
            if retry < max_retries:
                print(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
            else:
                print(f"Failed to download {video_id} after {max_retries} attempts")
                return None

2. 유튜버의 목소리만 분리

  • 목소리와 노래는 분리하기 위해서 파이썬의 Spleeter를 사용하였다.
  • Spleeter는 목소리와 다른 반주 사운드로 분리하는 AI 패키지
!pip install spleeter
from spleeter.separator import Separator
import shutil

# Spleeter 초기화 (2stems 모델 사용)
separator = Separator('spleeter:2stems')

# 보컬파일만 저장할 폴더를 설정
vocal_directory = 'VOCAL_OUTPUT'
os.makedirs(vocal_directory, exist_ok=True)

# 각 파일에 대해 Spleeter 실행 및 보컬 파일 저장
for video_id in data.loc[:, 'video_id']:
    file_path = f'ORIGIN_DIRECTORY/{video_id}.wav'
    # 파일 이름 추출
    base_name = os.path.basename(file_path)
    print(base_name)
    name_without_extension = os.path.splitext(base_name)[0]
    print(name_without_extension)
    # Spleeter 실행
    separator.separate_to_file(file_path, 'OUTPUT_DIRECTORY')

    # 분리된 보컬 파일 이름 변경 및 복사
    vocal_file = os.path.join(f'OUTPUT_DIRECTORY/{name_without_extension}', 'vocals.wav')
    new_vocal_file_name = f'{name_without_extension}_vocals.wav'
    shutil.copy(vocal_file, os.path.join(vocal_directory, new_vocal_file_name))

# 보컬 파일 압축
shutil.make_archive(vocal_directory, 'zip', vocal_directory)

print(f"파일 저장 완료: {vocal_directory}.zip")

3. 목소리만 추출된 파일의 Feature 특징 추출

  • 목소리만 추출되었으므로 이 목소리의 매력도를 측정해줄 정형화된 데이터가 필요했다.
  • 우선 데이터를 뽑을 때는 사용할 수 있을 데이터를 모두 최대한 뽑아냈으며, 이 중 목소리의 매력도라 할 수 있을 음도(f0_mean), 음도편차(f0_var_mean), 빠르기(tempo)를 사용하여 최종 feature로 세팅하였다.
from scipy import stats
from scipy.stats import skew, kurtosis
import librosa
import numpy as np
import pandas as pd

# 파일 저장된 음원 데이터 열어서 특징 분석
def process_audio(file_path):
    # Load audio file
    y, sr = librosa.load(file_path, sr=None)

    # Extract features 필요 특성 추출
    mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
    chroma_stft = librosa.feature.chroma_stft(y=y, sr=sr)
    spectral_centroid = librosa.feature.spectral_centroid(y=y, sr=sr)
    spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)
    tempo, _ = librosa.beat.beat_track(y=y, sr=sr)

    # 하모닉, 퍼커시브 이펙트 추출
    y_harmonic, y_percussive = librosa.effects.hpss(y)

    # 기본 주파수 추출
    f0, voiced_flag, voiced_probs = librosa.pyin(y, fmin=librosa.note_to_hz('C2'), fmax=librosa.note_to_hz('C7'))
    f0_filtered = np.where((f0 >= 80) & (f0 <= 300), f0, np.nan)

    # Short-time Fourier transform
    D = librosa.stft(y)
    S, phase = librosa.magphase(D)
    magnitude = np.abs(S)

    # Spectral bandwidth
    bandwidths = librosa.feature.spectral_bandwidth(S=S, sr=sr)
    bandwidths_series = pd.Series(bandwidths.flatten())

    # Statistical description of the bandwidth
    description = bandwidths_series.describe()
    data_skewness = skew(bandwidths_series)
    data_kurtosis = kurtosis(bandwidths_series)

    # 전부 딕셔너리화 하여 return
    data = {
        'file_path': file_path,
        'mfcc_mean': np.mean(mfccs, axis=1).tolist(),
        'mfcc_var': np.var(mfccs, axis=1).tolist(),
        'chroma_mean': np.mean(chroma_stft, axis=1).tolist(),
        'chroma_var': np.var(chroma_stft, axis=1).tolist(),
        'spectral_centroid_mean': np.mean(spectral_centroid),
        'spectral_centroid_var': np.var(spectral_centroid),
        'spectral_rolloff_mean': np.mean(spectral_rolloff),
        'spectral_rolloff_var': np.var(spectral_rolloff),
        'tempo': tempo,
        'f0_mean': np.nanmean(f0_filtered),
        'f0_var': np.nanvar(f0_filtered),
        'bandwidth_mean': description['mean'],
        'bandwidth_std': description['std'],
        'bandwidth_min': description['min'],
        'bandwidth_25%': description['25%'],
        'bandwidth_50%': description['50%'],
        'bandwidth_75%': description['75%'],
        'bandwidth_max': description['max'],
        'bandwidth_skewness': data_skewness,
        'bandwidth_kurtosis': data_kurtosis
    }
    return data

보컬 특징 추출 전처리 함수

## 총 완료한 결과를 저장하기 위한 list 생성
results = []
## 완료한 개수를 파악하기 위해 변수
count = 0

## 에러가 발생하면 어떤 videoID에서 몇개정도 오류가 생기는지 체크 -> 이런건 노가다로 따로 수집하기 위해!
error = []
error_count = 0
for video in data.loc[:, "videoId"]:
    file_path = f'./beauty_promotion/vocal_file/{video}_vocals.wav'

    if os.path.exists(file_path):
        result = process_audio(file_path)
        results.append(result)
        print(count)
        count+=1
    else:
        error.append(video)
        error_count +=1
        print("video file_path_error",video)
        pass

# 실행
voice = pd.DataFrame(results)