English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Node.js + Redis Sorted Setでタスクキューを実現

要求:機能Aはデータを取得するために第三者APIを呼び出す必要がありますが、第三者API自体は非同期処理方式であり、呼び出しの後にはデータと状態{data: "クエリ結果", "status": "非同期処理中"}が返されます。このため、第三者APIが非同期処理中であるためユーザーが機能Aを使用する際に待つ必要があることを避けるために、ユーザーのリクエストをタスクキューに追加し、部分データを返し、リクエストを閉じます。その後、タスクキューからタスクを定期的に取得して第三者APIを呼び出します。返信状態が"非同期処理中"の場合、そのタスクを再度タスクキューに追加し、返信状態が"処理完了"の場合、データをデータベースに格納します。

以上の問題に対して、Node.jsの使用を思いつきました。 + Redis sorted setを使用してタスクキューを実現します。Node.jsは自身のアプリケーションAPIを実装し、ユーザーからのリクエストを受け取り、データベースに保存されているデータとAPIからの部分データをユーザーに返し、タスクをタスクキューに追加します。Node.jsのchild processとcronを使用して、タスクキューからタスクを定期的に取得して実行します。

タスクキューの設計プロセスにおいて考慮すべきいくつかの問題

  • 複数のタスクを並列に実行する
  • タスクのユニーク性
  • タスクの成功または失敗後の処理

以上の問題に対する解決策

  • 複数のタスクを並列に実行するためにはPromise.allを使用します。
  • タスクのユニーク性はRedis sorted setを使用して実現されます。タイムスタンプをスコアとして使用することで、sorted setをlistとして使用することができます。タスクを追加する際にタスクがすでに存在するかどうかを判断し、タスクを実行する際にそのスコアを0に設定することで、タスクの重複実行を避けることができます。
  • 执行任务成功后删除任务,执行任务失败后将任务分值更新为当前时间时间戳,这样就可以将失败的任务重新加入任务队列尾部

示例代码

// remote_api.js 模拟第三方 API
'use strict';
const app = require('express')();
app.get('/', (req, res) => {
  setTimeout(() => {
    let arr = [200, 300]; // 200 代表成功,300 代表失败需要重新请求
    res.status(200).send({ 'status': arr[parseInt(Math.random() * 2]);
  }, 3000);
});
app.listen('9001', () => {
  console.log('API 服务监听端口:9001);
});
// producer.js 自身应用 API,用来接受用户请求并将任务加入任务队列
'use strict';
const app = require('express')();
const redisClient = require('redis').createClient();
const QUEUE_NAME = 'queue:example';
function addTaskToQueue(taskName, callback) {
  // 先判断任务是否已经存在,存在:跳过,不存在:加入任务队列
  redisClient.zscore(QUEUE_NAME, taskName, (error, task) => {
    if (error) {
      console.log(error);
    }
      if (task) {
        console.log('任务已存在,不新增相同任务');
        callback(null, task);
      }
        redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
          if (error) {
            callback(error);
          }
            callback(null, result);
          }
        });
      }
    }
  });
}
app.get('/', (req, res) => {
  let taskName = req.query['task-name'];
  addTaskToQueue(taskName, (error, result) => {
    if (error) {
      console.log(error);
    }
      res.status(200).send('正在查询中......');
    }
  });
});
app.listen(9002, () => {
  console.log('生产者服务监听端口:9002);
});
// consumer.js 定时获取任务并执行
'use strict';
const redisClient = require('redis').createClient();
const request = require('request');
const schedule = require('node-schedule);
const QUEUE_NAME = 'queue:expmple';
const PARALLEL_TASK_NUMBER = 2; // タスクの並行実行数
function getTasksFromQueue(callback) {
  // 複数のタスクを取得
  redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => {
    if (error) {
      callback(error);
    }
      // タスクのスコアを0に設定して、処理中を表します
      if (tasks.length > 0) {
        let tmp = [];
        tasks.forEach((task) => {
          tmp.push(0);
          tmp.push(task);
        });
        redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => {
          if (error) {
            callback(error);
          }
            callback(null, tasks);
          }
        });
      }
    }
  });
}
function addFailedTaskToQueue(taskName, callback) {
  redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
    if (error) {
      callback(error);
    }
      callback(null, result);
    }
  });
}
function removeSucceedTaskFromQueue(taskName, callback) {
  redisClient.zrem(QUEUE_NAME, taskName, (error, result) => {
    if (error) {
      callback(error);
    }
      callback(null, result);
    }
  }
}
function execTask(taskName) {
  return new Promise((resolve, reject) => {
    let requestOptions = {
      'url': 'http://127.0.0.1:9001',
      'method': 'GET',
      'timeout': 5000
    };
    request(requestOptions, (error, response, body) => {
      if (error) {
        resolve('failed');
        console.log(error);
        addFailedTaskToQueue(taskName, (error) => {
          if (error) {
            console.log(error);
          }
          }
        });
      }
        try {
          body = typeof body !== 'object' &&63; JSON.parse(body) : body;
        }
          resolve('failed');
          console.log(error);
          addFailedTaskToQueue(taskName, (error, result) => {
            if (error) {
              console.log(error);
            }
            }
          });
          return;
        }
        if (body.status !== 200) {
          resolve('failed');
          addFailedTaskToQueue(taskName, (error, result) => {
            if (error) {
              console.log(error);
            }
            }
          });
        }
          resolve('succeed');
          removeSucceedTaskFromQueue(taskName, (error, result) => {
            if (error) {
              console.log(error);
            }
            }
          });
        }
      }
    });
  });
}
// 定期的に、隔 5 秒ごとに新しいタスクを実行する
let job = schedule.scheduleJob('*/5 * * * * *', () => {
  console.log('新タスクの取得');
  getTasksFromQueue((error, tasks) => {
    if (error) {
      console.log(error);
    }
      if (tasks.length > 0) {
        console.log(tasks);
        Promise.all(tasks.map(execTask))
        .then((results) => {
          console.log(results);
        }
        .catch((error) => {
          console.log(error);
        });
      }
    }
  });
});
おすすめ