Téléverser les fichiers vers "/"
This commit is contained in:
parent
a9a1f6aeff
commit
56b2a66c78
1 changed files with 187 additions and 0 deletions
187
main.py
Normal file
187
main.py
Normal file
|
@ -0,0 +1,187 @@
|
|||
#!/usr/bin/python
|
||||
# Title: RadioFrance Podcast Downloader
|
||||
# Author: Rey Joachim, @_@mamot.fr
|
||||
# "A complex solution to a dumb problem"
|
||||
import json
|
||||
import requests
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
# --- SETTINGS ---

parser = argparse.ArgumentParser(prog="RadioFrance Podcast Downloader")
parser.add_argument("-u", "--url", help="The podcast's URL, might work with other media, e.g.: https://www.radiofrance.fr/franceinter/podcasts/[name]")
parser.add_argument(
    "-p",
    "--page",
    help="The page you want to download, should be formatted like a-b (e.g for page 1 to 30: 0-30)",
)
parser.add_argument("--noLimit", action="store_true")
args = parser.parse_args()

# Validate the page range early: it must be exactly two parts, "a-b".
# The original `assert 2 >= len(...) or 3 > len(...)` was satisfied by a
# single page number (which crashes later in extractData(*...)), and
# `assert` is stripped under `python -O`; report through the parser instead.
if args.page is None or len(args.page.split("-")) != 2:
    parser.error(
        "Wrong page formatting, should be formatted like a-b (e.g for page 1 to 30: 0-30)"
    )
|
||||
|
||||
# "remove" the recursion limit if requested on the command line
if args.noLimit:
    import sys

    sys.setrecursionlimit(10**6)
|
||||
|
||||
# define URL — make sure the base always ends with a single "/"
BASE_URL = args.url if args.url[-1] == "/" else args.url + "/"
URL_END = "__data.json"

# default download directory and run counters
PATH = str(Path().resolve())
TOTAL = 1  # avoid div. by 0 error if there is no content.
SUCCESS = 0
WARN = 0
ERR = 0
|
||||
|
||||
# re-set path — let the user override the default download directory;
# an empty answer keeps the current PATH
user_path = str(input(f"Where should the files be downloaded ? [default: {PATH}] "))
if user_path:
    PATH = user_path if user_path[-1] == "/" else user_path + "/"
|
||||
|
||||
# --- FUNCTIONS ---
|
||||
|
||||
|
||||
def getPage(p: int) -> dict:
    """
    Fetch and reconstruct the JSON of one listing page.
    In: p -> page (int)
    Out: (dict)
    """
    # Only requests from p=2 to p=N are valid and [URL]__data.json seems to
    # return page 1 for some reason (but not always...), so page 1 is fetched
    # without the "?p=" query string.
    suffix = "" if p == 1 else "?p=" + str(p)
    api_aws = requests.get(BASE_URL + URL_END + suffix).json()["nodes"][3]["data"]
    return lookForChilds(api_aws[0], api_aws)
|
||||
|
||||
|
||||
def lookForChilds(item: any, un_list: list) -> dict:
    """
    Recursively reconstruct the JSON.

    The payload stores values in one flat list where dict/list entries hold
    int indexes into that same list; this resolves those indexes back into a
    nested structure.

    In: item(any), un_list(list)
    Out: (dict)
    """
    if isinstance(item, dict):
        return {key: lookForChilds(value, un_list) for key, value in item.items()}

    if isinstance(item, list):
        return [lookForChilds(elem, un_list) for elem in item]

    # Plain ints are indexes into un_list. `bool` is excluded explicitly
    # because it subclasses `int` (the original `type(item) is int` check
    # excluded it implicitly).
    if isinstance(item, int) and not isinstance(item, bool) and 0 <= item < len(un_list):
        tmp = un_list[item]
        # "break" link, prevent infinite loop but at the cost of information
        # loss (a better method is needed)
        un_list[item] = str(un_list[item])
        return lookForChilds(tmp, un_list)

    return item
|
||||
|
||||
|
||||
def extractData(a: int, b: int) -> dict:
    """
    Build a {title: URL} dict from the reconstructed JSON of pages a to b-1.

    If an item has no title, the title is "NameError P<page>L<index>".
    If an item has no source, its URL is None.

    In: a(int), b(int)
    Out: (dict)
    """
    extracted = {}

    for p in range(int(a), int(b)):
        try:  # e.g., if the page does not exist (out of range)
            page = getPage(p)["metadata"]["pagination"]["items"]
            for i in range(len(page)):
                # sometimes the name or the source is missing
                # reset per item: an empty "sources" list must not silently
                # reuse the previous item's URL (or NameError on item 0)
                url = None
                try:
                    # keep the first mp3 source, else the last source seen
                    for source in page[i]["playerInfo"]["media"]["sources"]:
                        url = source["url"]
                        if source["url"].split(".")[-1] == "mp3":
                            break
                except TypeError:
                    url = None
                try:
                    name = f'{page[0]["playerInfo"]["playerMetadata"]["firstLine"]} - {page[i]["playerInfo"]["playerMetadata"]["secondLine"]}'
                except TypeError:
                    name = f'{page[0]["playerInfo"]["playerMetadata"]["firstLine"]} - NameError P{p}L{i} '
                extracted[name] = url
            print(f"[INFO] page {p} loaded")
        except Exception:
            # narrowed from a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit
            print(f"[WARN] page {p} did not return valid data, exiting...")
            break
    return extracted
|
||||
|
||||
|
||||
def downloadPocast(info: dict) -> int:
    """
    Download the files of a {title: URL} dict into the current PATH.

    Each handled entry (downloaded, skipped, or failed) is removed from
    `info`, so the caller can keep calling this until it returns 0.

    In: info(dict)
    Out: (int) 0 when the whole dict has been processed, 1 after an error
         (the offending entry has been dropped; call again to resume)
    """
    global TOTAL, SUCCESS, WARN, ERR  # saved if the function crashes
    rem_k = ""

    try:
        for key in info.copy():
            rem_k = key
            if info[key] is not None:
                aws = requests.get(info[key])
                # keep the extension of the source URL
                fname = f"{key}.{info[key].split('.')[-1]}"
                with open(PATH + fname, "wb") as fd:
                    for chunk in aws.iter_content(chunk_size=128):
                        fd.write(chunk)
                print(f"[INFO] {key} downloaded")
                SUCCESS += 1
            else:
                print(f"[WARN] {key} does not have a source ({info[key]})")
                WARN += 1
            info.pop(key)
            TOTAL += 1
        return 0
    except Exception as e:
        print(f"[ERROR] {e}")
        # remove the problematic entry; the default avoids a KeyError when
        # the failure happened before any key was visited (rem_k == "")
        info.pop(rem_k, None)
        TOTAL += 1
        ERR += 1
        return 1
|
||||
|
||||
|
||||
# --- CODE ---

# fetch info
print("[INFO] Generating download queue")
podcasts = extractData(*args.page.split("-"))

# download files
print(f"[INFO] Downloading file in {PATH}")
# downloadPocast() returns 1 after dropping a broken entry, so retry until
# the whole queue has been processed (return value 0).
status = 1  # renamed from `exit`, which shadowed the builtin
while status != 0:
    status = downloadPocast(podcasts)

# end — TOTAL starts at 1 to avoid division by zero, so the percentages are
# computed over items + 1
print("-- RESULTS --")
print(f"Tried to download {TOTAL} files")
print(f"x{ERR} errors")
print(f"x{WARN} warns")
print(f"x{SUCCESS} success")
print(f"{int(SUCCESS/TOTAL*100)}% success, {int((WARN + ERR)/TOTAL*100)}% problem")
|
Loading…
Add table
Reference in a new issue