English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
要求:機能Aはデータを取得するために第三者APIを呼び出す必要がありますが、第三者API自体は非同期処理方式であり、呼び出しの後にはデータと状態{data: "クエリ結果", "status": "非同期処理中"}が返されます。このため、第三者APIが非同期処理中であるためユーザーが機能Aを使用する際に待つ必要があることを避けるために、ユーザーのリクエストをタスクキューに追加し、部分データを返し、リクエストを閉じます。その後、タスクキューからタスクを定期的に取得して第三者APIを呼び出します。返信状態が"非同期処理中"の場合、そのタスクを再度タスクキューに追加し、返信状態が"処理完了"の場合、データをデータベースに格納します。
以上の問題に対して、Node.jsの使用を思いつきました。 + Redis sorted setを使用してタスクキューを実現します。Node.jsは自身のアプリケーションAPIを実装し、ユーザーからのリクエストを受け取り、データベースに保存されているデータとAPIからの部分データをユーザーに返し、タスクをタスクキューに追加します。Node.jsのchild processとcronを使用して、タスクキューからタスクを定期的に取得して実行します。
タスクキューの設計プロセスにおいて考慮すべきいくつかの問題
以上の問題に対する解決策
示例代码
// remote_api.js 模拟第三方 API 'use strict'; const app = require('express')(); app.get('/', (req, res) => { setTimeout(() => { let arr = [200, 300]; // 200 代表成功,300 代表失败需要重新请求 res.status(200).send({ 'status': arr[parseInt(Math.random() * 2]); }, 3000); }); app.listen('9001', () => { console.log('API 服务监听端口:9001); }); // producer.js 自身应用 API,用来接受用户请求并将任务加入任务队列 'use strict'; const app = require('express')(); const redisClient = require('redis').createClient(); const QUEUE_NAME = 'queue:example'; function addTaskToQueue(taskName, callback) { // 先判断任务是否已经存在,存在:跳过,不存在:加入任务队列 redisClient.zscore(QUEUE_NAME, taskName, (error, task) => { if (error) { console.log(error); } if (task) { console.log('任务已存在,不新增相同任务'); callback(null, task); } redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => { if (error) { callback(error); } callback(null, result); } }); } } }); } app.get('/', (req, res) => { let taskName = req.query['task-name']; addTaskToQueue(taskName, (error, result) => { if (error) { console.log(error); } res.status(200).send('正在查询中......'); } }); }); app.listen(9002, () => { console.log('生产者服务监听端口:9002); }); // consumer.js 定时获取任务并执行 'use strict'; const redisClient = require('redis').createClient(); const request = require('request'); const schedule = require('node-schedule); const QUEUE_NAME = 'queue:expmple'; const PARALLEL_TASK_NUMBER = 2; // タスクの並行実行数 function getTasksFromQueue(callback) { // 複数のタスクを取得 redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => { if (error) { callback(error); } // タスクのスコアを0に設定して、処理中を表します if (tasks.length > 0) { let tmp = []; tasks.forEach((task) => { tmp.push(0); tmp.push(task); }); redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => { if (error) { callback(error); } callback(null, tasks); } }); } } }); } function addFailedTaskToQueue(taskName, callback) { redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => { if (error) { callback(error); } callback(null, result); } }); } function removeSucceedTaskFromQueue(taskName, callback) { redisClient.zrem(QUEUE_NAME, taskName, (error, result) => { if (error) { callback(error); } callback(null, result); } } } function execTask(taskName) { return new Promise((resolve, reject) => { let requestOptions = { 'url': 'http://127.0.0.1:9001', 'method': 'GET', 'timeout': 5000 }; request(requestOptions, (error, response, body) => { if (error) { resolve('failed'); console.log(error); addFailedTaskToQueue(taskName, (error) => { if (error) { console.log(error); } } }); } try { body = typeof body !== 'object' &&63; JSON.parse(body) : body; } resolve('failed'); console.log(error); addFailedTaskToQueue(taskName, (error, result) => { if (error) { console.log(error); } } }); return; } if (body.status !== 200) { resolve('failed'); addFailedTaskToQueue(taskName, (error, result) => { if (error) { console.log(error); } } }); } resolve('succeed'); removeSucceedTaskFromQueue(taskName, (error, result) => { if (error) { console.log(error); } } }); } } }); }); } // 定期的に、隔 5 秒ごとに新しいタスクを実行する let job = schedule.scheduleJob('*/5 * * * * *', () => { console.log('新タスクの取得'); getTasksFromQueue((error, tasks) => { if (error) { console.log(error); } if (tasks.length > 0) { console.log(tasks); Promise.all(tasks.map(execTask)) .then((results) => { console.log(results); } .catch((error) => { console.log(error); }); } } }); });