diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..59d5265 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +build +*.egg-info diff --git a/ThirdPartyNotices.txt b/ThirdPartyNotices.txt new file mode 100644 index 0000000..fc4dd9e --- /dev/null +++ b/ThirdPartyNotices.txt @@ -0,0 +1,32 @@ +THIRD-PARTY SOFTWARE NOTICES AND INFORMATION +Do Not Translate or Localize + +This project incorporates components from the projects listed below. + +1. langchain (https://github.com/langchain-ai/langchain) + +langchain NOTICES AND INFORMATION BEGIN HERE +========================================= +The MIT License + +Copyright (c) Harrison Chase + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +========================================= +END OF langchain NOTICES AND INFORMATION \ No newline at end of file diff --git a/notebooks/demo.ipynb b/notebooks/demo.ipynb new file mode 100644 index 0000000..e1f5484 --- /dev/null +++ b/notebooks/demo.ipynb @@ -0,0 +1,147 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "! pip install ../\n", + "! pip install matplotlib" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import learn_to_pick" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class fake_llm_caller:\n", + " def predict(self, message):\n", + " return \"5\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pick = learn_to_pick.PickBest.create(llm=fake_llm_caller(), metrics_step=5, metrics_window_size=5)\n", + "random_pick = learn_to_pick.PickBest.create(\n", + " llm=fake_llm_caller(),\n", + " metrics_step=5,\n", + " metrics_window_size=5, # rolling window average\n", + " policy=learn_to_pick.PickBestRandomPolicy # set the random policy instead of default\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# four meals defined, some vegetarian some not\n", + "\n", + "meals = [\n", + " \"Beef Enchiladas with Feta cheese. Mexican-Greek fusion\",\n", + " \"Chicken Flatbreads with red sauce. 
Italian-Mexican fusion\",\n", + " \"Veggie sweet potato quesadillas with vegan cheese\",\n", + " \"One-Pan Tortelonni bake with peppers and onions\",\n", + "]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# TODO function hook to call LLM between predict and learn?\n", + "# TODO how to pass extra args to scorer? How to get LLMs response back to user and in scorer?\n", + "# TODO slates from the get go?\n", + "for _ in range(100):\n", + " r = pick.run(meal = learn_to_pick.ToSelectFrom(meals),\n", + " user = learn_to_pick.BasedOn(\"Anna\"),\n", + " preference = learn_to_pick.BasedOn([\"Meat eater\", \"loves beef\"]),\n", + " text_to_personalize = \"This is the weeks specialty dish, our master chefs \\\n", + " believe you will love it!\",)\n", + "\n", + " r = pick.run(meal = learn_to_pick.ToSelectFrom(meals),\n", + " user = learn_to_pick.BasedOn(\"Tom\"),\n", + " preference = learn_to_pick.BasedOn([\"Vegetarian\", \"regular dairy is ok\"]),\n", + " text_to_personalize = \"This is the weeks specialty dish, our master chefs \\\n", + " believe you will love it!\",)\n", + " \n", + " r = random_pick.run(meal = learn_to_pick.ToSelectFrom(meals),\n", + " user = learn_to_pick.BasedOn(\"Anna\"),\n", + " preference = learn_to_pick.BasedOn([\"Meat eater\", \"loves beef\"]),\n", + " text_to_personalize = \"This is the weeks specialty dish, our master chefs \\\n", + " believe you will love it!\",)\n", + "\n", + " r = random_pick.run(meal = learn_to_pick.ToSelectFrom(meals),\n", + " user = learn_to_pick.BasedOn(\"Tom\"),\n", + " preference = learn_to_pick.BasedOn([\"Vegetarian\", \"regular dairy is ok\"]),\n", + " text_to_personalize = \"This is the weeks specialty dish, our master chefs \\\n", + " believe you will love it!\",)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from matplotlib import pyplot as plt\n", + "pick.metrics.to_pandas()['score'].plot(label=\"vw learning policy\")\n", + "random_pick.metrics.to_pandas()['score'].plot(label=\"random learning policy\")\n", + "plt.legend()\n", + "\n", + "print(f\"VW, calculated over a rolling window, is: {pick.metrics.to_pandas()['score'].iloc[-1]}\")\n", + "print(f\"Random, calculated over a rolling window, is: {random_pick.metrics.to_pandas()['score'].iloc[-1]}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(r[\"picked_metadata\"].selected.score)\n", + "print(r[\"picked\"])\n", + "print(r[\"picked_metadata\"].to_select_from)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.5" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..4111815 --- /dev/null +++ b/setup.py @@ -0,0 +1,21 @@ +from setuptools import setup, find_packages + +setup( + name="learn_to_pick", + version="0.1", + install_requires=[ + 'numpy', + 'pandas', + 'vowpal-wabbit-next', + 'sentence-transformers', + 'torch', + 'pyskiplist', + 'parameterfree', + ], + author="VowpalWabbit", + description="", + packages=find_packages(where="src"), + package_dir={"": "src"}, + 
url="https://github.com/VowpalWabbit/learn_to_pick", + python_requires='>=3.8', +) diff --git a/src/learn_to_pick/__init__.py b/src/learn_to_pick/__init__.py new file mode 100644 index 0000000..75e5c6c --- /dev/null +++ b/src/learn_to_pick/__init__.py @@ -0,0 +1,54 @@ +import logging + +from learn_to_pick.base import ( + AutoSelectionScorer, + BasedOn, + Embed, + Embedder, + Policy, + SelectionScorer, + ToSelectFrom, + VwPolicy, + embed, + stringify_embedding, +) +from learn_to_pick.pick_best import ( + PickBest, + PickBestEvent, + PickBestFeatureEmbedder, + PickBestRandomPolicy, + PickBestSelected, +) + + +def configure_logger() -> None: + logger = logging.getLogger(__name__) + logger.setLevel(logging.INFO) + ch = logging.StreamHandler() + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + ch.setFormatter(formatter) + ch.setLevel(logging.INFO) + logger.addHandler(ch) + + +configure_logger() + +__all__ = [ + "PickBest", + "PickBestEvent", + "PickBestSelected", + "PickBestFeatureEmbedder", + "PickBestRandomPolicy", + "Embed", + "BasedOn", + "ToSelectFrom", + "SelectionScorer", + "AutoSelectionScorer", + "Embedder", + "Policy", + "VwPolicy", + "embed", + "stringify_embedding", +] diff --git a/src/learn_to_pick/base.py b/src/learn_to_pick/base.py new file mode 100644 index 0000000..cac4e2a --- /dev/null +++ b/src/learn_to_pick/base.py @@ -0,0 +1,571 @@ +from __future__ import annotations + +import logging +import os +from abc import ABC, abstractmethod +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Generic, + List, + Optional, + Tuple, + Type, + TypeVar, + Union, +) + +from learn_to_pick.metrics import ( + MetricsTrackerAverage, + MetricsTrackerRollingWindow, +) +from learn_to_pick.model_repository import ModelRepository +from learn_to_pick.vw_logger import VwLogger + +if TYPE_CHECKING: + import vowpal_wabbit_next as vw + +logger = logging.getLogger(__name__) + + +class _BasedOn: + def __init__(self, value: Any): + self.value = value + + def __str__(self) -> str: + return str(self.value) + + __repr__ = __str__ + + +def BasedOn(anything: Any) -> _BasedOn: + return _BasedOn(anything) + + +class _ToSelectFrom: + def __init__(self, value: Any): + self.value = value + + def __str__(self) -> str: + return str(self.value) + + __repr__ = __str__ + + +def ToSelectFrom(anything: Any) -> _ToSelectFrom: + if not isinstance(anything, list): + raise ValueError("ToSelectFrom must be a list to select from") + return _ToSelectFrom(anything) + + +class _Embed: + def __init__(self, value: Any, keep: bool = False): + self.value = value + self.keep = keep + + def __str__(self) -> str: + return str(self.value) + + __repr__ = __str__ + + +def Embed(anything: Any, keep: bool = False) -> Any: + if isinstance(anything, _ToSelectFrom): + return ToSelectFrom(Embed(anything.value, keep=keep)) + elif isinstance(anything, _BasedOn): + return BasedOn(Embed(anything.value, keep=keep)) + if isinstance(anything, list): + return [Embed(v, keep=keep) for v in anything] + elif isinstance(anything, dict): + return {k: Embed(v, keep=keep) for k, v in anything.items()} + elif isinstance(anything, _Embed): + return anything + return _Embed(anything, keep=keep) + + +def EmbedAndKeep(anything: Any) -> Any: + return Embed(anything, keep=True) + + +# helper functions + + +def stringify_embedding(embedding: List) -> str: + return " ".join([f"{i}:{e}" for i, e in enumerate(embedding)]) + + +def parse_lines(parser: "vw.TextFormatParser", input_str: str) -> List["vw.Example"]: + return 
[parser.parse_line(line) for line in input_str.split("\n")] + + +def get_based_on_and_to_select_from(inputs: Dict[str, Any]) -> Tuple[Dict, Dict]: + to_select_from = { + k: inputs[k].value + for k in inputs.keys() + if isinstance(inputs[k], _ToSelectFrom) + } + + if not to_select_from: + raise ValueError( + "No variables using 'ToSelectFrom' found in the inputs. Please include at least one variable containing a list to select from." + ) + + based_on = { + k: inputs[k].value if isinstance(inputs[k].value, list) else [inputs[k].value] + for k in inputs.keys() + if isinstance(inputs[k], _BasedOn) + } + + return based_on, to_select_from + + +def prepare_inputs_for_autoembed(inputs: Dict[str, Any]) -> Dict[str, Any]: + """ + go over all the inputs and if something is either wrapped in _ToSelectFrom or _BasedOn, and if their inner values are not already _Embed, + then wrap them in EmbedAndKeep while retaining their _ToSelectFrom or _BasedOn status + """ + + next_inputs = inputs.copy() + for k, v in next_inputs.items(): + if isinstance(v, _ToSelectFrom) or isinstance(v, _BasedOn): + if not isinstance(v.value, _Embed): + next_inputs[k].value = EmbedAndKeep(v.value) + return next_inputs + + +# end helper functions + + +class Selected(ABC): + pass + + +TSelected = TypeVar("TSelected", bound=Selected) + + +class Event(Generic[TSelected], ABC): + inputs: Dict[str, Any] + selected: Optional[TSelected] + + def __init__(self, inputs: Dict[str, Any], selected: Optional[TSelected] = None): + self.inputs = inputs + self.selected = selected + + +TEvent = TypeVar("TEvent", bound=Event) + + +class Policy(Generic[TEvent], ABC): + def __init__(self, **kwargs: Any): + pass + + @abstractmethod + def predict(self, event: TEvent) -> Any: + ... + + @abstractmethod + def learn(self, event: TEvent) -> None: + ... + + @abstractmethod + def log(self, event: TEvent) -> None: + ... + + def save(self) -> None: + pass + + +class VwPolicy(Policy): + def __init__( + self, + model_repo: ModelRepository, + vw_cmd: List[str], + feature_embedder: Embedder, + vw_logger: VwLogger, + *args: Any, + **kwargs: Any, + ): + super().__init__(*args, **kwargs) + self.model_repo = model_repo + self.workspace = self.model_repo.load(vw_cmd) + self.feature_embedder = feature_embedder + self.vw_logger = vw_logger + + def predict(self, event: TEvent) -> Any: + import vowpal_wabbit_next as vw + + text_parser = vw.TextFormatParser(self.workspace) + return self.workspace.predict_one( + parse_lines(text_parser, self.feature_embedder.format(event)) + ) + + def learn(self, event: TEvent) -> None: + import vowpal_wabbit_next as vw + + vw_ex = self.feature_embedder.format(event) + text_parser = vw.TextFormatParser(self.workspace) + multi_ex = parse_lines(text_parser, vw_ex) + self.workspace.learn_one(multi_ex) + + def log(self, event: TEvent) -> None: + if self.vw_logger.logging_enabled(): + vw_ex = self.feature_embedder.format(event) + self.vw_logger.log(vw_ex) + + def save(self) -> None: + self.model_repo.save(self.workspace) + + +class Embedder(Generic[TEvent], ABC): + def __init__(self, *args: Any, **kwargs: Any): + pass + + @abstractmethod + def format(self, event: TEvent) -> str: + ... + + +class SelectionScorer(Generic[TEvent], ABC): + """Abstract method to grade the chosen selection or the response of the llm""" + + @abstractmethod + def score_response( + self, inputs: Dict[str, Any], event: TEvent + ) -> float: + ... 
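+
+# Illustrative sketch (not part of this PR's API surface): a custom scorer is any
+# SelectionScorer subclass. The loop passes the picked value back to the scorer
+# under the reserved "rl_chain_selected" input key (see selected_input_key below),
+# so a hand-written reward can be as simple as a keyword match:
+#
+#   class KeywordScorer(SelectionScorer[Event]):
+#       def __init__(self, keyword: str):
+#           self.keyword = keyword
+#
+#       def score_response(self, inputs: Dict[str, Any], event: Event) -> float:
+#           picked = str(inputs.get("rl_chain_selected", ""))
+#           return 1.0 if self.keyword.lower() in picked.lower() else 0.0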
+ + +class AutoSelectionScorer(SelectionScorer[Event]): + def __init__(self, + llm, + prompt: Union[str, None] = None, + scoring_criteria_template_str: Optional[str] = None): + self.llm = llm + self.prompt = prompt + if prompt is None and scoring_criteria_template_str is None: + self.prompt = AutoSelectionScorer.get_default_prompt() + elif prompt is None and scoring_criteria_template_str is not None: + default_system_prompt = AutoSelectionScorer.get_default_system_prompt() + self.prompt = default_system_prompt + scoring_criteria_template_str + + @staticmethod + def get_default_system_prompt() -> str: + return """ + PLEASE RESPOND ONLY WITH A SINGLE FLOAT AND NO OTHER TEXT EXPLANATION\n You are a strict judge that is called on to rank a response based on given criteria. You must respond with your ranking by providing a single float within the range [0, 1], 0 being very bad response and 1 being very good response. + """ + + @staticmethod + def get_default_prompt() -> str: + human_template = """Given this based_on "{rl_chain_selected_based_on}" \ + as the most important attribute, rank how good or bad this text is: \ + "{rl_chain_selected}".""" + default_system_prompt = AutoSelectionScorer.get_default_system_prompt() + return default_system_prompt + human_template + + def score_response( + self, inputs: Dict[str, Any], event: Event + ) -> float: + p = self.prompt.format(**inputs) + # pp = {"content": p, "role": "system"} + ranking = self.llm.predict(p) + ranking = ranking.strip() + try: + resp = float(ranking) + return resp + except Exception as e: + raise RuntimeError( + f"The auto selection scorer did not manage to score the response, there is always the option to try again or tweak the reward prompt. Error: {e}" + ) + + +class RLLoop(Generic[TEvent]): + """ + The `RLLoop` class leverages a learned Policy for reinforcement learning. + + Attributes: + - selection_scorer (Union[SelectionScorer, None]): Scorer for the selection. Can be set to None. + - policy (Optional[Policy]): The policy used by the chain to learn to populate a dynamic prompt. + - auto_embed (bool): Determines if embedding should be automatic. Default is False. + - metrics (Optional[Union[MetricsTrackerRollingWindow, MetricsTrackerAverage]]): Tracker for metrics, can be set to None. + + Initialization Attributes: + - feature_embedder (Embedder): Embedder used for the `BasedOn` and `ToSelectFrom` inputs. + - model_save_dir (str, optional): Directory for saving the VW model. Default is the current directory. + - reset_model (bool): If set to True, the model starts training from scratch. Default is False. + - vw_cmd (List[str], optional): Command line arguments for the VW model. + - policy (Type[VwPolicy]): Policy used by the chain. + - vw_logs (Optional[Union[str, os.PathLike]]): Path for the VW logs. + - metrics_step (int): Step for the metrics tracker. Default is -1. If set without metrics_window_size, average metrics will be tracked, otherwise rolling window metrics will be tracked. + - metrics_window_size (int): Window size for the metrics tracker. Default is -1. If set, rolling window metrics will be tracked. + + Notes: + By default the class initializes the VW model using the provided arguments. If `selection_scorer` is not provided, a warning is logged, indicating that no reinforcement learning will occur unless the `update_with_delayed_score` method is called. 
+ """ + + class _NoOpPolicy(Policy): + """Placeholder policy that does nothing""" + + def predict(self, event: TEvent) -> Any: + return None + + def learn(self, event: TEvent) -> None: + pass + + def log(self, event: TEvent) -> None: + pass + + # Define the default values as class attributes + selected_input_key = "rl_chain_selected" + selected_based_on_input_key = "rl_chain_selected_based_on" + + def __init__( + self, + feature_embedder: Embedder, + selection_scorer: Union[SelectionScorer, None] = None, + model_save_dir: str = "./", + reset_model: bool = False, + vw_cmd: Optional[List[str]] = None, + policy: Type[Policy] = VwPolicy, + active_policy: Optional[Policy] = _NoOpPolicy(), + vw_logs: Optional[Union[str, os.PathLike]] = None, + auto_embed: bool = False, + selection_scorer_activated: bool = True, + metrics_step: int = -1, + metrics_window_size: int = -1, + ): + self.selection_scorer = selection_scorer + self.feature_embedder = feature_embedder + self.model_save_dir = model_save_dir + self.reset_model = reset_model + self.vw_cmd = vw_cmd + self.policy = policy + self.active_policy = active_policy + self.vw_logs = vw_logs + self.auto_embed = auto_embed + self.selection_scorer_activated = selection_scorer_activated + self.metrics_step = metrics_step + self.metrics_window_size = metrics_window_size + + if self.selection_scorer is None: + logger.warning( + "No selection scorer provided, which means that no \ + reinforcement learning will be done in the RL chain \ + unless update_with_delayed_score is called." + ) + + if isinstance(self.active_policy, RLLoop._NoOpPolicy): + self.active_policy = policy( + model_repo=ModelRepository( + model_save_dir, with_history=True, reset=reset_model + ), + vw_cmd=vw_cmd or [], + feature_embedder=feature_embedder, + vw_logger=VwLogger(vw_logs), + ) + + if metrics_window_size > 0: + self.metrics = MetricsTrackerRollingWindow( + step=metrics_step, window_size=metrics_window_size + ) + else: + self.metrics = MetricsTrackerAverage(step=metrics_step) + + def update_with_delayed_score( + self, score: float, chain_response: Dict[str, Any], force_score: bool = False + ) -> None: + """ + Updates the learned policy with the score provided. + Will raise an error if selection_scorer is set, and force_score=True was not provided during the method call + """ + if self._can_use_selection_scorer() and not force_score: + raise RuntimeError( + "The selection scorer is set, and force_score was not set to True. Please set force_score=True to use this function." + ) + if self.metrics: + self.metrics.on_feedback(score) + event: TEvent = chain_response["selection_metadata"] + self._call_after_scoring_before_learning(event=event, score=score) + self.active_policy.learn(event=event) + self.active_policy.log(event=event) + + def deactivate_selection_scorer(self) -> None: + """ + Deactivates the selection scorer, meaning that the chain will no longer attempt to use the selection scorer to score responses. + """ + self.selection_scorer_activated = False + + def activate_selection_scorer(self) -> None: + """ + Activates the selection scorer, meaning that the chain will attempt to use the selection scorer to score responses. + """ + self.selection_scorer_activated = True + + def save_progress(self) -> None: + """ + This function should be called to save the state of the learned policy model. 
+ """ + self.active_policy.save() + + def _validate_inputs(self, inputs: Dict[str, Any]) -> None: + super()._validate_inputs(inputs) + if ( + self.selected_input_key in inputs.keys() + or self.selected_based_on_input_key in inputs.keys() + ): + raise ValueError( + f"The rl chain does not accept '{self.selected_input_key}' or '{self.selected_based_on_input_key}' as input keys, they are reserved for internal use during auto reward." + ) + + def _can_use_selection_scorer(self) -> bool: + """ + Returns whether the chain can use the selection scorer to score responses or not. + """ + return self.selection_scorer is not None and self.selection_scorer_activated + + @abstractmethod + def _call_before_predict(self, inputs: Dict[str, Any]) -> TEvent: + ... + + @abstractmethod + def _call_after_predict_before_scoring( + self, + inputs: Dict[str, Any], + event: Event, + prediction: List[Tuple[int, float]], + ) -> Tuple[Dict[str, Any], Event]: + ... + + def _call_after_scoring_before_learning( + self, event: Event, score: Optional[float] + ) -> Event: + ... + + def run(self, *args, **kwargs) -> Dict[str, Any]: + if args and not kwargs: + inputs = args[0] + elif kwargs and not args: + inputs = kwargs + else: + raise ValueError("Either a dictionary positional argument or keyword arguments should be provided") + + event: TEvent = self._call_before_predict(inputs=inputs) + prediction = self.active_policy.predict(event=event) + if self.metrics: + self.metrics.on_decision() + + next_chain_inputs, event = self._call_after_predict_before_scoring( + inputs=inputs, event=event, prediction=prediction + ) + + score = None + try: + if self._can_use_selection_scorer(): + score = self.selection_scorer.score_response( + inputs=next_chain_inputs, event=event + ) + except Exception as e: + logger.info( + f"The selection scorer was not able to score, \ + and the chain was not able to adjust to this response, error: {e}" + ) + if self.metrics and score is not None: + self.metrics.on_feedback(score) + + event = self._call_after_scoring_before_learning(score=score, event=event) + self.active_policy.learn(event=event) + self.active_policy.log(event=event) + + picked = [] + for k, v in event.to_select_from.items(): + picked.append({k: v[event.selected.index]}) + + return {"picked": picked, "picked_metadata": event} + +def is_stringtype_instance(item: Any) -> bool: + """Helper function to check if an item is a string.""" + return isinstance(item, str) or ( + isinstance(item, _Embed) and isinstance(item.value, str) + ) + + +def embed_string_type( + item: Union[str, _Embed], model: Any, namespace: Optional[str] = None +) -> Dict[str, Union[str, List[str]]]: + """Helper function to embed a string or an _Embed object.""" + keep_str = "" + if isinstance(item, _Embed): + encoded = stringify_embedding(model.encode(item.value)) + if item.keep: + keep_str = item.value.replace(" ", "_") + " " + elif isinstance(item, str): + encoded = item.replace(" ", "_") + else: + raise ValueError(f"Unsupported type {type(item)} for embedding") + + if namespace is None: + raise ValueError( + "The default namespace must be provided when embedding a string or _Embed object." 
+ ) + + return {namespace: keep_str + encoded} + + +def embed_dict_type(item: Dict, model: Any) -> Dict[str, Any]: + """Helper function to embed a dictionary item.""" + inner_dict: Dict = {} + for ns, embed_item in item.items(): + if isinstance(embed_item, list): + inner_dict[ns] = [] + for embed_list_item in embed_item: + embedded = embed_string_type(embed_list_item, model, ns) + inner_dict[ns].append(embedded[ns]) + else: + inner_dict.update(embed_string_type(embed_item, model, ns)) + return inner_dict + + +def embed_list_type( + item: list, model: Any, namespace: Optional[str] = None +) -> List[Dict[str, Union[str, List[str]]]]: + ret_list: List = [] + for embed_item in item: + if isinstance(embed_item, dict): + ret_list.append(embed_dict_type(embed_item, model)) + elif isinstance(embed_item, list): + item_embedding = embed_list_type(embed_item, model, namespace) + # Get the first key from the first dictionary + first_key = next(iter(item_embedding[0])) + # Group the values under that key + grouping = {first_key: [item[first_key] for item in item_embedding]} + ret_list.append(grouping) + else: + ret_list.append(embed_string_type(embed_item, model, namespace)) + return ret_list + + +def embed( + to_embed: Union[Union[str, _Embed], Dict, List[Union[str, _Embed]], List[Dict]], + model: Any, + namespace: Optional[str] = None, +) -> List[Dict[str, Union[str, List[str]]]]: + """ + Embeds the actions or context using the SentenceTransformer model (or a model that has an `encode` function) + + Attributes: + to_embed: (Union[Union(str, _Embed(str)), Dict, List[Union(str, _Embed(str))], List[Dict]], required) The text to be embedded, either a string, a list of strings or a dictionary or a list of dictionaries. + namespace: (str, optional) The default namespace to use when dictionary or list of dictionaries not provided. 
+ model: (Any, required) The model to use for embedding + Returns: + List[Dict[str, str]]: A list of dictionaries where each dictionary has the namespace as the key and the embedded string as the value + """ + if (isinstance(to_embed, _Embed) and isinstance(to_embed.value, str)) or isinstance( + to_embed, str + ): + return [embed_string_type(to_embed, model, namespace)] + elif isinstance(to_embed, dict): + return [embed_dict_type(to_embed, model)] + elif isinstance(to_embed, list): + return embed_list_type(to_embed, model, namespace) + else: + raise ValueError("Invalid input format for embedding") diff --git a/src/learn_to_pick/metrics.py b/src/learn_to_pick/metrics.py new file mode 100644 index 0000000..4bd65da --- /dev/null +++ b/src/learn_to_pick/metrics.py @@ -0,0 +1,66 @@ +from collections import deque +from typing import TYPE_CHECKING, Dict, List, Union + +if TYPE_CHECKING: + import pandas as pd + + +class MetricsTrackerAverage: + def __init__(self, step: int): + self.history: List[Dict[str, Union[int, float]]] = [{"step": 0, "score": 0}] + self.step: int = step + self.i: int = 0 + self.num: float = 0 + self.denom: float = 0 + + @property + def score(self) -> float: + return self.num / self.denom if self.denom > 0 else 0 + + def on_decision(self) -> None: + self.denom += 1 + + def on_feedback(self, score: float) -> None: + self.num += score or 0 + self.i += 1 + if self.step > 0 and self.i % self.step == 0: + self.history.append({"step": self.i, "score": self.score}) + + def to_pandas(self) -> "pd.DataFrame": + import pandas as pd + + return pd.DataFrame(self.history) + + +class MetricsTrackerRollingWindow: + def __init__(self, window_size: int, step: int): + self.history: List[Dict[str, Union[int, float]]] = [{"step": 0, "score": 0}] + self.step: int = step + self.i: int = 0 + self.window_size: int = window_size + self.queue: deque = deque() + self.sum: float = 0.0 + + @property + def score(self) -> float: + return self.sum / len(self.queue) if len(self.queue) > 0 else 0 + + def on_decision(self) -> None: + pass + + def on_feedback(self, value: float) -> None: + self.sum += value + self.queue.append(value) + self.i += 1 + + if len(self.queue) > self.window_size: + old_val = self.queue.popleft() + self.sum -= old_val + + if self.step > 0 and self.i % self.step == 0: + self.history.append({"step": self.i, "score": self.sum / len(self.queue)}) + + def to_pandas(self) -> "pd.DataFrame": + import pandas as pd + + return pd.DataFrame(self.history) diff --git a/src/learn_to_pick/model_repository.py b/src/learn_to_pick/model_repository.py new file mode 100644 index 0000000..efe96cc --- /dev/null +++ b/src/learn_to_pick/model_repository.py @@ -0,0 +1,63 @@ +import datetime +import glob +import logging +import os +import shutil +from pathlib import Path +from typing import TYPE_CHECKING, List, Union + +if TYPE_CHECKING: + import vowpal_wabbit_next as vw + +logger = logging.getLogger(__name__) + + +class ModelRepository: + def __init__( + self, + folder: Union[str, os.PathLike], + with_history: bool = True, + reset: bool = False, + ): + self.folder = Path(folder) + self.model_path = self.folder / "latest.vw" + self.with_history = with_history + if reset and self.has_history(): + logger.warning( + "There is non empty history which is recommended to be cleaned up" + ) + if self.model_path.exists(): + os.remove(self.model_path) + + self.folder.mkdir(parents=True, exist_ok=True) + + def get_tag(self) -> str: + return datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + + def has_history(self) -> 
bool: + return len(glob.glob(str(self.folder / "model-????????-??????.vw"))) > 0 + + def save(self, workspace: "vw.Workspace") -> None: + with open(self.model_path, "wb") as f: + logger.info(f"storing rl_chain model in: {self.model_path}") + f.write(workspace.serialize()) + if self.with_history: # write history + shutil.copyfile(self.model_path, self.folder / f"model-{self.get_tag()}.vw") + + def load(self, commandline: List[str]) -> "vw.Workspace": + try: + import vowpal_wabbit_next as vw + except ImportError as e: + raise ImportError( + "Unable to import vowpal_wabbit_next, please install with " + "`pip install vowpal_wabbit_next`." + ) from e + + model_data = None + if self.model_path.exists(): + with open(self.model_path, "rb") as f: + model_data = f.read() + if model_data: + logger.info(f"rl_chain model is loaded from: {self.model_path}") + return vw.Workspace(commandline, model_data=model_data) + return vw.Workspace(commandline) diff --git a/src/learn_to_pick/pick_best.py b/src/learn_to_pick/pick_best.py new file mode 100644 index 0000000..fbcd276 --- /dev/null +++ b/src/learn_to_pick/pick_best.py @@ -0,0 +1,391 @@ +from __future__ import annotations + +import logging +from typing import Any, Dict, List, Optional, Tuple, Type, Union + +from learn_to_pick import base + +logger = logging.getLogger(__name__) + +# sentinel object used to distinguish between +# user didn't supply anything or user explicitly supplied None +SENTINEL = object() + + +class PickBestSelected(base.Selected): + index: Optional[int] + probability: Optional[float] + score: Optional[float] + + def __init__( + self, + index: Optional[int] = None, + probability: Optional[float] = None, + score: Optional[float] = None, + ): + self.index = index + self.probability = probability + self.score = score + + +class PickBestEvent(base.Event[PickBestSelected]): + def __init__( + self, + inputs: Dict[str, Any], + to_select_from: Dict[str, Any], + based_on: Dict[str, Any], + selected: Optional[PickBestSelected] = None, + ): + super().__init__(inputs=inputs, selected=selected) + self.to_select_from = to_select_from + self.based_on = based_on + + +class PickBestFeatureEmbedder(base.Embedder[PickBestEvent]): + """ + Text Embedder class that embeds the `BasedOn` and `ToSelectFrom` inputs into a format that can be used by the learning policy + + Attributes: + model name (Any, optional): The type of embeddings to be used for feature representation. Defaults to BERT SentenceTransformer. 
+ """ + + def __init__( + self, auto_embed: bool, model: Optional[Any] = None, *args: Any, **kwargs: Any + ): + super().__init__(*args, **kwargs) + + if model is None: + from sentence_transformers import SentenceTransformer + + model = SentenceTransformer("all-mpnet-base-v2") + + self.model = model + self.auto_embed = auto_embed + + @staticmethod + def _str(embedding: List[float]) -> str: + return " ".join([f"{i}:{e}" for i, e in enumerate(embedding)]) + + def get_label(self, event: PickBestEvent) -> tuple: + cost = None + if event.selected: + chosen_action = event.selected.index + cost = ( + -1.0 * event.selected.score + if event.selected.score is not None + else None + ) + prob = event.selected.probability + return chosen_action, cost, prob + else: + return None, None, None + + def get_context_and_action_embeddings(self, event: PickBestEvent) -> tuple: + context_emb = base.embed(event.based_on, self.model) if event.based_on else None + to_select_from_var_name, to_select_from = next( + iter(event.to_select_from.items()), (None, None) + ) + + action_embs = ( + ( + base.embed(to_select_from, self.model, to_select_from_var_name) + if event.to_select_from + else None + ) + if to_select_from + else None + ) + + if not context_emb or not action_embs: + raise ValueError( + "Context and to_select_from must be provided in the inputs dictionary" + ) + return context_emb, action_embs + + def get_indexed_dot_product(self, context_emb: List, action_embs: List) -> Dict: + import numpy as np + + unique_contexts = set() + for context_item in context_emb: + for ns, ee in context_item.items(): + if isinstance(ee, list): + for ea in ee: + unique_contexts.add(f"{ns}={ea}") + else: + unique_contexts.add(f"{ns}={ee}") + + encoded_contexts = self.model.encode(list(unique_contexts)) + context_embeddings = dict(zip(unique_contexts, encoded_contexts)) + + unique_actions = set() + for action in action_embs: + for ns, e in action.items(): + if isinstance(e, list): + for ea in e: + unique_actions.add(f"{ns}={ea}") + else: + unique_actions.add(f"{ns}={e}") + + encoded_actions = self.model.encode(list(unique_actions)) + action_embeddings = dict(zip(unique_actions, encoded_actions)) + + action_matrix = np.stack([v for k, v in action_embeddings.items()]) + context_matrix = np.stack([v for k, v in context_embeddings.items()]) + dot_product_matrix = np.dot(context_matrix, action_matrix.T) + + indexed_dot_product: Dict = {} + + for i, context_key in enumerate(context_embeddings.keys()): + indexed_dot_product[context_key] = {} + for j, action_key in enumerate(action_embeddings.keys()): + indexed_dot_product[context_key][action_key] = dot_product_matrix[i, j] + + return indexed_dot_product + + def format_auto_embed_on(self, event: PickBestEvent) -> str: + chosen_action, cost, prob = self.get_label(event) + context_emb, action_embs = self.get_context_and_action_embeddings(event) + indexed_dot_product = self.get_indexed_dot_product(context_emb, action_embs) + + action_lines = [] + for i, action in enumerate(action_embs): + line_parts = [] + dot_prods = [] + if cost is not None and chosen_action == i: + line_parts.append(f"{chosen_action}:{cost}:{prob}") + for ns, action in action.items(): + line_parts.append(f"|{ns}") + elements = action if isinstance(action, list) else [action] + nsa = [] + for elem in elements: + line_parts.append(f"{elem}") + ns_a = f"{ns}={elem}" + nsa.append(ns_a) + for k, v in indexed_dot_product.items(): + dot_prods.append(v[ns_a]) + nsa_str = " ".join(nsa) + line_parts.append(f"|# {nsa_str}") + + 
line_parts.append(f"|dotprod {self._str(dot_prods)}") + action_lines.append(" ".join(line_parts)) + + shared = [] + for item in context_emb: + for ns, context in item.items(): + shared.append(f"|{ns}") + elements = context if isinstance(context, list) else [context] + nsc = [] + for elem in elements: + shared.append(f"{elem}") + nsc.append(f"{ns}={elem}") + nsc_str = " ".join(nsc) + shared.append(f"|@ {nsc_str}") + + return "shared " + " ".join(shared) + "\n" + "\n".join(action_lines) + + def format_auto_embed_off(self, event: PickBestEvent) -> str: + """ + Converts the `BasedOn` and `ToSelectFrom` into a format that can be used by VW + """ + chosen_action, cost, prob = self.get_label(event) + context_emb, action_embs = self.get_context_and_action_embeddings(event) + + example_string = "" + example_string += "shared " + for context_item in context_emb: + for ns, based_on in context_item.items(): + e = " ".join(based_on) if isinstance(based_on, list) else based_on + example_string += f"|{ns} {e} " + example_string += "\n" + + for i, action in enumerate(action_embs): + if cost is not None and chosen_action == i: + example_string += f"{chosen_action}:{cost}:{prob} " + for ns, action_embedding in action.items(): + e = ( + " ".join(action_embedding) + if isinstance(action_embedding, list) + else action_embedding + ) + example_string += f"|{ns} {e} " + example_string += "\n" + # Strip the last newline + return example_string[:-1] + + def format(self, event: PickBestEvent) -> str: + if self.auto_embed: + return self.format_auto_embed_on(event) + else: + return self.format_auto_embed_off(event) + + +class PickBestRandomPolicy(base.Policy[PickBestEvent]): + def __init__(self, feature_embedder: base.Embedder, **kwargs: Any): + self.feature_embedder = feature_embedder + + def predict(self, event: PickBestEvent) -> List[Tuple[int, float]]: + num_items = len(event.to_select_from) + return [(i, 1.0 / num_items) for i in range(num_items)] + + def learn(self, event: PickBestEvent) -> None: + pass + + def log(self, event: PickBestEvent) -> None: + pass + + +class PickBest(base.RLLoop[PickBestEvent]): + """ + `PickBest` is a class designed to leverage a learned Policy for reinforcement learning with a context. + + Each invocation of the `run()` method should be equipped with a set of potential actions (`ToSelectFrom`) and will result in the selection of a specific action based on the `BasedOn` input. + + The standard operation flow of this run() call includes a loop: + 1. The loop is invoked with inputs containing the `BasedOn` criteria and a list of potential actions (`ToSelectFrom`). + 2. An action is selected based on the `BasedOn` input. + 3. If a `selection_scorer` is provided, it is used to score the selection. + 4. The internal Policy is updated with the `BasedOn` input, the chosen `ToSelectFrom` action, and the resulting score from the scorer. + 5. The final pick is returned. + + Expected input dictionary format: + - At least one variable encapsulated within `BasedOn` to serve as the selection criteria. + - A single list variable within `ToSelectFrom`, representing potential actions for the learned Policy to pick from. This list can take the form of: + - A list of strings, e.g., `action = ToSelectFrom(["action1", "action2", "action3"])` + - A list of list of strings e.g. 
`action = ToSelectFrom([["action1", "another identifier of action1"], ["action2", "another identifier of action2"]])` + - A list of dictionaries, where each dictionary represents an action with namespace names as keys and corresponding action strings as values. For instance, `action = ToSelectFrom([{"namespace1": ["action1", "another identifier of action1"], "namespace2": "action2"}, {"namespace1": "action3", "namespace2": "action4"}])`. + + Extends: + RLLoop + + Attributes: + feature_embedder (PickBestFeatureEmbedder, optional): Is an advanced attribute. Responsible for embedding the `BasedOn` and `ToSelectFrom` inputs. If omitted, a default embedder is utilized. + """ + + def __init__( + self, + *args: Any, + **kwargs: Any, + ): + auto_embed = kwargs.get("auto_embed", False) + + feature_embedder = kwargs.get("feature_embedder", None) + if feature_embedder: + if "auto_embed" in kwargs: + logger.warning( + "auto_embed will take no effect when explicit feature_embedder is provided" + ) + # turning auto_embed off for cli setting below + auto_embed = False + else: + feature_embedder = PickBestFeatureEmbedder(auto_embed=auto_embed) + kwargs["feature_embedder"] = feature_embedder + + vw_cmd = kwargs.get("vw_cmd", []) + if vw_cmd: + if "--cb_explore_adf" not in vw_cmd: + raise ValueError( + "If vw_cmd is specified, it must include --cb_explore_adf" + ) + else: + interactions = ["--interactions=::"] + if auto_embed: + interactions = [ + "--interactions=@#", + "--ignore_linear=@", + "--ignore_linear=#", + ] + vw_cmd = interactions + [ + "--cb_explore_adf", + "--coin", + "--squarecb", + "--quiet", + ] + + kwargs["vw_cmd"] = vw_cmd + + super().__init__(*args, **kwargs) + + def _call_before_predict(self, inputs: Dict[str, Any]) -> PickBestEvent: + context, actions = base.get_based_on_and_to_select_from(inputs=inputs) + if not actions: + raise ValueError( + "No variables using 'ToSelectFrom' found in the inputs. Please include at least one variable containing a list to select from." + ) + + if len(list(actions.values())) > 1: + raise ValueError( + "Only one variable using 'ToSelectFrom' can be provided in the inputs for PickBest run() call. Please provide only one variable containing a list to select from." + ) + + if not context: + raise ValueError( + "No variables using 'BasedOn' found in the inputs. Please include at least one variable containing information to base the selected of ToSelectFrom on." 
+ ) + + event = PickBestEvent(inputs=inputs, to_select_from=actions, based_on=context) + return event + + def _call_after_predict_before_scoring( + self, + inputs: Dict[str, Any], + event: PickBestEvent, + prediction: List[Tuple[int, float]], + ) -> Tuple[Dict[str, Any], PickBestEvent]: + import numpy as np + + prob_sum = sum(prob for _, prob in prediction) + probabilities = [prob / prob_sum for _, prob in prediction] + ## sample from the pmf + sampled_index = np.random.choice(len(prediction), p=probabilities) + sampled_ap = prediction[sampled_index] + sampled_action = sampled_ap[0] + sampled_prob = sampled_ap[1] + selected = PickBestSelected(index=sampled_action, probability=sampled_prob) + event.selected = selected + + # only one key, value pair in event.to_select_from + key, value = next(iter(event.to_select_from.items())) + next_inputs = inputs.copy() + next_inputs.update({key: value[event.selected.index]}) + + # only one key, value pair in event.to_select_from + value = next(iter(event.to_select_from.values())) + v = ( + value[event.selected.index] + if event.selected + else event.to_select_from.values() + ) + next_inputs.update( + { + self.selected_based_on_input_key: str(event.based_on), + self.selected_input_key: v, + } + ) + return next_inputs, event + + def _call_after_scoring_before_learning( + self, event: PickBestEvent, score: Optional[float] + ) -> PickBestEvent: + if event.selected: + event.selected.score = score + return event + + def run(self, *args, **kwargs) -> Dict[str, Any]: + return super().run(*args, **kwargs) + + @classmethod + def create( + cls: Type[PickBest], + llm = None, + selection_scorer: Union[base.AutoSelectionScorer, object] = SENTINEL, + **kwargs: Any, + ) -> PickBest: + if selection_scorer is SENTINEL and llm is None: + raise ValueError("Either llm or selection_scorer must be provided") + elif selection_scorer is SENTINEL: + selection_scorer = base.AutoSelectionScorer(llm=llm) + + return PickBest( + selection_scorer=selection_scorer, + **kwargs, + ) diff --git a/src/learn_to_pick/vw_logger.py b/src/learn_to_pick/vw_logger.py new file mode 100644 index 0000000..e8d2e15 --- /dev/null +++ b/src/learn_to_pick/vw_logger.py @@ -0,0 +1,18 @@ +from os import PathLike +from pathlib import Path +from typing import Optional, Union + + +class VwLogger: + def __init__(self, path: Optional[Union[str, PathLike]]): + self.path = Path(path) if path else None + if self.path: + self.path.parent.mkdir(parents=True, exist_ok=True) + + def log(self, vw_ex: str) -> None: + if self.path: + with open(self.path, "a") as f: + f.write(f"{vw_ex}\n\n") + + def logging_enabled(self) -> bool: + return bool(self.path)
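
Usage sketch (illustrative appendix, distilled from notebooks/demo.ipynb above; the
fake LLM, the meal strings, and the printed values are stand-ins, not guaranteed
output):

    import learn_to_pick

    class FakeLLM:
        # AutoSelectionScorer only needs a .predict(prompt) method that returns a
        # number as a string; a real LLM client would rank the pick within [0, 1].
        def predict(self, message: str) -> str:
            return "1.0"

    picker = learn_to_pick.PickBest.create(
        llm=FakeLLM(),
        metrics_step=5,
        metrics_window_size=5,  # rolling-window average instead of a global average
    )

    result = picker.run(
        meal=learn_to_pick.ToSelectFrom(["pizza", "salad", "tacos"]),
        user=learn_to_pick.BasedOn("Tom"),
        preference=learn_to_pick.BasedOn(["Vegetarian", "regular dairy is ok"]),
    )

    print(result["picked"])                          # e.g. [{'meal': 'salad'}]
    print(result["picked_metadata"].selected.score)  # score assigned by the scorer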