ycdl/ycdl/ytapi.py

152 lines
5.3 KiB
Python
Raw Normal View History

2021-10-16 04:00:04 +00:00
import googleapiclient.discovery
import isodate
2021-11-08 02:50:34 +00:00
import typing
2020-11-11 01:53:52 +00:00
from voussoirkit import gentools
2020-11-11 01:53:52 +00:00
from voussoirkit import vlogging
2016-11-29 04:16:16 +00:00
def int_none(x):
    '''
    Convert x to an int, passing None through unchanged.
    '''
    return None if x is None else int(x)
2020-07-21 02:03:53 +00:00
class ChannelNotFound(Exception):
    '''
    Raised when a channel lookup returns no items.
    '''
2016-12-07 06:11:09 +00:00
class VideoNotFound(Exception):
    '''
    Raised when a video lookup returns no result.
    '''
2016-11-29 04:16:16 +00:00
class Video:
    '''
    A single video, built from one item of a `videos().list` API response.

    The item must carry the `id`, `snippet`, `contentDetails` and `statistics`
    parts. A KeyError is raised when a required field is absent.
    '''
    def __init__(self, data):
        self.id = data['id']
        snippet = data['snippet']
        content_details = data['contentDetails']
        statistics = data['statistics']

        self.title = snippet.get('title', '[untitled]')
        self.description = snippet.get('description', '')

        self.author_id = snippet['channelId']
        # Fall back to the channel id when the response has no channel title.
        self.author_name = snippet.get('channelTitle', self.author_id)

        # Something like '2016-10-01T21:00:01'
        self.published_string = snippet['publishedAt']
        self.published = isodate.parse_datetime(self.published_string).timestamp()
        # The API's literal string 'none' is normalized to None.
        self.live_broadcast = snippet['liveBroadcastContent']
        if self.live_broadcast == 'none':
            self.live_broadcast = None
        self.tags = snippet.get('tags', [])

        # Something like 'PT10M25S'
        duration = isodate.parse_duration(content_details['duration'])
        # total_seconds() rather than .seconds: the .seconds attribute is only
        # the sub-day remainder, so a video of 24 hours or longer (for example
        # 'P1DT2H') would otherwise be reported with a wrapped-around duration.
        self.duration = int(duration.total_seconds())
        self.views = int_none(statistics.get('viewCount', None))
        self.likes = int_none(statistics.get('likeCount', 0))
        self.dislikes = int_none(statistics.get('dislikeCount'))
        self.comment_count = int_none(statistics.get('commentCount'))

        # Keep the thumbnail with the largest pixel area.
        thumbnails = snippet['thumbnails']
        ranker = lambda key: thumbnails[key]['width'] * thumbnails[key]['height']
        best_thumbnail = max(thumbnails, key=ranker)
        self.thumbnail = thumbnails[best_thumbnail]

    def __str__(self):
        return 'Video:%s' % self.id
2016-11-29 04:16:16 +00:00
class Youtube:
    '''
    Wrapper around the YouTube Data API v3 client which hands back Video
    objects and plain strings instead of raw API responses.
    '''
    def __init__(self, key):
        # key is handed to the API client as its developerKey.
        self.youtube = googleapiclient.discovery.build(
            serviceName='youtube',
            version='v3',
            developerKey=key,
            cache_discovery=False,
        )
        self.log = vlogging.getLogger(__name__)

    def _playlist_paginator(self, playlist_id):
        '''
        Yield every raw playlistItems entry of the playlist, requesting up to
        50 per page and following nextPageToken until the response has none.
        '''
        next_token = None
        while True:
            request = self.youtube.playlistItems().list(
                part='contentDetails',
                playlistId=playlist_id,
                maxResults=50,
                pageToken=next_token,
            )
            page = request.execute()
            yield from page['items']
            next_token = page.get('nextPageToken')
            if next_token is None:
                return

    def get_playlist_videos(self, playlist_id) -> typing.Iterable[Video]:
        '''
        Return an iterable of Video objects for the items of the playlist.
        '''
        playlist_items = self._playlist_paginator(playlist_id)
        ids = (entry['contentDetails']['videoId'] for entry in playlist_items)
        return self.get_videos(ids)

    def get_related_videos(self, video_id, count=50) -> typing.Iterable[Video]:
        '''
        Return an iterable of Video objects that the search endpoint reports
        as related to the given video.

        video_id may be an id string or a Video instance. count is passed to
        the API as maxResults.
        '''
        if isinstance(video_id, Video):
            video_id = video_id.id
        search = self.youtube.search().list(
            part='id',
            relatedToVideoId=video_id,
            type='video',
            maxResults=count,
        )
        results = search.execute()
        related_ids = [entry['id']['videoId'] for entry in results['items']]
        return self.get_videos(related_ids)

    def get_user_id(self, username) -> str:
        '''
        Return the id of the first channel returned for this username.

        Raises ChannelNotFound if the response contains no items.
        '''
        response = self.youtube.channels().list(part='snippet', forUsername=username).execute()
        items = response.get('items')
        if not items:
            raise ChannelNotFound(f'username: {username}')
        return items[0]['id']

    def get_user_name(self, uid) -> str:
        '''
        Return the snippet title of the channel with this id.

        Raises ChannelNotFound if the response contains no items.
        '''
        response = self.youtube.channels().list(part='snippet', id=uid).execute()
        items = response.get('items')
        if not items:
            raise ChannelNotFound(f'uid: {uid}')
        return items[0]['snippet']['title']

    def get_user_uploads_playlist_id(self, uid) -> str:
        '''
        Return the id of the 'uploads' related playlist of the channel.

        Raises ChannelNotFound if the response contains no items.
        '''
        response = self.youtube.channels().list(part='contentDetails', id=uid).execute()
        items = response.get('items')
        if not items:
            raise ChannelNotFound(f'uid: {uid}')
        return items[0]['contentDetails']['relatedPlaylists']['uploads']

    def get_user_videos(self, uid) -> typing.Iterable[Video]:
        '''
        Yield Video objects from the channel's uploads playlist.

        This is a generator, so no API request is made until it is iterated.
        '''
        uploads_playlist = self.get_user_uploads_playlist_id(uid)
        yield from self.get_playlist_videos(uploads_playlist)

    def get_video(self, video_id) -> Video:
        '''
        Return the Video for a single id.

        Raises VideoNotFound if get_videos yields nothing for it.
        '''
        try:
            found = next(self.get_videos([video_id]))
        except StopIteration:
            raise VideoNotFound(video_id) from None
        return found

    def get_videos(self, video_ids) -> typing.Iterable[Video]:
        '''
        Yield a Video for each of the given ids that the API returns.

        Ids are requested in batches of 50. A returned item that raises
        KeyError while being turned into a Video is logged and skipped.
        '''
        total_snippets = 0
        for chunk in gentools.chunk_generator(video_ids, 50):
            self.log.debug('Requesting batch of %d video ids.', len(chunk))
            self.log.loud(chunk)
            joined_ids = ','.join(chunk)
            request = self.youtube.videos().list(
                part='id,contentDetails,snippet,statistics',
                id=joined_ids,
            )
            snippets = request.execute()['items']
            self.log.debug('Got %d snippets.', len(snippets))
            total_snippets += len(snippets)
            self.log.loud(snippets)
            for snippet in snippets:
                try:
                    video = Video(snippet)
                    yield video
                except KeyError as exc:
                    self.log.warning(f'KEYERROR: {exc} not in {snippet}')
        self.log.debug('Finished getting a total of %d snippets.', total_snippets)