
Commit 4560406

SaulLu and thomasw21 authored
Add exact document deduplicate scripts (#407)
Co-authored-by: thomasw21 <24695242+thomasw21@users.noreply.github.com>
1 parent b255444 commit 4560406
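
The commit adds two files: the deduplication script itself (invoked as cc_pseudo_crawl/python_scripts/exact_deduplicates.py by the SLURM job in the second file) and the SLURM array job that runs it once per seed. The approach is exact-match deduplication: each document's text is hashed after stripping spaces, the set of distinct hashes is collected with Dataset.unique, and a filter pass keeps only the first row carrying each hash, consuming the hash from the set as it goes. Stripped to its core, the idea looks roughly like the sketch below (illustrative only, not part of the commit; the toy data and the keep_first helper name are made up, and it assumes a datasets release with fn_kwargs support, as used in the script itself):

from datasets import Dataset

# Toy data: "a b" and "ab" collide once spaces are stripped, "c" is unique.
ds = Dataset.from_dict({"text": ["a b", "ab", "c"]})

# Add a hash column, mirroring get_hash() in the script below.
ds = ds.map(lambda example: {"hash": hash(example["text"].replace(" ", ""))})

# Collect the distinct hashes, then keep only the first occurrence of each one.
uniques = set(ds.unique("hash"))

def keep_first(example, uniques):
    # True the first time a hash is seen; later duplicates find it already consumed.
    if example["hash"] in uniques:
        uniques.remove(example["hash"])
        return True
    return False

deduped = ds.filter(keep_first, fn_kwargs={"uniques": uniques})
print(deduped["text"])  # ['a b', 'c']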

File tree

2 files changed: +263, -0 lines
Lines changed: 221 additions & 0 deletions
@@ -0,0 +1,221 @@
"""Taken from Teven and Leandro"""
import gzip
import os
import shutil
import time
import logging
import argparse
import datasets

from datasets import load_dataset, Features
from datasets.utils.logging import set_verbosity_info


set_verbosity_info()
logger = logging.getLogger(__name__)

null = None
# features = {
#     "HtmlPreprocessor_error": {"dtype": "int64", "id": null, "_type": "Value"},
#     "HtmlPreprocessor_error_comment": {"dtype": "string", "id": null, "_type": "Value"},
#     "content_languages": {"dtype": "string", "id": null, "_type": "Value"},
#     "content_mime_detected": {"dtype": "string", "id": null, "_type": "Value"},
#     "depth": {"dtype": "int16", "id": null, "_type": "Value"},
#     "download_exception": {"dtype": "string", "id": null, "_type": "Value"},
#     "external_urls": [{"dtype": "string", "id": null, "_type": "Value"}],
#     "fetch_redirect": {"dtype": "string", "id": null, "_type": "Value"},
#     "fetch_status": {"dtype": "int32", "id": null, "_type": "Value"},
#     "fetch_time": {"dtype": "timestamp[ns]", "id": null, "_type": "Value"},
#     "html_error": {"dtype": "string", "id": null, "_type": "Value"},
#     "html_footer": [{"dtype": "string", "id": null, "_type": "Value"}],
#     "html_head": [{"dtype": "string", "id": null, "_type": "Value"}],
#     "html_str": {"dtype": "string", "id": null, "_type": "Value"},
#     "html_title": [{"dtype": "string", "id": null, "_type": "Value"}],
#     "metadata_html": [
#         {
#             "char_end_idx": {"dtype": "int64", "id": null, "_type": "Value"},
#             "char_start_idx": {"dtype": "int64", "id": null, "_type": "Value"},
#             "html_attrs": {
#                 "attrs": [{"dtype": "string", "id": null, "_type": "Value"}],
#                 "values": [{"dtype": "string", "id": null, "_type": "Value"}],
#             },
#             "key": {"dtype": "string", "id": null, "_type": "Value"},
#             "relative_end_pos": {"dtype": "int64", "id": null, "_type": "Value"},
#             "relative_start_pos": {"dtype": "int64", "id": null, "_type": "Value"},
#             "type": {"dtype": "string", "id": null, "_type": "Value"},
#             "value": {"dtype": "string", "id": null, "_type": "Value"},
#         }
#     ],
#     "seed_id": {"dtype": "int32", "id": null, "_type": "Value"},
#     "text": {"dtype": "string", "id": null, "_type": "Value"},
#     "url": {"dtype": "string", "id": null, "_type": "Value"},
#     "url_host_name": {"dtype": "string", "id": null, "_type": "Value"},
#     "url_host_registered_domain": {"dtype": "string", "id": null, "_type": "Value"},
#     "url_host_tld": {"dtype": "string", "id": null, "_type": "Value"},
#     "url_surtkey": {"dtype": "string", "id": null, "_type": "Value"},
#     "warc_filename": {"dtype": "string", "id": null, "_type": "Value"},
#     "warc_record_length": {"dtype": "int32", "id": null, "_type": "Value"},
#     "warc_record_offset": {"dtype": "int32", "id": null, "_type": "Value"},
# }
features = {
    "text": {"dtype": "string", "id": null, "_type": "Value"},
    "meta": {
        "content_languages": {"dtype": "string", "id": null, "_type": "Value"},
        "seed_id": {"dtype": "int64", "id": null, "_type": "Value"},
        "url": {"dtype": "string", "id": null, "_type": "Value"},
    },
}


def convert_types(features):
    """Recursively turn the plain-dict schema above into datasets feature objects."""
    if isinstance(features, dict) and "_type" in features:
        return getattr(datasets, features["_type"])(features["dtype"])
    elif isinstance(features, dict):
        return {key: convert_types(value) for key, value in features.items()}
    elif isinstance(features, list):
        return [convert_types(value) for value in features]


final_features = convert_types(features)
final_features = Features(final_features)


def get_hash(example):
    """Get hash of content field."""
    return {"hash": hash(example["text"].replace(" ", ""))}


def check_uniques(example, uniques):
    """Check if the current hash is still in the set of unique hashes and remove it if so."""
    if example["hash"] in uniques:
        # First time this hash is seen: consume it so later duplicates are rejected.
        uniques.remove(example["hash"])
        return True
    else:
        return False


def preprocess(example):
    """Chain all preprocessing steps into one function to not fill cache."""
    results = dict()
    results.update(get_hash(example))
    return results


def filter(example, uniques, args):
    """Keep only the first occurrence of each hash (exact deduplication)."""
    if not check_uniques(example, uniques):
        return False
    else:
        return True


def compress_file(file_path):
    """Compress a file with g-zip."""
    with open(file_path, "rb") as f_in:
        with gzip.open(file_path + ".gz", "wb", compresslevel=6) as f_out:
            shutil.copyfileobj(f_in, f_out)
    os.unlink(file_path)


def main():
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
        datefmt="%m/%d/%Y %H:%M:%S",
        level=logging.INFO,
    )
    parser = argparse.ArgumentParser(description="Load seed and upload to hub")
    parser.add_argument(
        "--seed-id",
        help="seed ID",
        required=True,
        type=int,
    )
    parser.add_argument(
        "--save-dir", required=True, type=str, help="Where to save the datasets."
    )
    parser.add_argument(
        "--pseudo_crawl_path",
        help="path to where the pseudocrawl is located",
        required=True,
        type=str,
    )
    parser.add_argument(
        "--gzipped",
        help="Write file directly in jsonl.gz compressed format",
        action="store_true",
    )
    parser.add_argument(
        "--save-batch-size",
        help="Batch size used for saving the dataset",
        required=True,
        type=int,
    )
    parser.add_argument(
        "--batch-size",
        help="Batch size used for the mapping and saving of the dataset",
        required=True,
        type=int,
    )
    parser.add_argument(
        "--num-proc",
        help="Number of processors used for the mapping and saving of the dataset",
        required=True,
        type=int,
    )
    args = parser.parse_args()

    # Load dataset
    t_start = time.time()
    ds = load_dataset(
        "json",
        # data_files=[f"{args.pseudo_crawl_path}/seed_id={args.seed_id}/text__html/*.jsonl.gz"],
        data_files=[
            f"{args.pseudo_crawl_path}/lm_change_lang_id_seed_id_{args.seed_id}_pseudocrawl_change_name/*.jsonl"
        ],
        features=final_features,
        split="train",
    )
    logger.info(f"Time to load dataset: {time.time()-t_start:.2f}")

    # Run preprocessing
    t_start = time.time()
    ds = ds.map(preprocess, num_proc=args.num_proc)
    logger.info(f"Time to preprocess dataset: {time.time()-t_start:.2f}")

    # Deduplicate hashes
    uniques = set(ds.unique("hash"))
    frac = len(uniques) / len(ds)
    logger.info(f"Fraction of duplicates: {1-frac:.2%}")

    # Deduplicate data and apply heuristics
    t_start = time.time()
    ds_filter = ds.filter(filter, fn_kwargs={"uniques": uniques, "args": args})
    logger.info(f"Time to filter dataset: {time.time()-t_start:.2f}")
    logger.info(f"Size of filtered dataset: {len(ds_filter)}")

    # Save data
    t_start = time.time()
    if args.gzipped:
        file_name = os.path.join(args.save_dir, "data.jsonl.gz")
        logger.info(f"the dataset will be saved at {file_name}")
        ds_filter.to_json(
            file_name,
            num_proc=args.num_proc,
            batch_size=args.save_batch_size,
            compression="gzip",
        )
    else:
        file_name = os.path.join(args.save_dir, "data.jsonl")
        logger.info(f"the dataset will be saved at {file_name}")
        ds_filter.to_json(
            file_name,
            num_proc=args.num_proc,
            batch_size=args.save_batch_size,
        )

    logger.info(f"Time to save dataset: {time.time()-t_start:.2f}")


if __name__ == "__main__":
    main()
Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
#!/bin/bash
#SBATCH --job-name=pseudo_crawl_deduplicate
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1   # crucial - only 1 task per dist per node!
#SBATCH --cpus-per-task=4     # number of cores per task
#SBATCH --hint=nomultithread  # we get physical cores not logical
#SBATCH --partition=cpu_p1
#SBATCH --time 2:00:00        # maximum execution time (HH:MM:SS)
#SBATCH --output=/gpfsscratch/rech/six/commun/pseudo_crawl/seeds_batch_1_2/logs/deduplicate-on-clean-v2/%x-%j.out # output file name #TODO change path if necessary
#SBATCH --array=0-613
#SBATCH --account=six@cpu

set -x -e

source $six_ALL_CCFRWORK/start-prod
conda activate thomas_data_tooling

DATA_TOOLING_REPO=$WORK/repos/sync_data_tooling/data_tooling

DATASET_PATH=$six_ALL_CCFRSCRATCH/pseudo_crawl/seeds_batch_1_2/datasets-clean/bigscience-catalogue-data

SEED_ID=$(python cc_pseudo_crawl/python_scripts/load_all_seed_ids.py \
    --seed-paths "$DATA_TOOLING_REPO"/cc_pseudo_crawl/seeds_batch_1/sourcing_sheet_seeds/seeds.csv,"$DATA_TOOLING_REPO"/cc_pseudo_crawl/seeds_batch_2/sourcing_sheet_seeds/seeds.csv \
    --seed-index $SLURM_ARRAY_TASK_ID \
)

SAVE_DATASET_DIR=$six_ALL_CCFRSCRATCH/pseudo_crawl/seeds_batch_1_2/datasets-deduplicate-on-clean-v2/bigscience-catalogue-data/lm_change_lang_id_seed_id_${SEED_ID}_pseudocrawl_change_name
echo $DATASET_PATH
pushd $DATA_TOOLING_REPO

mkdir -p $SAVE_DATASET_DIR

export HF_DATASETS_OFFLINE=1
export HF_DATASETS_CACHE=$SCRATCH/to_delete

python cc_pseudo_crawl/python_scripts/exact_deduplicates.py \
    --seed-id $SEED_ID \
    --save-dir $SAVE_DATASET_DIR \
    --pseudo_crawl_path $DATASET_PATH \
    --batch-size 1000 \
    --save-batch-size 1000 \
    --num-proc 8
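
As a quick sanity check after a job finishes, one can reload the saved output and confirm that no two documents share a hash. The snippet below is only an illustrative sketch, not part of the commit: the path is a placeholder for whatever --save-dir pointed to, and it reuses the same whitespace-stripped hashing convention as get_hash() above (Python's hash() is only stable within a single process, which is enough for a self-contained check):

from datasets import load_dataset

# Placeholder path: the script writes data.jsonl (or data.jsonl.gz with --gzipped) under --save-dir.
deduped = load_dataset("json", data_files="data.jsonl", split="train")

# Recompute the hashes and verify they are all distinct.
hashes = [hash(text.replace(" ", "")) for text in deduped["text"]]
assert len(hashes) == len(set(hashes)), "duplicate documents survived deduplication"
print(f"{len(hashes)} documents, all hashes unique")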
