どん底から這い上がるまでの記録

どん底から這い上がりたいけど這い上がれない人がいろいろ書くブログ(主にプログラミング)

pythonのAPSchedulerによるプログラム、関数の定期実行

 

APSchedulerを使ってみたのでそのメモ。

pythonのAPSchedulerを使うとプログラムを指定した日時に実行したり、一時間おきに実行することが可能になります。

本記事ではAPSchedulerの使い方についてまとめてみます。

 

 

関連サイト

APSchedulerを使ってみる

APSchedulerはpipでインストールできます。

pip install apscheduler

APSchedulerの使い方の例。

  1. BlockingSchedulerのインスタンスを生成。
  2. ジョブの設定。
  3. ジョブを始める。

1. BlockingSchedulerのインスタンスを生成。

ライブラリをインポートし、BlockingSchedulerのインスタンスを生成しています。

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

2. ジョブの設定。

プログラムを指定した日時に実行するにはcron、指定した時間おきに繰り返し実行するにはintervalをジョブに設定することで可能になります。

ジョブの設定には次の2通りの方法があります。

  1. デコレータを使う。
  2. add_job()を使う。

2-1. デコレータを使う。

デコレータを使うと下のようになります。

@scheduler.scheduled_job('interval', seconds=3)
def job_0():
    print('3秒毎に実行')

生成したインスタンスscheduled_job()を使ってジョブの設定をしています。

この例では3秒おきにjob_0()が実行されます。

2-2. add_job()を使う。

add_job()を使うと下のようになります。

def job_1():
    print("20時です。")

scheduler.add_job(job_1, 'cron', hour="20")

この例では20時になるとjob_1()が実行されます。

3. ジョブを始める。

start()を使うとジョブが始まります。

scheduler.start()

これまで書いたコードをまとめると下のようになります。

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()
@scheduler.scheduled_job('interval', seconds=3)
def job_0():
    print('3秒毎に実行')

def job_1():
    print("20時です。")

scheduler.add_job(job_1, 'cron', hour="20")
scheduler.start()

ここまでAPSchedulerの基本的な使い方について書いてみました。

scheduled_job()add_job()を使うだけで簡単にプログラムを指定した日時に実行したり、指定した時間を繰り返し実行することができるようになります。

サンプルプログラム

APSchedulerの使い方についてサンプルプログラムを通して、もう少し紹介してみます。

このサンプルプログラムではdatetimeを追加しget_time()で時間を確認できるようにしてみました。全てadd_job()でジョブの設定をしていますが、デコレータを使ったscheduled_job()でも同じようにジョブの設定ができます。

これから紹介するコードは全てコメントアウトのところに書いてあるものとします。

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime

def get_time():
    return str(datetime.now()).split(".")[0]


scheduler = BlockingScheduler()

"""
ここのコメントアウトをサンプルコードに置き換えてー。
"""

try:
    scheduler.start()
except KeyboardInterrupt:
    pass

サンプル1:1分おきに実行

def sample_1():
    print(get_time(), '|', '1分おきに実行')
scheduler.add_job(sample_1, 'interval', minutes=1)

サンプル2:1時間1秒おきに実行

def sample_2():
    print(get_time(), '|', '1時間1秒おきに実行')
scheduler.add_job(sample_2, 'interval', hours=1, seconds=1)

サンプル3:火曜日の19時15分に実行

def sample_3():
    print(get_time(), '|', '火曜の19時15分に実行')
scheduler.add_job(sample_3, 'cron', day_of_week='tue', hour=19, minute=15)

サンプル4:1日おきに実行

def sample_4():
    print(get_time(), '|', '1日おきに実行')
scheduler.add_job(sample_4, 'interval', days=1)

サンプル5:1週間おきに実行

def sample_5():
    print(get_time(), '|', '1週間おきに実行')
scheduler.add_job(sample_5, 'interval', weeks=1)

サンプル6:2019/03/31 17:55から2019/03/31 18:00の間、1分おきに実行

def sample_6():
    print(get_time(), '|', '2019/03/31 17:55から2019/03/31 18:00の間1分おきに実行')
scheduler.add_job(
    sample_6, 'interval', minutes=1,
    start_date="2019-03-31 17:55:00",
    end_date="2019-03-31 18:00:00"
)

サンプル7:2019/03/20 17:00から2019/03/31 20:00の間、毎日19時に実行

def sample_7():
    print(get_time(), '|', '2019/03/20 17:00から2019/03/31 18:00の間、毎日19時に実行')
scheduler.add_job(
    sample_7, 'cron', hour=19,
    start_date="2019-03-20 17:00:00",
    end_date="2019-03-31 20:00:00"
)

サンプル8:2019/03/28から31の間、毎日19時10分に実行

def sample_8():
    print(get_time(), '|', '2019/03/28から31の間、毎日19時10分に実行')
scheduler.add_job(
    sample_8, 'cron', hour=19, minute=10,
    year=2019, month=3, day='28-31'
)

サンプル9:第13週の毎日19時16分に実行

def sample_9():
    print(get_time(), '|', '第13週、毎日19時16分に実行')
scheduler.add_job(sample_9, 'cron', hour=19, minute=16, week=13)

サンプル10:毎時20分に実行

def sample_10():
    print(get_time(), '|', '毎時20分になると実行')
scheduler.add_job(sample_10, 'cron', minute=20)

サンプル11:土日の間、?時?分3秒になると実行

def sample_11():
    print(get_time(), '|', '土日の間、?時?分3秒になると実行')
scheduler.add_job(sample_11, 'cron', second=3, day_of_week='sat, sun')

サンプル12:月曜から金曜の間、21時になると実行

def sample_12():
    print(get_time(), '|', '月曜から金曜の間、21時になると実行')
scheduler.add_job(sample_12, 'cron', hour=21, day_of_week='mon-fri')

サンプル13:19時40分に指定したプログラムの自動実行

os.system()でコマンドを実行できる。

import os
def sample_13():
    os.system("python sample.py")
scheduler.add_job(sample_13, 'cron', hour=19, minute=40)