listenshare-bot/app/MusicProvider/SpotifyStrategy.py

100 lines
3.3 KiB
Python

import asyncio
import time
import aiohttp
from app.config import config
from app.MusicProvider.Strategy import MusicProviderStrategy
from app.dependencies import get_session_context
from sqlalchemy import select, update
from app.models import User, Track
from app.MusicProvider.auth import refresh_token, get_oauth_creds, get_encoded_creds
class SpotifyStrategy(MusicProviderStrategy):
def __init__(self, user_id):
super().__init__(user_id)
self.token = None
async def handle_token(self):
async with get_session_context() as session:
res = await session.execute(select(User).where(User.id == self.user_id))
user: User = res.scalars().first()
if not user:
return None
if int(time.time()) < user.spotify_auth["refresh_at"]:
return user.spotify_auth["access_token"]
token, expires_in = await refresh_token(
"https://accounts.spotify.com/api/token",
user.spotify_auth["refresh_token"],
get_encoded_creds(
user.spotify_auth["client_id"], user.spotify_auth["client_secret"]
),
config.proxy,
)
async with get_session_context() as session:
user = await session.get(User, self.user_id)
user.spotify_auth.update(
get_oauth_creds(token, user.spotify_auth["refresh_token"], expires_in)
)
return token
async def request(self, endpoint, token):
user_headers = {
"Authorization": "Bearer " + token,
"Content-Type": "application/json",
}
async with aiohttp.ClientSession(proxy=config.proxy) as session:
resp = await session.get(
f"https://api.spotify.com/v1{endpoint}", headers=user_headers
)
if resp.status != 200:
return None
return await resp.json()
@staticmethod
def convert_track(track: dict):
if track["type"] != "track":
return None
return Track(
name=track["name"],
artist=", ".join(x["name"] for x in track["artists"]),
cover_url=track["album"]["images"][0]["url"],
spotify_id=track["id"],
)
async def get_tracks(self, token) -> list[Track]:
current, recent = await asyncio.gather(
self.request("/me/player/currently-playing", token),
self.request("/me/player/recently-played", token),
)
tracks = []
if current:
tracks.append(self.convert_track(current["item"]))
for item in recent["items"]:
tracks.append(self.convert_track(item["track"]))
tracks = [x for x in tracks if x]
tracks = list(dict.fromkeys(tracks))
print(tracks)
return tracks
async def fetch_track(self, track: Track):
async with get_session_context() as session:
resp = await session.execute(
select(Track).where(Track.spotify_id == track.spotify_id)
)
return resp.scalars().first()
def song_link(self, track: Track):
return f'<a href="https://open.spotify.com/track/{track.spotify_id}">Spotify</a> | <a href="https://song.link/s/{track.spotify_id}">Other</a>'
def track_id(self, track: Track):
return track.spotify_id
__all__ = ["SpotifyStrategy"]