
[Bug] Issue with HttpOperator Data Handling in DAGFactory #370

Open
1 task
a-chumagin opened this issue Jan 22, 2025 · 8 comments
Labels
bug Something isn't working triage-needed

Comments


a-chumagin commented Jan 22, 2025

DAG Factory version

0.22.0

airflow version

2.10.2

Python version

3.12

Deployment

Docker-Compose

Deployment details

No response

What happened?

I am experiencing an issue when using dag-factory to configure HttpOperator tasks. The request body is incorrectly serialized: it is sent as form-encoded data instead of JSON. This occurs only when the task is created via DAGFactory; the same configuration works fine with raw HttpOperator code.

Relevant log output

How to reproduce

  1. Create a DAG using DAGFactory with an HttpOperator task that sends JSON data.
  2. Ensure the data is correctly formatted as a JSON string (e.g., data: '{"data_source": "postgres"}').
  3. When the task runs, check the request body sent to the HTTP endpoint.
  4. The body is sent as form-encoded data (e.g., data_source=postgres), not as JSON.

Expected Behavior:
The request body should be serialized as valid JSON and sent with the Content-Type: application/json header.

Actual Behavior:
The body is sent as form data (e.g., data_source=postgres), not as JSON. This causes the request to fail.
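For context (an inference from the symptoms, not something confirmed in this thread): `requests`-style HTTP clients form-encode a dict passed as `data`, while a pre-serialized JSON string is sent verbatim. A minimal stdlib sketch of the two encodings:

```python
import json
from urllib.parse import urlencode

payload = {"data_source": "postgres"}

# What the bug produces: the dict is form-encoded
form_body = urlencode(payload)

# What the YAML intends: the dict serialized as JSON
json_body = json.dumps(payload)

print(form_body)  # data_source=postgres
print(json_body)  # {"data_source": "postgres"}
```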

Example of YAML

basic_example_dag:
  default_args:
    owner: "@a.chumagin"
  description: "this is an example dag"
  schedule_interval: "0 3 * * *"
  tags: ['dqa']
  render_template_as_native_obj: True
  tasks:
    send_request:
      operator: airflow.providers.http.operators.http.HttpOperator
      http_conn_id: "dqp_host_dev"
      method: "POST"
      endpoint: "/run_test"
      data: '{"data_source": "postgres"}'
      headers:
        Content-Type: application/json
      log_response: True

The headers are sent correctly. I use an echo service to handle the request:

POST /run_test HTTP/1.1
Host: echo-local.com:8087
Accept: */*
Accept-Encoding: gzip, deflate
Content-Length: 20
Content-Type: application/json
User-Agent: Python/3.12 aiohttp/3.10.5
data_source=postgres

I tried different ways to present the data:

  1. As a dictionary:

     data:
       data_source: "postgres"

  2. With `"""` and `>` wrappers. Same result.

When I use the raw HttpOperator without DagFactory, the issue does not occur:

HttpOperator(
    task_id="http_soda_check",
    http_conn_id="dqp_host_dev",
    method="POST",
    endpoint="/run_test",
    data="""
    {
        "data_source": "postgres"
    }
    """,
    headers={"Content-Type": "application/json"},
    log_response=True,
)
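The verbatim-string behaviour can be illustrated with the standard library alone. This sketch (using stdlib `urllib.request`, with the echo host taken from the report) builds, but does not send, a request and shows that a pre-serialized JSON body is stored byte-for-byte:

```python
import json
import urllib.request

# Build (but do not send) a POST request with a pre-serialized JSON body
body = json.dumps({"data_source": "postgres"}).encode("utf-8")
req = urllib.request.Request(
    "http://echo-local.com:8087/run_test",  # echo endpoint from the report
    data=body,
    headers={"Content-Type": "application/json"},
    method="POST",
)

print(req.data)                        # b'{"data_source": "postgres"}'
print(req.get_header("Content-type"))  # application/json
```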

Anything else :)?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Contact Details

No response

@a-chumagin a-chumagin added bug Something isn't working triage-needed labels Jan 22, 2025
@a-chumagin
Author

I used the standard code to generate the DAG:

import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG
import dagfactory

DEFAULT_CONFIG_ROOT_DIR = "/dev/a_chumagin"

CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "example_dag_factory.yaml")

example_dag_factory = dagfactory.DagFactory(config_file)

# Creating task dependencies
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())

@pankajkoti
Contributor

Hi @a-chumagin,

Thank you for reporting this issue—it definitely seems valid.

While we work on prioritizing this issue, I wanted to check if you might be interested in contributing a fix. The fix would likely involve handling this specific case of serializing the data in a manner similar to this example.

If you’re open to contributing, you can set up your local development environment by following the contributing guide. Please don’t hesitate to reach out if you need any additional support!

@a-chumagin
Author

a-chumagin commented Jan 24, 2025

Hi @pankajkoti
I'm definitely interested in contributing, and I tried to fix this issue by patching it with the following code:

    if operator == "airflow.providers.http.operators.http.HttpOperator":
        headers = task_params.get("headers", {})
        if (
            "data" in task_params 
            and isinstance(task_params["data"], dict)
            and headers.get("Content-Type") == "application/json"
        ):
            task_params["data"] = json.dumps(task_params["data"])
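Wrapped as a standalone helper (the function name is hypothetical, just to make the patch testable outside dag-factory), the logic above behaves like this:

```python
import json

def normalize_http_data(operator: str, task_params: dict) -> dict:
    """Serialize a dict `data` payload to JSON for HttpOperator tasks."""
    if operator == "airflow.providers.http.operators.http.HttpOperator":
        headers = task_params.get("headers", {})
        if (
            "data" in task_params
            and isinstance(task_params["data"], dict)
            and headers.get("Content-Type") == "application/json"
        ):
            task_params["data"] = json.dumps(task_params["data"])
    return task_params

params = normalize_http_data(
    "airflow.providers.http.operators.http.HttpOperator",
    {"data": {"data_source": "postgres"},
     "headers": {"Content-Type": "application/json"}},
)
print(params["data"])  # {"data_source": "postgres"}  (now a str, not a dict)
```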

Let me look at the links.
Thank you for your response!
By the way, I switched to PythonOperator and sending the request works fine :)

@tatiana
Collaborator

tatiana commented Jan 30, 2025

Hi @a-chumagin, as mentioned by @pankajkoti, we'd love your contribution to fixing this behaviour. It could undoubtedly help other users. Did the patch work?

@a-chumagin
Author

> Hi @a-chumagin, as mentioned by @pankajkoti, we'd love your contribution to fixing this behaviour. It could undoubtedly help other users. Did the patch work?

Hi @tatiana, I'm going to start this week.

@a-chumagin
Author

@pankajkoti hello! Do you have Slack for communication? I ran into several issues with the environment setup, tests, and implementation. I'd like to chat with you, if you have time.

@pankajkoti
Contributor

pankajkoti commented Feb 5, 2025

hi @a-chumagin

Did you already try the steps mentioned in the contributing docs: https://astronomer.github.io/dag-factory/latest/contributing/howto/#set-up-a-local-development-environment ?

If you're still facing issues, it would be good to know, as we can then enhance the documentation. Are you already part of the "Apache Airflow Community" Slack? If not, you can sign up at https://apache-airflow-slack.herokuapp.com/ and then find me there as "Pankaj Koti". Looking forward to hearing from you :)

@a-chumagin
Author

@pankajkoti @tatiana JFYI: I was able to fix it. It's not fully tested yet, but the smoke test passed:

[2025-03-20, 14:30:06 UTC] {http.py:222} INFO - Request served by 00dcfd448acd
POST /run_test HTTP/1.1
Host: host.docker.internal:8081
Accept: */*
Accept-Encoding: gzip, deflate
Connection: keep-alive
Content-Length: 39
Content-Type: application/json
{"data": "fake_data", "format": "json"}

I need more time to prepare the PR.
