#!/usr/bin/env python3
# Title: RadioFrance Podcast Downloader
# Author: Rey Joachim, @_@mamot.fr
# "A complex solution to a dumb problem"

import argparse
from pathlib import Path
from typing import Any

import requests

# --- SETTINGS ---

parser = argparse.ArgumentParser(prog="RadioFrance Podcast Downloader")
parser.add_argument(
    "-u",
    "--url",
    required=True,
    help="The podcast's URL, might work with other media, e.g.: https://www.radiofrance.fr/franceinter/podcasts/[name]",
)
parser.add_argument(
    "-p",
    "--page",
    required=True,
    help="The pages you want to download, formatted like a-b (e.g. for pages 1 to 30: 0-30)",
)
parser.add_argument("--noLimit", action="store_true")
args = parser.parse_args()
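
# Example invocation (the script and podcast names below are placeholders):
#   python3 radiofrance_dl.py -u https://www.radiofrance.fr/franceinter/podcasts/[name] -p 0-30
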
assert (
    len(args.page.split("-")) == 2
), "Wrong page formatting, should be formatted like a-b (e.g. for pages 1 to 30: 0-30)"

# "remove" the recursion limit if needed: lookForChilds recurses once per nested
# node, and a deep payload can blow past Python's default limit of ~1000 frames
if args.noLimit:
    from sys import setrecursionlimit

    setrecursionlimit(10**6)

# define URL
if args.url[-1] == "/":
    BASE_URL = args.url
else:
    BASE_URL = args.url + "/"
URL_END = "__data.json"

# default download folder; keep a trailing "/" since filenames are appended directly
PATH = str(Path().resolve()) + "/"
# counters for the final report
TOTAL = 0
SUCCESS = 0
WARN = 0
ERR = 0

# let the user override the download path
user_path = str(input(f"Where should the files be downloaded? [default: {PATH}] "))
if len(user_path) > 0:
    if user_path[-1] != "/":
        PATH = user_path + "/"
    else:
        PATH = user_path

# --- FUNCTIONS ---


def getPage(p: int) -> dict:
    """
    Fetch the JSON of one page.
    In: p -> page (int)
    Out: (dict)
    """
    # only requests from p=2 to p=N are valid, and [URL]__data.json seems to
    # return page 1 for some reason (but not always...)
    if p == 1:
        api_aws = requests.get(BASE_URL + URL_END).json()["nodes"][3]["data"]
    else:
        # e.g. https://www.radiofrance.fr/franceinter/podcasts/[name]/__data.json?p=3
        api_aws = requests.get(BASE_URL + URL_END + "?p=" + str(p)).json()["nodes"][3][
            "data"
        ]
    return lookForChilds(api_aws[0], api_aws)
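

# NOTE (structure assumed from observed responses, not documented by Radio France):
# the "data" payload is a flat list where dict values and list elements refer to
# other entries by integer index; lookForChilds rebuilds the real object. E.g. the
# flattened list [{"title": 1, "tags": 2}, "hello", [3], "news"]
# rebuilds to {"title": "hello", "tags": ["news"]}.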
def lookForChilds(item: Any, un_list: list) -> Any:
    """
    Recursively rebuild the JSON from the flattened node list.
    In: item(Any), un_list(list)
    Out: (Any)
    """

    if type(item) is dict:
        tmp = {}
        for key in item.keys():
            tmp[key] = lookForChilds(item[key], un_list)
        return tmp

    if type(item) is list:
        tmp = []
        for elem in item:
            tmp.append(lookForChilds(elem, un_list))
        return tmp

    if type(item) is int and 0 <= item < len(un_list):
        tmp = un_list[item]
        # "break" the link: prevents infinite recursion on circular references,
        # at the cost of information loss (a better method is needed)
        un_list[item] = str(un_list[item])
        return lookForChilds(tmp, un_list)

    return item


def extractData(a: int, b: int) -> dict:
    """
    Build a {title: URL} dictionary from the reconstructed JSON of pages a to b.
    If an item has no title, its title is "NameError" plus the item's page and index.
    If an item has no source, its URL is None.
    In: a(int), b(int)
    Out: (dict)
    """

    extracted = {}

    for p in range(int(a), int(b)):
        try:  # e.g., the page may not exist (out of range)
            page = getPage(p)["metadata"]["pagination"]["items"]
            for i in range(len(page)):
                # the name or the source is sometimes missing
                url = None
                try:
                    for source in page[i]["playerInfo"]["media"]["sources"]:
                        url = source["url"]
                        # prefer the mp3 source if there is one
                        if source["url"].split(".")[-1] == "mp3":
                            break
                except TypeError:
                    url = None
                try:
                    name = f'{page[0]["playerInfo"]["playerMetadata"]["firstLine"]} - {page[i]["playerInfo"]["playerMetadata"]["secondLine"]}'
                except TypeError:
                    name = f'{page[0]["playerInfo"]["playerMetadata"]["firstLine"]} - NameError P{p}L{i}'
                extracted[name] = url
            print(f"[INFO] page {p} loaded")
        except Exception:
            print(f"[WARN] page {p} did not return valid data, stopping...")
            break
    return extracted
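

# The queue maps display titles to direct media URLs; values below are hypothetical:
#   {"Le journal - Episode 1": "https://.../episode1.mp3",
#    "Le journal - NameError P2L4": None}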
def downloadPodcast(info: dict) -> int:
    """
    Download the files listed in a {title: URL} dictionary into PATH.
    In: info(dict)
    Out: (int) 0 once the queue is drained, 1 if an entry failed and was dropped
    """
    global TOTAL, SUCCESS, WARN, ERR  # preserved if the function crashes
    rem_k = ""

    try:
        for key in info.copy():  # iterate over a copy, entries are popped below
            rem_k = key
            if info[key] is not None:
                aws = requests.get(info[key])
                # titles may contain "/", which would break the file path
                fname = f"{key}.{info[key].split('.')[-1]}".replace("/", "-")
                with open(PATH + fname, "wb") as fd:
                    for chunk in aws.iter_content(chunk_size=128):
                        fd.write(chunk)
                print(f"[INFO] {key} downloaded")
                SUCCESS += 1
            else:
                print(f"[WARN] {key} does not have a source ({info[key]})")
                WARN += 1
            info.pop(key)
            TOTAL += 1
        return 0
    except Exception as e:
        print(f"[ERROR] {e}")
        info.pop(rem_k, None)  # remove the problematic entry
        TOTAL += 1
        ERR += 1
        return 1
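

# Note: downloadPodcast iterates over a copy of the queue and pops every entry it
# touches, so the dict shrinks with each attempt and calling it again after an
# error resumes where it left off.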

# --- CODE ---

# fetch info
print("[INFO] Generating download queue")
podcasts = extractData(*args.page.split("-"))

# download the files; downloadPodcast returns 1 after dropping a failed entry,
# so call it again until the queue is fully drained (return value 0)
print(f"[INFO] Downloading files to {PATH}")
status = 1
while status != 0:
    status = downloadPodcast(podcasts)

# end
print("-- RESULTS --")
print(f"Tried to download {TOTAL} files")
print(f"x{ERR} errors")
print(f"x{WARN} warns")
print(f"x{SUCCESS} success")
if TOTAL > 0:  # guard the division when nothing was queued
    print(f"{int(SUCCESS/TOTAL*100)}% success, {int((WARN + ERR)/TOTAL*100)}% problem")