Lambda(node.js)のトリガーでS3にPUTされたCSVをRDSにインサートする – AWS SDK for JavaScript v3

Lambda(node.js)のトリガーでS3にPUTされたCSVをRDSにインサートする – AWS SDK for JavaScript v3

前提

  • RDSはパブリックアクセス可能,Aurora(MySQL)
  • Serverless Lambda
項目
文字コード utf-8
改行コード CRLF
csv

2行3列のcsv(headerあり)

Lambdaのトリガー設定

S3にPUTされたcsvをトリガーとして、そのタイミングでLambdaを実行することができます。

Lambdaのトリガー設定します。

Lambda(node.js)のトリガーでS3にPUTされたCSVをRDSにインサートする – AWS SDK for JavaScript v3

プレフィックスでオブジェクトキーを指定したり、サフィックスで拡張子を指定したりと細かい設定ができるようです。

使用するモジュールは以下です。

npm i @aws-sdk/client-s3 mysql2 csv-parse

Lambda(index.js)

Lambdaソースです。

import {S3Client, SelectObjectContentCommand} from '@aws-sdk/client-s3'
import * as mysql2  from 'mysql2/promise'
import {parse} from 'csv-parse/sync'
export async function handler(event, context) {
  // PUTされたバケット、オブジェクトキーが取得可能
  const bucket = event['Records'][0]['s3']['bucket']['name']
  const key = event['Records'][0]['s3']['object']['key']
  const input = {
    Bucket: bucket,
    Key: key,
    ExpressionType: 'SQL',
    Expression: 'SELECT s._1,s._2 FROM S3Object s',
    InputSerialization: {
      CSV: {
        FileHeaderInfo: 'IGNORE', // ヘッダ行無視
        RecordDelimiter: '\r\n',
        FieldDelimiter: ','
      }
    },
    OutputSerialization: {
      CSV: {}
    }
  };
  const client = new S3Client({
    region: 'ap-northeast-1'
  })
  try {
    // S3
    const command = new SelectObjectContentCommand(input)
    const data = await client.send(command)
    const csv = parse(await streamToCsv(data.Payload))

    // RDS
    const conn = await mysql2.createConnection({
      host:'データベースエンドポイント',
      user: 'ユーザ名',
      password: 'パスワード',
      database: 'データベース名'
  })
    for await (const records of csv) {
      await conn.query("insert into user(id,name) values(?,?)", [records[0],records[1]])
    }
    await conn.end()
  }catch (e) {
    console.log(e)
    return
  }
  const response = {
    statusCode: 200,
    body: JSON.stringify('Hello from Lambda!'),
  }
  return response
}
  
async function streamToCsv(generator) {
  const chunks = []
  for await (const value of generator) {
    if (value.Records) {
      chunks.push(value.Records.Payload);
    }
  }
  let payload = Buffer.concat(chunks).toString('utf8')
  return payload
}

トリガーとなるPUTを行います。

aws s3 cp testdata.csv s3://バケット名/tmp/testdata.json

正常にuploadされると、Lambdaが実行され、RDSのテーブルに2レコード追加されています。

Lambda(node.js)のトリガーでS3にPUTされたCSVをRDSにインサートする – AWS SDK for JavaScript v3

オブジェクトキー名の仕様

オブジェクトキー名に半角スペースが入ると、Lambdaのevent.Records[0].s3.object.keyでは+に変換されます。

ファイル名 Lambda
a b.csv a+b.csv

Lambdaでは「a+b.csv」となってCSV読み込むことできません。※半角スペースは特殊な処理を必要とする可能性がある文字

オブジェクトキー名にセーフ文字以外を使用するのは避けたほうが良いです。

ちなみにオブジェクトキー内に日本語を使用するとパーセントエンコーディングされるのでLambda側でdecodeURIしてあげる必要があります。

decodeURI(event.Records[0].s3.object.key) event.Records[0].s3.object.key
data/日本語.csv data/%E6%97%A5%E6%9C%AC%E8%AA%9E.csv
オブジェクトキー名の作成 - Amazon Simple Storage Service
オブジェクトキー (オブジェクトキー名) によって、Amazon S3 オブジェクトが一意に識別されます。

コメント

株式会社CONFRAGE ITソリューション事業部をもっと見る

今すぐ購読し、続きを読んで、すべてのアーカイブにアクセスしましょう。

続きを読む

タイトルとURLをコピーしました