listenshare-bot/app/MusicProvider/SpotifyStrategy.py
2025-04-04 19:58:57 +03:00

79 lines
No EOL
3.1 KiB
Python

import base64
import time
import aiohttp
from app.config import config
from app.MusicProvider.Strategy import MusicProviderStrategy
from app.dependencies import get_session, get_session_context
from sqlalchemy import select, update
from app.models import User, Track
# HTTP Basic credentials for the Spotify token endpoint: base64("<client_id>:<client_secret>").
creds = base64.b64encode(
    f"{config.spotify.client_id}:{config.spotify.client_secret}".encode()
).decode("utf-8")
class SpotifyStrategy(MusicProviderStrategy):
    """Music-provider strategy that talks to the Spotify Web API.

    Methods read ``self.user_id``, which is not assigned in this class
    (presumably supplied by ``MusicProviderStrategy`` -- confirm).
    """

    async def handle_token(self):
        """Return a usable Spotify access token for ``self.user_id``.

        The stored access token is returned while the current time is
        before the user's ``spotify_refresh_at``. Otherwise a new token is
        requested with the stored refresh token, persisted together with
        its new expiry timestamp, and returned.

        Returns:
            The access token, or ``None`` when the user row does not exist
            or no refresh token is stored for it.

        Raises:
            KeyError: if the token endpoint's JSON reply lacks
                ``access_token`` or ``expires_in``.
        """
        async with get_session_context() as db:
            result = await db.execute(select(User).where(User.id == self.user_id))
            user: User = result.scalars().first()
            if not user:
                return None
            # Copy what we need while the DB session is still open.
            access_token = user.spotify_access_token
            refresh_at = user.spotify_refresh_at
            refresh_token = user.spotify_refresh_token

        # A missing expiry timestamp is treated as "already expired".
        if refresh_at is not None and int(time.time()) < refresh_at:
            return access_token

        # Without a refresh token there is nothing to exchange.
        if not refresh_token:
            return None

        token_headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            'Authorization': 'Basic ' + creds,
        }
        token_data = {
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        }
        async with aiohttp.ClientSession() as http:
            async with http.post(
                "https://accounts.spotify.com/api/token",
                data=token_data,
                headers=token_headers,
            ) as resp:
                payload = await resp.json()

        token, expires_in = payload['access_token'], payload['expires_in']
        new_values = {
            "spotify_access_token": token,
            "spotify_refresh_at": int(time.time()) + int(expires_in),
        }
        # The token endpoint may hand out a replacement refresh token; keep
        # it so that later refreshes do not use a stale one.
        rotated_refresh_token = payload.get("refresh_token")
        if rotated_refresh_token:
            new_values["spotify_refresh_token"] = rotated_refresh_token

        async with get_session_context() as db:
            await db.execute(
                update(User).where(User.id == self.user_id).values(**new_values)
            )
            await db.commit()
        return token

    async def get_tracks(self, token) -> list[Track]:
        """Fetch the user's five most recently played tracks.

        Args:
            token: Spotify access token sent as a ``Bearer`` credential.

        Returns:
            Newly constructed ``Track`` objects (not added to any DB session
            here), in the order the API returned them. ``cover_url`` is the
            first album image URL, or ``None`` when the album has no images.

        Raises:
            KeyError: if a reply lacks an expected key such as ``items``.
        """
        user_headers = {
            'Authorization': 'Bearer ' + token,
            'Content-Type': 'application/json',
        }
        user_params = {
            'limit': 5,
        }
        tracks = []
        async with aiohttp.ClientSession() as http:
            async with http.get(
                'https://api.spotify.com/v1/me/player/recently-played',
                params=user_params,
                headers=user_headers,
            ) as resp:
                data = await resp.json()

            for item in data['items']:
                track = item['track']
                album_ref = track['album']
                # Use the images embedded in the track's album object when
                # present; only fall back to a dedicated album request when
                # they are absent, avoiding one extra round trip per track.
                images = album_ref.get('images')
                if images is None:
                    album_id = album_ref['id']
                    async with http.get(
                        f'https://api.spotify.com/v1/albums/{album_id}',
                        headers=user_headers,
                    ) as album_resp:
                        album = await album_resp.json()
                    images = album['images']
                cover_url = images[0]['url'] if images else None
                tracks.append(Track(
                    name=track['name'],
                    artist=', '.join(artist['name'] for artist in track['artists']),
                    cover_url=cover_url,
                    spotify_id=track['id'],
                ))
        return tracks

    async def fetch_track(self, track: Track):
        """Look up the stored ``Track`` with the same ``spotify_id`` as *track*.

        Returns:
            The first matching ``Track`` row, or ``None`` when there is none.
        """
        async with get_session_context() as db:
            result = await db.execute(
                select(Track).where(Track.spotify_id == track.spotify_id)
            )
            return result.scalars().first()