
Commit c388e47

Merge remote-tracking branch 'upstream/master' into compute-statistics

2 parents: 9368b49 + 1c63e0b

37 files changed: +20109 -19 lines

cc_pseudo_crawl/.gitignore (+1)

+ sourcing_sheet_seeds/seeds.gz.parquet

cc_pseudo_crawl/DEPTH.md (new file, +17)
## Strategy to get depth 1

### Context

Once we've extracted all the seed pages, we plan to make a pseudo crawl. The idea is simple:
- we extract the outgoing urls from those pages.
- we find the most recent record in CC matching each url (if it exists).
- we run the entire processing for all the new records/pages.
- we update `outgoing_urls` to obtain `outgoing_ids`.

### Process

1. Make the Athena query.
2. Preprocess the dataset: load_warc, obtain pdf_urls, extract external_urls (a minimal extraction sketch is shown below).
3. Build a new query with all `external_urls`.
4. Repeat 1-3 until reaching the depth we want.
5. Finalise via `finalise.py`: generate ids and generate `external_ids` that map to rows inside the dataset.
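
Below is a minimal, illustrative sketch of the external-URL extraction step, assuming plain HTML input and using only the Python standard library; the class and function names are placeholders, not the repository's actual helpers.

```python
# Illustrative sketch only: collect absolute outgoing URLs that point outside
# the page's own host, which is roughly what "extract external_urls" needs.
from html.parser import HTMLParser
from urllib.parse import urljoin, urlparse


class LinkExtractor(HTMLParser):
    """Collect the href targets of all <a> tags."""

    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            for name, value in attrs:
                if name == "href" and value:
                    self.links.append(value)


def extract_external_urls(html, page_url):
    """Return absolute http(s) URLs that leave the page's host."""
    parser = LinkExtractor()
    parser.feed(html)
    page_host = urlparse(page_url).netloc
    external = set()
    for href in parser.links:
        absolute = urljoin(page_url, href)
        parsed = urlparse(absolute)
        if parsed.scheme in ("http", "https") and parsed.netloc != page_host:
            external.add(absolute)
    return sorted(external)
```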

cc_pseudo_crawl/README.md (+44 -11)
@@ -26,15 +26,14 @@ For every site list
  ```
  aws s3 cp seeds.gz.parquet s3://bucket/path/seeds/
  ```
- Note: the S3 path must point to a bucket with write permissions granted. The path needs to be adjusted also in follwing commands.
+ Note: the S3 path must point to a bucket with write permissions granted. The path also needs to be adjusted in the following commands.

  3. import the seed table into Athena
  ```sql
- CREATE EXTERNAL TABLE IF NOT EXISTS bigscience.seeds (
+ CREATE EXTERNAL TABLE IF NOT EXISTS bigscience.seed (
    `id` int,
    `title` string,
    `link` string,
-   `language` string,
    `url_path_prefix` string,
    `url_host_name` string,
    `url_host_registered_domain` string,
@@ -48,18 +47,14 @@ For every site list

  4. join the seeds table crawl by crawl with Common Crawl's index, creating a temporary table which is later used as one partition of the result table
  ```
- python3 cc_lookup.py s3://bucket/path seeds "CC-MAIN-2021"
+ python3 cc_lookup_seed.py s3://bucket/path seeds "CC-MAIN-2021"
  ```
  This will run the join for all crawls of the year 2021 and put the join data into `s3://bucket/path/cc`.

  5. finally, create a table holding the result data in order to get further metrics or prepare the content export
  ```sql
- CREATE EXTERNAL TABLE IF NOT EXISTS bigscience.cc (
-   id INT,
-   title STRING,
-   link STRING,
-   language STRING,
-   url_surtkey_prefix STRING,
+ CREATE EXTERNAL TABLE IF NOT EXISTS bigscience.cc_seed (
+   seed_id INT,
    url_surtkey STRING,
    url_host_tld STRING,
    url_host_registered_domain STRING,
@@ -85,5 +80,43 @@ For every site list

  6. load the partitions of the join table
  ```sql
- MSCK REPAIR TABLE bigscience.cc;
+ MSCK REPAIR TABLE bigscience.cc_seed;
  ```
+
+ 7. deduplicate by URL, keeping only the most recent fetch of every url
+ ```sql
+ CREATE TABLE bigscience.cc_seed_dedup_url
+ WITH (external_location = 's3://bucket/path/cc-seed_dedup_url/',
+       partitioned_by = ARRAY['subset'],
+       format = 'PARQUET',
+       parquet_compression = 'GZIP')
+ AS
+ WITH tmp AS (
+   SELECT *, row_number() over (partition by url order by fetch_time desc) row
+   FROM bigscience.cc_seed
+ )
+
+ SELECT
+   seed_id,
+   url,
+   url_surtkey,
+   url_host_tld,
+   url_host_registered_domain,
+   url_host_name,
+   fetch_status,
+   fetch_time,
+   warc_filename,
+   warc_record_offset,
+   warc_record_length,
+   fetch_redirect,
+   content_mime_detected,
+   content_languages,
+   subset
+ FROM tmp
+ WHERE row = 1
+ ```
+ 8. run `download_warc.py` to download the WARC records; we provide two helpers for convenience, a bash script and a Slurm script (a minimal fetch sketch is shown after this section)
+
+ 9. run `preprocess_dataset.py` to populate specific columns such as `outgoing_links`, `depth`, ...
+
+ 10. send the dataset to a bucket somewhere
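
For step 8, the sketch below shows what fetching a single record can look like, driven by the `warc_filename`, `warc_record_offset` and `warc_record_length` columns of the deduplicated table. It uses only the standard library and is an illustration under those assumptions, not necessarily how `download_warc.py` is implemented.

```python
import gzip
import urllib.request

# Public Common Crawl data location (the same host referenced for the crawl index).
CC_PREFIX = "https://commoncrawl.s3.amazonaws.com/"


def fetch_warc_record(warc_filename, offset, length):
    """Fetch one gzipped WARC record via an HTTP range request and return its decompressed bytes."""
    request = urllib.request.Request(
        CC_PREFIX + warc_filename,
        headers={"Range": f"bytes={offset}-{offset + length - 1}"},
    )
    with urllib.request.urlopen(request) as response:
        compressed = response.read()
    # Each record is stored as an independent gzip member, so this slice decompresses on its own.
    return gzip.decompress(compressed)
```

Each row of `bigscience.cc_seed_dedup_url` supplies the three arguments, e.g. `fetch_warc_record(row["warc_filename"], row["warc_record_offset"], row["warc_record_length"])`.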

cc_pseudo_crawl/cc_lookup_next.py (new file, +181)
# iterate over monthly crawls and store
# the joined data as a partition of the result table

import logging
import re
import sys

from pyathena import connect

logging.basicConfig(
    level="INFO", format="%(asctime)s %(levelname)s %(name)s: %(message)s"
)

join_template = """
CREATE TABLE {db}._tmp_overlap
WITH (external_location = '{s3_location}/crawl={crawl}/',
      partitioned_by = ARRAY['subset'],
      format = 'PARQUET',
      parquet_compression = 'GZIP')
AS SELECT
       cc.url_surtkey AS url_surtkey,
       cc.url_host_tld AS url_host_tld,
       cc.url_host_registered_domain AS url_host_registered_domain,
       cc.url_host_name AS url_host_name,
       cc.url AS url,
       cc.fetch_status AS fetch_status,
       cc.fetch_time AS fetch_time,
       cc.warc_filename AS warc_filename,
       cc.warc_record_offset AS warc_record_offset,
       cc.warc_record_length AS warc_record_length,
       cc.fetch_redirect AS fetch_redirect,
       cc.content_mime_detected AS content_mime_detected,
       cc.content_languages AS content_languages,
       cc.subset AS subset
FROM ccindex.ccindex AS cc
  RIGHT OUTER JOIN {db}.{url_table} AS {tid}
    ON cc.url = {tid}.url
WHERE cc.crawl = '{crawl}'
"""

drop_tmp_table = "DROP TABLE `{db}._tmp_overlap`;"

# list of crawls
# Note: in order to get a list of released crawls:
#  - query Athena
#      SHOW PARTITIONS ccindex
#  - see
#      https://commoncrawl.s3.amazonaws.com/crawl-data/index.html
crawls = [
    "CC-MAIN-2013-20",
    "CC-MAIN-2013-48",
    #
    "CC-MAIN-2014-10",
    "CC-MAIN-2014-15",
    "CC-MAIN-2014-23",
    "CC-MAIN-2014-35",
    "CC-MAIN-2014-41",
    "CC-MAIN-2014-42",
    "CC-MAIN-2014-49",
    "CC-MAIN-2014-52",
    #
    "CC-MAIN-2015-06",
    "CC-MAIN-2015-11",
    "CC-MAIN-2015-14",
    "CC-MAIN-2015-18",
    "CC-MAIN-2015-22",
    "CC-MAIN-2015-27",
    "CC-MAIN-2015-32",
    "CC-MAIN-2015-35",
    "CC-MAIN-2015-40",
    "CC-MAIN-2015-48",
    #
    "CC-MAIN-2016-07",
    "CC-MAIN-2016-18",
    "CC-MAIN-2016-22",
    "CC-MAIN-2016-26",
    "CC-MAIN-2016-30",
    "CC-MAIN-2016-36",
    "CC-MAIN-2016-40",
    "CC-MAIN-2016-44",
    "CC-MAIN-2016-50",
    #
    "CC-MAIN-2017-04",
    "CC-MAIN-2017-09",
    "CC-MAIN-2017-13",
    "CC-MAIN-2017-17",
    "CC-MAIN-2017-22",
    "CC-MAIN-2017-26",
    "CC-MAIN-2017-30",
    "CC-MAIN-2017-34",
    "CC-MAIN-2017-39",
    "CC-MAIN-2017-43",
    "CC-MAIN-2017-47",
    "CC-MAIN-2017-51",
    #
    "CC-MAIN-2018-05",
    "CC-MAIN-2018-09",
    "CC-MAIN-2018-13",
    "CC-MAIN-2018-17",
    "CC-MAIN-2018-22",
    "CC-MAIN-2018-26",
    "CC-MAIN-2018-30",
    "CC-MAIN-2018-34",
    "CC-MAIN-2018-39",
    "CC-MAIN-2018-43",
    "CC-MAIN-2018-47",
    "CC-MAIN-2018-51",
    #
    "CC-MAIN-2019-04",
    "CC-MAIN-2019-09",
    "CC-MAIN-2019-13",
    "CC-MAIN-2019-18",
    "CC-MAIN-2019-22",
    "CC-MAIN-2019-26",
    "CC-MAIN-2019-30",
    "CC-MAIN-2019-35",
    "CC-MAIN-2019-39",
    "CC-MAIN-2019-43",
    "CC-MAIN-2019-47",
    "CC-MAIN-2019-51",
    #
    "CC-MAIN-2020-05",
    "CC-MAIN-2020-10",
    "CC-MAIN-2020-16",
    "CC-MAIN-2020-24",
    "CC-MAIN-2020-29",
    "CC-MAIN-2020-34",
    "CC-MAIN-2020-40",
    "CC-MAIN-2020-45",
    "CC-MAIN-2020-50",
    #
    "CC-MAIN-2021-04",
    "CC-MAIN-2021-10",
    "CC-MAIN-2021-17",
    "CC-MAIN-2021-21",
    "CC-MAIN-2021-25",
    "CC-MAIN-2021-31",
    "CC-MAIN-2021-39",
    "CC-MAIN-2021-43",
    "CC-MAIN-2021-49",
    #
]


s3_location = sys.argv[1]
s3_location = s3_location.rstrip("/")  # no trailing slash!

url_table = sys.argv[2]

crawl_selector = re.compile(sys.argv[3], re.IGNORECASE)


crawls = filter(lambda c: crawl_selector.match(c), crawls)


cursor = connect(
    s3_staging_dir="{}/staging".format(s3_location), region_name="us-east-1"
).cursor()

for crawl in crawls:
    query = join_template.format(
        crawl=crawl,
        s3_location=f"{s3_location}/cc-{url_table}",
        db="bigscience",
        url_table=url_table,
        tid="bs",
    )
    logging.info("Athena query: %s", query)

    cursor.execute(query)
    logging.info("Athena query ID %s: %s", cursor.query_id, cursor.result_set.state)
    logging.info(
        "  data_scanned_in_bytes: %d", cursor.result_set.data_scanned_in_bytes
    )
    logging.info(
        "  total_execution_time_in_millis: %d",
        cursor.result_set.total_execution_time_in_millis,
    )

    cursor.execute(drop_tmp_table.format(db="bigscience"))
    logging.info("Drop temporary table: %s", cursor.result_set.state)

cc_pseudo_crawl/cc_lookup.py renamed to cc_pseudo_crawl/cc_lookup_seed.py (+2 -6)

@@ -20,11 +20,7 @@
      format = 'PARQUET',
      parquet_compression = 'GZIP')
  AS SELECT
-        {tid}.id AS id,
-        {tid}.title AS title,
-        {tid}.link AS link,
-        {tid}.language AS language,
-        {tid}.url_surtkey AS url_surtkey_prefix,
+        {tid}.id AS seed_id,
         cc.url_surtkey AS url_surtkey,
         cc.url_host_tld AS url_host_tld,
         cc.url_host_registered_domain AS url_host_registered_domain,
@@ -168,7 +164,7 @@
  for crawl in crawls:
      query = join_template.format(
          crawl=crawl,
-         s3_location="{}/cc".format(s3_location),
+         s3_location=f"{s3_location}/cc-{seed_table}",
          db="bigscience",
          seed_table=seed_table,
          tid="bs",
(new file, +50)
import os
import logging
from argparse import ArgumentParser

from datasets import load_from_disk
from datasets.utils.logging import set_verbosity_info

set_verbosity_info()
logger = logging.getLogger(__name__)


def get_args():
    parser = ArgumentParser()
    parser.add_argument(
        "--dataset-dir", type=str, required=True, help="Directory containing the datasets."
    )

    args = parser.parse_args()
    return args


def main():
    # Setup logging
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
        datefmt="%m/%d/%Y %H:%M:%S",
        level=logging.INFO,
    )
    args = get_args()
    logger.info(
        f"** The job is run with the following arguments: **\n{args}\n **** "
    )

    for dataset_name in os.listdir(args.dataset_dir):
        dataset_path = os.path.join(args.dataset_dir, dataset_name)
        try:
            logging.info(f"Processing: {dataset_path}")
            ds = load_from_disk(dataset_path)
            new_ds = ds.filter(keep_failed_examples)
            logging.info(f"Here's the subset of failed downloads: {new_ds}")
        except Exception as e:
            logging.warning(f"Failed to process {dataset_path} with error '{str(e)}'")


def keep_failed_examples(example):
    # Keep only rows whose download raised an exception.
    if example["download_exception"] is None:
        return False
    return True


if __name__ == "__main__":
    main()
