diff --git a/.env.example b/.env.example index 6450288..af535ac 100644 --- a/.env.example +++ b/.env.example @@ -13,5 +13,8 @@ MUSIC_FOLDER=/path/to/your/music # Spotify client id and secret for searching tracks: # visit https://developer.spotify.com/documentation/web-api to get your client id and secret. SPOTIFY_CLIENT_ID=your_spotify_client_id + SPOTIFY_CLIENT_SECRET=your_spotify_client_secret -REDIRECT_URI=http://localhost:3000 \ No newline at end of file + +REDIRECT_URI=http://localhost:3000 + diff --git a/.gitignore b/.gitignore index b7655b2..fb164c1 100644 --- a/.gitignore +++ b/.gitignore @@ -74,6 +74,8 @@ docs/_build/ #extra .cache + +.DS_Store private/ #shell-script diff --git a/ethos/argparser.py b/ethos/argparser.py new file mode 100644 index 0000000..6aaab72 --- /dev/null +++ b/ethos/argparser.py @@ -0,0 +1,91 @@ +import argparse +from ethos.utils import get_audio_url, fetch_tracks_list +from ethos.player import MusicPlayer, TrackInfo +from ethos.tools import helper +from rich.console import Console +import asyncio +import time + +class ArgumentParser(): + """Argument Parser class for ethos cli""" + + def __init__(self): + self.parser = argparse.ArgumentParser(description="Ethos CLI App") + self.console = Console(color_system='256') + self.player = MusicPlayer() + + def start(self, args) -> None: + """Start the player""" + self.console.print("[magenta]Starting ethos cli...") + + def stop(self, args) -> None: + """Stop the player""" + self.console.print("[magenta]Stopping ethos cli...") + + def status(self, args) -> None: + """Sets the status of the player""" + self.console.print("[magenta]Status ethos cli...") + + async def play(self, args) -> None: + """Plays a track provided by the user""" + + track = args.track if args.track else str(input("Enter track name to play :")) + self.console.print(f"[cyan]Fetching track: {track}") + tracks_list = await fetch_tracks_list(track) + track_no = 0 + if not args.track_no: + if tracks_list: + self.console.print("Search results :") + self.console.print("\n".join(tracks_list)) + track_no = int(input("Enter track number :")) + track_name = helper.Format.clean_hashtag(tracks_list[track_no-1]) + track_url = get_audio_url(track_name+"official music video") + + if not track_url: + self.console.print(f"[red]Could not fetch URL for {track_name}") + return + + volume = args.volume if args.volume else 50 + self.console.print("[deep pink]Playing at default volume: 50") + self.player.set_volume(volume) + self.player.play(track_url) + + self.console.print(f"[deep pink]Playing {track_name}") + track_length = TrackInfo.get_audio_duration_int(track_url) + while TrackInfo.get_current_time_int(self.player) < track_length: + time.sleep(1) + if TrackInfo.get_current_time_int(self.player) == track_length: + return + + async def listen(self) -> None: + """Listens to commands from cli""" + self.subparsers = self.parser.add_subparsers(dest="command", help="Available commands:") + + self.start_parser = self.subparsers.add_parser("start", help="Start the app") + self.start_parser.set_defaults(func=self.start) + + self.stop_parser = self.subparsers.add_parser("stop", help="Stop the parser") + self.stop_parser.set_defaults(func=self.stop) + + self.status_parser = self.subparsers.add_parser("status", help="Check status") + self.status_parser.set_defaults(func=self.status) + + self.play_parser = self.subparsers.add_parser("play", help="play a track") + self.play_parser.add_argument("track", type=str, nargs="?", help="name of the track you want to play") + self.play_parser.add_argument("track_no", type=int, nargs="?", help="track no. of track to be played from search results") + self.play_parser.add_argument("volume", type=int, nargs="?", help="volume of the player") + self.play_parser.set_defaults(func=self.play) + + self.args = self.parser.parse_args() + + if hasattr(self.args, "func"): + await self.args.func(self.args) + + else: + self.parser.print_help() + + +if __name__ == "__main__": + arg_parser = ArgumentParser() + asyncio.run(arg_parser.listen()) + diff --git a/ethos/main.py b/ethos/main.py index 48782eb..10d36a0 100644 --- a/ethos/main.py +++ b/ethos/main.py @@ -1,3 +1,4 @@ + ######################## # Entry point of Ethos # ######################## diff --git a/ethos/player.py b/ethos/player.py index 2d846c0..3b74482 100644 --- a/ethos/player.py +++ b/ethos/player.py @@ -226,4 +226,4 @@ def get_progress(music_player: "MusicPlayer") -> float: if duration > 0: return (current_time / duration) * 100 - return 0.0 \ No newline at end of file + return 0.0 diff --git a/ethos/spotify_importer.py b/ethos/spotify_importer.py index 26d5201..7d31e58 100644 --- a/ethos/spotify_importer.py +++ b/ethos/spotify_importer.py @@ -115,4 +115,4 @@ def refresh_playlist(self, playlist_id: str, playlist_name: str): print(f"Playlist '{selected_playlist['name']}' has been saved.") - # importer.refresh_all_playlists() # Refresh all playlists + importer.refresh_all_playlists() # Refresh all playlists diff --git a/ethos/tools/helper.py b/ethos/tools/helper.py index d554c96..634253a 100644 --- a/ethos/tools/helper.py +++ b/ethos/tools/helper.py @@ -44,15 +44,21 @@ def parse_command(command: str): command_type, value = parts - if command_type == '/play' or command_type == '/queue-add' or command_type == '/queue-remove': + if command_type == '/play' or command_type == '/queue-add' or command_type == '/queue-remove' or command_type == "/vp": return value elif command_type == '/volume' or command_type == '/qp': try: return int(value) except ValueError: raise ValueError("Volume must be a number") - else: - raise ValueError("Unknown command") + elif command_type == '/ap': + try: + parts = value.split(maxsplit=1) + playlist_name, track_name = parts + return playlist_name, track_name + except: + pass + @staticmethod diff --git a/ethos/ui/textual_app.py b/ethos/ui/textual_app.py index 82b74b7..1ecefe8 100644 --- a/ethos/ui/textual_app.py +++ b/ethos/ui/textual_app.py @@ -2,10 +2,10 @@ from textual.reactive import reactive from textual.widgets import Input from textual import work -from ui.rich_layout import RichLayout -from player import MusicPlayer, TrackInfo -from tools import helper -from utils import fetch_tracks_list, get_audio_url, fetch_recents, add_track_to_recents, fetch_tracks_from_playlist, add_track_to_playlist +from ethos.ui.rich_layout import RichLayout +from ethos.player import MusicPlayer, TrackInfo +from ethos.tools import helper +from ethos.utils import Search, UserFiles import random class TextualApp(App): @@ -36,6 +36,7 @@ class TextualApp(App): recents = reactive([]) current_track_duration = reactive("") show_playlists = reactive(False) + layout_widget = "" def compose(self) -> ComposeResult: """Composer function for textual app""" @@ -48,14 +49,15 @@ def on_mount(self): """Handle functions after mounting the app""" self.input = reactive("") - self.recents = fetch_recents() - layout_widget = self.query_one(RichLayout) + self.recents = UserFiles.fetch_recents() + self.layout_widget = self.query_one(RichLayout) + try: if self.recents: - layout_widget.update_dashboard(self.recents, "Recents :-") + self.layout_widget.update_dashboard(self.recents, "Recents :-") else: - layout_widget.update_dashboard("You have not played any tracks yet!", "") - self.set_interval(1, self.update_track_progress) + self.layout_widget.update_dashboard("You have not played any tracks yet!", "") + self.set_interval(1, self.update_) except: pass @@ -64,20 +66,20 @@ async def on_input_submitted(self, event: Input.Submitted): """Handle input submission""" self.input = event.value - layout_widget = self.query_one(RichLayout) + if event.value: if event.value.startswith("/play"): try: search_track = self.helper.parse_command(event.value) - layout_widget.update_log("Searching for tracks") - self.tracks_list = await fetch_tracks_list(search_track) + self.layout_widget.update_log("Searching for tracks") + self.tracks_list = await Search.fetch_tracks_list(search_track) if self.tracks_list: - layout_widget.update_dashboard(self.tracks_list, "Type track no. to be played :-") + self.layout_widget.update_dashboard(self.tracks_list, "Type track no. to be played :-") self.update_input() self.select_from_queue = False except ValueError: - layout_widget.update_dashboard("Invalid command. Make sure to enter a valid command. You can see the list of commands using /help", "") + self.layout_widget.update_dashboard("Invalid command. Make sure to enter a valid command. You can see the list of commands using /help", "") pass @@ -86,7 +88,7 @@ async def on_input_submitted(self, event: Input.Submitted): self.should_play_queue = False self.track_to_play = self.tracks_list[int(event.value)-1] self.handle_play(self.track_to_play) - layout_widget.update_log("Playing track from search") + self.layout_widget.update_log("Playing track from search") self.update_input() except: pass @@ -96,21 +98,21 @@ async def on_input_submitted(self, event: Input.Submitted): volume_to_be_set = self.helper.parse_command(event.value) self.player.set_volume(volume_to_be_set) self.update_input() - layout_widget.update_volume(volume_to_be_set) + self.layout_widget.update_volume(volume_to_be_set) except ValueError: - layout_widget.update_dashboard("Please enter the volume in digits.", "") + self.layout_widget.update_dashboard("Please enter the volume in digits.", "") pass if event.value.startswith("/queue-add"): try: self.search_track = self.helper.parse_command(event.value) - self.queue_options = await fetch_tracks_list(self.search_track) + self.queue_options = await Search.fetch_tracks_list(self.search_track) if self.queue_options: - layout_widget.update_dashboard(self.queue_options, "Type track no. to be added to queue :-") + self.layout_widget.update_dashboard(self.queue_options, "Type track no. to be added to queue :-") self.update_input() self.select_from_queue = True except ValueError: - layout_widget.update_dashboard("Please enter a valid track name. You can view the list of commands using /help", "") + self.layout_widget.update_dashboard("Please enter a valid track name. You can view the list of commands using /help", "") pass if event.value.isdigit() and self.select_from_queue: @@ -127,7 +129,7 @@ async def on_input_submitted(self, event: Input.Submitted): try: tracks = self.queue.values() data = "\n".join(f"{i+1}. {track}" for i, track in enumerate(tracks)) - layout_widget.update_dashboard(data, "Current Queue :-") + self.layout_widget.update_dashboard(data, "Current Queue :-") self.update_input() except: pass @@ -147,79 +149,119 @@ async def on_input_submitted(self, event: Input.Submitted): track = queue[ind-1] del self.queue[key] self.handle_play(track) - layout_widget.update_log("Playing track from current queue") + self.layout_widget.update_log("Playing track from current queue") self.update_input() except ValueError: - layout_widget.update_dashboard("Please enter the no. of track you want to play", "") + self.layout_widget.update_dashboard("Please enter the no. of track you want to play", "") pass + if event.value == "/recents": + try: + self.recents = UserFiles.fetch_recents() + self.layout_widget.update_dashboard(self.recents, "Recents :") + self.update_input() + except: + pass + + if event.value == "/sp" or event.value == "/show-playlists": + self.show_playlists() + + if event.value.startswith("/ap"): + playlist_name, track_name = self.helper.parse_command(event.value) + self.add_to_playlist(track_name, playlist_name) + + if event.value.startswith("/vp"): + playlist_name = self.helper.parse_command(event.value) + self.show_tracks_from_playlist(playlist_name) + + if event.value == "/help": try: - layout_widget.show_commands() + self.layout_widget.show_commands() except: pass def action_pause(self): """Pause the player""" - layout_widget = self.query_one(RichLayout) self.player.pause() - layout_widget.update_playing_status() + self.layout_widget.update_playing_status() def action_resume(self): """Resume the player""" - layout_widget = self.query_one(RichLayout) + if not self.player.is_playing: self.player.resume() - layout_widget.update_playing_status() + self.layout_widget.update_playing_status() def action_volume_up(self): """Increase the volume by 5 levels""" current_volume = self.player.get_volume() self.player.set_volume(current_volume+5) - layout_widget = self.query_one(RichLayout) - layout_widget.update_volume(self.player.get_volume()) + self.layout_widget.update_volume(self.player.get_volume()) def action_volume_down(self): """Decrease the volume by 5 levels""" current_volume = self.player.get_volume() self.player.set_volume(current_volume-5) - layout_widget = self.query_one(RichLayout) - layout_widget.update_volume(self.player.get_volume()) + self.layout_widget.update_volume(self.player.get_volume()) def handle_play(self, track_name: str): """Function to handle the track playback""" - layout_widget = self.query_one(RichLayout) try: - url = get_audio_url(track_name+" official audio") + url = Search.get_audio_url(track_name+" official music video") self.track_url = url self.player.set_volume(50) self.player.play(url) - add_track_to_recents(helper.Format.clean_hashtag(track_name)) - layout_widget.update_track(track_name) + UserFiles.add_track_to_recents(helper.Format.clean_hashtag(track_name)) + self.layout_widget.update_track(track_name) self.current_track_duration = TrackInfo.get_audio_duration(url) - layout_widget.update_total_track_time(TrackInfo.get_audio_duration(url)) + self.layout_widget.update_total_track_time(TrackInfo.get_audio_duration(url)) color_ind = random.randint(0,9) - layout_widget.update_color(color_ind) + self.layout_widget.update_color(color_ind) + except: + pass + + def show_playlists(self) -> None: + try: + playlists = UserFiles.fetch_playlists() + data = "\n".join(playlists) if playlists else "" + self.layout_widget.update_dashboard(data, "Your playlists") except: pass + def show_tracks_from_playlist(self, playlist: str) -> None: + try: + playlist = UserFiles.fetch_tracks_from_playlist(playlist) + data = "\n".join(f"{i+1}. {track}" for i, track in enumerate(playlist)) + self.layout_widget.update_dashboard(data, "Playlist Contents :") + except: + pass + + def add_to_playlist(self, track, playlist: str) -> None: + try: + UserFiles.add_track_to_playlist(track, playlist) + self.layout_widget.update_log("Track added to playlist") + except: + pass + + def update_input(self) -> None: """Function to reset the data in input widget once user enters his input""" input_widget = self.query_one(Input) input_widget.placeholder = "" input_widget.value = "" - def update_track_progress(self) -> None: - """Function to update track progress""" - layout_widget = self.query_one(RichLayout) + def update_(self) -> None: + """Function to update track progress and check for queue""" + try: - layout_widget.update_music_progress(TrackInfo.get_current_time(self.player), int(TrackInfo.get_progress(self.player))) + self.layout_widget.update_music_progress(TrackInfo.get_current_time(self.player), int(TrackInfo.get_progress(self.player))) except: pass - if self.current_track_duration == TrackInfo.get_current_time(self.player): + if TrackInfo.get_progress(self.player) == 100.0: if self.queue: try: keys = list(self.queue.keys()) @@ -228,9 +270,12 @@ def update_track_progress(self) -> None: track = tracks[0] del self.queue[key] self.handle_play(track) - layout_widget.update_log("Currently playing from queue") + entries = self.queue.values() + data = "\n".join(f"{i+1}. {track}" for i, track in enumerate(entries)) + self.layout_widget.update_dashboard(data, "Current Queue :-") + self.layout_widget.update_log("Currently playing from queue") except: pass - \ No newline at end of file + diff --git a/ethos/utils.py b/ethos/utils.py index 7870460..863ef61 100644 --- a/ethos/utils.py +++ b/ethos/utils.py @@ -1,322 +1,342 @@ from yt_dlp import YoutubeDL import os import base64 -from dotenv import load_dotenv +from dotenv import load_dotenv, find_dotenv from time import time import httpx from pathlib import Path from tools.helper import Format import json -load_dotenv() +load_dotenv(dotenv_path=find_dotenv(filename=".env")) +class Search: + """Utility class for searching track metadata and url from external APIs""" -def get_audio_url(query): - """ - Fetches the audio URL for a given search query using YoutubeDL. The function - utilizes specific configuration options to return the best available audio - source while ensuring no playlists are processed and only the top search - result is fetched. It does not download the file, only extracts the URL for - the audio stream. - :param query: A string representing the search query used to find the audio - content on YouTube. It can include keywords or phrases to search for. - :type query: str + @staticmethod + def get_audio_url(query): + """ + Fetches the audio URL for a given search query using YoutubeDL. The function + utilizes specific configuration options to return the best available audio + source while ensuring no playlists are processed and only the top search + result is fetched. It does not download the file, only extracts the URL for + the audio stream. + + :param query: A string representing the search query used to find the audio + content on YouTube. It can include keywords or phrases to search for. + :type query: str + + :return: The URL string of the best audio stream available based on the given + search query. + :rtype: str + """ + ydl_opts = { + 'format': 'bestaudio/best', + 'noplaylist': True, + 'quiet': True, + 'default_search': 'ytsearch1', + } - :return: The URL string of the best audio stream available based on the given - search query. - :rtype: str - """ - ydl_opts = { - 'format': 'bestaudio/best', - 'noplaylist': True, - 'quiet': True, - 'default_search': 'ytsearch1', - } + with YoutubeDL(ydl_opts) as ydl: + result = ydl.extract_info(query, download=False) + if 'entries' in result: + result = result['entries'][0] + return result['url'] - with YoutubeDL(ydl_opts) as ydl: - result = ydl.extract_info(query, download=False) - if 'entries' in result: - result = result['entries'][0] - return result['url'] -CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID") -CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET") + @staticmethod + async def get_spotify_token(client_id="e904c35efb014b76bd8999a211e9b1e1", client_secret="af18ccf7adae4ea7b37ca635c4225928"): + """ + Fetches authorization token from spotify + + Args: client_id(str), client_secret(str) + + return: spotify authorization token + """ + url = "https://accounts.spotify.com/api/token" + headers = { + "Authorization": "Basic " + base64.b64encode(f"{client_id}:{client_secret}".encode()).decode() + } + data = {"grant_type": "client_credentials"} + + async with httpx.AsyncClient() as client: + response = await client.post(url, headers=headers, data=data) + response_data = response.json() -async def get_spotify_token(client_id, client_secret): - """ - Fetches authorization token from spotify - - Args: client_id(str), client_secret(str) - - return: spotify authorization token - """ - - url = "https://accounts.spotify.com/api/token" - headers = { - "Authorization": "Basic " + base64.b64encode(f"{client_id}:{client_secret}".encode()).decode() - } - data = {"grant_type": "client_credentials"} - - async with httpx.AsyncClient() as client: - response = await client.post(url, headers=headers, data=data) - response_data = response.json() + if response.status_code != 200: + raise Exception(f"Failed to get token: {response_data}") + + return response_data["access_token"] - if response.status_code != 200: - raise Exception(f"Failed to get token: {response_data}") - - return response_data["access_token"] + @staticmethod + async def search_tracks_from_spotify(track_name, token): + """ + Searches for a track in spotify and returns first 10 entries of search results + + Args: track_name(str), token(str) + + return: tracks(list) + """ -async def search_tracks_from_spotify(track_name, token): - """ - Searches for a track in spotify and returns first 10 entries of search results - - Args: track_name(str), token(str) - - return: tracks(list) - """ - - url = "https://api.spotify.com/v1/search" - headers = { - "Authorization": f"Bearer {token}" - } - params = { - "q": track_name, - "type": "track", - "limit": 10 - } - - async with httpx.AsyncClient() as client: - response = await client.get(url, headers=headers, params=params) - response_data = response.json() + url = "https://api.spotify.com/v1/search" + headers = { + "Authorization": f"Bearer {token}" + } + params = { + "q": track_name, + "type": "track", + "limit": 10 + } + + async with httpx.AsyncClient() as client: + response = await client.get(url, headers=headers, params=params) + response_data = response.json() - if response.status_code != 200: - raise Exception(f"Failed to fetch tracks: {response_data}") - - return response_data["tracks"]["items"] + if response.status_code != 200: + raise Exception(f"Failed to fetch tracks: {response_data}") + + return response_data["tracks"]["items"] -async def fetch_tracks_list(track_name: str) -> list: - """ - Returns a list of track name and artist name from tracks info + @staticmethod + async def fetch_tracks_list(track_name: str) -> list: + """ + Returns a list of track name and artist name from tracks info - Args: track_name(str) + Args: track_name(str) - return: list - """ + return: list + """ - fetched_tracks = [] - try: + fetched_tracks = [] + try: + + start_time = time() + token = await Search.get_spotify_token() + tracks = await Search.search_tracks_from_spotify(track_name, token) + if tracks: + print(f"\nTracks found for '{track_name}':") + for idx, track in enumerate(tracks, start=1): + track_info = f"{idx}. {track['name']} by {', '.join(artist['name'] for artist in track['artists'])}" + #print(track_info) + fetched_tracks.append(track_info.strip()) + else: + print(f"No tracks found for '{track_name}'.") - start_time = time() - token = await get_spotify_token(CLIENT_ID, CLIENT_SECRET) - tracks = await search_tracks_from_spotify(track_name, token) - if tracks: - print(f"\nTracks found for '{track_name}':") - for idx, track in enumerate(tracks, start=1): - track_info = f"{idx}. {track['name']} by {', '.join(artist['name'] for artist in track['artists'])}" - #print(track_info) - fetched_tracks.append(track_info.strip()) - else: - print(f"No tracks found for '{track_name}'.") - - except Exception as e: - print(f"Error: {e}") - - finally: - end_time = time() - print("Time taken to get metadata = %.2f" % (end_time - start_time)) - return fetched_tracks - - -async def search_artist_id_from_spotify(artist_name, token): #endless-playback - """Search for an artist on Spotify and return their ID.""" - url = f"https://api.spotify.com/v1/search?q={artist_name}&type=artist&limit=1" - headers = {"Authorization": f"Bearer {token}"} - - async with httpx.AsyncClient() as client: - response = await client.get(url, headers=headers) - if response.status_code == 200: - data = response.json() - if data["artists"]["items"]: - return data["artists"]["items"][0]["id"] - else: - raise Exception("No artist found!") - else: - raise Exception(f"Failed to search artist: {response.json()}") + except Exception as e: + print(f"Error: {e}") - + finally: + end_time = time() + print("Time taken to get metadata = %.2f" % (end_time - start_time)) + return fetched_tracks + -async def search_song_id_from_spotify(song_name, token): #endless playback - """Search for a song on Spotify.""" - url = f"https://api.spotify.com/v1/search?q={song_name}&type=track&limit=1" - headers = {"Authorization": f"Bearer {token}"} - - async with httpx.AsyncClient() as client: - response = await client.get(url, headers=headers) + @staticmethod + async def search_artist_id_from_spotify(artist_name, token): + """Search for an artist on Spotify and return their ID.""" + url = f"https://api.spotify.com/v1/search?q={artist_name}&type=artist&limit=1" + headers = {"Authorization": f"Bearer {token}"} + + async with httpx.AsyncClient() as client: + response = await client.get(url, headers=headers) if response.status_code == 200: data = response.json() - if data["tracks"]["items"]: - return data["tracks"]["items"][0]["id"] + if data["artists"]["items"]: + return data["artists"]["items"][0]["id"] else: - raise Exception("No song found!") + raise Exception("No artist found!") else: - raise Exception(f"Failed to search song: {response.json()}") + raise Exception(f"Failed to search artist: {response.json()}") - -async def fetch_top_tracks(artist_id, token, market="US"): - """Fetch top tracks of an artist.""" - url = f"https://api.spotify.com/v1/artists/{artist_id}/top-tracks?market={market}" - headers = {"Authorization": f"Bearer {token}"} - - async with httpx.AsyncClient() as client: - response = await client.get(url, headers=headers) - if response.status_code == 200: - data = response.json() - tracks = [] - for track in data["tracks"]: - tracks.append({ - "name": track["name"], - "artist": track["artists"][0]["name"] - }) - print(tracks) - return tracks - else: - raise Exception(f"Failed to fetch top tracks: {response.json()}") + @staticmethod + async def search_song_id_from_spotify(song_name, token): + """Search for a song on Spotify.""" + url = f"https://api.spotify.com/v1/search?q={song_name}&type=track&limit=1" + headers = {"Authorization": f"Bearer {token}"} + + async with httpx.AsyncClient() as client: + response = await client.get(url, headers=headers) + if response.status_code == 200: + data = response.json() + if data["tracks"]["items"]: + return data["tracks"]["items"][0]["id"] + else: + raise Exception("No song found!") + else: + raise Exception(f"Failed to search song: {response.json()}") -async def get_track_image(song_id, token): - """Fetch the track's album image URL using the Spotify API.""" - url = f"https://api.spotify.com/v1/tracks/{song_id}" - headers = { - "Authorization": f"Bearer {token}" - } - - async with httpx.AsyncClient() as client: - response = await client.get(url, headers=headers) - response_data = response.json() + + @staticmethod + async def fetch_top_tracks(artist_id, token, market="US"): + """Fetch top tracks of an artist.""" + url = f"https://api.spotify.com/v1/artists/{artist_id}/top-tracks?market={market}" + headers = {"Authorization": f"Bearer {token}"} - if response.status_code == 200: - album_images = response_data["album"]["images"] - if album_images: - # Return the highest resolution image (usually the first one) - return album_images[0]["url"] + async with httpx.AsyncClient() as client: + response = await client.get(url, headers=headers) + if response.status_code == 200: + data = response.json() + tracks = [] + for track in data["tracks"]: + tracks.append({ + "name": track["name"], + "artist": track["artists"][0]["name"] + }) + print(tracks) + return tracks else: - return "No album images found." - else: - raise Exception(f"Failed to get track data: {response_data}") - -# move to config.py -def fetch_recents() -> list[str]: - """Fetches the recent tracks and returns it in a list""" - - recents_file = Path.home() / ".ethos" / "userfiles" / "recents.txt" - recents = [] - try: - if os.path.exists(recents_file): - with open(recents_file, 'r') as file: - for line in file: - recents.append(line.strip()) - except: - return - return recents - - -def add_track_to_recents(track: str): - """Writes a track to the recents file, keeping only the last 10 entries.""" - recents_dir = Path.home() / ".ethos" / "userfiles" - recents_file = recents_dir / "recents.txt" + raise Exception(f"Failed to fetch top tracks: {response.json()}") + + + @staticmethod + async def get_track_image(song_id, token): + """Fetch the track's album image URL using the Spotify API.""" + url = f"https://api.spotify.com/v1/tracks/{song_id}" + headers = { + "Authorization": f"Bearer {token}" + } + + async with httpx.AsyncClient() as client: + response = await client.get(url, headers=headers) + response_data = response.json() + + if response.status_code == 200: + album_images = response_data["album"]["images"] + if album_images: + # Return the highest resolution image (usually the first one) + return album_images[0]["url"] + else: + return "No album images found." + else: + raise Exception(f"Failed to get track data: {response_data}") - recents_dir.mkdir(parents=True, exist_ok=True) - lines = [] - if recents_file.exists(): +class UserFiles: + """Utility class for performing operations related to userfiles like recents and playlists""" + + @staticmethod + def fetch_recents() -> list[str]: + """Fetches the recent tracks and returns it in a list""" + + recents_file = Path.home() / ".ethos" / "userfiles" / "recents.txt" + recents = [] try: - with open(recents_file, "r") as file: - lines = file.readlines() + if os.path.exists(recents_file): + with open(recents_file, 'r') as file: + for line in file: + recents.append(line.strip()) + except: + return + return recents + + + @staticmethod + def add_track_to_recents(track: str): + """Writes a track to the recents file, keeping only the last 10 entries.""" + recents_dir = Path.home() / ".ethos" / "userfiles" + recents_file = recents_dir / "recents.txt" + + recents_dir.mkdir(parents=True, exist_ok=True) + + lines = [] + if recents_file.exists(): + try: + with open(recents_file, "r") as file: + lines = file.readlines() + except Exception as e: + return f"Error reading recents file: {e}" + + track = track.strip() + removed_track = track+"\n" + if track+"\n" in lines: + lines.remove(removed_track) + lines.insert(0, track + "\n") + + lines = lines[:10] + + try: + with open(recents_file, "w") as file: + file.writelines(lines) except Exception as e: - return f"Error reading recents file: {e}" + return f"Error writing to recents file: {e}" - track = track.strip() - removed_track = track+"\n" - if track+"\n" in lines: - lines.remove(removed_track) - lines.insert(0, track + "\n") - lines = lines[:10] + @staticmethod + def fetch_tracks_from_playlist(playlist_name: str) -> list[str]: + """ + Function to fetch all songs from a playlist.json file. - try: - with open(recents_file, "w") as file: - file.writelines(lines) - except Exception as e: - return f"Error writing to recents file: {e}" + Args: + - playlist_name (str): name of a playlist + Returns: + - list: List of all songs in a particular playlist + """ + playlist_file = Path.home() / ".ethos" / "userfiles" / "playlists" / f"{playlist_name}.json" + tracks = [] + try: + if os.path.exists(playlist_file): + with open(playlist_file, 'r') as playlist: + tracks_json = json.load(playlist) + for track in tracks_json: + name = track["name"] + artist = track["artist"] + tracks.append(f"{name} by {artist}") + except: + pass + return + return tracks -def fetch_tracks_from_playlist(playlist_name: str) -> list[str]: - """ - Function to fetch all songs from a playlist.json file. + @staticmethod + def add_track_to_playlist(playlist_name: str, track_name: str) -> None: + """ + Function to add tracks to a playlist + Args: - playlist_name (str): name of a playlist - - Returns: - - list: List of all songs in a particular playlist """ - playlist_file = Path.home() / ".ethos" / "userfiles" / "playlists" / f"{playlist_name}.json" + playlist_dir = Path.home() / ".ethos" / "userfiles" / "playlists" + playlist_file = playlist_dir / f"{playlist_name}.json" + + playlist_dir.mkdir(parents=True, exist_ok=True) tracks = [] + track, artist = Format.extract_song_and_artist(track_name) + tracks.append({"name": track, "artist": artist}) try: if os.path.exists(playlist_file): with open(playlist_file, 'r') as playlist: tracks_json = json.load(playlist) for track in tracks_json: - name = track["name"] - artist = track["artist"] - tracks.append(f"{name} by {artist}") + tracks.append(track) + + with open(playlist_file, 'w') as file: + json.dump(tracks, file, indent=4) except: pass - return - return tracks -def add_track_to_playlist(playlist_name: str, track_name: str) -> None: - """ - Function to add tracks to a playlist - - Args: - - playlist_name (str): name of a playlist - """ - playlist_dir = Path.home() / ".ethos" / "userfiles" / "playlists" - playlist_file = playlist_dir / f"{playlist_name}.json" - - playlist_dir.mkdir(parents=True, exist_ok=True) - tracks = [] - track, artist = Format.extract_song_and_artist(track_name) - tracks.append({"name": track, "artist": artist}) - try: - if os.path.exists(playlist_file): - with open(playlist_file, 'r') as playlist: - tracks_json = json.load(playlist) - for track in tracks_json: - tracks.append(track) - - with open(playlist_file, 'w') as file: - json.dump(tracks, file, indent=4) - except: - pass - -def fetch_playlists() -> list[str]: - """ - Function to fetch all playlists from playlist path - """ - playlists = [] - playlist_dir = Path.home() / ".ethos" / "userfiles" / "playlists" - files = os.listdir(playlist_dir) - for f in files: - playlist = os.path.basename(playlist_dir / f).split('.')[0] - if playlist: - playlists.append(playlist) - - return playlists \ No newline at end of file + @staticmethod + def fetch_playlists() -> list[str]: + """ + Function to fetch all playlists from playlist path + """ + try: + playlists = [] + playlist_dir = Path.home() / ".ethos" / "userfiles" / "playlists" + files = os.listdir(playlist_dir) + for f in files: + playlist = os.path.basename(playlist_dir / f).split('.')[0] + if playlist: + playlists.append(playlist) + + return playlists + except: + pass + return \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 457dd61..f136009 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -yt-dlp==2025.01.15 +yt-dlp==2025.01.26 python-vlc==3.0.21203 spotipy==2.25.0 textual==1.0.0 @@ -7,6 +7,5 @@ rich==13.9.4 # Development dependencies python-dotenv==1.0.1 pytest==8.3.4 -pytest-asyncio>=0.21.0 -pytest-mock>=3.10.0 httpx>=0.28.1 +pytest-mock>=3.10.0