Skip to content

Commit 33143cd

Browse files
breejeshBreejesh Rathod
and
Breejesh Rathod
authored
breejesh Submission (gunnarmorling#670)
* 1BRC breejesh * Fix output * Fix formatting * Format and remove preview feature * Optimize merge * Revert "Optimize merge" This reverts commit 28c9b4a. --------- Co-authored-by: Breejesh Rathod <[email protected]>
1 parent a533019 commit 33143cd

File tree

3 files changed

+219
-0
lines changed

3 files changed

+219
-0
lines changed

calculate_average_breejesh.sh

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#!/bin/sh
2+
#
3+
# Copyright 2023 The original authors
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
JAVA_OPTS=""
19+
java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_breejesh

prepare_breejesh.sh

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#!/bin/bash
2+
#
3+
# Copyright 2023 The original authors
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
# Uncomment below to use sdk
19+
# source "$HOME/.sdkman/bin/sdkman-init.sh"
20+
# sdk use java 21.0.1-graal 1>&2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Copyright 2023 The original authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package dev.morling.onebrc;
17+
18+
import java.io.File;
19+
import java.io.IOException;
20+
import java.nio.MappedByteBuffer;
21+
import java.nio.channels.FileChannel;
22+
import java.nio.charset.StandardCharsets;
23+
import java.nio.file.Files;
24+
import java.nio.file.StandardOpenOption;
25+
import java.util.*;
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.ExecutorService;
28+
import java.util.concurrent.Executors;
29+
30+
public class CalculateAverage_breejesh {
31+
private static final String FILE = "./measurements.txt";
32+
private static final int TWO_BYTE_TO_INT = 480 + 48; // 48 is the ASCII code for '0'
33+
private static final int THREE_BYTE_TO_INT = 4800 + 480 + 48;
34+
35+
private static final class Measurement {
36+
37+
private int min;
38+
private int max;
39+
private int total;
40+
private int count;
41+
42+
public Measurement(int value) {
43+
this.min = value;
44+
this.max = value;
45+
this.total = value;
46+
this.count = 1;
47+
}
48+
49+
@Override
50+
public String toString() {
51+
StringBuilder result = new StringBuilder();
52+
result.append(min / 10.0);
53+
result.append("/");
54+
result.append(Math.round(((double) total) / count) / 10.0);
55+
result.append("/");
56+
result.append(max / 10.0);
57+
return result.toString();
58+
}
59+
60+
private void append(int min, int max, int total, int count) {
61+
if (min < this.min)
62+
this.min = min;
63+
if (max > this.max)
64+
this.max = max;
65+
this.total += total;
66+
this.count += count;
67+
}
68+
69+
public void append(int value) {
70+
append(value, value, value, 1);
71+
}
72+
73+
public void merge(Measurement other) {
74+
append(other.min, other.max, other.total, other.count);
75+
}
76+
}
77+
78+
public static void main(String[] args) throws Exception {
79+
// long start = System.currentTimeMillis();
80+
// Find system details to determine cores and
81+
var file = new File(args.length > 0 ? args[0] : FILE);
82+
long fileSize = file.length();
83+
var numberOfCores = fileSize > 1_000_000 ? Runtime.getRuntime().availableProcessors() : 1;
84+
var splitSectionSize = (int) Math.min(Integer.MAX_VALUE, fileSize / numberOfCores); // bytebuffer position is an int, so can be max Integer.MAX_VALUE
85+
var segmentCount = (int) (fileSize / splitSectionSize);
86+
87+
// Divide file into segments
88+
ExecutorService executor = Executors.newFixedThreadPool(segmentCount);
89+
List<CompletableFuture<Map<String, Measurement>>> futures = new ArrayList<>();
90+
for (int i = 0; i < segmentCount; i++) {
91+
long sectionStart = i * (long) splitSectionSize;
92+
long sectionEnd = Math.min(fileSize, sectionStart + splitSectionSize + 100);
93+
var fileChannel = (FileChannel) Files.newByteChannel(file.toPath(), StandardOpenOption.READ);
94+
CompletableFuture<Map<String, Measurement>> future = CompletableFuture.supplyAsync(() -> {
95+
MappedByteBuffer currentBuffer = null;
96+
try {
97+
currentBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, sectionStart, sectionEnd - sectionStart);
98+
}
99+
catch (IOException e) {
100+
throw new RuntimeException(e);
101+
}
102+
// Skip till new line for unequal segments, not to be done for first section
103+
if (sectionStart > 0) {
104+
while (currentBuffer.get() != '\n')
105+
;
106+
}
107+
Map<String, Measurement> map = new HashMap<>();
108+
while (currentBuffer.position() < splitSectionSize) {
109+
// Read station
110+
String str = getStationFromBuffer(currentBuffer);
111+
// Read number
112+
int value = getValueFromBuffer(currentBuffer);
113+
if (map.containsKey(str)) {
114+
map.get(str).append(value);
115+
}
116+
else {
117+
map.put(str, new Measurement(value));
118+
}
119+
}
120+
return map;
121+
}, executor);
122+
futures.add(future);
123+
}
124+
125+
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
126+
Map<String, Measurement> finalMap = new TreeMap<>();
127+
for (CompletableFuture<Map<String, Measurement>> future : futures) {
128+
Map<String, Measurement> map = future.get();
129+
map.keySet().stream().forEach(
130+
key -> {
131+
if (finalMap.containsKey(key)) {
132+
finalMap.get(key).merge(map.get(key));
133+
}
134+
else {
135+
finalMap.put(key, map.get(key));
136+
}
137+
});
138+
}
139+
140+
System.out.println(finalMap);
141+
// System.out.printf("Time %s", System.currentTimeMillis() - start);
142+
System.exit(0);
143+
}
144+
145+
private static String getStationFromBuffer(MappedByteBuffer currentBuffer) {
146+
byte currentByte;
147+
var byteCounter = 0;
148+
var buffer = new byte[100];
149+
while ((currentByte = currentBuffer.get()) != ';') {
150+
buffer[byteCounter++] = currentByte;
151+
}
152+
return new String(buffer, 0, byteCounter, StandardCharsets.UTF_8);
153+
}
154+
155+
private static int getValueFromBuffer(MappedByteBuffer currentBuffer) {
156+
int value;
157+
byte[] nums = new byte[4];
158+
currentBuffer.get(nums);
159+
if (nums[1] == '.') {
160+
// case of n.n
161+
value = (nums[0] * 10 + nums[2] - TWO_BYTE_TO_INT);
162+
}
163+
else {
164+
if (nums[3] == '.') {
165+
// case of -nn.n
166+
value = -(nums[1] * 100 + nums[2] * 10 + currentBuffer.get() - THREE_BYTE_TO_INT);
167+
}
168+
else if (nums[0] == '-') {
169+
// case of -n.n
170+
value = -(nums[1] * 10 + nums[3] - TWO_BYTE_TO_INT);
171+
}
172+
else {
173+
// case of nn.n
174+
value = (nums[0] * 100 + nums[1] * 10 + nums[3] - THREE_BYTE_TO_INT);
175+
}
176+
currentBuffer.get(); // new line
177+
}
178+
return value;
179+
}
180+
}

0 commit comments

Comments
 (0)