forked from apache/hudi
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[HUDI-271] Create QuickstartUtils for simplifying quickstart guide
- This will be used in the Quickstart guide (doc changes to follow in a separate PR). The intention is to simplify the quickstart to showcase Hudi APIs by writing and reading using Spark datasources. - This is located in the hudi-spark module intentionally, so that all the necessary classes end up in hudi-spark-bundle.
- Loading branch information
Showing
1 changed file
with
206 additions
and
0 deletions.
There are no files selected for viewing
206 changes: 206 additions & 0 deletions
206
hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,206 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.hudi; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Random; | ||
import java.util.UUID; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.IntStream; | ||
import java.util.stream.Stream; | ||
import org.apache.avro.Schema; | ||
import org.apache.avro.generic.GenericData; | ||
import org.apache.avro.generic.GenericRecord; | ||
import org.apache.hudi.common.model.HoodieKey; | ||
import org.apache.hudi.common.model.HoodieRecord; | ||
import org.apache.hudi.common.util.HoodieAvroUtils; | ||
import org.apache.hudi.common.util.Option; | ||
import org.apache.hudi.exception.HoodieIOException; | ||
|
||
/** | ||
* Class to be used in quickstart guide for generating inserts and updates against a corpus. | ||
* Test data uses a toy Uber trips data model. | ||
*/ | ||
public class QuickstartUtils { | ||
|
||
public static class DataGenerator { | ||
// Default partition paths; the no-arg constructor uses these, and each generated insert
// is assigned to one of them at random.
private static final String DEFAULT_FIRST_PARTITION_PATH = "americas/united_states/san_francisco";
private static final String DEFAULT_SECOND_PARTITION_PATH = "americas/brazil/sao_paulo";
private static final String DEFAULT_THIRD_PARTITION_PATH = "asia/india/chennai";

private static final String[] DEFAULT_PARTITION_PATHS = {
    DEFAULT_FIRST_PARTITION_PATH,
    DEFAULT_SECOND_PARTITION_PATH,
    DEFAULT_THIRD_PARTITION_PATH
};
// Avro schema (JSON) of the toy trip record: a double "ts", string "uuid"/"rider"/"driver",
// and double begin/end coordinates plus "fare".
static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
    + "{\"name\": \"ts\",\"type\": \"double\"},"
    + "{\"name\": \"uuid\", \"type\": \"string\"},"
    + "{\"name\": \"rider\", \"type\": \"string\"},"
    + "{\"name\": \"driver\", \"type\": \"string\"},"
    + "{\"name\": \"begin_lat\", \"type\": \"double\"},"
    + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
    + "{\"name\": \"end_lat\", \"type\": \"double\"},"
    + "{\"name\": \"end_lon\", \"type\": \"double\"},"
    + "{\"name\":\"fare\",\"type\": \"double\"}]}";
// Parsed form of TRIP_EXAMPLE_SCHEMA; used to build records and to decode payload bytes.
static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);

// Shared by all generators and seeded with a constant, so the values drawn from it
// follow the same sequence on every run.
private static Random rand = new Random(46474747);

// Keys of generated inserts, indexed by the order in which they were generated.
private final Map<Integer, HoodieKey> existingKeys;
// Partition paths this generator distributes inserts over.
private final String[] partitionPaths;
// Number of keys registered so far; incremented once per generated insert.
private int numExistingKeys;
|
||
/**
 * Creates a generator over the default partition paths with no previously generated keys.
 */
public DataGenerator() {
  this(DEFAULT_PARTITION_PATHS, new HashMap<>());
}

/**
 * Creates a generator over the given partition paths, tracking keys in the supplied map.
 * The path array is copied defensively; the map is used as-is.
 */
private DataGenerator(String[] partitionPaths, Map<Integer, HoodieKey> keyPartitionMap) {
  this.existingKeys = keyPartitionMap;
  this.partitionPaths = partitionPaths.clone();
}
|
||
private static String generateRandomString() { | ||
int leftLimit = 48; // ascii for 0 | ||
int rightLimit = 57; // ascii for 9 | ||
int stringLength = 3; | ||
StringBuilder buffer = new StringBuilder(stringLength); | ||
for (int i = 0; i < stringLength; i++) { | ||
int randomLimitedInt = leftLimit + (int) | ||
(rand.nextFloat() * (rightLimit - leftLimit + 1)); | ||
buffer.append((char) randomLimitedInt); | ||
} | ||
return buffer.toString(); | ||
} | ||
|
||
public int getNumExistingKeys() { | ||
return numExistingKeys; | ||
} | ||
|
||
public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName, | ||
double timestamp) { | ||
GenericRecord rec = new GenericData.Record(avroSchema); | ||
rec.put("uuid", rowKey); | ||
rec.put("ts", timestamp); | ||
rec.put("rider", riderName); | ||
rec.put("driver", driverName); | ||
rec.put("begin_lat", rand.nextDouble()); | ||
rec.put("begin_lon", rand.nextDouble()); | ||
rec.put("end_lat", rand.nextDouble()); | ||
rec.put("end_lon", rand.nextDouble()); | ||
rec.put("fare", rand.nextDouble() * 100); | ||
return rec; | ||
} | ||
|
||
/** | ||
* Generates a new avro record of the above schema format, retaining the key if optionally provided. | ||
* The riderDriverSuffix string is a random String to simulate updates by changing the rider driver fields | ||
* for records belonging to the same commit. It is purely used for demo purposes. In real world, the actual | ||
* updates are assumed to be provided based on the application requirements. | ||
*/ | ||
public static OverwriteWithLatestAvroPayload generateRandomValue(HoodieKey key, String riderDriverSuffix) throws | ||
IOException { | ||
GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + riderDriverSuffix, "driver-" | ||
+ riderDriverSuffix, 0.0); | ||
return new OverwriteWithLatestAvroPayload(Option.of(rec)); | ||
} | ||
|
||
/** | ||
* Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys. | ||
*/ | ||
public Stream<HoodieRecord> generateInsertsStream(String randomString, Integer n) { | ||
int currSize = getNumExistingKeys(); | ||
|
||
return IntStream.range(0, n).boxed().map(i -> { | ||
String partitionPath = partitionPaths[rand.nextInt(partitionPaths.length)]; | ||
HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath); | ||
existingKeys.put(currSize + i, key); | ||
numExistingKeys++; | ||
try { | ||
return new HoodieRecord(key, generateRandomValue(key, randomString)); | ||
} catch (IOException e) { | ||
throw new HoodieIOException(e.getMessage(), e); | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
* Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys. | ||
*/ | ||
public List<HoodieRecord> generateInserts(Integer n) throws IOException { | ||
String randomString = generateRandomString(); | ||
return generateInsertsStream(randomString, n).collect(Collectors.toList()); | ||
} | ||
|
||
public HoodieRecord generateUpdateRecord(HoodieKey key, String randomString) throws IOException { | ||
return new HoodieRecord(key, generateRandomValue(key, randomString)); | ||
} | ||
|
||
/** | ||
* Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned | ||
* list | ||
* | ||
* @param n Number of updates (including dups) | ||
* @return list of hoodie record updates | ||
*/ | ||
public List<HoodieRecord> generateUpdates(Integer n) throws IOException { | ||
String randomString = generateRandomString(); | ||
List<HoodieRecord> updates = new ArrayList<>(); | ||
for (int i = 0; i < n; i++) { | ||
HoodieKey key = existingKeys.get(rand.nextInt(numExistingKeys - 1)); | ||
HoodieRecord record = generateUpdateRecord(key, randomString); | ||
updates.add(record); | ||
} | ||
return updates; | ||
} | ||
|
||
public void close() { | ||
existingKeys.clear(); | ||
} | ||
} | ||
|
||
private static Option<String> convertToString(HoodieRecord record) { | ||
try { | ||
String str = HoodieAvroUtils.bytesToAvro(((OverwriteWithLatestAvroPayload) record.getData()).recordBytes, | ||
DataGenerator.avroSchema).toString(); | ||
str = "{" + str.substring(str.indexOf("\"ts\":")); | ||
return Option.of(str.replaceAll("}", | ||
", \"partitionpath\": \"" + record.getPartitionPath() + "\"}")); | ||
} catch (IOException e) { | ||
return Option.empty(); | ||
} | ||
} | ||
|
||
public static List<String> convertToStringList(List<HoodieRecord> records) { | ||
return records.stream().map(hr -> convertToString(hr)).filter(os -> os.isPresent()) | ||
.map(os -> os.get()).collect(Collectors.toList()); | ||
} | ||
|
||
public static Map<String, String> getQuickstartWriteConfigs() { | ||
Map<String, String> demoConfigs = new HashMap<>(); | ||
demoConfigs.put("hoodie.insert.shuffle.parallelism", "2"); | ||
demoConfigs.put("hoodie.upsert.shuffle.parallelism", "2"); | ||
return demoConfigs; | ||
} | ||
} |