DAG
airflow celery worker -q <WORKER_NAME> -D
t1 = BashOperator(
    task_id='echo_hello_world',
    bash_command='echo "Hello, World!"',
    queue='<WORKER_NAME>',  # must match the -q value the Celery worker was started with
    dag=dag,
)
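For reference, a minimal self-contained version of the snippet above; the dag_id, schedule, and queue name are placeholders, not from the original, and the queue must match the -q value passed to the worker:

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    dag_id="queue_routing_example",  # hypothetical dag_id for illustration
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval=None,
)

t1 = BashOperator(
    task_id="echo_hello_world",
    bash_command='echo "Hello, World!"',
    queue="worker_queue",  # placeholder queue name; must match the worker's -q value
    dag=dag,
)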
Required packages
Install Guide
wget https://www.python.org/ftp/python/3.10.12/Python-3.10.12.tar.xz
tar -xf Python-3.10.12.tar.xz
cd Python-3.10.12
# Compile
./configure
make
# Install as python3.10 without overwriting the system python3
make altinstall
python3.10 --version
# (altinstall does not create the python3/python aliases)
# Environment setup
echo "alias python='python3.10'" >> ~/.bashrc
# Apply the new shell configuration
source ~/.bashrc
# Verify the installation
python --version
Install Airflow
The core modules of Airflow are as follows.
Scheduler: parses the DAGs defined in *.py files and stores them in the DB
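As a rough sketch of that parsing step, the DagBag API loads a folder of *.py files much like the scheduler does; the dag folder path below is an assumption:

import os
from airflow.models import DagBag

# Parse every *.py file under the folder, as the scheduler does on each cycle
dagbag = DagBag(dag_folder=os.path.expanduser("~/airflow/dags"), include_examples=False)
print(dagbag.dag_ids)        # DAG ids that were parsed successfully
print(dagbag.import_errors)  # files that failed to parse, with error messages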
# Then download Spark
wget https://archive.apache.org/dist/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz
tar xvf spark-2.2.1-bin-hadoop2.7.tgz
export SPARK_HOME=$HOME/spark-2.2.1-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
# Start a Spark master
cd $SPARK_HOME
./sbin/start-master.sh
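Once the master is running, a quick smoke test is to point a SparkSession at it; the master URL below assumes the default port on localhost, so check the URL printed in $SPARK_HOME/logs:

from pyspark.sql import SparkSession

# spark://<host>:7077 is the default standalone master URL; confirm it in the master log
spark = (
    SparkSession.builder
    .master("spark://localhost:7077")
    .appName("smoke_test")
    .getOrCreate()
)
print(spark.range(10).count())  # prints 10 if the master accepted the application
spark.stop()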
Problem: requests.get() does not execute / the request hangs.
Source code:
import json
import pathlib
import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
dag = DAG(
    dag_id="download_rocket_launches",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
)
end_point = "http://ll.thespacedevs.com/2.0.0/launch/upcoming"
download_path = "/Users/black/dev/bigdata/workflow/data"
# -o writes the response to the local file, -L follows redirects
bash_command = "curl -o {0} -L '{1}'".format(download_path + "/launches.json", end_point)
download_launches = BashOperator(
    # bash_command=bash_command,
    task_id="download_launches",
    bash_command="curl -o /Users/black/dev/bigdata/workflow/data/launches.json -L 'http://ll.thespacedevs.com/2.0.0/launch/upcoming'",
    dag=dag,
)
def _get_pictures():
    # Ensure the target directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)
    # Parse the downloaded launch list and collect the image URLs
    with open(download_path + "/launches.json") as f:
        launches = json.load(f)
    image_urls = [launch["image"] for launch in launches["results"]]
    print(f"image_urls => {image_urls}")
    for image_url in image_urls:
        try:
            print(f"image_url => {image_url}")
            response = requests.get(image_url)
            print(f"response => {response.ok}")
            image_filename = image_url.split("/")[-1]
            print(f"image_filename => {image_filename}")
            # target_file = f"{download_path}/images/{image_filename}"
            target_file = f"/tmp/images/{image_filename}"
            print(f"target_file => {target_file}")
            # Restrict the download to a single file while debugging
            if image_filename == "spaceshiptwo_image_20210522140909.jpeg":
                with open(target_file, "wb") as fa:
                    fa.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
        except requests_exceptions.MissingSchema:
            print(f"{image_url} appears to be an invalid URL.")
        except requests_exceptions.ConnectionError:
            print(f"Could not connect to {image_url}")
        except Exception:
            print("Error ...")
        finally:
            print("finally")
get_pictures = PythonOperator(
    task_id="get_pictures",
    python_callable=_get_pictures,
    dag=dag,
)
notify = BashOperator(
    task_id="notify",
    # Count files in /tmp/images, where _get_pictures actually saves them
    bash_command='echo "There are now $(ls /tmp/images | wc -l) images."',
    dag=dag,
)
# if __name__ == '__main__':
# _get_pictures()
download_launches >> get_pictures >> notify
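To debug the hanging task outside the scheduler, the whole DAG can also be test-run in one process; a minimal sketch using dag.test(), which is available from Airflow 2.5:

if __name__ == "__main__":
    # Runs download_launches >> get_pictures >> notify in a single local process
    dag.test()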
OS: macOS (Apple M1)
Airflow : v2.6.3
Problem: a task that performs HTTP communication with the requests library hangs.
Cause: occurs only on macOS; confirmed to be a conflict between the Python packages and the Airflow packages.
Fix: set the NO_PROXY environment variable so the required hosts are bypassed.
export NO_PROXY="*"
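The same bypass can also be set from Python; a minimal sketch, assuming that setting the variable before any request is made is enough for this environment:

import os

# Skip the macOS proxy lookup that can hang forked worker processes
os.environ["NO_PROXY"] = "*"

import requests

print(requests.get("http://ll.thespacedevs.com/2.0.0/launch/upcoming").ok)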
airflow AttributeError: 'NoneType' object has no attribute 'Redis'