In this part of the series, I will describe how to fetch German law texts from https://www.gesetze-im-internet.de.
Four formats
The (federal) laws of Germany are published by the Federal Ministry of Justice and Consumer Protection on https://www.gesetze-im-internet.de. There are also Land (i.e. state) laws and administrative regulations, each published elsewhere, and many more legal texts, but for the sake of simplicity we will use the texts of federal laws only.
As stated in the notes page, there are four formats available:
- HTML (which you can view in browser)
- PDF (most suitable for archiving or for printed documents)
- EPUB (for e-book readers)
- XML (original format, which can be converted easily to other formats)
The format of the XML representation is defined by this DTD, which will become very helpful in the next part of this series.
As also stated on the aforementioned notes page, an index of the XML documents is available at http://www.gesetze-im-internet.de/gii-toc.xml. This index links to the XML documents, packed into ZIP archives, all of them having the same file name, xml.zip.
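If you only need the archive URLs, this index alone can already be parsed with the standard library. The following is a minimal sketch using xml.etree.ElementTree; the element names item, title and link are my assumption about the index structure, so verify them against the actual file before relying on it:

from urllib.request import urlopen
from xml.etree import ElementTree

# Download the table of contents and print every entry.
# Assumption: the index consists of <item> elements, each containing
# a <title> and a <link> to an xml.zip archive; check the real file first.
with urlopen('http://www.gesetze-im-internet.de/gii-toc.xml') as response:
    root = ElementTree.fromstring(response.read())

for item in root.iter('item'):
    print('{}\t{}'.format(item.findtext('title'), item.findtext('link')))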
The choice of format
Of the four available formats, we need the one that represents the text with the least markup. This requirement comes from the need to generate future law texts with as little markup as possible.
This requirement, of course, eliminates the PDF format, because it is adapted to printed media. While the HTML format could be converted to text, for example with the venerable html2text, the contents of the law texts are split into small sections, which complicates the conversion. The conversion of the EPUB format to text is difficult to customise, at least in comparison to XML. Finally, for the XML format there is already a converter to plain text, described in another post.
So we need the documents in XML format.
How to parse HTML with batteries included
Even before Beautiful Soup, it was possible to parse HTML data using the HTMLParser class from the html.parser package, documented in the standard library reference.
Likewise, even before requests, it was possible to fetch data over HTTP with the functions urlopen and urlretrieve from the package urllib.request, also documented in the standard library reference.
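As a minimal sketch of these two functions, reusing the front page and the index file mentioned above:

from urllib.request import urlopen, urlretrieve

# Read a page into memory; the pages are decoded as ISO-8859-1
# throughout this post.
with urlopen('https://www.gesetze-im-internet.de/') as response:
    html = response.read().decode('iso-8859-1')
print(len(html))

# Download a resource to a temporary local file and get its path back.
local_filename, headers = urlretrieve('http://www.gesetze-im-internet.de/gii-toc.xml')
print(local_filename)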
Should you ask yourself at this point why I overlook two very nice and well-tried Python packages, please read the list under First things first in this article.
To parse HTML with the HTMLParser class, you simply subclass it. Then, depending on what you need to extract from the HTML data, you implement the corresponding handle_* methods. For example, to collect the links from the https://www.gesetze-im-internet.de front page, you need the following code:
from html.parser import HTMLParser
from urllib.request import urlopen


class Linkparser(HTMLParser):
    def __init__(self):
        super(Linkparser, self).__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == 'a':
            href = self.get_href(attrs)
            # Skip anchor tags without an href attribute
            if href is not None:
                self.links.append(href)

    def get_href(self, attrs):
        for key, value in attrs:
            if key == 'href':
                return value
        else:
            return None

    def get_links(self):
        return self.links


parser = Linkparser()
with urlopen('https://www.gesetze-im-internet.de/') as response:
    parser.feed(response.read().decode('iso-8859-1'))
for link in parser.get_links():
    print(link)
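Note that the printed href values are taken verbatim from the page, so some of them may be relative. In the following examples they are therefore joined with a base URL via urljoin before use.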
Collecting all XML documents
While, as mentioned above, there is a ready-made index of XML documents at http://www.gesetze-im-internet.de/gii-toc.xml, we will instead collect the URLs of all XML documents from the list of current documents at http://www.gesetze-im-internet.de/aktuell.html.
The parser implemented for this page is similar to the previous example. As the current documents are grouped by their first character into separate partial lists, this parser collects the links to these lists:
import re
import logging
from html.parser import HTMLParser
from urllib.parse import urljoin
from urllib.request import urlopen

START_URL = 'http://www.gesetze-im-internet.de/aktuell.html'
RE_TEILLISTE = re.compile(r'/Teilliste_\w\.html$', re.IGNORECASE)


def get_url(attrs):
    """Find href attribute and join it with base URL"""
    for key, value in attrs:
        if key == 'href':
            return urljoin(START_URL, value)
    else:
        return None


class AktuellParser(HTMLParser):
    def __init__(self):
        super(AktuellParser, self).__init__()
        self.urls = []

    def error(self, message):
        logging.error('HTML parse error: {}'.format(message))

    def handle_starttag(self, tag, attrs):
        if tag == 'a':
            url = get_url(attrs)
            if url is None:
                return
            if RE_TEILLISTE.search(url):
                self.urls.append(url)

    def get_urls(self):
        return self.urls


parser = AktuellParser()
with urlopen(START_URL) as response:
    parser.feed(response.read().decode('iso-8859-1'))
partial_list_urls = parser.get_urls()
With all links to the partial document lists stored in the variable partial_list_urls, we must add another parser to collect the links to the XML documents. This parser also stores the law names and titles:
import re
from html.parser import HTMLParser

# logging, urlopen, get_url() and partial_list_urls are defined in the previous example
RE_GESETZ = re.compile(r'\.de/(.+)/index\.html$', re.IGNORECASE)


def get_law_title(attrs):
    """Find law title"""
    for key, value in attrs:
        if key == 'title':
            return value
    else:
        return None


class TeillisteParser(HTMLParser):
    def __init__(self):
        super(TeillisteParser, self).__init__()
        self.law_props = {}
        self.laws = []

    def error(self, message):
        logging.error('HTML parse error: {}'.format(message))

    def handle_starttag(self, tag, attrs):
        if tag == 'a':
            url = get_url(attrs)
            if url is None:
                return
            match = RE_GESETZ.search(url)
            if match:
                self.law_props = {'name': match.group(1), 'url': url}
        elif tag == 'abbr' and self.law_props:
            title = get_law_title(attrs)
            if title is None:
                raise ValueError('Unknown law with abbreviation {}'.format(self.law_props['name']))
            self.law_props['title'] = title
            self.laws.append(self.law_props)
            self.law_props = {}

    def get_laws(self):
        return self.laws


parser = TeillisteParser()
for url in partial_list_urls:
    with urlopen(url) as response:
        parser.feed(response.read().decode('iso-8859-1'))
laws = parser.get_laws()
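Each collected entry points to a law's index.html page. Since every archive sits next to that page under the name xml.zip, the archive URL can be derived with urljoin, exactly as the complete script below does. A tiny illustration (the bgb value stands for any law abbreviation found on the site):

from urllib.parse import urljoin

# Example entry as produced by TeillisteParser; the concrete values are illustrative.
law = {'name': 'bgb', 'url': 'http://www.gesetze-im-internet.de/bgb/index.html'}

# urljoin replaces the last path component, yielding the archive URL.
print(urljoin(law['url'], 'xml.zip'))
# http://www.gesetze-im-internet.de/bgb/xml.zip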
Complete fetch code
If we combine the two examples, and add some error handling and some urlretrieve action as well, we get this:
#!/usr/bin/env python3
"""German laws corpus builder."""
import re
import sys
import logging
import argparse
from pathlib import Path
from html.parser import HTMLParser
from urllib.request import urlopen, urlretrieve
from urllib.error import HTTPError, URLError
from urllib.parse import urljoin

START_URL = 'http://www.gesetze-im-internet.de/aktuell.html'
RE_TEILLISTE = re.compile(r'/Teilliste_\w\.html$', re.IGNORECASE)
RE_GESETZ = re.compile(r'\.de/(.+)/index\.html$', re.IGNORECASE)


def get_url(attrs):
    """Find href attribute and join it with base URL"""
    for key, value in attrs:
        if key == 'href':
            return urljoin(START_URL, value)
    else:
        return None


def get_law_title(attrs):
    """Find law title"""
    for key, value in attrs:
        if key == 'title':
            return value
    else:
        return None


class AktuellParser(HTMLParser):
    """Collects the URLs of the partial lists (Teillisten) from the start page"""

    def __init__(self):
        super(AktuellParser, self).__init__()
        self.urls = []

    def error(self, message):
        logging.error('HTML parse error: {}'.format(message))

    def handle_starttag(self, tag, attrs):
        if tag == 'a':
            url = get_url(attrs)
            if url is None:
                return
            if RE_TEILLISTE.search(url):
                self.urls.append(url)

    def get_urls(self):
        return self.urls


class TeillisteParser(HTMLParser):
    """Collects name, title and URL of every law from a partial list"""

    def __init__(self):
        super(TeillisteParser, self).__init__()
        self.law_props = {}
        self.laws = []

    def error(self, message):
        logging.error('HTML parse error: {}'.format(message))

    def handle_starttag(self, tag, attrs):
        if tag == 'a':
            url = get_url(attrs)
            if url is None:
                return
            match = RE_GESETZ.search(url)
            if match:
                self.law_props = {'name': match.group(1), 'url': url}
        elif tag == 'abbr' and self.law_props:
            title = get_law_title(attrs)
            if title is None:
                raise ValueError('Unknown law with abbreviation {}'.format(self.law_props['name']))
            self.law_props['title'] = title
            self.laws.append(self.law_props)
            self.law_props = {}

    def get_laws(self):
        return self.laws


def fetch(args: argparse.Namespace):
    """fetch command"""
    logging.info('Downloading law metadata...')
    parser = AktuellParser()
    try:
        logging.debug('Fetching {}'.format(START_URL))
        with urlopen(START_URL) as response:
            parser.feed(response.read().decode('iso-8859-1'))
    except (HTTPError, URLError):
        logging.error('Error fetching {}'.format(START_URL))
        return
    partial_list_urls = parser.get_urls()
    parser = TeillisteParser()
    for url in partial_list_urls:
        try:
            logging.debug('Fetching {}'.format(url))
            with urlopen(url) as response:
                parser.feed(response.read().decode('iso-8859-1'))
        except (HTTPError, URLError):
            logging.error('Error fetching {}'.format(url))
            return
    laws = parser.get_laws()
    if args.list:
        for law in laws:
            print('{}\t{}'.format(law['name'], law['title']))
    else:
        for index, law in enumerate(laws, 1):
            if args.only and law['name'] not in args.only:
                continue
            logging.info('Downloading "{}" ({}) [{}/{}]...'.format(law['title'], law['name'], index, len(laws)))
            try:
                url = urljoin(law['url'], 'xml.zip')
                logging.debug('Fetching {}'.format(url))
                local_filename, _ = urlretrieve(url)
            except (HTTPError, URLError) as error:
                logging.warning('Error fetching {}: {}'.format(url, error))
                continue
            target_filename = args.cache / '{}.xml.zip'.format(law['name'])
            logging.debug('Moving downloaded file to target filename {}'.format(target_filename))
            if not args.cache.exists():
                args.cache.mkdir()
            Path(local_filename).replace(target_filename)


def main():
    class SplitArgs(argparse.Action):
        """Command line argument as comma separated list"""

        def __call__(self, parser, namespace, values, option_string=None):
            setattr(namespace, self.dest, [v.lower() for v in values.split(',') if v])

    parser = argparse.ArgumentParser(description=sys.modules[__name__].__doc__)
    parser.set_defaults(func=lambda args: parser.print_usage())
    parser.add_argument('-c', '--cache', help='Cache directory for fetched files', type=Path, default=Path.cwd() / 'cache')
    parser.add_argument('--debug', action='store_true', help='Print debug messages')
    parser.add_argument('--quiet', action='store_true', help='Print errors only')
    subparsers = parser.add_subparsers(title='Commands')
    fetch_parser = subparsers.add_parser('fetch', help='Fetch laws in xml.zip format into the cache directory')
    fetch_parser.add_argument('-o', '--only', action=SplitArgs, help='List of comma separated law abbreviations (example: BGB,hGb)')
    fetch_parser.add_argument('-l', '--list', action='store_true', help='List laws and exit')
    fetch_parser.set_defaults(func=fetch)
    args = parser.parse_args()
    logging_level = logging.DEBUG if args.debug else logging.WARNING if args.quiet else logging.INFO
    logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', level=logging_level)
    args.func(args)


if __name__ == '__main__':
    main()
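Assuming the script is saved as fetch_laws.py (the file name is of course arbitrary), running python3 fetch_laws.py fetch downloads all archives into the default ./cache directory, python3 fetch_laws.py fetch --list only prints the abbreviations and titles, and python3 fetch_laws.py fetch --only bgb,hgb restricts the download to the given laws.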
After executing this code, we get 6518 ZIP files in the cache directory.
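Each of these archives contains the XML document of one law. As a quick check, and as a preview of the next part, you can peek into one of them with the standard zipfile module; cache/bgb.xml.zip is just an example path and assumes the corresponding law was actually fetched:

import zipfile

# A minimal sketch: list the members of one downloaded archive.
# 'cache/bgb.xml.zip' is an assumed example path; adjust it to a file
# that actually exists in your cache directory.
with zipfile.ZipFile('cache/bgb.xml.zip') as archive:
    for member in archive.namelist():
        print(member)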
Next step
In the next step, we will build the text corpus from all the law texts fetched.
Stay tuned!