@@ -119,6 +119,8 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
119
119
}
120
120
switch kv .Kind {
121
121
case pb .KV_DATA_KEY :
122
+ sw .writeLock .Lock ()
123
+ defer sw .writeLock .Unlock ()
122
124
y .AssertTrue (len (sw .db .opt .EncryptionKey ) > 0 )
123
125
var dk pb.DataKey
124
126
if err := proto .Unmarshal (kv .Value , & dk ); err != nil {
@@ -136,6 +138,8 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
136
138
}
137
139
return nil
138
140
case pb .KV_FILE :
141
+ sw .writeLock .Lock ()
142
+ defer sw .writeLock .Unlock ()
139
143
// All tables should be recieved before any of the keys.
140
144
if sw .processingKeys {
141
145
return errors .New ("Received pb.KV_FILE after pb.KV_KEY" )
@@ -169,22 +173,27 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
169
173
// Pass. The following code will handle the keys.
170
174
}
171
175
176
+ sw .writeLock .Lock ()
172
177
sw .processingKeys = true
178
+ if sw .maxVersion < kv .Version {
179
+ sw .maxVersion = kv .Version
180
+ }
173
181
if sw .prevLevel == 0 {
174
- // If prevLevel is 0, that means that we have not written anything yet. Equivalently,
175
- // we were virtually writing to the maxLevel+1.
182
+ // If prevLevel is 0, that means that we have not written anything yet.
183
+ // So, we can write to the maxLevel. newWriter writes to prevLevel - 1,
184
+ // so we can set prevLevel to len(levels).
176
185
sw .prevLevel = len (sw .db .lc .levels )
177
186
}
187
+ sw .writeLock .Unlock ()
188
+
178
189
var meta , userMeta byte
179
190
if len (kv .Meta ) > 0 {
180
191
meta = kv .Meta [0 ]
181
192
}
182
193
if len (kv .UserMeta ) > 0 {
183
194
userMeta = kv .UserMeta [0 ]
184
195
}
185
- if sw .maxVersion < kv .Version {
186
- sw .maxVersion = kv .Version
187
- }
196
+
188
197
e := & Entry {
189
198
Key : y .KeyWithTs (kv .Key , kv .Version ),
190
199
Value : y .Copy (kv .Value ),
0 commit comments