皆様こんにちは、ネットワーク事業部の生方と申します。
前回の投稿に引き続きAWSに関するネタとなります。
今回はとあるユーザーから、AWSで稼働しているRDSのログを外部で運用しているログ監視サービスに取り込みたいとのご要望をいただきました。そこで作成したシステムについて簡単にまとめてご紹介したいと思います。
システムの概要
まず、RDSからCloudWatchLogsへログを出力するように設定します。これはRDSの基本設定で可能です。
ここで出力されたログを、1時間毎にまとめたテキストファイルとしてS3バケットへ出力するようにしますが、この抽出や整形はLambdaでコードを記述します。
条件として、コードを実行した直前の1時間のログを抽出するようにします。(例 5:05にコードを実行 → 4:00~4:59までのログを抽出します)
Lambdaのコードを実行するのはEventBridgeを使用し、cronで1時間毎に実行するようにします。
S3バケットへ出力されたテキストファイルを、外部にあるユーザーのログ監視サービスシステムが定期的に取得します。
システムの構築
RDSの設定
今回は検証のため、とりあえずPostgreSQLのRDSインスタンスを稼働させておきます。
RDSの設定から、「ログのエクスポート」で「PostgreSQLログ」を選択するとCloudWatchLogsへ出力されるようになります。
CloudWatchLogsの確認
CloudWatchLogsにRDSのログが出力されることを確認します。
この際に、ロググループとログストリームの名称を確認しておきます。
S3の設定
CloudWatchLogsから抽出したテキストファイルを保存するS3バケットを準備します。
今回はファイルを保存するだけなので、バケットタイプは「汎用」として基本的な設定のままで作成します。
IAMの設定
このあとLambda関数を作成しますが、CloudWatchとS3へのアクセスを許可するための権限を持つIAMロールを作成しておき、これをLambdaへ適用します。
ポリシー名で検索して、「CloudWatchLogsFullAccess」と「AmazonS3FullAccess」の2つを選択します。
Lambdaの設定
Lambda関数を新たに作成します。
今回はランタイムとしてPythonを使用しています。また実行ロールとして、先ほど作成したIAMロールを適用します。
「log_group」「log_stream」「s3_bucket_name」の個所は、環境に応じた名前に変更してください。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
import boto3 from datetime import datetime, timedelta # AWS クライアントの初期化 logs_client = boto3.client('logs') s3_client = boto3.client('s3') def lambda_handler(event, context): # 環境設定 log_group = '/aws/rds/instance/rds-cloudwatch-s3-test/postgresql' # CloudWatch Logs のロググループ log_stream = 'rds-cloudwatch-s3-test.0' # CloudWatch Logs のログストリーム s3_bucket_name = 'bucket-cloudwatch-s3-test' # S3 バケット名 # 現在のUTC時刻を取得し、1時間前の開始時刻と終了時刻を計算 utc_now = datetime.utcnow() one_hour_ago = utc_now - timedelta(hours=1) start_time = int(one_hour_ago.replace(minute=0, second=0, microsecond=0).timestamp() * 1000) # 1時間前の開始時刻 (ミリ秒) end_time = int(one_hour_ago.replace(minute=59, second=59, microsecond=999999).timestamp() * 1000) # 1時間前の終了時刻 (ミリ秒) # ログ取得 try: logs_data = [] next_token = None while True: if next_token: response = logs_client.filter_log_events( logGroupName=log_group, logStreamNames=[log_stream], startTime=start_time, endTime=end_time, nextToken=next_token ) else: response = logs_client.filter_log_events( logGroupName=log_group, logStreamNames=[log_stream], startTime=start_time, endTime=end_time ) # ログメッセージを追加 logs_data.extend(event['message'] for event in response.get('events', [])) # 次のトークンを取得 next_token = response.get('nextToken') # トークンがない場合は終了 if not next_token: break except Exception as e: return { 'statusCode': 500, 'body': f"CloudWatch Logs の取得に失敗しました: {str(e)}" } # 空のログでも出力するため、データが無い場合も空リストを保持 if not logs_data: logs_data = ["指定された時間帯にログはありません"] # S3 に保存するデータの準備(TXT形式) s3_file_name = f"logs_{one_hour_ago.strftime('%Y%m%d_%H')}.txt" # フォルダプレフィックスなし txt_data = "\n".join(logs_data).encode('utf-8') # 各ログメッセージを改行で区切る try: # S3 へアップロード s3_client.put_object( Bucket=s3_bucket_name, Key=s3_file_name, Body=txt_data ) return { 'statusCode': 200, 'body': f"ログデータが S3 バケット '{s3_bucket_name}' に正常にアップロードされました: {s3_file_name}" } except Exception as e: return { 'statusCode': 500, 'body': f"S3 へのアップロードに失敗しました: {str(e)}" } |
EventBridgeの設定
Lambdaをテストして問題なければ、定期実行するためにEventBridgeと連携します。
「トリガーを追加」ボタンをクリックし、ソースとして「EventBridge」を選択します。
今回は直前の1時間のデータを抽出するようにしたいので、少し時間をずらして毎時5分に実行するようにcron式を指定します。
動作確認
正常に実行できていれば、1時間ごとにS3バケットへログを抽出したテキストファイルが保存されます。
RDSのデフォルトではログのタイムスタンプがUTCになっているため、出力内容とログファイル名もUTC準拠となっています。
おわりに
このようにS3にデータを出力することで、外部システムからログデータを参照することが可能になります。
今回はユーザーの運用するログ監視サービスとの連携のため、このようなシステムをAWS内のサービスを組み合わせて構築しましたが、ログデータを長期保存したりローカルにダウンロードして保存する場合などにも利用できそうです。
抽出内容を工夫するといろいろ応用できそうですので、活用次第でいろいろなケースに対応できるかもしれません。