# Common Crawl Index Table

Build and process the [Common Crawl index table](https://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/) – an index to WARC files in a columnar data format ([Apache Parquet](https://parquet.apache.org/)).

The index table is built from the Common Crawl URL index files by [Apache Spark](https://spark.apache.org/). It can be queried by [SparkSQL](https://spark.apache.org/sql/), [Amazon Athena](https://aws.amazon.com/athena/) (built on [Presto](https://prestosql.io/) or [Trino](https://trino.io/)), [Apache Hive](https://hive.apache.org/) and many other big data frameworks and applications.

This project provides a comprehensive set of example queries (SQL) and also Java code to fetch and process the WARC records matched by a SQL query.

## Build Java tools

Java 11 or higher is required.

`mvn package`

### Javadocs

The Javadocs are created by `mvn javadoc:javadoc`. Then open the file `target/reports/apidocs/index.html` in a browser.

### Source Code Formatting

Run `mvn spotless:check` to verify the formatting and `mvn spotless:apply` to fix it, see the [Spotless Maven guide](https://github.com/diffplug/spotless/blob/main/plugin-maven/README.md). Java formatting rules are defined in [eclipse-formatter.xml](eclipse-formatter.xml).

## Spark installation

[Spark](https://spark.apache.org/) needs to be installed in order to [build the table](#conversion-of-the-url-index) and also (alternatively) [for processing](#process-the-table-with-spark). Please refer to the [Spark documentation](https://spark.apache.org/docs/latest/) for how to install Spark and set up a Spark cluster.

## Building and running using Docker

A [Dockerfile](./Dockerfile) is provided to compile the project and run the Spark job in a Docker container.

1. build the Docker image:
   ```sh
   docker build . -t cc-index-table
   ```

2. run the table converter tool, here showing the command-line help (`--help`):
   ```sh
   docker run --rm -ti cc-index-table --help
   ```

More details on running the converter are given below.

Note that the Dockerfile defines the conversion tool as entry point. Overriding the entrypoint allows you to inspect the container using an interactive shell:

```
$> docker run --rm --entrypoint=/bin/bash -ti cc-index-table
spark@9eb71e5f09a6:/app$ java -version
openjdk version "17.0.15" 2025-04-15
OpenJDK Runtime Environment Temurin-17.0.15+6 (build 17.0.15+6)
OpenJDK 64-Bit Server VM Temurin-17.0.15+6 (build 17.0.15+6, mixed mode, sharing)
```

Or you could directly call the command `spark-submit`:

```sh
docker run --rm --entrypoint=/opt/spark/bin/spark-submit cc-index-table
```

## Python, PySpark, Jupyter Notebooks

Not part of this project. Please have a look at [cc-pyspark](//github.com/commoncrawl/cc-pyspark) for examples of how to query and process the tabular URL index with Python and PySpark. The project [cc-notebooks](//github.com/commoncrawl/cc-notebooks) includes some examples of how to gain insights into the Common Crawl data sets using the columnar index.

## Conversion of the URL index

A Spark job converts the Common Crawl URL index files (a [sharded gzipped index](https://pywb.readthedocs.io/en/latest/manual/indexing.html#zipnum-sharded-index) in [CDXJ format](https://iipc.github.io/warc-specifications/specifications/cdx-format/openwayback-cdxj/)) into a table in [Parquet](https://parquet.apache.org/) or [ORC](https://orc.apache.org/) format.

```
> APPJAR=target/cc-index-table-0.3-SNAPSHOT-jar-with-dependencies.jar
> $SPARK_HOME/bin/spark-submit --class org.commoncrawl.spark.CCIndex2Table $APPJAR

CCIndex2Table [options]

Arguments:
        pattern describing paths of input CDX files, e.g.
        s3a://commoncrawl/cc-index/collections/CC-MAIN-2017-43/indexes/cdx-*.gz
        output directory

Options:
  -h,--help               Show this message
     --outputCompression  data output compression codec: gzip/zlib (default),
                          snappy, lzo, none
     --outputFormat       data output format: parquet (default), orc
     --partitionBy        partition data by columns (comma-separated,
                          default: crawl,subset)
     --useNestedSchema    use the schema with nested columns (default: false,
                          use flat schema)
```

The script [convert_url_index.sh](src/script/convert_url_index.sh) runs `CCIndex2Table` using Spark on Yarn.

Columns are defined and described in the table schema ([flat](src/main/resources/schema/cc-index-schema-flat.json) or [nested](src/main/resources/schema/cc-index-schema-nested.json)).

### Running the converter in a Docker container

The converter can be run from the Docker container built from the Dockerfile, see the instructions above. The steps given below are just an example – the way data is passed into and out of the container may vary.

```sh
# create a test folder
mkdir -p /tmp/data/in

# copy CDX files into /tmp/data/in/
cp .../*.cdx.gz /tmp/data/in/

tree /tmp/data/
# outputs:
#   /tmp/data/
#   └── in
#       └── CC-MAIN-20241208172518-20241208202518-00000.cdx.gz

# ensure that also the user "spark" in the container has write permissions
chmod a+w /tmp/data

# note: the output will be written to /tmp/data/out/, but Spark
#       will complain if the output folder already exists

# launch the Docker container, running the Spark job
docker run --mount=type=bind,source=/tmp/data,destination=/data --rm cc-index-table /data/in /data/out

tree /tmp/data/
#   /tmp/data/
#   ├── in
#   │   └── CC-MAIN-20241208172518-20241208202518-00000.cdx.gz
#   └── out
#       ├── crawl=CC-MAIN-2024-51
#       │   └── subset=warc
#       │       └── part-00000-4b2c091d-24db-4248-8c3c-817fd04b7a85.c000.gz.parquet
#       └── _SUCCESS
```

## Query the table in Amazon Athena

First, the table needs to be imported into [Amazon Athena](https://aws.amazon.com/athena/).
In the Athena Query Editor:

1. create a database `ccindex`: `CREATE DATABASE ccindex` and make sure that it's selected as "DATABASE"
2. edit the "create table" statement ([flat](src/sql/athena/cc-index-create-table-flat.sql) or [nested](src/sql/athena/cc-index-create-table-nested.sql)) and add the correct table name and path to the Parquet/ORC data on `s3://`. Execute the "create table" query.
3. make Athena recognize the data partitions on `s3://`: `MSCK REPAIR TABLE ccindex` (do not forget to adapt the table name). This step needs to be repeated every time new data partitions have been added.

A couple of sample queries are also provided (for the flat schema):
- count captures over partitions (crawls and subsets) to get a quick overview of how many pages are contained in the monthly crawl archives (and are also indexed in the table): [count-by-partition.sql](src/sql/examples/cc-index/count-by-partition.sql)
- page/host/domain counts per top-level domain: [count-by-tld-page-host-domain.sql](src/sql/examples/cc-index/count-by-tld-page-host-domain.sql)
- "word" count of
  - host name elements (split host name at `.` into words): [count-hostname-elements.sql](src/sql/examples/cc-index/count-hostname-elements.sql)
  - URL path elements (separated by `/`): [count-url-path-elements.sql](src/sql/examples/cc-index/count-url-path-elements.sql)
- count
  - HTTP status codes: [count-fetch-status.sql](src/sql/examples/cc-index/count-fetch-status.sql)
  - the domains of a specific top-level domain: [count-domains-of-tld.sql](src/sql/examples/cc-index/count-domains-of-tld.sql)
  - page captures of Internationalized Domain Names (IDNA): [count-idna.sql](src/sql/examples/cc-index/count-idna.sql)
  - URL paths pointing to robots.txt files: [count-robotstxt-url-paths.sql](src/sql/examples/cc-index/count-robotstxt-url-paths.sql) (note: `/robots.txt` may be a redirect)
  - pages of the Alexa top 1 million sites by joining two tables (ccindex and a CSV file): [count-domains-alexa-top-1m.sql](src/sql/examples/cc-index/count-domains-alexa-top-1m.sql)
- compare document MIME types (Content-Type in HTTP response header vs. MIME type detected by [Tika](https://tika.apache.org/)): [compare-mime-type-http-vs-detected.sql](src/sql/examples/cc-index/compare-mime-type-http-vs-detected.sql)
- distribution/histogram of host name lengths: [host-length-distrib.sql](src/sql/examples/cc-index/host-length-distrib.sql)
- export WARC record specs (file, offset, length) for
  - a single domain: [get-records-of-domain.sql](src/sql/examples/cc-index/get-records-of-domain.sql)
  - a specific MIME type: [get-records-of-mime-type.sql](src/sql/examples/cc-index/get-records-of-mime-type.sql)
  - a specific language (e.g., Icelandic): [get-records-for-language.sql](src/sql/examples/cc-index/get-records-for-language.sql)
  - home pages of a given list of domains: [get-records-home-pages.sql](src/sql/examples/cc-index/get-records-home-pages.sql)
- find homepages for low-resource languages: [get-home-pages-languages.sql](src/sql/examples/cc-index/get-home-pages-languages.sql)
- obtain a random sample of URLs: [random-sample-urls.sql](src/sql/examples/cc-index/random-sample-urls.sql)
- find similar domain names by Levenshtein distance (few characters changed): [similar-domains.sql](src/sql/examples/cc-index/similar-domains.sql)
- average length, occupied storage and payload truncation of WARC records by MIME type: [average-warc-record-length-by-mime-type.sql](src/sql/examples/cc-index/average-warc-record-length-by-mime-type.sql)
- count pairs of top-level domain and content language: [count-language-tld.sql](src/sql/examples/cc-index/count-language-tld.sql)
- find correlations between TLD and content language using the log-likelihood ratio: [loglikelihood-language-tld.sql](src/sql/examples/cc-index/loglikelihood-language-tld.sql)
- ... and similar for correlations between content language and character encoding: [correlation-language-charset.sql](src/sql/examples/cc-index/correlation-language-charset.sql)
- site discovery by content language:
  - specific language(s): [site-discovery-by-language.sql](src/sql/examples/cc-index/site-discovery-by-language.sql)
  - non-English sites: [discovery-of-non-english-sites](src/sql/examples/cc-index/discovery-of-non-english-sites.sql)
  - Hungarian sites: [site-discovery-hungarian.sql](src/sql/examples/cc-index/site-discovery-hungarian.sql)
- find multi-lingual domains by analyzing URL paths: [get-language-translations-url-path.sql](src/sql/examples/cc-index/get-language-translations-url-path.sql)
- extract robots.txt records for a list of sites: [get-records-robotstxt.sql](src/sql/examples/cc-index/get-records-robotstxt.sql)

Athena creates results in CSV format. E.g., for the mining of multi-lingual domains we get:

domain                    |n_lang | n_pages  | lang_counts
--------------------------|-------|----------|------------------
vatican.va                |    40 |    42795 | {de=3147, ru=20, be=1, fi=3, pt=4036, bg=11, lt=1, hr=395, fr=5677, hu=79, uc=2, uk=17, sk=20, sl=4, sp=202, sq=5, mk=1, ge=204, sr=2, sv=3, or=2243, sw=5, el=5, mt=2, en=7650, it=10776, es=5360, zh=5, iw=2, cs=12, ar=184, vi=1, th=4, la=1844, pl=658, ro=9, da=2, tr=5, nl=57, po=141}
iubilaeummisericordiae.va |     7 |     2916 | {de=445, pt=273, en=454, it=542, fr=422, pl=168, es=612}
osservatoreromano.va      |     7 |     1848 | {de=284, pt=42, en=738, it=518, pl=62, fr=28, es=176}
cultura.va                |     3 |     1646 | {en=373, it=1228, es=45}
annusfidei.va             |     6 |      833 | {de=51, pt=92, en=171, it=273, fr=87, es=159}
pas.va                    |     2 |      689 | {en=468, it=221}
photogallery.va           |     6 |      616 | {de=90, pt=86, en=107, it=130, fr=83, es=120}
im.va                     |     6 |      325 | {pt=2, en=211, it=106, pl=1, fr=3, es=2}
museivaticani.va          |     5 |      266 | {de=63, en=54, it=47, fr=37, es=65}
laici.va                  |     4 |      243 | {en=134, it=5, fr=51, es=53}
radiovaticana.va          |     3 |      220 | {en=5, it=214, fr=1}
casinapioiv.va            |     2 |      213 | {en=125, it=88}
vaticanstate.va           |     5 |      193 | {de=25, en=76, it=24, fr=25, es=43}
laityfamilylife.va        |     5 |      163 | {pt=21, en=60, it=3, fr=78, es=1}
camposanto.va             |     1 |      156 | {de=156}
synod2018.va              |     3 |      113 | {en=24, it=67, fr=22}

## Process the Table with Spark

### Export Views

As a first use case, let's export parts of the table and save them in one of the formats supported by Spark. The tool [CCIndexExport](src/main/java/org/commoncrawl/spark/examples/CCIndexExport.java) runs a Spark job to extract parts of the index table and save them as a table in Parquet, ORC, JSON or CSV. It may even transform the data into an entirely different table. Please refer to the [Spark SQL programming guide](https://spark.apache.org/docs/latest/sql-programming-guide.html) and the [overview of built-in SQL functions](https://spark.apache.org/docs/latest/api/sql/) for more information.

The tool requires the input and output paths as arguments, but you will also want to pass a useful SQL query instead of the default `SELECT * FROM ccindex LIMIT 10`. All available command-line options are shown when called with `--help`:

```
> $SPARK_HOME/bin/spark-submit --class org.commoncrawl.spark.examples.CCIndexExport $APPJAR --help

CCIndexExport [options]

Arguments:
        path to cc-index table
        s3://commoncrawl/cc-index/table/cc-main/warc/
        output directory

Options:
  -h,--help                  Show this message
  -q,--query                 SQL query to select rows
  -t,--table                 name of the table data is loaded into
                             (default: ccindex)
     --numOutputPartitions   repartition data to have output partitions
     --outputCompression     data output compression codec: none, gzip/zlib
                             (default), snappy, lzo, etc.
                             Note: the availability of compression options
                             depends on the chosen output format.
     --outputFormat          data output format: parquet (default), orc,
                             json, csv
     --outputPartitionBy     partition data by columns (comma-separated,
                             default: crawl,subset)
```

The following Spark SQL options are recommended to achieve an optimal query performance:

```
spark.hadoop.parquet.enable.dictionary=true
spark.hadoop.parquet.enable.summary-metadata=false
spark.sql.hive.metastorePartitionPruning=true
spark.sql.parquet.filterPushdown=true
```

Because the schema of the index table has slightly changed over time by adding new columns, the following option is required if any of the new columns (e.g., `content_languages`) is used in the query:

```
spark.sql.parquet.mergeSchema=true
```

### Export Subsets of the Common Crawl Archives

The [URL index](https://index.commoncrawl.org/) was initially created to easily fetch web page captures from the Common Crawl archives. The columnar index also contains the necessary information for this task: the fields `warc_filename`, `warc_record_offset` and `warc_record_length`. This allows us to define a subset of the Common Crawl archives by a SQL query, fetch all records of the subset and export them to WARC files for further processing. The tool [CCIndexWarcExport](src/main/java/org/commoncrawl/spark/examples/CCIndexWarcExport.java) addresses this use case:

```
> $SPARK_HOME/bin/spark-submit --class org.commoncrawl.spark.examples.CCIndexWarcExport $APPJAR --help

CCIndexWarcExport [options]

Arguments:
        path to cc-index table
        s3://commoncrawl/cc-index/table/cc-main/warc/
        output directory

Options:
  -q,--query                   SQL query to select rows. Note: the result is
                               required to contain the columns `url',
                               `warc_filename', `warc_record_offset' and
                               `warc_record_length', make sure they're SELECTed.
  -t,--table                   name of the table data is loaded into
                               (default: ccindex)
     --csv                     CSV file to load WARC records by filename,
                               offset and length.The CSV file must have column
                               headers and the input columns `url',
                               `warc_filename', `warc_record_offset' and
                               `warc_record_length' are mandatory, see also
                               option --query.
  -h,--help                    Show this message
     --numOutputPartitions     repartition data to have output partitions
     --numRecordsPerWarcFile   allow max. records per WARC file. This will
                               repartition the data so that in average one
                               partition contains not more than rows. Default
                               is 10000, set to -1 to disable this option.
                               Note: if both --numOutputPartitions and
                               --numRecordsPerWarcFile are used, the former
                               defines the minimum number of partitions, the
                               latter the maximum partition size.
     --warcCreator             (WARC info record) creator of WARC export
     --warcOperator            (WARC info record) operator of WARC export
     --warcPrefix              WARC filename prefix
```

Let's try to put together a couple of WARC files containing only web pages written in Icelandic (ISO-639-3 language code [isl](https://en.wikipedia.org/wiki/ISO_639:isl)). We choose Icelandic because it's not so common and the number of pages in the Common Crawl archives is manageable, cf. the [language statistics](https://commoncrawl.github.io/cc-crawl-statistics/plots/languages).
We take the query [get-records-for-language.sql](src/sql/examples/cc-index/get-records-for-language.sql) and run it as a Spark job:

```
> $SPARK_HOME/bin/spark-submit \
    --conf spark.hadoop.parquet.enable.dictionary=true \
    --conf spark.hadoop.parquet.enable.summary-metadata=false \
    --conf spark.sql.hive.metastorePartitionPruning=true \
    --conf spark.sql.parquet.filterPushdown=true \
    --conf spark.sql.parquet.mergeSchema=true \
    --class org.commoncrawl.spark.examples.CCIndexWarcExport $APPJAR \
    --query "SELECT url, warc_filename, warc_record_offset, warc_record_length FROM ccindex WHERE crawl = 'CC-MAIN-2018-43' AND subset = 'warc' AND content_languages = 'isl'" \
    --numOutputPartitions 12 \
    --numRecordsPerWarcFile 20000 \
    --warcPrefix ICELANDIC-CC-2018-43 \
    s3://commoncrawl/cc-index/table/cc-main/warc/ \
    .../my_output_path/
```

It's also possible to pass the result of a SQL query as a CSV file, e.g., an Athena result file. If you've already run the query [get-records-for-language.sql](src/sql/examples/cc-index/get-records-for-language.sql) and the output file is available on S3, just replace the `--query` argument by `--csv` pointing to the result file:

```
> $SPARK_HOME/bin/spark-submit --class org.commoncrawl.spark.examples.CCIndexWarcExport $APPJAR \
    --csv s3://aws-athena-query-results-123456789012-us-east-1/Unsaved/2018/10/26/a1a82705-047c-4902-981d-b7a93338d5ac.csv \
    ...
```

## Part Row Group Test

This repository also includes a tool that checks whether the row groups in a given Parquet file (one part of the table) have increasing min/max values, see `util/are_part_min_max_increasing.py`. The tool only checks that, within a single `.parquet` file, each row group's `.min` is greater than or equal to the previous row group's `.max`. If the table as a whole consists of multiple Parquet files, the condition may not hold across file boundaries.
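
The condition described above can be illustrated with a minimal sketch. This is not the code of `util/are_part_min_max_increasing.py`; the function names `first_violation` and `is_increasing` are hypothetical, and the sketch assumes the per-row-group statistics of one column have already been collected into a list of `(min, max)` pairs in row-group order. In practice these pairs would be read from the Parquet footer metadata of a single file, for instance via pyarrow's `ParquetFile(path).metadata.row_group(i).column(j).statistics`, which is left out here to keep the example free of dependencies.

```python
from typing import Any, Optional, Sequence, Tuple


def first_violation(stats: Sequence[Tuple[Any, Any]]) -> Optional[int]:
    """Return the index of the first row group whose min is smaller than
    the previous row group's max, or None if no such row group exists.

    `stats` holds one (min, max) pair per row group, in file order.
    """
    prev_max = None
    for index, (group_min, group_max) in enumerate(stats):
        # The condition only applies from the second row group onwards.
        if prev_max is not None and group_min < prev_max:
            return index
        prev_max = group_max
    return None


def is_increasing(stats: Sequence[Tuple[Any, Any]]) -> bool:
    """True if every row group's min is >= the previous row group's max."""
    return first_violation(stats) is None
```

For example, `is_increasing([("a", "c"), ("c", "f"), ("g", "z")])` holds because a min equal to the previous max is allowed, while `first_violation([("a", "d"), ("c", "f")])` points at the second row group (index 1). Statistics from different files should be checked separately, since the condition is only expected to hold within one file.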