@@ -66,6 +66,11 @@ class FileJobStore(AbstractJobStore):
66
66
# 10Mb RAM chunks when reading/writing files
67
67
BUFFER_SIZE = 10485760 # 10Mb
68
68
69
+ # When a log file is still being written, what will its name end with?
70
+ LOG_TEMP_SUFFIX = ".new"
71
+ # All log files start with this prefix
72
+ LOG_PREFIX = "stats"
73
+
69
74
def default_caching (self ) -> bool :
70
75
"""
71
76
Jobstore's preference as to whether it likes caching or doesn't care about it.
@@ -90,6 +95,9 @@ def __init__(self, path: str, fanOut: int = 1000) -> None:
90
95
self .jobsDir = os .path .join (self .jobStoreDir , "jobs" )
91
96
# Directory where stats files go
92
97
self .statsDir = os .path .join (self .jobStoreDir , "stats" )
98
+ # Which has subdirectories for new and seen stats files
99
+ self .stats_inbox = os .path .join (self .statsDir , "inbox" )
100
+ self .stats_archive = os .path .join (self .statsDir , "archive" )
93
101
# Directory where non-job-associated files for the file store go
94
102
self .filesDir = os .path .join (self .jobStoreDir , "files/no-job" )
95
103
# Directory where job-associated files for the file store go.
@@ -118,6 +126,8 @@ def initialize(self, config):
118
126
raise
119
127
os .makedirs (self .jobsDir , exist_ok = True )
120
128
os .makedirs (self .statsDir , exist_ok = True )
129
+ os .makedirs (self .stats_inbox , exist_ok = True )
130
+ os .makedirs (self .stats_archive , exist_ok = True )
121
131
os .makedirs (self .filesDir , exist_ok = True )
122
132
os .makedirs (self .jobFilesDir , exist_ok = True )
123
133
os .makedirs (self .sharedFilesDir , exist_ok = True )
@@ -836,29 +846,66 @@ def list_all_file_names(self, for_job: Optional[str] = None) -> Iterable[str]:
836
846
837
847
def write_logs (self , msg ):
838
848
# Temporary files are placed in the stats directory tree
839
- tempStatsFileName = "stats" + str (uuid .uuid4 ().hex ) + ".new"
840
- tempStatsFile = os .path .join (self ._get_arbitrary_stats_dir (), tempStatsFileName )
849
+ tempStatsFileName = self . LOG_PREFIX + str (uuid .uuid4 ().hex ) + self . LOG_TEMP_SUFFIX
850
+ tempStatsFile = os .path .join (self ._get_arbitrary_stats_inbox_dir (), tempStatsFileName )
841
851
writeFormat = "w" if isinstance (msg , str ) else "wb"
842
852
with open (tempStatsFile , writeFormat ) as f :
843
853
f .write (msg )
844
- os .rename (tempStatsFile , tempStatsFile [:- 4 ]) # This operation is atomic
854
+ os .rename (tempStatsFile , tempStatsFile [:- len ( self . LOG_TEMP_SUFFIX ) ]) # This operation is atomic
845
855
846
856
def read_logs (self , callback , read_all = False ):
847
- numberOfFilesProcessed = 0
848
- for tempDir in self ._stats_directories ():
849
- for tempFile in os .listdir (tempDir ):
850
- if tempFile .startswith ("stats" ):
851
- absTempFile = os .path .join (tempDir , tempFile )
852
- if os .path .isfile (absTempFile ):
853
- if read_all or not tempFile .endswith (".new" ):
854
- with open (absTempFile , "rb" ) as fH :
855
- callback (fH )
856
- numberOfFilesProcessed += 1
857
- newName = tempFile .rsplit ("." , 1 )[0 ] + ".new"
858
- newAbsTempFile = os .path .join (tempDir , newName )
857
+ files_processed = 0
858
+
859
+ # Holds pairs of a function to call to get directories to look at, and
860
+ # a flag for whether to archive the files found.
861
+ queries = []
862
+ if read_all :
863
+ # If looking at all logs, check the archive
864
+ queries .append ((self ._stats_archive_directories , False ))
865
+ # Always check the inbox and archive from it. But do it after checking
866
+ # the archive to avoid duplicates in the same pass.
867
+ queries .append ((self ._stats_inbox_directories , True ))
868
+
869
+ for to_call , should_archive in queries :
870
+ for log_dir in to_call ():
871
+ for log_file in os .listdir (log_dir ):
872
+ if not log_file .startswith (self .LOG_PREFIX ):
873
+ # Skip anything not a log file (like the other spray
874
+ # directories)
875
+ continue
876
+ if log_file .endswith (self .LOG_TEMP_SUFFIX ):
877
+ # Skip partially-written files, always.
878
+ continue
879
+
880
+ abs_log_file = os .path .join (log_dir , log_file )
881
+ if not os .path .isfile (abs_log_file ):
882
+ # This can't be a log file.
883
+ continue
884
+ try :
885
+ opened_file = open (abs_log_file , "rb" )
886
+ except FileNotFoundError :
887
+ # File disappeared before we could open it.
888
+ # Maybe someone else is reading logs?
889
+ continue
890
+ with opened_file as f :
891
+ callback (f )
892
+ files_processed += 1
893
+
894
+ if should_archive :
895
+ # We need to move the stats file to the archive.
896
+ # Since we have UUID stats file names we don't need
897
+ # to worry about collisions when it gets there.
898
+ new_dir = self ._get_arbitrary_stats_archive_dir ()
899
+ new_abs_log_file = os .path .join (new_dir , log_file )
900
+ try :
859
901
# Mark this item as read
860
- os .rename (absTempFile , newAbsTempFile )
861
- return numberOfFilesProcessed
902
+ os .rename (abs_log_file , new_abs_log_file )
903
+ except FileNotFoundError :
904
+ # File we wanted to archive disappeared.
905
+ # Maybe someone else is reading logs?
906
+ # TODO: Raise ConcurrentFileModificationException?
907
+ continue
908
+ return files_processed
862
909
863
910
##########################################
864
911
# Private methods
@@ -1010,17 +1057,31 @@ def _get_arbitrary_jobs_dir_for_name(self, jobNameSlug):
1010
1057
os .path .join (self .jobsDir , self .JOB_NAME_DIR_PREFIX + jobNameSlug )
1011
1058
)
1012
1059
1013
- def _get_arbitrary_stats_dir (self ):
1060
+ def _get_arbitrary_stats_inbox_dir (self ):
1014
1061
"""
1015
- Gets a temporary directory in a multi-level hierarchy in self.statsDir.
1062
+ Gets a temporary directory in a multi-level hierarchy in
1063
+ self.stats_inbox, where stats files not yet seen by the leader live.
1016
1064
The directory is not unique and may already have other stats files in it.
1017
1065
1018
1066
:rtype : string, path to temporary directory in which to place files/directories.
1019
1067
1020
1068
1021
1069
"""
1022
1070
1023
- return self ._get_dynamic_spray_dir (self .statsDir )
1071
+ return self ._get_dynamic_spray_dir (self .stats_inbox )
1072
+
1073
+ def _get_arbitrary_stats_archive_dir (self ):
1074
+ """
1075
+ Gets a temporary directory in a multi-level hierarchy in
1076
+ self.stats_archive, where stats files already seen by the leader live.
1077
+ The directory is not unique and may already have other stats files in it.
1078
+
1079
+ :rtype : string, path to temporary directory in which to place files/directories.
1080
+
1081
+
1082
+ """
1083
+
1084
+ return self ._get_dynamic_spray_dir (self .stats_archive )
1024
1085
1025
1086
def _get_arbitrary_files_dir (self ):
1026
1087
"""
@@ -1156,14 +1217,23 @@ def _job_directories(self):
1156
1217
os .path .join (jobHoldingDir , jobNameDir )
1157
1218
)
1158
1219
1159
- def _stats_directories (self ):
1220
+ def _stats_inbox_directories (self ):
1160
1221
"""
1161
- :rtype : an iterator to the temporary directories containing stats
1162
- files. They may also contain directories containing more
1163
- stats files.
1222
+ :returns: an iterator to the temporary directories containing new stats
1223
+ files. They may also contain directories containing more stats
1224
+ files.
1225
+ """
1226
+
1227
+ return self ._walk_dynamic_spray_dir (self .stats_inbox )
1228
+
1229
+ def _stats_archive_directories (self ):
1230
+ """
1231
+ :returns: an iterator to the temporary directories containing
1232
+ previously observed stats files. They may also contain directories
1233
+ containing more stats files.
1164
1234
"""
1165
1235
1166
- return self ._walk_dynamic_spray_dir (self .statsDir )
1236
+ return self ._walk_dynamic_spray_dir (self .stats_archive )
1167
1237
1168
1238
def _get_unique_file_path (self , fileName , jobStoreID = None , cleanup = False ):
1169
1239
"""
0 commit comments