# Source code for AdDownloader.media_download

"""This module provides the functionality of media content download of the AdDownloader using Selenium."""

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import TimeoutException
import requests
import os
import cv2
from AdDownloader.helpers import configure_logging, close_logger

# Command-line switches applied to the Chrome options object that is passed
# to the Chrome webdriver.
_CHROME_FLAGS = (
    "--disable-gpu",
    "--no-sandbox",
    "--enable-unsafe-swiftshader",
    "--log-level=4",  # suppress logs
    "--disable-notifications",
)

chrome_opts = Options()
for _chrome_flag in _CHROME_FLAGS:
    chrome_opts.add_argument(_chrome_flag)

def download_media(media_url, media_type, ad_id, media_folder):
    """
    Download media content for an ad given its ID.

    The file is saved as ``ad_<ad_id>_img.png`` for images and
    ``ad_<ad_id>_video.mp4`` for videos inside `media_folder`. Failures are
    reported with a printed message; no exception is propagated for request
    or file errors.

    :param media_url: The url address for accessing the media content.
    :type media_url: str
    :param media_type: The type of the media content to download, can be 'image' or 'video'.
    :type media_type: str
    :param ad_id: The ID of the ad for which media content is downloaded.
    :type ad_id: str
    :param media_folder: The path to the folder where media content will be saved.
    :type media_folder: str
    """
    # determine the path based on the media type *before* touching the network,
    # so an unsupported type is reported once and no request is wasted
    if media_type == 'image':
        file_path = f"{media_folder}/ad_{ad_id}_img.png"
    elif media_type == 'video':
        file_path = f"{media_folder}/ad_{ad_id}_video.mp4"
    else:
        print("Wrong media type.")
        return

    try:
        # a timeout prevents a stalled server from blocking the whole download run;
        # the context manager releases the connection even when an error occurs
        with requests.get(media_url, stream=True, timeout=30) as response:
            response.raise_for_status()  # catch any status error

            # save the media file chunk by chunk instead of loading it fully in memory
            with open(file_path, 'wb') as media_file:
                for chunk in response.iter_content(chunk_size=8192):
                    media_file.write(chunk)

        print(f"{media_type} of ad with id {ad_id} downloaded successfully to {file_path}")

    # catch any possible exceptions
    except requests.exceptions.RequestException as e:
        print(f"Error during the request: {e}")
    except IOError as e:
        print(f"IOError during file write: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
def accept_cookies(driver):
    """
    Click the cookie-consent button in a running Chrome webdriver.

    Only needs to be done once, right after opening the webdriver. Exceptions
    raised while waiting for or clicking the button are caught and reported
    with a printed message.

    :param driver: A running Chrome webdriver.
    :type driver: webdriver.Chrome
    """
    consent_button_xpath = '//*[@id="facebook"]/body/div[3]/div[2]/div/div/div/div/div[3]/div[2]/div/div[2]/div[1]'

    try:
        # give the consent button up to 20 seconds to appear in the page
        wait = WebDriverWait(driver, 20)
        consent_button = wait.until(
            EC.presence_of_element_located((By.XPATH, consent_button_xpath))
        )
        consent_button.click()
        print("Cookies accepted.")
    except (NoSuchElementException, TimeoutException) as missing:
        # both cases mean there was no button to click; only the message differs
        if isinstance(missing, NoSuchElementException):
            print("Cookies already accepted.")
        else:
            print("Cookie dialog not found; maybe already dismissed.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
def _download_first_match(driver, xpaths, media_type, ad_id, media_folder):
    """Download the media of the first element found among `xpaths`; return whether one was found."""
    for xpath in xpaths:
        try:
            element = driver.find_element(By.XPATH, xpath)
        except NoSuchElementException:
            continue  # the page does not use this layout, try the next xpath
        download_media(element.get_attribute('src'), media_type, ad_id, media_folder)
        return True
    return False


def _download_multiple_images(driver, xpath_template, ad_id, media_folder):
    """Download every image of a multi-image ad; return whether at least one image was found."""
    # determine the number of images on the page
    image_count = len(driver.find_elements(By.XPATH, xpath_template.format('*')))
    if image_count == 0:
        return False

    print(f'{image_count} media content found. Trying to retrieve all of them.')
    found_any = False
    for img_index in range(1, image_count + 1):
        try:
            element = driver.find_element(By.XPATH, xpath_template.format(img_index))
        except NoSuchElementException:
            break  # stop at the first missing position
        download_media(element.get_attribute('src'), 'image', f"{ad_id}_{img_index}", media_folder)
        found_any = True
    return found_any


def start_media_download(project_name, nr_ads, data=None):
    """
    Start media content download for a given project and desired number of ads.
    The ads media are saved in the output folder with the project_name
    (``output/<project_name>/ads_images`` and ``output/<project_name>/ads_videos``).

    A random sample of `nr_ads` rows is taken from `data`. Each sampled ad page is
    probed for a single image, a video and a multi-image layout, and every media
    element that is found is handed to `download_media`. An ad counts as processed
    when at least one media element was found on its page; `download_media` reports
    its own failures by printing, so a found element does not guarantee a saved file.

    :param project_name: The name of the current project.
    :type project_name: str
    :param nr_ads: The desired number of ads for which media content should be downloaded.
    :type nr_ads: int
    :param data: A dataframe containing an `ad_snapshot_url` column and an `id` column.
    :type data: pandas.DataFrame
    """
    # configure logger
    logger = configure_logging(project_name)
    driver = None

    try:
        # check if data was provided
        if data is None or len(data) == 0:
            logger.error("No data was provided for media download. Please try again.")
            print("No data was provided for media download. Please try again.")
            return

        # nothing to sample; bail out before a browser is launched
        if nr_ads <= 0:
            logger.error("The number of ads to download must be a positive integer.")
            print("The number of ads to download must be a positive integer.")
            return

        # check if the nr of ads to download is within the length of the data
        if nr_ads > len(data):
            print(f'More ad media requested than available in the data. Downloading the maximum number ({len(data)}).')
            logger.warning(f'More ads requested than available in the data. Downloading the maximum number ({len(data)}).')
            nr_ads = len(data)

        print(f"Downloading media content for project {project_name}.")
        logger.info(f'Downloading media content for project {project_name}.')

        nr_ads_processed = 0
        nr_ads_failed = 0

        # initialize folders for the images and videos of current category
        folder_path_img = f"output/{project_name}/ads_images"
        folder_path_vid = f"output/{project_name}/ads_videos"
        os.makedirs(folder_path_img, exist_ok=True)
        os.makedirs(folder_path_vid, exist_ok=True)

        # define some constants for the xpaths
        img_xpath_1 = '//*[@id="content"]/div/div/div/div/div/div/div/div[2]/a/div[1]/img'
        img_xpath_2 = '//*[@id="content"]/div/div/div/div/div/div/div[2]/div[2]/img'
        video_xpath_1 = '//*[@id="content"]/div/div/div/div/div/div/div[2]/div[2]/video'
        video_xpath_2 = '//*[@id="content"]/div/div/div/div/div/div/div/div[2]/div[2]/div/div/div/div/video'
        multpl_img_xpath = '//*[@id="content"]/div/div/div/div/div/div/div/div[3]/div/div[2]/div/div/div[{}]/div/div/a/div[1]/img'

        # sample the nr_ads
        data = data.sample(nr_ads)
        data = data.reset_index(drop=True)

        # start the downloads here, accept cookies
        driver = webdriver.Chrome(
            service=Service(ChromeDriverManager().install()),
            options=chrome_opts,
        )
        driver.get(data['ad_snapshot_url'][0])  # start from here to accept cookies
        accept_cookies(driver)

        # for each ad in the dataset download the media
        for i, (snapshot_url, raw_ad_id) in enumerate(zip(data['ad_snapshot_url'], data['id'])):
            ad_id = str(raw_ad_id)

            # get the target ad
            driver.get(snapshot_url)

            # all three probes always run, in this order: single image,
            # video (second video xpath is tried first), multiple images
            found_image = _download_first_match(
                driver, (img_xpath_1, img_xpath_2), 'image', ad_id, folder_path_img)
            found_video = _download_first_match(
                driver, (video_xpath_2, video_xpath_1), 'video', ad_id, folder_path_vid)
            found_multiple = _download_multiple_images(
                driver, multpl_img_xpath, ad_id, folder_path_img)

            # count each ad once, however many files it produced, so the
            # reported totals and the success rate stay within the number of ads
            if found_image or found_video or found_multiple:
                nr_ads_processed += 1
            else:
                nr_ads_failed += 1
                print(f"No media were downloaded for ad {raw_ad_id}.")
                logger.error(f"No media were downloaded for ad {raw_ad_id}")

            progress = (i + 1) / nr_ads
            if progress in (0.25, 0.5, 0.75):
                print(f"===== {int(progress * 100)}% done =====")

        print(f'Finished saving media content for {nr_ads_processed} ads for project {project_name}.')
        logger.info(f'Finished saving media content for {nr_ads_processed} ads for project {project_name}.')
        logger.info(f'Media failed to download for {nr_ads_failed} ads. Success rate: {nr_ads_processed / nr_ads}')
    finally:
        # always shut the browser and the logger down, even when a page load
        # or a lookup raised, so no Chrome process or log handler is left behind
        if driver is not None:
            driver.quit()
        close_logger(logger)
def _save_frame(frame, frame_dir, ad_id, frame_number):
    """Write one frame as a PNG into `frame_dir` and print whether the write succeeded."""
    frame_path = f"{frame_dir}/ad_{ad_id}_frame{frame_number}.png"
    if cv2.imwrite(frame_path, frame):
        print(f"Saved {frame_path}")
    else:
        print(f"Error: Could not write {frame_path}")


def extract_frames(video, project_name, interval = None, num_frames = None):
    """
    Extract a number of frames from an ad video and save them as PNG images.

    The video is read from ``output/<project_name>/ads_videos/<video>`` and the
    frames are written to ``output/<project_name>/video_frames``. If both
    `interval` and `num_frames` are given, `interval` takes precedence.

    :param video: The name of the video for which frames should be extracted.
    :type video: str
    :param project_name: The name of the current project.
    :type project_name: str
    :param interval: The interval between the extracted frames (in seconds), optional. Should be specified instead of `num_frames`.
    :type interval: int
    :param num_frames: The number of frames to extract, distributed evenly, optional. Should be specified instead of the `interval`.
    :type num_frames: int
    """
    video_path = f"output/{project_name}/ads_videos/{video}"

    # create a VideoCapture object
    cap = cv2.VideoCapture(video_path)
    try:
        # check if video opened successfully
        if not cap.isOpened():
            print("Error: Could not open video.")
            return

        # get video frame rate and length
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # get the ad id: downloaded videos are named ad_<id>_video.mp4; fall back
        # to the bare file name when the name does not follow that pattern
        base_name = os.path.basename(video_path)
        name_parts = base_name.split('_')
        ad_id = name_parts[1] if len(name_parts) > 1 else os.path.splitext(base_name)[0]

        frame_dir = f"output/{project_name}/video_frames"
        os.makedirs(frame_dir, exist_ok=True)

        if interval is not None:
            if fps <= 0:
                print("Error: Could not determine the video frame rate.")
                return
            if interval <= 0:
                print("Error: `interval` must be a positive number of seconds.")
                return

            duration = frame_count / fps
            # the frame rate is a float (e.g. 29.97), so the step is rounded to a
            # whole number of frames; a float step would make the modulo test
            # below match almost no frame besides the first one
            frame_step = max(1, round(interval * fps))
            print(f"Processing {video_path} | FPS: {fps} | Total Frames: {frame_count} | Duration: {duration}s")

            # read the video and save one frame every `frame_step` frames
            frame_number = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_number % frame_step == 0:
                    _save_frame(frame, frame_dir, ad_id, frame_number)
                frame_number += 1

        elif num_frames is not None:
            # evenly spaced positions that skip the very first and very last frame
            frames_to_capture = [(x * frame_count) // (num_frames + 1) for x in range(1, num_frames + 1)]
            print(f"Processing {video_path} | FPS: {fps} | Total Frames: {frame_count} | Frames to capture: {frames_to_capture}")

            for frame_number in frames_to_capture:
                # set the frame position
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
                ret, frame = cap.read()
                if ret:
                    _save_frame(frame, frame_dir, ad_id, frame_number)

        else:
            print("No frames extracted: specify either `interval` or `num_frames`.")
    finally:
        # release the VideoCapture object, also when reading or writing raised
        cap.release()