|
| 1 | +import gspread |
| 2 | +from enum import Enum |
| 3 | +from googleapiclient.discovery import build |
| 4 | + |
| 5 | + |
| 6 | +import gspread_formatting |
| 7 | + |
class FILE_OVERRIDE_BEHAVIORS(Enum):
    """Policies for create_sheet_in_folder when a sheet with the requested name already exists."""

    # Delete same-named sheets found in the target folder, then create the new one.
    OVERRIDE_IF_IN_SAME_PLACE = 1
    # Raise only when a same-named sheet is found in the target folder.
    EXIT_IF_IN_SAME_PLACE = 2
    # Raise whenever a same-named sheet can be opened at all.
    EXIT_ANYWHERE = 3
| 12 | + |
class WORKSHEET_OVERRIDE_BEHAVIORS(Enum):
    """Policies for the worksheet fill helpers when a worksheet with the target name already exists."""

    # Write into the existing worksheet.
    OVERRIDE = 1
    # Raise instead of touching the existing worksheet.
    EXIT = 2
| 16 | + |
# Font size, in points, used when estimating how wide a column must be.
FONT_SIZE_PTS = 10
# Divisor applied to (character count * font size) to obtain a column width in pixels.
PTS_PIXELS_RATIO = 4 / 3
# Extra characters of padding added to every column's widest entry by default.
DEFAULT_BUFFER_CHARS = 4
| 20 | + |
def extract_credentials(authentication_response):
    """
    Pull the credentials out of the tuple returned by api.authenticate.

    :param authentication_response: the tuple returned by api.authenticate
    :returns: the element at index 3 of the tuple, which holds the credentials
    """
    credentials_position = 3
    return authentication_response[credentials_position]
| 24 | + |
def authenticate_gspread(authentication_response):
    """
    Build an authorized gspread client from the tuple returned by api.authenticate.

    :param authentication_response: the tuple returned by api.authenticate
    :returns: the client produced by gspread.authorize
    """
    credentials = extract_credentials(authentication_response)
    return gspread.authorize(credentials)
| 29 | + |
def authenticate_drive_api(authentication_response):
    """
    Build a Drive v3 API object from the tuple returned by api.authenticate.

    :param authentication_response: the tuple returned by api.authenticate
    :returns: the Drive API object produced by googleapiclient's build
    """
    credentials = extract_credentials(authentication_response)
    drive_service = build('drive', 'v3', credentials=credentials)
    return drive_service
| 33 | + |
def check_sheet_exists(gc, sheet_name):
    """
    Report whether the gspread client can open a sheet named 'sheet_name'.

    :param gc: the gspread client
    :param sheet_name: the name of the sheet to look for
    :returns: True if the sheet could be opened, False if gspread reports it was not found
    """
    try:
        gc.open(sheet_name)
    except gspread.exceptions.SpreadsheetNotFound:
        found = False
    else:
        found = True
    return found
| 47 | + |
def execute_drive_list(drive_api, search_params):
    """
    Run a files().list() request against the Drive API and gather every page of results.

    :param drive_api: the Drive API object
    :param search_params: the search query, see https://developers.google.com/drive/api/v3/search-files
    :returns: a list with the 'files' entries of every response page, in page order
    """
    collected = []
    next_token = None
    more_pages = True
    while more_pages:
        page = drive_api.files().list(
            q=search_params, spaces="drive", pageToken=next_token
        ).execute()
        collected.extend(page.get("files", []))
        # A response without a nextPageToken is the last page.
        next_token = page.get("nextPageToken")
        more_pages = next_token is not None
    return collected
| 68 | + |
def search_for_folder_id(drive_api, folder_name, allow_trashed=False, allow_duplicates=False):
    """
    Search for a folder by name in the Drive API.
    Returns a list of folder ids that match the search criteria.

    :param drive_api: the Drive API object
    :param folder_name: the name of the folder to search for
    :param allow_trashed: whether to include trashed folders in the search, defaults to False
    :param allow_duplicates: whether to allow multiple folders with the same name, defaults to False
    :returns: a list of folder ids whose name equals folder_name exactly; empty if none match
    :raises RuntimeError: if more than one folder matches and allow_duplicates is False
    """
    # The name is embedded in a single-quoted Drive query literal, so backslashes
    # and single quotes must be backslash-escaped; otherwise a name such as
    # "Valentine's Day" produces a malformed (or unintentionally altered) query.
    escaped_name = folder_name.replace("\\", "\\\\").replace("'", "\\'")
    search_params = (
        f"name = '{escaped_name}' and mimeType = 'application/vnd.google-apps.folder'"
    )
    if not allow_trashed:
        search_params += " and trashed = false"

    files = execute_drive_list(drive_api, search_params)
    # Re-check the name client-side so only exact matches are kept.
    matching_ids = [file["id"] for file in files if file["name"] == folder_name]

    if len(matching_ids) > 1 and not allow_duplicates:
        raise RuntimeError("Too many files returned")

    return matching_ids
| 92 | + |
| 93 | + |
def create_sheet_in_folder(authentication_response, sheet_name, parent_folder_name=None, override_behavior=FILE_OVERRIDE_BEHAVIORS.EXIT_ANYWHERE):
    """
    Create a new sheet with the given name, optionally inside a named parent folder.
    Returns the new sheet.

    :param authentication_response: the service parameters tuple from api.authenticate
    :param sheet_name: the name of the new sheet
    :param parent_folder_name: the name of the parent folder for the new sheet;
        None creates the sheet without an explicit parent
    :param override_behavior: a FILE_OVERRIDE_BEHAVIORS member deciding what happens
        when a sheet with this name already exists:
        EXIT_ANYWHERE raises if such a sheet can be opened at all;
        EXIT_IF_IN_SAME_PLACE raises only if one is found in the target folder;
        OVERRIDE_IF_IN_SAME_PLACE deletes the ones found in the target folder first
    :returns: the gspread.Spreadsheet object of the new sheet
    :rtype: gspread.Spreadsheet
    :raises IndexError: if parent_folder_name is given but no such folder is found
    :raises RuntimeError: if the sheet already exists and override_behavior forbids continuing
    """
    # Build the gspread client and the Drive API through the shared helpers.
    gc = authenticate_gspread(authentication_response)
    drive_api = authenticate_drive_api(authentication_response)

    parent_folder_id = None
    if parent_folder_name is not None:
        folder_ids = search_for_folder_id(drive_api, parent_folder_name)
        if not folder_ids:
            # IndexError is kept (it is what indexing the empty result raised
            # before) but now says which folder was missing.
            raise IndexError(f"Parent folder '{parent_folder_name}' was not found")
        parent_folder_id = folder_ids[0]

    # Check if sheet already exists and handle based on input
    if check_sheet_exists(gc, sheet_name):
        if override_behavior == FILE_OVERRIDE_BEHAVIORS.EXIT_ANYWHERE:
            raise RuntimeError("Sheet already exists")

        # The name goes inside a single-quoted Drive query literal, so
        # backslashes and single quotes must be backslash-escaped.
        escaped_name = sheet_name.replace("\\", "\\\\").replace("'", "\\'")
        parent_reference = "root" if parent_folder_id is None else parent_folder_id
        matching_search = (
            f"name = '{escaped_name}' and mimeType = 'application/vnd.google-apps.spreadsheet'"
            f" and '{parent_reference}' in parents"
        )
        # Keep only exact name matches, as search_for_folder_id does, so that
        # nothing but the requested sheet can be reported or deleted below.
        matching_files = [
            file
            for file in execute_drive_list(drive_api, matching_search)
            if file["name"] == sheet_name
        ]

        if matching_files:
            if override_behavior == FILE_OVERRIDE_BEHAVIORS.EXIT_IF_IN_SAME_PLACE:
                raise RuntimeError("File already exists in the same folder")
            elif override_behavior == FILE_OVERRIDE_BEHAVIORS.OVERRIDE_IF_IN_SAME_PLACE:
                # NOTE(review): the query does not exclude trashed files, so
                # trashed matches are deleted too -- confirm this is intended.
                for file in matching_files:
                    drive_api.files().delete(fileId=file["id"]).execute()

    # Create file body
    body = {
        'name': sheet_name,
        'mimeType': 'application/vnd.google-apps.spreadsheet',
    }
    if parent_folder_id is not None:
        body["parents"] = [parent_folder_id]
    new_sheet = drive_api.files().create(body=body).execute()

    # Open the freshly created file by its id
    return gc.open_by_key(new_sheet["id"])
| 144 | + |
def fill_worksheet_with_df(
    sheet,
    df,
    worksheet_name,
    overlapBehavior,
    options=None,
):
    """
    Fill a worksheet with the contents of a DataFrame.
    If the worksheet already exists, the behavior is determined by overlapBehavior.
    The options dictionary can be used to customize the formatting of the worksheet.

    :param sheet: the gspread.Spreadsheet object
    :param df: the DataFrame to fill the worksheet with
    :param worksheet_name: the name of the worksheet to fill. Cannot be "Sheet1"
    :param overlapBehavior: the behavior to take if the worksheet already exists.
        WORKSHEET_OVERRIDE_BEHAVIORS.EXIT raises; any other value writes into the
        existing worksheet
    :param options: the formatting options for the worksheet, optional.
        A dictionary with optional elements "bold_header", "center_header" and
        "freeze_header" (booleans, each treated as True when missing) and
        "column_widths", a dictionary with optional elements "justify"
        (True when missing) and "buffer_chars" (DEFAULT_BUFFER_CHARS when missing).
        None means all defaults
    :raises AssertionError: if worksheet_name is "Sheet1"
    :raises RuntimeError: if the worksheet exists and overlapBehavior is EXIT
    """
    # Sheet1 is special since it's created by default, so it's not allowed.
    # This is an explicit raise rather than an `assert` because asserts are
    # stripped under `python -O`, and the final step of this function deletes
    # "Sheet1"; AssertionError is kept as the exception type for compatibility.
    if worksheet_name == "Sheet1":
        raise AssertionError('worksheet_name cannot be "Sheet1"')

    # Resolve options against their defaults once, so partially specified
    # dictionaries (e.g. no "column_widths" key) cannot raise KeyError later.
    options = {} if options is None else options
    column_width_options = options.get("column_widths", {})
    justify_columns = column_width_options.get("justify", True)
    buffer_chars = column_width_options.get("buffer_chars", DEFAULT_BUFFER_CHARS)
    bold_header = options.get("bold_header", True)
    center_header = options.get("center_header", True)
    freeze_header = options.get("freeze_header", True)

    # Check if worksheet already exists and handle based on overlapBehavior
    try:
        worksheet = sheet.worksheet(worksheet_name)
    except gspread.exceptions.WorksheetNotFound:
        # One extra row for the header line written above the data.
        worksheet = sheet.add_worksheet(
            title=worksheet_name, rows=df.shape[0] + 1, cols=max(df.shape[1], 1)
        )
    else:
        if overlapBehavior == WORKSHEET_OVERRIDE_BEHAVIORS.EXIT:
            raise RuntimeError("Worksheet already exists")

    # Add data to worksheet
    # NOTE(review): an existing worksheet is not cleared first, so cells outside
    # the DataFrame's extent presumably keep their old contents -- confirm intended.
    worksheet.update([df.columns.values.tolist()] + df.values.tolist())

    has_columns = len(df.columns) > 0

    # Format worksheet
    # Justify Column Widths
    if justify_columns and has_columns:
        column_widths = []
        # df.items() walks columns by position, so duplicate column names and
        # non-string headers are handled.
        for position, (header, column) in enumerate(df.items(), start=1):
            # A zero-row column has no longest cell (max() would be NaN).
            longest_cell = int(column.astype(str).str.len().max()) if len(column) else 0
            widest_chars = max(longest_cell, len(str(header)))
            pixel_width = round((widest_chars + buffer_chars) * FONT_SIZE_PTS / PTS_PIXELS_RATIO)
            # rowcol_to_a1(1, n) yields e.g. "A1" or "AA1"; dropping the trailing
            # row digit leaves the full column label, also past column Z.
            column_label = gspread.utils.rowcol_to_a1(1, position)[:-1]
            column_widths.append((column_label, pixel_width))
        gspread_formatting.set_column_widths(worksheet, column_widths)

    # Freeze Header
    if freeze_header:
        gspread_formatting.set_frozen(worksheet, rows=1)

    # Bold / Center Header -- skipped entirely when neither is requested so
    # that no empty format is sent.
    if (bold_header or center_header) and has_columns:
        format_options = gspread_formatting.CellFormat()
        if bold_header:
            format_options += gspread_formatting.CellFormat(
                textFormat=gspread_formatting.TextFormat(bold=True)
            )
        if center_header:
            format_options += gspread_formatting.CellFormat(horizontalAlignment="CENTER")
        gspread_formatting.format_cell_range(
            worksheet,
            f"A1:{gspread.utils.rowcol_to_a1(1, len(df.columns))}",
            format_options,
        )

    # Delete Sheet1 if it has been created by default
    for existing_worksheet in sheet.worksheets():
        if existing_worksheet.title == "Sheet1":
            sheet.del_worksheet(existing_worksheet)
            break
| 219 | + |
def fill_spreadsheet_with_df_dict(sheet, df_dict, overlapBehavior):
    """
    Fill a sheet with the contents of a dictionary of DataFrames.
    Each key is a worksheet name and each value is the DataFrame written to that worksheet.
    If any worksheets would be overridden, the behavior is determined by overlapBehavior.

    :param sheet: the gspread.Spreadsheet object
    :param df_dict: the dictionary of DataFrames to fill the worksheets with
    :param overlapBehavior: the behavior to take if any of the worksheets already exist
    :raises RuntimeError: if overlapBehavior is EXIT and any named worksheet already exists
    """
    # With EXIT, verify every target name is free before writing anything.
    if overlapBehavior == WORKSHEET_OVERRIDE_BEHAVIORS.EXIT:
        for title in df_dict:
            try:
                sheet.worksheet(title)
            except gspread.exceptions.WorksheetNotFound:
                continue
            raise RuntimeError("Worksheet already exists")

    for title, frame in df_dict.items():
        fill_worksheet_with_df(sheet, frame, title, overlapBehavior)
| 239 | + |
0 commit comments