#620 Add support for shards - SolrSpout #1343

Merged (16 commits) on Nov 10, 2024
Changes from 7 commits
11 changes: 8 additions & 3 deletions external/solr/README.md
@@ -76,6 +76,10 @@ solr.status.bucket.maxsize: 100

This feature can be combined with the [partition features](https://github.com/apache/incubator-stormcrawler/wiki/Configuration#fetching-and-partitioning) provided by StormCrawler to balance the crawling process and not just the URL coverage.

It is recommended to use Solr in Cloud mode. The following configuration options are available for distributing the `status` collection across multiple shards.
* `solr.status.routing.fieldname`: Field used to route documents to the different shards. The values stored in this field depend on `partition.url.mode` (`byHost`, `byDomain`, `byIP`).
* `solr.status.routing.shards`: Number of shards of the `status` collection.
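
For illustration, these options could be set in `solr-conf.yaml` as below. The values are examples only and should be adapted to the crawl; `partition.url.mode` is shown just to make the relationship explicit:

```yaml
config:
  # partitioning mode; the resulting partition key is stored in the routing field
  partition.url.mode: "byHost"
  # field holding the partition key, used to route status documents to shards
  solr.status.routing.fieldname: "key"
  # number of shards of the 'status' collection
  solr.status.routing.shards: 10
```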

### Metadata

The metadata associated with each URL is also persisted in the configured Solr collection. By default, the metadata is stored as separate fields in the collection, using a prefix that can be configured with the `solr.status.metadata.prefix` option. If no value is supplied for this option, the prefix `metadata` is used. Take a look at the following example record:
@@ -85,6 +89,7 @@
"url": "http://test.com",
"host": "test.com",
"status": "DISCOVERED",
"key": "test.com",
"metadata.url.path": "http://test.com",
"metadata.depth": "1",
"nextFetchDate": "2015-10-30T17:26:34.386Z"
@@ -103,7 +108,7 @@ To use a SolrCloud cluster instead of a single Solr server, you must use the following

## Solr configuration

An example collection configuration for each type of data is also provided in the [`cores`](cores) directory. The configuration is very basic but it will allow you to view all the stored data in Solr.
An example collection configuration for each type of data is also provided in the [`configsets`](configsets) directory. The configuration is very basic but it will allow you to view all the stored data in Solr.

The configuration is only useful as a testing resource, mainly because everything is stored as a `solr.StrField`, which is not very useful for search purposes. Numeric values and dates are also **stored as strings** using dynamic fields.

@@ -113,5 +118,5 @@ In the `parse` and `status` cores the `uniqueKey` is defined to be the `url` field

Also keep in mind that depending on your needs you can use the [Schemaless Mode](https://cwiki.apache.org/confluence/display/solr/Schemaless+Mode) available in Solr.

To start SOLR with the preconfigured cores for StormCrawler, you can do `bin/solr start -s stormcrawler/external/solr/cores`, then open the SOLR UI (http://localhost:8983) to check that they have been loaded correctly. Alternatively, create the cores (here `status`) by `bin/solr create -c status -d stormcrawler/external/solr/cores/status/`.

To start Solr with the preconfigured collections for StormCrawler, you may run the script `setup-solr.sh`, then open the Solr UI (http://localhost:8983) to check that the collections have been loaded correctly.
The script starts Solr in Cloud mode, uploads the configsets and creates the collections.
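
Besides the UI, the setup can also be checked from the command line, for example with the following illustrative commands against the default local instance:

```bash
# list the collections created by the script
curl "http://localhost:8983/solr/admin/collections?action=LIST"

# the freshly created status collection should be reachable and empty
curl "http://localhost:8983/solr/status/select?q=*:*&rows=0"
```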
18 changes: 12 additions & 6 deletions external/solr/cores/status/core.properties → external/solr/clear-collections.sh
100644 → 100755
@@ -12,9 +12,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#Written by CorePropertiesLocator
#Sun Jun 14 04:10:47 CDT 2015
name=status
config=solrconfig.xml
schema=schema.xml
dataDir=data

#!/bin/bash

collections=("docs" "metrics" "status")

for collection in "${collections[@]}"; do
solr_url="http://localhost:8983/solr/$collection/update?commit=true"

echo -e "\n\e[1mDeleting all documents from collection: $collection ...\e[0m"

curl -X POST -H 'Content-Type: application/json' --data-binary '{"delete": {"query": "*:*"}}' "$solr_url"
done
@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<schema version="1.5" name="simplest-solr">
<schema version="1.7" name="simplest-solr">
<fieldType name="string" class="solr.StrField"/>

<!-- A general text field that has reasonable, generic
@@ -44,10 +44,12 @@
</analyzer>
</fieldType>

<fieldType name="plong" class="solr.LongPointField"/>
<field name="_version_" type="plong" indexed="false" stored="false"/>

<field name="url" type="string" indexed="true" stored="true" required="true"/>
<field name="content" type="text_general" indexed="true" stored="true"/>

<dynamicField name="*" type="text_general" indexed="true" stored="true"/>
<uniqueKey>url</uniqueKey>
</schema>

@@ -18,7 +18,7 @@ specific language governing permissions and limitations
under the License.
-->
<config>
<luceneMatchVersion>7.5.0</luceneMatchVersion>
<luceneMatchVersion>9.0.0</luceneMatchVersion>
<requestDispatcher handleSelect="false">
<httpCaching never304="true" />
</requestDispatcher>
@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<schema version="1.5" name="simplest-solr">
<schema version="1.7" name="simplest-solr">
<fieldType name="string" class="solr.StrField"/>
<fieldType name="pdate" class="solr.DatePointField" docValues="true"/>

@@ -29,9 +29,11 @@ under the License.
<field name="value" type="string" indexed="true" stored="true" required="true"/>
<field name="timestamp" type="pdate" indexed="true" stored="true" required="true"/>

<fieldType name="plong" class="solr.LongPointField"/>
<field name="_version_" type="plong" indexed="false" stored="false"/>

<dynamicField name="*" type="string" indexed="true" stored="true"/>

<field name="id" type="string" indexed="true" stored="true" required="true"/>
<uniqueKey>id</uniqueKey>
</schema>

@@ -18,7 +18,7 @@ specific language governing permissions and limitations
under the License.
-->
<config>
<luceneMatchVersion>7.5.0</luceneMatchVersion>
<luceneMatchVersion>9.0.0</luceneMatchVersion>
<requestDispatcher handleSelect="false">
<httpCaching never304="true" />
</requestDispatcher>
@@ -17,9 +17,13 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<schema version="1.5" name="simplest-solr">
<schema version="1.7" name="simplest-solr">
<fieldType name="string" class="solr.StrField"/>
<fieldType name="pdate" class="solr.DatePointField" docValues="true"/>

<fieldType name="plong" class="solr.LongPointField"/>
<field name="_version_" type="plong" indexed="false" stored="false"/>

<field name="url" type="string" indexed="true" stored="true" required="true"/>
<field name="host" type="string" indexed="true" stored="true" required="true"/>
<field name="status" type="string" indexed="true" stored="true" required="true"/>
@@ -18,7 +18,7 @@ specific language governing permissions and limitations
under the License.
-->
<config>
<luceneMatchVersion>7.5.0</luceneMatchVersion>
<luceneMatchVersion>9.0.0</luceneMatchVersion>
<requestDispatcher handleSelect="false">
<httpCaching never304="true" />
</requestDispatcher>
20 changes: 0 additions & 20 deletions external/solr/cores/docs/core.properties

This file was deleted.

20 changes: 0 additions & 20 deletions external/solr/cores/metrics/core.properties

This file was deleted.

20 changes: 0 additions & 20 deletions external/solr/cores/solr.xml

This file was deleted.

71 changes: 71 additions & 0 deletions external/solr/setup-solr.sh
@@ -0,0 +1,71 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#!/bin/bash

STATUS_SHARDS=$(grep 'solr.status.routing.shards' solr-conf.yaml | sed -e 's/.*: //' | tr -d ' ')
ROUTER_FIELD=$(grep 'solr.status.routing.fieldname' solr-conf.yaml | sed -e 's/.*: //' | tr -d ' ')

if [ -z "$STATUS_SHARDS" ]; then
echo -e "\e[1mProperty 'solr.status.routing.shards' not defined in solr-conf.yaml. Defaulting to 1 ...\e[0m\n"
STATUS_SHARDS=1
fi

if [ -z "$ROUTER_FIELD" ]; then
echo -e "\e[1mProperty 'solr.status.routing.fieldname' not defined in solr-conf.yaml. Defaulting to 'key' ...\e[0m\n"
ROUTER_FIELD="key"
fi

SOLR_PORT=8983
SOLR_HOME=/opt/solr-9.7.0

$SOLR_HOME/bin/solr start -cloud -p $SOLR_PORT

echo -e "\n\e[1mUploading configsets ...\e[0m\n"

$SOLR_HOME/bin/solr zk upconfig -n "docs" -d configsets/docs -z localhost:9983
$SOLR_HOME/bin/solr zk upconfig -n "status" -d configsets/status -z localhost:9983
$SOLR_HOME/bin/solr zk upconfig -n "metrics" -d configsets/metrics -z localhost:9983

echo -e "\n\n\e[1mCreating 'docs' collection ...\e[0m\n"
curl -X POST "http://localhost:$SOLR_PORT/api/collections" -H "Content-type:application/json" -d '
{
"name": "docs",
"numShards": 1,
"replicationFactor": 1,
"config": "docs"
}'

echo -e "\n\n\e[1mCreating 'status' collection with $STATUS_SHARDS shards and routing based on '$ROUTER_FIELD' ...\e[0m\n"
curl -X POST "http://localhost:$SOLR_PORT/api/collections" -H "Content-type:application/json" -d '
{
"name": "status",
"numShards": '$STATUS_SHARDS',
"replicationFactor": 1,
"router": {
"name": "compositeId",
"field": '$ROUTER_FIELD'
},
"config": "status"
}'

echo -e "\n\n\e[1mCreating 'metrics' collection ...\e[0m\n"
curl -X POST "http://localhost:$SOLR_PORT/api/collections" -H "Content-type:application/json" -d '
{
"name": "metrics",
"numShards": 1,
"replicationFactor": 1,
"config": "metrics"
}'
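
The script resolves `solr-conf.yaml` and `configsets/` via relative paths and hard-codes `SOLR_HOME`, so a typical invocation (illustrative; adjust `SOLR_HOME` in the script to your Solr installation) would be:

```bash
cd external/solr
chmod +x setup-solr.sh
./setup-solr.sh
```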
1 change: 1 addition & 0 deletions external/solr/solr-conf.yaml
@@ -28,6 +28,7 @@ config:
# the routing is done on the value of 'partition.url.mode'
# stores the value used for grouping the URLs as a separate field
solr.status.routing.fieldname: "key"
solr.status.routing.shards: 10

# time in secs for which the URLs will be considered for fetching after a ack or fail
spout.ttl.purgatory: 30
@@ -19,6 +19,7 @@
import java.time.Instant;
import java.util.Collection;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrQuery;
@@ -32,11 +33,17 @@
import org.apache.storm.task.TopologyContext;
import org.apache.stormcrawler.Metadata;
import org.apache.stormcrawler.persistence.AbstractQueryingSpout;
import org.apache.stormcrawler.solr.Constants;
import org.apache.stormcrawler.solr.SolrConnection;
import org.apache.stormcrawler.util.ConfUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Spout which pulls URLs from a Solr index. The number of Spout instances should be the same as the
* number of Solr shards (`solr.status.routing.shards`). Guarantees a good mix of URLs by
* aggregating them by an arbitrary field e.g. key.
*/
@SuppressWarnings("serial")
public class SolrSpout extends AbstractQueryingSpout {

@@ -49,6 +56,12 @@ public class SolrSpout extends AbstractQueryingSpout {
private static final String SolrMetadataPrefix = "solr.status.metadata.prefix";
private static final String SolrMaxResultsParam = "solr.status.max.results";

private static final String SolrShardsParamName = Constants.PARAMPREFIX + "%s.routing.shards";

private int solrShards;

private int shardID = 1;

private SolrConnection connection;

private int maxNumResults = 10;
@@ -71,13 +84,19 @@ public void open(

super.open(stormConf, context, collector);

// This implementation works only where there is a single instance
// of the spout. Having more than one instance means that they would run
// the same queries and send the same tuples down the topology.
solrShards =
ConfUtils.getInt(
stormConf,
String.format(Locale.ROOT, SolrSpout.SolrShardsParamName, BOLT_TYPE),
1);

int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
if (totalTasks > 1) {
throw new RuntimeException("Can't have more than one instance of SOLRSpout");
if (totalTasks != solrShards) {
throw new RuntimeException(
"Number of SolrSpout instances should be the same as 'status' collection shards");
} else {
// Solr uses 1-based indexing in shard names (shard1, shard2 ...)
shardID = context.getThisTaskIndex() + 1;
}

diversityField = ConfUtils.getString(stormConf, SolrDiversityFieldParam);
@@ -135,7 +154,8 @@ else if (resetFetchDateAfterNSecs != -1) {

query.setQuery("*:*")
.addFilterQuery("nextFetchDate:[* TO " + lastNextFetchDate + "]")
.setSort("nextFetchDate", ORDER.asc);
.setSort("nextFetchDate", ORDER.asc)
.setParam("shards", "shard" + shardID);

if (StringUtils.isNotBlank(diversityField) && diversityBucketSize > 0) {
String[] diversityFields = diversityField.split(",");
@@ -156,10 +176,15 @@
LOG.debug("QUERY => {}", query);

try {
LOG.trace("isInQuery set to true");
isInQuery.set(true);

long startQuery = System.currentTimeMillis();
QueryResponse response = connection.getClient().query(query);
long endQuery = System.currentTimeMillis();

markQueryReceivedNow();

queryTimes.addMeasurement(endQuery - startQuery);

SolrDocumentList docs = new SolrDocumentList();
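
As the Javadoc of `SolrSpout` above points out, the spout parallelism has to match the number of shards of the `status` collection. A minimal topology sketch illustrating the constraint (the import path of `SolrSpout` and the omitted bolt wiring are assumptions, not part of this PR):

```java
import org.apache.storm.topology.TopologyBuilder;
import org.apache.stormcrawler.solr.persistence.SolrSpout; // assumed package

public class ShardedCrawlTopologySketch {
    public static void main(String[] args) {
        // must equal solr.status.routing.shards in solr-conf.yaml
        final int statusShards = 10;

        TopologyBuilder builder = new TopologyBuilder();
        // one spout task per shard; each task restricts its queries to its own shard (shard1, shard2, ...)
        builder.setSpout("spout", new SolrSpout(), statusShards);

        // ... fetcher, parser and status-updater bolts would be wired here as in a regular StormCrawler topology
    }
}
```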
@@ -52,9 +52,9 @@ public class IndexerBoltTest extends SolrContainerTest {
private static final Logger LOG = LoggerFactory.getLogger(IndexerBoltTest.class);

@Before
public void setupIndexerBolt() throws IOException, InterruptedException {
public void setup() throws IOException, InterruptedException {
container.start();
createCore("docs");
createCollection("docs", 1);

bolt = new IndexerBolt();
output = new TestOutputCollector();