仕事やプライベートで調べたことのメモ書きなど(@札幌)

仕事やプライベートで調べたこと、興味ある事のメモ書きです。2016年4月から札幌で働いてます。※このブログは個人によるもので、団体を代表するものではありません。

Python (pandas) でタブ区切りテキストファイル(tsv)ファイルの読み書き

pandasのタブ区切りテキストファイルファイル(tsvファイル)の読み書き

以下の感じでできるようです。

import pandas as pd

df = pd.read_csv('aaa.tsv', sep='\t', header=None)
print(df)
df.to_csv('bbb.tsv', sep='\t')

読み込みはread_csv/reat_table

read_csvで、sep='\t'を指定するとOK。ヘッダーなければ、header=Noneを指定します。

pandas.pydata.org

read_tableはsepの指定を省略できるものですが、他のオプションは同じのようです。
次に説明する、書き込みでは、to_tableというメソッドはなさそうなので、基本はcsv処理と同じ、という感じで考えればよさそう。

書き込みはto_csv

to_csvで、sep='\t'を指定するとOK。

pandas.pydata.org

Python (pandas) で指定時間間隔でデータを集計する方法

Python (pandas)を使って指定時間間隔のログを集計する方法

やりたかったこと

タイムスタンプ(datetime)列があるcsvデータで、ある時間範囲にある行数をカウントしたい。(5分おき、15分おき、30分おき、1時間おき、など)
pandasのresampleを使うと簡単にできます♪

import pandas as pd
import numpy as np

csv_files = ['1_file.csv', '2_file.csv', '3_file.csv', '4_file.csv']

FILE_HEAD='file_'

for csv_file in csv_files:
	df = pd.read_csv(csv_file, parse_dates=True, index_col='datetime')

	df.resample('5T').count().rename(columns={'val': 'count'})['count'].to_csv(FILE_HEAD + csv_file[0] + '_05m.csv')
	df.resample('15T').count().rename(columns={'val': 'count'})['count'].to_csv(FILE_HEAD + csv_file[0] + '_15m.csv')
	df.resample('30T').count().rename(columns={'val': 'count'})['count'].to_csv(FILE_HEAD + csv_file[0] + '_30m.csv')
	df.resample('60T').count().rename(columns={'val': 'count'})['count'].to_csv(FILE_HEAD + csv_file[0] + '_60m.csv')

Azure FunctionsのTable Storageへのoutバインディングで上書きしようとしたけどできなかった件(Python)

Azure FunctionsからのTables StorageへのoutバインディングPythonでやるには

あまりサンプルが転がってないのですが、以下あたりを参考にすると実現できます。
github.com

Table Storageへのバインディングはデフォルトでは上書きできない

Table StorageはPartitionKeyとRowKeyでユニークになる必要がありますが、Table Storageへのバインディングはデフォルトでは上書きできません。実際、キーが重複するようなメッセージがでて書き込みできません。

一方で以下によるとETagというのを指定すると、上書きできそうです!
stackoverflow.com

でいろいろ試行錯誤して結局できませんでした。他、いろいろ調べると、C#以外の?ETagの使用にはバグがあるみたいです。
github.com

結局はoutバインディングは使わず・・・・。

検討しましたが、上書きするにはbindingだけでは解決できないようです。で、結局TableServiceクラスを使わざるを得ないという結論に至りました。
docs.microsoft.com
ETagが正常に動作するようになるまでの暫定として、削除のみをこのクラスで行い、書き込みはバインディングでさせるという方法もあるのですが、このクラスを使うのだったら insert_or_replace_entity メソッドで一発で上書きできるので、まずはoutバインディングを使わない方向でよさげですね。

Azure FunctionsでQueueTriggerパラメータとバインディングのパラメータを連携させる (Python)

課題設定

やりたかったことは以下のこと。

  • QueueTriggerパラメータで情報を受け取り
  • その情報に応じたTable Storageのレコードを読み取る

そもそもpythonでtable storageの入出力バインドする例がほとんど見当たらなかったので少し試行錯誤しました。以下が参考になりました。
github.com

以下の感じで行けることを確認

  • トリガーで渡されるメッセージがjsonだと、{xxxx}のように書いてやることでそのjsonメッセージを参照し、他のバインディング設定に渡す記述をできる

docs.microsoft.com

function.jsonファイルは以下の感じ

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "queuemsg",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "myqueue",
      "connection": "AzureStorageConnectionStringIn"
    },
    {
      "name": "tabledata",
      "type": "table",
      "tableName": "mytable",
      "partitionKey": "{partition_id}",
      "rowKey": "{row_key}",
      "connection": "AzureStorageConnectionStringIn",
      "direction": "in"
    }
  ]
}

__init__.pyは以下の感じ

import logging
import azure.functions as func

def main(queuemsg: func.QueueMessage, tabledata) -> None:
    input_msg = queuemsg.get_body().decode('utf-8')
    logging.info('Python queue trigger function processed a queue item: %s', input_msg)
    logging.info('tabledata: %s', tabledata)

入力キューに設定するメッセージ

{"partition_id": "mypartition", "row_key": 1}

みたいな感じでいけるはず。で、ただinput bindingできるレコードがない場合には実行ができないのですが、その例外ハンドルを関数側でできないのが気を付けるところ。

binding時にデータを絞り込むような記述をする

partitionKeyやrowKeyを指定するのではなくfilterを指定するようにすると、条件にあったレコードを取得することができる。(複数返ってくるものも扱える) 例えば以下のような記述になります。

function.jsonファイルは以下の感じ

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "queuemsg",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "myqueue",
      "connection": "AzureStorageConnectionStringIn"
    },
    {
      "name": "tabledata",
      "type": "table",
      "tableName": "mytable",
      "filter": "name eq '{name_filter}'",
      "connection": "AzureStorageConnectionStringIn",
      "direction": "in"
    }
  ]
}

入力キューに設定するメッセージ

{"name_filter": "test"}

re:Invent 2019に行ってきました

昨年の12月に縁あってAWSの年次カンファレンス re:Invent 2019 (@ラスベガス) に行ってきました!
ブログは会社のページにも書いたので。
www.insight-tec.com

会社のブログに書いてない所感としては超たくさんの人が集まっている巨大カンファレンスの割には、そこまでラスベガスがre:Invent一色になっていないのが、ラスベガスの町の大きさを実感しました。

そして会場が分散していて本当にたくさんたくさん歩くので、健脚でないとつらいかも。。

マラソンシーズン2019終了

札幌はほぼマラソンシーズン終了の季節です。私も先日の札幌マラソンをもって今シーズンは終了となりました。今後半年はスポーツクラブで減量!(できたらいいな)

昨年なら東京マラソン当たったのですが今年はやはりダメでした。。

来年こそは!

Azure Functions (Python)での並列実行について気にしておいた方がよさそうなこと (従量課金)

Azure Functions (Python, 従量課金)を実行していて、いまいち並列に実行してくれない、など感じたことはないでしょうか?私はありました。で簡単なテストコードで実行してみたりすると、直列に実行されているように見えたり、並列に実行されているように見えたり・・・。いまいち再現性もよくわからず、とりあえず放置してました。作ったFunctionsがそんなに頻繁に実行されるものでなければ放置しておいても問題ないのですが、ある程度の頻度で実行されるようなものになるとそうもいきません。と思い、最近調べてわかった(と思っている)ことをまとめておきます。

Azure Functions (Python, 従量課金)使用時に気を付けること

  • 1VMあたり1プロセスしか実行されない
  • VMは最大で200までスケールアウト(VM追加)される
  • VMのスケールアウト/スケールインの制御の手段がない
  • 関数は入力を処理して出力を生成する Python スクリプト内のステートレスなメソッドであること
  • キュートリガー使用時にはbatchSizeを1にした方がよさそう

1VMあたり(基本的に)1プロセスしか実行されない

コンカレンシー
既定では、Functions Python ランタイムで一度に処理できる関数の呼び出しは 1 つだけです。

既定ではPythonは1VMあたりに1関数のみの実行が行われます。複数の関数呼び出しが直列に処理されるように見えるときは、同じVMでの実行がスケジュールされていると思われます。たくさんのリクエストが存在し、VMが新たに起動できるような状態になると、並列に稼働することが可能となりますが、後述の通り、このVM生成を制御する術はありません。

VMは最大で200までスケールアウト(VM追加)される

スケーリングは以下の仕様となっており従量課金プランでは200VMまでスケールアウトされることになっています。

スケーリングの動作について
スケーリングはさまざまな要因によって異なる可能性があり、選択したトリガーと言語に基づいて異なる方法でスケールします。 スケーリング動作には、注意が必要な複雑な作業がいくつかあります。
1 つの関数アプリは、最大 200 インスタンスまでしかスケールアップできません。 1 つのインスタンスで一度に複数のメッセージや要求を処理できるので、同時実行の数に上限は設定されていません。
HTTP トリガーの場合、新しいインスタンスは、1 秒ごとに最大 1 回しか割り当てられません。
非 HTTP トリガーの場合、新しいインスタンスは、30 秒ごとに最大 1 回しか割り当てられません。

この記述には「 1 つのインスタンスで一度に複数のメッセージや要求を処理できる」とありますが、前述の通り、pythonでは1VMあたり1プロセスしか処理されない仕様になっているため、最大でも200並列しか実現できないということになります。なお、これは1関数アプリの制限(1関数アプリ内の複数の関数はこれらを共有)のようなので、なるべく並列に動かすには、作成する関数は、それぞれ別々の関数アプリを作成してデプロイした方がよさそうです。

同じ関数アプリにテスト コードと運用環境のコードを混在させない
Function App 内の関数はリソースを共有します。 たとえば、メモリは共有されます。 運用環境で Function App を使用している場合は、テストに関連する関数およびリソースを追加しないでください。 これが原因で、運用環境のコードの実行中に予期しないオーバーヘッドが発生する可能性があります。

VMは自動で追加されるが追加され具合を制御する手段がない

従量課金プランを使用する場合、Azure Functions ホストのインスタンスは、受信イベントの数に基づいて動的に追加および削除されます。 このサーバーレス プランではスケーリングが自動的に行われ、関数の実行中にのみコンピューティング リソースに対して料金が発生します。 従量課金プランでは、構成可能な期間が経過すると関数の実行はタイムアウトします。

実行時のスケーリング
Azure Functions は "スケール コントローラー" と呼ばれるコンポーネントを使用して、イベント レートを監視し、スケールアウトとスケールインのどちらを実行するかを決定します。 スケール コントローラーは、トリガーの種類ごとにヒューリスティックを使用します。 たとえば、Azure Queue Storage トリガーを使用した場合、拡大縮小はキューの長さや最も古いキュー メッセージの経過時間に基づいて実施されます。

自動でスケールしてくれるのは確かにお手軽なのですが、このスケールについて制御する手段がありません。Azure FunctionisではVMの実行ホストやツールなどを含め数多くがオープンソースで提供されているが一つの特徴でもあるのですが、このスケールコントローラーのソースコードは公開されておらず、どのくらいリクエストやキューがたまったらVMが追加されるのかの制御ができないようです。

github.com

従量課金プランの Functions ホストの各インスタンスは、1.5 GB のメモリと 1 個の CPU に制限されています。

VMのサイズはさほど大きくないので、処理実行時には注意が必要です。

関数は入力を処理して出力を生成する Python スクリプト内のステートレスなメソッドであること

以下に記載の通りです。別の投稿にも書きましたが、同じVM内で実行される際には、グローバル変数やクラス変数が共有されるため、それらの値を前提にしたり、Singletonな何かを作ったり使ったりすると予期せぬ挙動となる可能性があります。

プログラミング モデル
Azure Functions では、関数は入力を処理して出力を生成する Python スクリプト内のステートレスなメソッドであることが求められます。

キュートリガー使用時にはbatchSizeを1にした方がよさそう

batchSizeが規定(16)のままだと、普通に複数のキューを処理しようとしてしまうのですが、上述のように、1VMでは1 Functionsずつしか実行されないという制限があるため、処理を待たされることになってしまいます。

docs.microsoft.com

batchSizeを1にしておくと、1VMで複数を処理しようとはせずに、新たなVMを起動する方向に処理が働きます。が、前述の通り、httpTrigger以外でのVM生成速度は30秒に1つという仕様なので、急なキュー増加には対応できないという制限があります。これに対する効果的な対処は私は今のところはまだわかっていません・・・。キューストレージを使って、1000メッセージくらい追加してみましたが、平均処理速度は、25並列くらいでした。(実際に何VMまでスケールされたかはわからなかったのですが、確認する手段があるのかどうかも不明です・・・)

ちなにに、このqueueTriggerのオプションであるvisibilityTimeoutは、この投稿作成時点ではバグがあり、30秒などと設定しても、ハードコードされた10分で動作するようです。キューの取り出しに失敗すると10分待たされるという・・・・。ことが、制御できません。githubでは修正のpull requestが出ているようですが、何かの問題からか、取り込みが放置されています。。

github.com