@@ -59,13 +59,30 @@ def expand(self, pcoll):
       raise NotImplementedError(self._spec.__class__)
 
 
-def _flatten_value_to_list(batch_values):
-  """Converts an N-D dense or sparse batch to a 1-D list."""
-  # Ravel for flattening and tolist so that we go to native Python types
-  # for more efficient followup processing.
-  #
-  batch_value, = batch_values
-  return batch_value.ravel().tolist()
+class _OrderElementsFn(beam.DoFn):
+  """Sort the vocabulary by descending frequency count."""
+
+  def __init__(self, store_frequency):
+    self._store_frequency = store_frequency
+
+    # Metrics.
+    self._vocab_size_distribution = beam.metrics.Metrics.distribution(
+        common.METRICS_NAMESPACE, 'vocabulary_size')
+
+  def process(self, element, counts_iter):
+    del element
+    counts = list(counts_iter)
+    self._vocab_size_distribution.update(len(counts))
+
+    if not counts:
+      counts = [(1, '49d0cd50-04bb-48c0-bc6f-5b575dce351a')]
+
+    counts.sort(reverse=True)  # Largest first.
+    for count, entry in counts:
+      if self._store_frequency:
+        yield '{} {}'.format(count, entry)
+      else:
+        yield entry
 
 
 @with_input_types(List[np.ndarray])
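For context on the pattern this hunk introduces, here is a minimal, self-contained sketch (toy data; OrderElementsFn is a simplified stand-in with the metrics plumbing omitted, not the library's class) of a generator DoFn driven by a single-element 'Prepare' PCollection that consumes the full set of counts through an AsIter side input:

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class OrderElementsFn(beam.DoFn):
  """Simplified stand-in for _OrderElementsFn (metrics omitted)."""

  def __init__(self, store_frequency):
    self._store_frequency = store_frequency

  def process(self, element, counts_iter):
    del element  # The single None element only triggers one invocation.
    counts = sorted(counts_iter, reverse=True)  # Largest count first.
    for count, entry in counts:
      yield '{} {}'.format(count, entry) if self._store_frequency else entry


with TestPipeline() as p:
  counts = p | 'Counts' >> beam.Create([(3, 'a'), (7, 'b'), (5, 'c')])
  lines = (
      p
      | 'Prepare' >> beam.Create([None])
      | 'OrderElements' >> beam.ParDo(OrderElementsFn(store_frequency=True),
                                      counts_iter=beam.pvalue.AsIter(counts)))
  assert_that(lines, equal_to(['7 b', '5 c', '3 a']))

Yielding line by line (rather than returning the whole vocabulary as one list, as the removed order_elements did) keeps each output record small.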
@@ -88,13 +105,21 @@ def expand(self, pcoll):
     # pairs in sorted order by decreasing counts (and by values for equal
     # counts).
 
+    def flatten_value_to_list(batch_values):
+      """Converts an N-D dense or sparse batch to a 1-D list."""
+      # Ravel for flattening and tolist so that we go to native Python types
+      # for more efficient followup processing.
+      #
+      batch_value, = batch_values
+      return batch_value.ravel().tolist()
+
     def is_problematic_string(kv):
       string, _ = kv  # Ignore counts.
       return string and '\n' not in string and '\r' not in string
 
     counts = (
         pcoll
-        | 'FlattenStrings' >> beam.FlatMap(_flatten_value_to_list)
+        | 'FlattenStrings' >> beam.FlatMap(flatten_value_to_list)
         | 'CountPerString' >> beam.combiners.Count.PerElement()
         | 'FilterProblematicStrings' >> beam.Filter(is_problematic_string)
         | 'SwapStringsAndCounts' >> beam.KvSwap())
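As a rough, standalone illustration of what these stages do (toy ndarray batches, not part of the change): each input element is a one-item list holding a batch ndarray, FlattenStrings ravels it into native Python strings, CountPerString tallies them, problematic strings (empty or containing a newline) are dropped, and KvSwap flips (string, count) into (count, string) so later stages compare counts first:

import apache_beam as beam
import numpy as np
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


def flatten_value_to_list(batch_values):
  batch_value, = batch_values
  return batch_value.ravel().tolist()


def is_problematic_string(kv):
  string, _ = kv  # Ignore counts.
  return string and '\n' not in string and '\r' not in string


with TestPipeline() as p:
  counts = (
      p
      | beam.Create([[np.array([['a', 'b'], ['a', 'a']])],
                     [np.array([['b', '']])]])
      | 'FlattenStrings' >> beam.FlatMap(flatten_value_to_list)
      | 'CountPerString' >> beam.combiners.Count.PerElement()
      | 'FilterProblematicStrings' >> beam.Filter(is_problematic_string)
      | 'SwapStringsAndCounts' >> beam.KvSwap())
  # The empty string is filtered out; 'a' appears three times, 'b' twice.
  assert_that(counts, equal_to([(3, 'a'), (2, 'b')]))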
@@ -105,51 +130,38 @@ def is_problematic_string(kv):
       counts |= ('FilterByFrequencyThreshold(%s)' % frequency_threshold >>
                  beam.Filter(lambda kv: kv[0] >= frequency_threshold))
 
-    if top_k is not None:
+    if top_k is None:
+      # Performance optimization to obviate reading from finely sharded files
+      # via AsIter in _OrderElementsFn below. By breaking fusion, we allow
+      # sharded files' sizes to be automatically computed (when possible), so
+      # we end up reading from fewer and larger files. This is not needed when
+      # top_k is provided since that already induces a single-sharded output
+      # (due to the CombineGlobally).
+      counts |= 'Reshard' >> beam.transforms.Reshuffle()  # pylint: disable=no-value-for-parameter
+    else:
       counts = (counts
                 | 'Top(%s)' % top_k
-                >> beam.transforms.combiners.Top.Largest(top_k)
+                # Using without_defaults() below since it obviates unnecessary
+                # materializations. This is worth doing because:
+                # a) Some vocabs could be really large and although they do
+                #    fit in memory they might go over per-record
+                #    materialization limits (TopCombineFn produces a single
+                #    record with the entire vocabulary as a list).
+                # b) More fusion leads to increased performance in general.
+                >> beam.CombineGlobally(
+                    beam.combiners.TopCombineFn(top_k)).without_defaults()
                 | 'FlattenList' >> beam.FlatMap(lambda lst: lst))
 
-    # Performance optimization to obviate reading from finely sharded files
-    # via AsIter. By breaking fusion, we allow sharded files' sizes to be
-    # automatically computed (when possible), so we end up reading from fewer
-    # and larger files.
-    counts |= 'Reshard' >> beam.transforms.Reshuffle()  # pylint: disable=no-value-for-parameter
-
-    # Using AsIter instead of AsList at the callsite below in order to reduce
-    # max memory usage (due to AsList caching).
-    def order_elements(ignored, counts_iter, store_frequency):
-      """Sort the vocabulary by descending frequency count."""
-      del ignored
-      counts = list(counts_iter)
-      if not counts:
-        counts = [(1, '49d0cd50-04bb-48c0-bc6f-5b575dce351a')]
-      counts.sort(reverse=True)  # Largest first.
-
-      # Log vocabulary size to metrics. Note we can call
-      # beam.metrics.Metrics.distribution here because this function only gets
-      # called once, so there is no need to amortize the cost of calling the
-      # constructor by putting in a DoFn initializer.
-      vocab_size_distribution = beam.metrics.Metrics.distribution(
-          common.METRICS_NAMESPACE, 'vocabulary_size')
-      vocab_size_distribution.update(len(counts))
-
-      if store_frequency:
-        # Returns ['count1 element1', ... ]
-        return ['{} {}'.format(count, element) for count, element in counts]
-      else:
-        return [element for _, element in counts]
-
     vocabulary_file = os.path.join(self._temp_assets_dir,
                                    self._spec.vocab_filename)
     vocab_is_written = (
         pcoll.pipeline
         | 'Prepare' >> beam.Create([None])
-        | 'OrderElements' >> beam.FlatMap(
-            order_elements,
-            counts_iter=beam.pvalue.AsIter(counts),
-            store_frequency=self._spec.store_frequency)
+        | 'OrderElements' >> beam.ParDo(
+            _OrderElementsFn(self._spec.store_frequency),
+            # Using AsIter instead of AsList here in order to reduce max
+            # memory usage.
+            counts_iter=beam.pvalue.AsIter(counts))
         | 'WriteToFile' >> beam.io.WriteToText(vocabulary_file,
                                                shard_name_template=''))
     # Return the vocabulary path.
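Finally, a small standalone sketch (toy data) of the Top replacement above: CombineGlobally(TopCombineFn(k)).without_defaults() still emits a single record holding the k largest (count, string) pairs, but produces no default record for an empty input:

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


with TestPipeline() as p:
  top = (
      p
      | beam.Create([(5, 'a'), (9, 'b'), (2, 'c'), (7, 'd')])
      # One record: the list of the 2 largest elements, largest first.
      | 'Top(2)' >> beam.CombineGlobally(
          beam.combiners.TopCombineFn(2)).without_defaults()
      | 'FlattenList' >> beam.FlatMap(lambda lst: lst))
  assert_that(top, equal_to([(9, 'b'), (7, 'd')]))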