From 3e216eb0a8aa6a28bdbc1594c9051eb252ce4074 Mon Sep 17 00:00:00 2001 From: root Date: Sat, 10 Aug 2024 21:01:36 +0000 Subject: [PATCH] Initial commit of the five flows --- hncomments.py | 196 +++++++++++++++++++++++++++++++++++++++++++++++++ hncrawl.py | 104 ++++++++++++++++++++++++++ hninit.py | 38 ++++++++++ hnposts.py | 124 +++++++++++++++++++++++++++++++ hnsentiment.py | 122 ++++++++++++++++++++++++++++++ 5 files changed, 584 insertions(+) create mode 100644 hncomments.py create mode 100644 hncrawl.py create mode 100644 hninit.py create mode 100644 hnposts.py create mode 100644 hnsentiment.py diff --git a/hncomments.py b/hncomments.py new file mode 100644 index 0000000..6e50e68 --- /dev/null +++ b/hncomments.py @@ -0,0 +1,196 @@ +from metaflow import ( + FlowSpec, + IncludeFile, + step, + conda, + project, + Flow, + Parameter, + current, + retry, + card, + S3, + resources, +) +from metaflow.cards import Markdown, ProgressBar +import math, tarfile, shutil +import tempfile, os, time, io +import html + +# Flow 4 +# Produce comments threads for HN posts of interest +# +# First, head to Google Bigquery to export comment data +# as parquet files: +# +# EXPORT DATA +# OPTIONS ( +# uri = 'gs://your-gs-bucket/hnexport/*.parquet', +# format = 'parquet') +# AS +# SELECT * FROM `bigquery-public-data.hacker_news.full +# WHERE type='comment' and timestamp > timestamp '2020-01-01' +# ORDER BY timestamp ASC; +# +# Download the parquet files to a local directory 'hn-comments' +# and run locally +# +# python hncomments.py --environment=conda run +# +# The flow will upload the parquet files to S3 when run for +# the first time. You can interrupt the run after the start +# step unless you have a beefy workstation. Tag the run with +# +# python hncomments.py --environment=conda tag add commentdata --run-id [YOUR_RUN] +# +# to use the uploaded data for future runs. 
Next, to analyze +# data we need more resources, so run in the cloud: +# +# python hncomments.py --environment=conda run --with kubernetes +# +# If you are happy with the results, tag the result with +# +# python hncomments.py --environment=conda tag add commentresults --run-id [YOUR_RUN] +# +# After this, open hnsentiment.py + + +@project(name="hn_sentiment") +class HNSentimentCommentData(FlowSpec): + + local_parquets = Parameter("local-parquets", default="hn-comments") + local_mode = Parameter("local-mode", default=False, is_flag=True) + num_shards = Parameter("num-shards", default=50) + + @step + def start(self): + if not self.local_mode: + self.comment_parquets = self.ensure_parquets() + posts = Flow("HNSentimentInit").latest_successful_run.data.posts + self.posts = {id for id, _, _, _ in posts} + self.next(self.construct_comments) + + def ensure_parquets(self): + try: + run = list(Flow("HNSentimentCommentData").runs("commentdata"))[0] + ret = run["start"].task["comment_parquets"].data + print(f"Using existing parquets from run {run.id}") + return ret + except: + print(f"Uploading parquets from {self.local_parquets}") + with S3(run=self) as s3: + fnames = [ + (f"hn-comments/{f}", os.path.join(self.local_parquets, f)) + for f in os.listdir(self.local_parquets) + ] + return [url for _, url in s3.put_files(fnames)] + + def ensure_local(self): + if self.local_mode: + return self.local_parquets + else: + root = "hn-comments" + with S3() as s3: + s3objs = s3.get_many(self.comment_parquets) + os.makedirs(root) + for obj in s3objs: + os.rename( + obj.path, + os.path.join(root, os.path.basename(obj.path) + ".parquet"), + ) + return root + + @conda(packages={"duckdb": "1.0.0"}) + @resources(disk=10000, cpu=8, memory=32000) + @card(type="blank") + @retry + @step + def construct_comments(self): + self.parquet_root = self.ensure_local() + print(f"{len(os.listdir(self.parquet_root))} parquet files loaded") + post_comments = self.construct() + self.post_comments_meta = {} + current.card.append(Markdown("Packaging tarballs")) + current.card.refresh() + shards = [] + files = [] + for i in range(self.num_shards): + tarname = f"comments-{i}.tar.gz" + shards.append(tarfile.open(tarname, mode="w:gz")) + files.append((f"comments/{tarname}", tarname)) + for post_id, comments in post_comments.items(): + s = post_id % self.num_shards + self.post_comments_meta[post_id] = (len(comments), files[s]) + data = html.unescape("\n".join(comments)).encode("utf-8") + buf = io.BytesIO(data) + tarinfo = tarfile.TarInfo(name=f"comments/{post_id}") + tarinfo.size = len(data) + buf.seek(0) + shards[s].addfile(tarinfo, buf) + for shard in shards: + shard.close() + with S3(run=self) as s3: + self.tarballs = [url for _, url in s3.put_files(files)] + print(f"uploaded {len(self.tarballs)} tarballs") + self.next(self.end) + + def construct(self): + BS = 1000 + import duckdb # pylint: disable=import-error + + num_rows = ( + duckdb.query(f"select count(*) from '{self.parquet_root}/*.parquet'") + .execute() + .fetchone() + ) + status = Markdown(f"# Starting to process: {num_rows} comments in the DB") + progress = ProgressBar(max=num_rows, label="Comments processed") + current.card.append(status) + current.card.append(progress) + current.card.refresh() + + # BigQuery Hacker News table has an annoying format: Comments don't have + # a direct reference to their post, just to their immediate parent which + # may be a post or another comment. Hence we must link deep comments back + # to their parent post recursively. 
+ # + # We do this by going through all comments starting with the oldest + # comments. As we encounter comments, we create a mapping on the fly that + # map comments to the original post through their parents. This is + # based on the assumption that parent comments always precede child comments + # in time-ordered traversal. + # + # The end result is a flattened list of comments per post, returned in + # post_comments. + + res = duckdb.query( + f"select text, by, id, parent from '{self.parquet_root}/*.parquet' where" + "(dead is null or dead=false) and " + "(deleted is null or deleted=false) " + "order by timestamp asc" + ).execute() + post_comments = {id: [] for id in self.posts} + mapping = {id: id for id in self.posts} + mapped = 0 + rows = res.fetchmany(BS) + row_count = 0 + while rows: + for text, by, id, parent in rows: + if parent in mapping: + post_comments[mapping[parent]].append(f"<{by}> {text}") + mapping[id] = mapping[parent] + mapped += 1 + row_count += len(rows) + status.update(f"## Mapped {mapped} comments") + progress.update(row_count) + current.card.refresh() + rows = res.fetchmany(BS) + return post_comments + + @step + def end(self): + pass + + +if __name__ == "__main__": + HNSentimentCommentData() diff --git a/hncrawl.py b/hncrawl.py new file mode 100644 index 0000000..97437d4 --- /dev/null +++ b/hncrawl.py @@ -0,0 +1,104 @@ +from metaflow import ( + FlowSpec, + IncludeFile, + step, + conda, + project, + Flow, + Parameter, + current, + retry, + card, + S3, + resources, +) +from metaflow.cards import Markdown, ProgressBar +import math, tarfile +import tempfile, os + +# Flow 2 +# Download HN posts of interest +# +# then run or deploy as +# python hncrawl.py run --with kubernetes --max-workers 100 +# +# Note that you can have a high number for --max-workers as +# each task hits a different set of websites (no DDOS'ing). 
+# +# After the run succeeds, tag it with +# +# python hncrawl.py tag add --run-id [YOUR_RUN_ID] crawldata +# +# you can choose which crawl to use by moving the tag to +# a run you like +# +# After this, open hnposts.py + +def make_batches(items, n): + bs = math.ceil(len(items) / n) + return [items[i * bs : (i + 1) * bs] for i in range(n)] + + +@project(name="hn_sentiment") +class HNSentimentCrawl(FlowSpec): + + num_parallel = Parameter("num-parallel", default=50) + max_posts = Parameter("max-posts", default=-1) + + @step + def start(self): + maxp = None if self.max_posts == -1 else self.max_posts + posts = Flow("HNSentimentInit").latest_successful_run.data.posts[:maxp] + self.batches = make_batches(posts, self.num_parallel) + self.next(self.crawl, foreach="batches") + + @resources(disk=1000, cpu=2, memory=4000) + @card(type="blank") + @retry + @step + def crawl(self): + import requests + + self.dir = tempfile.mkdtemp("hncrawl") + ok = failed = 0 + status = Markdown("# Starting to download") + progress = ProgressBar(max=len(self.input), label="Urls processed") + current.card.append(status) + current.card.append(progress) + self.successful = set() + self.failed = set() + for i, (id, title, score, url) in enumerate(self.input): + try: + resp = requests.get(url, allow_redirects=True, timeout=10) + resp.raise_for_status() + with open(os.path.join(self.dir, str(id)), mode="wb") as f: + f.write(resp.content) + self.successful.add(id) + ok += 1 + except: + self.failed.add(id) + failed += 1 + if i == len(self.input) - 1 or not i % 20: + status.update(f"## Successful downloads {ok}, failed {failed}") + progress.update(i + 1) + current.card.refresh() + tarname = f"crawl-{self.index}.tar.gz" + with tarfile.open(tarname, mode="w:gz") as tar: + tar.add(self.dir, arcname="data") + print(f"file size {os.path.getsize(tarname) / 1024**2}MB") + with S3(run=self) as s3: + [(_, self.url)] = s3.put_files([(tarname, tarname)]) + print(f"uploaded {self.url}") + self.next(self.join) + + @step + def join(self, inputs): + self.next(self.end) + + @step + def end(self): + pass + + +if __name__ == "__main__": + HNSentimentCrawl() diff --git a/hninit.py b/hninit.py new file mode 100644 index 0000000..0b47a5f --- /dev/null +++ b/hninit.py @@ -0,0 +1,38 @@ +from metaflow import FlowSpec, IncludeFile, step, conda, project + +# download story.parquet from +# https://huggingface.co/datasets/julien040/hacker-news-posts +# +# then run or deploy as +# python hninit.py --package-suffixes .parquet --environment=conda run + +@project(name="hn_sentiment") +class HNSentimentInit(FlowSpec): + + @conda(packages={"duckdb": "1.0.0"}) + @step + def start(self): + import duckdb # pylint: disable=import-error + + self.posts = ( + duckdb.query( + """ + select id, title, score, url from 'story.parquet' + where score > 20 and + to_timestamp(time) > timestamp '2020-01-01' and + comments > 5 and + url is not null + """ + ) + .execute() + .fetchall() + ) + self.next(self.end) + + @step + def end(self): + pass + + +if __name__ == "__main__": + HNSentimentInit() diff --git a/hnposts.py b/hnposts.py new file mode 100644 index 0000000..d0b5e2a --- /dev/null +++ b/hnposts.py @@ -0,0 +1,124 @@ +from metaflow import ( + FlowSpec, + IncludeFile, + step, + conda, + project, + Flow, + S3, + Parameter, + current, + retry, + card, +) +from metaflow.cards import Markdown, ProgressBar +from metaflow import nim +import tarfile, os, tempfile, shutil + +# Flow 3 +# Produce tags for crawled HN posts using an LLM +# +# python hnposts.py --environment=conda run 
--with kubernetes --max-workers 5 +# +# Note two things: +# 1. --max-workers controls the concurrency sent to your LLM backend. Going +# higher than what the backend can handle is not useful. +# 2. Depending on the number of posts to analyze and your LLM backend, this +# flow is likely to take 5-10h to run. It makes sense to deploy to Argo +# Workflows to keep the run running reliably +# +# After this, open hncomments.py + +PROMPT = """Assign 10 tags that best describe the following article. Reply only the tags in the following format: +1. first tag +2. second tag +N. Nth tag""" + +MODEL = "meta/llama3-70b-instruct" + + +@nim(models=[MODEL]) +@project(name="hn_sentiment") +class HNSentimentAnalyzePosts(FlowSpec): + + prompt = Parameter("prompt", default=PROMPT) + num_input_tokens = Parameter("num-input-tokens", default=5000) + + @step + def start(self): + crawl_run = list(Flow("HNSentimentCrawl").runs("crawldata"))[0] + self.crawl_id = crawl_run.id + print(f"Using data from crawl {self.crawl_id}") + self.tarballs = [] + for task in crawl_run["crawl"]: + self.tarballs.append(task["url"].data) + self.next(self.analyze_posts, foreach="tarballs") + + @card(type="blank") + @retry + @conda(packages={"beautifulsoup4": "4.12.3"}) + @step + def analyze_posts(self): + from bs4 import BeautifulSoup # pylint: disable=import-error + + print("downloading data from", self.input) + root = tempfile.mkdtemp("hnpost") + with S3() as s3: + res = s3.get(self.input) + tarfile.open(res.path).extractall(path=root) + datapath = os.path.join(root, "data") + print("Data extracted ok") + status = Markdown("# Starting to analyze") + progress = ProgressBar(max=len(os.listdir(datapath)), label="Docs analyzed") + current.card.append(status) + current.card.append(progress) + ok = failed = 0 + self.post_tags = {} + for i, post_id in enumerate(os.listdir(datapath)): + with open(os.path.join(datapath, post_id)) as f: + try: + tags, num_tokens = self.analyze(f.read(), BeautifulSoup) + self.post_tags[post_id] = (tags, num_tokens) + ok += 1 + except Exception as ex: + failed += 1 + print(f"analyzing post {post_id} failed: ", ex) + status.update(f"## Successfully processed {ok} docs, failed {failed}") + progress.update(i + 1) + current.card.refresh() + shutil.rmtree(root) + self.next(self.join) + + def analyze(self, data, BeautifulSoup): + soup = BeautifulSoup(data, "html.parser") + tokens = soup.get_text().split()[: self.num_input_tokens] + doc = " ".join(tokens) + llm = current.nim.models[MODEL] + prompt = {"role": "user", "content": f"{self.prompt}\n---\n{doc}"} + chat_completion = llm(messages=[prompt], model=MODEL, n=1, max_tokens=400) + s = chat_completion["choices"][0]["message"]["content"] + tags = [] + for line in s.strip().splitlines(): + try: + if "." 
in line: + [_, tag] = line.split(".", 1) + tags.append(tag.strip()) + except: + print(f"Invalid response format: {s}") + break + return tags, len(tokens) + + @step + def join(self, inputs): + self.post_tags = {} + for inp in inputs: + self.post_tags.update(inp.post_tags) + self.next(self.end) + + @step + def end(self): + pass + + +if __name__ == "__main__": + HNSentimentAnalyzePosts() diff --git a/hnsentiment.py b/hnsentiment.py new file mode 100644 index 0000000..b4fda89 --- /dev/null +++ b/hnsentiment.py @@ -0,0 +1,122 @@ +from metaflow import ( + FlowSpec, + IncludeFile, + step, + conda, + project, + Flow, + S3, + Parameter, + current, + retry, + card, +) +from metaflow.cards import Markdown, ProgressBar +from metaflow import nim, namespace +import tarfile, os, tempfile, shutil, re + +# Flow 5 +# Analyze sentiment of post comments using an LLM +# +# python hnsentiment.py --environment=conda run --with kubernetes --max-workers 5 +# +# The same comments as for hnposts.py apply here: +# 1. --max-workers controls the concurrency sent to your LLM backend. Going +# higher than what the backend can handle is not useful. +# 2. Depending on the number of posts to analyze and your LLM backend, this +# flow is likely to take 5-10h to run. It makes sense to deploy to Argo +# Workflows to keep the run running reliably +# +# After this, you have the datasets ready and you can analyze them +# in a notebook! + +PROMPT = """In the scale between 0-10 where 0 is the most negative sentiment and 10 is the most positive sentiment, +rank the following discussion. Reply in this format: + +SENTIMENT X + +where X is the sentiment rating +""" + +MODEL = "meta/llama3-70b-instruct" + + +@nim(models=[MODEL]) +@project(name="hn_sentiment") +class HNSentimentAnalyzeComments(FlowSpec): + + prompt = Parameter("prompt", default=PROMPT) + num_input_tokens = Parameter("num-input-tokens", default=3000) + + @step + def start(self): + # namespace(None) + comments_run = list(Flow("HNSentimentCommentData").runs("commentresults"))[0] + self.comments_id = comments_run.id + print(f"Using comments from {self.comments_id}") + self.tarballs = comments_run["end"].task["tarballs"].data + self.next(self.analyze_comments, foreach="tarballs") + + @card(type="blank") + @retry + @step + def analyze_comments(self): + print("downloading data from", self.input) + root = tempfile.mkdtemp("hnpost") + with S3() as s3: + res = s3.get(self.input) + tarfile.open(res.path).extractall(path=root) + datapath = os.path.join(root, "comments") + print("Data extracted ok") + status = Markdown("# Starting to analyze") + progress = ProgressBar(max=len(os.listdir(datapath)), label="Docs analyzed") + current.card.append(status) + current.card.append(progress) + ok = failed = 0 + self.post_sentiment = {} + for i, post_id in enumerate(os.listdir(datapath)): + with open(os.path.join(datapath, post_id)) as f: + try: + sentiment, num_tokens = self.analyze(f.read()) + self.post_sentiment[post_id] = (sentiment, num_tokens) + ok += 1 + except Exception as ex: + failed += 1 + print(f"analyzing comments of post {post_id} failed: ", ex) + status.update(f"## Successfully processed {ok} posts, failed {failed}") + progress.update(i + 1) + current.card.refresh() + shutil.rmtree(root) + self.next(self.join) + + def analyze(self, data): + parser = re.compile("SENTIMENT (\d)") + tokens = data.split()[: self.num_input_tokens] + doc = " ".join(tokens) + llm = current.nim.models[MODEL] + prompt = {"role": "user", "content": f"{self.prompt}\n---\n{doc}"} + chat_completion = 
llm(messages=[prompt], model=MODEL, n=1, max_tokens=10)
+        s = chat_completion["choices"][0]["message"]["content"]
+        try:
+            [sentiment_str] = parser.findall(s)
+            sentiment = int(sentiment_str)
+            # print(f"PROMPT {prompt['content']}: {sentiment}")
+        except Exception:
+            print(f"Invalid output: {s}")
+            # re-raise so analyze_comments counts this post as failed
+            raise
+        return sentiment, len(tokens)
+
+    @step
+    def join(self, inputs):
+        self.post_sentiment = {}
+        for inp in inputs:
+            self.post_sentiment.update(inp.post_sentiment)
+        print(f"Sentiment recorded for {len(self.post_sentiment)} posts")
+        self.next(self.end)
+
+    @step
+    def end(self):
+        pass
+
+
+if __name__ == "__main__":
+    HNSentimentAnalyzeComments()
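
A minimal standalone sketch of the time-ordered parent-mapping idea described in HNSentimentCommentData.construct, using toy data (the ids and texts below are made up): because every parent row precedes its children in the time-ordered traversal, a single pass can attach each comment to its root post.

posts = {100, 200}                      # hypothetical post ids of interest
rows = [                                # (id, parent, text) in timestamp order
    (101, 100, "top-level comment on post 100"),
    (102, 101, "reply to comment 101"),
    (201, 200, "top-level comment on post 200"),
    (999, 555, "comment on a post we do not track"),
]

post_comments = {pid: [] for pid in posts}
mapping = {pid: pid for pid in posts}   # maps any known id to its root post

for comment_id, parent, text in rows:
    if parent in mapping:               # parent already resolved to a post
        root = mapping[parent]
        post_comments[root].append(text)
        mapping[comment_id] = root      # so replies to this comment resolve too

print(post_comments)
# {100: ['top-level comment on post 100', 'reply to comment 101'],
#  200: ['top-level comment on post 200']}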
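
The numbered-list parsing in HNSentimentAnalyzePosts.analyze can be checked outside the flow with a made-up model reply that follows the PROMPT format:

reply = """1. python
2. machine learning
3. open source"""

tags = []
for line in reply.strip().splitlines():
    if "." in line:
        _, tag = line.split(".", 1)
        tags.append(tag.strip())

print(tags)  # ['python', 'machine learning', 'open source']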
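
Similarly, the SENTIMENT reply parsing in HNSentimentAnalyzeComments.analyze can be sanity-checked with made-up replies; this sketch uses a raw-string pattern with \d+ so that a two-digit rating like 10 is captured whole.

import re

parser = re.compile(r"SENTIMENT (\d+)")

for reply in ["SENTIMENT 7", "Sure! SENTIMENT 10", "I cannot rate this discussion."]:
    match = parser.search(reply)
    if match:
        print(reply, "->", int(match.group(1)))
    else:
        print(reply, "-> no rating found")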
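
Once all five flows have run, the notebook analysis mentioned at the top of hnsentiment.py can combine the artifacts with the Metaflow Client API. A sketch, assuming the artifact names produced by the join steps above (post_tags and post_sentiment) and using latest_successful_run for brevity; in practice you may want to pin a tagged run instead, as done elsewhere in the pipeline.

from collections import defaultdict
from metaflow import Flow

post_tags = Flow("HNSentimentAnalyzePosts").latest_successful_run.data.post_tags
post_sentiment = Flow("HNSentimentAnalyzeComments").latest_successful_run.data.post_sentiment

tag_scores = defaultdict(list)
for post_id, (tags, _num_tokens) in post_tags.items():
    if post_id in post_sentiment:
        sentiment, _ = post_sentiment[post_id]
        for tag in tags:
            tag_scores[tag.lower()].append(sentiment)

# Rank tags by average sentiment; the >= 10 posts cutoff is an arbitrary choice
ranked = sorted(
    ((sum(s) / len(s), tag) for tag, s in tag_scores.items() if len(s) >= 10),
    reverse=True,
)
for avg, tag in ranked[:20]:
    print(f"{avg:4.1f}  {tag}")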