Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the parse function to accept an entity id #189

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/buildingcrawler.md
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,14 @@ parse:
author: .//meta[@name="author"]/@content
publishedAt: .//*[@class="date"]/text()
description: .//meta[@property="og:description"]/@content
keys:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest we use syntax similar to ftm mappings for consistency. Something like https://github.com/alephdata/aleph/blob/main/mappings/md_companies.yml#L15-L17

So the keys section will look like:

keys:
  - title
  - author

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The keys section needs to be updated now I think?

- title
- author
- publishedAt
```

We also need to supply a set of values to act as a unique key for this entity. Keys should be selected from the set of properties that you collect and should aim to be unique for the collection of items that you are scraping.

The `data` `alpeh_emit_entity` emits to the next stages includes the following new items:

* `aleph_id`: id of the uploaded entity in Alpeh
Expand Down
4 changes: 4 additions & 0 deletions example/config/simple_article_scraper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ pipeline:
author: .//meta[@name="author"]/@content
publishedAt: .//*[@class="date"]/text()
description: .//meta[@property="og:description"]/@content
keys:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be the method needs to be changed to use the built-in parse method instead of a custom Python method to match the documentation example?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. The problem with doing that though is that the scraper won't be able to extract the body of the article, which is why the custom script exists. I guess, as it's an example it doesn't really matter too much, but that is why we have a difference.

Personally I don't have an issue with having the documentation not match the example in the repo

- title
- author
- publishedAt
handle:
store: store
fetch: fetch
Expand Down
142 changes: 89 additions & 53 deletions memorious/operations/aleph.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from pathlib import Path
from pprint import pprint # noqa
from pprint import pprint
from typing import Optional # noqa
from banal import clean_dict # type: ignore

from alephclient import settings
from alephclient.api import AlephAPI
Expand All @@ -9,59 +11,84 @@
from servicelayer.cache import make_key # type: ignore

from memorious.core import get_rate_limit # type: ignore


def _create_document_metadata(context, data) -> dict:
meta = {}
languages = context.params.get("languages")
meta["languages"] = ensure_list(data.get("languages", languages))
countries = context.params.get("countries")
meta["countries"] = ensure_list(data.get("countries", countries))
mime_type = context.params.get("mime_type")
meta["mime_type"] = data.get("mime_type", mime_type)
return meta


def _create_meta_object(context, data) -> dict:
from memorious.logic.context import Context


class Meta(MetaBase, total=False):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MetaBase is undefined

crawler: Optional[str]
foreign_id: Optional[str]
source_url: Optional[str]
title: Optional[str]
author: Optional[str]
publisher: Optional[str]
file_name: Optional[str]
retrieved_at: Optional[str]
modified_at: Optional[str]
published_at: Optional[str]
headers: any
keywords: any
parent: any
languages: any
countries: any
mime_type: any


def _create_meta_object(context: Context, data: dict) -> Meta:
languages_default: list[str] = list(context.params.get("languages", []))
countries_default: list[str] = list(context.params.get("countries", []))
mime_type_default: str = context.params.get("mime_type", "")

languages = data.get("languages", languages_default)
countries = data.get("countries", countries_default)
mime_type = data.get("mime_type", mime_type_default)
source_url = data.get("source_url", data.get("url"))
foreign_id = data.get("foreign_id", data.get("request_id", source_url))

meta = {
"crawler": context.crawler.name,
"foreign_id": foreign_id,
"source_url": source_url,
"title": data.get("title"),
"author": data.get("author"),
"publisher": data.get("publisher"),
"file_name": data.get("file_name"),
"retrieved_at": data.get("retrieved_at"),
"modified_at": data.get("modified_at"),
"published_at": data.get("published_at"),
"headers": ensure_dict(data.get("headers")),
"keywords": ensure_list(data.get("keywords")),
}
parent = {}

if data.get("aleph_folder_id"):
meta["parent"] = {"id": data.get("aleph_folder_id")}
parent = {"id": data.get("aleph_folder_id")}

meta = Meta(
crawler=context.crawler.name,
foreign_id=foreign_id,
source_url=source_url,
title=data.get("title"),
author=data.get("author"),
publisher=data.get("publisher"),
file_name=data.get("file_name"),
retrieved_at=data.get("retrieved_at"),
modified_at=data.get("modified_at"),
published_at=data.get("published_at"),
headers=data.get("headers", {}),
keywords=data.get("keywords", []),
parent=parent,
languages=languages,
countries=countries,
mime_type=mime_type,
)

return meta


def aleph_emit(context, data):
def aleph_emit(context: Context, data: dict):
aleph_emit_document(context, data)


def aleph_emit_document(context, data):
def aleph_emit_document(context: Context, data: dict):
api = get_api(context)
if api is None:
return
collection_id = get_collection_id(context, api)
content_hash = data.get("content_hash")
source_url = data.get("source_url", data.get("url"))
foreign_id = data.get("foreign_id", data.get("request_id", source_url))
# Fetch document id from cache
document = context.get_tag(make_key(collection_id, foreign_id, content_hash))
if isinstance(document, dict):

collection_id: str = get_collection_id(context, api)
content_hash: Optional[str] = data.get("content_hash")
source_url: str = data.get("source_url", data.get("url"))
foreign_id: str = data.get("foreign_id", data.get("request_id", source_url))
document_id: Optional[str] = context.get_tag(
make_key(collection_id, foreign_id, content_hash)
)

if document_id:
context.log.info("Skip aleph upload: %s", foreign_id)
data["aleph_id"] = document["id"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

document is undefined

data["aleph_document"] = document
Expand All @@ -70,10 +97,9 @@ def aleph_emit_document(context, data):
return

meta = clean_dict(_create_meta_object(context, data))
meta.update(_create_document_metadata(context, data))

label = meta.get("file_name", meta.get("source_url"))
context.log.info("Upload: %s", label)

with context.load_file(content_hash) as fh:
if fh is None:
return
Expand Down Expand Up @@ -102,7 +128,7 @@ def aleph_emit_document(context, data):
backoff(exc, try_number)


def aleph_folder(context, data):
def aleph_folder(context: Context, data: dict):
api = get_api(context)
if api is None:
return
Expand Down Expand Up @@ -136,19 +162,29 @@ def aleph_folder(context, data):
backoff(ae, try_number)


def aleph_emit_entity(context, data):
def aleph_emit_entity(context: Context, data: dict) -> None:
context.log.info("Emit to entity: {}".format(data.get("entity_id")))

api = get_api(context)
if api is None:
return
collection_id = get_collection_id(context, api)
entity_id = data.get("entity_id")
source_url = data.get("source_url", data.get("url"))
foreign_id = data.get("foreign_id", data.get("request_id", source_url))
collection_id: str = get_collection_id(context, api)
entity_id: Optional[str] = data.get("entity_id")
source_url: Optional[str] = data.get("source_url", data.get("url"))
foreign_id: Optional[str] = data.get(
"foreign_id", data.get("request_id", source_url)
)

# Fetch id from cache
if entity_id is None:
context.log.warn("No entity_id found. Skipping store")
context.emit(data=data, optional=True)
return

cached_key = context.get_tag(make_key(collection_id, foreign_id, entity_id))

if cached_key:
context.log.info("Skip entity creation: {}".format(foreign_id))
context.log.info("Entity exists. Skip creation: {}".format(cached_key))
data["aleph_id"] = cached_key
context.emit(data=data, optional=True)
return
Expand All @@ -158,7 +194,7 @@ def aleph_emit_entity(context, data):
rate_limit = get_rate_limit("aleph", limit=rate)
rate_limit.comply()
try:
res = api.write_entity(
res: dict[str, str] = api.write_entity(
collection_id,
{
"schema": data.get("schema"),
Expand All @@ -168,7 +204,7 @@ def aleph_emit_entity(context, data):
)

aleph_id = res.get("id")
context.log.info("Aleph entity ID: %s", aleph_id)
context.log.info("Entity created. entity_id is: %s", aleph_id)

# Save the entity id in cache for future use
context.set_tag(make_key(collection_id, foreign_id, entity_id), aleph_id)
Expand All @@ -184,19 +220,19 @@ def aleph_emit_entity(context, data):
backoff(exc, try_number)


def get_api(context):
def get_api(context: Context) -> Optional[AlephAPI]:
if not settings.HOST:
context.log.warning("No $ALEPHCLIENT_HOST, skipping upload...")
return None
if not settings.API_KEY:
context.log.warning("No $ALEPHCLIENT_API_KEY, skipping upload...")
return None

session_id = "memorious:%s" % context.crawler.name
session_id: str = "memorious:%s" % context.crawler.name
return AlephAPI(settings.HOST, settings.API_KEY, session_id=session_id)


def get_collection_id(context, api):
def get_collection_id(context: Context, api: AlephAPI) -> str:
if not hasattr(context.stage, "aleph_cid"):
foreign_id = context.get("collection", context.crawler.name)
config = {"label": context.crawler.description}
Expand Down
28 changes: 23 additions & 5 deletions memorious/operations/parse.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import logging

from typing import Optional
from banal import ensure_list
from urllib.parse import urljoin
from normality import collapse_spaces
from servicelayer.cache import make_key

from memorious.helpers.key import make_id
from memorious.helpers.rule import Rule
from memorious.helpers.dates import iso_date
from memorious.logic.context import Context


log = logging.getLogger(__name__)
Expand All @@ -18,7 +22,7 @@
]


def parse_html(context, data, result):
def parse_html(context: Context, data, result: dict) -> None:
context.log.info("Parse: %r", result.url)

for title in result.html.xpath(".//title/text()"):
Expand Down Expand Up @@ -69,7 +73,7 @@ def parse_html(context, data, result):
context.emit(rule="fetch", data=data)


def parse_for_metadata(context, data, html):
def parse_for_metadata(context: Context, data, html) -> dict:
meta = context.params.get("meta", {})
meta_date = context.params.get("meta_date", {})

Expand All @@ -89,22 +93,36 @@ def parse_for_metadata(context, data, html):
if value is not None:
data[key] = value
break

return data


def parse_ftm(context, data, html):
def get_entity_id_from_keys(keys: Optional[list], properties: dict, html) -> str:
if isinstance(keys, list):
if keys is not None and len(keys) > 0:
return make_id(*keys)


def parse_ftm(context: Context, data, html):
properties = context.params.get("properties")
properties_dict = {}

for key, value in properties.items():
properties_dict[key] = html.xpath(value)

data["schema"] = context.params.get("schema")
data["properties"] = properties_dict

generated_entity_id = get_entity_id_from_keys(
context.params.get("keys"), properties_dict, html
)

def parse(context, data):
with context.http.rehash(data) as result:
if generated_entity_id:
data["entity_id"] = generated_entity_id


def parse(context: Context, data):
with context.http.rehash(data) as result:
if result.html is not None:
# Get extra metadata from the DOM
parse_for_metadata(context, data, result.html)
Expand Down
Loading