Server/NodeJS & NestJS

NodeJS) Bulk Job 만들기 - async/await & Promise

Juzdalua 2023. 6. 15. 18:52

필수 지식

- for문은 비동기로 작동한다.

- forEach와 같은 배열 람다식은 동기로 작동한다.(순서를 기다리지 않음)

 => https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/forEach#description

- 동기작업(forEach등 람다식)은 프로세스가 따로 작동하므로 내부에 try/catch문을 작성해야한다.

- Promise 객체는 reject로 에러 핸들링을 할 수 있지만, async/await은 이를 위한 try/catch문이 필요하다.

- Promise 객체와 async/await 예약어는 동일하게 작동한다.

 

 

1. 시간차를 두기 위한 setTimeout() 이해하기

const sleep = (delay: number): Promise<number> => {
    return new Promise((resolve, reject) => {
      setTimeout(() => {
        const a = Math.random();
        // console.log(a);
        if (a < 0.5) {
          resolve(a);
        } else {
          resolve(a);
          // reject(a);
        }
      }, delay);
    });
  };

setTimeout 함수는 Promise를 리턴하지 않는다. === async/await이 작동하지 않는다.

위 sleep 함수에서 제네릭 number는 프라미스 객체의 resolve 값이다.

resolve 함수는 1개의 인자만 소유할 수 있다.

 

// 동기로 작동. 동시 작업
for (let i = 0; i < 5; i++) {
  sleep(500);
}
  
// 비동기로 순차 작동. 딜레이 존재
for (let i = 0; i < 5; i++) {
  await sleep(500);
}

시간차를 두기 위한 벌크잡에서는 비동기로 sleep을 실행시켜야 함을 확인했다.

 

1-1 Promise 객체

https://ko.javascript.info/promise-basics

딜레이가 없는 벌크라도 Promise 객체를 이용해야 한다.

let promise = new Promise(function(resolve, reject) {
  // executor (제작 코드, '가수')
});

Promise 객체는 생성됨과 동시에 excutor를 실행하기 때문이다.

벌크로 돌릴 잡들을 한 번에 실행시켜야 하기 때문.

 

2. 벌크 구성하기

위 슬립 함수를 동시에 5 묶음으로 여러번 동작시키려 한다.

sleep함수가 신규 유저 가입을 위한 insert문을 추가하는 부분이라고 가정해보자.

sleep5함수는 sleep함수 5번을 실행하고 db 커넥션이 이루어지는 작업을 수행한다고 가정해보자.

신규 유저를 1명씩 5번 가입하기 위한 5번의 커넥션을 만들기 보다, 5명이 한번에 가입하는 1번의 커넥션이 속도와 안정성에서 유리하기 때문이다.

 

const sleep5 = (): Promise<void> => {
    console.log('sleep5');
    return new Promise(async (resolve) => {
      let i = 0;
      [1, 2, 3, 4, 5].forEach(async () => {
        const data = await sleep(500);
        console.log(data);
        i++;
        if (i == 5) {
          resolve();
        }
      });
      // Promise.all([sleep(500), sleep(500), sleep(500), sleep(500), sleep(500)])
      //   .then((e) => {
      //     console.log("t",e);
      //     resolve();
      //   })
      //   .catch((e) => {
      //     console.log("c",e);
      //     resolve();
      //   });
    });
  };

** 건너뛰기: 주석된 Promise.all과 그 위에 작성한 Promise 객체를 리턴하는 코드는 동일한 결과를 지닌다.

 

1번에서 만든 sleep함수 제네릭은 number고, 랜덤 난수를 리턴한다.

sleep함수를 5번 실행하는 sleep5 함수의 제네릭은 void이고 아무런 결과를 리턴하지 않는 Promise 객체를 의미한다.

즉, 비동기 작업 속에 동기 작업을 수행하기 위한 구조이다. === 동기작업을 비동기로 5번 수행.

 

이제 벌크잡을 실행해보자.

for (let i = 0; i < 5; i++) {
    await sleep5();
}

 

sleep5함수를 실행하면 시작을 알리는 콘솔이 한 번 찍힌다.

이후 forEach 람다식은 동기로 작업하므로 5번의 sleep함수를 동시에 작업한다.

5번의 작업을 마친 후, 구분자 i 인덱스를 활용하여 객체를 리턴하고 종료시킨다.

sleep5
0.45940343875768264
0.1708248878672014
0.9863943618913773
0.4618988786825444
0.45439255186649574
sleep5
0.7722210622675199
0.5656775390729147
0.3125441099450037
0.9556567913229397
0.46292910824899747
sleep5
0.06999213425326811
0.6407650426856057
0.8335189076661373
0.6780495480576041
0.8486709155688061
sleep5
0.1855098435979703
0.5843424069041712
0.35357378442397813
0.4566282909095911
0.2127173059032803
sleep5
0.46981647370005275
0.13318007260648979
0.46208526757055646
0.13453059929381328
0.41354051134522396

sleep5 함수가 0.5초마다 실행된 결과이다.

5번씩 출력되는 부분을 insert문에서 append하는 코드로 바꾼다면 5명의 유저를 한번에 가입시키는 플로우가 5번 일어나고, 5번의 커넥션으로 25명의 유저가 가입하는 설계가 구성됐다.

 

1번에서 주석된 reject를 살펴보자.

마지막 출력 결과인 5번씩 실행되는 sleep5함수에서 5번 중 1번이라도 sleep 함수가 reject를 반환한다면 프로세스는 죽게된다.

에러핸들링은 매우 중요하다.

 

3. 모든 유저에게 공지사항 보내기

const users = await this.model.User.getAllUser();

const createScheduleMail = async (user: User): Promise<any> => {
    const notifications = await this.model.Notification.getNotificationFromUserId(user.id, period);

    const mailStruct: ScheduleMailStruct = {
      rows: notifications.contents,
      title: notifications.subject,
      footer: true,
      header: true
    };

    return {
      type: 'notification',
      email: user.email,
      subject: notifications.subject,
      mailStruct: mailStruct,
      period: 1
    };
};

const bulkCreateMail = async (users: User[]): Promise<void> => {
    const list: MailContents[] = [];
    let i = 0;
    return new Promise((resolve, reject) => {
      users.forEach(async (user) => {
        const item = await createScheduleMail(user); 
        list.push(item); // insert문 append
        i++;
        if (i == users.length) {
          await this.model.ScheduleMail.createScheduleMailBulk(list); // 공지사항 발송
          resolve(); // 종료
        }
      });
    });
};

logger.info(`Number of Users`, users.length);

const bulkLimit = 500;
const round = Math.ceil(users.length / bulkLimit);
for (let i = 1; i < round + 1; i++) {
logger.info(round, (i - 1) * bulkLimit, '~', i * bulkLimit, users.slice((i - 1) * bulkLimit, i * bulkLimit).length);
await bulkCreateMail(users.slice((i - 1) * bulkLimit, i * bulkLimit)); // 500명씩 1번 작업
}

여러 묶음을 동기작업으로 수행하는 하나의 함수를 만든다.

그 함수를 비동기로 여러번 동작시킨다.