Hướng dẫn Streaming SQL với Node.js

streaming sql

Sử dụng streaming SQL với Node.js là cách để bạn có thể tra từng hàng của bảng trong cơ sở dữ liệu và thao tác trên đó.

Các cơ sở dữ liệu SQL nói chung rất tuyệt vời trong việc xử lý lượng dữ liệu khổng lồ. Mình không nghĩ rằng đó là một sự cường điệu khi nói rằng hầu hết các công ty mà mình thấy sử dụng Hadoop cho hầu hết “mọi” bài toán về xử lý dữ liệu. Trong khi sẽ tốt hơn và tiết kiệm chi phí hơn khi chỉ cần thêm một chỉ mục hoặc RAM bổ sung vào máy chủ Postgres / MySQL của họ. Bạn có thể thực hiện lọc và tổng hợp khá tốt trong SQL. Nhưng đôi khi, những tác vụ mạnh mẽ này không hỗ trợ những thứ bạn cần. Trong một số trường hợp bạn sẽ muốn xử lý từng hàng trong node.js

Lặp với Async

Hãy tưởng tượng trong bài toán này:

  • Chúng ta có một database chứa một số lượng lớn IDs của Tweets từ Twitter. Ít nhất 100 ngàn.
  • Chúng ta yêu cầu số lượng likes của mỗi tweet từ Twitter.
  • Chúng ta muốn xử lý tác vụ này dưới nền và không ngốn quá nhiều bộ nhớ.

Chúng ta sẽ stream kết quả từ truy vấn của chúng ta để lấy ra từng hàng từ database. Ví dụ dưới đây sẽ hoạt động khá tốt, bất kể bạn muốn sử dụng  @databases/pg cho Postgres, @databases/mysql cho MySQL hoặc @databases/sqlite cho SQLite:

// thay thế để phù hợp với CSDL của bạn
import connect, {sql} from '@databases/pg';
const db = connect();

export default async function updateTweets() {
  const tweets = db.queryStream(
    sql`SELECT id, likes FROM tweets;`
  );

  for await (const tweet of tweets) {
    const likes = await getLikes(tweet.id);
    if (likes !== tweet.likes) {
      await db.query(
        sql`
          UPDATE tweets
          SET likes = ${likes}
          WHERE id = ${tweet.id};
        `
      );
    }
  }
}

Code này cần phải có phiên bản gần hoặc mới nhất của Node, hoặc sử dụng transpiler như là babel để hỗ trợ. Nó sẽ stream tweets từ CSDL, cho phép một buffer của một vài tweets trước khi bắt đầu làm chậm lại quá trình gửi dữ liệu. Một khi chúng ta nhận một tweet mới, và kết thúc quá trình xử lý tweet trước đó, chúng ta có thể xử lý nó.

Node.js Streams

Đây là phương án khác cho việc sử dụng streaming SQL với Node.js.

Tưởng tượng bạn cần xuất một bảng dữ liệu lớn ra file CSV để xử lý ở hệ thống khác. Ví dụ một phần mềm phân tích hoặc nhập dữ liệu vào một gói dịch vụ kế toán chẳng hạn. Bạn có thể chờ cho đến khi bạn lấy về hết tất cả các bản ghi, nhưng nó sẽ thuận tiên hơn khi chúng ta có thể bắt đầu gửi dữ liệu đi ngay khi nó sẵn sàng. Việc này thật sự hữu ích khi client tải tập tin về thông qua một kết nối internet chậm.

Ví dụ này cũng hoạt động tốt, bất kể là sử dụng @databases/pg cho Postgres, @databases/mysql cho MySQL. Tuy nhiên hiện tại thì nó chưa hỗ trợ SQLite.

// thay thế để phù hợp với CSDL của bạn
import connect, {sql} from '@databases/pg';
import {Map} from 'barrage';
import stringify from 'csv-stringify';

const db = connect();

export default function getTweetsCSV() {
  const tweets = db.queryNodeStream(
    sql`SELECT id, likes FROM tweets;`
  );
  const map = new Map(tweet => [tweet.id, tweet.likes]);
  const stringifier = stringify();
  stringifier.write(['Tweet ID', 'Likes']);

  tweets.pipe(map).pipe(stringifier);
  tweets.on('error', e => stringifier.emit('error', e);
  map.on('error', e => stringifier.emit('error', e);

  return stringifier;
}

Ở đây chúng ta yêu cầu tất cả tweets như một node.js Object Stream. Sau đó chúng ta phân phối stream đó vào một Map stream. Cuối cùng chúng ta sẽ phân phối mapped stream này vào csv-stringifier stream. Nó sẽ nhận một mảng cho mỗi dòng và xuất ra file CSV. Chúng ta còn có thể phân phối nó qua createGzip nếu client hỗ trợ gzip.

Kết luận

Lưu ý:

  • 99% công việc hằng ngày bạn làm, bạn sẽ không muốn streaming. Nó làm cho code khó đọc và theo dõi hơn và hầu như là hiếm khi bạn cần lấy về tất cả dữ liệu của bảng một lúc để xử lý. Vì vậy hãy sử dụng khi thật cần thiết!
  • Khi bạn muốn triển khai một tác vụ cần thay đổi mỗi hàng trong một bảng lớn, sử dụng vòng lặp với async (i.e. .queryStream)
  • Khi bạn muốn
  • Khi bạn muốn tuần tự hóa từng hàng trong một bảng lớn thành định dạng chuỗi / nhị phân, hãy sử dụng phương pháp node.js stream (i.e. .queryNodeStream).
>
Secured By miniOrange